oracle – dbms_aq.dequeue_array,第一条消息返回两次
介绍
使用Oracle Advanced Queuing方法时,我在Oracle SQL Server上遇到了一个非常奇怪的行为(确切地说:Oracle Database 11g企业版11.2.0.4.0版 – 64位生产版). 问题 错误是我将X消息排队,但是dequeue_array返回X 1消息,第一条消息重复(如MessageId所示). 复制: 我能够编写一些简单的PoC来重现错误.本代码非常简单,入队/出队的东西是标准的Oracle AQ.代码执行以下步骤两次(测试运行): >清除队列表 在新连接上运行POC时,第一次运行成功而没有错误,但每次后续运行都失败.之后,当使用相同的连接时,每次执行脚本时,它都会在两次测试运行中失败. 到目前为止我尝试了什么: >在“新”连接上运行此脚本:仅第一次运行失败 结论: 我既不能解释这种行为,也不能在我的代码中找到错误.请看一下它,它应该可以在你最喜欢的sql客户端中直接执行(用PL / SQL Developer测试). 如果您需要任何进一步的信息或在让PoC工作时遇到问题,请问,我会定期检查这个帖子.我试图使PoC尽可能可读,包括关于正在发生的事情的详细输出. 码: declare C_QueueName constant varchar2(32767) := 'TEST_QUEUE'; C_QueueTable constant varchar2(32767) := 'TEST_Q_TABLE'; C_MsgCount constant pls_integer := 1; C_TestRuns constant pls_integer := 2; C_DequeueArraySize constant pls_integer := 10; /* * Create the queue and the queue table used for theses tests */ procedure CreateQueueIfMissing is L_Present pls_integer; begin dbms_output.put_line('START CreateQueueIfMissing'); execute immediate 'select count(*) from USER_OBJECTS where OBJECT_NAME = ''' || C_QueueName || ''' and OBJECT_TYPE = ''QUEUE''' into L_Present; if L_Present = 1 then dbms_output.put_line('Skipping queue creation,already present.'); dbms_output.put_line('END CreateQueueIfMissing'); return; end if; dbms_output.put_line(' Creating queue table ' || C_QueueTable); DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => C_QueueTable,storage_clause => 'LOGGING NOCACHE NOPARALLEL MONITORING',sort_list => 'priority,enq_time',multiple_consumers => false,queue_payload_type => 'SYS.AQ$_JMS_BYTES_MESSAGE',comment => 'Queue for messages'); dbms_output.put_line(' Creating queue ' || C_QueueName); DBMS_AQADM.CREATE_QUEUE(queue_name => C_QueueName,queue_table => C_QueueTable,max_retries => 8640,retry_delay => 30,comment => 'Queue for messages'); dbms_output.put_line(' Starting queue ' || C_QueueName); DBMS_AQADM.START_QUEUE(queue_name => C_QueueName); dbms_output.put_line('END CreateQueueIfMissing'); end CreateQueueIfMissing; -- ================================================================================================ /* * This procedure is the root of all evil. * The error only occurs when using the purge_queue_tables procedure. * When using a normal "delete from <queue_table>" then everything is just fine. */ procedure CleanQueueTable is L_PurgeOptions dbms_aqadm.aq$_purge_options_t; L_Count pls_integer; begin dbms_output.put_line('START CleanQueueTable'); execute immediate 'select count(*) from ' || C_QueueTable into L_Count; dbms_output.put_line(' Messages in queue table BEFORE purge: ' || L_Count); dbms_aqadm.purge_queue_table(queue_table => C_QueueTable,purge_condition => null,purge_options => L_PurgeOptions); execute immediate 'select count(*) from ' || C_QueueTable into L_Count; dbms_output.put_line(' Messages in queue table AFTER purge: ' || L_Count); dbms_output.put_line('END CleanQueueTable'); end CleanQueueTable; -- ================================================================================================ /* * Enqueue the configured count of messages on the queue */ procedure EnqueueMessages is L_BodyId pls_integer; L_Msg sys.aq$_jms_bytes_message; L_MsgId raw(16); L_Count pls_integer; L_EnqueueOptions DBMS_AQ.ENQUEUE_OPTIONS_T; L_MessageProperties DBMS_AQ.MESSAGE_PROPERTIES_T; begin dbms_output.put_line('START EnqueueMessages'); execute immediate 'select count(*) from ' || C_QueueTable into L_Count; dbms_output.put_line(' Messages in queue table BEFORE enqueue: ' || L_Count); for i in 1 .. C_MsgCount loop dbms_output.put_line(' Construct #' || i); L_Msg := sys.aq$_jms_bytes_message.construct; -- set the JMS header L_Msg.set_type('JmsBytesMessage'); L_Msg.set_userid(1); L_Msg.set_appid('test'); L_Msg.set_groupid('cs'); L_Msg.set_groupseq(1); -- set JMS message content L_BodyId := L_Msg.clear_body(-1); L_Msg.write_bytes(L_BodyId,to_blob(utl_raw.cast_to_raw('<test>Lorem Ipsum</test>'))); L_Msg.flush(L_BodyId); L_Msg.clean(L_BodyId); dbms_output.put_line(' Enqueue #' || i); DBMS_AQ.ENQUEUE (queue_name => C_QueueName,enqueue_options => L_EnqueueOptions,message_properties => L_MessageProperties,payload => L_Msg,msgid => L_MsgId); end loop; execute immediate 'select count(*) from ' || C_QueueTable into L_Count; dbms_output.put_line(' Messages in queue table AFTER enqueue: ' || L_Count); dbms_output.put_line('END EnqueueMessages'); end EnqueueMessages; -- ================================================================================================ /* * Dequeues messages using dequeue_array from the configured queue. */ procedure DequeueMessages is L_DequeueOptions dbms_aq.dequeue_options_t; L_MsgPropArr dbms_aq.message_properties_array_t := dbms_aq.message_properties_array_t(); L_PayloadArr sys.aq$_jms_bytes_messages; L_MsgIdArr dbms_aq.msgid_array_t; L_MsgCnt pls_integer := 0; L_Count pls_integer; begin dbms_output.put_line('START DequeueMessages'); execute immediate 'select count(*) from ' || C_QueueTable into L_Count; dbms_output.put_line(' Messages in queue table BEFORE dequeue: ' || L_Count); L_MsgCnt := dbms_aq.dequeue_array(queue_name => C_QueueName,dequeue_options => L_DequeueOptions,array_size => C_DequeueArraySize,message_properties_array => L_MsgPropArr,payload_array => L_PayloadArr,msgid_array => L_MsgIdArr); execute immediate 'select count(*) from ' || C_QueueTable into L_Count; dbms_output.put_line(' Messages in queue table AFTER dequeue: ' || L_Count); dbms_output.put_line(' Expected: ' || C_MsgCount || ',Received: ' || L_MsgCnt); if C_MsgCount != L_MsgCnt then dbms_output.put_line(' *****************************************'); dbms_output.put_line(' TOO MANY ITEMS DEQUEUED?!?'); dbms_output.put_line(' *****************************************'); for i in 1 .. L_MsgCnt loop dbms_output.put_line(' #' || i || ' MsdId=' || L_MsgIdArr(i)); end loop; end if; dbms_output.put_line('END DequeueMessages'); end DequeueMessages; -- ================================================================================================ /* * This is the testcase */ procedure RunTestCase is begin CreateQueueIfMissing; for i in 1 .. C_TestRuns loop dbms_output.put_line(null); dbms_output.put_line('=========== START test run #' || i || '==========='); CleanQueueTable; EnqueueMessages; DequeueMessages; end loop; end; -- ================================================================================================ begin RunTestCase; end; 示例输出: START CreateQueueIfMissing Skipping queue creation,already present. END CreateQueueIfMissing =========== START test run #1=========== START CleanQueueTable Messages in queue table BEFORE purge: 0 Messages in queue table AFTER purge: 0 END CleanQueueTable START EnqueueMessages Messages in queue table BEFORE enqueue: 0 Construct #1 Enqueue #1 Messages in queue table AFTER enqueue: 1 END EnqueueMessages START DequeueMessages Messages in queue table BEFORE dequeue: 1 Messages in queue table AFTER dequeue: 0 Expected: 1,Received: 1 END DequeueMessages =========== START test run #2=========== START CleanQueueTable Messages in queue table BEFORE purge: 0 Messages in queue table AFTER purge: 0 END CleanQueueTable START EnqueueMessages Messages in queue table BEFORE enqueue: 0 Construct #1 Enqueue #1 Messages in queue table AFTER enqueue: 1 END EnqueueMessages START DequeueMessages Messages in queue table BEFORE dequeue: 1 Messages in queue table AFTER dequeue: 0 Expected: 1,Received: 2 ***************************************** TOO MANY ITEMS DEQUEUED?!? ***************************************** #1 MsdId=2949A0FF2EE456A7E0540010E0467A30 #2 MsdId=2949A0FF2EE456A7E0540010E0467A30 END DequeueMessages 解决方法这看起来像 bug 20659700.在 document 2002148.1中有更多信息.您(或您的DBA)应提出服务请求以确认,并查看您的平台是否有可用的修补程序. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |