我正在处理从一个数据库到另一个数据库的aq传播,但是当我安排传播并将第一条消息排队到本地aq表时,dba_queue_schedules.last_error_msg“ora-25215:user_data type和queue type不匹配”。请注意,aq表中使用的两种对象类型是相同的,出于测试目的,我使用以下类型:
create or replace type LOCAL_OBJ_MSG as object(
test varchar2(4000))
/
两个aq表也是一样的,我用相同的脚本创建了它们,只更改了名称,一个本地的,另一个远程的。本地aq表在本地数据库中,远程aq表在schema_name.remote数据库中。
下面是我用来创建aq表的脚本:
declare
QUEUE_NOT_FOUND exception;
PRAGMA EXCEPTION_INIT(QUEUE_NOT_FOUND, -24010);
begin
dbms_aqadm.stop_queue(
queue_name => 'REMOTE_iTEST');
exception
when QUEUE_NOT_FOUND then
null;
when others then
raise;
end;
/
declare
QUEUE_NOT_FOUND exception;
PRAGMA EXCEPTION_INIT(QUEUE_NOT_FOUND, -24010);
begin
dbms_aqadm.drop_queue('REMOTE_iTEST');
exception
when QUEUE_NOT_FOUND then
null;
when others then
raise;
end;
/
declare
QUEUE_NOT_FOUND exception;
PRAGMA EXCEPTION_INIT(QUEUE_NOT_FOUND, -24002);
begin
dbms_aqadm.drop_queue_table('REMOTE_iTEST', force => true);
exception
when QUEUE_NOT_FOUND then
null;
when others then
raise;
end;
/
declare
QUEUE_NOT_FOUND exception;
PRAGMA EXCEPTION_INIT(QUEUE_NOT_FOUND, -24010);
begin
dbms_aqadm.stop_queue(
queue_name => 'REMOTE_oTEST');
exception
when QUEUE_NOT_FOUND then
null;
when others then
raise;
end;
/
declare
QUEUE_NOT_FOUND exception;
PRAGMA EXCEPTION_INIT(QUEUE_NOT_FOUND, -24010);
begin
dbms_aqadm.drop_queue('REMOTE_oTEST');
exception
when QUEUE_NOT_FOUND then
null;
when others then
raise;
end;
/
declare
QUEUE_NOT_FOUND exception;
PRAGMA EXCEPTION_INIT(QUEUE_NOT_FOUND, -24002);
begin
dbms_aqadm.drop_queue_table('REMOTE_oTEST', force => true);
exception
when QUEUE_NOT_FOUND then
null;
when others then
raise;
end;
/
declare
l_exist number;
cursor c_index is
select 1 from user_objects u where u.OBJECT_NAME = upper('LOCAL_OBJ_MSG') and u.OBJECT_TYPE = 'TYPE';
begin
open c_index;
fetch c_index into l_exist;
close c_index;
if l_exist = 1 then
execute immediate 'drop type LOCAL_OBJ_MSG';
end if;
end;
/
create or replace type LOCAL_OBJ_MSG as object(
test varchar2(4000))
/
begin
dbms_aqadm.create_queue_table (
queue_table => 'REMOTE_iTEST',
queue_payload_type => 'LOCAL_OBJ_MSG',
storage_clause => 'pctfree 5 pctused 90 tablespace SEPA_INTEG_AQ',
message_grouping => DBMS_AQADM.NONE,
sort_list => 'ENQ_TIME',
multiple_consumers => true,
comment => 'Incoming TEST message table.');
end;
/
begin
dbms_aqadm.create_queue (
queue_table => 'REMOTE_iTEST',
queue_name => 'REMOTE_iTEST',
queue_type => sys.dbms_aqadm.normal_queue,
retention_time => sys.dbms_aqadm.INFINITE,
comment => 'Incoming TEST messages.',
max_retries => 5);
end;
/
begin
dbms_aqadm.start_queue(
queue_name => 'REMOTE_iTEST',
dequeue => true,
enqueue => true);
end;
/
begin
dbms_aqadm.create_queue_table (
queue_table => 'REMOTE_oTEST',
queue_payload_type => 'LOCAL_OBJ_MSG',
storage_clause => 'pctfree 5 pctused 90 tablespace SEPA_INTEG_AQ',
message_grouping => DBMS_AQADM.NONE,
sort_list => 'ENQ_TIME',
multiple_consumers => true,
comment => 'Outgoing TEST message table.');
end;
/
begin
dbms_aqadm.create_queue (
queue_table => 'REMOTE_oTEST',
queue_name => 'REMOTE_oTEST',
queue_type => sys.dbms_aqadm.normal_queue,
retention_time => sys.dbms_aqadm.INFINITE,
comment => 'Outgoing TEST messages.',
max_retries => 5);
end;
/
begin
dbms_aqadm.start_queue(
queue_name => 'REMOTE_oTEST',
dequeue => TRUE,
enqueue => TRUE);
end;
/
下面是用于创建订阅服务器、出列过程等的脚本:
本地数据库:
begin
-- Add the remote subscriber.
dbms_aqadm.add_subscriber(queue_name => 'LOCAL_oTEST',
subscriber => sys.aq$_agent(name => 'LOCAL_oTEST_subscriber',
address => 'SCHEMA_NAME.REMOTE_oTEST@DB_LINK_NAME',
protocol => 0),
queue_to_queue => true);
-- Start the propagation of messages.
dbms_aqadm.schedule_propagation(queue_name => 'LOCAL_oTEST',
latency => 0,
destination => 'DB_LINK_NAME',
destination_queue => 'SCHEMA_NAME.REMOTE_oTEST');
end;
/
远程数据库:
-- Create a table to store the messages received.
create table sepa_omsg_aq_demo
(received timestamp default systimestamp,
message LOCAL_OBJ_MSG);
-- Create a callback procedure that dequeues the received message and saves it
create or replace
procedure REMOTE_CALLBACK_TEST
(
context raw,
reginfo sys.aq$_reg_info,
descr sys.aq$_descriptor,
payload raw,
payloadl number
)
as
r_dequeue_options dbms_aq.dequeue_options_t;
r_message_properties dbms_aq.message_properties_t;
v_message_handle raw(26);
o_payload LOCAL_OBJ_MSG;
begin
r_dequeue_options.msgid := descr.msg_id;
r_dequeue_options.consumer_name := descr.consumer_name;
dbms_aq.dequeue(queue_name => descr.queue_name,
dequeue_options => r_dequeue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle);
insert into sepa_omsg_aq_demo
(message)
values (o_payload);
commit;
exception
when others then
rollback;
end;
/
-- Register the procedure for dequeuing the messages received.
-- I'd like to point out that the subscriber is the one defined for the local database
begin
dbms_aq.register (
sys.aq$_reg_info_list(
sys.aq$_reg_info('REMOTE_oTEST:LOCAL_oTEST_subscriber',
dbms_aq.namespace_aq,
'plsql://REMOTE_CALLBACK_TEST',
hextoraw('FF'))
),
1);
end;
/
sript将消息排队到本地aq表:
declare
enq_msgid raw(16);
eopt dbms_aq.enqueue_options_t;
mprop dbms_aq.message_properties_t;
message local_obj_msg;
begin
message := local_obj_msg('a');
dbms_aq.enqueue(queue_name => 'LOCAL_oTEST',
enqueue_options => eopt,
message_properties => mprop,
payload => message,
msgid => enq_msgid);
commit;
end;
/
还要注意,在sys.aq$_message_types表(local db)中,已验证创建传播的状态为“f”,并且:
declare
rc binary_integer;
begin
dbms_aqadm.verify_queue_types(src_queue_name => 'local_otest',
dest_queue_name => 'schema_name.remote_otest',
rc => rc,
destination => 'db_link_name');
dbms_output.put_line('Compatible: ' || rc);
end;
/
返回0,这意味着表类型不兼容。在我做了一些深入研究之后,我发现如果本地和远程数据库nls_length_语义不同,则无法进行远程传播,但是在这种情况下并非如此。我查过了。你知道我做错了什么吗?或者我怎样才能找到这两张桌子之间的区别,以及如何解决?或者可能是某个db参数值之间的差异?
Oracle数据库11g 11.2.0.3.0版
最佳答案
要检查的一个区域是每个数据库上的nls_lenght_语义参数值。
NLS_LENGTH_SEMANTICS允许您指定列的长度
以字符而不是字节表示的数据类型。
pl/sql类型创建脚本不隐式使用varchar2长度内的字节或字符转换。这将导致类型的生成继承为nls_length_语义设置的值。
确保检查两个数据库nls_length_semantics值。如果值不同(字节与字符),则可能是问题的原因。您可以通过在varchar2中显式地使用字节或字符来在一个数据库上重建类型和队列
即
create or replace type LOCAL_OBJ_MSG as object(
test varchar2(4000 CHAR))
/
或者更改其中一个数据库上的nls_length_语义以确保它们匹配。更改nls_length_语义后重新编译数据库很重要