Oracle9i Application Developer's Guide - Advanced Queuing Release 2 (9.2) Part Number A96587-01 |
|
In this chapter we describe the operational interface to Oracle Advanced Queuing in terms of use cases. That is, we discuss each operation (such as "Enqueue a Message") as a use case by that name. The table listing all the use cases is provided at the head of the chapter (see "Use Case Model: Operational Interface -- Basic Operations" on page 11-2).
A summary figure, "Use Case Diagram: Operational Interface -- Basic Operations", locates all the use cases in a single drawing. If you are using the HTML version of this document, you can use this figure to navigate to the use case in which you are interested by clicking on the relevant use case title.
Each use case is laid out as follows:
Table 11-1, " Use Case Model: Operational Interface" indicates with a + where examples are provided for specific use cases and in which programmatic environment.
The table refers to programmatic environments with the following abbreviations:
Use Case | P | O | V | J | JM |
---|---|---|---|---|---|
- |
- |
- |
- |
- |
|
+ |
- |
+ |
- |
+ |
|
+ |
- |
+ |
- |
+ |
|
Enqueuing a Message [Specify Message Properties [Specify Sender ID]] |
+ |
- |
+ |
- |
+ |
+ |
- |
+ |
- |
+ |
|
+ |
+ |
+ |
- |
- |
|
+ |
+ |
+ |
- |
- |
|
- |
- |
- |
- |
- |
|
Dequeuing a Message from a Single-Consumer Queue [SpecifyOptions] |
+ |
- |
+ |
- |
+ |
Dequeuing a Message from a Multiconsumer Queue [Specify Options] |
+ |
- |
+ |
- |
+ |
- |
- |
- |
- |
- |
|
Registering for Notification [Specifying Subscription Name--Single-Consumer Queue] |
- |
+ |
- |
- |
- |
Registering for Notification [Specifying Subscription Name--Multiconsumer Queue] |
- |
+ |
- |
- |
- |
+ |
+ |
- |
- |
- |
|
- |
- |
- |
- |
- |
|
- |
- |
- |
- |
- |
See Also:
|
Adds a message to the specified queue.
If a message is enqueued to a multiconsumer queue with no recipient and the queue has no subscribers (or rule-based subscribers that match this message), then the Oracle error ORA 24033 is raised. This is a warning that the message will be discarded since there are no recipients or subscribers to whom it can be delivered.
See Chapter 3, "AQ Programmatic Environments" for a list of available functions in each programmatic environment. Use the following syntax references for each programmatic environment:
See Chapter 3, "AQ Programmatic Environments" for a list of available functions in each programmatic environment. Examples in the following programmatic environments are provided:
See Also:
|
To specify the options available for the enqueue operation.
Do not use the immediate
option when you want to use LOB locators since LOB locators are valid only for the duration of the transaction. As the immediate
option automatically commits the transaction, your locator will not be valid.
sequence deviation
parameter in enqueue options can be used to change the order of processing between two messages. The identity of the other message, if any, is specified by the enqueue options parameter relative msgid. The relationship is identified by the sequence deviation
parameter.
Specifying sequence deviation
for a message introduces some restrictions for the delay and priority values that can be specified for this message. The delay of this message has to be less than or equal to the delay of the message before which this message is to be enqueued. The priority of this message has to be greater than or equal to the priority of the message before which this message is to be enqueued.
See Chapter 3, "AQ Programmatic Environments" for a list of available functions in each programmatic environment. Use the following syntax references for each programmatic environment:
See Chapter 3, "AQ Programmatic Environments" for a list of available functions in each programmatic environment. Examples in the following programmatic environments are provided:
See Also:
|
The Message Properties describe the information that is used by AQ to manage individual messages. These are set at enqueue time and their values are returned at dequeue time.
See Chapter 3, "AQ Programmatic Environments" for a list of available functions in each programmatic environment. Use the following syntax references for each programmatic environment:
See Chapter 3, "AQ Programmatic Environments" for a list of available functions in each programmatic environment. Examples in the following programmatic environments are provided:
See Also:
|
To identify the sender (producer) of a message.
Not applicable.
See Chapter 3, "AQ Programmatic Environments" for a list of available functions in each programmatic environment. Use the following syntax references for each programmatic environment:
See Chapter 3, "AQ Programmatic Environments" for a list of available functions in each programmatic environment. Examples in the following programmatic environments are provided:
See Also:
|
To store a payload of type RAW
, AQ will create a queue table with LOB
column as the payload repository. The maximum size of the payload is determined by which programmatic environment you use to access AQ. For PL/SQL, Java and precompilers the limit is 32K; for the OCI the limit is 4G.
See Chapter 3, "AQ Programmatic Environments" for a list of available functions in each programmatic environment. Use the following syntax references for each programmatic environment:
See Chapter 3, "AQ Programmatic Environments" for a list of available functions in each programmatic environment. Examples in the following programmatic environments are provided:
/* Enqueue to msg_queue: */ DECLARE Enqueue_options DBMS_AQ.enqueue_options_t; Message_properties DBMS_AQ.message_properties_t; Message_handle RAW(16); Message aq.message_typ; BEGIN Message := aq.message_typ('NORMAL MESSAGE', 'enqueued to msg_queue first.'); DBMS_AQ.ENQUEUE(queue_name => 'msg_queue', Enqueue_options => enqueue_options, Message_properties => message_properties, Payload => message, Msgid => message_handle); COMMIT; END;
/* The queue namepriority_msg_queue
is defined as an object type queue table. The payload object type ismessage
. The schema of the queue isaq
. */ /* Enqueue a message with priority 30: */ DECLARE Enqueue_options dbms_aq.enqueue_options_t; Message_properties dbms_aq.message_properties_t; Message_handle RAW(16); Message aq.Message_typ; BEGIN Message := Message_typ('PRIORITY MESSAGE', 'enqued at priority 30.'); message_properties.priority := 30; DBMS_AQ.ENQUEUE(queue_name => 'priority_msg_queue', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle); COMMIT; END;
/* Enqueue to msg_queue: */ DECLARE Enqueue_options DBMS_AQ.enqueue_options_t; Message_properties DBMS_AQ.message_properties_t; Message_handle RAW(16); Message aq.message_typ; BEGIN Message := aq.message_typ('NORMAL MESSAGE', 'enqueued to msg_queue first.'); DBMS_AQ.ENQUEUE(queue_name => 'msg_queue', Enqueue_options => enqueue_options, Message_properties => message_properties, transformation => 'AQ.MSG_MAP', Payload => message, Msgid => message_handle); COMMIT; END;
Where MSG_MAP
was created as follows:
BEGIN DBMS.TRANSFORM.CREATE_TRANSFORMATION ( schema => 'AQ', name => 'MSG_MAP', from_schema => 'AQ', from_type => 'PO_ORDER1', to_schema => 'AQ', to_type => 'PO_ORDER2', transformation => 'AQ.MAP_PO_ORDER (source.user_data)'), END;
/* Setup */ connect system/manager create user aq identified by aq; grant aq_administrator_role to aq; public static void setup(AQSession aq_sess) throws AQException { AQQueueTableProperty qtable_prop; AQQueueProperty queue_prop; AQQueueTable q_table; AQQueue queue; AQAgent agent; qtable_prop = new AQQueueTableProperty("RAW"); q_table = aq_sess.createQueueTable ("aq", "rawmsgs_qtab", qtable_prop); queue_prop = new AQQueueProperty(); queue = aq_sess.createQueue (q_table, "msg_queue", queue_prop); queue.start(); qtable_prop = new AQQueueTableProperty("RAW"); qtable_prop.setMultiConsumer(true); qtable_prop.setSortOrder("priority,enq_time"); q_table = aq_sess.createQueueTable ("aq", "rawmsgs_qtab2", qtable_prop); queue_prop = new AQQueueProperty(); queue = aq_sess.createQueue (q_table, "priority_msg_queue", queue_prop); queue.start(); agent = new AQAgent("subscriber1", null); queue.addSubscriber(agent, null); } /* Enqueue a message */ public static void example(AQSession aq_sess) throws AQException, SQLException { AQQueue queue; AQMessage message; AQRawPayload raw_payload; AQEnqueueOption enq_option; String test_data = "new message"; byte[] b_array; Connection db_conn; db_conn = ((AQOracleSession)aq_sess).getDBConnection(); /* Get a handle to the queue */ queue = aq_sess.getQueue ("aq", "msg_queue"); /* Create a message to contain raw payload: */ message = queue.createMessage(); /* Get handle to the AQRawPayload object and populate it with raw data: */ b_array = test_data.getBytes(); raw_payload = message.getRawPayload(); raw_payload.setStream(b_array, b_array.length); /* Create a AQEnqueueOption object with default options: */ enq_option = new AQEnqueueOption(); /* Enqueue the message: */ queue.enqueue(enq_option, message); db_conn.commit(); } /* Enqueue a message with priority = 5 */ public static void example(AQSession aq_sess) throws AQException, SQLException { AQQueue queue; AQMessage message; AQMessageProperty msg_prop; AQRawPayload raw_payload; AQEnqueueOption enq_option; String test_data = "priority message"; byte[] b_array; Connection db_conn; db_conn = ((AQOracleSession)aq_sess).getDBConnection(); /* Get a handle to the queue */ queue = aq_sess.getQueue ("aq", "msg_queue"); /* Create a message to contain raw payload: */ message = queue.createMessage(); /* Get Message property */ msg_prop = message.getMessageProperty(); /* Set priority */ msg_prop.setPriority(5); /* Get handle to the AQRawPayload object and populate it with raw data: */ b_array = test_data.getBytes(); raw_payload = message.getRawPayload(); raw_payload.setStream(b_array, b_array.length); /* Create a AQEnqueueOption object with default options: */ enq_option = new AQEnqueueOption(); /* Enqueue the message: */ queue.enqueue(enq_option, message); db_conn.commit(); }
Enqueuing messages of type objects
'Prepare the message. MESSAGE_TYPE is a user defined type ' in the "AQ" schema Set OraMsg = Q.AQMsg(1, "MESSAGE_TYPE") Set OraObj = DB.CreateOraObject("MESSAGE_TYPE") OraObj("subject").Value = "Greetings from OO4O" OraObj("text").Value = "Text of a message originated from OO4O" Set OraMsg.Value = OraObj Msgid = Q.Enqueue
Enqueuing messages of type RAW
'Create an OraAQ object for the queue "DBQ" Dim Q as object Dim Msg as object Dim OraSession as object Dim DB as object Set OraSession = CreateObject("OracleInProcServer.XOraSession") Set OraDatabase = OraSession.OpenDatabase(mydb, "scott/tiger" 0&) Set Q = DB.CreateAQ("DBQ") 'Get a reference to the AQMsg object Set Msg = Q.AQMsg Msg.Value = "Enqueue the first message to a RAW queue." 'Enqueue the message Q.Enqueue() 'Enqueue another message. Msg.Value = "Another message" Q.Enqueue() 'Enqueue a message with nondefault properties. Msg.Priority = ORAQMSG_HIGH_PRIORITY Msg.Delay = 5 Msg.Value = "Urgent message" Q.Enqueue() Msg.Value = "The visibility option used in the enqueue call is ORAAQ_ENQ_IMMEDIATE" Q.Visible = ORAAQ_ENQ_IMMEDIATE Msgid = Q.Enqueue 'Enqueue Ahead of message Msgid_1 Msg.Value = "First Message to test Relative Message id" Msg.Correlation = "RELATIVE_MESSAGE_ID" Msgid_1 = Q.Enqueue Msg.Value = "Second message to test RELATIVE_MESSAGE_ID is queued ahead of the First Message " OraAq.relmsgid = Msgid_1 Msgid = Q.Enqueue
See Also:
|
The call takes a list of agents as an argument. You specify the queue to be monitored in the address field of each agent listed. You also must specify the name of the agent when monitoring multiconsumer queues. For single-consumer queues, an agent name must not be specified. Only local queues are supported as addresses. Protocol is reserved for future use.
This is a blocking call that returns when there is a message ready for consumption for an agent in the list. If there are messages for more than one agent, only the first agent listed is returned. If there are no messages found when the wait time expires, an error is raised.
A successful return from the listen
call is only an indication that there is a message for one of the listed agents in one the specified queues. The interested agent must still dequeue the relevant message.
Note that you cannot call listen
on nonpersistent queues.
See Chapter 3, "AQ Programmatic Environments" for a list of available functions in each programmatic environment. Use the following syntax references for each programmatic environment:
See Chapter 3, "AQ Programmatic Environments" for a list of available functions in each programmatic environment. Examples in the following programmatic environments are provided:
/* The listen call allows you to monitor a list of queues for messages for specific agents. You need to have dequeue privileges for all the queues you wish to monitor. */
DECLARE Agent_w_msg aq$_agent; My_agent_list dbms_aq.agent_list_t; BEGIN /* NOTE: MCQ1, MCQ2, MCQ3 are multiconsumer queues in SCOTT's schema * SCQ1, SCQ2, SCQ3 are single-consumer queues in SCOTT's schema */ Qlist(1):= aq$_agent(NULL, 'scott.SCQ1', NULL); Qlist(2):= aq$_agent(NULL, 'SCQ2', NULL); Qlist(3):= aq$_agent(NULL, 'SCQ3', NULL); /* Listen with a time-out of zero: */ DBMS_AQ.LISTEN( Agent_list => My_agent_list, Wait => 0, Agent => agent_w_msg); DBMS_OUTPUT.PUT_LINE('Message in Queue :- ' || agent_w_msg.address); DBMS_OUTPUT.PUT_LINE(''); END;
public static void monitor_status_queue(Connection db_conn) { AQSession aq_sess; AQAgent[] agt_list = null; AQAgent ret_agt = null; try { /* Create an AQ Session: */ aq_sess = AQDriverManager.createAQSession(db_conn); /* Construct the waiters list: */ agt_list = new AQAgent[3]; agt_list[0] = new AQAgent(null, "scott.SCQ1",0); agt_list[1] = new AQAgent (null, "SCQ2",0); agt_list[2] = new AQAgent (null, "SCQ3",0); /* Wait for order status messages for 120 seconds: */ ret_agt = aq_sess.listen(agt_list, 120); System.out.println("Message available for agent: " + ret_agt.getName() + " " + ret_agt.getAddress()); } catch (AQException aqex) { System.out.println("Exception-1: " + aqex); } catch (Exception ex) { System.out.println("Exception-2: " + ex); }
}
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <oci.h> static void checkerr(errhp, status) LNOCIError *errhp; sword status; { text errbuf[512]; ub4 buflen; sb4 errcode; switch (status) { case OCI_SUCCESS: break; case OCI_SUCCESS_WITH_INFO: printf("Error - OCI_SUCCESS_WITH_INFO\n"); break; case OCI_NEED_DATA: printf("Error - OCI_NEED_DATA\n"); break; case OCI_NO_DATA: printf("Error - OCI_NO_DATA\n"); break; case OCI_ERROR: OCIErrorGet ((dvoid *) errhp, (ub4) 1, (text *) NULL, &errcode, errbuf, (ub4) sizeof(errbuf), (ub4) OCI_HTYPE_ERROR); printf("Error - %s\n", errbuf); break; case OCI_INVALID_HANDLE: printf("Error - OCI_INVALID_HANDLE\n"); break; case OCI_STILL_EXECUTING: printf("Error - OCI_STILL_EXECUTE\n"); break; case OCI_CONTINUE: printf("Error - OCI_CONTINUE\n"); break; default: break; } } /* set agent into descriptor */ void SetAgent(agent, appname, queue,errhp) LNOCIAQAgent *agent; text *appname; text *queue; LNOCIError *errhp; { OCIAttrSet(agent, OCI_DTYPE_AQAGENT, appname ? (dvoid *)appname : (dvoid *)"", appname ? strlen((const char *)appname) : 0, OCI_ATTR_AGENT_NAME, errhp); OCIAttrSet(agent, OCI_DTYPE_AQAGENT, queue ? (dvoid *)queue : (dvoid *)"", queue ? strlen((const char *)queue) : 0, OCI_ATTR_AGENT_ADDRESS, errhp); printf("Set agent name to %s\n", appname ? (char *)appname : "NULL"); printf("Set agent address to %s\n", queue ? (char *)queue : "NULL"); } /* get agent from descriptor */ void GetAgent(agent, errhp) LNOCIAQAgent *agent; LNOCIError *errhp; { text *appname; text *queue; ub4 appsz; ub4 queuesz; if (!agent ) { printf("agent was NULL \n"); return; } checkerr(errhp, OCIAttrGet(agent, OCI_DTYPE_AQAGENT, (dvoid *)&appname, &appsz, OCI_ATTR_AGENT_NAME, errhp)); checkerr(errhp, OCIAttrGet(agent, OCI_DTYPE_AQAGENT, (dvoid *)&queue, &queuesz, OCI_ATTR_AGENT_ADDRESS, errhp)); if (!appsz) printf("agent name: NULL\n"); else printf("agent name: %.*s\n", appsz, (char *)appname); if (!queuesz) printf("agent address: NULL\n"); else printf("agent address: %.*s\n", queuesz, (char *)queue); } int main() { OCIEnv *envhp; OCIServer *srvhp; OCIError *errhp; OCISvcCtx *svchp; OCISession *usrhp; OCIAQAgent *agent_list[3]; OCIAQAgent *agent = (OCIAQAgent *)0; /* added next 2 121598 */ int i; /* Standard OCI Initialization */ OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)()) 0, (dvoid * (*)()) 0, (void (*)()) 0 ); OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV, 0, (dvoid **) 0); OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 0, (dvoid **) 0); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR, 0, (dvoid **) 0); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER, 0, (dvoid **) 0); OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX, 0, (dvoid **) 0); /* set attribute server context in the service context */ OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0, (ub4) OCI_ATTR_SERVER, (OCIError *) errhp); /* allocate a user context handle */ OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0); /* allocate a user context handle */ OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0); OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION, (dvoid *)"scott", (ub4)strlen("scott"), OCI_ATTR_USERNAME, errhp); OCIAttrSet((dvoid *) usrhp, (ub4) OCI_HTYPE_SESSION, (dvoid *) "tiger", (ub4) strlen("tiger"), (ub4) OCI_ATTR_PASSWORD, errhp); OCISessionBegin (svchp, errhp, usrhp, OCI_CRED_RDBMS, OCI_DEFAULT); OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX, (dvoid *)usrhp, (ub4)0, OCI_ATTR_SESSION, errhp); /* AQ LISTEN Initialization - allocate agent handles */ for (i = 0; i < 3; i++) { agent_list[i] = (OCIAQAgent *)0; OCIDescriptorAlloc(envhp, (dvoid **)&agent_list[i], OCI_DTYPE_AQAGENT, 0, (dvoid **)0); } /* * SCQ1, SCQ2, SCQ3 are single-consumer queues in SCOTT's schema */ SetAgent(agent_list[0], (text *)0, "SCOTT.SCQ1", errhp); SetAgent(agent_list[1], (text *)0, "SCOTT.SCQ2", errhp); SetAgent(agent_list[2], (text *)0, "SCOTT.SCQ3", errhp); checkerr(errhp,OCIAQListen(svchp, errhp, agent_list, 3, 0, &agent, 0)); printf("MESSAGE for :- \n"); GetAgent(agent, errhp); printf("\n"); }
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <oci.h> static void checkerr(errhp, status) LNOCIError *errhp; sword status; { text errbuf[512]; ub4 buflen; sb4 errcode; switch (status) { case OCI_SUCCESS: break; case OCI_SUCCESS_WITH_INFO: printf("Error - OCI_SUCCESS_WITH_INFO\n"); break; case OCI_NEED_DATA: printf("Error - OCI_NEED_DATA\n"); break; case OCI_NO_DATA: printf("Error - OCI_NO_DATA\n"); break; case OCI_ERROR: OCIErrorGet ((dvoid *) errhp, (ub4) 1, (text *) NULL, &errcode, errbuf, (ub4) sizeof(errbuf), (ub4) OCI_HTYPE_ERROR); printf("Error - %s\n", errbuf); break; case OCI_INVALID_HANDLE: printf("Error - OCI_INVALID_HANDLE\n"); break; case OCI_STILL_EXECUTING: printf("Error - OCI_STILL_EXECUTE\n"); break; case OCI_CONTINUE: printf("Error - OCI_CONTINUE\n"); break; default: break; } } /* set agent into descriptor */ /* void SetAgent(agent, appname, queue) */ void SetAgent(agent, appname, queue,errhp) LNOCIAQAgent *agent; text *appname; text *queue; LNOCIError *errhp; { OCIAttrSet(agent, OCI_DTYPE_AQAGENT, appname ? (dvoid *)appname : (dvoid *)"", appname ? strlen((const char *)appname) : 0, OCI_ATTR_AGENT_NAME, errhp); OCIAttrSet(agent, OCI_DTYPE_AQAGENT, queue ? (dvoid *)queue : (dvoid *)"", queue ? strlen((const char *)queue) : 0, OCI_ATTR_AGENT_ADDRESS, errhp); printf("Set agent name to %s\n", appname ? (char *)appname : "NULL"); printf("Set agent address to %s\n", queue ? (char *)queue : "NULL"); } /* get agent from descriptor */ void GetAgent(agent, errhp) LNOCIAQAgent *agent; LNOCIError *errhp; { text *appname; text *queue; ub4 appsz; ub4 queuesz; if (!agent ) { printf("agent was NULL \n"); return; } checkerr(errhp, OCIAttrGet(agent, OCI_DTYPE_AQAGENT, (dvoid *)&appname, &appsz, OCI_ATTR_AGENT_NAME, errhp)); checkerr(errhp, OCIAttrGet(agent, OCI_DTYPE_AQAGENT, (dvoid *)&queue, &queuesz, OCI_ATTR_AGENT_ADDRESS, errhp)); if (!appsz) printf("agent name: NULL\n"); else printf("agent name: %.*s\n", appsz, (char *)appname); if (!queuesz) printf("agent address: NULL\n"); else printf("agent address: %.*s\n", queuesz, (char *)queue); } int main() { OCIEnv *envhp; OCIServer *srvhp; OCIError *errhp; OCISvcCtx *svchp; OCISession *usrhp; OCIAQAgent *agent_list[3]; OCIAQAgent *agent = (OCIAQAgent *)0; /* added next 2 121598 */ int i; /* Standard OCI Initialization */ OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)()) 0, (dvoid * (*)()) 0, (void (*)()) 0 ); OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV, 0, (dvoid **) 0); OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 0, (dvoid **) 0); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR, 0, (dvoid **) 0); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER, 0, (dvoid **) 0); OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX, 0, (dvoid **) 0); /* set attribute server context in the service context */ OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0, (ub4) OCI_ATTR_SERVER, (OCIError *) errhp); /* allocate a user context handle */ OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0); /* allocate a user context handle */ OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0); OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION, (dvoid *)"scott", (ub4)strlen("scott"), OCI_ATTR_USERNAME, errhp); OCIAttrSet((dvoid *) usrhp, (ub4) OCI_HTYPE_SESSION, (dvoid *) "tiger", (ub4) strlen("tiger"), (ub4) OCI_ATTR_PASSWORD, errhp); OCISessionBegin (svchp, errhp, usrhp, OCI_CRED_RDBMS, OCI_DEFAULT); OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX, (dvoid *)usrhp, (ub4)0, OCI_ATTR_SESSION, errhp); /* AQ LISTEN Initialization - allocate agent handles */ for (i = 0; i < 3; i++) { agent_list[i] = (OCIAQAgent *)0; OCIDescriptorAlloc(envhp, (dvoid **)&agent_list[i], OCI_DTYPE_AQAGENT, 0, (dvoid **)0); } /* * SCQ1, SCQ2, SCQ3 are single-consumer queues in SCOTT's schema */ SetAgent(agent_list[0], (text *)0, "SCOTT.SCQ1", errhp); SetAgent(agent_list[1], (text *)0, "SCOTT.SCQ2", errhp); SetAgent(agent_list[2], (text *)0, "SCOTT.SCQ3", errhp); checkerr(errhp,OCIAQListen(svchp, errhp, agent_list, 3, 120, &agent, 0)); printf("MESSAGE for :- \n"); GetAgent(agent, errhp); printf("\n"); }
See Also:
|
See the usage notes in "Listening to One or More Single-Consumer Queues".
See Chapter 3, "AQ Programmatic Environments" for a list of available functions in each programmatic environment. Use the following syntax references for each programmatic environment:
See Chapter 3, "AQ Programmatic Environments" for a list of available functions in each programmatic environment. Examples in the following programmatic environments are provided:
/* The listen call allows you to monitor a list of queues for messages for specific agents. You need to have dequeue privileges for all the queues you wish to monitor. */
DECLARE Agent_w_msg aq$_agent; My_agent_list dbms_aq.agent_list_t; BEGIN /* NOTE: MCQ1, MCQ2, MCQ3 are multiconsumer queues in SCOTT's schema * SCQ1, SCQ2, SCQ3 are single-consumer queues in SCOTT's schema */ Qlist(1):= aq$_agent('agent1', 'MCQ1', NULL); Qlist(2):= aq$_agent('agent2', 'scott.MCQ2', NULL); Qlist(3):= aq$_agent('agent3', 'scott.MCQ3', NULL); /* Listen with a time-out of zero: */ DBMS_AQ.LISTEN( agent_list => My_agent_list, wait => 0, agent => agent_w_msg); DBMS_OUTPUT.PUT_LINE('Message in Queue :- ' || agent_w_msg.address); DBMS_OUTPUT.PUT_LINE(''); END; /
DECLARE Agent_w_msg aq$_agent; My_agent_list dbms_aq.agent_list_t; BEGIN /* NOTE: MCQ1, MCQ2, MCQ3 are multiconsumer queues in SCOTT's schema * SCQ1, SCQ2, SCQ3 are single-consumer queues in SCOTT's schema */ Qlist(1):= aq$_agent('agent1', 'MCQ1', NULL); Qlist(2):= aq$_agent(NULL, 'scott.SQ1', NULL); Qlist(3):= aq$_agent('agent3', 'scott.MCQ3', NULL); /* Listen with a time-out of 100 seconds */ DBMS_AQ.LISTEN( Agent_list => My_agent_list, Wait => 100, Agent => agent_w_msg); DBMS_OUTPUT.PUT_LINE('Message in Queue :- ' || agent_w_msg.address || 'for agent' || agent_w_msg.name); DBMS_OUTPUT.PUT_LINE(''); END; /
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <oci.h> static void checkerr(errhp, status) LNOCIError *errhp; sword status; { text errbuf[512]; ub4 buflen; sb4 errcode; switch (status) { case OCI_SUCCESS: break; case OCI_SUCCESS_WITH_INFO: printf("Error - OCI_SUCCESS_WITH_INFO\n"); break; case OCI_NEED_DATA: printf("Error - OCI_NEED_DATA\n"); break; case OCI_NO_DATA: printf("Error - OCI_NO_DATA\n"); break; case OCI_ERROR: OCIErrorGet ((dvoid *) errhp, (ub4) 1, (text *) NULL, &errcode, errbuf, (ub4) sizeof(errbuf), (ub4) OCI_HTYPE_ERROR); printf("Error - %s\n", errbuf); break; case OCI_INVALID_HANDLE: printf("Error - OCI_INVALID_HANDLE\n"); break; case OCI_STILL_EXECUTING: printf("Error - OCI_STILL_EXECUTE\n"); break; case OCI_CONTINUE: printf("Error - OCI_CONTINUE\n"); break; default: break; } } void SetAgent(OCIAQAgent *agent, text *appname, text *queue, OCIError *errhp, OCIEnv *envhp); void GetAgent(OCIAQAgent *agent, OCIError *errhp); /*----------------------------------------------------------------*/ /* OCI Listen examples for multiconsumers */ /* */ void SetAgent(agent, appname, queue, errhp) LNOCIAQAgent *agent; text *appname; text *queue; LNOCIError *errhp; { OCIAttrSet(agent, OCI_DTYPE_AQAGENT, appname ? (dvoid *)appname : (dvoid *)"", appname ? strlen((const char *)appname) : 0, OCI_ATTR_AGENT_NAME, errhp); OCIAttrSet(agent, OCI_DTYPE_AQAGENT, queue ? (dvoid *)queue : (dvoid *)"", queue ? strlen((const char *)queue) : 0, OCI_ATTR_AGENT_ADDRESS, errhp); printf("Set agent name to %s\n", appname ? (char *)appname : "NULL"); printf("Set agent address to %s\n", queue ? (char *)queue : "NULL"); } /* get agent from descriptor */ void GetAgent(agent, errhp) LNOCIAQAgent *agent; LNOCIError *errhp; { text *appname; text *queue; ub4 appsz; ub4 queuesz; if (!agent ) { printf("agent was NULL \n"); return; } checkerr(errhp, OCIAttrGet(agent, OCI_DTYPE_AQAGENT, (dvoid *)&appname, &appsz, OCI_ATTR_AGENT_NAME, errhp)); checkerr(errhp, OCIAttrGet(agent, OCI_DTYPE_AQAGENT, (dvoid *)&queue, &queuesz, OCI_ATTR_AGENT_ADDRESS, errhp)); if (!appsz) printf("agent name: NULL\n"); else printf("agent name: %.*s\n", appsz, (char *)appname); if (!queuesz) printf("agent address: NULL\n"); else printf("agent address: %.*s\n", queuesz, (char *)queue); } /* main from AQ Listen to multiconsumer Queues */ /* int main() */ int main(char *argv, int argc) { OCIEnv *envhp; OCIServer *srvhp; OCIError *errhp; OCISvcCtx *svchp; OCISession *usrhp; OCIAQAgent *agent_list[3]; OCIAQAgent *agent; int i; /* Standard OCI Initialization */ OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)()) 0, (dvoid * (*)()) 0, (void (*)()) 0 ); OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV, 0, (dvoid **) 0); OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 0, (dvoid **)0); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR, 0, (dvoid **) 0); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER, 0, (dvoid **) 0); OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX, 0, (dvoid **) 0); /* set attribute server context in the service context */ OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0, (ub4) OCI_ATTR_SERVER, (OCIError *) errhp); /* allocate a user context handle */ OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0); /* allocate a user context handle */ OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0); OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION, (dvoid *)"scott", (ub4)strlen("scott"), OCI_ATTR_USERNAME, errhp); OCIAttrSet((dvoid *) usrhp, (ub4) OCI_HTYPE_SESSION, (dvoid *) "tiger", (ub4) strlen("tiger"), (ub4) OCI_ATTR_PASSWORD, errhp); OCISessionBegin (svchp, errhp, usrhp, OCI_CRED_RDBMS, OCI_DEFAULT); OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX, (dvoid *)usrhp, (ub4)0, OCI_ATTR_SESSION, errhp); /* AQ LISTEN Initialization - allocate agent handles */ for (i = 0; i < 3; i++) { OCIDescriptorAlloc(envhp, (dvoid **)&agent_list[i], OCI_DTYPE_AQAGENT, 0, (dvoid **)0); } /* * MCQ1, MCQ2, MCQ3 are multiconsumer queues in SCOTT's schema */ /* Listening to Multiconsumer Queues with Zero Timeout */ SetAgent(agent_list[0], "app1", "MCQ1", errhp); SetAgent(agent_list[1], "app2", "MCQ2", errhp); SetAgent(agent_list[2], "app3", "MCQ3", errhp); checkerr(errhp, OCIAQListen(svchp, errhp, agent_list, 3, 0, &agent, 0)); printf("MESSAGE for :- \n"); GetAgent(agent, errhp); printf("\n"); /* Listening to Multiconsumer Queues with Timeout of 120 Seconds */ SetAgent(agent_list[0], "app1", "SCOTT.MCQ1", errhp); SetAgent(agent_list[1], "app2", "SCOTT.MCQ2", errhp); SetAgent(agent_list[2], "app3", "SCOTT.MCQ3", errhp); checkerr(errhp, OCIAQListen(svchp, errhp, agent_list, 3, 120, &agent, 0)); printf("MESSAGE for :- \n"); GetAgent(agent, errhp); printf("\n"); /* Listening to a Mixture of Single and Multiconsumer Queues * with a Timeout of 100 Seconds */ SetAgent(agent_list[0], "app1", "SCOTT.MCQ1", errhp); SetAgent(agent_list[1], "app2", "SCOTT.MCQ2", errhp); SetAgent(agent_list[2], (text *)0, "SCOTT.SCQ3", errhp); checkerr(errhp, OCIAQListen(svchp, errhp, agent_list, 3, 100, &agent, 0)); printf("MESSAGE for :- \n"); GetAgent(agent, errhp); printf("\n"); }
See Also:
|
Dequeues a message from the specified queue.
READY
state are dequeued unless a msgid is specified.BROWSE
call may not see a message that is enqueued after the beginning of the browsing transaction.The default NAVIGATION
parameter during dequeue is NEXT MESSAGE
. This means that subsequent dequeues will retrieve the messages from the queue based on the snapshot obtained in the first dequeue. In particular, a message that is enqueued after the first dequeue command will be processed only after processing all the remaining messages in the queue. This is usually sufficient when all the messages have already been enqueued into the queue, or when the queue does not have a priority-based ordering. However, applications must use the FIRST MESSAGE n
avigation option when the first message in the queue needs to be processed by every dequeue command. This usually becomes necessary when a higher priority message arrives in the queue while messages already-enqueued are being processed.
LOCKED
or REMOVE
mode locks only a single message. By contrast, a dequeue operation that seeks to dequeue a message that is part of a group will lock the entire group. This is useful when all the messages in a group need to be processed as an atomic unit.NEXT TRANSACTION
to start dequeuing messages from the next available group. In the event that no groups are available, the dequeue will time-out after the specified WAIT
period.See Chapter 3, "AQ Programmatic Environments" for a list of available functions in each programmatic environment. Use the following syntax references for each programmatic environment:
See Chapter 3, "AQ Programmatic Environments" for a list of available functions in each programmatic environment. Examples in the following programmatic environments are provided:
See Also:
|
To specify the options available for the dequeue operation.
Typically, you expect the consumer of messages to access messages using the dequeue interface. You can view processed messages or messages still to be processed by browsing by message id or by using SELECT
s.
The transformation, if specified, is applied before returning the message to the caller. The transformation should be defined to map the queue ADT type to the return type desired by the caller.
See Chapter 3, "AQ Programmatic Environments" for a list of available functions in each programmatic environment. Use the following syntax references for each programmatic environment:
See Chapter 3, "AQ Programmatic Environments" for a list of available functions in each programmatic environment. Examples in the following programmatic environments are provided:
/* Dequeue from msg_queue: */ DECLARE dequeue_options dbms_aq.dequeue_options_t; message_properties dbms_aq.message_properties_t; message_handle RAW(16); message aq.message_typ; BEGIN DBMS_AQ.DEQUEUE( queue_name => 'msg_queue', dequeue_options => dequeue_options, message_properties => message_properties, payload => message, msgid => message_handle); DBMS_OUTPUT.PUT_LINE ('Message: ' || message.subject || ' ... ' || message.text ); COMMIT; END;
/* Dequeue a message with correlation id = 'RUSH' */ public static void example(AQSession aq_sess) throws AQException, SQLException { AQQueue queue; AQMessage message; AQRawPayload raw_payload; AQDequeueOption deq_option; byte[] b_array; Connection db_conn; db_conn = ((AQOracleSession)aq_sess).getDBConnection(); queue = aq_sess.getQueue ("aq", "msg_queue"); /* Create a AQDequeueOption object with default options: */ deq_option = new AQDequeueOption(); deq_option.setCorrelation("RUSH"); /* Dequeue a message */ message = queue.dequeue(deq_option); System.out.println("Successful dequeue"); /* Retrieve raw data from the message: */ raw_payload = message.getRawPayload(); b_array = raw_payload.getBytes(); db_conn.commit(); }
Dequeuing messages of RAW type
'Dequeue the first message available Q.Dequeue() Set Msg = Q.QMsg 'Display the message content MsgBox Msg.Value 'Dequeue the first message available without removing it ' from the queue Q.DequeueMode = ORAAQ_DEQ_BROWSE 'Dequeue the first message with the correlation identifier ' equal to "RELATIVE_MSG_ID" Q.Navigation = ORAAQ_DQ_FIRST_MSG Q.correlate = "RELATIVE_MESSAGE_ID" Q.Dequeue 'Dequeue the next message with the correlation identifier ' of "RELATIVE_MSG_ID" Q.Navigation = ORAAQ_DQ_NEXT_MSG Q.Dequeue() 'Dequeue the first high priority message Msg.Priority = ORAQMSG_HIGH_PRIORITY Q.Dequeue() 'Dequeue the message enqueued with message id of Msgid_1 Q.DequeueMsgid = Msgid_1 Q.Dequeue() 'Dequeue the message meant for "ANDY" Q.consumer = "ANDY" Q.Dequeue() 'Return immediately if there is no message on the queue Q.wait = ORAAQ_DQ_NOWAIT Q.Dequeue()
Dequeuing messages of Oracle object type
Set OraObj = DB.CreateOraObject("MESSAGE_TYPE") Set QMsg = Q.AQMsg(1, "MESSAGE_TYPE") 'Dequeue the first message available without removing it Q.Dequeue() OraObj = QMsg.Value 'Display the subject and data MsgBox OraObj!subject & OraObj!Data
See Also:
|
To specify the options available for the dequeue operation.
Not applicable.
See Chapter 3, "AQ Programmatic Environments" for a list of available functions in each programmatic environment. Use the following syntax references for each programmatic environment:
Examples in the following programmatic environments are provided:
/* Dequeue a message for subscriber1 in browse mode*/ public static void example(AQSession aq_sess) throws AQException, SQLException { AQQueue queue; AQMessage message; AQRawPayload raw_payload; AQDequeueOption deq_option; byte[] b_array; Connection db_conn; db_conn = ((AQOracleSession)aq_sess).getDBConnection(); queue = aq_sess.getQueue ("aq", "priority_msg_queue"); /* Create a AQDequeueOption object with default options: */ deq_option = new AQDequeueOption(); /* Set dequeue mode to BROWSE */ deq_option.setDequeueMode(AQDequeueOption.DEQUEUE_BROWSE); /* Dequeue messages for subscriber1 */ deq_option.setConsumerName("subscriber1"); /* Dequeue a message: */ message = queue.dequeue(deq_option); System.out.println("Successful dequeue"); /* Retrieve raw data from the message: */ raw_payload = message.getRawPayload(); b_array = raw_payload.getBytes(); db_conn.commit(); }
See Also:
|
To register a callback for message notification.
message_receive
(dequeue) to retrieve the message.LNOCI_SUBSCR_NAMESPACE_AQ
.schema
.queue
' if the registration is for a single-consumer queue and 'schema.queue:consumer_name
' if the registration is for a multiconsumer queues.LNOCIAQListen
(), LNOCISubscriptionDisable
(), LNOCISubscriptionEnable
(), LNOCISubscriptionUnRegister
()
See Chapter 3, "AQ Programmatic Environments" for a list of available functions in each programmatic environment. Use the following syntax references for each programmatic environment:
See Chapter 3, "AQ Programmatic Environments" for a list of available functions in each programmatic environment. Examples in the following programmatic environments are provided:
See Also:
|
See Also:
|
Not applicable.
See Chapter 3, "AQ Programmatic Environments" for a list of available functions in each programmatic environment. Use the following syntax references for each programmatic environment:
See Chapter 3, "AQ Programmatic Environments" for a list of available functions in each programmatic environment.
/* OCIRegister can be used by the client to register to receive notifications when messages are enqueued into non-persistent and normal queues. */ #include <stdio.h> #include <stdlib.h> #include <string.h> #include <oci.h> static OCIEnv *envhp; static OCIServer *srvhp; static OCIError *errhp; static OCISvcCtx *svchp; /* The callback that gets invoked on notification */ ub4 notifyCB(ctx, subscrhp, pay, payl, desc, mode) dvoid *ctx; LNOCISubscription *subscrhp; /* subscription handle */ dvoid *pay; /* payload */ ub4 payl; /* payload length */ dvoid *desc; /* the AQ notification descriptor */ ub4 mode; { text *subname; ub4 size; ub4 *number = (ub4 *)ctx; text *queue; text *consumer; OCIRaw *msgid; OCIAQMsgProperties *msgprop; (*number)++; /* Get the subscription name */ OCIAttrGet((dvoid *)subscrhp, OCI_HTYPE_SUBSCRIPTION, (dvoid *)&subname, &size, OCI_ATTR_SUBSCR_NAME, errhp); printf("got notification number %d for %.*s %d \n", *number, size, subname, payl); /* Get the queue name from the AQ notify descriptor */ OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&queue, &size, OCI_ATTR_QUEUE_NAME, errhp); /* Get the consumer name for which this notification was received */ OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&consumer, &size, OCI_ATTR_CONSUMER_NAME, errhp); /* Get the message id of the message for which we were notified */ OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&msgid, &size, OCI_ATTR_NFY_MSGID, errhp); /* Get the message properties of the message for which we were notified */ OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&msgprop, &size, OCI_ATTR_MSG_PROP, errhp); } int main(argc, argv) int argc; char *argv[]; { OCISession *authp = (OCISession *) 0; /* The subscription handles */ OCISubscription *subscrhp[5]; /* Registrations are for AQ namespace */ ub4 namespace = OCI_SUBSCR_NAMESPACE_AQ; /* The context fot the callback */ ub4 ctx[5] = {0,0,0,0,0}; printf("Initializing OCI Process\n"); /* The OCI Process Environment must be initialized with OCI_EVENTS */ /* OCI_OBJECT flag is set to enable us dequeue */ (void) OCIInitialize((ub4) OCI_EVENTS|OCI_OBJECT, (dvoid *)0, (dvoid * (*)(dvoid *, size_t)) 0, (dvoid * (*)(dvoid *, dvoid *, size_t))0, (void (*)(dvoid *, dvoid *)) 0 ); printf("Initialization successful\n"); /* The standard OCI setup */ printf("Initializing OCI Env\n"); (void) OCIEnvInit((OCIEnv **) &envhp, OCI_DEFAULT, (size_t) 0, (dvoid **) 0 ); (void) OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, OCI_HTYPE_ERROR, (size_t) 0, (dvoid **) 0); /* Server contexts */ (void) OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, OCI_HTYPE_SERVER, (size_t) 0, (dvoid **) 0); (void) OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, OCI_HTYPE_SVCCTX, (size_t) 0, (dvoid **) 0); printf("connecting to server\n"); (void) OCIServerAttach( srvhp, errhp, (text *)"", strlen(""), 0); printf("connect successful\n"); /* Set attribute server context in the service context */ (void) OCIAttrSet( (dvoid *) svchp, OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0, OCI_ATTR_SERVER, (OCIError *) errhp); (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&authp, (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0); (void) OCIAttrSet((dvoid *) authp, (ub4) OCI_HTYPE_SESSION, (dvoid *) "scott", (ub4) strlen("scott"), (ub4) OCI_ATTR_USERNAME, errhp); (void) OCIAttrSet((dvoid *) authp, (ub4) OCI_HTYPE_SESSION, (dvoid *) "tiger", (ub4) strlen("tiger"), (ub4) OCI_ATTR_PASSWORD, errhp); checkerr(errhp, OCISessionBegin ( svchp, errhp, authp, OCI_CRED_RDBMS, (ub4) OCI_DEFAULT)); (void) OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *) authp, (ub4) 0, (ub4) OCI_ATTR_SESSION, errhp); /* Setting the subscription handle for notification on a NORMAL single-consumer queue */ printf("allocating subscription handle\n"); subscrhp[0] = (OCISubscription *)0; (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION, (size_t) 0, (dvoid **) 0); printf("setting subscription name\n"); (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *) "SCOTT.SCQ1", (ub4) strlen("SCOTT.SCQ1"), (ub4) OCI_ATTR_SUBSCR_NAME, errhp); printf("setting subscription callback\n"); (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *) notifyCB, (ub4) 0, (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp); printf("setting subscription context \n"); (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *)&ctx[0], (ub4)sizeof(ctx[0]), (ub4) OCI_ATTR_SUBSCR_CTX, errhp); printf("setting subscription namespace\n"); (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *) &namespace, (ub4) 0, (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp); /* Setting the subscription handle for notification on a NORMAL multiconsumer consumer queue */ subscrhp[1] = (OCISubscription *)0; (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION, (size_t) 0, (dvoid **) 0); (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *) "SCOTT.MCQ1:APP1", (ub4) strlen("SCOTT.MCQ1:APP1"), (ub4) OCI_ATTR_SUBSCR_NAME, errhp); (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *) notifyCB, (ub4) 0, (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp); (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *)&ctx[1], (ub4)sizeof(ctx[1]), (ub4) OCI_ATTR_SUBSCR_CTX, errhp); (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *) &namespace, (ub4) 0, (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp); /* Setting the subscription handle for notification on a non-persistent single-consumer queue */ subscrhp[2] = (OCISubscription *)0; (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION, (size_t) 0, (dvoid **) 0); (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *) "SCOTT.NP_SCQ1", (ub4) strlen("SCOTT.NP_SCQ1"), (ub4) OCI_ATTR_SUBSCR_NAME, errhp); (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *) notifyCB, (ub4) 0, (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp); (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *)&ctx[2], (ub4)sizeof(ctx[2]), (ub4) OCI_ATTR_SUBSCR_CTX, errhp); (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *) &namespace, (ub4) 0, (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp); /* Setting the subscription handle for notification on a non-persistent multi consumer queue */ /* Waiting on user specified recipient */ subscrhp[3] = (OCISubscription *)0; (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[3], (ub4) OCI_HTYPE_SUBSCRIPTION, (size_t) 0, (dvoid **) 0); (void) OCIAttrSet((dvoid *) subscrhp[3], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *) "SCOTT.NP_MCQ1", (ub4) strlen("SCOTT.NP_MCQ1"), (ub4) OCI_ATTR_SUBSCR_NAME, errhp); (void) OCIAttrSet((dvoid *) subscrhp[3], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *) notifyCB, (ub4) 0, (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp); (void) OCIAttrSet((dvoid *) subscrhp[3], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *)&ctx[3], (ub4)sizeof(ctx[3]), (ub4) OCI_ATTR_SUBSCR_CTX, errhp); (void) OCIAttrSet((dvoid *) subscrhp[3], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *) &namespace, (ub4) 0, (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp); printf("Registering for all the subscriptiosn \n"); checkerr(errhp, OCISubscriptionRegister(svchp, subscrhp, 4, errhp, OCI_DEFAULT)); printf("Waiting for notifcations \n"); /* wait for minutes for notifications */ sleep(300); printf("Exiting\n"); }
See Also:
Table 11-1 for a list of operational interface basic operations |
To post to a list of anonymous subscriptions so clients registered for the subscription get notifications.
Several subscriptions can be posted to at one time. Posting to a subscription involves identifying the subscription name and the payload, if desired. It is possible for no payload to be associated with this call. This call provides a best-effort guarantee. A notification goes to registered clients at most once.
This call is primarily used for lightweight notification and is useful in the case of several system events. If an application needs more rigid guarantees, it can use AQ functionality by enqueuing to a queue.
When using OCI, the user must specify a subscription handle at registration time with the namespace attribute set to OCI_SUBSCR_NAMESPACE_ANONYMOUS
.
When using PL/SQL, the namespace attribute in aq$_post_info
must be set to DBMS_AQ.NAMESPACE_ANONYMOUS
.
Related functions: LNOCIAQListen(), OCISvcCtxToLda(), LNOCISubscriptionEnable(), OCISubscriptionRegister(), LNOCISubscriptionUnRegister(), dbms_aq.register, dbms_aq.unregister.
See Chapter 3, "AQ Programmatic Environments" for a list of available functions in each programmatic environment. Use the following syntax references for each programmatic environment:
See Chapter 3, "AQ Programmatic Environments" for a list of available functions in each programmatic environment.
-- Register for notification DECLARE reginfo sys.aq$_reg_info; reginfolist sys.aq$_reg_info_list; BEGIN -- Register for anonymous subscription PUBSUB1.ANONSTR, consumer_name ADMIN -- The PL/SQL callback pubsub1.mycallbk will be invoked -- when a notification is received reginfo := sys.aq$_reg_info('PUBSUB1.ANONSTR:ADMIN', DBMS_AQ.NAMESPACE_ANONYMOUS, 'plsql://PUBSUB1.mycallbk', HEXTORAW('FF')); reginfolist := sys.aq$_reg_info_list(reginfo); sys.dbms_aq.register(reginfolist, 1); commit; END; / -- Post to an anonymous subscription DECLARE postinfo sys.aq$_post_info; postinfolist sys.aq$_post_info_list; BEGIN -- Post to the anonymous subscription PUBSUB1.ANONSTR, consumer_name ADMIN postinfo := sys.aq$_post_info('PUBSUB1.ANONSTR:ADMIN',0,HEXTORAW('FF')); postinfolist := sys.aq$_post_info_list(postinfo); sys.dbms_aq.post(postinfolist, 1); commit; END; /
See Also:
Table 11-1 for a list of operational interface basic operations |
To add an agent to the LDAP server.
This call takes an agent and an optional certificate location as the arguments, and adds the agent entry to the LDAP server. The certificate location parameter is the distinguished name of the LDAP entry that contains the digital certificate which the agent will use. If the agent does not have a digital certificate, this parameter will be defaulted to null.
See Chapter 3, "AQ Programmatic Environments" for a list of available functions in each programmatic environment. Use the following syntax references for each programmatic environment:
See Chapter 3, "AQ Programmatic Environments" for a list of available functions in each programmatic environment.
See Also:
Table 11-1 for a list of operational interface basic operations |
To remove an agent from the LDAP server.
This call takes an agent as the argument, and removes the corresponding agent entry in the LDAP server.
See Chapter 3, "AQ Programmatic Environments" for a list of available functions in each programmatic environment. Use the following syntax references for each programmatic environment:
See Chapter 3, "AQ Programmatic Environments" for a list of available functions in each programmatic environment.
|
Copyright © 1996, 2002 Oracle Corporation. All Rights Reserved. |
|