系统数据实时同步方案二落地
liebian365 2024-11-22 17:12 18 浏览 0 评论
本文:使用canal中间件订阅binlog增量数据。
原理:运用数据库主从复制原理。
一:canal介绍:
canal是阿里开源的一款基于 MySql 数据库 binlog 的增量订阅和消费组件,通过它可以订阅数据库的 binlog 日志,然后进行一些数据消费,如数据镜像、数据异构、数据索引、缓存更新等。相对于消息队列,通过这种机制可以实现数据的有序化和一致性。
Canal 主要用途是对 MySql 数据库增量日志进行解析,提供增量数据的订阅和消费,简单说就是可以对 MySql 的增量数据进行实时同步,支持同步到 MySql、ElasticSearch、HBase 等数据存储中去。
原理:
- Canal 模拟 MySql Slave 的交互协议,伪装自己为 MySql Slave ,向 MySql Master 发送dump 协议。
- MySql Master 收到 dump 请求,开始推送 binary log 给 Slave (即 Canal )。
- Canal 解析 binary log 对象(原始为 byte 流)。
- Canal 对外提供增量数据订阅和消费,提供 Kafka、RocketMQ、RabbitMq、Es、Tcp 等组件来消费。
二:使用场景
- 同步缓存/更新es
- 任务下发
- 等等
三:落地实现
Canal 的各个组件 canal-server、canal-adapter、canal-admin。
下载地址:https://github.com/alibaba/canal/releases。
Canal 官方文档:https://github.com/alibaba/canal/wiki。
- canal-server(canal-deploy):可以直接监听 MySql 的 binlog,把自己伪装成 MySql 的从库,只负责接收数据,并不做处理。
- canal-adapter:相当于 Canal 的客户端,会从 canal-server 中获取数据,然后对数据进行同步,可以同步到 MySql、ElasticSearch 和 HBase 等存储中去。
- canal-admin:为 Canal 提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI 操作界面,方便更多用户快速和安全的操作。
- 首先先需要将数据库的binlog功能开启,并创建用户账号,同时赋予权限,这一步骤在上一篇文章有地址:https://mp.toutiao.com/profile_v4/graphic/preview?pgc_id=7430658491062239780
- 创建文件夹并解压canal
mkdir /usr/local/canal
tar -zxvf canal.deployer-1.1.7-SNAPSHOT.tar.gz -C /usr/local/canal/
- 修改配置文件canal.properties
vim /usr/local/canal/conf/canal.properties
主要修改内容为:
canal.port = 11111 #java程序连接端口
- 修改 instance.properties
vim /usr/local/canal/conf/example/instance.properties
## 不能与已有的 mysql 节点server-id重复
canal.instance.mysql.slaveId=1
## mysql地址
canal.instance.master.address=192.168.31.102:3306
## 指定连接 mysql 的用户密码
canal.instance.dbUsername=test
canal.instance.dbPassword=123456
## 字符集
canal.instance.connectionCharset = UTF-8
- 启动canal
cd /usr/local/canal/bin
./startup.sh
# 查看运行状态
## 关闭命令是 ./stop.sh
#查看日志数据
## 查看 server 日志
cat /usr/local/canal/logs/canal/canal.log
## 或者使用如下命令查看 instance 的日志
tail -f -n 100 /usr/local/canal/logs/example/example.log
核心代码:
public class CanalTest {
/**
* 1. 连接Canal服务器
* 2. 向Master请求dump协议
* 3. 把发送过来的binlog进行解析
* 4. 最后做实际的操作处理...发送到MQ Print...
* @param args
*/
public static void main(String[] args) {
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("192.168.31.101", 11111), // 192.168.31.101是canal所在节点ip,11111是java连接canal的端口号
"example", "test", "123456");
int batchSize = 1000; // 拉取数据量
int emptyCount = 0;
try {
// 连接我们的canal服务器
connector.connect();
// 订阅什么内容? 什么库表的内容??
connector.subscribe(".*\\..*"); // 表示订阅所有的库和表
// 出现问题直接进行回滚操作
connector.rollback();
int totalEmptyCount = 1200;
while(emptyCount < totalEmptyCount) {
Message message = connector.getWithoutAck(batchSize); // 一次性拉取 1000 条数据,封装成一个 message
// batchId用于处理完数据后进行ACK提交动作
long batchId = message.getId();
int size = message.getEntries().size();
if(batchId == -1 || size == 0) {
// 没有拉取到数据
emptyCount++;
System.err.println("empty count: " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// ignore..
}
} else {
// 有数据
emptyCount = 0;
System.err.printf("message[batchId=%s, size=%s] \n", batchId, size);
// 处理解析数据
printEnrty(message.getEntries());
}
// 确认提交处理后的数据
connector.ack(batchId);
}
System.err.println("empty too many times, exit");
} finally {
// 关闭连接
connector.disconnect();
}
}
private static void printEnrty(List<Entry> entries) {
for(Entry entry : entries) {
System.err.println("entry.getEntryType():"+entry.getEntryType());
// 如果EntryType 当前处于事务的过程中 那就不能处理
if(entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
// rc里面包含很多信息:存储数据库、表、binlog
RowChange rc = null;
try {
// 二进制的数据
rc = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("parser error!");
}
EventType eventType = rc.getEventType();
System.err.println(String.format("binlog[%s:%s], name[%s,%s], eventType : %s",
entry.getHeader().getLogfileName(),
entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(),
entry.getHeader().getTableName(),
eventType));
// 真正的对数据进行处理
for(RowData rd : rc.getRowDatasList()) {
if(eventType == EventType.DELETE) {
// delete操作 BeforeColumnsList
List<Column> deleteList = rd.getBeforeColumnsList();
printColumn(deleteList, "删除前");
} else if(eventType == EventType.INSERT) {
// insert操作AfterColumnsList
List<Column> insertList = rd.getAfterColumnsList();
printColumn(insertList, "新增后");
}
// update
else {
List<Column> updateBeforeList = rd.getBeforeColumnsList();
printColumn(updateBeforeList, "修改前");
List<Column> updateAfterList = rd.getAfterColumnsList();
printColumn(updateAfterList, "修改后");
}
}
}
}
private static void printColumn(List<Column> columns, String operationMsg) {
System.err.println("CanalEntry.Column的个数:"+list.size());
for(Column column: columns) {
System.err.println("操作类型:" + operationMsg + "--"
+column.getName()
+ " : "
+ column.getValue()
+ ", update = "
+ column.getUpdated());
}
}
}
扩展:
java与 canal 结合的操作是单线程,性能上不是很好,没有消息堆积能力,并发量一上来性能支撑不住,所以需要将 canal 与 kafka 整合,将 mysql binlog 数据投递到 kafka 上,再经过消费端去处理。kafka的优点:稳定性好,性能好,高吞吐量,可以做流量削峰,缓存数据。使用kafka可以有消息堆积,缓存消息,高性能高吞吐。在处理大规模的数据时有很大优势。
mysql的binlog 有变更时,canal 会解析 binlog 日志,将解析的数据发送给 kafka,然后编写 消费端代码,处理 kafka 的消息。比如可以输出到 ES
- Canal 与 Kafka整合,需要配置canal.properties
vim /usr/local/canal/conf/canal.properties
主要修改内容为:
canal.serverMode = kafka # 选择kafka的推送模式,发送kafka消息,默认是 tcp
kafka.bootstrap.servers = 192.168.31.101:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 100
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 3
- 修改 instance.properties
vim /usr/local/canal/conf/example/instance.properties
主要修改内容如下:
# table regex 这个是比较重要的参数,匹配库表白名单,比如我只要test库的user表的增量数据,则这样写 test.user
canal.instance.filter.regex=.*\\..*
####### mq config ######
# canal.mq.topic=example
# 动态topic,根据库名和表名生成 dynamic topic route by schema or table regex
canal.mq.dynamicTopic=.*\\..*
# table black regex
canal.instance.filter.black.regex=
#canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
# 根据 kafka 创建 topic时,默认生成的 partition 数量来设置,一般小于等于 kafka 的 partition 数量,我的kafka设置的为5,所以这里也设置5
canal.mq.partitionsNum=5
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
# .*\\..*:$pk$ 正则匹配,指定所有正则匹配的表对应的hash字段为表主键(自动查找)
canal.mq.partitionHash=.*\\..*:$pk$
相关推荐
- 4万多吨豪华游轮遇险 竟是因为这个原因……
-
(观察者网讯)4.7万吨豪华游轮搁浅,竟是因为油量太低?据观察者网此前报道,挪威游轮“维京天空”号上周六(23日)在挪威近海发生引擎故障搁浅。船上载有1300多人,其中28人受伤住院。经过数天的调...
- “菜鸟黑客”必用兵器之“渗透测试篇二”
-
"菜鸟黑客"必用兵器之"渗透测试篇二"上篇文章主要针对伙伴们对"渗透测试"应该如何学习?"渗透测试"的基本流程?本篇文章继续上次的分享,接着介绍一下黑客们常用的渗透测试工具有哪些?以及用实验环境让大家...
- 科幻春晚丨《震动羽翼说“Hello”》两万年星间飞行,探测器对地球的最终告白
-
作者|藤井太洋译者|祝力新【编者按】2021年科幻春晚的最后一篇小说,来自大家喜爱的日本科幻作家藤井太洋。小说将视角放在一颗太空探测器上,延续了他一贯的浪漫风格。...
- 麦子陪你做作业(二):KEGG通路数据库的正确打开姿势
-
作者:麦子KEGG是通路数据库中最庞大的,涵盖基因组网络信息,主要注释基因的功能和调控关系。当我们选到了合适的候选分子,单变量研究也已做完,接着研究机制的时便可使用到它。你需要了解你的分子目前已有哪些...
- 知存科技王绍迪:突破存储墙瓶颈,详解存算一体架构优势
-
智东西(公众号:zhidxcom)编辑|韦世玮智东西6月5日消息,近日,在落幕不久的GTIC2021嵌入式AI创新峰会上,知存科技CEO王绍迪博士以《存算一体AI芯片:AIoT设备的算力新选择》...
- 每日新闻播报(September 14)_每日新闻播报英文
-
AnOscarstatuestandscoveredwithplasticduringpreparationsleadinguptothe87thAcademyAward...
- 香港新巴城巴开放实时到站数据 供科技界研发使用
-
中新网3月22日电据香港《明报》报道,香港特区政府致力推动智慧城市,鼓励公私营机构开放数据,以便科技界研发使用。香港运输署21日与新巴及城巴(两巴)公司签署谅解备忘录,两巴将于2019年第3季度,开...
- 5款不容错过的APP: Red Bull Alert,Flipagram,WifiMapper
-
本周有不少非常出色的app推出,鸵鸟电台做了一个小合集。亮相本周榜单的有WifiMapper's安卓版的app,其中包含了RedBull的一款新型闹钟,还有一款可爱的怪物主题益智游戏。一起来看看我...
- Qt动画效果展示_qt显示图片
-
今天在这篇博文中,主要实践Qt动画,做一个实例来讲解Qt动画使用,其界面如下图所示(由于没有录制为gif动画图片,所以请各位下载查看效果):该程序使用应用程序单窗口,主窗口继承于QMainWindow...
- 如何从0到1设计实现一门自己的脚本语言
-
作者:dong...
- 三年级语文上册 仿写句子 需要的直接下载打印吧
-
描写秋天的好句好段1.秋天来了,山野变成了美丽的图画。苹果露出红红的脸庞,梨树挂起金黄的灯笼,高粱举起了燃烧的火把。大雁在天空一会儿写“人”字,一会儿写“一”字。2.花园里,菊花争奇斗艳,红的似火,粉...
- C++|那些一看就很简洁、优雅、经典的小代码段
-
目录0等概率随机洗牌:1大小写转换2字符串复制...
- 二年级上册语文必考句子仿写,家长打印,孩子照着练
-
二年级上册语文必考句子仿写,家长打印,孩子照着练。具体如下:...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- wireshark怎么抓包 (75)
- qt sleep (64)
- cs1.6指令代码大全 (55)
- factory-method (60)
- sqlite3_bind_blob (52)
- hibernate update (63)
- c++ base64 (70)
- nc 命令 (52)
- wm_close (51)
- epollin (51)
- sqlca.sqlcode (57)
- lua ipairs (60)
- tv_usec (64)
- 命令行进入文件夹 (53)
- postgresql array (57)
- statfs函数 (57)
- .project文件 (54)
- lua require (56)
- for_each (67)
- c#工厂模式 (57)
- wxsqlite3 (66)
- dmesg -c (58)
- fopen参数 (53)
- tar -zxvf -c (55)
- 速递查询 (52)