bonjour mes amis,
j'ai un problème avec Queue sur oracle,
pour commencer, je vais vous expliquer ce que je vue faire.
En générale je vue faire une communication entre 2 Queues, chaque Queue se trouve dans une autre shemat, ces 2 shemats sont liés entre eux par un dblink:
voila mes sources:
les Shemats
--CONNECT sys/xxxxxxx as sysdba
CREATE USER aq_admin_plsql IDENTIFIED BY aq_admin_plsql
DEFAULT TABLESPACE users
TEMPORARY TABLESPACE temp;
ALTER USER aq_admin_plsql QUOTA UNLIMITED ON users;
GRANT aq_administrator_role TO aq_admin_plsql;
GRANT connect TO aq_admin_plsql;
GRANT create type TO aq_admin_plsql;
GRANT create sequence TO aq_admin_plsql;
GRANT create procedure TO aq_admin_plsql;
GRANT execute ON dbms_aq TO aq_admin_plsql;
GRANT create table TO aq_admin_plsql;
EXECUTE dbms_aqadm.grant_type_access('aq_admin_plsql');
-- -------------------------------------------------------
DROP USER aq_user_plsql CASCADE;
CREATE USER aq_user_plsql IDENTIFIED BY aq_user_plsql
DEFAULT TABLESPACE users
TEMPORARY TABLESPACE temp;
ALTER USER aq_user_plsql QUOTA UNLIMITED ON users;
GRANT aq_administrator_role TO aq_user_plsql;
--GRANT aq_user_role TO aq_user_plsql;
GRANT connect TO aq_user_plsql;
GRANT create procedure TO aq_user_plsql;
GRANT execute ON dbms_aq TO aq_user_plsql;
GRANT create DATABASE LINK TO aq_user_plsql;
GRANT create type TO aq_user_plsql;
GRANT create sequence TO aq_user_plsql;
EXECUTE dbms_aqadm.grant_type_access('aq_user_plsql');
- pour aq_admin_plsql
la déclaration des types et SEQUENCE .
création du Queue et table Queue--CONNECT aq_admin_plsql/aq_admin_plsql
CREATE TYPE message_type AS OBJECT (
message_id NUMBER(15)
, subject VARCHAR2(100)
, text VARCHAR2(100)
, dollar_value NUMBER(4,2)
)
/
CREATE SEQUENCE message_seq
INCREMENT BY 1
START WITH 1000
NOMAXVALUE
NOCYCLE;
la création de la procedure p_dequeue--CONNECT aq_admin_plsql/aq_admin_plsql
SET SERVEROUTPUT ON
BEGIN
-- -------------------------------------------------------
DBMS_AQADM.CREATE_QUEUE_TABLE (
queue_table => 'aq_admin_plsql.msg_qt'
, queue_payload_type => 'aq_admin_plsql.message_type'
, multiple_consumers=>TRUE
);
-- -------------------------------------------------------
DBMS_AQADM.CREATE_QUEUE (
queue_name => 'msg_queue'
, queue_table => 'aq_admin_plsql.msg_qt'
, queue_type => DBMS_AQADM.NORMAL_QUEUE
, max_retries => 0
, retry_delay => 0
, retention_time => 1209600
, dependency_tracking => FALSE
, comment => 'Test Object Type Queue'
, auto_commit => FALSE
);
-- -------------------------------------------------------
DBMS_AQADM.START_QUEUE('msg_queue');
-- -------------------------------------------------------
DBMS_AQADM.GRANT_QUEUE_PRIVILEGE (
privilege => 'ALL'
, queue_name => 'aq_admin_plsql.msg_queue'
, grantee => 'aq_user_plsql'
, grant_option => FALSE
);
-- -------------------------------------------------------
END;
/
-------------------------------------
create table output_table(text varchar2(4000));
---------------------------------------------------------------------------------
-- Now, add the receiving subscriber to the queue. Note, that these can be left "as is", they're more of a formality than anything
BEGIN
dbms_aqadm.add_subscriber
( queue_name => 'msg_queue',
subscriber => sys.aq$_agent('PAYMENT_GATEWAY', NULL, NULL) );
END;
/
en fin register the subscriber / procedure combination
CREATE OR REPLACE
PROCEDURE p_dequeue ( context raw,
reginfo sys.aq$_reg_info,
descr sys.aq$_descriptor,
payload raw,
payloadl number)
as
dequeue_options dbms_aq.dequeue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle RAW(16);
message message_type;
BEGIN
dequeue_options.msgid := descr.msg_id;
dequeue_options.consumer_name := descr.consumer_name;
dbms_aq.dequeue(queue_name => descr.queue_name,
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);
/* Add the INSERT into an output table. This is so we can see it working.
In reality, the logic to execute the process would be here */
INSERT INTO output_table VALUES ( message.text );
COMMIT;
END;
/
BEGIN
dbms_aq.register
( sys.aq$_reg_info_list (
sys.aq$_reg_info ( 'msg_queue: PAYMENT_GATEWAY',
dbms_aq.namespace_aq,
'plsql://p_dequeue',
HEXTORAW('FF')
)
),
1
);
END;
/
- pour aq_user_plsql
la déclaration des types et SEQUENCE .
création du Queue et table QueueCREATE TYPE message_type AS OBJECT (
message_id NUMBER(15)
, subject VARCHAR2(100)
, text VARCHAR2(100)
, dollar_value NUMBER(4,2)
)
/
CREATE SEQUENCE message_seq
INCREMENT BY 1
START WITH 1000
NOMAXVALUE
NOCYCLE;
--CONNECT aq_admin_plsql/aq_admin_plsql
SET SERVEROUTPUT ON
BEGIN
-- -------------------------------------------------------
DBMS_AQADM.CREATE_QUEUE_TABLE (
queue_table => 'aq_user_plsql.msg_qt2'
, queue_payload_type => 'aq_user_plsql.message_type'
, multiple_consumers=>TRUE
);
-- -------------------------------------------------------
DBMS_AQADM.CREATE_QUEUE (
queue_name => 'msg_queue2'
, queue_table => 'aq_user_plsql.msg_qt2'
, queue_type => DBMS_AQADM.NORMAL_QUEUE
, max_retries => 0
, retry_delay => 0
, retention_time => 1209600
, dependency_tracking => FALSE
, comment => 'Test Object Type Queue'
, auto_commit => FALSE
);
-- -------------------------------------------------------
DBMS_AQADM.START_QUEUE('msg_queue2');
-- -------------------------------------------------------
END;
/
création de DBLINK
création de la procedure P_ENQUEUECREATE DATABASE LINK "MY_LINK" CONNECT TO "aq_admin_plsql" IDENTIFIED by "aq_admin_plsql" USING
'(DESCRIPTION =
(ADDRESS_LIST =
(ADDRESS = (PROTOCOL = TCP)(HOST = JEHA.hps.int)(PORT = 1521))
)
(CONNECT_DATA =
(SERVICE_NAME = XE)
)
)'
CREATE OR REPLACE
PROCEDURE p_enqueue(msg IN VARCHAR2)
AS
-- PRAGMA AUTONOMOUS_TRANSACTION;
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle RAW(16);
message aq_user_plsql.message_type;
message_id NUMBER;
v_recipients DBMS_AQ.aq$_recipient_list_t;
v_recipient VARCHAR2(20);
v_user_destination VARCHAR2(50);
v_dblink_destination VARCHAR2(50);
BEGIN
-- -------------------------------------------------------
v_recipient:='PAYMENT_GATEWAY';
v_user_destination:='aq_admin_plsql';
v_dblink_destination:='MY_LINK';
v_recipients(1) := SYS.aq$_agent( v_recipient, v_user_destination || '.msg_qt@' || v_dblink_destination, null);
message_properties.recipient_list := v_recipients;
SELECT aq_user_plsql.message_seq.nextval
INTO message_id
FROM dual;
-- -------------------------------------------------------
message := AQ_USER_PLSQL.MESSAGE_TYPE (
message_id
, 'Subject: EXAMPLE MESSAGE'
, 'Message: THIS IS A SAMPLE MESSAGE.'
, 10.2
);
-- -------------------------------------------------------
enqueue_options.VISIBILITY := DBMS_AQ.ON_COMMIT;
enqueue_options.SEQUENCE_DEVIATION := null;
-- -------------------------------------------------------
message_properties.PRIORITY := -5;
message_properties.DELAY := DBMS_AQ.NO_DELAY;
message_properties.EXPIRATION := DBMS_AQ.NEVER;
message_properties.CORRELATION := 'TEST MESSAGE';
-- -------------------------------------------------------
DBMS_AQ.ENQUEUE (
queue_name => 'aq_user_plsql.msg_queue2'
, enqueue_options => enqueue_options
, message_properties => message_properties
, payload => message
, msgid => message_handle
);
-- -------------------------------------------------------
COMMIT;
END;
/
------------------------------------------------------------------
en fin le résulta doit eter à la fin dans la table output_table dans le shemat aq_admin_plsql.
SELECT * FROM aq_admin_plsql.output_table@PG_TO_PCARD_LINK;
apres l'execution de la procedure
les recordes sont dans la table Queue :--CONNECT aq_user_plsql/aq_user_plsql
EXEC p_enqueue('THIS IS A TEST');
select * from msg_qt2
mais ils ne sont pas transférés à la Queue msg_qt pour être enregistre dans la table output_table !!!!!!!!!!!!!!!!
dans la requête :
j'ai ces champs vides :
Code : Sélectionner tout - Visualiser dans une fenêtre à part select * from msg_qt2
sender_protocol
sender_address
sender_name
dequeue_msgid
donc le champ sender_address ne contient pas la valeur aq_admin_plsql.msg_qt@MY_LINK
merci mes amis pour votre aide (les enregistrement ne sont pas transférer à l'autre Queue aq_admin_plsql.msg_qt)
Partager