系统数据实时同步方案二落地
liebian365 2024-11-22 17:12 27 浏览 0 评论
本文:使用canal中间件订阅binlog增量数据。
原理:运用数据库主从复制原理。
一:canal介绍:
canal是阿里开源的一款基于 MySql 数据库 binlog 的增量订阅和消费组件,通过它可以订阅数据库的 binlog 日志,然后进行一些数据消费,如数据镜像、数据异构、数据索引、缓存更新等。相对于消息队列,通过这种机制可以实现数据的有序化和一致性。
Canal 主要用途是对 MySql 数据库增量日志进行解析,提供增量数据的订阅和消费,简单说就是可以对 MySql 的增量数据进行实时同步,支持同步到 MySql、ElasticSearch、HBase 等数据存储中去。
原理:
- Canal 模拟 MySql Slave 的交互协议,伪装自己为 MySql Slave ,向 MySql Master 发送dump 协议。
- MySql Master 收到 dump 请求,开始推送 binary log 给 Slave (即 Canal )。
- Canal 解析 binary log 对象(原始为 byte 流)。
- Canal 对外提供增量数据订阅和消费,提供 Kafka、RocketMQ、RabbitMq、Es、Tcp 等组件来消费。
二:使用场景
- 同步缓存/更新es
- 任务下发
- 等等
三:落地实现
Canal 的各个组件 canal-server、canal-adapter、canal-admin。
下载地址:https://github.com/alibaba/canal/releases。
Canal 官方文档:https://github.com/alibaba/canal/wiki。
- canal-server(canal-deploy):可以直接监听 MySql 的 binlog,把自己伪装成 MySql 的从库,只负责接收数据,并不做处理。
- canal-adapter:相当于 Canal 的客户端,会从 canal-server 中获取数据,然后对数据进行同步,可以同步到 MySql、ElasticSearch 和 HBase 等存储中去。
- canal-admin:为 Canal 提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI 操作界面,方便更多用户快速和安全的操作。
- 首先先需要将数据库的binlog功能开启,并创建用户账号,同时赋予权限,这一步骤在上一篇文章有地址:https://mp.toutiao.com/profile_v4/graphic/preview?pgc_id=7430658491062239780
- 创建文件夹并解压canal
mkdir /usr/local/canal
tar -zxvf canal.deployer-1.1.7-SNAPSHOT.tar.gz -C /usr/local/canal/
- 修改配置文件canal.properties
vim /usr/local/canal/conf/canal.properties
主要修改内容为:
canal.port = 11111 #java程序连接端口
- 修改 instance.properties
vim /usr/local/canal/conf/example/instance.properties
## 不能与已有的 mysql 节点server-id重复
canal.instance.mysql.slaveId=1
## mysql地址
canal.instance.master.address=192.168.31.102:3306
## 指定连接 mysql 的用户密码
canal.instance.dbUsername=test
canal.instance.dbPassword=123456
## 字符集
canal.instance.connectionCharset = UTF-8
- 启动canal
cd /usr/local/canal/bin
./startup.sh
# 查看运行状态
## 关闭命令是 ./stop.sh
#查看日志数据
## 查看 server 日志
cat /usr/local/canal/logs/canal/canal.log
## 或者使用如下命令查看 instance 的日志
tail -f -n 100 /usr/local/canal/logs/example/example.log
核心代码:
public class CanalTest {
/**
* 1. 连接Canal服务器
* 2. 向Master请求dump协议
* 3. 把发送过来的binlog进行解析
* 4. 最后做实际的操作处理...发送到MQ Print...
* @param args
*/
public static void main(String[] args) {
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("192.168.31.101", 11111), // 192.168.31.101是canal所在节点ip,11111是java连接canal的端口号
"example", "test", "123456");
int batchSize = 1000; // 拉取数据量
int emptyCount = 0;
try {
// 连接我们的canal服务器
connector.connect();
// 订阅什么内容? 什么库表的内容??
connector.subscribe(".*\\..*"); // 表示订阅所有的库和表
// 出现问题直接进行回滚操作
connector.rollback();
int totalEmptyCount = 1200;
while(emptyCount < totalEmptyCount) {
Message message = connector.getWithoutAck(batchSize); // 一次性拉取 1000 条数据,封装成一个 message
// batchId用于处理完数据后进行ACK提交动作
long batchId = message.getId();
int size = message.getEntries().size();
if(batchId == -1 || size == 0) {
// 没有拉取到数据
emptyCount++;
System.err.println("empty count: " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// ignore..
}
} else {
// 有数据
emptyCount = 0;
System.err.printf("message[batchId=%s, size=%s] \n", batchId, size);
// 处理解析数据
printEnrty(message.getEntries());
}
// 确认提交处理后的数据
connector.ack(batchId);
}
System.err.println("empty too many times, exit");
} finally {
// 关闭连接
connector.disconnect();
}
}
private static void printEnrty(List<Entry> entries) {
for(Entry entry : entries) {
System.err.println("entry.getEntryType():"+entry.getEntryType());
// 如果EntryType 当前处于事务的过程中 那就不能处理
if(entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
// rc里面包含很多信息:存储数据库、表、binlog
RowChange rc = null;
try {
// 二进制的数据
rc = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("parser error!");
}
EventType eventType = rc.getEventType();
System.err.println(String.format("binlog[%s:%s], name[%s,%s], eventType : %s",
entry.getHeader().getLogfileName(),
entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(),
entry.getHeader().getTableName(),
eventType));
// 真正的对数据进行处理
for(RowData rd : rc.getRowDatasList()) {
if(eventType == EventType.DELETE) {
// delete操作 BeforeColumnsList
List<Column> deleteList = rd.getBeforeColumnsList();
printColumn(deleteList, "删除前");
} else if(eventType == EventType.INSERT) {
// insert操作AfterColumnsList
List<Column> insertList = rd.getAfterColumnsList();
printColumn(insertList, "新增后");
}
// update
else {
List<Column> updateBeforeList = rd.getBeforeColumnsList();
printColumn(updateBeforeList, "修改前");
List<Column> updateAfterList = rd.getAfterColumnsList();
printColumn(updateAfterList, "修改后");
}
}
}
}
private static void printColumn(List<Column> columns, String operationMsg) {
System.err.println("CanalEntry.Column的个数:"+list.size());
for(Column column: columns) {
System.err.println("操作类型:" + operationMsg + "--"
+column.getName()
+ " : "
+ column.getValue()
+ ", update = "
+ column.getUpdated());
}
}
}
扩展:
java与 canal 结合的操作是单线程,性能上不是很好,没有消息堆积能力,并发量一上来性能支撑不住,所以需要将 canal 与 kafka 整合,将 mysql binlog 数据投递到 kafka 上,再经过消费端去处理。kafka的优点:稳定性好,性能好,高吞吐量,可以做流量削峰,缓存数据。使用kafka可以有消息堆积,缓存消息,高性能高吞吐。在处理大规模的数据时有很大优势。
mysql的binlog 有变更时,canal 会解析 binlog 日志,将解析的数据发送给 kafka,然后编写 消费端代码,处理 kafka 的消息。比如可以输出到 ES
- Canal 与 Kafka整合,需要配置canal.properties
vim /usr/local/canal/conf/canal.properties
主要修改内容为:
canal.serverMode = kafka # 选择kafka的推送模式,发送kafka消息,默认是 tcp
kafka.bootstrap.servers = 192.168.31.101:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 100
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 3
- 修改 instance.properties
vim /usr/local/canal/conf/example/instance.properties
主要修改内容如下:
# table regex 这个是比较重要的参数,匹配库表白名单,比如我只要test库的user表的增量数据,则这样写 test.user
canal.instance.filter.regex=.*\\..*
####### mq config ######
# canal.mq.topic=example
# 动态topic,根据库名和表名生成 dynamic topic route by schema or table regex
canal.mq.dynamicTopic=.*\\..*
# table black regex
canal.instance.filter.black.regex=
#canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
# 根据 kafka 创建 topic时,默认生成的 partition 数量来设置,一般小于等于 kafka 的 partition 数量,我的kafka设置的为5,所以这里也设置5
canal.mq.partitionsNum=5
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
# .*\\..*:$pk$ 正则匹配,指定所有正则匹配的表对应的hash字段为表主键(自动查找)
canal.mq.partitionHash=.*\\..*:$pk$
相关推荐
- Linux-常用操作命令介绍(linux常用的命令大全)
-
1.帮助命令帮助命令1.1help命令语法格式:命令--help作用:查看某个命令的帮助信息示例#ls--help#netstat--help1.2man命令语法格式:man命令...
- 推荐:一个小而美的Java工具类库(java工具软件)
-
前言是的,你没看错,没看错,它就是hutool!相信很多做java开发的朋友应该都已经认识并使用过它了,今天带大家再重温一下它都有哪些功能,并以示例来看看hutool是如何简便实现JWT认...
- 【SpringBoot后端开发】第三部分 Linux操作系统常用命令(3)
-
创作不易,请帮忙转发、点赞和评论!四、Linux常用命令对于Linux系统来说,中央处理器、内存、磁盘驱动器、键盘、鼠标、用户等都是文件,而Linux系统管理的命令是它正常运行的核心,与之DOS命令类...
- linux常用命令在线查询工具(linux常用命令在线查询工具有哪些)
-
linuxvi编辑器常用命令linux查看iplinuxfind-name查找文件名linuxshelllinux查看端口占用linux删除文件命令linuxcp命令复制文件到另一个...
- 使用免费绿色工具chfs,将文件夹共享成网盘
-
需求:业务需求方有个需要将apk包上传到服务器中,通过chfs可以将服务器目录共享出来,可以可以登录后台自行上传apk文件包。本文就教大家三个知识点1.centos7下使用chfs,共享目录。2.使用...
- Mysql和Hive之间通过Sqoop进行数据同步
-
文章回顾理论大数据框架原理简介大数据发展历程及技术选型实践搭建大数据运行环境之一搭建大数据运行环境之二本地MAC环境配置CPU数和内存大小查看CPU数sysctlmachdep.cpu#核数为...
- 真实案例记录Linux被植入rootkit导致服务器带宽跑满的解决过程
-
一、关于linux下的rootkitrootkit是Linux平台下最常见的一种木马后门工具,它主要通过替换系统文件来达到攻击和和隐蔽的目的,这种木马比普通木马后门更加危险和隐蔽,普通的检测工...
- python周期任务调度工具Schedule使用详解
-
如果你想周期性地执行某个Python脚本,最出名的选择应该是Crontab脚本,但是Crontab具有以下缺点:不方便执行秒级任务。当需要执行的定时任务有上百个的时候,Crontab的管...
- Linux 系统日常巡检脚本(shell巡检脚本)
-
Linux系统日常巡检脚本,巡检内容包含了,磁盘,内存cpu进程文件更改用户登录等一系列的操作直接用就行了。报告以邮件发送到邮箱在log下生成巡检报告。#!/bin/bash#@Au...
- Schedule—简单实用的 Python 周期任务调度工具
-
如果你想周期性地执行某个Python脚本,最出名的选择应该是Crontab脚本,但是Crontab具有以下缺点:1.不方便执行秒级任务。2.当需要执行的定时任务有上百个的时候,Cronta...
- celery定时与异步任务详解(定时任务异步执行)
-
celery简介Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。Celery的架构由三部分组成,消息中间件(messagebroke...
- 开源免费的定时任务管理系统:Gocron
-
Gocron:精准调度未来,你的全能定时任务管理工具!-精选真开源,释放新价值。概览Gocron是github上一个开源免费的定时任务管理系统。它使用Go语言开发,是一个轻量级定时任务集中调度和管理...
- PHP Laravel定时任务Schedule(laravel定时任务原理)
-
前提:本文方法是利用Linux的crontab定时任务来协助实现Laravel调度(Mac也一样)。一、首先添加Crontab定时任务,这里只做简单介绍:用命令crontab-e添加如下内容**...
- Linux的常用命令就是记不住,怎么办?于是推出了这套教程
-
1.帮助命令1.1help命令#语法格式:命令--help#作用:查看某个命令的帮助信息#示例:#ls--help查看ls命令的帮助信息#netst...
- 如何定期执行 Python 脚本:5 种常见方法
-
定期执行任务是自动化工作流程中的重要环节,无论是数据抓取、文件备份,还是定期报告生成,定时运行脚本都可以极大提高效率。本文将介绍五种方法,通过这些方法,你可以轻松设置定期执行Python脚本的任务...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- wireshark怎么抓包 (75)
- qt sleep (64)
- cs1.6指令代码大全 (55)
- factory-method (60)
- sqlite3_bind_blob (52)
- hibernate update (63)
- c++ base64 (70)
- nc 命令 (52)
- wm_close (51)
- epollin (51)
- sqlca.sqlcode (57)
- lua ipairs (60)
- tv_usec (64)
- 命令行进入文件夹 (53)
- postgresql array (57)
- statfs函数 (57)
- .project文件 (54)
- lua require (56)
- for_each (67)
- c#工厂模式 (57)
- wxsqlite3 (66)
- dmesg -c (58)
- fopen参数 (53)
- tar -zxvf -c (55)
- 速递查询 (52)