程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫

oracle queue aq

編輯:Oracle數據庫基礎

1 前言 為了提高系統的性能,縮短系統等待時間,引入隊列技術。為了減少用戶的等待時間,應用程序可以讓說明需要後台處理的消息排入隊列。然後就可以從頁面的呈遞過程中去掉該處理任務。由一個後台進程來讀取並隊列處理這些消息,或者甚至可以交由一個單獨的系統來處理它們。
隊列可以實現各個系統之間的數據共享,消息通信。
2 功能概述
書寫本文的目的:利用Oracle高級隊列實現pl/sql代碼,為其它語言實現高級隊列的功能作接口。
Oracle高級隊列有一下好處:
1 高級隊列管理是Oracle數據庫的一個特性,它提供消息隊列管理功能。這是一個非常可靠、安全和可伸縮的消息管理系統,因為它使用與其他基於 Oracle技術的應用程序相同的數據庫特性。
2 高級隊列管理的一個很大優點是它可以通過PL/SQL、Java或C來訪問,這樣你就可以把來自一個Java servlet的消息入隊列和使PL/SQL存儲過程中的相同消息出隊列。
3高級隊列管理的另一個優點是你可以利用這一軟件通過Oracle Net Services (SQL*Net)、HTTP(S)和SMTP,在遠程節點之間傳播消息。高級隊列甚至可以通過消息網關與非Oracle的消息管理系統(如IBM MQSerIEs)相集成。
4 Oracle高級隊列管理提供了單消費者隊列和多消費者隊列。單消費者隊列只面向單一的接收者。多消費者隊列可以被多個接收者使用。當把消息放入多消費者隊列時,應用程序的程序員必須顯式地在消息屬性中指定這些接收者,或者建立決定每條消息的接收者的基於規則的訂閱過程。具體開發步驟如下:
1 首先確定應用的需求,是否適合使用高級隊列?使用高級隊列預計提高性能的預期值
2確定隊列包體結構。
3 隊列管理。
4 隊列操作。
2 確定隊列包體結構
底層消息的通信Oracle已經實現,並且封裝了,現在需要執行消息包體的結構,消息的發送和消息接收要通過一定的消息結構完成。
現在以短信push為例子,完成高級隊列在短信push的應用。
安裝好Oracle數據庫後,對smpuser授權
Grant aq_administrator_role To smpuser;
注意:需要把dbms_aq and dbms_aq_admin 包的調用功能權限付給smpuser.
CREATE OR REPLACE Type mt_struc As Object 功能:push 的mt結構,和T_OUTBOX_PUSH結構一致
日期:2006-09-20
write by hancy (
OP_ID NUMBER ,
DEST_MOBILENO VARCHAR2(32) ,
FEE_MOBILENO VARCHAR2(32) ,
LONG_NO VARCHAR2(21) ,
MT_CONTENT VARCHAR2(500),
SERVICE_ID VARCHAR2(10) ,
MT_FLAG NUMBER(1) ,
MSG_FMT NUMBER(1) ,
FEE_TYPE NUMBER(1) ,
FEE_VALUE NUMBER(4) ,
BEG_REPORT NUMBER(1) ,
CLIENT_ID NUMBER(4) ,
CLIENT_NAME VARCHAR2(10) ,
MT_TIME DATE ,
MT_ID VARCHAR2(40) ,
PLAN_ID NUMBER ,
PUSH_TYPE VARCHAR2(20) ,
SEND_DATE DATE
)
4 隊列管理
4.1建立隊列表 創建下行和上行隊列表:
exec dbms_aqadm.create_queue_table(queue_table=>'sms_mt_tab', queue_payload_type=>'mt_struc'); 具體參數如下:
SQL> desc dbms_aqadm.create_queue_table
Parameter Type Mode Default? QUEUE_TABLE VARCHAR2 IN
QUEUE_PAYLOAD_TYPE VARCHAR2 IN
STORAGE_CLAUSE VARCHAR2 IN Y
SORT_LIST VARCHAR2 IN Y
MULTIPLE_CONSUMERS BOOLEAN IN Y
MESSAGE_GROUPING BINARY_INTEGER IN Y
COMMENT VARCHAR2 IN Y
AUTO_COMMIT BOOLEAN IN Y
PRIMARY_INSTANCE BINARY_INTEGER IN Y
SECONDARY_INSTANCE BINARY_INTEGER IN Y
COMPATIBLE VARCHAR2 IN Y
NON_REPUDIATION BINARY_INTEGER IN Y
SECURE BOOLEAN IN Y
參考dbms_aqadm.create_queue_table
1.存儲參數storage_clause 可以是 MAXTRANS,LOB等
2.sort_list 可以是PRIORITY,enq_time這2個參數或者其中一個。具體可以察看對應的 隊列表的數據結構。例如下面的例子
--創建隊列表,‘sms_aq_tab’ 隊列表名,' sms_aq_tab '隊列對象名(充當消息體的對象名稱),以下同
dbms_aqadm.create_queue_table('WDZAQTABLE','WDZAQMSG'); dbms_aqadm.create_queue_table('WDZSQRTAQTABLE','WDZAQMSG',sort_list => 'PRIORITY,enq_time');
3。消息分組參數 message_grouping 可以是 NONE,或 TRANSACTIONAL
-- message grouping
dbms_aqadm.TRANSACTIONAL CONSTANT BINARY_INTEGER := 1;
dbms_aqadm.NONE CONSTANT BINARY_INTEGER := 0;後者表示一個與事務相關的消息分成1組,在提取消息的時候可以當成一組相關消息來提取。例如: dbms_aqadm.create_queue_table('WDZGROUPAQTABLE','WDZAQMSG',
sort_list => 'PRIORITY,enq_time',
message_grouping =>dbms_aqadm.TRANSACTIONAL/*dbms_aqadm.NONE*/ );
4.multiple_consumers表示消息接受者是否為多個用戶。默認是只有1個接收者。如果要多個用戶可以接受消息,需要設置=true dbms_aqadm.create_queue_table('WDZMUTIAQTABLE','WDZAQMSG',sort_list => 'PRIORITY,enq_time',
message_grouping =>dbms_aqadm.TRANSACTIONAL/*dbms_aqadm.NONE*/
,multiple_consumers => true);
4.2刪除隊列表
Exec dbms_aqadm.drop_queue_table(queue_table => 'sms_mt_tab');
Exec dbms_aqadm.drop_queue_table(queue_table => 'sms_mo_tab');
4.3建立隊列
建立隊列代碼如下:
exec dbms_aqadm.create_queue(queue_name=>'sms_mt_queue', queue_table=>'sms_mt_tab'); execdbms_aqadm.create_queue(queue_name=>'sms_mt_queue_exception',queue_table=>'sms_mt_tab',queue_type=>dbms_aqadm.EXCEPTION_QUEUE);
exec dbms_aqadm.create_queue(queue_name=>'sms_mt_queue_backup', queue_table=>'sms_mt_tab'); 參考 dbms_aqadm.create_queue
--創建隊列, sms_mt_queue為隊列的名稱,' sms_mt_tab ' 隊列表名
dbms_aqadm.create_queue('WDZQUEUE','WDZAQTABLE',queue_type => dbms_aqadm.NORMAL_QUEUE);參數queue_type 表示隊列是否為正常隊列還是異常隊列,參數值為dbms_aqadm.NORMAL_QUEUE 或者dbms_aqadm.EXCEPTION_QUEUE --創建多用戶隊列, 'WDZQUEUE' 為隊列的名稱,'WDZAQTABLE' 隊列表名
dbms_aqadm.create_queue('WDZMUTIQUEUE','WDZMUTIAQTABLE',
queue_type => dbms_aqadm.NORMAL_QUEUE);
--創建分組隊列, 'WDZGROUPQUEUE' 為隊列的名稱,'WDZAQTABLE' 隊列表名
dbms_aqadm.create_queue('WDZGROUPQUEUE', 'WDZAQTABLE',
queue_type => dbms_aqadm.NORMAL_QUEUE);
4.4刪除隊列
代碼如下:
Exec dbms_aqadm.drop_queue('sms_mt_queue');
4.5隊列參數調整
A 創建非持久隊列參考 dbms_aqadm.create_np_queue
dbms_aqadm. create_np_queue ('sms_mt_queue', multiple_consumers=>false);
B 啟動一個隊列,' sms_mt_queue ' 為隊列的名稱
dbms_aqadm.start_queue('sms_mt_queue',enqueue=>true, dequeue=> true);
C 停止隊列 dbms_aqadm.stop_queue(sms_mt_queue ',enqueue=>false, dequeue=>true);
5 隊列操作 1 入隊
P100_MT_ENQUEUE p_equeue_name In varchar2, --隊列名單大寫字母,主隊列:SMS_MT_QUEUE 備份隊列:SMS_MT_QUEUE_BACKUP
p_body In t_outbox_push%Rowtype,--入參,記錄類型,數據源頭直接調用
p_level In Number:=3, --優先級別1-5,數值越小越快
p_delay In Number:=0, --入隊延遲時間,單位:秒
p_res_str OUT VARCHAR2, --0 成功 其它失敗
p_msg_id OUT Varchar2 --返回的msgid,主鍵 2 出隊列
p102_mt_dequeue p_equeue_name In varchar2, --隊列名單大寫字母,主隊列:SMS_MT_QUEUE 備份隊列:SMS_MT_QUEUE_BACKUP
p_clIEnt_id Out Number, --短信內部網關
p_res_str Out varchar2, --返回值 0 成功 -2 隊列為空其它失敗
p_label out varchar2, --標簽
p_body out varchar2) --數據包體
3 數據格式轉換 p103_change_label_body_str p_dequeue_body In Mt_Struc,
p_res_str Out Varchar2,
p_label Out varchar2,
p_body Out varchar2 功能:把出隊列的字符串翻譯成有規則的label,和body
日期:2006-09-20
write by hancy 4 測試入隊
procedure P101_MT_ENQUEUE_TEST p_equeue_name In varchar2,--隊列名單大寫字母,主隊列:SMS_MT_QUEUE 備份隊列:SMS_MT_QUEUE_BACKUP
p_res_str out Varchar2,
p_msg_id OUT Varchar2 5.1入隊
create or replace procedure P100_MT_ENQUEUE p_equeue_name In varchar2, --隊列名單大寫字母,主隊列:SMS_MT_QUEUE 備份隊列:SMS_MT_QUEUE_BACKUP
p_body In t_outbox_push%Rowtype,--入參,記錄類型,數據源頭直接調用
p_level In Number:=3, --優先級別1-5,數值越小越快
p_delay In Number:=0, --入隊延遲時間,單位:秒
p_res_str OUT VARCHAR2, --0 成功 其它失敗
p_msg_id OUT Varchar2 --返回的msgid,主鍵功能:數據源頭,push,的入隊操作
日期:2006-09-20
write by hancy
*/
Is
v_enqueue_options dbms_aq.enqueue_options_t;
v_message_properties dbms_aq.message_propertIEs_t;
v_message_handle raw(16);
v_body mt_struc;---寫日志區域
vPid NUMBER:=100;
vProName VARCHAR2(50):='P100_MT_ENQUEUE';
vProTip VARCHAR2(255);
vErrorCode VARCHAR2(20);
vErrorMsg VARCHAR2(2000);
Begin
--1初始化
p_res_str:='-1'; --2 P_BODY 到 v_body 的轉化 把一個字符串轉化為一個結構體
v_body:=mt_struc(
p_body.OP_ID ,
p_body.DEST_MOBILENO ,
p_body.FEE_MOBILENO ,
p_body.LONG_NO ,
p_body.MT_CONTENT ,
p_body.SERVICE_ID ,
p_body.MT_FLAG ,
p_body.MSG_FMT ,
p_body.FEE_TYPE ,
p_body.FEE_VALUE ,
p_body.BEG_REPORT ,
p_body.CLIENT_ID ,
p_body.CLIENT_NAME ,
p_body.MT_TIME ,
p_body.MT_ID ,
p_body.PLAN_ID ,
p_body.PUSH_TYPE ,
p_body.SEND_DATE );
--4設置屬性和參數--- v_message_properties.exception_queue:='SMS_MT_QUEUE_EXCEPTION'; v_message_properties.priority :=p_level; v_message_propertIEs.delay :=p_delay;
--5 開始入隊
dbms_aqadm.start_queue(p_equeue_name,enqueue=>true, dequeue=> true);
dbms_aq.enqueue(queue_name=>p_equeue_name,
enqueue_options=>v_enqueue_options,
message_properties=>v_message_propertIEs,
payload=>v_body,
msgid=>v_message_handle);
P_MSG_ID:=v_message_handle ;
--6 commit
Commit;
p_res_str:='0';
Exception
When Others Then
p_res_str:='-100';
vProTip:='入隊過程異常!';
vErrorCode:=Sqlcode;
vErrorMsg:=SQLERRM;
p_pub_write_error_log(vPid,vProName,'','',
vProTip,vErrorCode,vErrorMsg,1);
Rollback;
end P100_MT_ENQUEUE;
create or replace procedure P101_MT_ENQUEUE_TEST p_equeue_name In varchar2,--隊列名單大寫字母,主隊列:SMS_MT_QUEUE 備份隊列:SMS_MT_QUEUE_BACKUP
p_res_str out Varchar2,
p_msg_id OUT Varchar2 功能:測試入隊
日期:2006-09-20
write by hancy
*/
Is
Cursor cur_push Is
Select * From t_outbox_push;
v_row_push t_outbox_push%Rowtype;
v_exe_res varchar2(200);
v_message_handle raw(16);
begin
p_res_str:='-1';
Open cur_push;
Loop
Fetch cur_push Into v_row_push;
Exit When cur_push%Notfound;
P100_MT_ENQUEUE(p_equeue_name,v_row_push,3,0,v_exe_res,v_message_handle);
End Loop;
Close cur_push;
p_res_str:=v_exe_res;
p_msg_id:=v_message_handle;
Exception
When Others Then
p_res_str:=Sqlerrm;
Rollback;
end P101_MT_ENQUEUE_TEST;5.2出隊
create or replace procedure p102_mt_dequeue p_equeue_name In varchar2, --隊列名單大寫字母,主隊列:SMS_MT_QUEUE 備份隊列:SMS_MT_QUEUE_BACKUP
p_clIEnt_id Out Number, --短信內部網關
p_res_str Out varchar2, --返回值 0 成功 -2 隊列為空其它失敗
p_label out varchar2, --標簽
p_body out varchar2) --數據包體
Is/*
功能: 出隊列,返回給Java程序
日期:2006-09-20
write by hancy v_Dequeue_Options Dbms_Aq.Dequeue_Options_t;
v_Message_Properties Dbms_Aq.Message_PropertIEs_t;
v_Message_Handle Raw(16);
v_Body_queue Mt_Struc;
v_label Varchar2(200);
v_body varchar2(1000);
res_str varchar2(20); vPid NUMBER:=102;
vProName VARCHAR2(50):='p102_mt_dequeue';
vProTip VARCHAR2(255);
vErrorCode VARCHAR2(20);
vErrorMsg VARCHAR2(2000);
v_count Number;
Begin
--1 初始化
p_res_str:='-1' ;
--2 出隊列設置屬性 --3 執行出隊列操作
Select Count(0) Into v_count From sms_mt_tab Where q_name=p_equeue_name ;
If v_count >0 Then
dbms_aqadm.start_queue(p_equeue_name,enqueue=>true, dequeue=> true);
Dbms_Aq.Dequeue(Queue_Name => p_equeue_name,
Dequeue_Options => v_Dequeue_Options,
Message_Properties => v_Message_PropertIEs,
Payload => v_Body_queue,
Msgid => v_Message_Handle);
-- 組和 label= 消息類型(CommandID)+運營商標識+地區標識(AreaID)+時間標識
-- 組合 body
--4 返回給Java的包體
p103_change_label_body_str(v_Body_queue,res_str,v_label,v_body);
If res_str='0' Then
p_label :=v_label;
p_body :=v_body;
p_client_id:=v_Body_queue.CLIENT_ID;
--提交
Commit;
p_res_str:='0' ;
Else
p_res_str:='出隊列後組串失敗!' ;
End If;
Else
p_res_str:='隊列裡面沒有數據,休息一下';
End If;
Exception
When Others Then
p_res_str:='-100';
vProTip:='出隊過程異常!';
vErrorCode:=SQLCODE;
vErrorMsg:=SQLERRM;
p_pub_write_error_log(vPid,vProName,'','',
vProTip,vErrorCode,vErrorMsg,1);
Rollback;
end p102_mt_dequeue;create or replace procedure p103_change_label_body_str p_dequeue_body In Mt_Struc,
p_res_str Out Varchar2,
p_label Out varchar2,
p_body Out varchar2 功能:把出隊列的字符串翻譯成有規則的label,和body
日期:2006-09-20
write by hancy
*/
Is vPid NUMBER:=103;
vProName VARCHAR2(50):='p103_change_label_body_str';
vProTip VARCHAR2(255);
vErrorCode VARCHAR2(20);
vErrorMsg VARCHAR2(2000);
--label
v_label Varchar2(50);
v_clIEnt_id Number;
v_date_str varchar2(20);
--body
v_PRDID varchar2(10);
v_CHANNELID varchar2(10);
v_ActionID varchar2(20);
v_Result varchar2(20);
v_MSGID varchar2(20);
v_Pk_total varchar2(20);
v_Pk_Number varchar2(20);
v_RegisteredDelivery varchar2(20);
v_ServiceID varchar2(20);
v_FeeUserType varchar2(20);
v_FeeTerminalID varchar2(35);
v_MsgFmt varchar2(20);
v_FeeType varchar2(20);
v_FeeCode varchar2(20);
v_SrcID varchar2(30);
v_DestID varchar2(35);
v_LinkID varchar2(20);
v_MTFlag varchar2(20);
v_FixedCode varchar2(20);
v_RepeatNum varchar2(20);
v_InfoID varchar2(20);
v_MsgContent varchar2(150); v_body varchar2(2000);
begin
p_res_str:='-1';
v_Result :='';
v_MSGID :='';
v_Pk_total :='';
v_Pk_Number:='';
--label
Select to_char(Sysdate,'YY-MM-DD HH:MM:SS') Into v_date_str From dual ;
If p_dequeue_body.CLIENT_ID>150 Then
v_label:='022000000'||v_date_str;
Else
v_label:='021000000'||v_date_str;
End If;
--body
v_PRDID:='0000' ; v_CHANNELID:='0000'; v_ActionID:='0';
v_Result :=lpad(' ',8,' ');
v_MSGID :=lpad(' ',16,' ');
v_Pk_total :=lpad(' ',1,' ');
v_Pk_Number:=lpad(' ',1,' ');
--v_RegisteredDelivery
v_RegisteredDelivery:=lpad(p_dequeue_body.BEG_REPORT,1,' ');
--v_ServiceID
v_ServiceID:=lpad(p_dequeue_body.SERVICE_ID,10,' ');
--v_FeeUserType
v_FeeUserType:='0';
--v_FeeTerminalID
v_FeeTerminalID:=lpad(p_dequeue_body.FEE_MOBILENO,32,' ');
--v_MsgFmt
v_MsgFmt:=lpad(p_dequeue_body.MSG_FMT,2,' ');
--v_FeeType
v_FeeType:=lpad(p_dequeue_body.FEE_TYPE,2,' ');
--v_FeeCode
v_FeeCode:=lpad(p_dequeue_body.FEE_VALUE,6,' ');
--v_SrcID
v_SrcID:=lpad(p_dequeue_body.LONG_NO,21,' ');
--v_DestID
v_DestID:=lpad(p_dequeue_body.DEST_MOBILENO,32,' ');
--v_LinkID
v_LinkID:=lpad(' ',20,' ');
--v_MTFlag varchar2(20);
v_MTFlag:=lpad(p_dequeue_body.MT_FLAG,2,' ');
---v_FixedCode varchar2(20);
v_FixedCode:=lpad(' ',6,' ');
--v_RepeatNum varchar2(20);
v_RepeatNum:='1';
--v_InfoID varchar2(20);
v_InfoID:=lpad(' ',16,' ');
--v_MsgContent varchar2(150);
v_MsgContent:=p_dequeue_body.MT_CONTENT; --add
v_body:=
(v_PRDID
||v_CHANNELID
||v_ActionID
||v_Result
||v_MSGID
||v_Pk_total
||v_Pk_Number
||v_RegisteredDelivery
||v_ServiceID
||v_FeeUserType
||v_FeeTerminalID
||v_MsgFmt
||v_FeeType
||v_FeeCode
||v_SrcID
||v_DestID
||v_LinkID
||v_MTFlag
||v_FixedCode
||v_RepeatNum
||v_InfoID
||v_MsgContent) ;
--to value
p_label:=v_label;
p_body:=v_body ;
--result
p_res_str:='0';
Exception
When Others Then
p_res_str:=Sqlerrm;
vProTip:='把出隊列的字符串翻譯異常!';
vErrorCode:=SQLCODE;
vErrorMsg:=SQLERRM;
p_pub_write_error_log(vPid,vProName,'','',
vProTip,vErrorCode,vErrorMsg,1);
Rollback;
end p103_change_label_body_str;6 注意
1 權限問題
把dbms_aq and dbms_aq_admin 包的execute權限授權給smpuser用戶. 辦法: 授權:
Grant aq_administrator_role To smpuser;
2 用戶表空間限制問題 ORA-00604: error occurred at recursive SQL level 1
ORA-01536: space quota exceeded for tablespace 'SMP_DB' 修改smpuser的限額:從<無> 修改為:<無限制>就搞定了!

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved