程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 數據庫知識 >> Oracle數據庫 >> Oracle教程 >> Oracle高級隊列介紹

Oracle高級隊列介紹

編輯:Oracle教程

Oracle高級隊列介紹


oracle高級隊列介紹


高級隊列Advanced Queuing(AQ)在oracle多個版本都可得到。他是oracle原生消息軟件並且在每一個版本都在加強。
這篇文章提供了一個AQ的高級概覽。尤其是我們將看到如何啟動一個隊列並進行入列--出列操作,還有通過通知創建異步出列。


注意AQ支持數據庫意外的消息監聽(例如JMS消息隊列)。本文僅涉及數據庫內部消息通信。


要求:


本例要求指定的角色和權限(除了標准CREATE SESSION/TABLE/PROCEDURE/TYPE和表空間配額外)


1、AQ_ADMINISTRATOR_ROLE: 用於創建隊列表和隊列;
2、EXECUTE ON DBMS_AQ:用於通知案例中啟用PLSQL存儲過程編譯


另外,需要入列/出列消息的標准應用用戶要求AQ權限通過DBMS_AQADM[GRANT|REVOKE]_QUEUE_PRIVILIEGE API提供。


以下例子可以運行在任何擁有以上權限的用戶下。


1、創建並啟動一個隊列


AQ處理的消息稱為"有效負荷"(payloads) 。消息格式可以是用戶自定義對象或XMLType或ANYDATA。當我們創建一個隊列,需要告訴oracle
有效負荷的結構,所以我們先創建一個簡單對象類型。
CREATE TYPE demo_queue_payload_type AS OBJECT
( message VARCHAR2(4000) );
/


我們有效負荷類型包含一個屬性,而現實中可能更復雜。下面創建隊列表用於存儲隊列消息直到永久出列。
BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE (
queue_table => 'demo_queue_table',
queue_payload_type => 'demo_queue_payload_type'
);
END;
/



接著創建隊列並啟動:

BEGIN


DBMS_AQADM.CREATE_QUEUE (
queue_name => 'demo_queue',
queue_table => 'demo_queue_table'
);


DBMS_AQADM.START_QUEUE (
queue_name => 'demo_queue'
);


END;
/


至此,我們已經創建了隊列有效負荷,隊列表和隊列。來看下有哪些相關對象:


SELECT object_name, object_type
FROM user_objects
WHERE object_name != 'DEMO_QUEUE_PAYLOAD_TYPE';

OBJECT_NAME OBJECT_TYPE
------------------------------ ---------------
DEMO_QUEUE_TABLE TABLE
SYS_C009392 INDEX
SYS_LOB0000060502C00030$$ LOB
AQ$_DEMO_QUEUE_TABLE_T INDEX
AQ$_DEMO_QUEUE_TABLE_I INDEX
AQ$_DEMO_QUEUE_TABLE_E QUEUE
AQ$DEMO_QUEUE_TABLE VIEW
DEMO_QUEUE QUEUE


我們看到一個隊列帶出了一系列自動生成對象,有些是被後面直接用到的。不過有趣的是,創建了第二個隊列。這就是所謂的異常隊列(exception
queue)。如果AQ無法從我們的隊列接收消息,將記錄在該異常隊列中。


2、入列消息(enqueuing messages)
我們已經准備好使用DBMS_AQ.ENQUEUE API去入列一個消息。接下來的例子,我們使用ENQUEUE過程入列一個單條消息。
DBMS_AQ有大范圍的記錄和數組類型來支持其接口並使我們去修改其行為(我們將在下面的例子看到2個此類引用)。

DECLARE


r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload demo_queue_payload_type;


BEGIN


o_payload := demo_queue_payload_type('Here is a message');


DBMS_AQ.ENQUEUE(
queue_name => 'demo_queue',
enqueue_options => r_enqueue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle
);


COMMIT;


END;
/


可以看到入列消息很簡單。入列操作是一個基本的事務(就像往隊列表Insert),因此我們需要提交。


3、浏覽消息(browsing messages)
在我們出列消息之前,我們將"浏覽"隊列內容。首先我們可以查詢AQ$DEMO_QUEUE_TABLE視圖看到多少消息已經入列。正如我們早些看到的,
該視圖是在前面DBMS_AQADM.CREATE_QUEUE創建隊列自動生成的。

SELECT COUNT(*)
FROM aq$demo_queue_table;


COUNT(*)
----------
1



和我們預期一樣,隊列中只有一條消息。我們有2種方法可以浏覽消息內容:


1)直接查詢視圖:
SELECT user_data
FROM aq$demo_queue_table;


USER_DATA(MESSAGE)
------------------------------------------------------------
DEMO_QUEUE_PAYLOAD_TYPE('Here is a message')



2)我們可以使用DBMS_AQ.DEQUEUE API浏覽。根據名字可以看出,該過程用於出列消息。為了達到只浏覽不刪除的目的,我們可以使用
DBMS_AQ.BROWSE修改出列屬性(默認是DBMS_AQ.REMOVE)。


DECLARE


r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload demo_queue_payload_type;


BEGIN


r_dequeue_options.dequeue_mode := DBMS_AQ.BROWSE;


DBMS_AQ.DEQUEUE(
queue_name => 'demo_queue',
dequeue_options => r_dequeue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle
);


DBMS_OUTPUT.PUT_LINE(
'*** Browsed message is [' || o_payload.message || '] ***'
);


END;
/
*** Browsed message is [Here is a message] ***

再次查詢視圖可以確定消息確實沒被移除:

SELECT user_data
FROM aq$demo_queue_table;


USER_DATA(MESSAGE)
------------------------------------------------------------
DEMO_QUEUE_PAYLOAD_TYPE('Here is a message')


4、出列消息(dequeuing messages)


現在我們將實際出列消息。該操作不要求在同一會話進行(記住入列是AQ基於表的提交事務)。像入列,出列也是一個事務(從隊列表
移除消息)。

DECLARE


r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload demo_queue_payload_type;


BEGIN


DBMS_AQ.DEQUEUE(
queue_name => 'demo_queue',
dequeue_options => r_dequeue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle
);


DBMS_OUTPUT.PUT_LINE(
'*** Dequeued message is [' || o_payload.message || '] ***'
);


COMMIT;


END;
/


*** Dequeued message is [Here is a message] ***


PL/SQL procedure successfully completed.


再次查詢視圖發現消息確已出列:
SELECT COUNT(*)
FROM aq$demo_queue_table;


COUNT(*)
----------
0

5、通知(notification)


文章的剩余部分,我們將看一下通過通知自動出列。通過這種方式無論消息何時入列,Oracle都將通知一個代理執行一個注冊的PLSQL
"回調"(callback)過程(可選擇地,代理還可以通知一個郵箱地址或HTTP://地址)。


為了說明,我們將創建和注冊一個PLSQL過程以通過通知方式管理我們的出列。這個回調過程將出列消息並寫到一個數據庫表,以模擬
標准數據庫操作。


作為開始,我們清理之前創建的對象。

BEGIN
DBMS_AQADM.STOP_QUEUE(
queue_name => 'demo_queue'
);
DBMS_AQADM.DROP_QUEUE(
queue_name => 'demo_queue'
);
DBMS_AQADM.DROP_QUEUE_TABLE(
queue_table => 'demo_queue_table'
);
END;
/


現在我們重新創建隊列表以允許多個消費者(consumers)。一個消費者是一個出列消息代理(agent)啟用多個消費者是自動通知實現的
前提條件。


BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE (
queue_table => 'demo_queue_table',
queue_payload_type => 'demo_queue_payload_type',
multiple_consumers => TRUE
);
END;
/



接著重新創建並啟動我們的隊列。
BEGIN
DBMS_AQADM.CREATE_QUEUE (
queue_name => 'demo_queue',
queue_table => 'demo_queue_table'
);
DBMS_AQADM.START_QUEUE (
queue_name => 'demo_queue'
);
END;
/



為了證明通知的異步特點,我們將把出列消息存在一個應用表中。
CREATE TABLE demo_queue_message_table
( message VARCHAR2(4000) );




現在我們有一個應用表,我們可以創建回調PL/SQL。這個過程將出列觸發了通知的入列消息。程序參數必須命名並類型化。入列消息將
包含入列時間戳,這樣插入到應用表中我們將看到消息入列和通知出列的異步延遲。

CREATE PROCEDURE demo_queue_callback_procedure(
context RAW,
reginfo SYS.AQ$_REG_INFO,
descr SYS.AQ$_DESCRIPTOR,
payload RAW,
payloadl NUMBER
) AS


r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload demo_queue_payload_type;


BEGIN


r_dequeue_options.msgid := descr.msg_id;
r_dequeue_options.consumer_name := descr.consumer_name;


DBMS_AQ.DEQUEUE(
queue_name => descr.queue_name,
dequeue_options => r_dequeue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle
);


INSERT INTO demo_queue_message_table ( message )
VALUES ( 'Message [' || o_payload.message || '] ' ||
'dequeued at [' || TO_CHAR( SYSTIMESTAMP,
'DD-MON-YYYY HH24:MI:SS.FF3' ) || ']' );
COMMIT;


END;
/


我們還未完成通知步驟。我們需要向隊列增加一個訂閱者(subsriber)並注冊訂閱者接到通知時的動作(例如將執行我們的回調過程)。


BEGIN


DBMS_AQADM.ADD_SUBSCRIBER (
queue_name => 'demo_queue',
subscriber => SYS.AQ$_AGENT(
'demo_queue_subscriber',
NULL,
NULL )
);


DBMS_AQ.REGISTER (
SYS.AQ$_REG_INFO_LIST(
SYS.AQ$_REG_INFO(
'DEMO_QUEUE:DEMO_QUEUE_SUBSCRIBER',
DBMS_AQ.NAMESPACE_AQ,
'plsql://DEMO_QUEUE_CALLBACK_PROCEDURE',
HEXTORAW('FF')
)
),
1
);
END;
/


現在我們可以通過入列消息來測試。這個消息將僅包含一個時間戳以便我們對比入列和自動出列發生的時間差。

DECLARE


r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload demo_queue_payload_type;


BEGIN


o_payload := demo_queue_payload_type(
TO_CHAR(SYSTIMESTAMP, 'DD-MON-YYYY HH24:MI:SS.FF3' )
);


DBMS_AQ.ENQUEUE(
queue_name => 'demo_queue',
enqueue_options => r_enqueue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle
);


COMMIT;


END;
/

為了查看我們的小時是否自動出列,我們將檢查應用表(DEMO_QUEUE_MESSAGE_TABLE)。
SELECT message
FROM demo_queue_message_table;


MESSAGE
---------------------------------------------------------------------------
Message [21-JUL-2005 21:54:51.156] dequeued at [21-JUL-2005 21:54:56.015]



我們可以看到通過通知異步出列大約在入列操作後5秒後發生。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved