Skip Headers
Oracle® Data Provider for .NET Developer's Guide
11g Release 2 (11.2)

Part Number E12249-01
Go to Documentation Home
Home
Go to Book List
Book List
Go to Table of Contents
Contents
Go to Index
Index
Go to Feedback page
Contact Us

Go to previous page
Previous
Go to next page
Next
View PDF

Oracle Streams Advanced Queuing Support

Oracle Streams Advanced Queuing (AQ) provides database-integrated message queuing functionality. Oracle Streams AQ is built on top of Oracle Streams and leverages the functions of Oracle Database so that messages can be stored persistently, propagated between queues on different computers and databases, and transmitted using Oracle Net Services and HTTP(S).

As Oracle Streams AQ is implemented in database tables, all operational benefits of high availability, scalability, and reliability are also applicable to queue data. Oracle Streams AQ supports standard database features such as recovery, restart, and security.

The following items discuss Oracle Streams AQ concepts:

Using ODP.NET for Advanced Queuing

.NET applications can use ODP.NET to access all the operational features of AQ such as Enqueuing, Dequeuing, Listen, and Notification.

Table 3-25 maps the AQ features to their corresponding ODP.NET implementation.

Table 3-25 Mapping AQ Features with their ODP.NET Implementation

Functionality ODP.NET Implementation

Create a Message

Create an OracleAQMessage object

Enqueue a single message

Specify the message as OracleAQMessage, queue as OracleAQQueue and enqueue options on OracleAQQueue, call OracleAQQueue.Enqueue

Enqueue multiple messages

Specify the messages as an OracleAQMessage array in OracleAQQueue.EnqueueArray

Dequeue a single message

Specify dequeue options on OracleAQQueue and call OracleAQQueue.Dequeue

Dequeue multiple messages

Call OracleAQQueue.DequeueArray

Listen for messages on Queue(s)

Call OracleAQQueue.Listen.To listen on multiple queues use static Listen method of OracleAQQueue

Message Notifications

Use OracleAQQueue.MessageAvailable Event along with the NotificationConsumers property


Note:

AQ samples are provided in the ORACLE_BASE\ORACLE_HOME\ODP.NET\Samples directory.

Enqueuing and Dequeuing Example

The following example demonstrates enqueuing and dequeuing messages using a single consumer queue. The first part of the example performs the requisite database setup for the database user, SCOTT. The second part of the example demonstrates enqueuing and dequeuing messages.

-- Part I: Database setup required for this demo
 
------------------------------------------------------------------
-- SQL to grant appropriate privilege to database user, SCOTT
------------------------------------------------------------------
SQL> ALTER USER SCOTT ACCOUNT UNLOCK IDENTIFIED BY Pwd4Sct;
User altered.
SQL> GRANT ALL ON DBMS_AQADM TO scott;
 
------------------------------------------------------------------
-- PL/SQL to create queue-table and queue and start queue for SCOTT
------------------------------------------------------------------
BEGIN
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table=>'scott.test_q_tab', 
    queue_payload_type=>'RAW', 
    multiple_consumers=>FALSE);
 
  DBMS_AQADM.CREATE_QUEUE(
    queue_name=>'scott.test_q', 
    queue_table=>'scott.test_q_tab');
 
  DBMS_AQADM.START_QUEUE(queue_name=>'scott.test_q');
END;
/
 
------------------------------------------------------------------
-- PL/SQL to stop queue and drop queue & queue-table from SCOTT
------------------------------------------------------------------
BEGIN
  DBMS_AQADM.STOP_QUEUE('scott.test_q');
 
  DBMS_AQADM.DROP_QUEUE(
    queue_name => 'scott.test_q', 
    auto_commit => TRUE);
 
  DBMS_AQADM.DROP_QUEUE_TABLE(
    queue_table => 'scott.test_q_tab',
    force => FALSE, 
    auto_commit => TRUE);
END;
/
-- End of Part I, database setup.

//Part II: Enqueuing and dequeuing messages
//C#
using System;
using System.Text;
using Oracle.DataAccess.Client;
using Oracle.DataAccess.Types;
 
namespace ODPSample
{
  /// <summary>
  /// Demonstrates Enqueuing and Dequeuing raw message 
  /// using a single consumer queue
  /// </summary>
  class EnqueueDequeue
  {
    static void Main(string[] args)
    {
      // Create connection
      string constr = "user id=scott;password=Pwd4Sct;data source=oracle";
      OracleConnection con = new OracleConnection(constr);
 
      // Create queue
      OracleAQQueue queue = new OracleAQQueue("scott.test_q", con);
 
      try
      {
        // Open connection
        con.Open();
 
        // Begin txn for enqueue
        OracleTransaction txn = con.BeginTransaction();
 
        // Set message type for the queue
        queue.MessageType = OracleAQMessageType.Raw;
 
        // Prepare message and RAW payload
        OracleAQMessage enqMsg = new OracleAQMessage();
        byte[] bytePayload = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
        enqMsg.Payload = bytePayload;
 
        // Prepare to Enqueue
        queue.EnqueueOptions.Visibility = OracleAQVisibilityMode.OnCommit;
 
        // Enqueue message
        queue.Enqueue(enqMsg);
 
        Console.WriteLine("Enqueued Message Payload      : "
          + ByteArrayToString(enqMsg.Payload as byte[]));
        Console.WriteLine("MessageId of Enqueued Message : "
          + ByteArrayToString(enqMsg.MessageId));
 
        // Enqueue txn commit
        txn.Commit();
 
        // Begin txn for Dequeue
        txn = con.BeginTransaction();
 
        // Prepare to Dequeue
        queue.DequeueOptions.Visibility = OracleAQVisibilityMode.OnCommit;
        queue.DequeueOptions.Wait = 10;
 
        // Dequeue message
        OracleAQMessage deqMsg = queue.Dequeue();
 
        Console.WriteLine("Dequeued Message Payload      : "
          + ByteArrayToString(deqMsg.Payload as byte[]));
        Console.WriteLine("MessageId of Dequeued Message : "
          + ByteArrayToString(deqMsg.MessageId));
 
        // Dequeue txn commit
        txn.Commit();
      }
      catch (Exception e)
      {
        Console.WriteLine("Error: {0}", e.Message);
      }
      finally
      {
        // Close/Dispose objects
        queue.Dispose();
        con.Close();
        con.Dispose();
      }
    }
 
    // Function to convert byte[] to string
    static private string ByteArrayToString(byte[] byteArray)
    {
      StringBuilder sb = new StringBuilder();
      for (int n = 0; n < byteArray.Length; n++)
      {
        sb.Append((int.Parse(byteArray[n].ToString())).ToString("X"));
      }
      return sb.ToString();
    }
  }
}