Oracle® Streams Advanced Queuing User's Guide and Reference, 10g Release 2 (10.2), Part Number B14257-01
This appendix presents syntax and examples for the Oracle Streams Advanced Queuing (AQ) OCI operational interface.
This appendix contains these topics:
sword OCIAQEnq ( OCISvcCtx *svch, OCIError *errh, text *queue_name, OCIAQEnqOptions *enqueue_options, OCIAQMsgProperties *message_properties, OCIType *payload_tdo, dvoid **payload, dvoid **payload_ind, OCIRaw **msgid, ub4 flags );
This call is used for an Oracle Streams AQ enqueue.
See Also: "OCIAQEnq()" in Oracle Call Interface Programmer's Guide for parameter information and usage notes.
Example C-1 through Example C-6 demonstrate the use of OCIAQEnq() and OCIAQDeq() in several different situations.
Example C-1 Enqueuing and Dequeuing an Object Payload
struct message { OCIString *subject; OCIString *data; }; typedef struct message message; struct null_message { OCIInd null_adt; OCIInd null_subject; OCIInd null_data; }; typedef struct null_message null_message; int main( argc, argv) int argc; char * argv[]; { OCIEnv *envhp; OCIServer *srvhp; OCIError *errhp; OCISvcCtx *svchp; dvoid *tmp; OCIType *mesg_tdo = (OCIType *) 0; message msg; null_message nmsg; message *mesg = &msg; null_message *nmesg = &nmsg; message *deqmesg = (message *)0; null_message *ndeqmesg = (null_message *)0; OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)(dvoid *,size_t)) 0, dvoid * (*)(dvoid *, dvoid *, size_t)) 0, (void (*)(dvoid *, dvoid *)) 0 ); OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV, 52, (dvoid **) &tmp); OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp ); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR, 52, (dvoid **) &tmp); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER, 52, (dvoid **) &tmp); OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX, 52, (dvoid **) &tmp); OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0, (ub4) OCI_ATTR_SERVER, (OCIError *) errhp); OCILogon(envhp, errhp, &svchp, (const text *)"AQ", (ub4) strlen("AQ"), (const text *) "AQ", (ub4) strlen("AQ"), (const text *)0, 0); /* obtain TDO of message_type */ OCITypeByName(envhp, errhp, svchp, (CONST text *)"AQ", (ub4) strlen("AQ"), (CONST text *)"MESSAGE_TYPE", (ub4) strlen("MESSAGE_TYPE"), (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo); /* prepare the message payload */ mesg->subject = (OCIString *)0; mesg->data = (OCIString *)0; OCIStringAssignText(envhp, errhp, (CONST text *)"NORMAL MESSAGE", (ub4) strlen("NORMAL MESSAGE"), &mesg->subject); OCIStringAssignText(envhp, errhp,(CONST text *)"OCI ENQUEUE", (ub4) strlen("OCI ENQUEUE"), 
&mesg->data); nmesg->null_adt = nmesg->null_subject = nmesg->null_data = OCI_IND_NOTNULL; /* enqueue into the msg_queue */ OCIAQEnq(svchp, errhp, (text *)"msg_queue", (OCIAQEnqOptions *)0, (OCIAQMsgProperties *)0, mesg_tdo, (dvoid **)&mesg, (dvoid **)&nmesg, (OCIRaw **)0, 0); OCITransCommit(svchp, errhp, (ub4) 0); /* dequeue from the msg_queue */ OCIAQDeq(svchp, errhp, (text *)"msg_queue", (OCIAQDeqOptions *)0, (OCIAQMsgProperties *)0, mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, (OCIRaw **)0, 0); printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject)); printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data)); OCITransCommit(svchp, errhp, (ub4) 0); return 0; }
Example C-2 Enqueuing and Dequeuing Using Correlation Identifiers
struct message { OCIString *subject; OCIString *data; }; typedef struct message message; struct null_message { OCIInd null_adt; OCIInd null_subject; OCIInd null_data; }; typedef struct null_message null_message; int main( argc, argv) int argc; char * argv[]; { OCIEnv *envhp; OCIServer *srvhp; OCIError *errhp; OCISvcCtx *svchp; dvoid *tmp; OCIType *mesg_tdo = (OCIType *) 0; message msg; null_message nmsg; message *mesg = &msg; null_message *nmesg = &nmsg; message *deqmesg = (message *)0; null_message *ndeqmesg = (null_message *)0; OCIRaw*firstmsg = (OCIRaw *)0; OCIAQMsgProperties *msgprop = (OCIAQMsgProperties *)0; OCIAQDeqOptions *deqopt = (OCIAQDeqOptions *)0; text correlation1[30], correlation2[30]; OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)(dvoid *,size_t)) 0, (dvoid * (*)(dvoid *, dvoid *, size_t)) 0, (void (*)(dvoid *, dvoid *)) 0 ); OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV, 52, (dvoid **) &tmp); OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp ); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR, 52, (dvoid **) &tmp); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER, 52, (dvoid **) &tmp); OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX, 52, (dvoid **) &tmp); OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0, (ub4) OCI_ATTR_SERVER, (OCIError *) errhp); OCILogon(envhp, errhp, &svchp, (const text *) "AQ", (ub4) strlen("AQ"), (const text *) "AQ", (ub4) strlen("AQ"), (const text *) 0, 0); /* allocate message properties descriptor */ OCIDescriptorAlloc(envhp, (dvoid **)&msgprop, OCI_DTYPE_AQMSG_PROPERTIES, 0, (dvoid **)0); strcpy((char *) correlation1, "1st message"); OCIAttrSet(msgprop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *)&correlation1, (ub4) strlen((const char*) correlation1), OCI_ATTR_CORRELATION, errhp); /* obtain TDO of message_type 
*/ OCITypeByName(envhp, errhp, svchp, (CONST text *)"AQ", (ub4)strlen("AQ"), (CONST text *)"MESSAGE_TYPE", (ub4) strlen("MESSAGE_TYPE"), (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo); /* prepare the message payload */ mesg->subject = (OCIString *)0; mesg->data = (OCIString *)0; OCIStringAssignText(envhp, errhp, (CONST text *)"NORMAL ENQUEUE1", (ub4) strlen("NORMAL ENQUEUE1"), &mesg->subject); OCIStringAssignText(envhp, errhp,(CONST text *)"OCI ENQUEUE", (ub4) strlen("OCI ENQUEUE"), &mesg->data); nmesg->null_adt = nmesg->null_subject = nmesg->null_data = OCI_IND_NOTNULL; /* enqueue into the msg_queue, store the message id into firstmsg */ OCIAQEnq(svchp, errhp, (text *)"msg_queue", (OCIAQEnqOptions *)0, msgprop, mesg_tdo, (dvoid **)&mesg, (dvoid **)&nmesg, &firstmsg, 0); /* enqueue into the msg_queue with a different correlation id */ strcpy((char *)correlation2, "2nd message"); OCIAttrSet(msgprop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid*)&correlation2, (ub4) strlen((const char *)correlation2), OCI_ATTR_CORRELATION, errhp); OCIStringAssignText(envhp, errhp, (text *)"NORMAL ENQUEUE2", (ub4) strlen("NORMAL ENQUEUE2"), &mesg->subject); OCIAQEnq(svchp, errhp, (text *)"msg_queue", (OCIAQEnqOptions *)0, msgprop, mesg_tdo, (dvoid **)&mesg, (dvoid **)&nmesg, (OCIRaw **)0, 0); OCITransCommit(svchp, errhp, (ub4) 0); /* first dequeue by correlation id "2nd message" */ /* allocate dequeue options descriptor and set the correlation option */ OCIDescriptorAlloc(envhp, (dvoid **)&deqopt, OCI_DTYPE_AQDEQ_OPTIONS, 0, (dvoid **)0); OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)correlation2, (ub4) strlen((const char *)correlation2), OCI_ATTR_CORRELATION, errhp); /* dequeue from the msg_queue */ OCIAQDeq(svchp, errhp, (text *)"msg_queue", deqopt, (OCIAQMsgProperties *)0, mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, (OCIRaw **)0, 0); printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject)); printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data)); 
OCITransCommit(svchp, errhp, (ub4) 0); /* second dequeue by message id */ OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)&firstmsg, OCIRawSize(envhp, firstmsg), OCI_ATTR_DEQ_MSGID, errhp); /* clear correlation id option */ OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)correlation2, 0, OCI_ATTR_CORRELATION, errhp); /* dequeue from the msg_queue */ OCIAQDeq(svchp, errhp, (text *)"msg_queue", deqopt, (OCIAQMsgProperties *)0, mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, (OCIRaw **)0, 0); printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject)); printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data)); OCITransCommit(svchp, errhp, (ub4) 0); return 0; }
Example C-3 Enqueuing and Dequeuing Messages by Correlation and Message ID Using OCI
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <oci.h> struct message { OCIString *subject; OCIString *data; }; typedef struct message message; struct null_message { OCIInd null_adt; OCIInd null_subject; OCIInd null_data; }; typedef struct null_message null_message; int main() { OCIEnv *envhp; OCIServer *srvhp; OCIError *errhp; OCISvcCtx *svchp; dvoid *tmp; OCIType *mesg_tdo = (OCIType *) 0; message msg; null_message nmsg; message *mesg = &msg; null_message *nmesg = &nmsg; message *deqmesg = (message *)0; null_message *ndeqmesg = (null_message *)0; OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)()) 0, (dvoid * (*)()) 0, (void (*)()) 0 ); OCIHandleAlloc((dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV, 52, (dvoid **) &tmp); OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp ); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR, 52, (dvoid **) &tmp); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER, 52, (dvoid **) &tmp); OCIServerAttach(srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX, 52, (dvoid **) &tmp); OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0, (ub4) OCI_ATTR_SERVER, (OCIError *) errhp); OCILogon(envhp, errhp, &svchp, "AQ", strlen("AQ"), "AQ", strlen("AQ"), 0, 0); /* Obtain TDO of message_typ */ OCITypeByName(envhp, errhp, svchp, (CONST text *)"AQ", strlen("AQ"), (CONST text *)"MESSAGE_TYP", strlen("MESSAGE_TYP"), (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo); /* Prepare the message payload */ mesg->subject = (OCIString *)0; mesg->data = (OCIString *)0; OCIStringAssignText(envhp, errhp, (CONST text *)"NORMAL MESSAGE", strlen("NORMAL MESSAGE"), &mesg->subject); OCIStringAssignText(envhp, errhp, (CONST text *)"OCI ENQUEUE", strlen("OCI ENQUEUE"), &mesg->data); nmesg->null_adt = nmesg->null_subject = nmesg->null_data = OCI_IND_NOTNULL; /* Enqueue 
into the msg_queue */ OCIAQEnq(svchp, errhp, (CONST text *)"msg_queue", 0, 0, mesg_tdo, (dvoid **)&mesg, (dvoid **)&nmesg, 0, 0); OCITransCommit(svchp, errhp, (ub4) 0); /* Dequeue from the msg_queue */ OCIAQDeq(svchp, errhp, (CONST text *)"msg_queue", 0, 0, mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, 0, 0); printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject)); printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data)); OCITransCommit(svchp, errhp, (ub4) 0); }
Example C-4 Enqueuing and Dequeuing of a RAW Payload
int main( argc, argv) int argc; char * argv[]; { OCIEnv *envhp; OCIServer *srvhp; OCIError *errhp; OCISvcCtx *svchp; dvoid *tmp; OCIType *mesg_tdo = (OCIType *) 0; char msg_text[100]; OCIRaw *mesg = (OCIRaw *)0; OCIRaw*deqmesg = (OCIRaw *)0; OCIInd ind = 0; dvoid *indptr = (dvoid *)&ind; int i; OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)(dvoid *, size_t)) 0, (dvoid * (*)(dvoid *, dvoid *, size_t)) 0, (void (*)(dvoid *, dvoid *)) 0 ); OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV, 52, (dvoid **) &tmp); OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp ); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR, 52, (dvoid **) &tmp); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER, 52, (dvoid **) &tmp); OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX, 52, (dvoid **) &tmp); OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0, (ub4) OCI_ATTR_SERVER, (OCIError *) errhp); OCILogon(envhp, errhp, &svchp, (const text *) "AQ", (ub4) strlen("AQ"), (const text *) "AQ", (ub4) strlen("AQ"), (const text *) 0, 0); /* obtain the TDO of the RAW datatype */ OCITypeByName(envhp, errhp, svchp, (CONST text *)"SYS", (ub4) strlen("SYS"), (CONST text *)"RAW", (ub4) strlen("RAW"), (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo); /* prepare the message payload */ strcpy(msg_text, "Enqueue to a RAW queue"); OCIRawAssignBytes(envhp, errhp, (const ub1 *)msg_text, (ub4) strlen(msg_text), &mesg); /* enqueue the message into raw_msg_queue */ OCIAQEnq(svchp, errhp, (text *)"raw_msg_queue", (OCIAQEnqOptions *)0, (OCIAQMsgProperties *) 0, mesg_tdo, (dvoid **)&mesg, (dvoid **)&indptr, (OCIRaw **)0, 0); OCITransCommit(svchp, errhp, (ub4) 0); /* dequeue the same message into C variable deqmesg */ OCIAQDeq(svchp, errhp, (text *)"raw_msg_queue", (OCIAQDeqOptions *)0, 
(OCIAQMsgProperties *)0, mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&indptr, (OCIRaw **)0, 0); for (i = 0; i < OCIRawSize(envhp, deqmesg); i++) printf("%c", *(OCIRawPtr(envhp, deqmesg) + i)); OCITransCommit(svchp, errhp, (ub4) 0); return 0; }
Example C-5 Enqueuing and Dequeuing Using OCIAQAgent
struct message { OCIString *subject; OCIString *data; }; typedef struct message message; struct null_message { OCIInd null_adt; OCIInd null_subject; OCIInd null_data; }; typedef struct null_message null_message; int main( argc, argv) int argc; char * argv[]; { OCIEnv *envhp; OCIServer *srvhp; OCIError *errhp; OCISvcCtx *svchp; dvoid *tmp; OCIType *mesg_tdo = (OCIType *) 0; message msg; null_message nmsg; message *mesg = &msg; null_message *nmesg = &nmsg; message *deqmesg = (message *)0; null_message *ndeqmesg = (null_message *)0; OCIAQMsgProperties *msgprop = (OCIAQMsgProperties *)0; OCIAQAgent *agents[2]; OCIAQDeqOptions *deqopt = (OCIAQDeqOptions *)0; ub4 wait = OCI_DEQ_NO_WAIT; ub4 navigation = OCI_DEQ_FIRST_MSG; OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)(dvoid *,size_t)) 0, (dvoid * (*)(dvoid *, dvoid *, size_t)) 0, (void (*)(dvoid *, dvoid *)) 0 ); OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV, 52, (dvoid **) &tmp); OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp ); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR, 52, (dvoid **) &tmp); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER, 52, (dvoid **) &tmp); OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX, 52, (dvoid **) &tmp); OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0, (ub4) OCI_ATTR_SERVER, (OCIError *) errhp); OCILogon(envhp, errhp, &svchp, (const text *) "AQ", (ub4) strlen("AQ"), (const text *) "AQ", (ub4) strlen("AQ"), (const text *) 0, 0); /* obtain TDO of message_type */ OCITypeByName(envhp, errhp, svchp, (CONST text *)"AQ", (ub4) strlen("AQ"), (CONST text *)"MESSAGE_TYPE", (ub4) strlen("MESSAGE_TYPE"), (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo); /* prepare the message payload */ mesg->subject = (OCIString *)0; mesg->data = (OCIString *)0; 
OCIStringAssignText(envhp, errhp, (CONST text *)"MESSAGE 1", (ub4) strlen("MESSAGE 1"), &mesg->subject); OCIStringAssignText(envhp, errhp, (CONST text *)"mesg for queue subscribers", (ub4) strlen("mesg for queue subscribers"), &mesg->data); nmesg->null_adt = nmesg->null_subject = nmesg->null_data = OCI_IND_NOTNULL; /* enqueue MESSAGE 1 for subscribers to the queue for RED and GREEN */ OCIAQEnq(svchp, errhp, (text *)"msg_queue_multiple", (OCIAQEnqOptions *)0, (OCIAQMsgProperties *)0, mesg_tdo, (dvoid **)&mesg, (dvoid **)&nmesg, (OCIRaw **)0, 0); /* enqueue MESSAGE 2 for specified recipients for RED and BLUE */ /* prepare message payload */ OCIStringAssignText(envhp, errhp, (CONST text *)"MESSAGE 2", (ub4) strlen("MESSAGE 2"), &mesg->subject); OCIStringAssignText(envhp, errhp, (CONST text *)"mesg for two recipients", (ub4) strlen("mesg for two recipients"), &mesg->data); /* allocate AQ message properties and agent descriptors */ OCIDescriptorAlloc(envhp, (dvoid **)&msgprop, OCI_DTYPE_AQMSG_PROPERTIES, 0, (dvoid **)0); OCIDescriptorAlloc(envhp, (dvoid **)&agents[0], OCI_DTYPE_AQAGENT, 0, (dvoid **)0); OCIDescriptorAlloc(envhp, (dvoid **)&agents[1], OCI_DTYPE_AQAGENT, 0, (dvoid **)0); /* prepare the recipient list, RED and BLUE */ OCIAttrSet(agents[0], OCI_DTYPE_AQAGENT, (dvoid *) "RED", (ub4) strlen("RED"), OCI_ATTR_AGENT_NAME, errhp); OCIAttrSet(agents[1], OCI_DTYPE_AQAGENT, (dvoid *)"BLUE", (ub4) strlen("BLUE"), OCI_ATTR_AGENT_NAME, errhp); OCIAttrSet(msgprop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *)agents, 2, OCI_ATTR_RECIPIENT_LIST, errhp); OCIAQEnq(svchp, errhp, (text *)"msg_queue_multiple", (OCIAQEnqOptions *)0, msgprop, mesg_tdo, (dvoid **)&mesg, (dvoid **)&nmesg, (OCIRaw **)0, 0); OCITransCommit(svchp, errhp, (ub4) 0); /* now dequeue the messages using different consumer names */ /* allocate dequeue options descriptor to set the dequeue options */ OCIDescriptorAlloc(envhp, (dvoid **)&deqopt, OCI_DTYPE_AQDEQ_OPTIONS, 0, (dvoid **)0); /* set wait parameter to 
NO_WAIT so that the dequeue returns immediately */ OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)&wait, 0, OCI_ATTR_WAIT, errhp); /* set navigation to FIRST_MESSAGE so that the dequeue resets the position */ /* after a new consumer_name is set in the dequeue options */ OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)&navigation, 0, OCI_ATTR_NAVIGATION, errhp); /* dequeue from the msg_queue_multiple as consumer BLUE */ OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)"BLUE", (ub4)strlen("BLUE"), OCI_ATTR_CONSUMER_NAME, errhp); while (OCIAQDeq(svchp, errhp, (text *)"msg_queue_multiple", deqopt, (OCIAQMsgProperties *) 0, mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, (OCIRaw **) 0, 0) == OCI_SUCCESS) { printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject)); printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data)); } OCITransCommit(svchp, errhp, (ub4) 0); /* dequeue from the msg_queue_multiple as consumer RED */ OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)"RED", (ub4)strlen("RED"), OCI_ATTR_CONSUMER_NAME, errhp); while (OCIAQDeq(svchp, errhp, (text *)"msg_queue_multiple", deqopt, (OCIAQMsgProperties *)0, mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, (OCIRaw **)0, 0) == OCI_SUCCESS) { printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject)); printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data)); } OCITransCommit(svchp, errhp, (ub4) 0); /* dequeue from the msg_queue_multiple as consumer GREEN */ OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS,(dvoid *)"GREEN", (ub4) strlen("GREEN"), OCI_ATTR_CONSUMER_NAME, errhp); while (OCIAQDeq(svchp, errhp, (text *)"msg_queue_multiple", deqopt, (OCIAQMsgProperties *)0, mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, (OCIRaw **)0, 0) == OCI_SUCCESS) { printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject)); printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data)); } OCITransCommit(svchp, errhp, (ub4) 0); return 0; }
Example C-6 Enqueuing and Dequeuing Messages for a Multiconsumer Queue
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <oci.h>
/* Payload structure passed to OCIAQEnq()/OCIAQDeq(): two OCIString fields. */
typedef struct message
{
  OCIString *subject;
  OCIString *data;
} message;
/* Indicator structure passed as payload_ind next to a message payload. */
typedef struct null_message
{
  OCIInd null_adt;
  OCIInd null_subject;
  OCIInd null_data;
} null_message;
int main()
{
OCIEnv *envhp;
OCIServer *srvhp;
OCIError *errhp;
OCISvcCtx *svchp;
dvoid *tmp;
OCIType *mesg_tdo = (OCIType *) 0;
message msg;
null_message nmsg;
message *mesg = &msg;
null_message *nmesg = &nmsg;
message *deqmesg = (message *)0;
null_message *ndeqmesg = (null_message *)0;
OCIAQMsgProperties *msgprop = (OCIAQMsgProperties *)0;
OCIAQAgent *agents[2];
OCIAQDeqOptions *deqopt = (OCIAQDeqOptions *)0;
ub4 wait = OCI_DEQ_NO_WAIT;
ub4 navigation = OCI_DEQ_FIRST_MSG;
OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)()) 0,
(dvoid * (*)()) 0, (void (*)()) 0 );
OCIHandleAlloc((dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV,
52, (dvoid **) &tmp);
OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp );
OCIHandleAlloc((dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR,
52, (dvoid **) &tmp);
OCIHandleAlloc((dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER,
52, (dvoid **) &tmp);
OCIServerAttach(srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT);
OCIHandleAlloc((dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX,
52, (dvoid **) &tmp);
OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0,
(ub4) OCI_ATTR_SERVER, (OCIError *) errhp);
OCILogon(envhp, errhp, &svchp, "AQ", strlen("AQ"), "AQ", strlen("AQ"), 0, 0);
/* Obtain TDO of message_typ */
OCITypeByName(envhp, errhp, svchp, (CONST text *)"AQ", strlen("AQ"),
(CONST text *)"MESSAGE_TYP", strlen("MESSAGE_TYP"),
(text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo);
/* Prepare the message payload */
mesg->subject = (OCIString *)0;
mesg->data = (OCIString *)0;
OCIStringAssignText(envhp, errhp,
(CONST text *)"MESSAGE 1", strlen("MESSAGE 1"),
&mesg->subject);
OCIStringAssignText(envhp, errhp,
(CONST text *)"mesg for queue subscribers",
strlen("mesg for queue subscribers"), &mesg->data);
nmesg->null_adt = nmesg->null_subject = nmesg->null_data = OCI_IND_NOTNULL;
/* Enqueue MESSAGE 1 for subscribers to the queue. */
OCIAQEnq(svchp, errhp, (CONST text *)"msg_queue_multiple", 0, 0,
mesg_tdo, (dvoid **)&mesg, (dvoid **)&nmesg, 0, 0);
/* Enqueue MESSAGE 2 for specified recipients. */
/* prepare message payload */
OCIStringAssignText(envhp, errhp,
(CONST text *)"MESSAGE 2", strlen("MESSAGE 2"),
&mesg->subject);
OCIStringAssignText(envhp, errhp,
(CONST text *)"mesg for two recipients",
strlen("mesg for two recipients"), &mesg->data);
/* Allocate AQ message properties and agent descriptors */
OCIDescriptorAlloc(envhp, (dvoid **)&msgprop,
OCI_DTYPE_AQMSG_PROPERTIES, 0, (dvoid **)0);
OCIDescriptorAlloc(envhp, (dvoid **)&agents[0],
OCI_DTYPE_AQAGENT, 0, (dvoid **)0);
OCIDescriptorAlloc(envhp, (dvoid **)&agents[1],
OCI_DTYPE_AQAGENT, 0, (dvoid **)0);
/* Prepare the recipient list, RED and BLUE */
OCIAttrSet(agents[0], OCI_DTYPE_AQAGENT, "RED", strlen("RED"),
OCI_ATTR_AGENT_NAME, errhp);
OCIAttrSet(agents[1], OCI_DTYPE_AQAGENT, "BLUE", strlen("BLUE"),
OCI_ATTR_AGENT_NAME, errhp);
OCIAttrSet(msgprop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *)agents, 2,
OCI_ATTR_RECIPIENT_LIST, errhp);
OCIAQEnq(svchp, errhp, (CONST text *)"msg_queue_multiple", 0, msgprop,
mesg_tdo, (dvoid **)&mesg, (dvoid **)&nmesg, 0, 0);
OCITransCommit(svchp, errhp, (ub4) 0);
/* Now dequeue the messages using different consumer names */
/* Allocate dequeue options descriptor to set the dequeue options */
OCIDescriptorAlloc(envhp, (dvoid **)&deqopt, OCI_DTYPE_AQDEQ_OPTIONS, 0,
(dvoid **)0);
/* Set wait parameter to NO_WAIT so that the dequeue returns immediately */
OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)&wait, 0,
OCI_ATTR_WAIT, errhp);
/* Set navigation to FIRST_MESSAGE so that the dequeue resets the position */
/* after a new consumer_name is set in the dequeue options */
OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)&navigation, 0,
OCI_ATTR_NAVIGATION, errhp);
/* Dequeue from the msg_queue_multiple as consumer BLUE */
OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)"BLUE", strlen("BLUE"),
OCI_ATTR_CONSUMER_NAME, errhp);
while (OCIAQDeq(svchp, errhp, (CONST text *)"msg_queue_multiple", deqopt, 0,
mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, 0, 0)
== OCI_SUCCESS)
{
printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject));
printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data));
}
OCITransCommit(svchp, errhp, (ub4) 0);
/* Dequeue from the msg_queue_multiple as consumer RED */
OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)"RED", strlen("RED"),
OCI_ATTR_CONSUMER_NAME, errhp);
while (OCIAQDeq(svchp, errhp, (CONST text *)"msg_queue_multiple", deqopt, 0,
mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, 0, 0)
== OCI_SUCCESS)
{
printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject));
printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data));
}
OCITransCommit(svchp, errhp, (ub4) 0);
/* Dequeue from the msg_queue_multiple as consumer GREEN */
OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS,(dvoid *)"GREEN",strlen("GREEN"),
OCI_ATTR_CONSUMER_NAME, errhp);
while (OCIAQDeq(svchp, errhp, (CONST text *)"msg_queue_multiple", deqopt, 0,
mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, 0, 0)
== OCI_SUCCESS)
{
printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject));
printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data));
}
OCITransCommit(svchp, errhp, (ub4) 0);
}
sword OCIAQEnqArray ( OCISvcCtx *svchp, OCIError *errhp, OraText *queue_name, OCIAQEnqOptions *enqopt, ub4 *iters, OCIAQMsgProperties **msgprop, OCIType *payload_tdo, dvoid **payload, dvoid **payload_ind, OCIRaw **msgid, dvoid *ctxp, OCICallbackAQEnq (cbfp) ( dvoid *ctxp, dvoid **payload, dvoid **payload_ind ), ub4 flags );
This call enqueues an array of messages to a queue. The array of messages is enqueued with the same options and has the same payload column TDO.
See Also: "OCIAQEnqArray()" in Oracle Call Interface Programmer's Guide for parameter information and usage notes.
Example C-7 Enqueuing an Array of Messages
struct message{ OCIString *data; }; typedef struct message message; struct null_message{ OCIInd null_adt; OCIInd null_data; }; typedef struct null_message null_message; int main( argc, argv) int argc ; char **argv ;{ OCIEnv *envhp; OCIServer *srvhp; OCIError *errhp; OCISvcCtx *svchp; OCISession *usrhp; dvoid *tmp; OCIType *mesg_tdo = (OCIType *) 0; message mesg[NMESGS]; message *mesgp[NMESGS]; null_message nmesg[NMESGS]; null_message *nmesgp[NMESGS]; int i, j, k; OCIInd ind[NMESGS]; dvoid *indptr[NMESGS]; ub4 priority; OCIAQEnqOptions *enqopt = (OCIAQEnqOptions *)0; OCIAQMsgProperties *msgprop= (OCIAQMsgProperties *)0; ub4 wait = 1; ub4 navigation = OCI_DEQ_NEXT_MSG; ub4 iters = 2; text *qname ; text mesgdata[30]; ub4 payload_size = 5; text *payload = (text *)0; ub4 batch_size = 2; ub4 enq_size = 2; printf("session start\n"); /* establish a session */ OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)()) 0, (dvoid * (*)()) 0, (void (*)()) 0 ); OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV, 52, (dvoid **) &tmp); OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp ); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR, 52, (dvoid **) &tmp); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER, 52, (dvoid **) &tmp); printf("server attach\n"); OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX, 52, (dvoid **) &tmp); /* set attribute server context in the service context */ OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0, (ub4) OCI_ATTR_SERVER, (OCIError *) errhp); /* allocate a user context handle */ OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0); OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION, (dvoid *)"AQUSER", (ub4)strlen("AQUSER"), OCI_ATTR_USERNAME, errhp); OCIAttrSet((dvoid *)usrhp, 
(ub4)OCI_HTYPE_SESSION, (dvoid *)"AQUSER", (ub4)strlen("AQUSER"), OCI_ATTR_PASSWORD, errhp); checkerr(errhp, OCISessionBegin (svchp, errhp, usrhp, OCI_CRED_RDBMS, OCI_DEFAULT)); OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX, (dvoid *)usrhp, (ub4)0, OCI_ATTR_SESSION, errhp); /* get descriptor for enqueue options */ checkerr(errhp, OCIDescriptorAlloc(envhp, (dvoid **)&enqopt, OCI_DTYPE_AQENQ_OPTIONS, 0, (dvoid **)0)); printf("enq options set\n"); /* set enqueue options - for consumer name, wait and navigation */ /* construct null terminated payload string */ payload = (text *)malloc(payload_size+1); for (k=0 ; k < payload_size ; k++) payload[k] = 'a'; payload[payload_size] = '\0'; for (k=0 ; k < batch_size ; k++) { indptr[k] = &ind[k]; mesgp[k] = &mesg[k]; nmesgp[k] = &nmesg[k]; nmesg[k].null_adt = nmesg[k].null_data = OCI_IND_NOTNULL; mesg[k].data = (OCIString *)0; OCIStringAssignText(envhp, errhp, (const unsigned char *)payload, strlen((const char *)payload), &(mesg[k].data)); } printf("check message tdo\n"); checkerr(errhp, OCITypeByName(envhp, errhp, svchp, (CONST text *)"AQUSER", strlen("AQUSER"), (CONST text *)"MESSAGE", strlen("MESSAGE"), (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo)); k=0; while (k < iters) { enq_size = batch_size; checkerr(errhp, OCIAQEnqArray(svchp, errhp, (dvoid *)"AQUSER.MY_QUEUE", (OCIAQEnqOptions *)0, &enq_size, 0, mesg_tdo, (dvoid **)&mesgp, (dvoid **)&nmesgp, 0, 0, 0, 0)); k+=batch_size; } checkerr(errhp, OCITransCommit(svchp, errhp, (ub4) 0)); checkerr(errhp, OCIServerDetach( srvhp, errhp, (ub4) OCI_DEFAULT)); return 0;}
sword OCIAQListen (OCISvcCtx *svchp, OCIError *errhp, OCIAQAgent **agent_list, ub4 num_agents, sb4 wait, OCIAQAgent **agent, ub4 flags);
This call listens on one or more queues on behalf of a list of agents.
See Also: "OCIAQListen()" in Oracle Call Interface Programmer's Guide for parameter information and usage notes.
Example C-8 Listening for Single-Consumer Queues with Zero Timeout (OCI)
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <oci.h> static void checkerr(errhp, status) OCIError *errhp; sword status; { text errbuf[512]; ub4 buflen; sb4 errcode; switch (status) { case OCI_SUCCESS: break; case OCI_SUCCESS_WITH_INFO: printf("Error - OCI_SUCCESS_WITH_INFO\n"); break; case OCI_NEED_DATA: printf("Error - OCI_NEED_DATA\n"); break; case OCI_NO_DATA: printf("Error - OCI_NO_DATA\n"); break; case OCI_ERROR: OCIErrorGet ((dvoid *) errhp, (ub4) 1, (text *) NULL, &errcode, errbuf, (ub4) sizeof(errbuf), (ub4) OCI_HTYPE_ERROR); printf("Error - %s\n", errbuf); break; case OCI_INVALID_HANDLE: printf("Error - OCI_INVALID_HANDLE\n"); break; case OCI_STILL_EXECUTING: printf("Error - OCI_STILL_EXECUTE\n"); break; case OCI_CONTINUE: printf("Error - OCI_CONTINUE\n"); break; default: break; } } /* set agent into descriptor */ void SetAgent(agent, appname, queue,errhp) OCIAQAgent *agent; text *appname; text *queue; OCIError *errhp; { OCIAttrSet(agent, OCI_DTYPE_AQAGENT, appname ? (dvoid *)appname : (dvoid *)"", appname ? strlen((const char *)appname) : 0, OCI_ATTR_AGENT_NAME, errhp); OCIAttrSet(agent, OCI_DTYPE_AQAGENT, queue ? (dvoid *)queue : (dvoid *)"", queue ? strlen((const char *)queue) : 0, OCI_ATTR_AGENT_ADDRESS, errhp); printf("Set agent name to %s\n", appname ? (char *)appname : "NULL"); printf("Set agent address to %s\n", queue ? 
(char *)queue : "NULL"); } /* get agent from descriptor */ void GetAgent(agent, errhp) OCIAQAgent *agent; OCIError *errhp; { text *appname; text *queue; ub4 appsz; ub4 queuesz; if (!agent ) { printf("agent was NULL \n"); return; } checkerr(errhp, OCIAttrGet(agent, OCI_DTYPE_AQAGENT, (dvoid *)&appname, &appsz, OCI_ATTR_AGENT_NAME, errhp)); checkerr(errhp, OCIAttrGet(agent, OCI_DTYPE_AQAGENT, (dvoid *)&queue, &queuesz, OCI_ATTR_AGENT_ADDRESS, errhp)); if (!appsz) printf("agent name: NULL\n"); else printf("agent name: %.*s\n", appsz, (char *)appname); if (!queuesz) printf("agent address: NULL\n"); else printf("agent address: %.*s\n", queuesz, (char *)queue); } int main() { OCIEnv *envhp; OCIServer *srvhp; OCIError *errhp; OCISvcCtx *svchp; OCISession *usrhp; OCIAQAgent *agent_list[3]; OCIAQAgent *agent = (OCIAQAgent *)0; /* added next 2 121598 */ int i; /* Standard OCI Initialization */ OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)()) 0, (dvoid * (*)()) 0, (void (*)()) 0 ); OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV, 0, (dvoid **) 0); OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 0, (dvoid **) 0); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR, 0, (dvoid **) 0); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER, 0, (dvoid **) 0); OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX, 0, (dvoid **) 0); /* set attribute server context in the service context */ OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0, (ub4) OCI_ATTR_SERVER, (OCIError *) errhp); /* allocate a user context handle */ OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0); /* allocate a user context handle */ OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0); OCIAttrSet((dvoid *)usrhp, 
(ub4)OCI_HTYPE_SESSION, (dvoid *)"scott", (ub4)strlen("scott"), OCI_ATTR_USERNAME, errhp); OCIAttrSet((dvoid *) usrhp, (ub4) OCI_HTYPE_SESSION, (dvoid *) "tiger", (ub4) strlen("tiger"), (ub4) OCI_ATTR_PASSWORD, errhp); OCISessionBegin (svchp, errhp, usrhp, OCI_CRED_RDBMS, OCI_DEFAULT); OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX, (dvoid *)usrhp, (ub4)0, OCI_ATTR_SESSION, errhp); /* AQ LISTEN Initialization - allocate agent handles */ for (i = 0; i < 3; i++) { agent_list[i] = (OCIAQAgent *)0; OCIDescriptorAlloc(envhp, (dvoid **)&agent_list[i], OCI_DTYPE_AQAGENT, 0, (dvoid **)0); } /* * SCQ1, SCQ2, SCQ3 are single-consumer queues in SCOTT's schema */ SetAgent(agent_list[0], (text *)0, "SCOTT.SCQ1", errhp); SetAgent(agent_list[1], (text *)0, "SCOTT.SCQ2", errhp); SetAgent(agent_list[2], (text *)0, "SCOTT.SCQ3", errhp); checkerr(errhp,OCIAQListen(svchp, errhp, agent_list, 3, 0, &agent, 0)); printf("MESSAGE for :- \n"); GetAgent(agent, errhp); printf("\n"); }
Example C-9 Listening for Single-Consumer Queues with 120 Second Timeout (OCI)
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <oci.h> static void checkerr(errhp, status) OCIError *errhp; sword status; { text errbuf[512]; ub4 buflen; sb4 errcode; switch (status) { case OCI_SUCCESS: break; case OCI_SUCCESS_WITH_INFO: printf("Error - OCI_SUCCESS_WITH_INFO\n"); break; case OCI_NEED_DATA: printf("Error - OCI_NEED_DATA\n"); break; case OCI_NO_DATA: printf("Error - OCI_NO_DATA\n"); break; case OCI_ERROR: OCIErrorGet ((dvoid *) errhp, (ub4) 1, (text *) NULL, &errcode, errbuf, (ub4) sizeof(errbuf), (ub4) OCI_HTYPE_ERROR); printf("Error - %s\n", errbuf); break; case OCI_INVALID_HANDLE: printf("Error - OCI_INVALID_HANDLE\n"); break; case OCI_STILL_EXECUTING: printf("Error - OCI_STILL_EXECUTE\n"); break; case OCI_CONTINUE: printf("Error - OCI_CONTINUE\n"); break; default: break; } } /* set agent into descriptor */ /* void SetAgent(agent, appname, queue) */ void SetAgent(agent, appname, queue,errhp) OCIAQAgent *agent; text *appname; text *queue; OCIError *errhp; { OCIAttrSet(agent, OCI_DTYPE_AQAGENT, appname ? (dvoid *)appname : (dvoid *)"", appname ? strlen((const char *)appname) : 0, OCI_ATTR_AGENT_NAME, errhp); OCIAttrSet(agent, OCI_DTYPE_AQAGENT, queue ? (dvoid *)queue : (dvoid *)"", queue ? strlen((const char *)queue) : 0, OCI_ATTR_AGENT_ADDRESS, errhp); printf("Set agent name to %s\n", appname ? (char *)appname : "NULL"); printf("Set agent address to %s\n", queue ? 
(char *)queue : "NULL"); } /* get agent from descriptor */ void GetAgent(agent, errhp) OCIAQAgent *agent; OCIError *errhp; { text *appname; text *queue; ub4 appsz; ub4 queuesz; if (!agent ) { printf("agent was NULL \n"); return; } checkerr(errhp, OCIAttrGet(agent, OCI_DTYPE_AQAGENT, (dvoid *)&appname, &appsz, OCI_ATTR_AGENT_NAME, errhp)); checkerr(errhp, OCIAttrGet(agent, OCI_DTYPE_AQAGENT, (dvoid *)&queue, &queuesz, OCI_ATTR_AGENT_ADDRESS, errhp)); if (!appsz) printf("agent name: NULL\n"); else printf("agent name: %.*s\n", appsz, (char *)appname); if (!queuesz) printf("agent address: NULL\n"); else printf("agent address: %.*s\n", queuesz, (char *)queue); } int main() { OCIEnv *envhp; OCIServer *srvhp; OCIError *errhp; OCISvcCtx *svchp; OCISession *usrhp; OCIAQAgent *agent_list[3]; OCIAQAgent *agent = (OCIAQAgent *)0; /* added next 2 121598 */ int i; /* Standard OCI Initialization */ OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)()) 0, (dvoid * (*)()) 0, (void (*)()) 0 ); OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV, 0, (dvoid **) 0); OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 0, (dvoid **) 0); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR, 0, (dvoid **) 0); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER, 0, (dvoid **) 0); OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX, 0, (dvoid **) 0); /* set attribute server context in the service context */ OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0, (ub4) OCI_ATTR_SERVER, (OCIError *) errhp); /* allocate a user context handle */ OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0); /* allocate a user context handle */ OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0); OCIAttrSet((dvoid *)usrhp, 
(ub4)OCI_HTYPE_SESSION, (dvoid *)"scott", (ub4)strlen("scott"), OCI_ATTR_USERNAME, errhp); OCIAttrSet((dvoid *) usrhp, (ub4) OCI_HTYPE_SESSION, (dvoid *) "tiger", (ub4) strlen("tiger"), (ub4) OCI_ATTR_PASSWORD, errhp); OCISessionBegin (svchp, errhp, usrhp, OCI_CRED_RDBMS, OCI_DEFAULT); OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX, (dvoid *)usrhp, (ub4)0, OCI_ATTR_SESSION, errhp); /* AQ LISTEN Initialization - allocate agent handles */ for (i = 0; i < 3; i++) { agent_list[i] = (OCIAQAgent *)0; OCIDescriptorAlloc(envhp, (dvoid **)&agent_list[i], OCI_DTYPE_AQAGENT, 0, (dvoid **)0); } /* * SCQ1, SCQ2, SCQ3 are single-consumer queues in SCOTT's schema */ SetAgent(agent_list[0], (text *)0, "SCOTT.SCQ1", errhp); SetAgent(agent_list[1], (text *)0, "SCOTT.SCQ2", errhp); SetAgent(agent_list[2], (text *)0, "SCOTT.SCQ3", errhp); checkerr(errhp,OCIAQListen(svchp, errhp, agent_list, 3, 120, &agent, 0)); printf("MESSAGE for :- \n"); GetAgent(agent, errhp); printf("\n"); }
sword OCIAQDeq ( OCISvcCtx *svch, OCIError *errh, text *queue_name, OCIAQDeqOptions *dequeue_options, OCIAQMsgProperties *message_properties, OCIType *payload_tdo, dvoid **payload, dvoid **payload_ind, OCIRaw **msgid, ub4 flags );
This call is used for an Oracle Streams AQ dequeue. Dequeue examples are in "Enqueuing Messages".
See Also: "OCIAQDeq()" in Oracle Call Interface Programmer's Guide for parameter information and usage notes |
sword OCIAQDeqArray ( OCISvcCtx *svchp, OCIError *errhp, OraText *queue_name, OCIAQDeqOptions *deqopt, ub4 *iters, OCIAQMsgProperties **msgprop, OCIType *payload_tdo, dvoid **payload, dvoid **payload_ind, OCIRaw **msgid, dvoid *ctxp, OCICallbackAQDeq (cbfp) ( dvoid *ctxp, dvoid **payload, dvoid **payload_ind ), ub4 flags );
This call dequeues an array of messages from a queue. The array of messages is all dequeued with the same option and has the same queue table payload column TDO.
See Also: "OCIAQDeqArray()" in Oracle Call Interface Programmer's Guide for parameter information and usage notes |
Example C-10 Array Dequeuing from a Queue of Type Message (OCI)
struct message{ OCIString *data; }; typedef struct message message; struct null_message{ OCIInd null_adt; OCIInd null_data; }; typedef struct null_message null_message; int main(argc, argv) int argc; char **argv;{ OCIEnv *envhp; OCIServer *srvhp; OCIError *errhp; OCISvcCtx *svchp; OCISession *usrhp; dvoid *tmp; message *mesgp[NMESGS]; int i, j, k; null_message *nmesgp[NMESGS]; ub4 priority = 0; OCIAQDeqOptions *deqopt = (OCIAQDeqOptions *)0; ub4 iters = 2; OCIType *mesg_tdo = (OCIType *) 0; ub4 batch_size = 2; ub4 deq_size = batch_size; printf("session start\n"); /* establish a session */ OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)()) 0, (dvoid * (*)()) 0, (void (*)()) 0 ); OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV, 52, (dvoid **) &tmp); OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp ); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR, 52, (dvoid **) &tmp); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER, 52, (dvoid **) &tmp); printf("server attach\n"); OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX, 52, (dvoid **) &tmp); /* set attribute server context in the service context */ OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0, (ub4) OCI_ATTR_SERVER, (OCIError *) errhp); /* allocate a user context handle */ OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0); OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION, (dvoid *)"AQUSER", (ub4)strlen("AQUSER"), OCI_ATTR_USERNAME, errhp); OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION, (dvoid *)"AQUSER", (ub4)strlen("AQUSER"), OCI_ATTR_PASSWORD, errhp); checkerr(errhp, OCISessionBegin (svchp, errhp, usrhp, OCI_CRED_RDBMS, OCI_DEFAULT)); OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX, (dvoid *)usrhp, (ub4)0, OCI_ATTR_SESSION, errhp); /* get 
descriptor for dequeue options */ checkerr(errhp, OCIDescriptorAlloc(envhp, (dvoid **)&deqopt, OCI_DTYPE_AQDEQ_OPTIONS, 0, (dvoid **)0)); printf("deq options set\n"); /* set dequeue options - for consumer name, wait and navigation */ checkerr(errhp, OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)"SUB1", (ub4)strlen("SUB1"), OCI_ATTR_CONSUMER_NAME, errhp)); for (k=0 ; k < NMESGS ; k++) { mesgp[k] = 0; nmesgp[k] = 0; } printf("check message tdo\n"); checkerr(errhp, OCITypeByName(envhp, errhp, svchp, (CONST text *)"AQUSER", strlen("AQUSER"), (CONST text *)"MESSAGE", strlen("MESSAGE"), (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo)); k=0; while (k < iters) { deq_size = batch_size; checkerr(errhp, OCIAQDeqArray(svchp, errhp, (text *)"AQUSER.MY_QUEUE", (OCIAQDeqOptions *)deqopt, &deq_size, 0, mesg_tdo, (dvoid **)mesgp, (dvoid **)nmesgp, 0, 0, 0, 0)); k+=batch_size; } checkerr(errhp, OCITransCommit(svchp, errhp, (ub4) 0)); checkerr(errhp, OCIServerDetach( srvhp, errhp, (ub4) OCI_DEFAULT)); return 0;}
ub4 OCISubscriptionRegister ( OCISvcCtx *svchp, OCISubscription **subscrhpp, ub2 count, OCIError *errhp, ub4 mode );
This call registers a callback for message notification.
This interface is only valid for the asynchronous mode of message delivery. In this mode, a subscriber issues a registration call that specifies a callback. When messages are received that match the subscription criteria, the callback is invoked. The callback can then issue an explicit message_receive
(dequeue) to retrieve the message.
The user must specify a subscription handle at registration time with the namespace attribute set to OCI_SUBSCR_NAMESPACE_AQ
.
See Also: "OCISubscriptionRegister()" in Oracle Call Interface Programmer's Guide for parameter information and usage notes |
Example C-11 Registering for Notifications (OCI)
/* OCISubscriptionRegister can be used by the client to register to receive
   notifications when messages are enqueued. */
/* NOTE(review): this listing ends in the middle of main(): the closing brace
   and the OCISubscriptionRegister() call itself are not present. */
/* NOTE(review): checkerr() is called below but is not defined in this
   listing. */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <oci.h>

static OCIEnv *envhp;
static OCIServer *srvhp;
static OCIError *errhp;
static OCISvcCtx *svchp;

/* The callback that gets invoked on notification.
   ctx is treated as a pointer to a ub4 counter, which is incremented on
   every call; the subscription name is then read and printed together with
   the counter and the payload length. The queue name, consumer name,
   message ID and message properties are fetched from the notification
   descriptor but not used any further. */
/* NOTE(review): declared to return ub4, but no return statement is present. */
/* NOTE(review): the ub4 values are printed with %d and used as the %.*s
   precision; confirm that ub4 and int have the same width on the target. */
ub4 notifyCB(ctx, subscrhp, pay, payl, desc, mode)
dvoid *ctx;
OCISubscription *subscrhp; /* subscription handle */
dvoid *pay; /* payload */
ub4 payl; /* payload length */
dvoid *desc; /* the AQ notification descriptor */
ub4 mode;
{
  text *subname;
  ub4 size;
  ub4 *number = (ub4 *)ctx;
  text *queue;
  text *consumer;
  OCIRaw *msgid;
  OCIAQMsgProperties *msgprop;

  (*number)++;

  /* Get the subscription name */
  OCIAttrGet((dvoid *)subscrhp, OCI_HTYPE_SUBSCRIPTION,
             (dvoid *)&subname, &size, OCI_ATTR_SUBSCR_NAME, errhp);
  printf("got notification number %d for %.*s %d \n",
         *number, size, subname, payl);

  /* Get the queue name from the AQ notify descriptor */
  OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&queue, &size,
             OCI_ATTR_QUEUE_NAME, errhp);

  /* Get the consumer name for which this notification was received */
  OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&consumer, &size,
             OCI_ATTR_CONSUMER_NAME, errhp);

  /* Get the message ID of the message for which we were notified */
  OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&msgid, &size,
             OCI_ATTR_NFY_MSGID, errhp);

  /* Get the message properties of the message for which we were notified */
  OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&msgprop, &size,
             OCI_ATTR_MSG_PROP, errhp);
}

/* Initializes OCI with event support, logs on as scott, and configures two
   subscription handles (name, callback, context, namespace): one for
   SCOTT.SCQ1 and one for SCOTT.MCQ1:APP1. Each handle gets its own element
   of ctx[] as callback context. */
int main(argc, argv)
int argc;
char *argv[];
{
  OCISession *authp = (OCISession *) 0;

  /* The subscription handles */
  OCISubscription *subscrhp[5];

  /* Registrations are for AQ namespace */
  ub4 namespace = OCI_SUBSCR_NAMESPACE_AQ;

  /* The context for the callback */
  ub4 ctx[5] = {0,0,0,0,0};

  printf("Initializing OCI Process\n");

  /* The OCI Process Environment must be initialized with OCI_EVENTS */
  /* OCI_OBJECT flag is set to enable us to dequeue */
  (void) OCIInitialize((ub4) OCI_EVENTS|OCI_OBJECT, (dvoid *)0,
                       (dvoid * (*)(dvoid *, size_t)) 0,
                       (dvoid * (*)(dvoid *, dvoid *, size_t))0,
                       (void (*)(dvoid *, dvoid *)) 0 );

  printf("Initialization successful\n");

  /* The standard OCI setup */
  printf("Initializing OCI Env\n");
  (void) OCIEnvInit((OCIEnv **) &envhp, OCI_DEFAULT, (size_t) 0,
                    (dvoid **) 0 );

  (void) OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, OCI_HTYPE_ERROR,
                         (size_t) 0, (dvoid **) 0);

  /* Server contexts */
  (void) OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, OCI_HTYPE_SERVER,
                         (size_t) 0, (dvoid **) 0);

  (void) OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, OCI_HTYPE_SVCCTX,
                         (size_t) 0, (dvoid **) 0);

  printf("connecting to server\n");
  (void) OCIServerAttach( srvhp, errhp, (text *)"", strlen(""), 0);
  printf("connect successful\n");

  /* Set attribute server context in the service context */
  (void) OCIAttrSet( (dvoid *) svchp, OCI_HTYPE_SVCCTX, (dvoid *)srvhp,
                     (ub4) 0, OCI_ATTR_SERVER, (OCIError *) errhp);

  /* Allocate the session handle and log on as scott */
  (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&authp,
                        (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0);

  (void) OCIAttrSet((dvoid *) authp, (ub4) OCI_HTYPE_SESSION,
                    (dvoid *) "scott", (ub4) strlen("scott"),
                    (ub4) OCI_ATTR_USERNAME, errhp);

  (void) OCIAttrSet((dvoid *) authp, (ub4) OCI_HTYPE_SESSION,
                    (dvoid *) "tiger", (ub4) strlen("tiger"),
                    (ub4) OCI_ATTR_PASSWORD, errhp);

  checkerr(errhp, OCISessionBegin ( svchp, errhp, authp, OCI_CRED_RDBMS,
                                    (ub4) OCI_DEFAULT));

  (void) OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX,
                    (dvoid *) authp, (ub4) 0,
                    (ub4) OCI_ATTR_SESSION, errhp);

  /* Setting the subscription handle for notification on
     a NORMAL single-consumer queue */
  printf("allocating subscription handle\n");
  subscrhp[0] = (OCISubscription *)0;
  (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[0],
                        (ub4) OCI_HTYPE_SUBSCRIPTION,
                        (size_t) 0, (dvoid **) 0);

  printf("setting subscription name\n");
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                    (dvoid *) "SCOTT.SCQ1", (ub4) strlen("SCOTT.SCQ1"),
                    (ub4) OCI_ATTR_SUBSCR_NAME, errhp);

  printf("setting subscription callback\n");
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                    (dvoid *) notifyCB, (ub4) 0,
                    (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp);

  /* ctx[0] is the counter that notifyCB increments for this subscription */
  printf("setting subscription context \n");
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                    (dvoid *)&ctx[0], (ub4)sizeof(ctx[0]),
                    (ub4) OCI_ATTR_SUBSCR_CTX, errhp);

  printf("setting subscription namespace\n");
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                    (dvoid *) &namespace, (ub4) 0,
                    (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp);

  /* Setting the subscription handle for notification on
     a NORMAL multiconsumer queue */
  subscrhp[1] = (OCISubscription *)0;
  (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[1],
                        (ub4) OCI_HTYPE_SUBSCRIPTION,
                        (size_t) 0, (dvoid **) 0);

  (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION,
                    (dvoid *) "SCOTT.MCQ1:APP1",
                    (ub4) strlen("SCOTT.MCQ1:APP1"),
                    (ub4) OCI_ATTR_SUBSCR_NAME, errhp);

  (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION,
                    (dvoid *) notifyCB, (ub4) 0,
                    (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp);

  /* ctx[1] is the counter that notifyCB increments for this subscription */
  (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION,
                    (dvoid *)&ctx[1], (ub4)sizeof(ctx[1]),
                    (ub4) OCI_ATTR_SUBSCR_CTX, errhp);

  (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION,
                    (dvoid *) &namespace, (ub4) 0,
                    (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp);
ub4 OCISubscriptionEnable ( OCISubscription *subscrhp, OCIError *errhp, ub4 mode );
This call enables a subscription registration that has been disabled. This turns on all notifications.
See Also: "OCISubscriptionEnable()" in Oracle Call Interface Programmer's Guide for parameter information and usage notes |
ub4 OCISubscriptionUnRegister ( OCISvcCtx *svchp, OCISubscription *subscrhp, OCIError *errhp, ub4 mode );
This call unregisters a subscription, which turns off notifications.
See Also: "OCISubscriptionUnRegister()" in Oracle Call Interface Programmer's Guide for parameter information and usage notes |
ub4 OCISubscriptionPost ( OCISvcCtx *svchp, OCISubscription **subscrhpp, ub2 count, OCIError *errhp, ub4 mode );
This call posts to a subscription which allows all clients who are registered for the subscription to get notifications.
See Also: "OCISubscriptionPost()" in Oracle Call Interface Programmer's Guide for parameter information and usage notes |
You must set up the following data structures for certain examples to work:
-- Setup for the XA example: creates the AQADM and AQ users, a RAW queue
-- table aq.qtable, and the started queue aq.aqsqueue.
-- The DROP USER statements report an error on the first run, when the
-- users do not exist yet; the rest of the script is unaffected.
CONNECT system/manager;

-- Administrator account. The user must exist before anything is granted
-- to it, so CREATE USER comes before the GRANT statements.
DROP USER aqadm CASCADE;
CREATE USER aqadm IDENTIFIED BY aqadm;
GRANT DBA TO aqadm;
GRANT EXECUTE ON DBMS_AQADM TO aqadm;
GRANT Aq_administrator_role TO aqadm;

-- Account that the example program logs in as.
DROP USER aq CASCADE;
CREATE USER aq IDENTIFIED BY aq;
GRANT DBA TO aq;
GRANT EXECUTE ON dbms_aq TO aq;

-- RAW-payload queue table and queue in the AQ schema.
EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE( queue_table => 'aq.qtable', queue_payload_type => 'RAW');
EXECUTE DBMS_AQADM.CREATE_QUEUE( queue_name => 'aq.aqsqueue', queue_table => 'aq.qtable');
EXECUTE DBMS_AQADM.START_QUEUE(queue_name => 'aq.aqsqueue');
The following example illustrates how to deploy Oracle Streams AQ with XA.
Example C-12 Deploying Oracle Streams AQ with XA
/* * The program uses the XA interface to enqueue 100 messages and then * dequeue them. * Login: aq/aq * Requires: AQ_USER_ROLE to be granted to aq * a RAW queue called "aqsqueue" to be created in aqs schema * (preceding steps can be performed by running aqaq.sql) * Message Format: Msgno: [0-1000] HELLO, WORLD! * Author: schandra@us.oracle.com */ #ifndef OCI_ORACLE #include <oci.h> #endif #include <xa.h> /* XA open string */ char xaoinfo[] = "oracle_xa+ACC=P/AQ/AQ+SESTM=30+Objects=T"; /* template for generating XA XIDs */ XID xidtempl = { 0x1e0a0a1e, 12, 8, "GTRID001BQual001" }; /* Pointer to Oracle XA function table */ extern struct xa_switch_t xaosw; /* Oracle XA switch */ static struct xa_switch_t *xafunc = &xaosw; /* dummy stubs for ax_reg and ax_unreg */ int ax_reg(rmid, xid, flags) int rmid; XID *xid; long flags; { xid->formatID = -1; return 0; } int ax_unreg(rmid, flags) int rmid; long flags; { return 0; } /* generate an XID */ void xidgen(xid, serialno) XID *xid; int serialno; { char seq [11]; sprintf(seq, "%d", serialno); memcpy((void *)xid, (void *)&xidtempl, sizeof(XID)); strncpy((&xid->data[5]), seq, 3); } /* check if XA operation succeeded */ #define checkXAerr(action, funcname) \ if ((action) != XA_OK) \ { \ printf("%s failed!\n", funcname); \ exit(-1); \ } else /* check if OCI operation succeeded */ static void checkOCIerr(errhp, status) OCIError *errhp; sword status; { text errbuf[512]; ub4 buflen; sb4 errcode; if (status == OCI_SUCCESS) return; if (status == OCI_ERROR) { OCIErrorGet((dvoid *) errhp, 1, (text *)0, &errcode, errbuf, (ub4)sizeof(errbuf), OCI_HTYPE_ERROR); printf("Error - %s\n", errbuf); } else printf("Error - %d\n", status); exit (-1); } void main(argc, argv) int argc; char **argv; { int msgno = 0; /* message being enqueued */ OCIEnv *envhp; /* OCI environment handle */ OCIError *errhp; /* OCI Error handle */ OCISvcCtx *svchp; /* OCI Service handle */ char message[128]; /* message buffer */ ub4 mesglen; /* length of message */ OCIRaw 
*rawmesg = (OCIRaw *)0; /* message in OCI RAW format */ OCIInd ind = 0; /* OCI null indicator */ dvoid *indptr = (dvoid *)&ind; /* null indicator pointer */ OCIType *mesg_tdo = (OCIType *) 0; /* TDO for RAW datatype */ XID xid; /* XA's global transaction id */ ub4 i; /* array index */ checkXAerr(xafunc->xa_open_entry(xaoinfo, 1, TMNOFLAGS), "xaoopen"); svchp = xaoSvcCtx((text *)0); /* get service handle from XA */ envhp = xaoEnv((text *)0); /* get enviornment handle from XA */ if (!svchp || !envhp) { printf("Unable to obtain OCI Handles from XA!\n"); exit (-1); } OCIHandleAlloc((dvoid *)envhp, (dvoid **)&errhp, OCI_HTYPE_ERROR, 0, (dvoid **)0); /* allocate error handle */ /* enqueue 1000 messages, 1 message for each XA transaction */ for (msgno = 0; msgno < 1000; msgno++) { sprintf((const char *)message, "Msgno: %d, Hello, World!", msgno); mesglen = (ub4)strlen((const char *)message); xidgen(&xid, msgno); /* generate an XA xid */ checkXAerr(xafunc->xa_start_entry(&xid, 1, TMNOFLAGS), "xaostart"); checkOCIerr(errhp, OCIRawAssignBytes(envhp, errhp, (ub1 *)message, mesglen, &rawmesg)); if (!mesg_tdo) /* get Type descriptor (TDO) for RAW type */ checkOCIerr(errhp, OCITypeByName(envhp, errhp, svchp, (CONST text *)"AQADM", strlen("AQADM"), (CONST text *)"RAW", strlen("RAW"), (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo)); checkOCIerr(errhp, OCIAQEnq(svchp, errhp, (CONST text *)"aqsqueue", 0, 0, mesg_tdo, (dvoid **)&rawmesg, &indptr, 0, 0)); checkXAerr(xafunc->xa_end_entry(&xid, 1, TMSUCCESS), "xaoend"); checkXAerr(xafunc->xa_commit_entry(&xid, 1, TMONEPHASE), "xaocommit"); printf("%s Enqueued\n", message); } /* dequeue 1000 messages within one XA transaction */ xidgen(&xid, msgno); /* generate an XA xid */ checkXAerr(xafunc->xa_start_entry(&xid, 1, TMNOFLAGS), "xaostart"); for (msgno = 0; msgno < 1000; msgno++) { checkOCIerr(errhp, OCIAQDeq(svchp, errhp, (CONST text *)"aqsqueue", 0, 0, mesg_tdo, (dvoid **)&rawmesg, &indptr, 0, 0)); if (ind) printf("Null 
Raw Message"); else for (i = 0; i < OCIRawSize(envhp, rawmesg); i++) printf("%c", *(OCIRawPtr(envhp, rawmesg) + i)); printf("\n"); } checkXAerr(xafunc->xa_end_entry(&xid, 1, TMSUCCESS), "xaoend"); checkXAerr(xafunc->xa_commit_entry(&xid, 1, TMONEPHASE), "xaocommit"); }
You must set up the following data structures for certain examples to work:
/* Create_types.sql */
-- Grants the AQ roles to SCOTT, then, connected as SCOTT, creates the
-- MESSAGE object type, a queue table for it, and the started queue msgqueue.

-- Grant both AQ roles to SCOTT.
CONNECT system/manager
GRANT AQ_ADMINISTRATOR_ROLE, AQ_USER_ROLE TO scott;

-- The remaining objects are created in SCOTT's own schema.
CONNECT scott/tiger

-- Payload type: a numeric id and up to 80 characters of text.
CREATE TYPE MESSAGE AS OBJECT (id NUMBER, data VARCHAR2(80));

-- Queue table qt whose payload column is of type MESSAGE.
EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE( queue_table => 'qt', queue_payload_type => 'message');

-- Create queue msgqueue in queue table qt and start it.
EXECUTE DBMS_AQADM.CREATE_QUEUE('msgqueue', 'qt');
EXECUTE DBMS_AQADM.START_QUEUE('msgqueue');
The following examples illustrate Oracle Streams AQ memory usage:
Example C-13 Dequeuing Messages, Freeing Memory After Every Call (OCI)
This program, deqnoreuse.c
, dequeues each line of text from a queue 'msgqueue
' that has been created in the scott
schema using create_types.sql
. Messages are enqueued using enqnoreuse.c
or enqreuse.c
(see the following). If there are no messages, then it waits for 60 seconds before timing out. In this program, the dequeue subroutine does not reuse client side objects' memory. It allocates the required memory before dequeue and frees it after the dequeue is complete.
#ifndef OCI_ORACLE #include <oci.h> #endif #include <stdio.h> static void checkerr(OCIError *errhp, sword status); static void deqmesg(text *buf, ub4 *buflen); OCIEnv *envhp; OCIError *errhp; OCISvcCtx *svchp; struct message { OCINumber id; OCIString *data; }; typedef struct message message; struct null_message { OCIInd null_adt; OCIInd null_id; OCIInd null_data; }; typedef struct null_message null_message; static void deqmesg(buf, buflen) text *buf; ub4 *buflen; { OCIType *mesgtdo = (OCIType *)0; /* type descr of SCOTT.MESSAGE */ message *mesg = (dvoid *)0; /* instance of SCOTT.MESSAGE */ null_message *mesgind = (dvoid *)0; /* null indicator */ OCIAQDeqOptions *deqopt = (OCIAQDeqOptions *)0; ub4 wait = 60; /* timeout after 60 seconds */ ub4 navigation = OCI_DEQ_FIRST_MSG;/* always get head of q */ /* Get the type descriptor object for the type SCOTT.MESSAGE: */ checkerr(errhp, OCITypeByName(envhp, errhp, svchp, (CONST text *)"SCOTT", strlen("SCOTT"), (CONST text *)"MESSAGE", strlen("MESSAGE"), (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesgtdo)); /* Allocate an instance of SCOTT.MESSAGE, and get its null indicator: */ checkerr(errhp, OCIObjectNew(envhp, errhp, svchp, OCI_TYPECODE_OBJECT, mesgtdo, (dvoid *)0, OCI_DURATION_SESSION, TRUE, (dvoid **)&mesg)); checkerr(errhp, OCIObjectGetInd(envhp, errhp, (dvoid *)mesg, (dvoid **)&mesgind)); /* Allocate a descriptor for dequeue options and set wait time, navigation: */ checkerr(errhp, OCIDescriptorAlloc(envhp, (dvoid **)&deqopt, OCI_DTYPE_AQDEQ_OPTIONS, 0, (dvoid **)0)); checkerr(errhp, OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)&wait, 0, OCI_ATTR_WAIT, errhp)); checkerr(errhp, OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)&navigation, 0, OCI_ATTR_NAVIGATION, errhp)); /* Dequeue the message and commit: */ checkerr(errhp, OCIAQDeq(svchp, errhp, (CONST text *)"msgqueue", deqopt, 0, mesgtdo, (dvoid **)&mesg, (dvoid **)&mesgind, 0, 0)); checkerr(errhp, OCITransCommit(svchp, errhp, (ub4) 0)); /* 
Copy the message payload text into the user buffer: */ if (mesgind->null_data) *buflen = 0; else memcpy((dvoid *)buf, (dvoid *)OCIStringPtr(envhp, mesg->data), (size_t)(*buflen = OCIStringSize(envhp, mesg->data))); /* Free the dequeue options descriptor: */ checkerr(errhp, OCIDescriptorFree((dvoid *)deqopt, OCI_DTYPE_AQDEQ_OPTIONS)); /* Free the memory for the objects: */ Checkerr(errhp, OCIObjectFree(envhp, errhp, (dvoid *)mesg, OCI_OBJECTFREE_FORCE)); } /* end deqmesg */ void main() { OCIServer *srvhp; OCISession *usrhp; dvoid *tmp; text buf[80]; /* payload text */ ub4 buflen; OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)()) 0, (dvoid * (*)()) 0, (void (*)()) 0 ); OCIHandleAlloc((dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV, 52, (dvoid **) &tmp); OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp ); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR, 52, (dvoid **) &tmp); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER, 52, (dvoid **) &tmp); OCIServerAttach(srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX, 52, (dvoid **) &tmp); /* Set attribute server context in the service context: */ OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0, (ub4) OCI_ATTR_SERVER, (OCIError *) errhp); /* Allocate a user context handle: */ OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0); OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION, (dvoid *)"scott", (ub4)strlen("scott"), OCI_ATTR_USERNAME, errhp); OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION, (dvoid *)"tiger", (ub4)strlen("tiger"), OCI_ATTR_PASSWORD, errhp); checkerr(errhp, OCISessionBegin (svchp, errhp, usrhp, OCI_CRED_RDBMS, OCI_DEFAULT)); OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX, (dvoid *)usrhp, (ub4)0, OCI_ATTR_SESSION, errhp); do { deqmesg(buf, &buflen); printf("%.*s\n", buflen, 
buf); } while(1); } /* end main */ static void checkerr(errhp, status) OCIError *errhp; sword status; { text errbuf[512]; ub4 buflen; sb4 errcode; if (status == OCI_SUCCESS) return; switch (status) { case OCI_ERROR: OCIErrorGet ((dvoid *) errhp, (ub4) 1, (text *) NULL, &errcode, errbuf, (ub4) sizeof(errbuf), (ub4) OCI_HTYPE_ERROR); printf("Error - %s\n", errbuf); break; case OCI_INVALID_HANDLE: printf("Error - OCI_INVALID_HANDLE\n"); break; default: printf("Error - %d\n", status); break; } exit(-1); } /* end checkerr */
Example C-14 Enqueuing Messages, Reusing Memory (OCI)
This program, enqreuse.c
, enqueues each line of text into a queue 'msgqueue' that has been created in the scott
schema by executing create_types.sql
. Each line of text entered by the user is stored in the queue until user enters EOF
. In this program the enqueue subroutine reuses the memory for the message payload, as well as the Oracle Streams AQ message properties descriptor.
#ifndef OCI_ORACLE #include <oci.h> #endif #include <stdio.h> static void checkerr(OCIError *errhp, sword status); static void enqmesg(ub4 msgno, text *buf); struct message { OCINumber id; OCIString *data; }; typedef struct message message; struct null_message { OCIInd null_adt; OCIInd null_id; OCIInd null_data; }; typedef struct null_message null_message; /* Global data reused on calls to enqueue: */ OCIEnv *envhp; OCIError *errhp; OCISvcCtx *svchp; message msg; null_message nmsg; OCIAQMsgProperties *msgprop; static void enqmesg(msgno, buf) ub4 msgno; text *buf; { OCIType *mesgtdo = (OCIType *)0; /* type descr of SCOTT.MESSAGE */ message *mesg = &msg; /* instance of SCOTT.MESSAGE */ null_message *mesgind = &nmsg; /* null indicator */ text corrid[128]; /* correlation identifier */ /* Get the type descriptor object for the type SCOTT.MESSAGE: */ checkerr(errhp, OCITypeByName(envhp, errhp, svchp, (CONST text *)"SCOTT", strlen("SCOTT"), (CONST text *)"MESSAGE", strlen("MESSAGE"), (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesgtdo)); /* Fill in the attributes of SCOTT.MESSAGE: */ checkerr(errhp, OCINumberFromInt(errhp, &msgno, sizeof(ub4), 0, &mesg->id)); checkerr(errhp, OCIStringAssignText(envhp, errhp, buf, strlen(buf), &mesg->data)); mesgind->null_adt = mesgind->null_id = mesgind->null_data = 0; /* Set the correlation id in the message properties descriptor: */ sprintf((char *)corrid, "Msg#: %d", msgno); checkerr(errhp, OCIAttrSet(msgprop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *)&corrid, strlen(corrid), OCI_ATTR_CORRELATION, errhp)); /* Enqueue the message and commit: */ checkerr(errhp, OCIAQEnq(svchp, errhp, (CONST text *)"msgqueue", 0, msgprop, mesgtdo, (dvoid **)&mesg, (dvoid **)&mesgind, 0, 0)); checkerr(errhp, OCITransCommit(svchp, errhp, (ub4) 0)); } /* end enqmesg */ void main() { OCIServer *srvhp; OCISession *usrhp; dvoid *tmp; text buf[80]; /* user supplied text */ int msgno = 0; OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)()) 0, (dvoid 
* (*)()) 0, (void (*)()) 0 ); OCIHandleAlloc((dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV, 52, (dvoid **) &tmp); OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp ); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR, 52, (dvoid **) &tmp); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER, 52, (dvoid **) &tmp); OCIServerAttach(srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX, 52, (dvoid **) &tmp); /* Set attribute server context in the service context: */ OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0, (ub4) OCI_ATTR_SERVER, (OCIError *) errhp); /* Allocate a user context handle: */ OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0); OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION, (dvoid *)"scott", (ub4)strlen("scott"), OCI_ATTR_USERNAME, errhp); OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION, (dvoid *)"tiger", (ub4)strlen("tiger"), OCI_ATTR_PASSWORD, errhp); checkerr(errhp, OCISessionBegin (svchp, errhp, usrhp, OCI_CRED_RDBMS, OCI_DEFAULT)); OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX, (dvoid *)usrhp, (ub4)0, OCI_ATTR_SESSION, errhp); /* Allocate a message properties descriptor to fill in correlation ID :*/ checkerr(errhp, OCIDescriptorAlloc(envhp, (dvoid **)&msgprop, OCI_DTYPE_AQMSG_PROPERTIES, 0, (dvoid **)0)); do { printf("Enter a line of text (max 80 chars):"); if (!gets((char *)buf)) break; enqmesg((ub4)msgno++, buf); } while(1); /* Free the message properties descriptor: */ checkerr(errhp, OCIDescriptorFree((dvoid *)msgprop, OCI_DTYPE_AQMSG_PROPERTIES)); } /* end main */ static void checkerr(errhp, status) OCIError *errhp; sword status; { text errbuf[512]; ub4 buflen; sb4 errcode; if (status == OCI_SUCCESS) return; switch (status) { case OCI_ERROR: OCIErrorGet ((dvoid *) errhp, (ub4) 1, (text *) NULL, &errcode, errbuf, (ub4) 
sizeof(errbuf), (ub4) OCI_HTYPE_ERROR); printf("Error - %s\n", errbuf); break; case OCI_INVALID_HANDLE: printf("Error - OCI_INVALID_HANDLE\n"); break; default: printf("Error - %d\n", status); break; } exit(-1); } /* end checkerr */
Example C-15 Dequeuing Messages, Freeing Memory After Every Call (OCI)
This program, deqnoreuse.c, dequeues each line of text from a queue 'msgqueue' that has been created in the scott schema by executing create_types.sql. Messages are enqueued using enqnoreuse.c or enqreuse.c. If there are no messages, then it waits for 60 seconds before timing out. In this program, the dequeue subroutine does not reuse client-side objects' memory. It allocates the required memory before dequeue and frees it after the dequeue is complete.
#ifndef OCI_ORACLE #include <oci.h> #endif #include <stdio.h> static void checkerr(OCIError *errhp, sword status); static void deqmesg(text *buf, ub4 *buflen); OCIEnv *envhp; OCIError *errhp; OCISvcCtx *svchp; struct message { OCINumber id; OCIString *data; }; typedef struct message message; struct null_message { OCIInd null_adt; OCIInd null_id; OCIInd null_data; }; typedef struct null_message null_message; static void deqmesg(buf, buflen) text *buf; ub4 *buflen; { OCIType *mesgtdo = (OCIType *)0; /* type descr of SCOTT.MESSAGE */ message *mesg = (dvoid *)0; /* instance of SCOTT.MESSAGE */ null_message *mesgind = (dvoid *)0; /* null indicator */ OCIAQDeqOptions *deqopt = (OCIAQDeqOptions *)0; ub4 wait = 60; /* timeout after 60 seconds */ ub4 navigation = OCI_DEQ_FIRST_MSG;/* always get head of q */ /* Get the type descriptor object for the type SCOTT.MESSAGE: */ checkerr(errhp, OCITypeByName(envhp, errhp, svchp, (CONST text *)"SCOTT", strlen("SCOTT"), (CONST text *)"MESSAGE", strlen("MESSAGE"), (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesgtdo)); /* Allocate an instance of SCOTT.MESSAGE, and get its null indicator: */ checkerr(errhp, OCIObjectNew(envhp, errhp, svchp, OCI_TYPECODE_OBJECT, mesgtdo, (dvoid *)0, OCI_DURATION_SESSION, TRUE, (dvoid **)&mesg)); checkerr(errhp, OCIObjectGetInd(envhp, errhp, (dvoid *)mesg, (dvoid **)&mesgind)); /* Allocate a descriptor for dequeue options and set wait time, navigation: */ checkerr(errhp, OCIDescriptorAlloc(envhp, (dvoid **)&deqopt, OCI_DTYPE_AQDEQ_OPTIONS, 0, (dvoid **)0)); checkerr(errhp, OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)&wait, 0, OCI_ATTR_WAIT, errhp)); checkerr(errhp, OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)&navigation, 0, OCI_ATTR_NAVIGATION, errhp)); /* Dequeue the message and commit: */ checkerr(errhp, OCIAQDeq(svchp, errhp, (CONST text *)"msgqueue", deqopt, 0, mesgtdo, (dvoid **)&mesg, (dvoid **)&mesgind, 0, 0)); checkerr(errhp, OCITransCommit(svchp, errhp, (ub4) 0)); /* 
Copy the message payload text into the user buffer: */ if (mesgind->null_data) *buflen = 0; else memcpy((dvoid *)buf, (dvoid *)OCIStringPtr(envhp, mesg->data), (size_t)(*buflen = OCIStringSize(envhp, mesg->data))); /* Free the dequeue options descriptor: */ checkerr(errhp, OCIDescriptorFree((dvoid *)deqopt, OCI_DTYPE_AQDEQ_OPTIONS)); /* Free the memory for the objects: */ checkerr(errhp, OCIObjectFree(envhp, errhp, (dvoid *)mesg, OCI_OBJECTFREE_FORCE)); } /* end deqmesg */ void main() { OCIServer *srvhp; OCISession *usrhp; dvoid *tmp; text buf[80]; /* payload text */ ub4 buflen; OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)()) 0, (dvoid * (*)()) 0, (void (*)()) 0 ); OCIHandleAlloc((dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV, 52, (dvoid **) &tmp); OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp ); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR, 52, (dvoid **) &tmp); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER, 52, (dvoid **) &tmp); OCIServerAttach(srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX, 52, (dvoid **) &tmp); /* Set attribute server context in the service context: */ OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0, (ub4) OCI_ATTR_SERVER, (OCIError *) errhp); /* Allocate a user context handle: */ OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0); OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION, (dvoid *)"scott", (ub4)strlen("scott"), OCI_ATTR_USERNAME, errhp); OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION, (dvoid *)"tiger", (ub4)strlen("tiger"), OCI_ATTR_PASSWORD, errhp); checkerr(errhp, OCISessionBegin (svchp, errhp, usrhp, OCI_CRED_RDBMS, OCI_DEFAULT)); OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX, (dvoid *)usrhp, (ub4)0, OCI_ATTR_SESSION, errhp); do { deqmesg(buf, &buflen); printf("%.*s\n", buflen, 
buf); } while(1); } /* end main */ static void checkerr(errhp, status) OCIError *errhp; sword status; { text errbuf[512]; ub4 buflen; sb4 errcode; if (status == OCI_SUCCESS) return; switch (status) { case OCI_ERROR: OCIErrorGet ((dvoid *) errhp, (ub4) 1, (text *) NULL, &errcode, errbuf, (ub4) sizeof(errbuf), (ub4) OCI_HTYPE_ERROR); printf("Error - %s\n", errbuf); break; case OCI_INVALID_HANDLE: printf("Error - OCI_INVALID_HANDLE\n"); break; default: printf("Error - %d\n", status); break; } exit(-1); } /* end checkerr */
Example C-16 Dequeuing Messages, Reusing Memory (OCI)
This program, deqreuse.c, dequeues each line of text from a queue 'msgqueue' that has been created in the scott schema by executing create_types.sql. Messages are enqueued using enqnoreuse.c or enqreuse.c. If there are no messages, then it waits for 60 seconds before timing out. In this program, the dequeue subroutine reuses client-side objects' memory between invocations of OCIAQDeq.
During the first call to OCIAQDeq, OCI automatically allocates the memory for the message payload.
During subsequent calls to OCIAQDeq, the same payload pointers are passed and OCI automatically resizes the payload memory if necessary.
/*
 * deqreuse.c
 *
 * Dequeues messages of type SCOTT.MESSAGE from the queue "msgqueue" and
 * prints the text of each one.  The payload pointers and the dequeue
 * options descriptor are kept in globals and reused for every dequeue.
 */
#ifndef OCI_ORACLE
#include <oci.h>
#endif
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

static void checkerr(OCIError *errhp, sword status);
static void deqmesg(text *buf, ub4 *buflen);

/* Client-side image of an instance of SCOTT.MESSAGE. */
struct message
{
  OCINumber  id;
  OCIString *data;
};
typedef struct message message;

/* Null indicator structure paired with struct message. */
struct null_message
{
  OCIInd null_adt;
  OCIInd null_id;
  OCIInd null_data;
};
typedef struct null_message null_message;

/* Global data reused on calls to dequeue: */
OCIEnv          *envhp;
OCIError        *errhp;
OCISvcCtx       *svchp;
OCIAQDeqOptions *deqopt;
message         *mesg    = (message *)0;
null_message    *mesgind = (null_message *)0;

/*
 * deqmesg - dequeue one message from "msgqueue", commit, and copy its text
 * into the caller's buffer.
 *
 *   buf     destination buffer for the payload text (not NUL-terminated).
 *   buflen  in:  capacity of buf in bytes.
 *           out: number of bytes copied; 0 when the data attribute is null.
 *
 * Text longer than the capacity is truncated so that buf is never overrun.
 * The global payload pointers start out null and are passed to OCIAQDeq on
 * every call.  Any OCI failure ends the program through checkerr().
 */
static void deqmesg(text *buf, ub4 *buflen)
{
  OCIType *mesgtdo    = (OCIType *)0;       /* type descr of SCOTT.MESSAGE */
  ub4      wait       = 60;                 /* timeout after 60 seconds    */
  ub4      navigation = OCI_DEQ_FIRST_MSG;  /* always get head of q        */
  ub4      capacity   = *buflen;            /* room available in buf       */

  /* Get the type descriptor object for the type SCOTT.MESSAGE: */
  checkerr(errhp, OCITypeByName(envhp, errhp, svchp,
                                (CONST text *)"SCOTT", (ub4)strlen("SCOTT"),
                                (CONST text *)"MESSAGE", (ub4)strlen("MESSAGE"),
                                (text *)0, 0,
                                OCI_DURATION_SESSION, OCI_TYPEGET_ALL,
                                &mesgtdo));

  /* Set wait time, navigation in dequeue options: */
  checkerr(errhp, OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS,
                             (dvoid *)&wait, 0, OCI_ATTR_WAIT, errhp));
  checkerr(errhp, OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS,
                             (dvoid *)&navigation, 0,
                             OCI_ATTR_NAVIGATION, errhp));

  /*
   * Dequeue the message and commit.  The memory for the payload is
   * automatically allocated/resized by OCI:
   */
  checkerr(errhp, OCIAQDeq(svchp, errhp, (text *)"msgqueue", deqopt, 0,
                           mesgtdo, (dvoid **)&mesg, (dvoid **)&mesgind,
                           0, 0));
  checkerr(errhp, OCITransCommit(svchp, errhp, (ub4)0));

  /* Copy the message payload text into the user buffer: */
  if (mesgind->null_data)
  {
    *buflen = 0;
  }
  else
  {
    ub4 len = OCIStringSize(envhp, mesg->data);

    if (len > capacity)
      len = capacity;   /* never write past the end of the caller's buffer */
    memcpy((dvoid *)buf, (dvoid *)OCIStringPtr(envhp, mesg->data),
           (size_t)len);
    *buflen = len;
  }
} /* end deqmesg */

int main(void)
{
  OCIServer  *srvhp;
  OCISession *usrhp;
  dvoid      *tmp;
  text        buf[80];   /* payload text */
  ub4         buflen;

  OCIInitialize((ub4)OCI_OBJECT, (dvoid *)0,
                (dvoid * (*)(dvoid *, size_t))0,
                (dvoid * (*)(dvoid *, dvoid *, size_t))0,
                (void (*)(dvoid *, dvoid *))0);

  OCIHandleAlloc((dvoid *)NULL, (dvoid **)&envhp, (ub4)OCI_HTYPE_ENV,
                 52, (dvoid **)&tmp);
  OCIEnvInit(&envhp, (ub4)OCI_DEFAULT, 21, (dvoid **)&tmp);

  OCIHandleAlloc((dvoid *)envhp, (dvoid **)&errhp, (ub4)OCI_HTYPE_ERROR,
                 52, (dvoid **)&tmp);
  OCIHandleAlloc((dvoid *)envhp, (dvoid **)&srvhp, (ub4)OCI_HTYPE_SERVER,
                 52, (dvoid **)&tmp);

  OCIServerAttach(srvhp, errhp, (text *)0, (sb4)0, (ub4)OCI_DEFAULT);

  OCIHandleAlloc((dvoid *)envhp, (dvoid **)&svchp, (ub4)OCI_HTYPE_SVCCTX,
                 52, (dvoid **)&tmp);

  /* Set attribute server context in the service context: */
  OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4)0,
             (ub4)OCI_ATTR_SERVER, (OCIError *)errhp);

  /* Allocate a user context handle: */
  OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4)OCI_HTYPE_SESSION,
                 (size_t)0, (dvoid **)0);

  OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION,
             (dvoid *)"scott", (ub4)strlen("scott"), OCI_ATTR_USERNAME, errhp);
  OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION,
             (dvoid *)"tiger", (ub4)strlen("tiger"), OCI_ATTR_PASSWORD, errhp);

  checkerr(errhp, OCISessionBegin(svchp, errhp, usrhp, OCI_CRED_RDBMS,
                                  OCI_DEFAULT));

  OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX, (dvoid *)usrhp, (ub4)0,
             OCI_ATTR_SESSION, errhp);

  /* Allocate the dequeue options descriptor: */
  checkerr(errhp, OCIDescriptorAlloc(envhp, (dvoid **)&deqopt,
                                     OCI_DTYPE_AQDEQ_OPTIONS, 0, (dvoid **)0));

  for (;;)
  {
    buflen = (ub4)sizeof(buf);   /* tell deqmesg how much room buf has */
    deqmesg(buf, &buflen);
    printf("%.*s\n", (int)buflen, (char *)buf);
  }

  /*
   * This program never reaches this point as the dequeue times out & exits.
   * If it does reach here, it is a good place to free the dequeue
   * options descriptor using OCIDescriptorFree and free the memory allocated
   * by OCI for the payload using OCIObjectFree
   */
} /* end main */

/*
 * checkerr - return silently on OCI_SUCCESS; for any other status print a
 * diagnostic on standard output and terminate the program.
 */
static void checkerr(OCIError *errhp, sword status)
{
  text errbuf[512];
  sb4  errcode = 0;

  if (status == OCI_SUCCESS)
    return;

  switch (status)
  {
  case OCI_ERROR:
    errbuf[0] = '\0';   /* keep the buffer printable if OCIErrorGet fails */
    OCIErrorGet((dvoid *)errhp, (ub4)1, (text *)NULL, &errcode,
                errbuf, (ub4)sizeof(errbuf), (ub4)OCI_HTYPE_ERROR);
    printf("Error - %s\n", (char *)errbuf);
    break;
  case OCI_INVALID_HANDLE:
    printf("Error - OCI_INVALID_HANDLE\n");
    break;
  default:
    printf("Error - %d\n", (int)status);
    break;
  }
  exit(EXIT_FAILURE);
} /* end checkerr */