OGG实现Oracle to kafka数据同步

梁铭图 2021-07-21 17:39:00
作者介绍

梁铭图,新炬网络首席架构师。具有十余年数据库运维、数据库设计、数据治理以及系统规划建设经验,拥有Oracle OCM和ACE Director、Togaf企业架构师(鉴定级)、IBM CATE等认证,曾获dbaplus年度MVP、华为云MVP等荣誉,并参与数据资产管理国家标准的编写工作。在数据库运维管理和架构设计、运维体系规划、数据资产管理方面有深入研究。

 

背景

 

从GoldenGate 12.2开始,GoldenGate就支持直接投递数据到Kafka等平台,而不用通过Java二次开发。在数据复制过程中,GoldenGate充当Kafka Producer的角色,从Oraclet等关系型数据库解析增量数据,再实时往kafka平台写入。

 

实施过程

 

1)创建ogg账号,用来部署ogg软件

 

useradd -u 1300 -d /home/ogg ogg

usermod -a -G oinstall ogg

 

2)创建kafka账号,用来部署zookeeper和kafka

 

useradd -u 1400 -d /home/kafka kafka

 

3)源端oracle数据库创建测试表

 

  •  
  •  
  •  
  •  
  •  
  •  
create table ogg.t1        ( ID1     number(8),          ID2    number(8),          info   varchar(10),          constraint pk_t1 primary key(ID1) using index        );

 

4)源端ogg安装

 

下载for oracle ogg安装包:

 

修改安装文件

 

vi Disk1/responseoggcore.rsp

 

修改如下参数:

 

INSTALL_OPTION=ORA12c    ---数据库版本

SOFTWARE_LOCATION=/ogg2  --安装目录

START_MANAGER=NO          

UNIX_GROUP_NAME=oinstall 

 

静默安装:

 

./runInstaller  -silent -responseFile ./response/oggcore.rsp

 

5)目标端ogg安装

 

在目标端下载并安装Ogg for big data

 

6)安装zookeeper

 

解压ZK安装包,修改配置文件

 

vi /zooper/apache-zookeeper-3.7.0-bin/conf/zoo.cfg

tickTime = 2000

dataDir = /zooper/apache-zookeeper-3.7.0-bin/data

clientPort = 2181

initLimit = 5

syncLimit = 2

 

/zooper/apache-zookeeper-3.7.0-bin/bin/zkServer.sh start

$ JMX enabled by default

$ Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg

$ Starting zookeeper ... STARTED



显示STARTED后,表示zookeeper已启动完成。

 

7)安装kafka

 

  • 解压kafka安装包,修改配置文件:

 

vi /kafka/kafka_2.12-2.7.0/config/server.properties

broker.id=0

listeners=PLAINTEXT://postgresql1:9092

num.network.threads=3

num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600

log.dirs=/tmp/kafka-logs

num.partitions=1

num.recovery.threads.per.data.dir=1

offsets.topic.replication.factor=1

transaction.state.log.replication.factor=1

transaction.state.log.min.isr=1

log.retention.hours=168

log.segment.bytes=1073741824

log.retention.check.interval.ms=300000

zookeeper.connect=postgresql1:2181

zookeeper.connection.timeout.ms=18000

group.initial.rebalance.delay.ms=0

 

/kafka/kafka_2.12-2.7.0/bin/kafka-server-start.sh
/kafka/kafka_2.12-2.7.0/config/server.properties

 

  • 创建topic

 

cd /kafka/kafka_2.12-2.7.0/bin

 

bin目录下:


./kafka-topics.sh --create --zookeeper postgresql1:2181 --replication-factor 1 --partitions 1 --topic cyltopic

 

参数说明: 

 

–zookeeper:指定kafka连接zk的连接url,该值和server.properties文件中的配置项{zookeeper.connect}一样 

–replication-factor:指定副本数量 

–partitions:指定分区数量 

–topic:主题名称

 

  • 查看topic

 

./kafka-topics.sh --list --zookeeper postgresql1:2181
 

 

  • 创建一个生产者

 

bin目录下:

 

./kafka-console-producer.sh --broker-list postgresql1:9092 --topic cyltopic

 


 

  • 创建一个消费者

 

bin目录下:

 

./kafka-console-consumer.sh --bootstrap-server postgresql1:9092 --topic cyltopic --from-beginning

 


8)源端ogg配置

 

  • 抽取进程配置

 

添加抽取进程组:

 

  •  
  •  
add extract ext_test, TRANLOG, BEGIN NOWadd exttrail ./dirdat/te, EXTRACT ext_test, MEGABYTES 200

 

配置参数:

 

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
EXTRACT ext_test--Setenv (NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")USERID ogg, PASSWORD xxxxxxxxxxxxxxxxxxgettruncatesDISCARDFILE ./dirrpt/ext_test.dsc, APPEND, MEGABYTES 1024DBOPTIONS  ALLOWUNUSEDCOLUMNREPORTCOUNT EVERY 1 MINUTES, RATEFETCHOPTIONS NOUSESNAPSHOTEXTTRAIL ./dirdat/teWILDCARDRESOLVE DYNAMICGETUPDATEBEFORESNOCOMPRESSUPDATESNOCOMPRESSDELETESdynamicresolutiontable ogg.t1;

 

  • 投递进程配置

 

添加投递进程组

 

  •  
  •  
ADD EXTRACT dpe_test, EXTTRAILSOURCE ./dirdat/teADD RMTTRAIL ./dirdat/te, EXTRACT dpe_test, MEGABYTES 200

 

配置参数:

 

  •  
  •  
  •  
  •  
  •  
  •  
EXTRACT dpe_testPASSTHRURMTHOST 192.168.20.101, MGRPORT 7809RMTTRAIL ./dirdat/teDYNAMICRESOLUTIONTABLE *.*;

 

9)def文件生成

 

ggsci>edit param defgen

内容

DEFSFILE dirdef/source.def, PURGE

USERID ogg, PASSWORD xxxxxxxxxxxxxxxxxx

TABLE ogg.t1 ;

 

$ ./defgen paramfile dirprm/defgen.prm  --shell命令



将生成的source.def文件拷贝到目标端dirdef目录下

 

10)目标端ogg配置

 

  • 配置参数文件

 

拷贝ogg安装目录下示例参数文件

 

  •  
  •  
  •  
  •  
  •  
ls -trl /ogg/AdapterExamples/big-data/kafka-rw-r--r--. 1 ogg ogg  332 Jan 15  2020 rkafka.prm-rw-r--r--. 1 ogg ogg  261 Jan 15  2020 custom_kafka_producer.properties-rw-r--r--. 1 ogg ogg 1133 Apr  6  2020 kafka.propscp /ogg/AdapterExamples/big-data/kafka/* /ogg/dirprm

 

cd /ogg/dirprm

vi kafka.props

 

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
gg.handlerlist = kafkahandlergg.handler.kafkahandler.type = kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties#gg.handler.kafkahandler.TopicName =cyltopicgg.handler.kafkahandler.topicMappingTemplate=cyltopicgg.handler.kafkahandler.format =avro_opgg.handler.kafkahandler.format=delimitedtextgg.handler.kafkahandler.format.fieldDelimiter=|gg.handler.kafkahandler.SchemaTopicName=mycyltopicgg.handler.kafkahandler.BlockingSend =falsegg.handler.kafkahandler.includeTokens=falsegg.handler.kafkahandler.format.pkUpdateHandling=update


gg.handler.kafkahandler.mode =op#gg.handler.kafkahandler.maxGroupSize =100, 1Mb#gg.handler.kafkahandler.minGroupSize =50, 500Kb 

goldengate.userexit.timestamp=utcgoldengate.userexit.writers=javawriterjavawriter.stats.display=TRUEjavawriter.stats.full=TRUE

gg.log=log4jgg.log.level=INFO

gg.report.time=30sec

#Sample gg.classpath for Apache Kafkagg.classpath=dirprm/:/kafka/kafka_2.12-2.7.0/libs/*

#Sample gg.classpath for HDP#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/*javawriter.bootoptions=-Xmx512m 

-Xms32m -Djava.class.path=ggjava/ggjava.jar

 

vi custom_kafka_producer.properties

 

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
bootstrap.servers=postgresql1:9092acks=1compression.type=gzipreconnect.backoff.ms=1000value.serializer=org.apache.kafka.common.serialization.ByteArraySerializerkey.serializer=org.apache.kafka.common.serialization.ByteArraySerializer# 100KB per partitionbatch.size=102400linger.ms=10000

 

  • 创建复制进程

 

add replicat rep_test, exttrail ./dirdat/te

 

  •  
  •  
  •  
  •  
  •  
  •  
REPLICAT rep_testTARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.propsSOURCEDEFS dirdef/source.defREPORTCOUNT EVERY 1 MINUTES, RATEGROUPTRANSOPS 10000MAP ogg.*, TARGET ogg.*;

 

验证测试

 

1)源端数据更新

 

begin

for i in 1..1000

loop

   insert /*+append*/ into ogg.t1 values(i,i,'name2-'||i) ;

   end loop;

   commit;

end;

/

 

2)目标端kafka数据查询:

 

./kafka-console-consumer.sh --bootstrap-server postgresql1:9092 --topic cyltopic --from-beginning



 

小结

 

Ogg 为将Oracle等关系型数据库的数据直接投递到Kafka后,可以直接使用flink等流式的数据处理框架对数据进行实时的分析处理,适配更多对数据有更多实时性要求的数据分析场景(如银行卡盗刷风控等),实现更丰富的业务内容。

最新评论
访客 2024年04月08日

如果字段的最大可能长度超过255字节,那么长度值可能…

访客 2024年03月04日

只能说作者太用心了,优秀

访客 2024年02月23日

感谢详解

访客 2024年02月20日

一般干个7-8年(即30岁左右),能做到年入40w-50w;有…

访客 2023年08月20日

230721

活动预告