梁铭图,新炬网络首席架构师。具有十余年数据库运维、数据库设计、数据治理以及系统规划建设经验,拥有Oracle OCM和ACE Director、Togaf企业架构师(鉴定级)、IBM CATE等认证,曾获dbaplus年度MVP、华为云MVP等荣誉,并参与数据资产管理国家标准的编写工作。在数据库运维管理和架构设计、运维体系规划、数据资产管理方面有深入研究。
背景
从GoldenGate 12.2开始,GoldenGate就支持直接投递数据到Kafka等平台,而不用通过Java二次开发。在数据复制过程中,GoldenGate充当Kafka Producer的角色,从Oraclet等关系型数据库解析增量数据,再实时往kafka平台写入。
实施过程
1)创建ogg账号,用来部署ogg软件
useradd -u 1300 -d /home/ogg ogg
usermod -a -G oinstall ogg
2)创建kafka账号,用来部署zookeeper和kafka
useradd -u 1400 -d /home/kafka kafka
3)源端oracle数据库创建测试表
create table ogg.t1( ID1 number(8),ID2 number(8),info varchar(10),constraint pk_t1 primary key(ID1) using index);
4)源端ogg安装
下载for oracle ogg安装包:
修改安装文件
vi Disk1/responseoggcore.rsp
修改如下参数:
INSTALL_OPTION=ORA12c ---数据库版本
SOFTWARE_LOCATION=/ogg2 --安装目录
START_MANAGER=NO
UNIX_GROUP_NAME=oinstall
静默安装:
./runInstaller -silent -responseFile ./response/oggcore.rsp
5)目标端ogg安装
在目标端下载并安装Ogg for big data
6)安装zookeeper
解压ZK安装包,修改配置文件
vi /zooper/apache-zookeeper-3.7.0-bin/conf/zoo.cfg
tickTime = 2000
dataDir = /zooper/apache-zookeeper-3.7.0-bin/data
clientPort = 2181
initLimit = 5
syncLimit = 2
/zooper/apache-zookeeper-3.7.0-bin/bin/zkServer.sh start
$ JMX enabled by default
$ Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg
$ Starting zookeeper ... STARTED

显示STARTED后,表示zookeeper已启动完成。
7)安装kafka
解压kafka安装包,修改配置文件:
vi /kafka/kafka_2.12-2.7.0/config/server.properties
broker.id=0
listeners=PLAINTEXT://postgresql1:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=postgresql1:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
/kafka/kafka_2.12-2.7.0/bin/kafka-server-start.sh
/kafka/kafka_2.12-2.7.0/config/server.properties
创建topic
cd /kafka/kafka_2.12-2.7.0/bin
bin目录下:
./kafka-topics.sh --create --zookeeper postgresql1:2181 --replication-factor 1 --partitions 1 --topic cyltopic
参数说明:
–zookeeper:指定kafka连接zk的连接url,该值和server.properties文件中的配置项{zookeeper.connect}一样
–replication-factor:指定副本数量
–partitions:指定分区数量
–topic:主题名称
查看topic
./kafka-topics.sh --list --zookeeper postgresql1:2181

创建一个生产者
bin目录下:
./kafka-console-producer.sh --broker-list postgresql1:9092 --topic cyltopic

创建一个消费者
bin目录下:
./kafka-console-consumer.sh --bootstrap-server postgresql1:9092 --topic cyltopic --from-beginning

8)源端ogg配置
抽取进程配置
添加抽取进程组:
add extract ext_test, TRANLOG, BEGIN NOWadd exttrail ./dirdat/te, EXTRACT ext_test, MEGABYTES 200
配置参数:
EXTRACT ext_test(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")USERID ogg, PASSWORD xxxxxxxxxxxxxxxxxxgettruncatesDISCARDFILE ./dirrpt/ext_test.dsc, APPEND, MEGABYTES 1024DBOPTIONS ALLOWUNUSEDCOLUMNREPORTCOUNT EVERY 1 MINUTES, RATEFETCHOPTIONS NOUSESNAPSHOTEXTTRAIL ./dirdat/teWILDCARDRESOLVE DYNAMICGETUPDATEBEFORESNOCOMPRESSUPDATESNOCOMPRESSDELETESdynamicresolutiontable ogg.t1;
投递进程配置
添加投递进程组
ADD EXTRACT dpe_test, EXTTRAILSOURCE ./dirdat/teADD RMTTRAIL ./dirdat/te, EXTRACT dpe_test, MEGABYTES 200
配置参数:
EXTRACT dpe_testPASSTHRURMTHOST 192.168.20.101, MGRPORT 7809RMTTRAIL ./dirdat/teDYNAMICRESOLUTIONTABLE *.*;
9)def文件生成
ggsci>edit param defgen
内容
DEFSFILE dirdef/source.def, PURGE
USERID ogg, PASSWORD xxxxxxxxxxxxxxxxxx
TABLE ogg.t1 ;
$ ./defgen paramfile dirprm/defgen.prm --shell命令

将生成的source.def文件拷贝到目标端dirdef目录下
10)目标端ogg配置
配置参数文件
拷贝ogg安装目录下示例参数文件
ls -trl /ogg/AdapterExamples/big-data/kafka-rw-r--r--. 1 ogg ogg 332 Jan 15 2020 rkafka.prm-rw-r--r--. 1 ogg ogg 261 Jan 15 2020 custom_kafka_producer.properties-rw-r--r--. 1 ogg ogg 1133 Apr 6 2020 kafka.propscp /ogg/AdapterExamples/big-data/kafka/* /ogg/dirprm
cd /ogg/dirprm
vi kafka.props
gg.handlerlist = kafkahandlergg.handler.kafkahandler.type = kafkagg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties#gg.handler.kafkahandler.TopicName =cyltopicgg.handler.kafkahandler.topicMappingTemplate=cyltopicgg.handler.kafkahandler.format =avro_opgg.handler.kafkahandler.format=delimitedtextgg.handler.kafkahandler.format.fieldDelimiter=|gg.handler.kafkahandler.SchemaTopicName=mycyltopicgg.handler.kafkahandler.BlockingSend =falsegg.handler.kafkahandler.includeTokens=falsegg.handler.kafkahandler.format.pkUpdateHandling=updategg.handler.kafkahandler.mode =op#gg.handler.kafkahandler.maxGroupSize =100, 1Mb#gg.handler.kafkahandler.minGroupSize =50, 500Kbgoldengate.userexit.timestamp=utcgoldengate.userexit.writers=javawriterjavawriter.stats.display=TRUEjavawriter.stats.full=TRUEgg.log=log4jgg.log.level=INFOgg.report.time=30sec#Sample gg.classpath for Apache Kafkagg.classpath=dirprm/:/kafka/kafka_2.12-2.7.0/libs/*#Sample gg.classpath for HDP#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/*javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
vi custom_kafka_producer.properties
bootstrap.servers=postgresql1:9092acks=1compression.type=gzipreconnect.backoff.ms=1000value.serializer=org.apache.kafka.common.serialization.ByteArraySerializerkey.serializer=org.apache.kafka.common.serialization.ByteArraySerializer# 100KB per partitionbatch.size=102400linger.ms=10000
创建复制进程
add replicat rep_test, exttrail ./dirdat/te
REPLICAT rep_testTARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.propsSOURCEDEFS dirdef/source.defREPORTCOUNT EVERY 1 MINUTES, RATEGROUPTRANSOPS 10000MAP ogg.*, TARGET ogg.*;
验证测试
1)源端数据更新
begin
for i in 1..1000
loop
insert /*+append*/ into ogg.t1 values(i,i,'name2-'||i) ;
end loop;
commit;
end;
/
2)目标端kafka数据查询:
./kafka-console-consumer.sh --bootstrap-server postgresql1:9092 --topic cyltopic --from-beginning

小结
Ogg 为将Oracle等关系型数据库的数据直接投递到Kafka后,可以直接使用flink等流式的数据处理框架对数据进行实时的分析处理,适配更多对数据有更多实时性要求的数据分析场景(如银行卡盗刷风控等),实现更丰富的业务内容。
如果字段的最大可能长度超过255字节,那么长度值可能…
只能说作者太用心了,优秀
感谢详解
一般干个7-8年(即30岁左右),能做到年入40w-50w;有…
230721