RE: HOW TO, AQ between 2 dbs

  • From: "Leonard, George" <GLeonard@xxxxxxxxxxxxx>
  • To: <Oracle-L@xxxxxxxxxxxxx>
  • Date: Sat, 19 Feb 2005 13:30:30 +0200

got it working,
 
just a case of being saturday, clear brain and starting with a clean slate.
 
below is the solutions for anyone interested.
 
George
 
PS: got to give credit where it is due, as usual http://asktome.oracle.com
 
 
 
-- build from example 3, to be used with db links.
-- the following sits on the xspif side
-- xspif1
connect aqfinexc/aqfinexc@lab
begin
 DBMS_AQADM.UNSCHEDULE_PROPAGATION (
  Queue_Name  => 'aqfinexc_queue',
  Destination => 'aqfinexc.lab1');
 commit;
end;
/
begin
  DBMS_AQADM.stop_queue('aqfinexc_queue');
  DBMS_AQADM.drop_queue('aqfinexc_queue');
  DBMS_AQADM.drop_queue_table('aqfinexc.aqfinexc_queue_qtab');
end;
/
connect aqfinexc/aqfinexc@lab1
begin
  DBMS_AQADM.stop_queue('aqfinexc_queue');
  DBMS_AQADM.drop_queue('aqfinexc_queue');
  DBMS_AQADM.drop_queue_table('aqfinexc.aqfinexc_queue_qtab');
end;
/
connect system/manager@lab1
drop user aqfinexc cascade;
CREATE USER aqfinexc IDENTIFIED BY aqfinexc;
GRANT CONNECT, RESOURCE, aq_administrator_role TO aqfinexc;
GRANT EXECUTE ON dbms_aq TO aqfinexc;
GRANT EXECUTE ON dbms_aqadm TO aqfinexc;
alter user aqfinexc quota unlimited on users;
begin
  dbms_aqadm.grant_system_privilege('ENQUEUE_ANY', 'aqfinexc', FALSE);
  dbms_aqadm.grant_system_privilege('DEQUEUE_ANY', 'aqfinexc', FALSE);
end;
/
-- dspif1
connect system/manager@lab
drop user aqfinexc cascade;
create user aqfinexc identified by aqfinexc;
GRANT CONNECT, RESOURCE, aq_administrator_role TO aqfinexc;
GRANT EXECUTE ON dbms_aq TO aqfinexc;
GRANT EXECUTE ON dbms_aqadm TO aqfinexc;
alter user aqfinexc quota unlimited on users;
begin
  dbms_aqadm.grant_system_privilege('ENQUEUE_ANY', 'aqfinexc', FALSE);
  dbms_aqadm.grant_system_privilege('DEQUEUE_ANY', 'aqfinexc', FALSE);
end;
/

connect aqfinexc/aqfinexc@lab1
CREATE type aqfinexc.fin_msg_typ as object(
 seq_num number,
 name  varchar2(64),
 amt  number
);
/

-- ADDED: Setup additional queue to propagate messages to
begin
  DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table    => 
'aqfinexc.aqfinexc_queue_qtab',
                                queue_payload_type => 'aqfinexc.fin_msg_typ',
                                multiple_consumers => TRUE);
  DBMS_AQADM.CREATE_QUEUE(queue_name  => 'aqfinexc_queue',
                          queue_table  => 'aqfinexc.aqfinexc_queue_qtab');
  DBMS_AQADM.START_QUEUE(queue_name => 'aqfinexc_queue');
end;
/
create table aqfinexc.messages (
 seq_num  number,
 name   varchar2(64),
 amt   number
) tablespace users
/
create or replace procedure notifyCB( context  raw
                                      ,reginfo  sys.aq$_reg_info
                                      ,descr  sys.aq$_descriptor
                                      ,payload  raw
                                      ,payloadl number)
as
 dequeue_options   dbms_aq.dequeue_options_t;
 message_properties  dbms_aq.message_properties_t;
 message_handle   RAW(16);
 message      aqfinexc.fin_msg_typ;
BEGIN
   dequeue_options.msgid    := descr.msg_id;
   dequeue_options.consumer_name := descr.consumer_name;
   DBMS_AQ.DEQUEUE(queue_name    => descr.queue_name,
                   dequeue_options   => dequeue_options,
                   message_properties  => message_properties,
                   payload     => message,
                   msgid      => message_handle);
   insert into messages (seq_num, name, amt) values
   ( message.seq_num,  message.name  , message.amt);
   COMMIT;
END;
/
begin
 dbms_aqadm.add_subscriber (
  queue_name => 'aqfinexc.aqfinexc_queue',
  subscriber => sys.aq$_agent( 'recipient', null, null )
 );
end;
/
-- Tell te queue about our queue processing procedure
BEGIN
 dbms_aq.register(
    sys.aq$_reg_info_list(
       sys.aq$_reg_info('aqfinexc.aqfinexc_queue:RECIPIENT'
                          ,DBMS_AQ.NAMESPACE_AQ
                          ,'plsql://aqfinexc.notifyCB'
                          ,HEXTORAW('FF')
   )
  ), 1
 );
end;
/
 
connect aqfinexc/aqfinexc@lab
CREATE type aqfinexc.fin_msg_typ as object(
 seq_num number,
 name  varchar2(64),
 amt  number
);
/
begin
  DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table    => 
'aqfinexc.aqfinexc_queue_qtab',
                                queue_payload_type => 'aqfinexc.fin_msg_typ',
                                multiple_consumers => TRUE);
  DBMS_AQADM.CREATE_QUEUE(queue_name  => 'aqfinexc_queue',
                          queue_table  => 'aqfinexc.aqfinexc_queue_qtab');
  DBMS_AQADM.START_QUEUE(queue_name => 'aqfinexc_queue');
end;
/

create or replace procedure enqueue_msg(p_seq_num in number,
              p_name in varchar2,
              p_amt in number,
                                        p_add in varchar2 default null)
as
  enqueue_options   dbms_aq.enqueue_options_t;
  message_properties  dbms_aq.message_properties_t;
  message_handle    RAW(16);
  message      aqfinexc.fin_msg_typ;
  recipients     DBMS_AQ.aq$_recipient_list_t;
BEGIN
  -- ADDED
  -- SMD: here's where the parameter is used
  recipients(1) := SYS.aq$_agent('RECIPIENT', p_add, null);
  message_properties.recipient_list := recipients;
  message := fin_msg_typ( p_seq_num, p_name, p_amt );
  dbms_aq.enqueue(queue_name     => 'aqfinexc_queue',
                  enqueue_options   => enqueue_options,
                  message_properties  => message_properties,
                  payload      => message,
                  msgid      => message_handle);
end;
/

-- ADDED: Create loopback database link
create database link aqfinexc.lab1 connect to aqfinexc identified by aqfinexc
using 'lab1.wesbank.co.za';

-- ADDED: Setup scheduling for messages
begin DBMS_AQADM.Schedule_Propagation(Queue_Name  => 'aqfinexc_queue',
                                      Destination => 'aqfinexc.lab1',
                                      Start_Time  => sysdate,
                                      Latency     => 0);
end;
/
-- SMD: this msgs is meant for the aqfinexc_queue queue and WILL BE propagated.
begin
 enqueue_msg(1, 'test 1', 2, 'aqfinexc.aqfinexc_queue@xxxxxxxxxxxxx');
 commit;
end;
/
-- Check scheduling: Neither error nor action reported
select * from user_queue_schedules;
----------------------------------
I think this is what you're looking for:
-- local
select count(*) cnt from aqfinexc_queue_qtab
/
-- local and remote
select t1.cnt, t2.cnt
      from (select count(*) cnt from aqfinexc_queue_qtab) t1,
           (select count(*) cnt from aqfinexc_queue_qtab@xxxxxxxxxxxxx) t2
/


________________________________

From: oracle-l-bounce@xxxxxxxxxxxxx on behalf of Leonard, George
Sent: Fri 2005/02/18 07:18 PM
To: Oracle-L@xxxxxxxxxxxxx
Subject: FW: HOW TO, AQ between 2 dbs





below is the code as i got it at the moment,

i am pretty sure it is correct, but then it is friday night this side,

as far as i can see it has a roblem with the destination, guessing something to 
do with the Schedule_Propagation and the dblink ...

thanks.

George



-- xspif1
connect system@lab
drop user aqfinexc cascade;
CREATE USER aqfinexc IDENTIFIED BY aqfinexc;
GRANT CONNECT, RESOURCE, aq_administrator_role TO aqfinexc;
GRANT EXECUTE ON dbms_aq TO aqfinexc;
GRANT EXECUTE ON dbms_aqadm TO aqfinexc;
alter user aqfinexc quota unlimited on users;

begin
  dbms_aqadm.grant_system_privilege('ENQUEUE_ANY', 'aqfinexc', FALSE);
  dbms_aqadm.grant_system_privilege('DEQUEUE_ANY', 'aqfinexc', FALSE);
end;
/
-- dspif1
connect system/manager@lab1
drop user aqpdes cascade;
create user aqpdes identified by aqpdes;
GRANT CONNECT, RESOURCE, aq_administrator_role TO aqpdes;
GRANT EXECUTE ON dbms_aq TO aqpdes;
GRANT EXECUTE ON dbms_aqadm TO aqpdes;
alter user aqpdes quota unlimited on users;
begin
  dbms_aqadm.grant_system_privilege('ENQUEUE_ANY', 'aqpdes', FALSE);
  dbms_aqadm.grant_system_privilege('DEQUEUE_ANY', 'aqpdes', FALSE);
end;
/

connect aqfinexc/aqfinexc@lab
-- message that will be send to fin system
CREATE type aqfinexc.fin_msg_typ as object(
 seq_num number,
 name   VARCHAR2(30),
 amt  number
);
/
-- look at maybe trying to rename the queue table to something else, more 
relevant to the system
begin
  DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table    => 'aqfinexc.objmsgs80_qtab',
                                queue_payload_type => 'aqfinexc.fin_msg_typ',
                                multiple_consumers => TRUE);
  DBMS_AQADM.CREATE_QUEUE(queue_name  => 'FIN_MSG_QUEUE',
                          queue_table  => 'aqfinexc.objmsgs80_qtab');
  DBMS_AQADM.START_QUEUE(queue_name => 'FIN_MSG_QUEUE');
end;
/

-- SMD: you'll see there is now a second parameter, as discussed above:
create or replace procedure fin_msg_enqueue(
          p_seqnum in number
          ,p_name  in varchar2
                              ,p_amt  in number
          ,p_add in varchar2 default null )
as
  enqueue_options   dbms_aq.enqueue_options_t;
  message_properties  dbms_aq.message_properties_t;
  message_handle    RAW(16);
  message      aqfinexc.fin_msg_typ;
  recipients     DBMS_AQ.aq$_recipient_list_t;
BEGIN
  -- ADDED
  -- SMD: here's where the parameter is used
  recipients(1) := SYS.aq$_agent('RECIPIENT', p_add, null);
  message_properties.recipient_list := recipients;
  message := fin_msg_typ(p_seqnum, p_name, p_amt);
  dbms_aq.enqueue(queue_name     => 'FIN_MSG_QUEUE',
                  enqueue_options   => enqueue_options,
                  message_properties  => message_properties,
                  payload      => message,
                  msgid      => message_handle);
end;
/

connect aqpdes/aqpdes@lab1
-- message that will be send to fin system
CREATE type aqpdes.fin_msg_typ as object(
 seq_num number,
 name   VARCHAR2(30),
 amt  number
);
/
-- look at maybe trying to rename the queue table to something else, more 
relevant to the system
begin
  DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table    => 'aqpdes.objmsgs80_qtab',
                                queue_payload_type => 'aqpdes.fin_msg_typ',
                                multiple_consumers => TRUE);
  DBMS_AQADM.CREATE_QUEUE(queue_name  => 'FIN_MSG_QUEUE',
                          queue_table  => 'aqpdes.objmsgs80_qtab');
  DBMS_AQADM.START_QUEUE(queue_name => 'FIN_MSG_QUEUE');
end;
/

-- this is where we will put the message once we retrieved it from the queue
create table message_table(
     seq_num number,
  name     varchar2(30),
  amt  number
) tablespace users;
-- this will take messages of the queue and process them as they arrive, no 
need to go and look at queue, this gets done automatically for us, FUNKY
create or replace procedure notifyCB( context  raw
                                      ,reginfo  sys.aq$_reg_info
                                      ,descr  sys.aq$_descriptor
                                      ,payload  raw
                                      ,payloadl number)
as
 dequeue_options   dbms_aq.dequeue_options_t;
 message_properties  dbms_aq.message_properties_t;
 message_handle   RAW(16);
 message      aqpdes.fin_msg_typ;
BEGIN
   dequeue_options.msgid    := descr.msg_id;
   dequeue_options.consumer_name := descr.consumer_name;
   DBMS_AQ.DEQUEUE(queue_name    => descr.queue_name,
                   dequeue_options   => dequeue_options,
                   message_properties  => message_properties,
                   payload     => message,
                   msgid      => message_handle);
 -- This might be a function call in production to pus the data into the 
relevat table via the valudation routines, might be idea to log to table anyhow 
for later reference
 -- with the processed date.
   insert into message_table (seq_num, name, amt) values
   ( message.seq_num, message.name, message.amt );
   COMMIT;
END;
/
begin
 dbms_aqadm.add_subscriber (
  queue_name => 'aqpdes.fin_msg_queue',
  subscriber => sys.aq$_agent( 'recipient', null, null )
 );
end;
/
-- Tell te queue about our queue processing procedure
BEGIN
 dbms_aq.register(
    sys.aq$_reg_info_list(
       sys.aq$_reg_info('aqpdes.fin_msg_queue:recipient'
                          ,DBMS_AQ.NAMESPACE_AQ
                          ,'plsql://aqpdes.notifyCB'
                          ,HEXTORAW('FF')
   )
  ), 1
 );
end;
/

connect aqfinexc/aqfinexc@lab
-- ADDED: Create loopback database link
create database link aqpdes connect to aqpdes identified by aqpdes
USING 'LAB1.WESBANK.CO.ZA';
/* when we need to do some cleaning
begin
 DBMS_AQADM.UNSCHEDULE_PROPAGATION (
    Queue_Name  => 'fin_msg_queue',
    Destination => 'aqfinexc.aqpdes');
end;
/
*/

-- ADDED: Setup scheduling for messages
begin DBMS_AQADM.Schedule_Propagation(Queue_Name  => 'FIN_MSG_QUEUE',
                                      Destination => 'aqpdes',
                                      Start_Time  => sysdate,
                                      Latency     => 0);
end;
/

-- Check scheduling: Everything checked out OK.
select * from user_queue_schedules;
-- SMD: this msgs is meant for the MSG_QUEUEX queue and WILL BE propagated.
begin
 fin_msg_enqueue(1,'george',20, 'aqpdes.fin_msg_queue@xxxxxxxxxxxxxxx');
 commit;
end;
/
begin
 fin_msg_enqueue(2,'piet',25, 'aqpdes.fin_msg_queue@xxxxxxxxxxxxxxx');
 commit;
end;
/




________________________________

From: oracle-l-bounce@xxxxxxxxxxxxx on behalf of Leonard, George
Sent: Fri 2005/02/18 05:56 PM
To: oracle-l@xxxxxxxxxxxxx
Subject: HOW TO, AQ between 2 dbs



Hi all

hope someone can save me some time, (busy trying examples and jsut running into 
walls, sorry late on Friday guess brain is weekend mode already)


busy trying a POC atm regarding AQ.

DB=>A, user aqfinexc need to send msg to DB=>B, user aqpdes

as message is added to A's Queue i want to have it pushed over to B and will be 
colelcted that side by a dequeue process listening, got a example to do this.

does anyone have a script to set soemthing like this up already.

i got AQ working locally but now got to get it working from A=>B (from 
Asktom.oracle.com)

thanks.

George
___________________________________________________________________________________________________


The views expressed in this email are, unless otherwise stated, those of the 
author and not those
of the FirstRand Banking Group an Authorised Financial Service Provider or its 
management.
The information in this e-mail is confidential and is intended solely for the 
addressee.
Access to this e-mail by anyone else is unauthorised.
If you are not the intended recipient, any disclosure, copying, distribution or 
any action taken or
omitted in reliance on this, is prohibited and may be unlawful.
Whilst all reasonable steps are taken to ensure the accuracy and integrity of 
information and data
transmitted electronically and to preserve the confidentiality thereof, no 
liability or
responsibility whatsoever is accepted if information or data is, for whatever 
reason, corrupted
or does not reach its intended destination.

                               ________________________________

--
//www.freelists.org/webpage/oracle-l



___________________________________________________________________________________________________


The views expressed in this email are, unless otherwise stated, those of the 
author and not those
of the FirstRand Banking Group an Authorised Financial Service Provider or its 
management.
The information in this e-mail is confidential and is intended solely for the 
addressee.
Access to this e-mail by anyone else is unauthorised.
If you are not the intended recipient, any disclosure, copying, distribution or 
any action taken or
omitted in reliance on this, is prohibited and may be unlawful.
Whilst all reasonable steps are taken to ensure the accuracy and integrity of 
information and data
transmitted electronically and to preserve the confidentiality thereof, no 
liability or
responsibility whatsoever is accepted if information or data is, for whatever 
reason, corrupted
or does not reach its intended destination.

                               ________________________________

--
//www.freelists.org/webpage/oracle-l



___________________________________________________________________________________________________


The views expressed in this email are, unless otherwise stated, those of the 
author and not those
of the FirstRand Banking Group an Authorised Financial Service Provider or its 
management.
The information in this e-mail is confidential and is intended solely for the 
addressee.
Access to this e-mail by anyone else is unauthorised.
If you are not the intended recipient, any disclosure, copying, distribution or 
any action taken or 
omitted in reliance on this, is prohibited and may be unlawful.
Whilst all reasonable steps are taken to ensure the accuracy and integrity of 
information and data 
transmitted electronically and to preserve the confidentiality thereof, no 
liability or 
responsibility whatsoever is accepted if information or data is, for whatever 
reason, corrupted 
or does not reach its intended destination.

                               ________________________________

--
//www.freelists.org/webpage/oracle-l

Other related posts: