canal发送数据到kafka_tom_fans的博客-CSDN博客_canal发kafka


本站和网页 https://blog.csdn.net/tom_fans/article/details/89390335 的作者无关,不对其内容负责。快照谨为网络故障时之索引,不代表被搜索网站的即时页面。

canal发送数据到kafka_tom_fans的博客-CSDN博客_canal发kafka
canal发送数据到kafka
tom_fans
于 2019-04-18 23:37:30 发布
10806
收藏
16
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/tom_fans/article/details/89390335
版权
1. canal安装
官方文档:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart
版本: canal 1.1.3 , JDK 1.8+ ,MySQL 5.7 
软件下载之后解压缩,有2个配置文件需要更改:
canal.properties
example/instance.properties
再修改之前,先理解这2个文件的作用,canal.properties主要包含通用配置,实例配置,MQ/kafka配置,其他的配置保持默认即可。
通用配置我在下面做了注释,基本也就需要修改那些东西,那么什么是实例配置? 所谓实例的概念是一个canal可以同时获取多台机器的数据库binlog,那么很显然每个数据库的position,数据库名字都不一样,canal为了把配置分开,引入实例的概念。简单理解就是,你每抽取一个数据库的binlog,都可以作为一个实例有单独的配置文件。
比如我有2台机器,每台机器有4个库,现在我想通过一个canal来获取binlog, 也许你会说,那我用2个canal客户端不就可以,当然这是最好的办法,但是如果你想通过一个canal来解决,那就需要建立2个实例文件。
canal.destinations = example,example1
然后在conf目录也要有2个目录example和example1. 
canal.properties:
#################################################
######### common argument #############
#################################################
#canal.manager.jdbc.url=jdbc:mysql://127.0.0.1:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8
#canal.manager.jdbc.username=root
#canal.manager.jdbc.password=121212
canal.id = 1
canal.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
#zookeeper配置
canal.zkServers = 10.40.2.94:2181,10.40.2.95:2181,10.40.2.96:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
#默认为TCP,也就是你通过官方的example可以在终端查看数据,我们修改为kafka
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60
# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# binlog ddl isolation
canal.instance.get.ddl.isolation = false
# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256
# table meta tsdb info
# 关于tsdb概念,建议看一下官方文档,大概意思是canal在获取DDL的时候有可能获取的是错误的,
# 那么为了解决这个问题,所有DDL都会直接解析,重新生成到meta里面,我们可以用MySQL去保存,默认是H2
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
#canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
#保存meta的数据库地址和数据库名称,用户密码
canal.instance.tsdb.url = jdbc:mysql://127.0.0.1:3306/canal_tsdb
canal.instance.tsdb.dbUsername = admin
canal.instance.tsdb.dbPassword = internal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
#################################################
######### destinations #############
#################################################
#canal实例
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
canal.instance.global.mode = spring
canal.instance.global.lazy = false
#canal.instance.global.manager.address = 127.0.0.1:1099
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml
##################################################
######### MQ #############
##################################################
#kafka地址
canal.mq.servers = 10.40.2.94:9092,10.40.2.94:9093,10.40.2.94:9094
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# use transaction for kafka flatMessage batch produce
canal.mq.transaction = false
#canal.mq.properties. =
example/instance.properties:
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=10.40.2.175:3306
canal.instance.master.journal.name= mysql-bin.000006
canal.instance.master.position= 123439266
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
canal.instance.tsdb.dbUsername=admin
canal.instance.tsdb.dbPassword=internal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=admin
canal.instance.dbPassword=internal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
# 数据库及表过滤,这里我只抽取sourcedb的日志
canal.instance.filter.regex=sourcedb\\..*
# table black regex
canal.instance.filter.black.regex=
# mq config
# MQ/KAFka TOPIC配置
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
上述2个文件修改好了之后启动服务:
bin/startup.sh
如果你设置的canal.serverMode=TCP,那么默认端口为11111, 如果你设置servermode为kafka,这个端口也就不存在了。
2. 卡夫卡配置及注意事项
canal默认使用的是1.1.1 scala 2.11版本的kafka。
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>1.1.1</version>
</dependency>
我的环境默认是1.0.1,测试多次都失败,然后我又用最新版2.1.0版本,也是测试失败。 无奈只能安装和canal一样的开源版本1.1.1 scala 2.11,这个地方浪费我了很多时间去检查为什么,最后还是版本问题。
另外canal提供的example 是可以使用的,但是提供的代码完全不可用,估计是哪里写的有问题。
下面就是测试kafka是否收到数据,这个地方也卡了我好久,因为我一直用 bin/kafka-console-consumer.sh , 这个命令不报错,但是根本不会消费任何数据,最后使用如下命令才正常。
kafka消费命令:
bin/kafka-simple-consumer-shell.sh --broker-list 10.40.2.94:9092 --topic example
{"data":null,"database":"sourcedb","es":1555595121000,"id":2,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"create table jlwang4(id int)/*oggddlversion=144*/","sqlType":null,"table":"jlwang4","ts":1555595325843,"type":"CREATE"}
{"data":null,"database":"sourcedb","es":1555595360000,"id":2,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"create table jlwang5(id int)/*oggddlversion=154*/","sqlType":null,"table":"jlwang5","ts":1555595361084,"type":"CREATE"}
{"data":null,"database":"sourcedb","es":1555595549000,"id":7,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"create table jlwang6(id int)/*oggddlversion=156*/","sqlType":null,"table":"jlwang6","ts":1555595549508,"type":"CREATE"}
{"data":null,"database":"sourcedb","es":1555596117000,"id":2,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"create table jlwan8(id int)/*oggddlversion=168*/","sqlType":null,"table":"jlwan8","ts":1555596117855,"type":"CREATE"}
{"data":null,"database":"sourcedb","es":1555597933000,"id":2,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"create table jlwan9(id int)/*oggddlversion=174*/","sqlType":null,"table":"jlwan9","ts":1555597933438,"type":"CREATE"}
{"data":null,"database":"sourcedb","es":1555599029000,"id":22,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"create table jlwan10(id int)/*oggddlversion=176*/","sqlType":null,"table":"jlwan10","ts":1555599029681,"type":"CREATE"}
{"data":[{"id":"100"}],"database":"sourcedb","es":1555599089000,"id":24,"isDdl":false,"mysqlType":{"id":"int"},"old":null,"pkNames":null,"sql":"","sqlType":{"id":4},"table":"jlwan9","ts":1555599089375,"type":"INSERT"}
canal大部分配置不需要改,所以整体比较简单就能配置好,感觉这个东西还不如maxwell做的好。
tom_fans
关注
关注
点赞
16
收藏
打赏
19
评论
canal发送数据到kafka
1. canal安装官方文档:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart版本: canal 1.1.3 , JDK 1.8+ ,MySQL 5.7软件下载之后解压缩,有2个配置文件需要更改:canal.propertiesexample/instance.properties再...
复制链接
扫一扫
Flink1.8实时数仓项目实战
08-26
<p>
《Flink1.8实时数仓项目实战》课程主要基于Flink**稳定版本进行讲解,课程包含Flink DataSet、容错、Connector、Flink SQL以及实时数仓项目。希望通过本课程的学习,大家能快速掌握Flink 批处理和Flink SQL,并通过实时数仓项目快速积累项目经验。
</p>
<div>
<br />
</div>
<p>
<br />
</p>
<p>
<br />
</p>
阿里Canal从入门到实战
07-09
本课程基于阿里巴巴 MySQL binlog 增量订阅&消费组件,对Canal进行全面系统的讲解。课程先介绍Canal在电商运行项目中的架构设计与解决方案,然后详细讲解Canal架构原理以及Canal HA工作机制,最后通过Canal单节点Server,Canal HA集群,Canal集成Kafka三个案例进行实操,从而快速、深入掌握阿里Canal技术,实现数据实时增量采集.<br />
评论 19
您还未登录,请先
登录
后发表或查看评论
配置过得canal包,直接解压,稍微修改一下即可用
12-21
基于实时抽取mysql的binlog日志到kafka的组件,已经配好,稍微修改即可使用。
canal实战(一):canal连接kafka实现实时同步mysql数据
热门推荐
桃花惜春风
09-03
1万+
前面已经介绍过了canal-kafka的应用。canal-kafka是把kafka作为客户端,嵌入到canal中,并且在canal基础上对源码进行了修改,以达到特定的实现canal到kafka的传送。
canal-kafka是阿里云最近更新的一个新的安装包。主要功能是实现canal与kafka的对接,实现海量的消息传输同步。在canal-kafka中,消息是以ByteString进行传输...
现实生产中使用Canal+kafka做数据同步的一次记录
Yikong的博客
12-03
831
业务场景
需求:实时同步数据库(Mysql)数据到第三方公司、另一个数据库
方案
一、 数据同步操作嵌入业务代码块
优点:实现简单。
缺点:业务代码整体耦合性变高。如果同步到第三方公司的数据是有筛选的条件的,还会影响本身业务系统的性能。
二、 多搞一个数据库,读写分离,专门用做数据同步。
优点:较于方案一耦合性降低。不影响本身业务系统的正常运转
缺点:如果本身没有读写分离的需求,为了同步数据给第三方而增加一台机器。增加了成本且有点浪费资源。实时性得不到保证
三、 主从同步加消息队列实现同步
第三种方
【Canal】canal简介
最新发布
qq_41466440的博客
11-04
236
【Canal】canal简介
canal学习二:canal消息推送到kafka
菜鸟日常
11-21
5298
本文主要记录如何配置使得 canal 消息推送到 kafka 并被消费的过程。
canal+kafka实践——实时etl
小黄鸭的博客
09-04
1万+
canal解析sql数据库的binlog并格式化数据,然后同步到kafka消息,可以用来实现实时etl
yml:
spring:
application:
name: canal
canal:
topic-prefix: etl_timely.
destination:
example: 0
username:
password:...
Canal安装与配置,推送数据到kafka
qq_37074682的博客
09-23
922
Canal安装与配置,并推送数据到kafka
Canal adapter1.1.5安装部署配置(3)
进击的小白
01-15
4882
使用前必须先安装Canal server,Canal最新1.1.4版安装部署(1)
安装
版本根据情况自行调整,最新版本参考:https://github.com/alibaba/canal/releases
#进入安装包
cd /opt
#下载安装包
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz
#创建安装目录
mkdir canal-adapter
Canal 监听 Mysql 自动写入 Kafka 并消费 配置 Windows 版
weixin_44466075的博客
03-18
2449
搭建环境使用的版本如下:
mysql-installer-community-8.0.20.0.msi
jdk1.8.0_251
apache-zookeeper-3.6.2-bin
kafka_2.13-2.7.0
canal.deployer-1.1.5-SNAPSHOT
Zookeeper 配置
下载完成后解压结构目录如下:
复制一份conf/zoo_sample.cfg改为zoo.cfg
编辑配置文件,主要配置代码如下:
# 日志以及数据存储路径自己定义
dataDir=D:\kaifa\env\
canal 投递 数据 只进kafka 0分区
weixin_43564627的博客
07-21
228
canal 投递 数据 只进kafka 0分区
首先要确保其余kafka 有多个分区
在 instance.properties 加入以下配置即可 二者缺一不可
下面一段摘至官网:
mq顺序性问题
1.canal目前选择支持的kafka/rocketmq,本质上都是基于本地文件的方式来支持了分区级的顺序消息的能力,也就是binlog写入mq是可以有一些顺序性保障,这个取决于用户的一些参数选择
2.canal支持MQ数据的几种路由方式:单topic单分区,单topic多分区、多topic单分区、多topi
canal -kafka快速实践
暮霭层层楚天阔的博客
07-14
357
https://cloud.tencent.com/developer/article/1579110
canal配合kafka使用
qq_16248977的博客
09-16
2300
一、部署和安装
部署zookeeper
zookeeper: Zookeeper QuickStart
部署kafka
kafka: Kafka QuickStart
部署canal
canal: Canal部署和使用
二、修改配置
修改instance的配置(可参考一、中的《Canal的部署和使用》)
# 按需修改成自己的数据库信息
##########################...
canal与kafka的配置
oTianShiZaiChangGe1的博客
04-10
929
参考内容:Canal Kafka RocketMQ QuickStart
canal实战(三)|canal数据消费到kafka
神芷迦蓝寺
11-20
1430
## 本文章改编于阿里开源工具Canal,原版网址https://github.com/alibaba/canal
canal第一辑canal的配置安装与服务启动
canal第二辑canal java客户端
canal的原理我们在第一辑里已经介绍,现在我们需要把canal采集到的日志数据消费到kafka,并通过kafka把数据进行进一步入库,上云等消费操作。
首先我们把Kafka进行安装配置,涉及到的有zookeeper,kafka,有的服务器还要装java环境等,kafka的安装我之前有讲...
Canal系列2-Canal同步到Kafka
只是甲的博客
03-25
2998
文章目录一. MySQL的准备1.1 binlog格式1.2 创建库表1.3 赋权限二. Canal安装及配置2.1 Canal下载及安装https://github.com/alibaba/canal/releases2.2 修改 canal.properties 的配置2.3 修改 instance.properties2.4 启动 Canal2.5 看到 CanalLauncher 你表示启动成功,同时会创建 canal_test 主题2.6 启动 Kafka 消费客户端测试,查看消费情况2.7 向
08.canal+kafka同步数据消息顺序一致性问题
陌陌龙的博客
06-22
580
08.canal+kafka同步数据消息顺序一致性问题
canal+kafka同步数据库说明
qq_37729767的博客
07-01
528
最近使用canal+kafka同步数据库的数据,在此记录一下配置修改
确保mysql开启bin-log,模式为ROW;
创建canal用户:
CREATE USER canal IDENTIFIED BY ‘1qaz@WSX’;
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘canal’@’%’;
FLUSH PRIVILEGES;
ALTER USER ‘canal’@’%’ IDENTIFIED WITH mysql_n
MySQL连接数据源常用的配置方式
hkl_Forever的博客
07-27
1922
六、相对完善的连接写法。
Canal 与kafka
weixin_40544053的博客
04-30
766
近段时间,业务系统架构基本完备,数据层面的建设比较薄弱,因为笔者目前工作重心在于搭建一个小型的数据平台。优先级比较高的一个任务就是需要近实时同步业务系统的数据(包括保存、更新或者软删除)到一个另一个数据源,持久化之前需要清洗数据并且构建一个相对合理的便于后续业务数据统计、标签系统构建等扩展功能的数据模型。基于当前团队的资源和能力,优先调研了Alibaba开源中间件Canal的使用。
这篇文章简单介...
“相关推荐”对你有帮助么?
非常没帮助
没帮助
一般
有帮助
非常有帮助
提交
©️2022 CSDN
皮肤主题:编程工作室
设计师:CSDN官方博客
返回首页
tom_fans
CSDN认证博客专家
CSDN认证企业博客
码龄5年
暂无认证
96
原创
4万+
周排名
144万+
总排名
23万+
访问
等级
3203
积分
90
粉丝
60
获赞
104
评论
242
收藏
私信
关注
热门文章
物联网数据采集处理架构
28017
Apache atlas集成CDH管理元数据
13301
Harbor/Docker: x509: certificate signed by unknown authority
12466
算法(3) 移动平均算法 moving average
11691
Kettle增量同步数据
11582
分类专栏
Docker
5篇
k8s
5篇
Flink
7篇
Hadoop
15篇
Hbase
15篇
Spark
8篇
Hive
2篇
oozie
1篇
Storm
4篇
Flume
2篇
Sqoop
Mapreduce
8篇
Kafka
9篇
最新评论
canal发送数据到kafka
一切都是命:
用阿里的最新版本1.1.6试试
HIVE外部表到底损失多少性能
妖果yaoyao:
他是想说明有什么在阻碍着hive查询hbase外部表会很慢,在于是不是hive要扫描hbase全部数据,并io到hive中,貌似关键也在这个io传输中
HIVE外部表到底损失多少性能
卢说:
啥玩意
Harbor/Docker: x509: certificate signed by unknown authority
hn698:
测试发现ca.crt, domain.com.key domain.com.cert都需要拷贝过来。
canal发送数据到kafka
bug 像被子里的舒服:
大佬 请问一下 为啥我配置了kafka 就telnet不通啊
您愿意向朋友推荐“博客详情页”吗?
强烈不推荐
不推荐
一般般
推荐
强烈推荐
提交
最新文章
Harbor/Docker: x509: certificate signed by unknown authority
Harbor高可用方案
Harbor单机安装
2020年18篇
2019年17篇
2018年21篇
2017年40篇
目录
目录
分类专栏
Docker
5篇
k8s
5篇
Flink
7篇
Hadoop
15篇
Hbase
15篇
Spark
8篇
Hive
2篇
oozie
1篇
Storm
4篇
Flume
2篇
Sqoop
Mapreduce
8篇
Kafka
9篇
目录
评论 19
被折叠的 条评论
为什么被折叠?
到【灌水乐园】发言
查看更多评论
打赏作者
tom_fans
谢谢打赏
¥2
¥4
¥6
¥10
¥20
输入1-500的整数
余额支付
(余额:-- )
扫码支付
扫码支付:¥2
获取中
扫码支付
您的余额不足,请更换扫码支付或充值
打赏作者
实付元
使用余额支付
点击重新获取
扫码支付
钱包余额
抵扣说明:
1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。 2.余额无法直接购买下载,可以购买VIP、C币套餐、付费专栏及课程。
余额充值