??
Write messaging applications with ODP.NET and Oracle Streams Advanced Queuing
-- Part I: Database setup required for this demo
------------------------------------------------------------------ -- SQL to grant appropriate privilege to database user,SCOTT ------------------------------------------------------------------ SQL> ALTER USER SCOTT ACCOUNT UNLOCK IDENTIFIED BY Pwd4Sct; User altered. GRANT ALL ON DBMS_AQADM TO scott;
------------------------------------------------------------------ -- PLSQL to create queue-table and queue and start queue for SCOTT ------------------------------------------------------------------ BEGIN DBMS_AQADM.CREATE_QUEUE_TABLE( queue_table=>'scott.test_q_tab', queue_payload_type=>'RAW', multiple_consumers=>FALSE);
DBMS_AQADM.CREATE_QUEUE( queue_name=>'scott.test_q', queue_table=>'scott.test_q_tab');
DBMS_AQADM.START_QUEUE(queue_name=>'scott.test_q'); END; /
------------------------------------------------------------------ -- PLSQL to stop queue and drop queue & queue-table from SCOTT ------------------------------------------------------------------ BEGIN DBMS_AQADM.STOP_QUEUE('scott.test_q');
DBMS_AQADM.DROP_QUEUE( queue_name => 'scott.test_q', auto_commit => TRUE);
DBMS_AQADM.DROP_QUEUE_TABLE( queue_table => 'scott.test_q_tab', force => FALSE, auto_commit => TRUE); END; / -- End of Part I,database setup.
//Part II: Demonstrates using the Listen method //C# using System; using System.Text; using Oracle.DataAccess.Client; using Oracle.DataAccess.Types; using System.Threading;
namespace ODPSample { /// <summary> /// Demonstrates how a thread can listen and wait until a message is enqueued. /// Once a message is enqueued,the listening thread returns from the /// blocked Listen() method invocation and dequeues the message. /// </summary> class EnqueueDequeue { static bool s_bListenReturned = false;
static void Main(string[] args) { // Create connection string constr = "user id=scott;password=Pwd4Sct;data source=oracle"; OracleConnection con = new OracleConnection(constr);
// Create queue OracleAQQueue queue = new OracleAQQueue("scott.test_q",con);
try { // Open connection con.Open();
// Set message type for the queue queue.MessageType = OracleAQMessageType.Raw;
// Spawning a thread which will listen for a message ThreadStart ts = new ThreadStart(TestListen); Thread t = new Thread(ts); t.Start();
System.Threading.Thread.Sleep(2000);
// Begin transaction for enqueue OracleTransaction txn = con.BeginTransaction();
// Prepare message and RAW payload OracleAQMessage enqMsg = new OracleAQMessage(); byte[] bytePayload = { 0,1,2,3,4,5,6,7,8,9 }; enqMsg.Payload = bytePayload;
// Prepare to Enqueue queue.EnqueueOptions.Visibility = OracleAQVisibilityMode.OnCommit;
Console.WriteLine("[Main Thread] Enqueuing a message..."); Console.WriteLine("[Main Thread] Enqueued Message Payload : " + ByteArrayToString(enqMsg.Payload as byte[])); Console.WriteLine();
// Enqueue message queue.Enqueue(enqMsg);
// Enqueue transaction commit txn.Commit();
// Loop till Listen returns while (!s_bListenReturned) System.Threading.Thread.Sleep(1000); } catch (Exception e) { Console.WriteLine("Error: {0}",e.Message); } finally { // Close/Dispose objects queue.Dispose(); con.Close(); con.Dispose(); } }
static void TestListen() { // Create connection string constr = "user id=scott;password=Pwd4Sct;data source=oracle"; OracleConnection conListen = new OracleConnection(constr);
// Create queue OracleAQQueue queueListen = new OracleAQQueue("scott.test_q",conListen);
try { // Open the connection for Listen thread. // Connection blocked on Listen thread can not be used for other DB // operations conListen.Open();
// Set message type for the queue queueListen.MessageType = OracleAQMessageType.Raw;
// Listen queueListen.Listen(null);
Console.WriteLine("[Listen Thread] Listen returned... Dequeuing...");
// Begin txn for Dequeue OracleTransaction txn = conListen.BeginTransaction();
// Prepare to Dequeue queueListen.DequeueOptions.Visibility = OracleAQVisibilityMode.OnCommit; queueListen.DequeueOptions.Wait = 10;
// Dequeue message OracleAQMessage deqMsg = queueListen.Dequeue(); Console.WriteLine("[Listen Thread] Dequeued Message Payload : " + ByteArrayToString(deqMsg.Payload as byte[]));
// Dequeue txn commit txn.Commit();
// Allow the main thread to exit s_bListenReturned = true; } catch (Exception e) { Console.WriteLine("Error: {0}",e.Message); } finally { // Close/Dispose objects queueListen.Dispose(); conListen.Close(); conListen.Dispose(); } }
// Function to convert byte[] to string static private string ByteArrayToString(byte[] byteArray) { StringBuilder sb = new StringBuilder(); for (int n = 0; n < byteArray.Length; n++) { sb.Append((int.Parse(byteArray[n].ToString())).ToString("X")); } return sb.ToString(); } } }
-- Part I: Database setup required for this demo
------------------------------------------------------------------ -- SQL to grant appropriate privilege to database user,SCOTT ------------------------------------------------------------------ SQL> ALTER USER SCOTT ACCOUNT UNLOCK IDENTIFIED BY Pwd4Sct; User altered. SQL> GRANT ALL ON DBMS_AQADM TO scott;
------------------------------------------------------------------ -- PLSQL to create queue-table and queue and start queue for SCOTT ------------------------------------------------------------------ BEGIN DBMS_AQADM.CREATE_QUEUE_TABLE( queue_table=>'scott.test_q_tab',database setup.
//Part II: Demonstrates application notification//C#using System;using System.Text;using Oracle.DataAccess.Client;using Oracle.DataAccess.Types;namespace ODPSample{ /// <summary> /// Demonstrates how the application can be notified when a message is /// available in a queue. /// </summary> class Notification { static bool isNotified = false; static void Main(string[] args) { // Create connection string constr = "user id=scott;password=Pwd4Sct;data source=oracle"; OracleConnection con = new OracleConnection(constr); // Create queue OracleAQQueue queue = new OracleAQQueue("scott.test_q",con); try { // Open connection con.Open(); // Set message type for the queue queue.MessageType = OracleAQMessageType.Raw; // Add the event handler to handle the notification. The // MsgReceived method will be invoked when a message is enqueued queue.MessageAvailable += new OracleAQMessageAvailableEventHandler(Notification.MsgReceived); Console.WriteLine("Notification registered..."); // Begin txn for enqueue OracleTransaction txn = con.BeginTransaction(); Console.WriteLine("Now enqueuing message..."); // Prepare message and RAW payload OracleAQMessage enqMsg = new OracleAQMessage(); byte[] bytePayload = { 0,9 }; enqMsg.Payload = bytePayload; // Prepare to Enqueue queue.EnqueueOptions.Visibility = OracleAQVisibilityMode.OnCommit; // Enqueue message queue.Enqueue(enqMsg); Console.WriteLine("Enqueued Message Payload : " + ByteArrayToString(enqMsg.Payload as byte[])); Console.WriteLine("MessageId of Enqueued Message : " + ByteArrayToString(enqMsg.MessageId)); Console.WriteLine(); // Enqueue txn commit txn.Commit(); // Loop while waiting for notification while (isNotified == false) { System.Threading.Thread.Sleep(2000); } } catch (Exception e) { Console.WriteLine("Error: {0}",e.Message); } finally { // Close/Dispose objects queue.Dispose(); con.Close(); con.Dispose(); } } static void MsgReceived(object src,OracleAQMessageAvailableEventArgs arg) { try { Console.WriteLine("Notification Received..."); Console.WriteLine("QueueName : {0}",arg.QueueName); Console.WriteLine("Notification Type : {0}",arg.NotificationType); //following type-cast to "byte[]" is required only for .NET 1.x byte[] notifiedMsgId = (byte[]) arg.MessageId[0]; Console.WriteLine("MessageId of Notified Message : " + ByteArrayToString(notifiedMsgId)); isNotified = true; } catch (Exception e) { Console.WriteLine("Error: {0}",e.Message); } } // Function to convert byte[] to string static private string ByteArrayToString(byte[] byteArray) { StringBuilder sb = new StringBuilder(); for (int n = 0; n < byteArray.Length; n++) { sb.Append((int.Parse(byteArray[n].ToString())).ToString("X")); } return sb.ToString(); } }} (编辑:李大同)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|