梁铭图,新炬网络首席架构师。具有十余年数据库运维、数据库设计、数据治理以及系统规划建设经验,拥有Oracle OCM和ACE Director、Togaf企业架构师(鉴定级)、IBM CATE等认证,曾获dbaplus年度MVP、华为云MVP等荣誉,并参与数据资产管理国家标准的编写工作。在数据库运维管理和架构设计、运维体系规划、数据资产管理方面有深入研究。
背景
从GoldenGate 12.2开始,GoldenGate就支持直接投递数据到Kafka等平台,而不用通过Java二次开发。在数据复制过程中,GoldenGate充当Kafka Producer的角色,从Oraclet等关系型数据库解析增量数据,再实时往kafka平台写入。
实施过程
1)创建ogg账号,用来部署ogg软件
useradd -u 1300 -d /home/ogg ogg
usermod -a -G oinstall ogg
2)创建kafka账号,用来部署zookeeper和kafka
useradd -u 1400 -d /home/kafka kafka
3)源端oracle数据库创建测试表
create table ogg.t1
( ID1 number(8),
ID2 number(8),
info varchar(10),
constraint pk_t1 primary key(ID1) using index
);
4)源端ogg安装
下载for oracle ogg安装包:
修改安装文件
vi Disk1/responseoggcore.rsp
修改如下参数:
INSTALL_OPTION=ORA12c ---数据库版本
SOFTWARE_LOCATION=/ogg2 --安装目录
START_MANAGER=NO
UNIX_GROUP_NAME=oinstall
静默安装:
./runInstaller -silent -responseFile ./response/oggcore.rsp
5)目标端ogg安装
在目标端下载并安装Ogg for big data
6)安装zookeeper
解压ZK安装包,修改配置文件
vi /zooper/apache-zookeeper-3.7.0-bin/conf/zoo.cfg
tickTime = 2000
dataDir = /zooper/apache-zookeeper-3.7.0-bin/data
clientPort = 2181
initLimit = 5
syncLimit = 2
/zooper/apache-zookeeper-3.7.0-bin/bin/zkServer.sh start
$ JMX enabled by default
$ Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg
$ Starting zookeeper ... STARTED
显示STARTED后,表示zookeeper已启动完成。
7)安装kafka
解压kafka安装包,修改配置文件:
vi /kafka/kafka_2.12-2.7.0/config/server.properties
broker.id=0
listeners=PLAINTEXT://postgresql1:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=postgresql1:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
/kafka/kafka_2.12-2.7.0/bin/kafka-server-start.sh
/kafka/kafka_2.12-2.7.0/config/server.properties
创建topic
cd /kafka/kafka_2.12-2.7.0/bin
bin目录下:
./kafka-topics.sh --create --zookeeper postgresql1:2181 --replication-factor 1 --partitions 1 --topic cyltopic
参数说明:
–zookeeper:指定kafka连接zk的连接url,该值和server.properties文件中的配置项{zookeeper.connect}一样
–replication-factor:指定副本数量
–partitions:指定分区数量
–topic:主题名称
查看topic
./kafka-topics.sh --list --zookeeper postgresql1:2181
创建一个生产者
bin目录下:
./kafka-console-producer.sh --broker-list postgresql1:9092 --topic cyltopic
创建一个消费者
bin目录下:
./kafka-console-consumer.sh --bootstrap-server postgresql1:9092 --topic cyltopic --from-beginning
8)源端ogg配置
抽取进程配置
添加抽取进程组:
add extract ext_test, TRANLOG, BEGIN NOW
add exttrail ./dirdat/te, EXTRACT ext_test, MEGABYTES 200
配置参数:
EXTRACT ext_test
(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
USERID ogg, PASSWORD xxxxxxxxxxxxxxxxxx
gettruncates
DISCARDFILE ./dirrpt/ext_test.dsc, APPEND, MEGABYTES 1024
DBOPTIONS ALLOWUNUSEDCOLUMN
REPORTCOUNT EVERY 1 MINUTES, RATE
FETCHOPTIONS NOUSESNAPSHOT
EXTTRAIL ./dirdat/te
WILDCARDRESOLVE DYNAMIC
GETUPDATEBEFORES
NOCOMPRESSUPDATES
NOCOMPRESSDELETES
dynamicresolution
table ogg.t1;
投递进程配置
添加投递进程组
ADD EXTRACT dpe_test, EXTTRAILSOURCE ./dirdat/te
ADD RMTTRAIL ./dirdat/te, EXTRACT dpe_test, MEGABYTES 200
配置参数:
EXTRACT dpe_test
PASSTHRU
RMTHOST 192.168.20.101, MGRPORT 7809
RMTTRAIL ./dirdat/te
DYNAMICRESOLUTION
TABLE *.*;
9)def文件生成
ggsci>edit param defgen
内容
DEFSFILE dirdef/source.def, PURGE
USERID ogg, PASSWORD xxxxxxxxxxxxxxxxxx
TABLE ogg.t1 ;
$ ./defgen paramfile dirprm/defgen.prm --shell命令
将生成的source.def文件拷贝到目标端dirdef目录下
10)目标端ogg配置
配置参数文件
拷贝ogg安装目录下示例参数文件
ls -trl /ogg/AdapterExamples/big-data/kafka
-rw-r--r--. 1 ogg ogg 332 Jan 15 2020 rkafka.prm
-rw-r--r--. 1 ogg ogg 261 Jan 15 2020 custom_kafka_producer.properties
-rw-r--r--. 1 ogg ogg 1133 Apr 6 2020 kafka.props
cp /ogg/AdapterExamples/big-data/kafka/* /ogg/dirprm
cd /ogg/dirprm
vi kafka.props
gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type = kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
#gg.handler.kafkahandler.TopicName =cyltopic
gg.handler.kafkahandler.topicMappingTemplate=cyltopic
gg.handler.kafkahandler.format =avro_op
gg.handler.kafkahandler.format=delimitedtext
gg.handler.kafkahandler.format.fieldDelimiter=|
gg.handler.kafkahandler.SchemaTopicName=mycyltopic
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.format.pkUpdateHandling=update
gg.handler.kafkahandler.mode =op
#gg.handler.kafkahandler.maxGroupSize =100, 1Mb
#gg.handler.kafkahandler.minGroupSize =50, 500Kb
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
#Sample gg.classpath for Apache Kafka
gg.classpath=dirprm/:/kafka/kafka_2.12-2.7.0/libs/*
#Sample gg.classpath for HDP
#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/*
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
vi custom_kafka_producer.properties
bootstrap.servers=postgresql1:9092
acks=1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# 100KB per partition
batch.size=102400
linger.ms=10000
创建复制进程
add replicat rep_test, exttrail ./dirdat/te
REPLICAT rep_test
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
SOURCEDEFS dirdef/source.def
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP ogg.*, TARGET ogg.*;
验证测试
1)源端数据更新
begin
for i in 1..1000
loop
insert /*+append*/ into ogg.t1 values(i,i,'name2-'||i) ;
end loop;
commit;
end;
/
2)目标端kafka数据查询:
./kafka-console-consumer.sh --bootstrap-server postgresql1:9092 --topic cyltopic --from-beginning
小结
Ogg 为将Oracle等关系型数据库的数据直接投递到Kafka后,可以直接使用flink等流式的数据处理框架对数据进行实时的分析处理,适配更多对数据有更多实时性要求的数据分析场景(如银行卡盗刷风控等),实现更丰富的业务内容。
如果字段的最大可能长度超过255字节,那么长度值可能…
只能说作者太用心了,优秀
感谢详解
一般干个7-8年(即30岁左右),能做到年入40w-50w;有…
230721