我正在处理从一个数据库到另一个数据库的aq传播,但是当我安排传播并将第一条消息排队到本地aq表时,dba_queue_schedules.last_error_msg“ora-25215:user_data type和queue type不匹配”。请注意,aq表中使用的两种对象类型是相同的,出于测试目的,我使用以下类型:

create or replace type LOCAL_OBJ_MSG as object(
    test varchar2(4000))
/

两个aq表也是一样的,我用相同的脚本创建了它们,只更改了名称,一个本地的,另一个远程的。本地aq表在本地数据库中,远程aq表在schema_name.remote数据库中。
下面是我用来创建aq表的脚本:
declare
  QUEUE_NOT_FOUND exception;
  PRAGMA EXCEPTION_INIT(QUEUE_NOT_FOUND, -24010);
begin
  dbms_aqadm.stop_queue(
    queue_name => 'REMOTE_iTEST');
exception
  when QUEUE_NOT_FOUND then
    null;
  when others then
    raise;
end;
/

declare
  QUEUE_NOT_FOUND exception;
  PRAGMA EXCEPTION_INIT(QUEUE_NOT_FOUND, -24010);
begin
  dbms_aqadm.drop_queue('REMOTE_iTEST');
exception
  when QUEUE_NOT_FOUND then
    null;
  when others then
    raise;
end;
/

declare
  QUEUE_NOT_FOUND exception;
  PRAGMA EXCEPTION_INIT(QUEUE_NOT_FOUND, -24002);
begin
  dbms_aqadm.drop_queue_table('REMOTE_iTEST', force => true);
exception
  when QUEUE_NOT_FOUND then
    null;
  when others then
    raise;
end;
/

declare
  QUEUE_NOT_FOUND exception;
  PRAGMA EXCEPTION_INIT(QUEUE_NOT_FOUND, -24010);
begin
  dbms_aqadm.stop_queue(
    queue_name => 'REMOTE_oTEST');
exception
  when QUEUE_NOT_FOUND then
    null;
  when others then
    raise;
end;
/

declare
  QUEUE_NOT_FOUND exception;
  PRAGMA EXCEPTION_INIT(QUEUE_NOT_FOUND, -24010);
begin
  dbms_aqadm.drop_queue('REMOTE_oTEST');
exception
  when QUEUE_NOT_FOUND then
    null;
  when others then
    raise;
end;
/

declare
  QUEUE_NOT_FOUND exception;
  PRAGMA EXCEPTION_INIT(QUEUE_NOT_FOUND, -24002);
begin
  dbms_aqadm.drop_queue_table('REMOTE_oTEST', force => true);
exception
  when QUEUE_NOT_FOUND then
    null;
  when others then
    raise;
end;
/

declare
  l_exist number;
  cursor c_index is
    select 1 from user_objects u where u.OBJECT_NAME = upper('LOCAL_OBJ_MSG') and u.OBJECT_TYPE = 'TYPE';
begin
  open c_index;
  fetch c_index into l_exist;
  close c_index;

  if l_exist = 1 then
    execute immediate 'drop type LOCAL_OBJ_MSG';
  end if;
end;
/

create or replace type LOCAL_OBJ_MSG as object(
    test varchar2(4000))
/

begin
  dbms_aqadm.create_queue_table (
    queue_table => 'REMOTE_iTEST',
    queue_payload_type => 'LOCAL_OBJ_MSG',
    storage_clause => 'pctfree 5 pctused 90 tablespace SEPA_INTEG_AQ',
    message_grouping => DBMS_AQADM.NONE,
    sort_list => 'ENQ_TIME',
    multiple_consumers => true,
    comment => 'Incoming TEST message table.');
end;
/

begin
  dbms_aqadm.create_queue (
    queue_table    => 'REMOTE_iTEST',
    queue_name     => 'REMOTE_iTEST',
    queue_type => sys.dbms_aqadm.normal_queue,
    retention_time => sys.dbms_aqadm.INFINITE,
    comment => 'Incoming TEST messages.',
    max_retries => 5);
end;
/

begin
  dbms_aqadm.start_queue(
    queue_name  => 'REMOTE_iTEST',
    dequeue     => true,
    enqueue     => true);
end;
/
begin
  dbms_aqadm.create_queue_table (
    queue_table => 'REMOTE_oTEST',
    queue_payload_type => 'LOCAL_OBJ_MSG',
    storage_clause => 'pctfree 5 pctused 90 tablespace SEPA_INTEG_AQ',
    message_grouping => DBMS_AQADM.NONE,
    sort_list => 'ENQ_TIME',
    multiple_consumers => true,
    comment => 'Outgoing TEST message table.');
end;
/

begin
  dbms_aqadm.create_queue (
    queue_table    => 'REMOTE_oTEST',
    queue_name     => 'REMOTE_oTEST',
    queue_type => sys.dbms_aqadm.normal_queue,
    retention_time => sys.dbms_aqadm.INFINITE,
    comment => 'Outgoing TEST messages.',
    max_retries => 5);
end;
/

begin
  dbms_aqadm.start_queue(
    queue_name  => 'REMOTE_oTEST',
    dequeue     => TRUE,
    enqueue     => TRUE);
end;
/

下面是用于创建订阅服务器、出列过程等的脚本:
本地数据库:
begin
  -- Add the remote subscriber.
  dbms_aqadm.add_subscriber(queue_name     => 'LOCAL_oTEST',
                            subscriber     => sys.aq$_agent(name     => 'LOCAL_oTEST_subscriber',
                                                            address  => 'SCHEMA_NAME.REMOTE_oTEST@DB_LINK_NAME',
                                                            protocol => 0),
                            queue_to_queue => true);
  -- Start the propagation of messages.
  dbms_aqadm.schedule_propagation(queue_name        => 'LOCAL_oTEST',
                                  latency           => 0,
                                  destination       => 'DB_LINK_NAME',
                                  destination_queue => 'SCHEMA_NAME.REMOTE_oTEST');
end;
/

远程数据库:
-- Create a table to store the messages received.
create table sepa_omsg_aq_demo
  (received timestamp default systimestamp,
   message LOCAL_OBJ_MSG);

-- Create a callback procedure that dequeues the received message and saves it
create or replace
procedure REMOTE_CALLBACK_TEST
  (
    context raw,
    reginfo sys.aq$_reg_info,
    descr sys.aq$_descriptor,
    payload raw,
    payloadl number
  )
as
  r_dequeue_options dbms_aq.dequeue_options_t;
  r_message_properties dbms_aq.message_properties_t;
  v_message_handle raw(26);
  o_payload LOCAL_OBJ_MSG;
begin
  r_dequeue_options.msgid         := descr.msg_id;
  r_dequeue_options.consumer_name := descr.consumer_name;
  dbms_aq.dequeue(queue_name => descr.queue_name,
                  dequeue_options => r_dequeue_options,
                  message_properties => r_message_properties,
                  payload => o_payload,
                  msgid => v_message_handle);
  insert into sepa_omsg_aq_demo
    (message)
    values (o_payload);
  commit;
exception
  when others then
    rollback;
end;
/

-- Register the procedure for dequeuing the messages received.
-- I'd like to point out that the subscriber is the one defined for the local database
begin
  dbms_aq.register (
     sys.aq$_reg_info_list(
        sys.aq$_reg_info('REMOTE_oTEST:LOCAL_oTEST_subscriber',
                         dbms_aq.namespace_aq,
                         'plsql://REMOTE_CALLBACK_TEST',
                         hextoraw('FF'))
                        ),
        1);
end;
/

sript将消息排队到本地aq表:
declare
  enq_msgid raw(16);
  eopt      dbms_aq.enqueue_options_t;
  mprop     dbms_aq.message_properties_t;

  message local_obj_msg;
begin
  message := local_obj_msg('a');
  dbms_aq.enqueue(queue_name         => 'LOCAL_oTEST',
                  enqueue_options    => eopt,
                  message_properties => mprop,
                  payload            => message,
                  msgid              => enq_msgid);
  commit;
end;
/

还要注意,在sys.aq$_message_types表(local db)中,已验证创建传播的状态为“f”,并且:
declare
  rc binary_integer;
begin
  dbms_aqadm.verify_queue_types(src_queue_name  => 'local_otest',
                                dest_queue_name => 'schema_name.remote_otest',
                                rc              => rc,
                                destination     => 'db_link_name');
  dbms_output.put_line('Compatible: ' || rc);
end;
/

返回0,这意味着表类型不兼容。在我做了一些深入研究之后,我发现如果本地和远程数据库nls_length_语义不同,则无法进行远程传播,但是在这种情况下并非如此。我查过了。你知道我做错了什么吗?或者我怎样才能找到这两张桌子之间的区别,以及如何解决?或者可能是某个db参数值之间的差异?
Oracle数据库11g 11.2.0.3.0版

最佳答案

要检查的一个区域是每个数据库上的nls_lenght_语义参数值。
NLS_LENGTH_SEMANTICS允许您指定列的长度
以字符而不是字节表示的数据类型。
pl/sql类型创建脚本不隐式使用varchar2长度内的字节或字符转换。这将导致类型的生成继承为nls_length_语义设置的值。
确保检查两个数据库nls_length_semantics值。如果值不同(字节与字符),则可能是问题的原因。您可以通过在varchar2中显式地使用字节或字符来在一个数据库上重建类型和队列

create or replace type LOCAL_OBJ_MSG as object(
    test varchar2(4000 CHAR))
/

或者更改其中一个数据库上的nls_length_语义以确保它们匹配。更改nls_length_语义后重新编译数据库很重要

10-04 19:44