本文主要是向读者介绍如何通过 ogg 为 oracle 数据库的变更操作实时同步到大数据产品 kafka 上。
开始介绍前,先为读者介绍一下环境背景
- 机器ip 和其对应的服务
192.168.88.166【ogg目标端】【ogg的mgr(端口1357), rep_test】
192.168.88.128【ogg源端】 【oracle, ogg的mgr(端口9001), ext_test, dpe_test, zookeeper, kafka】
- 软件版本
oracle 版本,11g release 2
kafka 版本,2.11-0.10.1.0
zookeeper 版本, 3.3.6
ogg源端JVM 版本, 1.7
ogg目标端JVM 版本,1.8 (必须1.8 以上,否者ogg无法启动)
ogg 因为下载时没有留意版本,所以直接贴文件名,应该能够对应版本
ogg 源端安装包文件名,123010_fbo_ggs_Linux_x64_shiphome,文件大小:543200432
ogg 目标端安装包文件名,123110_ggs_Adapters_Linux_x64,文件大小:81812011
开始介绍部署与配置步骤。
使用system用户登陆oracle 的sqlplus
sqlplus "system/oracle" as sysdba
- 首先检查oracle 是否已经开启 Archive logging
archive log list;
如果显示以下错误,则证明没有开启
ORA-01031: insufficient privileges
用户可以使用以下命令查看oracle 是否已经开启了 自动归档模式
select name,log_mode from v$database;
LOG_MODE 显示 NOARCHIVELOG 则代表没有开启
开启 Archive logging ,需要先停止数据库,执行以下命令
shutdown immediate;
然后将数据库启动到 mount 状态
startup mount
更改归档模式,启动 日志自动归档
ALTER DATABASE ARCHIVELOG;
以下一篇博客对oracle 启动和停止数据库的状态介绍得非常清楚,大家可以参考一下
http://blog.csdn.net/lutinghuan/article/details/7484062
这个使用如果用户想了解日志归档是否真的已经开启,以及查看归档日志存储在什么路径,可以再次执行
archive log list
例如作者机器就是显示
Database log mode Archive Mode
Automatic archival Enabled
Archive destination /home/oracle/app/oracle/product/11.2.0/dbhome_1/dbs/arch
Oldest online log sequence 51
Next log sequence to archive 53
Current log sequence 53
如果用户希望修改归档日志的存储路径,可以执行以下命令
alter system set log_archive_dest='用户希望的路径,但是需要oracle程序可以读写'
修改好之后,就是重新打开oracle 的 database
alter database open
在shell中用户需要先创建好日志归档目录,例如作者的归档目录为 /home/oracle/app/oracle/product/11.2.0/dbhome_1/dbs/arch,则该目录需要预先创建
mkdir -p /home/oracle/app/oracle/product/11.2./dbhome_1/dbs/arch
测试归档
alter system switch logfile
然后用户可以在 /home/oracle/app/oracle/product/11.2.0/dbhome_1/dbs/arch 目录下看到新的归档日志。
用户也可以使用sql 命令查看归档情况,但是作者没有仔细查看这个输出
select * from v$archived_log
到这里,oracle 如何开启日志自动归档的方法就介绍完毕了。
- 检查是否开启forcelogging和 minimal supplemental logging
注意:如果不指定Primary key 和unique 属性,OGG将不会传送PK字段或Unique indiex字段信息。这样,下游的应用,在处理update数据时将失去依据
SELECT supplemental_log_data_min,force_logging FROM v$database;
像作者的环境就没有开启(屏幕输出NO),所以我们还需要执行开启命令,执行完毕后,我们再来查看 forcelogging和 minimal supplemental logging 的开启情况
alter database add supplemental log data (primary key) columns;
alter database add supplemental log data (unique) columns;
alter database force logging;
alter system switch logfile;
检查开启情况,显示如下则代表ok
SUPPLEME FOR
-------- ---
IMPLICIT YES
在oracle 中开启 enable_goldengate_replication 参数
alter system set ENABLE_GOLDENGATE_REPLICATION=true;
- 创建 ogg 用户
创建OGG 用户,用户名为ogg,密码也为ogg
create user ogg identified by ogg;
grant connect,resource to ogg;
grant select any dictionary to ogg;
grant select any table to ogg;
- 在Oracle 源端部署OGG 服务
作者的Oracle 服务部署在 oracle 系统用户下,所以打算 ogg 也同样部署在 oracle 用户下,相关的用户和路径可以参考:http://www.cnblogs.com/chenfool/p/7411626.html
创建 ogg 目录
/home/oracle/ogg
将 ogg 软件拷贝到 /home/oracle/ogg 目录下,并且解压
cp /mnt/hgfs/mnt/orace_ogg/123010_fbo_ggs_Linux_x64_shiphome.zip /home/oracle/ogg
unzip 123010_fbo_ggs_Linux_x64_shiphome.zip
修改ogg 的配置文件 /home/oracle/ogg/fbo_ggs_Linux_x64_shiphome/Disk1/response/oggcore.rsp, 注意:作者设置环境变量的文件 ~/.bash_profile 已经设置好了 ORACLE_HOME 变量,详细可以查看:http://www.cnblogs.com/chenfool/p/7411626.html
INSTALL_OPTION=ORA11g
SOFTWARE_LOCATION=/home/oracle/oggapp
START_MANAGER=false
MANAGER_PORT=
DATABASE_LOCATION=${ORACLE_HOME}
运行安装 OGG 程序
cd /home/oracle/ogg/fbo_ggs_Linux_x64_shiphome/Disk1
./runInstaller -silent -responseFile /home/oracle/ogg/fbo_ggs_Linux_x64_shiphome/Disk1/response/oggcore.rsp
修改oracle 用户的配置文件 ~/.bash_profile,主要是增加OGG_HOME 内容
export ORACLE_BASE=/home/oracle/app
export ORACLE_HOME=${ORACLE_BASE}/oracle/product/11.2./dbhome_1
export ORACLE_HOME_LISTNER=${ORACLE_HOME}
export OGG_HOME=/home/oracle/oggapp
export ORACLE_SID=orcl
export PATH=${PATH}:${ORACLE_HOME}/bin:${OGG_HOME}
export LD_LIBRARY_PATH=${ORACLE_HOME}/lib:/usr/lib:${OGG_HOME}
export DISPLAY=:0.0
登陆 ogg 控制台
cd /home/oracle/oggapp
./ggsci
给ogg 创建目录,注意:此操作是在 ggsci 中执行
create subdirs
作者执行后,屏幕输出
Parameter file /home/oracle/oggapp/dirprm: created.
Report file /home/oracle/oggapp/dirrpt: created.
Checkpoint file /home/oracle/oggapp/dirchk: created.
Process status files /home/oracle/oggapp/dirpcs: created.
SQL script files /home/oracle/oggapp/dirsql: created.
Database definitions files /home/oracle/oggapp/dirdef: created.
Extract data files /home/oracle/oggapp/dirdat: created.
Temporary files /home/oracle/oggapp/dirtmp: created.
Credential store files /home/oracle/oggapp/dircrd: created.
Masterkey wallet files /home/oracle/oggapp/dirwlt: created.
Dump files /home/oracle/oggapp/dirdmp: created.
配置源端manager进程,注意:此操作是在 ggsci 中执行
edit params mgr
然后操作会调到 VI 上,用户再输入以下内容后保存退出
port 9001
autostart er *
autorestart er *
启动 manager ,注意:此操作是在 ggsci 中执行
start mgr
查看manager 的状态,可以看到 MANAGER 为RUNNING,注意:此操作是在 ggsci 中执行
info all
编辑ext_test服务,该服务是专门用来抽取oracle 新增日志的进程,注意:此操作是在 ggsci 中执行
edit params ext_test
然后操作会调到 VI 上,用户再输入以下内容后保存退出
EXTRACT ext_test
Setenv (NLS_LANG="AMERICAN_AMERICA.UTF8")
USERID ogg, PASSWORD ogg
gettruncates
DISCARDFILE ./dirrpt/ext_test.dsc, APPEND, MEGABYTES
DBOPTIONS ALLOWUNUSEDCOLUMN
REPORTCOUNT EVERY MINUTES, RATE
FETCHOPTIONS NOUSESNAPSHOT
TRANLOGOPTIONS DBLOGREADER EXTTRAIL ./dirdat/ex WILDCARDRESOLVE DYNAMIC
GETUPDATEBEFORES
NOCOMPRESSUPDATES
NOCOMPRESSDELETES
dynamicresolution table scott.test;
然后执行命令创建ext_test 的服务,并且指定将抽取到的日志存放在 ./dirdat/ 目录下,并且文件名以ex 开头,注意:此操作是在 ggsci 中执行
add extract ext_test, TRANLOG, BEGIN NOW
ADD EXTTRAIL ./dirdat/ex , EXTRACT ext_test, MEGABYTES
注意,指定的目录末尾一定只能是两个字符,否则报错如下
file portion must be two characters.
启动该日志抽取服务,注意:此操作是在 ggsci 中执行
start ext_test
日志抽取部分的配置就ok了,下面开始配置日志发送的服务dpe_test
同样的先编辑服务的配置
edit param dpe_test
然后界面会进入VI 模式,在文件上填写以下内容后保存退出,同样的dirdat 路径下的文件名只能够用两个字符,否则会报告错误
EXTRACT dpe_test
PASSTHRU RMTHOST 192.168.88.162, MGRPORT RMTTRAIL ./dirdat/re
TABLE scott.test;
这里必须要和大家介绍一下,因为作者在这里卡了很久。
第一,RMTHOST 这个IP 地址,是远端接收日志的服务的IP地址
第二,MGRPORT 端口是远端接收日志的MGR 服务端口,这里千万不要搞混淆了。
第三,RMTTAIL 这个是远端接收日志的路径,并非本机的,即是在 88.162 机器上的dirdata 目录上的存放接收到的日志路径
后面的步骤就是,将ext_test 抽取到的日志存放路径 配置给 dpe_test 发送日志服务,注意:此操作是在 ggsci 中执行
ADD EXTRACT dpe_test, EXTTRAILSOURCE ./dirdat/ex
ADD RMTTRAIL ./dirdat/re, EXTRACT dpe_test, MEGABYTES
大家注意,红色字体是ext_test 的路径,绿色字体是发送到远端服务的路径
配置好dpe_test 服务后,还不能够立马启动,因为远端接收的MGR 服务还没有起来,所以这块还遗留一个小尾巴
- 部署目标端的ogg
在目标端机器(192.168.88.162)上事先创建oracle:oracle 用户组和用户
groupadd oracle
useradd -s /bin/bash -m -g oracle oracle
passwd oracle
切换oracle 用户,为ogg adapter 创建目录
mkdir /home/oracle/ogg_adapter
拷贝ogg adapter 压缩包,并且解压
cp /mnt/hgfs/mnt/orace_ogg/123110_ggs_Adapters_Linux_x64.zip /home/oracle/ogg_adapter ;
unzip 123110_ggs_Adapters_Linux_x64.zip
将解压的 ogg_adapter/ggs_Adapters_Linux_x64.tar 文件拷贝到 /home/oracle/ogg 目录(如果不存在此目录,请事先创建)
cp ogg_adapter/ggs_Adapters_Linux_x64.tar /home/oracle/ogg
在 /home/oracle/ogg 目录下解压 ggs_Adapters_Linux_x64.tar.gz 文件
tar xvf ggs_Adapters_Linux_x64.tar.gz
为oracle 用户设置JDK 和 ogg 的环境变量
export JAVA_HOME=/opt/jdk1..0_144
export CLASSPATH=${JAVA_HOME}/lib/tools.jar:${JAVA_HOME}/lib/dt.jar
export PATH=${JAVA_HOME}/bin:${PATH} export OGG_HOME=/home/oracle/oggapp
export PATH=$PATH:$OGG_HOME
export LD_LIBRARY_PATH=$OGG_HOME:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64
如果不设置JDK的LD_LIBRARY_PATH环境变量,会在启动后面的rep_test 服务时,报错
ERROR OGG-15050 Error loading Java VM runtime library
用户也要注意 /etc/hosts 的设置,一定需要将机器的hostname 和 IP 地址配置好映射关系,否者也会出现错误
ERROR OGG-15161 Could not initialize the connection with MGR MGR (No route to host).
如果用户出现此错误,需要先重启目标端的mgr 服务,直接在 ggsci 命令窗口执行 stop mgr 命令是不能够停止mgr 服务的,还需要用户在shell 中通过 kill -15 PID 的方法停止mgr。
用户正确修改 /etc/hosts 配置文件后,再重启 目标端的 mgr 和 rep_test 服务,然后需要 重新启动 源端 的 dpe_test 服务,因为目标段的 mgr 服务一旦停止, dpe_test 就可能出现异常。
拷贝模板文件
cp /home/oracle/ogg/AdapterExamples/big-data/kafka/* /home/oracle/ogg/dirprm/
启动 ggsci 控制程序
/home/oracle/ogg/ggsci
创建目录,注意:此操作是在 ggsci 中执行
create subdirs
配置源端manager进程,注意:此操作是在 ggsci 中执行
edit params mgr
然后出现VI 界面,输入以下内容后保存退出
port 1357
autostart er *
autorestart er *
读者们注意了,在目标段的MGR 配置中,端口号设置是需要和 源端的 dpe_test 配合的,这个读者注意。
启动manager 进程,并且确认,注意:此操作是在 ggsci 中执行
start mgr
确认命令,如果正常运行,则显示 MANAGER RUNNING ,注意:此操作是在 ggsci 中执行
info all
修改接收服务配置,注意:此操作是在 ggsci 中执行
edit params rep_test
修改内容如下
REPLICAT rep_test
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
GETTRUNCATES
SOURCEDEFS dirdef/source.def
REPORTCOUNT EVERY MINUTES, RATE
GROUPTRANSOPS
MAP scott.*, TARGET scott.*;
dirprm/kafka.props 是kafka 的配置文件,下面会介绍到
dirdef/source.def 是源端的表结构定义文件,这个暂时还没有介绍到,等一下会介绍。
然后就是添加rep_test 该服务,并且指定 dirdat/re 路径作为 rep_test 服务的日志来源,读者可以留意一下,dirdat/re 路径,实际上就是远端dpe_test 指定的路径。
因为rep_test 的服务作用就是将 日志文件解析,然后再做相应的处理,例如本例子就是将解析后的日志发送到kafka 上
为 ogg 增加rep_test 服务,注意:此操作是在 ggsci 中执行
add replicat rep_test, exttrail ./dirdat/re
修改/home/oracle/ogg/dirprm/kafka.props,主要修改以下内容
gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.TopicMappingTemplate=chentest
#gg.handler.kafkahandler.TopicName=chentest
gg.handler.kafkahandler.KafkaProducerConfigFile=./dirprm/custom_kafka_producer.properties
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=tx
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
#Sample gg.classpath for Apache Kafka
gg.classpath=dirprm/:/home/oracle/ogg/ggjava/resources/lib/*:/home/oracle/kafka_lib/*
#Sample gg.classpath for HDP
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
这里有几点需要读者们注意的。
1 gg.handler.kafkahandler.format 参数需要填写 json ,如果填写text 或者其他,会由于源端oracle 中执行update sql 命令,从而导致 日志解析失败,因为text 无法表示出修改记录的形式
2 gg.handler.kafkahandler.mode 参数填写 tx,表示解析后的日志是可读的
3 gg.classpath 参数重点需要填写两个路径,分别是 ogg for kafka 的驱动路径,另外一个是kafka 产品的lib 驱动路径(该路径读者可以在KAFKA_HOME 目录下寻找,直接拷贝过来即可)
4 kafka 中的topicName 要使用 gg.handler.kafkahandler.TopicMappingTemplate 参数,而不能够使用gg.handler.kafkahandler.TopicName 参数(ogg 的kafka 模版中的参数竟然是错误的,简直令人发指),否者rep_test 进程将启动失败。
修改连接kafka 的配置
vi /home/oracle/ogg/dirprm/custom_kafka_producer.properties
主要修改内容,在作者的测试中,并没有选择开启gzip 压缩模式,所以不知道该参数是否生效(反正我看到有文章说该参数会导致ogg 起不来)。
bootstrap.servers 参数是指定 用户已经部署好的 kafka 连接ip和端口
bootstrap.servers=192.168.88.128:9092
rep_test 的服务配置已经基本完了,但是 源端表结构的配置还没有开始做,所以也暂时对 rep_test 服务留一个小尾巴。
- 源端表结构文件初始化
请读者回到 源端 的ggsci 客户端的交互命令窗口上来,执行以下命令,创建一个新的初始化源端表结构的配置文件,注意:此操作是在 ggsci 中执行
edit param defgen
此时界面会跳转到VI界面上,用户直接在上面编辑以下内容。
userid 和 password 参数是 oracle 数据库中的用户名和密码登录方式
table 参数就是需要初始化的表名
DEFSFILE dirdef/source.def, PURGE
USERID ogg, PASSWORD ogg
TABLE scott.test ;
然后用户在 linux shell 窗口下执行以下命令(源端执行shell 命令)
defgen paramfile dirprm/defgen.prm
然后把在源端新生成的 dirdef/source.def 文件scp 到 目标端 的 dirdef/source.def
- 将遗留的尾巴收割
我们首先来启动rep_test 服务(目标端的ggsci 执行),注意:此操作是在 ggsci 中执行
start rep_test
再来启动 dpe_test 服务(源端的ggsci 执行),注意:此操作是在 ggsci 中执行
start dpe_test
用户也可以分别在源端和目标端的 ggsci 中查看rep_test 和 dpe_test 服务的启动状态
info all
- 零散的技巧点
《一》、如果用户希望查看 ext_test 、dpe_test 和 rep_test 服务的启动日志,可以在各自机器ogg 的dirrpt 目录下查看对应服务的启动日志
《二》、如果rep_test服务发生错误,并且dirrpt/REP_TEST_info_log4j.log 日志错误如下:
ERROR -- ::, [main] A failure occurred sending a message to Kafka.
org.apache.kafka.common.errors.RecordTooLargeException: The message is bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
WARN -- ::, [main] Error sending event to listener kafkahandler, status: ABEND, event: Commit transaction
那么用户可以在 dirprm/custom_kafka_producer.properties 配置文件中增加一行配置,该配置可以避免rep_test 向kafka 发送消息时,由于消息过大而报错
max.request.size=???
该错误解决方法来源于博客:http://www.ateam-oracle.com/oracle-goldengate-big-data-adapter-apache-kafka-producer/
《三》、如果遇到字符集问题,应该明白,设置数据同步的字符集应该以目标端的字符集为准,详细可以参考以下博客:
http://blog.163.com/myy10146@126/blog/static/11355718620134175728388/
同时,这里记录一下GBK和UTF8 的写法
AMERICAN_AMERICA.ZHS16GBK
AMERICAN_AMERICA.AL32UTF8
《四》、在启动ext_test服务时,如果遇到以下错误
ERROR OGG- No valid log files for current redo sequence , thread , error retrieving redo file name for sequence , archived = , use_alternate = 0Not able to establish initial position for begin time -- ::.
ERROR OGG- PROCESS ABENDING.
则为 ext_test 服务的配置文件增加一行配置
TRANLOGOPTIONS DBLOGREADER
《五》、删除ext_test 、 dpe_test 、 rep_test 服务的方法,主要是作者对于ogg 的配置不熟悉,如果环境出现了进城无法启动,则可以通过删除服务然后重新配置
delete extract dpe_test
delete extract ext_test
delete replicat rep_test
用户如果删除服务后,一定要记得将 dirdat/ex* 和 dirdat/re* 的相关文件删除,否者会在后面增加服务时出错。
删除服务不会删除原有服务的配置文件,所以后面重新增加服务就比较简单,执行以下命令即可,当然这些命令就是上面配置的添加命令
add extract ext_test , tranlog , begin now
add exttrail ./dirdat/ex , extract ext_test , megabytes
add extract dpe_test , exttrailsource ./dirdat/ex
add rmttrail ./dirdat/re, extract dpe_test, megabytes add replicat rep_test , exttrail ./dirdat/re
《六》 目标端和源端的hostname 一定要不一样,否则在启动 rep_test 时会由于无法连接kafka 而报错,并且应该将目标端和源端的IP 地址和 hostname 都正确配置在 /etc/hosts 文件中
《七》 用户想支持truncate 功能,则还需要在 ext_test 和 rep_test 的配置文件上增加以下参数,如果不想要该信息,则可以移除该参数
GETTRUNCATES
《八》如果用户在同步源端数据时,只希望同步部分列字段值,可以在ext_test 服务配置文件上做标识,例如test 表的表结构为 name varchar(10), id int,如果用户不想同步id 字段值,可以填写如下配置,红色部分关键字代表不同步的列
EXTRACT ext_test
Setenv (NLS_LANG="AMERICAN_AMERICA.UTF8")
USERID ogg, PASSWORD ogg
gettruncates
DISCARDFILE ./dirrpt/ext_test.dsc, APPEND, MEGABYTES
DBOPTIONS ALLOWUNUSEDCOLUMN
REPORTCOUNT EVERY MINUTES, RATE
FETCHOPTIONS NOUSESNAPSHOT EXTTRAIL ./dirdat/ex WILDCARDRESOLVE DYNAMIC
GETUPDATEBEFORES
NOCOMPRESSUPDATES
NOCOMPRESSDELETES
dynamicresolution table ogg.test, colsexcept(id);
参考博客:
http://blog.csdn.net/warren_zqw/article/details/52894586
http://guojuanjun.blog.51cto.com/277646/295454/
http://www.cnblogs.com/soysauce/p/7411314.html
http://blog.csdn.net/shipeng1022/article/details/50720690
http://www.qiuyb.com/archives/436