本文主要是向读者介绍如何通过 ogg 为 oracle 数据库的变更操作实时同步到大数据产品 kafka 上。

开始介绍前,先为读者介绍一下环境背景

  • 机器ip 和其对应的服务

192.168.88.166【ogg目标端】【ogg的mgr(端口1357), rep_test】

192.168.88.128【ogg源端】   【oracle, ogg的mgr(端口9001), ext_test, dpe_test, zookeeper, kafka】

  • 软件版本

oracle 版本,11g release 2

kafka 版本,2.11-0.10.1.0

zookeeper 版本, 3.3.6

ogg源端JVM 版本, 1.7

ogg目标端JVM 版本,1.8 (必须1.8 以上,否者ogg无法启动)

ogg 因为下载时没有留意版本,所以直接贴文件名,应该能够对应版本

ogg 源端安装包文件名,123010_fbo_ggs_Linux_x64_shiphome,文件大小:543200432

ogg 目标端安装包文件名,123110_ggs_Adapters_Linux_x64,文件大小:81812011


开始介绍部署与配置步骤。

使用system用户登陆oracle 的sqlplus

sqlplus "system/oracle" as sysdba
  • 首先检查oracle 是否已经开启 Archive logging
archive log list;

如果显示以下错误,则证明没有开启

ORA-01031: insufficient privileges

用户可以使用以下命令查看oracle 是否已经开启了 自动归档模式

select name,log_mode from v$database;

LOG_MODE 显示 NOARCHIVELOG 则代表没有开启

开启 Archive logging ,需要先停止数据库,执行以下命令

shutdown immediate;

然后将数据库启动到 mount 状态

startup mount

更改归档模式,启动 日志自动归档

ALTER DATABASE ARCHIVELOG;

以下一篇博客对oracle 启动和停止数据库的状态介绍得非常清楚,大家可以参考一下

http://blog.csdn.net/lutinghuan/article/details/7484062

这个使用如果用户想了解日志归档是否真的已经开启,以及查看归档日志存储在什么路径,可以再次执行

archive log list

例如作者机器就是显示

Database log mode           Archive Mode
Automatic archival Enabled
Archive destination /home/oracle/app/oracle/product/11.2.0/dbhome_1/dbs/arch
Oldest online log sequence 51
Next log sequence to archive 53
Current log sequence 53

如果用户希望修改归档日志的存储路径,可以执行以下命令

alter system set log_archive_dest='用户希望的路径,但是需要oracle程序可以读写'

修改好之后,就是重新打开oracle 的 database

alter database open

在shell中用户需要先创建好日志归档目录,例如作者的归档目录为 /home/oracle/app/oracle/product/11.2.0/dbhome_1/dbs/arch,则该目录需要预先创建

mkdir -p /home/oracle/app/oracle/product/11.2./dbhome_1/dbs/arch

测试归档

alter system switch logfile

然后用户可以在  /home/oracle/app/oracle/product/11.2.0/dbhome_1/dbs/arch 目录下看到新的归档日志。

用户也可以使用sql 命令查看归档情况,但是作者没有仔细查看这个输出

select * from v$archived_log

到这里,oracle 如何开启日志自动归档的方法就介绍完毕了。

  • 检查是否开启forcelogging和 minimal supplemental logging

注意:如果不指定Primary key 和unique 属性,OGG将不会传送PK字段或Unique indiex字段信息。这样,下游的应用,在处理update数据时将失去依据

SELECT supplemental_log_data_min,force_logging FROM v$database;

像作者的环境就没有开启(屏幕输出NO),所以我们还需要执行开启命令,执行完毕后,我们再来查看 forcelogging和 minimal supplemental logging 的开启情况

alter database add supplemental log data (primary key) columns;
alter database add supplemental log data (unique) columns;
alter database force logging;
alter system switch logfile;

检查开启情况,显示如下则代表ok

SUPPLEME FOR
-------- ---
IMPLICIT YES

在oracle 中开启 enable_goldengate_replication 参数

alter system set ENABLE_GOLDENGATE_REPLICATION=true;
  • 创建 ogg 用户

创建OGG 用户,用户名为ogg,密码也为ogg

create user ogg identified by ogg;
grant connect,resource to ogg;
grant select any dictionary to ogg;
grant select any table to ogg;
  • 在Oracle 源端部署OGG 服务

作者的Oracle 服务部署在 oracle 系统用户下,所以打算 ogg 也同样部署在 oracle 用户下,相关的用户和路径可以参考:http://www.cnblogs.com/chenfool/p/7411626.html

创建 ogg 目录

/home/oracle/ogg

将 ogg 软件拷贝到 /home/oracle/ogg 目录下,并且解压

cp /mnt/hgfs/mnt/orace_ogg/123010_fbo_ggs_Linux_x64_shiphome.zip /home/oracle/ogg
unzip 123010_fbo_ggs_Linux_x64_shiphome.zip

修改ogg 的配置文件 /home/oracle/ogg/fbo_ggs_Linux_x64_shiphome/Disk1/response/oggcore.rsp, 注意:作者设置环境变量的文件 ~/.bash_profile 已经设置好了 ORACLE_HOME  变量,详细可以查看:http://www.cnblogs.com/chenfool/p/7411626.html

INSTALL_OPTION=ORA11g
SOFTWARE_LOCATION=/home/oracle/oggapp
START_MANAGER=false
MANAGER_PORT=
DATABASE_LOCATION=${ORACLE_HOME}

运行安装 OGG 程序

cd /home/oracle/ogg/fbo_ggs_Linux_x64_shiphome/Disk1
./runInstaller -silent -responseFile /home/oracle/ogg/fbo_ggs_Linux_x64_shiphome/Disk1/response/oggcore.rsp

修改oracle 用户的配置文件 ~/.bash_profile,主要是增加OGG_HOME 内容

export ORACLE_BASE=/home/oracle/app
export ORACLE_HOME=${ORACLE_BASE}/oracle/product/11.2./dbhome_1
export ORACLE_HOME_LISTNER=${ORACLE_HOME}
export OGG_HOME=/home/oracle/oggapp
export ORACLE_SID=orcl
export PATH=${PATH}:${ORACLE_HOME}/bin:${OGG_HOME}
export LD_LIBRARY_PATH=${ORACLE_HOME}/lib:/usr/lib:${OGG_HOME}
export DISPLAY=:0.0

登陆 ogg 控制台

cd /home/oracle/oggapp
./ggsci

给ogg 创建目录,注意:此操作是在 ggsci 中执行

create subdirs

作者执行后,屏幕输出

Parameter file                 /home/oracle/oggapp/dirprm: created.
Report file /home/oracle/oggapp/dirrpt: created.
Checkpoint file /home/oracle/oggapp/dirchk: created.
Process status files /home/oracle/oggapp/dirpcs: created.
SQL script files /home/oracle/oggapp/dirsql: created.
Database definitions files /home/oracle/oggapp/dirdef: created.
Extract data files /home/oracle/oggapp/dirdat: created.
Temporary files /home/oracle/oggapp/dirtmp: created.
Credential store files /home/oracle/oggapp/dircrd: created.
Masterkey wallet files /home/oracle/oggapp/dirwlt: created.
Dump files /home/oracle/oggapp/dirdmp: created.

配置源端manager进程,注意:此操作是在 ggsci 中执行

edit params mgr

然后操作会调到 VI 上,用户再输入以下内容后保存退出

port 9001
autostart er *
autorestart er *

启动 manager ,注意:此操作是在 ggsci 中执行

start mgr

查看manager 的状态,可以看到 MANAGER 为RUNNING,注意:此操作是在 ggsci 中执行

info all

编辑ext_test服务,该服务是专门用来抽取oracle 新增日志的进程,注意:此操作是在 ggsci 中执行

edit params ext_test

然后操作会调到 VI 上,用户再输入以下内容后保存退出

EXTRACT ext_test
Setenv (NLS_LANG="AMERICAN_AMERICA.UTF8")
USERID ogg, PASSWORD ogg
gettruncates
DISCARDFILE ./dirrpt/ext_test.dsc, APPEND, MEGABYTES
DBOPTIONS ALLOWUNUSEDCOLUMN
REPORTCOUNT EVERY MINUTES, RATE
FETCHOPTIONS NOUSESNAPSHOT
TRANLOGOPTIONS DBLOGREADER EXTTRAIL ./dirdat/ex WILDCARDRESOLVE DYNAMIC
GETUPDATEBEFORES
NOCOMPRESSUPDATES
NOCOMPRESSDELETES
dynamicresolution table scott.test;

然后执行命令创建ext_test 的服务,并且指定将抽取到的日志存放在 ./dirdat/ 目录下,并且文件名以ex 开头,注意:此操作是在 ggsci 中执行

add extract ext_test, TRANLOG, BEGIN NOW
ADD EXTTRAIL ./dirdat/ex , EXTRACT ext_test, MEGABYTES

注意,指定的目录末尾一定只能是两个字符,否则报错如下

file portion must be two characters.

启动该日志抽取服务,注意:此操作是在 ggsci 中执行

start ext_test

日志抽取部分的配置就ok了,下面开始配置日志发送的服务dpe_test

同样的先编辑服务的配置

edit param dpe_test

然后界面会进入VI 模式,在文件上填写以下内容后保存退出,同样的dirdat 路径下的文件名只能够用两个字符,否则会报告错误

EXTRACT dpe_test
PASSTHRU RMTHOST 192.168.88.162, MGRPORT RMTTRAIL ./dirdat/re
TABLE scott.test;

这里必须要和大家介绍一下,因为作者在这里卡了很久。

第一,RMTHOST 这个IP 地址,是远端接收日志的服务的IP地址

第二,MGRPORT 端口是远端接收日志的MGR 服务端口,这里千万不要搞混淆了。

第三,RMTTAIL 这个是远端接收日志的路径,并非本机的,即是在 88.162 机器上的dirdata 目录上的存放接收到的日志路径

后面的步骤就是,将ext_test 抽取到的日志存放路径 配置给 dpe_test 发送日志服务,注意:此操作是在 ggsci 中执行

ADD EXTRACT dpe_test, EXTTRAILSOURCE ./dirdat/ex
ADD RMTTRAIL ./dirdat/re, EXTRACT dpe_test, MEGABYTES

大家注意,红色字体是ext_test 的路径,绿色字体是发送到远端服务的路径

配置好dpe_test 服务后,还不能够立马启动,因为远端接收的MGR 服务还没有起来,所以这块还遗留一个小尾巴

  • 部署目标端的ogg

在目标端机器(192.168.88.162)上事先创建oracle:oracle 用户组和用户

groupadd  oracle
useradd -s /bin/bash -m -g oracle oracle
passwd oracle

切换oracle 用户,为ogg adapter 创建目录

mkdir /home/oracle/ogg_adapter

拷贝ogg adapter 压缩包,并且解压

cp /mnt/hgfs/mnt/orace_ogg/123110_ggs_Adapters_Linux_x64.zip  /home/oracle/ogg_adapter ;
unzip 123110_ggs_Adapters_Linux_x64.zip

将解压的 ogg_adapter/ggs_Adapters_Linux_x64.tar 文件拷贝到  /home/oracle/ogg 目录(如果不存在此目录,请事先创建)

cp ogg_adapter/ggs_Adapters_Linux_x64.tar /home/oracle/ogg

在 /home/oracle/ogg 目录下解压 ggs_Adapters_Linux_x64.tar.gz 文件

tar xvf ggs_Adapters_Linux_x64.tar.gz

为oracle 用户设置JDK 和 ogg 的环境变量

export JAVA_HOME=/opt/jdk1..0_144
export CLASSPATH=${JAVA_HOME}/lib/tools.jar:${JAVA_HOME}/lib/dt.jar
export PATH=${JAVA_HOME}/bin:${PATH} export OGG_HOME=/home/oracle/oggapp
export PATH=$PATH:$OGG_HOME
export LD_LIBRARY_PATH=$OGG_HOME:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64

如果不设置JDK的LD_LIBRARY_PATH环境变量,会在启动后面的rep_test 服务时,报错

ERROR   OGG-15050  Error loading Java VM runtime library

用户也要注意 /etc/hosts 的设置,一定需要将机器的hostname 和 IP 地址配置好映射关系,否者也会出现错误

ERROR   OGG-15161  Could not initialize the connection with MGR MGR (No route to host).

如果用户出现此错误,需要先重启目标端的mgr 服务,直接在 ggsci 命令窗口执行 stop mgr 命令是不能够停止mgr 服务的,还需要用户在shell 中通过 kill -15 PID 的方法停止mgr。

用户正确修改 /etc/hosts 配置文件后,再重启 目标端的 mgr 和 rep_test 服务,然后需要 重新启动 源端 的 dpe_test 服务,因为目标段的 mgr 服务一旦停止, dpe_test 就可能出现异常。

拷贝模板文件

cp /home/oracle/ogg/AdapterExamples/big-data/kafka/*  /home/oracle/ogg/dirprm/

启动 ggsci 控制程序

/home/oracle/ogg/ggsci

创建目录,注意:此操作是在 ggsci 中执行

create subdirs

配置源端manager进程,注意:此操作是在 ggsci 中执行

edit params mgr

然后出现VI 界面,输入以下内容后保存退出

port 1357
autostart er *
autorestart er *

读者们注意了,在目标段的MGR 配置中,端口号设置是需要和 源端的 dpe_test 配合的,这个读者注意。

启动manager 进程,并且确认,注意:此操作是在 ggsci 中执行

start mgr

确认命令,如果正常运行,则显示 MANAGER  RUNNING ,注意:此操作是在 ggsci 中执行

info all

修改接收服务配置,注意:此操作是在 ggsci 中执行

edit params rep_test

修改内容如下

REPLICAT rep_test
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
GETTRUNCATES
SOURCEDEFS dirdef/source.def
REPORTCOUNT EVERY MINUTES, RATE
GROUPTRANSOPS
MAP scott.*, TARGET scott.*;

dirprm/kafka.props 是kafka 的配置文件,下面会介绍到

dirdef/source.def 是源端的表结构定义文件,这个暂时还没有介绍到,等一下会介绍。

然后就是添加rep_test 该服务,并且指定 dirdat/re 路径作为 rep_test 服务的日志来源,读者可以留意一下,dirdat/re 路径,实际上就是远端dpe_test 指定的路径。

因为rep_test 的服务作用就是将 日志文件解析,然后再做相应的处理,例如本例子就是将解析后的日志发送到kafka 上

为 ogg 增加rep_test 服务,注意:此操作是在 ggsci 中执行

add replicat rep_test, exttrail ./dirdat/re

修改/home/oracle/ogg/dirprm/kafka.props,主要修改以下内容

gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.TopicMappingTemplate=chentest
#gg.handler.kafkahandler.TopicName=chentest
gg.handler.kafkahandler.KafkaProducerConfigFile=./dirprm/custom_kafka_producer.properties

gg.handler.kafkahandler.format=json

gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=tx

goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE

gg.log=log4j
gg.log.level=INFO

gg.report.time=30sec

#Sample gg.classpath for Apache Kafka
gg.classpath=dirprm/:/home/oracle/ogg/ggjava/resources/lib/*:/home/oracle/kafka_lib/*
#Sample gg.classpath for HDP

javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

这里有几点需要读者们注意的。

1 gg.handler.kafkahandler.format 参数需要填写 json ,如果填写text 或者其他,会由于源端oracle 中执行update sql 命令,从而导致 日志解析失败,因为text 无法表示出修改记录的形式

2 gg.handler.kafkahandler.mode 参数填写 tx,表示解析后的日志是可读的

3 gg.classpath 参数重点需要填写两个路径,分别是 ogg for kafka 的驱动路径,另外一个是kafka 产品的lib 驱动路径(该路径读者可以在KAFKA_HOME 目录下寻找,直接拷贝过来即可)

4 kafka 中的topicName 要使用 gg.handler.kafkahandler.TopicMappingTemplate 参数,而不能够使用gg.handler.kafkahandler.TopicName 参数(ogg 的kafka 模版中的参数竟然是错误的,简直令人发指),否者rep_test 进程将启动失败。

修改连接kafka 的配置

vi /home/oracle/ogg/dirprm/custom_kafka_producer.properties

主要修改内容,在作者的测试中,并没有选择开启gzip 压缩模式,所以不知道该参数是否生效(反正我看到有文章说该参数会导致ogg 起不来)。

bootstrap.servers 参数是指定 用户已经部署好的 kafka 连接ip和端口

bootstrap.servers=192.168.88.128:9092

rep_test 的服务配置已经基本完了,但是 源端表结构的配置还没有开始做,所以也暂时对 rep_test 服务留一个小尾巴。

  • 源端表结构文件初始化

请读者回到 源端 的ggsci 客户端的交互命令窗口上来,执行以下命令,创建一个新的初始化源端表结构的配置文件,注意:此操作是在 ggsci 中执行

edit param defgen

此时界面会跳转到VI界面上,用户直接在上面编辑以下内容。

userid 和 password 参数是 oracle 数据库中的用户名和密码登录方式

table 参数就是需要初始化的表名

DEFSFILE dirdef/source.def, PURGE
USERID ogg, PASSWORD ogg
TABLE scott.test ;

然后用户在 linux shell 窗口下执行以下命令(源端执行shell 命令)

defgen paramfile dirprm/defgen.prm

然后把在源端新生成的 dirdef/source.def 文件scp 到 目标端 的 dirdef/source.def

  • 将遗留的尾巴收割

我们首先来启动rep_test 服务(目标端的ggsci 执行),注意:此操作是在 ggsci 中执行

start rep_test

再来启动 dpe_test 服务(源端的ggsci 执行),注意:此操作是在 ggsci 中执行

start dpe_test

用户也可以分别在源端和目标端的 ggsci 中查看rep_test 和 dpe_test 服务的启动状态

info  all
  • 零散的技巧点

《一》、如果用户希望查看 ext_test 、dpe_test 和 rep_test 服务的启动日志,可以在各自机器ogg 的dirrpt 目录下查看对应服务的启动日志

《二》、如果rep_test服务发生错误,并且dirrpt/REP_TEST_info_log4j.log 日志错误如下:

ERROR -- ::, [main] A failure occurred sending a message to Kafka.
org.apache.kafka.common.errors.RecordTooLargeException: The message is bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
WARN -- ::, [main] Error sending event to listener kafkahandler, status: ABEND, event: Commit transaction

那么用户可以在 dirprm/custom_kafka_producer.properties 配置文件中增加一行配置,该配置可以避免rep_test 向kafka 发送消息时,由于消息过大而报错

max.request.size=???

该错误解决方法来源于博客:http://www.ateam-oracle.com/oracle-goldengate-big-data-adapter-apache-kafka-producer/

《三》、如果遇到字符集问题,应该明白,设置数据同步的字符集应该以目标端的字符集为准,详细可以参考以下博客:

http://blog.163.com/myy10146@126/blog/static/11355718620134175728388/

同时,这里记录一下GBK和UTF8 的写法

AMERICAN_AMERICA.ZHS16GBK
AMERICAN_AMERICA.AL32UTF8

《四》、在启动ext_test服务时,如果遇到以下错误

 ERROR   OGG-  No valid log files for current redo sequence , thread , error retrieving redo file name

for sequence , archived = , use_alternate = 0Not able to establish initial position for begin time -- ::.
 ERROR   OGG-  PROCESS ABENDING.

则为 ext_test 服务的配置文件增加一行配置

TRANLOGOPTIONS DBLOGREADER

《五》、删除ext_test 、 dpe_test 、 rep_test 服务的方法,主要是作者对于ogg 的配置不熟悉,如果环境出现了进城无法启动,则可以通过删除服务然后重新配置

delete extract dpe_test
delete extract ext_test
delete replicat rep_test

用户如果删除服务后,一定要记得将 dirdat/ex* 和 dirdat/re* 的相关文件删除,否者会在后面增加服务时出错。

删除服务不会删除原有服务的配置文件,所以后面重新增加服务就比较简单,执行以下命令即可,当然这些命令就是上面配置的添加命令

add extract ext_test , tranlog , begin now
add exttrail ./dirdat/ex , extract ext_test , megabytes
add extract dpe_test , exttrailsource ./dirdat/ex
add rmttrail ./dirdat/re, extract dpe_test, megabytes add replicat rep_test , exttrail ./dirdat/re

《六》  目标端和源端的hostname 一定要不一样,否则在启动 rep_test 时会由于无法连接kafka 而报错,并且应该将目标端和源端的IP 地址和 hostname 都正确配置在 /etc/hosts 文件中

《七》 用户想支持truncate 功能,则还需要在 ext_test 和 rep_test 的配置文件上增加以下参数,如果不想要该信息,则可以移除该参数

GETTRUNCATES

《八》如果用户在同步源端数据时,只希望同步部分列字段值,可以在ext_test 服务配置文件上做标识,例如test 表的表结构为 name varchar(10), id int,如果用户不想同步id 字段值,可以填写如下配置,红色部分关键字代表不同步的列

EXTRACT ext_test
Setenv (NLS_LANG="AMERICAN_AMERICA.UTF8")
USERID ogg, PASSWORD ogg
gettruncates
DISCARDFILE ./dirrpt/ext_test.dsc, APPEND, MEGABYTES
DBOPTIONS ALLOWUNUSEDCOLUMN
REPORTCOUNT EVERY MINUTES, RATE
FETCHOPTIONS NOUSESNAPSHOT EXTTRAIL ./dirdat/ex WILDCARDRESOLVE DYNAMIC
GETUPDATEBEFORES
NOCOMPRESSUPDATES
NOCOMPRESSDELETES
dynamicresolution table ogg.test, colsexcept(id);

参考博客:

http://blog.csdn.net/warren_zqw/article/details/52894586

http://guojuanjun.blog.51cto.com/277646/295454/

http://www.cnblogs.com/soysauce/p/7411314.html

http://blog.csdn.net/shipeng1022/article/details/50720690

http://www.qiuyb.com/archives/436

05-22 07:48