Oracle9i Streams Release 2 (9.2) Part Number A96571-02 |
|
|
View PDF |
This chapter illustrates a messaging environment that can be constructed using Streams.
This chapter contains these topics:
This example illustrates using a single SYS.AnyData
queue at a database called oedb.net
to create a Streams messaging environment in which events containing message payloads of different types are stored in the same queue. Specifically, this example illustrates the following messaging features of Streams:
SYS.Anydata
events into the queueSYS.Anydata
events into the queueFigure 19-1 provides an overview of this environment.
The following are prerequisites that must be completed before you begin the example in this section.
AQ_TM_PROCESSES
: This parameter establishes queue monitor processes. Values from 1
to 10
specify the number of queue monitor processes created to monitor the messages. If AQ_TM_PROCESSES
is not specified or is set to 0
, then the queue monitor processes are not created. In this example, AQ_TM_PROCESSES
should be set to at least 1
.
Setting the parameter to 1
or more starts the specified number of queue monitor processes. These queue monitor processes are responsible for managing time-based operations of messages such as delay and expiration, cleaning up retained messages after the specified retention time, and cleaning up consumed messages if the retention time is 0.
COMPATIBLE
: This parameter must be set to 9.2.0
or higher.oedb.net
database from the client where you run these scripts.
strmadmin
) and prompts you for the tablespace you want to use for this user's data. Before you start this example, either create a new tablespace or identify an existing tablespace for the Streams administrator to use. The Streams administrator should not use the SYSTEM
tablespace.Complete the following steps to set up users and create a Streams queue for a Streams messaging environment.
/************************* BEGINNING OF SCRIPT ******************************
Run SET
ECHO
ON
and specify the spool file for the script. Check the spool file for errors after you run this script.
*/ SET ECHO ON SPOOL streams_setup_message.out /*
Connect to oedb.net
as SYS
user.
*/ CONNECT SYS/CHANGE_ON_INSTALL@oedb.net AS SYSDBA /*
This example uses the oe
sample schema. For this example to work properly, the oe
user must have privileges to execute the subprograms in the DBMS_AQ
package. The oe
user will be specified as the queue user when the Streams queue is created in Step 3. The SET_UP_QUEUE
procedure will grant the oe user ENQUEUE
and DEQUEUE
privileges on the queue, but the oe
user also needs EXECUTE
privilege on the DBMS_AQ
package to enqueue events into and dequeue events from the queue.
Also, most of the configuration and administration actions illustrated in this example are performed by the Streams administrator. In this step, create the Streams administrator named strmadmin
and grant this user the necessary privileges. These privileges enable the user to execute subprograms in packages related to Streams, create rule sets, create rules, and monitor the Streams environment by querying data dictionary views. You may choose a different name for this user.
*/ GRANT EXECUTE ON DBMS_AQ TO oe; GRANT CONNECT, RESOURCE, SELECT_CATALOG_ROLE TO strmadmin IDENTIFIED BY strmadminpw; ACCEPT streams_tbs PROMPT 'Enter the tablespace for the Streams administrator: ' ALTER USER strmadmin DEFAULT TABLESPACE &streams_tbs QUOTA UNLIMITED ON &streams_tbs; GRANT EXECUTE ON DBMS_APPLY_ADM TO strmadmin; GRANT EXECUTE ON DBMS_AQ TO strmadmin; GRANT EXECUTE ON DBMS_AQADM TO strmadmin; GRANT EXECUTE ON DBMS_STREAMS_ADM TO strmadmin; BEGIN DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE( privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ, grantee => 'strmadmin', grant_option => FALSE); END; / BEGIN DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE( privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ, grantee => 'strmadmin', grant_option => FALSE); END; / BEGIN DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE( privilege => DBMS_RULE_ADM.CREATE_EVALUATION_CONTEXT_OBJ, grantee => 'strmadmin', grant_option => FALSE); END; / /*
Connect as the Streams administrator.
*/ CONNECT strmadmin/strmadminpw@oedb.net /*
Run the SET_UP_QUEUE
procedure to create a queue named oe_queue
at oedb.net
. This queue will function as the Streams queue by holding events used in the messaging environment.
Running the SET_UP_QUEUE
procedure performs the following actions:
oe_queue_table
. This queue table is owned by the Streams administrator (strmadmin
) and uses the default storage of this user.oe_queue
owned by the Streams administrator (strmadmin
)*/ BEGIN DBMS_STREAMS_ADM.SET_UP_QUEUE( queue_table => 'oe_queue_table', queue_name => 'oe_queue'); END; / /*
*/ BEGIN SYS.DBMS_AQADM.GRANT_QUEUE_PRIVILEGE( privilege => 'ALL', queue_name => 'strmadmin.oe_queue', grantee => 'oe'); END; / /*
Create an agent that will be used to perform explicit enqueue operations on the oe_queue
queue.
*/ BEGIN SYS.DBMS_AQADM.CREATE_AQ_AGENT( agent_name => 'explicit_enq'); END; / /*
For a user to perform queue operations, such as enqueue and dequeue, on a secure queue, the user must be configured as a secure queue user of the queue. The oe_queue
queue is a secure queue because it was created using SET_UP_QUEUE
. This step enables the oe
user to perform enqueue operations on this queue.
*/ BEGIN DBMS_AQADM.ENABLE_DB_ACCESS( agent_name => 'explicit_enq', db_username => 'oe'); END; / /*
Check the streams_setup_message.out
spool file to ensure that all actions completed successfully after this script completes.
*/ SET ECHO OFF SPOOL OFF /*************************** END OF SCRIPT ******************************/
Complete the following steps to create one PL/SQL procedure that enqueues non-LCR events into the Streams queue and one PL/SQL procedure that enqueues row LCR events into the Streams queue.
/************************* BEGINNING OF SCRIPT ******************************
Run SET
ECHO
ON
and specify the spool file for the script. Check the spool file for errors after you run this script.
*/ SET ECHO ON SPOOL streams_enqprocs_message.out /*
Connect as oe
.
*/ CONNECT oe/oe@oedb.net /*
Create a type to represent orders based on the columns in the oe.orders
table. The type attributes include the columns in the oe.orders
table, along with one extra attribute named action
. The value of the action
attribute for instances of this type will be used to determine correct action to perform on the instance (either apply process dequeue or explicit dequeue). This type will be used for events that will be enqueued into the Streams queue.
*/ CREATE OR REPLACE TYPE order_event_typ AS OBJECT ( order_id NUMBER(12), order_date TIMESTAMP(6) WITH LOCAL TIME ZONE, order_mode VARCHAR2(8), customer_id NUMBER(6), order_status NUMBER(2), order_total NUMBER(8,2), sales_rep_id NUMBER(6), promotion_id NUMBER(6), action VARCHAR(7)); / /*
Create a type to represent customers based on the columns in the oe.customers
table. The type attributes include the columns in the oe.customers
table, along with one extra attribute named action
. The value of the action
attribute for instances of this type will be used to determine correct action to perform on the instance (either apply process dequeue or explicit dequeue). This type will be used for events that will be enqueued into the Streams queue.
*/ CREATE OR REPLACE TYPE customer_event_typ AS OBJECT ( customer_id NUMBER(6), cust_first_name VARCHAR2(20), cust_last_name VARCHAR2(20), cust_address CUST_ADDRESS_TYP, phone_numbers PHONE_LIST_TYP, nls_language VARCHAR2(3), nls_territory VARCHAR2(30), credit_limit NUMBER(9,2), cust_email VARCHAR2(30), account_mgr_id NUMBER(6), cust_geo_location MDSYS.SDO_GEOMETRY, action VARCHAR(7)); / /*
Create a PL/SQL procedure called enq_proc
to enqueue events into the Streams queue.
Note: A single enqueued message can be dequeued by an apply process and by an explicit dequeue, but this example does not illustrate this capability. |
*/ CREATE OR REPLACE PROCEDURE oe.enq_proc (event IN SYS.Anydata) IS enqopt DBMS_AQ.ENQUEUE_OPTIONS_T; mprop DBMS_AQ.MESSAGE_PROPERTIES_T; enq_eventid RAW(16); BEGIN mprop.SENDER_ID := SYS.AQ$_AGENT( 'explicit_enq', NULL, NULL); DBMS_AQ.ENQUEUE( queue_name => 'strmadmin.oe_queue', enqueue_options => enqopt, message_properties => mprop, payload => event, msgid => enq_eventid); END; / /*
Create a procedure called enq_row_lcr
that constructs a row LCR and then enqueues the row LCR into the queue.
See Also:
Oracle9i Supplied PL/SQL Packages and Types Reference for more information about LCR constructors |
*/ CREATE OR REPLACE PROCEDURE oe.enq_row_lcr( source_dbname VARCHAR2, cmd_type VARCHAR2, obj_owner VARCHAR2, obj_name VARCHAR2, old_vals SYS.LCR$_ROW_LIST, new_vals SYS.LCR$_ROW_LIST) AS eopt DBMS_AQ.ENQUEUE_OPTIONS_T; mprop DBMS_AQ.MESSAGE_PROPERTIES_T; enq_msgid RAW(16); row_lcr SYS.LCR$_ROW_RECORD; BEGIN mprop.SENDER_ID := SYS.AQ$_AGENT('explicit_enq', NULL, NULL); -- Construct the LCR based on information passed to procedure row_lcr := SYS.LCR$_ROW_RECORD.CONSTRUCT( source_database_name => source_dbname, command_type => cmd_type, object_owner => obj_owner, object_name => obj_name, old_values => old_vals, new_values => new_vals); -- Enqueue the created row LCR DBMS_AQ.ENQUEUE( queue_name => 'strmadmin.oe_queue', enqueue_options => eopt, message_properties => mprop, payload => SYS.AnyData.ConvertObject(row_lcr), msgid => enq_msgid); END enq_row_lcr; / /*
Check the streams_enqprocs_message.out
spool file to ensure that all actions completed successfully after this script completes.
*/ SET ECHO OFF SPOOL OFF /*************************** END OF SCRIPT ******************************/
Complete the following steps to configure an apply process to apply the user-enqueued events in the Streams queue.
/************************* BEGINNING OF SCRIPT ******************************
Run SET
ECHO
ON
and specify the spool file for the script. Check the spool file for errors after you run this script.
*/ SET ECHO ON SPOOL streams_apply_message.out /*
Connect as oe
.
*/ CONNECT oe/oe@oedb.net /*
Create a function called get_oe_action
to determine the value of the action
attribute in the events in the queue. This function is used in rules later in this example to determine the value of the action
attribute for an event. Then, the clients of the rules engine perform the appropriate action for the event (either dequeue by apply process or explicit dequeue). In this example, the clients of the rules engine are the apply process and the oe.explicit_dq
PL/SQL procedure.
*/ CREATE OR REPLACE FUNCTION oe.get_oe_action (event IN SYS.Anydata) RETURN VARCHAR2 IS ord oe.order_event_typ; cust oe.customer_event_typ; num NUMBER; type_name VARCHAR2(61); BEGIN type_name := event.GETTYPENAME; IF type_name = 'OE.ORDER_EVENT_TYP' THEN num := event.GETOBJECT(ord); RETURN ord.action; ELSIF type_name = 'OE.CUSTOMER_EVENT_TYP' THEN num := event.GETOBJECT(cust); RETURN cust.action; ELSE RETURN NULL; END IF; END; / /*
Create a message handler called mes_handler
that will be used as a message handler by the apply process. This procedure takes the payload in a user-enqueued event of type oe.order_event_typ
or oe.customer_event_typ
and inserts it as a row in the oe.orders
table and oe.customers
table, respectively.
*/ CREATE OR REPLACE PROCEDURE oe.mes_handler (event SYS.AnyData) IS ord oe.order_event_typ; cust oe.customer_event_typ; num NUMBER; type_name VARCHAR2(61); BEGIN type_name := event.GETTYPENAME; IF type_name = 'OE.ORDER_EVENT_TYP' THEN num := event.GETOBJECT(ord); INSERT INTO oe.orders VALUES (ord.order_id, ord.order_date, ord.order_mode, ord.customer_id, ord.order_status, ord.order_total, ord.sales_rep_id, ord.promotion_id); ELSIF type_name = 'OE.CUSTOMER_EVENT_TYP' THEN num := event.GETOBJECT(cust); INSERT INTO oe.customers VALUES (cust.customer_id, cust.cust_first_name, cust.cust_last_name, cust.cust_address, cust.phone_numbers, cust.nls_language, cust.nls_territory, cust.credit_limit, cust.cust_email, cust.account_mgr_id, cust.cust_geo_location); END IF; END; / /*
*/ GRANT EXECUTE ON get_oe_action TO strmadmin; GRANT EXECUTE ON mes_handler TO strmadmin; /*
Connect as the Streams administrator.
*/ CONNECT strmadmin/strmadminpw@oedb.net /*
Create the evaluation context for the rule set. The table alias is tab
in this example, but you can use a different table alias name if you wish.
*/ DECLARE table_alias SYS.RE$TABLE_ALIAS_LIST; BEGIN table_alias := SYS.RE$TABLE_ALIAS_LIST(SYS.RE$TABLE_ALIAS( 'tab', 'strmadmin.oe_queue_table')); DBMS_RULE_ADM.CREATE_EVALUATION_CONTEXT( evaluation_context_name => 'oe_eval_context', table_aliases => table_alias); END; / /*
Create the rule set for the apply process.
*/ BEGIN DBMS_RULE_ADM.CREATE_RULE_SET( rule_set_name => 'apply_oe_rs', evaluation_context => 'strmadmin.oe_eval_context'); END; / /*
Create a rule that evaluates to TRUE
if the action
value of an event
is apply
. Notice that tab.user_data
is passed to the oe.get_oe_action
function. The tab.user_data
column holds the event payload in a queue table. The table alias for the queue table was specified as tab
in Step 5.
*/ BEGIN DBMS_RULE_ADM.CREATE_RULE( rule_name => 'strmadmin.apply_action', condition => ' oe.get_oe_action(tab.user_data) = ''APPLY'' '); END; / /*
Create a rule that evaluates to TRUE
if the event in the queue is a row LCR that changes either the oe.orders
table or the oe.customers
table. This rule will enable the apply process to apply user-enqueued changes to the tables directly. For convenience, this rule uses the Oracle-supplied evaluation context SYS.STREAMS$_EVALUATION_CONTEXT
because the rule is used to evaluate LCRs. When this rule is added to the rule set, this evaluation context is used for the rule during evaluation instead of the rule set's evaluation context.
*/ BEGIN DBMS_RULE_ADM.CREATE_RULE( rule_name => 'apply_lcrs', condition => ':dml.GET_OBJECT_OWNER() = ''OE'' AND ' || ' (:dml.GET_OBJECT_NAME() = ''ORDERS'' OR ' || ':dml.GET_OBJECT_NAME() = ''CUSTOMERS'') ', evaluation_context => 'SYS.STREAMS$_EVALUATION_CONTEXT'); END; / /*
Add the rules created in Step 7 and Step 8 to the rule set created in Step 6.
*/ BEGIN DBMS_RULE_ADM.ADD_RULE( rule_name => 'apply_action', rule_set_name => 'apply_oe_rs'); DBMS_RULE_ADM.ADD_RULE( rule_name => 'apply_lcrs', rule_set_name => 'apply_oe_rs'); END; / /*
Create an apply process that is associated with the oe_queue
, that uses the apply_oe_rs
rule set, and that uses the mes_handler
procedure as a message handler.
*/ BEGIN DBMS_APPLY_ADM.CREATE_APPLY( queue_name => 'strmadmin.oe_queue', apply_name => 'apply_oe', rule_set_name => 'strmadmin.apply_oe_rs', message_handler => 'oe.mes_handler', apply_user => 'oe', apply_captured => false); END; / /*
Grant EXECUTE
privilege on the strmadmin.apply_oe_rs
rule set. Because oe
was specified as the apply user when the apply process was created in Step 10, oe
needs execute privilege on the rule set used by the apply process.
*/ BEGIN DBMS_RULE_ADM.GRANT_OBJECT_PRIVILEGE( privilege => DBMS_RULE_ADM.EXECUTE_ON_RULE_SET, object_name => 'strmadmin.apply_oe_rs', grantee => 'oe', grant_option => FALSE); END; / /*
Set the disable_on_error
parameter to n
so that the apply process is not disabled if it encounters an error, and start the apply process at oedb.net
.
*/ BEGIN DBMS_APPLY_ADM.SET_PARAMETER( apply_name => 'apply_oe', parameter => 'disable_on_error', value => 'n'); END; / BEGIN DBMS_APPLY_ADM.START_APPLY( apply_name => 'apply_oe'); END; / /*
Check the streams_apply_message.out
spool file to ensure that all actions completed successfully after this script completes.
*/ SET ECHO OFF SPOOL OFF /*************************** END OF SCRIPT ******************************/
Complete the following steps to configure explicit dequeue of messages based on message contents.
/************************* BEGINNING OF SCRIPT ******************************
Run SET
ECHO
ON
and specify the spool file for the script. Check the spool file for errors after you run this script.
*/ SET ECHO ON SPOOL streams_explicit_dq.out /*
Connect as the Streams administrator.
*/ CONNECT strmadmin/strmadminpw@oedb.net /*
Create an agent that will be used to perform explicit dequeue operations on the oe_queue
queue.
*/ BEGIN SYS.DBMS_AQADM.CREATE_AQ_AGENT( agent_name => 'explicit_dq'); END; / /*
For a user to perform queue operations, such as enqueue and dequeue, on a secure queue, the user must be configured as a secure queue user of the queue. The oe_queue
queue is a secure queue because it was created using SET_UP_QUEUE
. The oe
user will be able to perform dequeue operations on this queue when the agent is used to create a subscriber to the queue in the next step.
*/ BEGIN DBMS_AQADM.ENABLE_DB_ACCESS( agent_name => 'explicit_dq', db_username => 'oe'); END; / /*
Add a subscriber to the oe_queue
queue. This subscriber will perform explicit dequeues of events. A subscriber rule is used to dequeue any events where the action
value is not apply
. If the action value is apply
for an event, then the event is ignored by the subscriber. Such events are dequeued and processed by the apply process.
*/ DECLARE subscriber SYS.AQ$_AGENT; BEGIN subscriber := SYS.AQ$_AGENT('explicit_dq', NULL, NULL); SYS.DBMS_AQADM.ADD_SUBSCRIBER( queue_name => 'strmadmin.oe_queue', subscriber => subscriber, rule => 'oe.get_oe_action(tab.user_data) != ''APPLY'''); END; / /*
Connect as oe
.
*/ CONNECT oe/oe@oedb.net /*
Create a PL/SQL procedure called explicit_dq
to dequeue events explicitly using the subscriber created in Step 4.
*/ CREATE OR REPLACE PROCEDURE oe.explicit_dq (consumer IN VARCHAR2) AS deqopt DBMS_AQ.DEQUEUE_OPTIONS_T; mprop DBMS_AQ.MESSAGE_PROPERTIES_T; msgid RAW(16); payload SYS.AnyData; new_messages BOOLEAN := TRUE; ord oe.order_event_typ; cust oe.customer_event_typ; tc pls_integer; next_trans EXCEPTION; no_messages EXCEPTION; pragma exception_init (next_trans, -25235); pragma exception_init (no_messages, -25228); BEGIN deqopt.consumer_name := consumer; deqopt.wait := 1; WHILE (new_messages) LOOP BEGIN DBMS_AQ.DEQUEUE( queue_name => 'strmadmin.oe_queue', dequeue_options => deqopt, message_properties => mprop, payload => payload, msgid => msgid); COMMIT; deqopt.navigation := DBMS_AQ.NEXT; DBMS_OUTPUT.PUT_LINE('Event Dequeued'); DBMS_OUTPUT.PUT_LINE('Type Name := ' || payload.GetTypeName); IF (payload.GetTypeName = 'OE.ORDER_EVENT_TYP') THEN tc := payload.GetObject(ord); DBMS_OUTPUT.PUT_LINE('order_id - ' || ord.order_id); DBMS_OUTPUT.PUT_LINE('order_date - ' || ord.order_date); DBMS_OUTPUT.PUT_LINE('order_mode - ' || ord.order_mode); DBMS_OUTPUT.PUT_LINE('customer_id - ' || ord.customer_id); DBMS_OUTPUT.PUT_LINE('order_status - ' || ord.order_status); DBMS_OUTPUT.PUT_LINE('order_total - ' || ord.order_total); DBMS_OUTPUT.PUT_LINE('sales_rep_id - ' || ord.sales_rep_id); DBMS_OUTPUT.PUT_LINE('promotion_id - ' || ord.promotion_id); END IF; IF (payload.GetTypeName = 'OE.CUSTOMER_EVENT_TYP') THEN tc := payload.GetObject(cust); DBMS_OUTPUT.PUT_LINE('customer_id - ' || cust.customer_id); DBMS_OUTPUT.PUT_LINE('cust_first_name - ' || cust.cust_first_name); DBMS_OUTPUT.PUT_LINE('cust_last_name - ' || cust.cust_last_name); DBMS_OUTPUT.PUT_LINE('street_address - ' || cust.cust_address.street_address); DBMS_OUTPUT.PUT_LINE('postal_code - ' || cust.cust_address.postal_code); DBMS_OUTPUT.PUT_LINE('city - ' || cust.cust_address.city); DBMS_OUTPUT.PUT_LINE('state_province - ' || cust.cust_address.state_province); DBMS_OUTPUT.PUT_LINE('country_id - ' || cust.cust_address.country_id); DBMS_OUTPUT.PUT_LINE('phone_number1 - ' || cust.phone_numbers(1)); DBMS_OUTPUT.PUT_LINE('phone_number2 - ' || cust.phone_numbers(2)); DBMS_OUTPUT.PUT_LINE('phone_number3 - ' || cust.phone_numbers(3)); DBMS_OUTPUT.PUT_LINE('nls_language - ' || cust.nls_language); DBMS_OUTPUT.PUT_LINE('nls_territory - ' || cust.nls_territory); DBMS_OUTPUT.PUT_LINE('credit_limit - ' || cust.credit_limit); DBMS_OUTPUT.PUT_LINE('cust_email - ' || cust.cust_email); DBMS_OUTPUT.PUT_LINE('account_mgr_id - ' || cust.account_mgr_id); END IF; EXCEPTION WHEN next_trans THEN deqopt.navigation := DBMS_AQ.NEXT_TRANSACTION; WHEN no_messages THEN new_messages := FALSE; DBMS_OUTPUT.PUT_LINE('No more events'); END; END LOOP; END; / /*
Check the streams_explicit_dq.out
spool file to ensure that all actions completed successfully after this script completes.
*/ SET ECHO OFF SPOOL OFF /*************************** END OF SCRIPT ******************************/
Complete the following steps to enqueue non-LCR events and row LCR events into the queue.
/************************* BEGINNING OF SCRIPT ******************************
Run SET
ECHO
ON
and specify the spool file for the script. Check the spool file for errors after you run this script.
*/ SET ECHO ON SPOOL streams_enq_deq.out /*
Connect as oe
.
*/ CONNECT oe/oe@oedb.net /*
Enqueue events with apply
for the action
value. Based on the apply process rules, the apply process will dequeue and process these events with the oe.mes_handler
message handler procedure created in "Create a Message Handler". The COMMIT
after the enqueues makes these two enqueues part of the same transaction. An enqueued message is not visible until the session that enqueued it commits the enqueue.
*/ BEGIN oe.enq_proc(SYS.AnyData.convertobject(oe.order_event_typ( 2500,'05-MAY-01','online',117,3,44699,161,NULL,'APPLY'))); END; / BEGIN oe.enq_proc(SYS.AnyData.convertobject(oe.customer_event_typ( 990,'Hester','Prynne',oe.cust_address_typ('555 Beacon Street','Boston', 'MA',02109,'US'),oe.phone_list_typ('+1 617 123 4104', '+1 617 083 4381', '+1 617 742 5813'),'i','AMERICA',5000,'a@scarlet_letter.com',145, NULL,'APPLY'))); END; / COMMIT; /*
Enqueue events with dequeue
for the action
value. The oe.explicit_dq
procedure created in "Create a Procedure to Dequeue Events Explicitly" will dequeue these events because the action
is not apply
. Based on the apply process rules, the apply process will ignore these events. The COMMIT
after the enqueues makes these two enqueues part of the same transaction.
*/ BEGIN oe.enq_proc(SYS.AnyData.convertobject(oe.order_event_typ( 2501,'22-JAN-00','direct',117,3,22788,161,NULL,'DEQUEUE'))); END; / BEGIN oe.enq_proc(SYS.AnyData.convertobject(oe.customer_event_typ( 991,'Nick','Carraway',oe.cust_address_typ('10th Street', 11101,'Long Island','NY','US'),oe.phone_list_typ('+1 718 786 2287', '+1 718 511 9114', '+1 718 888 4832'),'i','AMERICA',3000, 'nick@great_gatsby.com',149,NULL,'DEQUEUE'))); END; / COMMIT; /*
Enqueue row LCR events. The apply process will apply these events directly. Enqueued LCRs should commit at transaction boundaries. In this step, a COMMIT
statement is run after each enqueue, making each enqueue a separate transaction. However, you can perform multiple LCR enqueues before a commit if there is more than one LCR in a transaction.
Create a row LCR that inserts a row into the oe.orders
table.
*/ DECLARE newunit1 SYS.LCR$_ROW_UNIT; newunit2 SYS.LCR$_ROW_UNIT; newunit3 SYS.LCR$_ROW_UNIT; newunit4 SYS.LCR$_ROW_UNIT; newunit5 SYS.LCR$_ROW_UNIT; newunit6 SYS.LCR$_ROW_UNIT; newunit7 SYS.LCR$_ROW_UNIT; newunit8 SYS.LCR$_ROW_UNIT; newvals SYS.LCR$_ROW_LIST; BEGIN newunit1 := SYS.LCR$_ROW_UNIT( 'ORDER_ID', SYS.AnyData.ConvertNumber(2502), DBMS_LCR.NOT_A_LOB, NULL, NULL); newunit2 := SYS.LCR$_ROW_UNIT( 'ORDER_DATE', SYS.AnyData.ConvertTimestampLTZ('04-NOV-00'), DBMS_LCR.NOT_A_LOB, NULL, NULL); newunit3 := SYS.LCR$_ROW_UNIT( 'ORDER_MODE', SYS.AnyData.ConvertVarchar2('online'), DBMS_LCR.NOT_A_LOB, NULL, NULL); newunit4 := SYS.LCR$_ROW_UNIT( 'CUSTOMER_ID', SYS.AnyData.ConvertNumber(145), DBMS_LCR.NOT_A_LOB, NULL, NULL); newunit5 := SYS.LCR$_ROW_UNIT( 'ORDER_STATUS', SYS.AnyData.ConvertNumber(3), DBMS_LCR.NOT_A_LOB, NULL, NULL); newunit6 := SYS.LCR$_ROW_UNIT( 'ORDER_TOTAL', SYS.AnyData.ConvertNumber(35199), DBMS_LCR.NOT_A_LOB, NULL, NULL); newunit7 := SYS.LCR$_ROW_UNIT( 'SALES_REP_ID', SYS.AnyData.ConvertNumber(160), DBMS_LCR.NOT_A_LOB, NULL, NULL); newunit8 := SYS.LCR$_ROW_UNIT( 'PROMOTION_ID', SYS.AnyData.ConvertNumber(1), DBMS_LCR.NOT_A_LOB, NULL, NULL); newvals := SYS.LCR$_ROW_LIST(newunit1,newunit2,newunit3,newunit4, newunit5,newunit6,newunit7,newunit8); oe.enq_row_lcr( source_dbname => 'OEDB.NET', cmd_type => 'INSERT', obj_owner => 'OE', obj_name => 'ORDERS', old_vals => NULL, new_vals => newvals); END; / COMMIT; /*
Create a row LCR that updates the row inserted into the oe.orders
table previously.
*/ DECLARE oldunit1 SYS.LCR$_ROW_UNIT; oldunit2 SYS.LCR$_ROW_UNIT; oldvals SYS.LCR$_ROW_LIST; newunit1 SYS.LCR$_ROW_UNIT; newvals SYS.LCR$_ROW_LIST; BEGIN oldunit1 := SYS.LCR$_ROW_UNIT( 'ORDER_ID', SYS.AnyData.ConvertNumber(2502), DBMS_LCR.NOT_A_LOB, NULL, NULL); oldunit2 := SYS.LCR$_ROW_UNIT( 'ORDER_TOTAL', SYS.AnyData.ConvertNumber(35199), DBMS_LCR.NOT_A_LOB, NULL, NULL); oldvals := SYS.LCR$_ROW_LIST(oldunit1,oldunit2); newunit1 := SYS.LCR$_ROW_UNIT( 'ORDER_TOTAL', SYS.AnyData.ConvertNumber(5235), DBMS_LCR.NOT_A_LOB, NULL, NULL); newvals := SYS.LCR$_ROW_LIST(newunit1); oe.enq_row_lcr( source_dbname => 'OEDB.NET', cmd_type => 'UPDATE', obj_owner => 'OE', obj_name => 'ORDERS', old_vals => oldvals, new_vals => newvals); END; / COMMIT; /*
Check the streams_enq_deq.out
spool file to ensure that all actions completed successfully after this script completes.
*/ SET ECHO OFF SPOOL OFF /*************************** END OF SCRIPT ******************************/
Complete the following steps to dequeue the events explicitly and query the events that were applied by the apply process. These events were enqueued in the "Enqueue Events".
Run the procedure you created in "Create a Procedure to Dequeue Events Explicitly" and specify the consumer of the events you want to dequeue. In this case, the consumer is the subscriber you added in "Add a Subscriber to the oe_queue Queue". In this example, events that are not dequeued explicitly by this procedure are dequeued by the apply process.
CONNECT oe/oe@oedb.net SET SERVEROUTPUT ON SIZE 100000 EXEC oe.explicit_dq('explicit_dq');
You should see the non-LCR events that were enqueued in "Enqueue Non-LCR Events to be Dequeued Explicitly".
Query the oe.orders
and oe.customers
table to see the rows corresponding to the events applied by the apply process:
SELECT * FROM oe.orders WHERE order_id = 2500; SELECT cust_first_name, cust_last_name, cust_email FROM oe.customers WHERE customer_id = 990; SELECT * FROM oe.orders WHERE order_id = 2502;
You should see the non-LCR event that was enqueued in "Enqueue Non-LCR Events to be Dequeued by the Apply Process" and the row LCR events that were enqueued in "Enqueue Row LCR Events to be Dequeued by the Apply Process".
This example enqueues non-LCR events and row LCR events into the queue using JMS. Then, this example dequeues these events from the queue using JMS.
Complete the following steps:
For this example to complete successfully, the LCR schema must be loaded into the SYS
schema using the catxlcr.sql
script in Oracle home in the rdbms/admin/
directory. Run this script now if it has not been run already.
For example, if your Oracle home directory is /usr/oracle
, then enter the following to run the script:
CONNECT SYS/CHANGE_ON_INSTALL AS SYSDBA @/usr/oracle/rdbms/admin/catxlcr.sql
CONNECT oe/oe CREATE TYPE address AS OBJECT (street VARCHAR (30), num NUMBER) / CREATE TYPE person AS OBJECT (name VARCHAR (30), home ADDRESS) /
The following jar and zip files should be in the CLASSPATH
based on the release of JDK you are using.
Also, make sure LD_LIBRARY_PATH
(Solaris) or PATH
(Windows NT) has $ORACLE_HOME/lib
set.
-- For JDK1.3.x $ORACLE_HOME/jdbc/lib/classes12.zip $ORACLE_HOME/rdbms/jlib/aqapi13.jar $ORACLE_HOME/rdbms/jlib/jmscommon.jar $ORACLE_HOME/rdbms/jlib/xdb.jar $ORACLE_HOME/xdk/lib/xmlparserv2.jar $ORACLE_HOME/jlib/jndi.jar -- For JDK1.2.x $ORACLE_HOME/jdbc/lib/classes12.zip $ORACLE_HOME/rdbms/jlib/aqapi12.jar $ORACLE_HOME/rdbms/jlib/jmscommon.jar $ORACLE_HOME/rdbms/jlib/xdb.jar $ORACLE_HOME/xdk/lib/xmlparserv2.jar $ORACLE_HOME/jlib/jndi.jar -- For JDK1.1.x $ORACLE_HOME/jdbc/lib/classes111.zip $ORACLE_HOME/rdbms/jlib/aqapi11.jar $ORACLE_HOME/rdbms/jlib/jmscommon.jar $ORACLE_HOME/rdbms/jlib/xdb.jar $ORACLE_HOME/xdk/lib/xmlparserv2.jar $ORACLE_HOME/jlib/jndi.jar
First, create a file input.typ
with the following lines:
SQL PERSON AS JPerson SQL ADDRESS AS JAddress
Then, run Jpublisher.
jpub -input=input.typ -user=OE/OE
Completing these actions generates two Java classes named JPerson
and JAddress
for the person
and address
types, respectively.
This program uses the Oracle JMS API to publish messages into a Streams topic.
This program does the following:
import oracle.AQ.*; import oracle.jms.*; import javax.jms.*; import java.lang.*; import oracle.xdb.*; public class StreamsEnq { public static void main (String args []) throws java.sql.SQLException, ClassNotFoundException, JMSException { TopicConnectionFactory tc_fact= null; TopicConnection t_conn = null; TopicSession t_sess = null; try { if (args.length < 3 ) System.out.println("Usage:java filename [SID] [HOST] [PORT]"); else { /* Create the TopicConnectionFactory * Only the JDBC OCI driver can be used to access Streams through JMS */ tc_fact = AQjmsFactory.getTopicConnectionFactory( args[1], args[0], Integer.parseInt(args[2]), "oci8"); t_conn = tc_fact.createTopicConnection( "OE","OE"); /* Create a Topic Session */ t_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE); /* Start the connection */ t_conn.start() ; /* Publish non-LCR based messages */ publishUserMessages(t_sess); /* Publish LCR based messages */ publishLcrMessages(t_sess); t_sess.close() ; t_conn.close() ; System.out.println("End of StreamsEnq Demo") ; } } catch (Exception ex) { System.out.println("Exception-1: " + ex); ex.printStackTrace(); } } /* * publishUserMessages - this method publishes an ADT message and a * JMS text message to a streams topic */ public static void publishUserMessages(TopicSession t_sess) throws Exception { Topic topic = null; TopicPublisher t_pub = null; JPerson pers = null; JAddress addr = null; TextMessage t_msg = null; AdtMessage adt_msg = null; AQjmsAgent agent = null; AQjmsAgent[] recipList = null; try { /* Get the topic */ topic = ((AQjmsSession)t_sess).getTopic("strmadmin", "oe_queue"); /* Create a publisher */ t_pub = t_sess.createPublisher(topic); /* Agent to access oe_queue */ agent = new AQjmsAgent("explicit_enq", null); /* Create a PERSON adt message */ adt_msg = ((AQjmsSession)t_sess).createAdtMessage(); pers = new JPerson(); addr = new JAddress(); addr.setNum(new java.math.BigDecimal(500)); addr.setStreet("Oracle Pkwy"); pers.setName("Mark"); pers.setHome(addr); /* Set the payload in the message */ adt_msg.setAdtPayload(pers); ((AQjmsMessage)adt_msg).setSenderID(agent); System.out.println("Publish message 1 -type PERSON\n"); /* Create the recipient list */ recipList = new AQjmsAgent[1]; recipList[0] = new AQjmsAgent("explicit_dq", null); /* Publish the message */ ((AQjmsTopicPublisher)t_pub).publish(topic, adt_msg, recipList); t_sess.commit(); t_msg = t_sess.createTextMessage(); t_msg.setText("Test message"); t_msg.setStringProperty("color", "BLUE"); t_msg.setIntProperty("year", 1999); ((AQjmsMessage)t_msg).setSenderID(agent); System.out.println("Publish message 2 -type JMS TextMessage\n"); /* Publish the message */ ((AQjmsTopicPublisher)t_pub).publish(topic, t_msg, recipList); t_sess.commit(); } catch (JMSException jms_ex) { System.out.println("JMS Exception: " + jms_ex); if(jms_ex.getLinkedException() != null) System.out.println("Linked Exception: " + jms_ex.getLinkedException()); } } /* * publishLcrMessages - this method publishes an XML LCR message to a * streams topic */ public static void publishLcrMessages(TopicSession t_sess) throws Exception { Topic topic = null; TopicPublisher t_pub = null; XMLType xml_lcr = null; AdtMessage adt_msg = null; AQjmsAgent agent = null; StringBuffer lcr_data = null; AQjmsAgent[] recipList = null; java.sql.Connection db_conn = null; try { /* Get the topic */ topic = ((AQjmsSession)t_sess).getTopic("strmadmin", "oe_queue"); /* Create a publisher */ t_pub = t_sess.createPublisher(topic); /* Get the JDBC connection */ db_conn = ((AQjmsSession)t_sess).getDBConnection(); /* Agent to access oe_queue */ agent = new AQjmsAgent("explicit_enq", null); /* Create a adt message */ adt_msg = ((AQjmsSession)t_sess).createAdtMessage(); /* Create the LCR representation in XML */ lcr_data = new StringBuffer(); lcr_data.append("<ROW_LCR "); lcr_data.append("xmlns='http://xmlns.oracle.com/streams/schemas/lcr' \n"); lcr_data.append("xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance' \n"); lcr_data.append("xsi:schemaLocation='http://xmlns.oracle.com/streams/schemas/lcr http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd'"); lcr_data.append("> \n"); lcr_data.append("<source_database_name>source_dbname</source_database_name> \n"); lcr_data.append("<command_type>INSERT</command_type> \n"); lcr_data.append("<object_owner>Ram</object_owner> \n"); lcr_data.append("<object_name>Emp</object_name> \n"); lcr_data.append("<tag>0ABC</tag> \n"); lcr_data.append("<transaction_id>0.0.0</transaction_id> \n"); lcr_data.append("<scn>0</scn> \n"); lcr_data.append("<old_values> \n"); lcr_data.append("<old_value> \n"); lcr_data.append("<column_name>C01</column_name> \n"); lcr_data.append("<data><varchar2>Clob old</varchar2></data> \n"); lcr_data.append("</old_value> \n"); lcr_data.append("<old_value> \n"); lcr_data.append("<column_name>C02</column_name> \n"); lcr_data.append("<data><varchar2>A123FF</varchar2></data> \n"); lcr_data.append("</old_value> \n"); lcr_data.append("<old_value> \n"); lcr_data.append("<column_name>C03</column_name> \n"); lcr_data.append("<data> \n"); lcr_data.append("<date><value>1997-11-24</value><format>SYYYY-MM-DD</format></date> \n"); lcr_data.append("</data> \n"); lcr_data.append("</old_value> \n"); lcr_data.append("<old_value> \n"); lcr_data.append("<column_name>C04</column_name> \n"); lcr_data.append("<data> \n"); lcr_data.append("<timestamp><value>1999-05-31T13:20:00.000</value><format>SYYYY-MM-DD'T'HH24:MI: SS.FF</format></timestamp> \n"); lcr_data.append("</data> \n"); lcr_data.append("</old_value> \n"); lcr_data.append("<old_value> \n"); lcr_data.append("<column_name>C05</column_name> \n"); lcr_data.append("<data><raw>ABCDE</raw></data> \n"); lcr_data.append("</old_value> \n"); lcr_data.append("</old_values> \n"); lcr_data.append("<new_values> \n"); lcr_data.append("<new_value> \n"); lcr_data.append("<column_name>C01</column_name> \n"); lcr_data.append("<data><varchar2>A123FF</varchar2></data> \n"); lcr_data.append("</new_value> \n"); lcr_data.append("<new_value> \n"); lcr_data.append("<column_name>C02</column_name> \n"); lcr_data.append("<data><number>35.23</number></data> \n"); lcr_data.append("</new_value> \n"); lcr_data.append("<new_value> \n"); lcr_data.append("<column_name>C03</column_name> \n"); lcr_data.append("<data><number>-100000</number></data> \n"); lcr_data.append("</new_value> \n"); lcr_data.append("<new_value> \n"); lcr_data.append("<column_name>C04</column_name> \n"); lcr_data.append("<data><varchar>Hel lo</varchar></data> \n"); lcr_data.append("</new_value> \n"); lcr_data.append("<new_value> \n"); lcr_data.append("<column_name>C05</column_name> \n"); lcr_data.append("<data><char>wor ld</char></data> \n"); lcr_data.append("</new_value> \n"); lcr_data.append("</new_values> \n"); lcr_data.append("</ROW_LCR>"); /* Create the XMLType containing the LCR */ xml_lcr = oracle.xdb.XMLType.createXML(db_conn, lcr_data.toString()); /* Set the payload in the message */ adt_msg.setAdtPayload(xml_lcr); ((AQjmsMessage)adt_msg).setSenderID(agent); System.out.println("Publish message 3 - XMLType containing LCR ROW\n"); /* Create the recipient list */ recipList = new AQjmsAgent[1]; recipList[0] = new AQjmsAgent("explicit_dq", null); /* Publish the message */ ((AQjmsTopicPublisher)t_pub).publish(topic, adt_msg, recipList); t_sess.commit(); } catch (JMSException jms_ex) { System.out.println("JMS Exception: " + jms_ex); if(jms_ex.getLinkedException() != null) System.out.println("Linked Exception: " + jms_ex.getLinkedException()); } } }
This program uses Oracle JMS API to receive messages from a Streams topic.
This program does the following:
person
, address
and XMLType
in JMS typemapimport oracle.AQ.*; import oracle.jms.*; import javax.jms.*; import java.lang.*; import oracle.xdb.*; import java.sql.SQLException; public class StreamsDeq { public static void main (String args []) throws java.sql.SQLException, ClassNotFoundException, JMSException { TopicConnectionFactory tc_fact= null; TopicConnection t_conn = null; TopicSession t_sess = null; try { if (args.length < 3 ) System.out.println("Usage:java filename [SID] [HOST] [PORT]"); else { /* Create the TopicConnectionFactory * Only the JDBC OCI driver can be used to access Streams through JMS */ tc_fact = AQjmsFactory.getTopicConnectionFactory( args[1], args[0], Integer.parseInt(args[2]), "oci8"); t_conn = tc_fact.createTopicConnection( "OE","OE"); /* Create a Topic Session */ t_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE); /* Start the connection */ t_conn.start() ; receiveMessages(t_sess); t_sess.close() ; t_conn.close() ; System.out.println("\nEnd of StreamsDeq Demo") ; } } catch (Exception ex) { System.out.println("Exception-1: " + ex); ex.printStackTrace(); } } /* * receiveMessages -This method receives messages from the Streams queue */ public static void receiveMessages(TopicSession t_sess) throws Exception { Topic topic = null; JPerson pers = null; JAddress addr = null; XMLType xtype = null; TextMessage t_msg = null; AdtMessage adt_msg = null; Message jms_msg = null; TopicReceiver t_recv = null; int i = 0; java.util.Dictionary map= null; try { /* Get the topic */ topic = ((AQjmsSession)t_sess).getTopic("strmadmin", "oe_queue"); /* Create a TopicReceiver to receive messages for consumer "jms_recv */ t_recv = ((AQjmsSession)t_sess).createTopicReceiver(topic, "jms_recv", null); map = ((AQjmsSession)t_sess).getTypeMap(); /* Register mappings for ADDRESS and PERSON in the JMS typemap */ map.put("OE.PERSON", Class.forName("JPerson")); map.put("OE.ADDRESS", Class.forName("JAddress")); /* Register mapping for XMLType in the TypeMap - required for LCRs */ map.put("SYS.XMLTYPE", Class.forName("oracle.xdb.XMLTypeFactory")); System.out.println("Receive messages ...\n"); do { try { jms_msg = (t_recv.receive(10)); i++; /* Set navigation mode to NEXT_MESSAGE */ ((AQjmsTopicReceiver)t_recv).setNavigationMode(AQjmsConstants.NAVIGATION_NEXT_MESSAGE); } catch (JMSException jms_ex2) { if((jms_ex2.getLinkedException() != null) && (jms_ex2.getLinkedException() instanceof SQLException)) { SQLException sql_ex2 =(SQLException)(jms_ex2.getLinkedException()); /* End of current transaction group * Use NEXT_TRANSACTION navigation mode */ if(sql_ex2.getErrorCode() == 25235) { ((AQjmsTopicReceiver)t_recv).setNavigationMode(AQjmsConstants.NAVIGATION_NEXT_TRANSACTION); continue; } else throw jms_ex2; } else throw jms_ex2; } if(jms_msg == null) { System.out.println("\nNo more messages"); } else { if(jms_msg instanceof AdtMessage) { adt_msg = (AdtMessage)jms_msg; System.out.println("Retrieved message " + i + ": " + adt_msg.getAdtPayload()); if(adt_msg.getAdtPayload() instanceof JPerson) { pers =(JPerson)( adt_msg.getAdtPayload()); System.out.println("PERSON: Name: " + pers.getName()); } else if(adt_msg.getAdtPayload() instanceof JAddress) { addr =(JAddress)( adt_msg.getAdtPayload()); System.out.println("ADDRESS: Street" + addr.getStreet()); } else if(adt_msg.getAdtPayload() instanceof oracle.xdb.XMLType) { xtype = (XMLType)adt_msg.getAdtPayload(); System.out.println("XMLType: Data: \n" + xtype.getStringVal()); } System.out.println("Msg id: " + adt_msg.getJMSMessageID()); System.out.println(); } else if(jms_msg instanceof TextMessage) { t_msg = (TextMessage)jms_msg; System.out.println("Retrieved message " + i + ": " + t_msg.getText()); System.out.println("Msg id: " + t_msg.getJMSMessageID()); System.out.println(); } else System.out.println("Invalid message type"); } } while (jms_msg != null); t_sess.commit(); } catch (JMSException jms_ex) { System.out.println("JMS Exception: " + jms_ex); if(jms_ex.getLinkedException() != null) System.out.println("Linked Exception: " + jms_ex.getLinkedException()); t_sess.rollback(); } catch (java.sql.SQLException sql_ex) { System.out.println("SQL Exception: " + sql_ex); sql_ex.printStackTrace(); t_sess.rollback(); } } }
javac StreamsEnq.java StreamsDeq.java JPerson.java JAddress.java
java StreamsEnq ORACLE_SID HOST PORT
For example, if your Oracle SID is orc182
, your host is hq_server
, and your port is 1521
, then enter the following:
java StreamsEnq orcl82 hq_server 1521
java StreamsDeq ORACLE_SID HOST PORT
For example, if your Oracle SID is orc182
, your host is hq_server
, and your port is 1520
, then enter the following:
java StreamsDeq orcl82 hq_server 1521