canal实战(二):使用canal-kafka实现数据库增量实时更新_桃花惜春风的博客-CSDN博客_canal根据更新时间筛选id


本站和网页 https://blog.csdn.net/xiaoyu_BD/article/details/82113861 的作者无关,不对其内容负责。快照谨为网络故障时之索引,不代表被搜索网站的即时页面。

canal实战(二):使用canal-kafka实现数据库增量实时更新_桃花惜春风的博客-CSDN博客_canal根据更新时间筛选id
canal实战(二):使用canal-kafka实现数据库增量实时更新
桃花惜春风
于 2018-08-27 16:43:50 发布
15287
收藏
20
分类专栏:
# kafka
canal
kafka深入理解
文章标签:
canal
kafka
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/xiaoyu_BD/article/details/82113861
版权
kafka
同时被 3 个专栏收录
34 篇文章
2 订阅
订阅专栏
canal
7 篇文章
0 订阅
订阅专栏
kafka深入理解
15 篇文章
28 订阅
订阅专栏
canal是阿里的一款开源工具,纯java开发,基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql。
安装canal
下载安装包: https://github.com/alibaba/canal/releases canal.kafka-1.1.0.tar.gz
解压到固定目录:
tar -zxvf canal.kafka-1.1.0.tar.gz
修改配置
vi conf/example/instance.properties
#数据库地址
canal.instance.master.address=192.168.56.104:3306
#数据库用户密码
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
#数据库字符集
canal.instance.connectionCharset=UTF-8
#默认数据库
canal.instance.defaultDatabaseName =zgai_db
vim /usr/local/canal/conf/canal.properties
#canalid
canal.id= 1
#canal地址
canal.ip=192.168.56.102
#zookeeper地址
canal.zkServers=192.168.56.102:2181
vim /usr/local/canal/conf/kafka.yml
#kafka地址
servers: 192.168.56.101:9092
# canal的批次大小,单位 k
canalBatchSize: 50
#topic
topic: mytopic
配置详细介绍
canal.properties
canal配置主要分为两部分定义:
instance列表定义 (列出当前server上有多少个instance,每个instance的加载方式是spring/manager等) |参数名字 |参数说明 |默认值| | ------------- |:-------------😐 -----😐 |canal.destinations| 当前server上部署的instance列表| 无 |canal.conf.dir| conf/目录所在的路径 |…/conf |canal.auto.scan| 开启instance自动扫描如果配置为true,canal.conf.dir目录下的instance配置变化会自动触发:a. instance目录新增: 触发instance配置载入,lazy为true时则自动启动b. instance目录删除:卸载对应instance配置,如已启动则进行关闭c. instance.properties文件变化:reload instance配置,如已启动自动进行重启操作 | true |canal.auto.scan.interval |instance自动扫描的间隔时间,单位秒| 5 |canal.instance.global.mode| 全局配置加载方式 |spring |canal.instance.global.lazy| 全局lazy模式 |false |canal.instance.global.manager.address| 全局的manager配置方式的链接信息| 无 |canal.instance.global.spring.xml| 全局的spring配置方式的组件文件 |classpath:spring/memory-instance.xml (spring目录相对于canal.conf.dir) |canal.instance.example.mode canal.instance.example.lazy canal.instance.example.spring.xml… | instance级别的配置定义,如有配置,会自动覆盖全局配置定义模式命名规则:canal.instance.{name}.xxx |无 |canal.instance.tsdb.spring.xml| v1.0.25版本新增,全局的tsdb配置方式的组件文件 |classpath:spring/tsdb/h2-tsdb.xml (spring目录相对于canal.conf.dir) common参数定义,比如可以将instance.properties的公用参数,抽取放置到这里,这样每个instance启动的时候就可以共享. 【instance.properties配置定义优先级高于canal.properties】
参数名字参数说明默认值canal.id每个canal server实例的唯一标识,暂无实际意义1canal.ip canalserver绑定的本地IP信息,如果不配置,默认选择一个本机IP进行启动服务无canal.portcanal server提供socket服务的端口11111canal.zkServerscanal server链接zookeeper集群的链接信息 例子:10.20.144.22:2181,10.20.144.51:2181无canal.zookeeper.flush.periodcanal持久化数据到zookeeper上的更新频率,单位毫秒1000canal.instance.memory.batch.modecanal内存store中数据缓存模式 1. ITEMSIZE : 根据buffer.size进行限制,只限制记录的数量 2. MEMSIZE : 根据buffer.size * buffer.memunit的大小,限制缓存记录的大小MEMSIZEcanal.instance.memory.buffer.sizecanal内存store中可缓存buffer记录数,需要为2的指数16384canal.instance.memory.buffer.memunit内存记录的单位大小,默认1KB,和buffer.size组合决定最终的内存使用大小1024canal.instance.transactionn.size最大事务完整解析的长度支持超过该长度后,一个事务可能会被拆分成多次提交到canal store中,无法保证事务的完整可见性1024canal.instance.fallbackIntervalInSecondscanal发生mysql切换时,在新的mysql库上查找binlog时需要往前查找的时间,单位秒 说明:mysql主备库可能存在解析延迟或者时钟不统一,需要回退一段时间,保证数据不丢60canal.instance.detecting.enable是否开启心跳检查falsecanal.instance.detecting.sql心跳检查sql insert into retl.xdual values(1,now()) on duplicate key update x=now() canal.instance.detecting.interval.time 心跳检查频率,单位秒3canal.instance.detecting.retry.threshold心跳检查失败重试次数3canal.instance.detecting.heartbeatHaEnable心跳检查失败后,是否开启自动mysql自动切换说明:比如心跳检查失败超过阀值后,如果该配置为true,canal就会自动链到mysql备库获取binlog数据falsecanal.instance.network.receiveBufferSize网络链接参数,SocketOptions.SO_RCVBUF16384canal.instance.network.sendBufferSize网络链接参数,SocketOptions.SO_SNDBUF16384canal.instance.network.soTimeout网络链接参数,SocketOptions.SO_TIMEOUT30canal.instance.filter.druid.ddl是否使用druid处理所有的ddl解析来获取库和表名truecanal.instance.filter.query.dcl是否忽略dcl语句falsecanal.instance.filter.query.dml是否忽略dml语句(mysql5.6之后,在row模式下每条DML语句也会记录SQL到binlog中,可参考MySQL文档)falsecanal.instance.filter.query.ddl是否忽略ddl语句falsecanal.instance.filter.table.error是否忽略binlog表结构获取失败的异常(主要解决回溯binlog时,对应表已被删除或者表结构和binlog不一致的情况)falsecanal.instance.filter.rows是否dml的数据变更事件(主要针对用户只订阅ddl/dcl的操作)falsecanal.instance.filter.transaction.entry是否忽略事务头和尾,比如针对写入kakfa的消息时,不需要写入TransactionBegin/Transactionend事件falsecanal.instance.binlog.format支持的binlog format格式列表(otter会有支持format格式限制)ROW,STATEMENT,MIXEDcanal.instance.binlog.image支持的binlog image格式列表(otter会有支持format格式限制)FULL,MINIMAL,NOBLOBcanal.instance.get.ddl.isolationddl语句是否单独一个batch返回(比如下游dml/ddl如果做batch内无序并发处理,会导致结构不一致)falsecanal.instance.parser.parallel是否开启binlog并行解析模式(串行解析资源占用少,但性能有瓶颈, 并行解析可以提升近2.5倍+)truecanal.instance.parser.parallelBufferSizebinlog并行解析的异步ringbuffer队列(必须为2的指数)256canal.instance.tsdb.enable是否开启tablemeta的tsdb能力truecanal.instance.tsdb.dir主要针对h2-tsdb.xml时对应h2文件的存放目录,默认conf/xx/h2.mv.dbcanal.instance.tsdb.urljdbc url的配置(h2的地址为默认值,如果是mysql需要自行定义)jdbc:h2: ${canal.instance.tsdb.dir} /h2;CACHE_SIZE=1000;MODE=MYSQL;canal.instance.tsdb.dbUsernamejdbc url的配置(h2的地址为默认值,如果是mysql需要自行定义)canalcanal.instance.tsdb.dbPasswordjdbc url的配置(h2的地址为默认值,如果是mysql需要自行定义)canalcanal.instance.rds.accesskeyaliyun账号的ak信息 (如果不需要在本地binlog超过18小时被清理后自动下载oss上的binlog,可以忽略该值)无canal.instance.rds.secretkeyaliyun账号的sk信息(如果不需要在本地binlog超过18小时被清理后自动下载oss上的binlog,可以忽略该值)无
instance.properties
参数名字参数说明默认值canal.instance.mysql.slaveIdmysql集群配置中的serverId概念,需要保证和当前mysql集群中id唯一 (v1.1.x版本之后canal会自动生成,不需要手工指定)无canal.instance.master.addressmysql主库链接地址127.0.0.1:3306canal.instance.master.journal.namemysql主库链接时起始的binlog文件无canal.instance.master.positionmysql主库链接时起始的binlog偏移量无canal.instance.master.timestampmysql主库链接时起始的binlog的时间戳无canal.instance.gtidon是否启用mysql gtid的订阅模式falsecanal.instance.master.gtidmysql主库链接时对应的gtid位点无canal.instance.dbUsernamemysql数据库帐号canalcanal.instance.dbPasswordmysql数据库密码canalcanal.instance.defaultDatabaseNamemysql链接时默认schema无canal.instance.connectionCharsetmysql 数据解析编码UTF-8canal.instance.filter.regexmysql 数据解析关注的表,Perl正则表达式.多个正则之间以逗号(,)分隔,转义符需要双斜杠(\) 常见例子: 1. 所有表:.* or .\… 2. canal schema下所有表: canal\…* 3. canal下的以canal打头的表:canal\.canal.* 4. canal schema下的一张表:canal.test1 5. 多个规则组合使用:canal\…*,mysql.test1,mysql.test2 (逗号分隔).\…canal.instance.filter.black.regexmysql 数据解析表的黑名单,表达式规则见白名单的规则无canal.instance.rds.instanceIdaliyun rds对应的实例id信息(如果不需要在本地binlog超过18小时被清理后自动下载oss上的binlog,可以忽略该值)无
说明:
mysql链接时的起始位置
canal.instance.master.journal.name + canal.instance.master.position : 精确指定一个binlog位点,进行启动canal.instance.master.timestamp : 指定一个时间戳,canal会自动遍历mysql binlog,找到对应时间戳的binlog位点后,进行启动不指定任何信息:默认从当前数据库的位点,进行启动。(show master status) #####mysql解析关注表定义标准的Perl正则,注意转义时需要双斜杠:\ #####mysql链接的编码目前canal版本仅支持一个数据库只有一种编码,如果一个库存在多个编码,需要通过filter.regex配置,将其拆分为多个canal instance,为每个instance指定不同的编码
更多文章关注公众号
更多:canal源码分析与应用专栏 —————————————————————————————————— 作者:桃花惜春风 转载请标明出处,原文地址: https://blog.csdn.net/xiaoyu_BD/article/details/82113861 如果感觉本文对您有帮助,您的支持是我坚持写作最大的动力,谢谢!
桃花惜春风
关注
关注
点赞
20
收藏
打赏
评论
canal实战(二):使用canal-kafka实现数据库增量实时更新
canal是阿里的一款开源工具,纯java开发,基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql。工作原理mysql主备复制实现从上层来看,复制分成三步:master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events
复制链接
扫一扫
专栏目录
超详细的canal使用总结
小飞的博客
10-12
8850
超详细的canal使用总结
canal的介绍
​canal,译意为水道/管道/沟渠,从官网的介绍中可以知道,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。
​这是一张官网提供的示意图:
官网的解释最权威,我就直接引用一下官网的原话,另,附上官网地址:https://github.com/alibaba/canal
基于日志增量订阅和消费的业务包括
数据库镜像
数据库实时备份
索引构建和实时维护(拆分异构索引、倒排索引等)
业务 cache 刷新
带业务逻辑的增量数据处理
canal常见问题答疑【转载】
u013764793的博客
02-08
1355
canal常见问题
参与评论
您还未登录,请先
登录
后发表或查看评论
Canal 介绍和使用
最新发布
yy139926的博客
11-09
123
canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)。
canal使用记录
dingxiaohuang4790的博客
09-17
1934
canal是阿里巴巴的来源项目。我们可以通过配置binlog实现数据库监控,得到数据库表或者数据的更新信息。参考我的文档前先去官网看下,可能已经支持更高版本的MySQL了
1. 查看官方开源项目
https://github.com/alibaba/canal
2. 下载最新的canal.deployer-XXXX-SNAPSHOT.tar.gz
https://github.c...
canal 入门(2)
qfzhaohan的博客
11-10
186
写个简单的Demo 去监听mysql 数据的变动
Jar包
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.3</version>
</dependency>
测试代码
package com.hq.eos.sync.client;
impo
Canal配置文件详解
zhou12314的专栏
03-28
8060
conf\example\instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0 //每个instance都会伪装成一个mysql slave...
canal入门,看这一篇就够了。
爪哇干货分享
09-04
610
整体结构
前言
我们在日常的开发中,为了业务查询以及业务实现的需要,不单止要保存在数据库中,还要同步保存到Elastic Search、HBase、Redis、MQ等等。
这时我注意到阿里开源的框架「Canal」,他可以很方便地「同步数据库的增量数据到其他的存储应用」。
什么是Canal
**canal [kə'næl]**,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是
canal 设置 timestamp
November_28的博客
04-16
2746
canal
阿里巴巴开源项目,基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql
工作原理
canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
mysql master收到dump请求,开始推送binary log给slave(也就是canal)
canal解析binary log对象(原始...
canal实战(一)|canal的配置安装与服务启动(含安装包)
神芷迦蓝寺
11-19
1827
## 本文章改编于阿里开源工具Canal,原版网址https://github.com/alibaba/canal
当你看到这篇文章时,说明你已经对canal有所理解,其主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。废话不多说,直接上流程
1 MySQL
1.1 安装
网上资源很多,暂不做详细说明,我给自己的博客打个广告吧:https://blog.csdn.net/mochou111/article/details/98174721
1.2 账号权限
...
canal第三篇:配置多个instance
04-20
1620
基于前两篇文章中的docker-composer.yaml调整
canal-server
在canal-server/conf/canal.properties 加入如下信息
……
# 多个使用 逗号 分割
canal.destinations = example,example2
……
instance.properties
在 canal-server/conf 创建文件夹
example
新建instance.properties
example2
新建instance.properties.
使用canal同步数据,踩坑排雷全过程
boyames的博客
05-04
2476
1、mysql配置
(1)检查binlog功能是否有开启
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | OFF |
+---------------+-------+
1 row in set (0.00 sec)
如果log_bin 显示为OFF则说明服务没有开启,需要修改Linux中
使用RabbitMQ+Canal实现Es商品更新功能
月哥说了算的博客
08-24
342
涉及微服务:
service_canal:负责监控数据库,当数据库有改动,使用rabbitq发送消息商品id。
service_search:负责操作Es,接收消息负责修改Es。
(1)引入pom
<dependency>
<groupId>org.springframework.boot</groupId>
&l...
canal修改同步位点解析
XUEJIA2S的博客
04-21
1万+
前言
在canal的数据同步使用过程中,有时会遇到需要修改同步位点的情况,这里对修改位点操作做一下记录。
分析
我们知道在canal-server的instance的配置文件中有一下配置项是与位点修改相关的,
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.times...
Canal原理介绍及安装详解
qq_654603797
07-01
205
Canal简介
概述:
​canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)。
背景:
早期的数据库同步业务,主要是基于trigger的方式获取增量变更。从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。
基于日志增量订阅&消费支持的业务:
数据库镜像
数据库实时备份
canal--介绍/使用/配置
IT利刃出鞘的博客
04-19
1468
其他网址
Canal AdminGuide - agapple - ITeye博客
简介
canal官网
https://github.com/alibaba/canalhttps://github.com/alibaba/canal/wiki
canal原理
canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump...
关于Canal的表过滤配置文件无法生效的问题
trancybao的博客
10-03
2180
Canal中的配置文件有几个坑,下面列举一下
1.这里的canal.instance.defaultDatabaseName并没有什么作用,至少我没有发现让它生效的办法。
2.如果要过滤数据,需要配置canal.instance.filter.regex
两个一个是白名单,一个是黑名单,我这里设置只需要test_push数据库下的所有表。
3.客户端的subscribe()方法会覆盖配置文件的配置,这个是配置不生效的元凶,但是我们不能直接不写这个方法,需要将其中条件改为空,直接为空默认会使用
超详细的Canal入门,看这篇就够了!
热门推荐
yehongzhi1994的博客
08-09
19万+
Canal入门看这篇就够了!
canal全量日志获取过程可能遇到的canal.instance.master.journal.name未起作用
jing2211728的博客
03-30
3749
canal使用过程中全量日志的使用
在使用canal进行数据库日志获取的时候,第一次获取需要获取全量日志。在instance.properties中是可以进行配置的。其中canal.instance.master.journal.name
canal.instance.master.position
canal.instance.master.timestamp几个属性可以控制获取日志时候的起始位...
canal数据同步搭建及踩坑记录
翻身咸鱼的博客
09-12
3万+
最近开始做canal数据同步方案。
canl的原理就是伪装成mysql的slave,然后悄悄地“偷数据”,复制数据吧。
至于canal的搭建,自行查看官网
mysql添加上配置并且赋上权限之后,最后canal的配置,一般只需要修改conf/example/instance.properties的这四个地方:
canal.instance.mysql.slaveId = 1234
ca...
Canal、Canaladmin、Canaladapter多instance数据库表双向同步,TCP/Zookeeper、Rocketmq配置
殷长庆的博客
07-02
1070
目录背景配置下载MySQL配置单机模式配置Canaladmin配置Instance配置Canal配置Canaladapter配置集群模式(ZK)Canaladmin配置Canal配置使用RocketMQCanaladmin配置 Instance配置Canaladapter配置两个MySQL数据库实例中,实现order表双向同步增删改操作准备一台Linux,用来安装canal环境,需要提前安装JDK和MySQL,安装JDK是为了启动canal这三个项目,这个是必要的安装MySQL是因为admin用到了cana
“相关推荐”对你有帮助么?
非常没帮助
没帮助
一般
有帮助
非常有帮助
提交
©️2022 CSDN
皮肤主题:技术黑板
设计师:CSDN官方博客
返回首页
桃花惜春风
CSDN认证博客专家
CSDN认证企业博客
码龄6年
企业员工
136
原创
1万+
周排名
11万+
总排名
88万+
访问
等级
8609
积分
391
粉丝
645
获赞
109
评论
445
收藏
私信
关注
热门文章
kafka如何直接查看log文件中的信息
47074
报错:com.alibaba.fastjson.JSONException: write javaBean error
46312
kafka报错(三):Failed to construct kafka consumer
45044
logstash如何将日志中字符串类型的时间转化成@timestamp
37878
kafka如何删除group
26949
分类专栏
canal源码分析与应用
6篇
Elasticsearch深入理解
31篇
kafka深入理解
15篇
ELK
6篇
日志系统
2篇
kafka
34篇
flume
1篇
logstash
7篇
开发语言
java
12篇
springboot
1篇
Scala
2篇
python
2篇
Java Servlet
1篇
前端
2篇
vue
2篇
游记
1篇
Spark
10篇
canal
7篇
hadoop
5篇
linux
3篇
数据库
mysql
8篇
oracle
3篇
破解
Lucene
2篇
最新评论
Elasticsearch原理(三):写入流程
qq_41544746:
大佬 ElasticSearch原理4 没找到啊
logstash如何将日志中字符串类型的时间转化成@timestamp
双木林。:
大神你们好,我的日志内容是:2022-07-06 17:53:54:670 [ip:10.8.0.1,appname:os9000] [AsyncResolver-bootstrap-executor-0] INFO c.n.d.shared.resolver.aws.ConfigClusterResolver - Resolving eureka endpoints via configuration
然后我的配置是:filter {
grok {
match => {
"message" => "%{IP:ip}"
date {
match => ["logdate", "yyyy-MM-dd HH:mm:ss:SSS"]
target => "@timestamp"
还是收集不到时间日志里面的时间,es里面的是:"@timestamp" : "2022-07-06T15:57:35.412Z",
"path" : "/data/share-storage/logs/cmsp/system-log/gitlab/warm/intelligent-audit.log.2022-07-06.0.WARN.log",
"message" : "2022-07-06 23:54:15:375 [ip:10.8.0.1,appname:system-log] [main] WARN o.s.c.l.c.LoadBalancerCacheAutoConfiguration$LoadBalancerCaffeineWarnLogger - Spring Cloud LoadBalancer is currently working with default default cache. You can switch to using Caffeine cache, by adding it to the classpath.",
"host" : "1c77d787ac68",
"ip" : "10.8.0.1",
"@version" : "1",
kafka报错(三):Failed to construct kafka consumer
weixin_44384275:
肥肠感谢
Elasticsearch干货(二):index、create、update区别
Faithforus:
了然
kafka报错(三):Failed to construct kafka consumer
奈何桥的轮回mm:
有用,看了一下果然是application.yml文件生产者和消费者序列化配置错误导致的
您愿意向朋友推荐“博客详情页”吗?
强烈不推荐
不推荐
一般般
推荐
强烈推荐
提交
最新文章
Elasticsearch源码(二):scroll原理
Logstash安装与入门
Logstash基础
2022年6篇
2021年6篇
2020年6篇
2019年7篇
2018年72篇
2016年39篇
目录
目录
分类专栏
canal源码分析与应用
6篇
Elasticsearch深入理解
31篇
kafka深入理解
15篇
ELK
6篇
日志系统
2篇
kafka
34篇
flume
1篇
logstash
7篇
开发语言
java
12篇
springboot
1篇
Scala
2篇
python
2篇
Java Servlet
1篇
前端
2篇
vue
2篇
游记
1篇
Spark
10篇
canal
7篇
hadoop
5篇
linux
3篇
数据库
mysql
8篇
oracle
3篇
破解
Lucene
2篇
目录
评论
被折叠的 条评论
为什么被折叠?
到【灌水乐园】发言
查看更多评论
打赏作者
桃花惜春风
你的鼓励将是我创作的最大动力
¥2
¥4
¥6
¥10
¥20
输入1-500的整数
余额支付
(余额:-- )
扫码支付
扫码支付:¥2
获取中
扫码支付
您的余额不足,请更换扫码支付或充值
打赏作者
实付元
使用余额支付
点击重新获取
扫码支付
钱包余额
抵扣说明:
1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。 2.余额无法直接购买下载,可以购买VIP、C币套餐、付费专栏及课程。
余额充值