百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分析 > 正文

系统数据实时同步方案二落地

liebian365 2024-11-22 17:12 18 浏览 0 评论

本文:使用canal中间件订阅binlog增量数据。

原理:运用数据库主从复制原理。

一:canal介绍:

canal是阿里开源的一款基于 MySql 数据库 binlog 的增量订阅和消费组件,通过它可以订阅数据库的 binlog 日志,然后进行一些数据消费,如数据镜像、数据异构、数据索引、缓存更新等。相对于消息队列,通过这种机制可以实现数据的有序化和一致性。

Canal 主要用途是对 MySql 数据库增量日志进行解析,提供增量数据的订阅和消费,简单说就是可以对 MySql 的增量数据进行实时同步,支持同步到 MySql、ElasticSearch、HBase 等数据存储中去。



原理:

  • Canal 模拟 MySql Slave 的交互协议,伪装自己为 MySql Slave ,向 MySql Master 发送dump 协议。
  • MySql Master 收到 dump 请求,开始推送 binary log 给 Slave (即 Canal )。
  • Canal 解析 binary log 对象(原始为 byte 流)。
  • Canal 对外提供增量数据订阅和消费,提供 Kafka、RocketMQ、RabbitMq、Es、Tcp 等组件来消费。

二:使用场景

  • 同步缓存/更新es
  • 任务下发
  • 等等

三:落地实现

Canal 的各个组件 canal-server、canal-adapter、canal-admin。

下载地址:https://github.com/alibaba/canal/releases。

Canal 官方文档:https://github.com/alibaba/canal/wiki。

  • canal-server(canal-deploy):可以直接监听 MySql 的 binlog,把自己伪装成 MySql 的从库,只负责接收数据,并不做处理。
  • canal-adapter:相当于 Canal 的客户端,会从 canal-server 中获取数据,然后对数据进行同步,可以同步到 MySql、ElasticSearch 和 HBase 等存储中去。
  • canal-admin:为 Canal 提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI 操作界面,方便更多用户快速和安全的操作。
  1. 首先先需要将数据库的binlog功能开启,并创建用户账号,同时赋予权限,这一步骤在上一篇文章有地址:https://mp.toutiao.com/profile_v4/graphic/preview?pgc_id=7430658491062239780
  2. 创建文件夹并解压canal
mkdir /usr/local/canal
tar -zxvf canal.deployer-1.1.7-SNAPSHOT.tar.gz -C /usr/local/canal/
  1. 修改配置文件canal.properties
vim /usr/local/canal/conf/canal.properties

主要修改内容为:
canal.port = 11111  #java程序连接端口
  1. 修改 instance.properties
vim /usr/local/canal/conf/example/instance.properties

## 不能与已有的 mysql 节点server-id重复
canal.instance.mysql.slaveId=1
## mysql地址
canal.instance.master.address=192.168.31.102:3306

## 指定连接 mysql 的用户密码
canal.instance.dbUsername=test 
canal.instance.dbPassword=123456
## 字符集
canal.instance.connectionCharset = UTF-8 
  1. 启动canal
cd /usr/local/canal/bin
./startup.sh
# 查看运行状态
## 关闭命令是 ./stop.sh

#查看日志数据
## 查看 server 日志
cat /usr/local/canal/logs/canal/canal.log
## 或者使用如下命令查看 instance 的日志
tail -f -n 100 /usr/local/canal/logs/example/example.log

核心代码:

public class CanalTest {

	/**
	 * 	1. 	连接Canal服务器
	 * 	2.	向Master请求dump协议
	 * 	3.	把发送过来的binlog进行解析
	 * 	4.	最后做实际的操作处理...发送到MQ Print...
	 * @param args
	 */
	public static void main(String[] args) {
		
		CanalConnector connector = CanalConnectors.newSingleConnector(
				new InetSocketAddress("192.168.31.101", 11111), // 192.168.31.101是canal所在节点ip,11111是java连接canal的端口号
				"example", "test", "123456");
		
		int batchSize = 1000; //	拉取数据量
		int emptyCount = 0;
		try {
			//	连接我们的canal服务器
			connector.connect();
			//	订阅什么内容? 什么库表的内容??
			connector.subscribe(".*\\..*"); // 表示订阅所有的库和表
			//	出现问题直接进行回滚操作
			connector.rollback();
			
			int totalEmptyCount = 1200;
			
			while(emptyCount < totalEmptyCount) {
				Message message = connector.getWithoutAck(batchSize); // 一次性拉取 1000 条数据,封装成一个 message
				//	batchId用于处理完数据后进行ACK提交动作
				long batchId = message.getId();
				int size = message.getEntries().size();

				if(batchId == -1 || size == 0) {
					// 没有拉取到数据
					emptyCount++;
					System.err.println("empty count: " + emptyCount);
					try {
						Thread.sleep(1000);
					} catch (InterruptedException e) {
						// ignore..
					}
				} else {
					// 有数据
					emptyCount = 0;
					System.err.printf("message[batchId=%s, size=%s] \n", batchId, size);
					//	处理解析数据
					printEnrty(message.getEntries());
				}
				//	确认提交处理后的数据
				connector.ack(batchId);
			}
			System.err.println("empty too many times, exit");
			
		} finally {
			//	关闭连接
			connector.disconnect();
		}
		
	}

	private static void printEnrty(List<Entry> entries) {
		for(Entry entry : entries) {
			System.err.println("entry.getEntryType():"+entry.getEntryType());
			// 	如果EntryType 当前处于事务的过程中 那就不能处理
			if(entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
				continue;
			}
			
			//	rc里面包含很多信息:存储数据库、表、binlog
			RowChange rc = null;
			try {
				//	二进制的数据
				rc = RowChange.parseFrom(entry.getStoreValue());
			} catch (Exception e) {
				throw new RuntimeException("parser error!");
			}
			EventType eventType = rc.getEventType();
			
			System.err.println(String.format("binlog[%s:%s], name[%s,%s], eventType : %s", 
					entry.getHeader().getLogfileName(),
					entry.getHeader().getLogfileOffset(),
					entry.getHeader().getSchemaName(),
					entry.getHeader().getTableName(),
					eventType));
			
			//	真正的对数据进行处理
			for(RowData rd : rc.getRowDatasList()) {
				if(eventType == EventType.DELETE) {
					//	delete操作 BeforeColumnsList
					List<Column> deleteList = rd.getBeforeColumnsList();
					printColumn(deleteList, "删除前");
					
				} else if(eventType == EventType.INSERT) {
					//	insert操作AfterColumnsList
					List<Column> insertList = rd.getAfterColumnsList();
					printColumn(insertList, "新增后");
				} 
				//	update
				else {
					List<Column> updateBeforeList = rd.getBeforeColumnsList();
					printColumn(updateBeforeList, "修改前");
					List<Column> updateAfterList = rd.getAfterColumnsList();
					printColumn(updateAfterList, "修改后");
				}
			}
		}
	}
	
	private static void printColumn(List<Column> columns, String operationMsg) {
        System.err.println("CanalEntry.Column的个数:"+list.size());
		for(Column column: columns) {
			System.err.println("操作类型:" + operationMsg + "--"
					+column.getName() 
					+ " : " 
					+ column.getValue() 
					+ ", update = " 
					+ column.getUpdated());
		}
	}	
}


扩展:

java与 canal 结合的操作是单线程,性能上不是很好,没有消息堆积能力,并发量一上来性能支撑不住,所以需要将 canal 与 kafka 整合,将 mysql binlog 数据投递到 kafka 上,再经过消费端去处理。kafka的优点:稳定性好,性能好,高吞吐量,可以做流量削峰,缓存数据。使用kafka可以有消息堆积,缓存消息,高性能高吞吐。在处理大规模的数据时有很大优势。


mysql的binlog 有变更时,canal 会解析 binlog 日志,将解析的数据发送给 kafka,然后编写 消费端代码,处理 kafka 的消息。比如可以输出到 ES

  • Canal 与 Kafka整合,需要配置canal.properties
vim /usr/local/canal/conf/canal.properties

主要修改内容为:
canal.serverMode = kafka # 选择kafka的推送模式,发送kafka消息,默认是 tcp
kafka.bootstrap.servers = 192.168.31.101:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 100
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 3
  • 修改 instance.properties
vim /usr/local/canal/conf/example/instance.properties

主要修改内容如下:
# table regex 这个是比较重要的参数,匹配库表白名单,比如我只要test库的user表的增量数据,则这样写 test.user
canal.instance.filter.regex=.*\\..*

####### mq config ######
# canal.mq.topic=example
# 动态topic,根据库名和表名生成 dynamic topic route by schema or table regex
canal.mq.dynamicTopic=.*\\..*
# table black regex
canal.instance.filter.black.regex=

#canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
# 根据 kafka 创建 topic时,默认生成的 partition 数量来设置,一般小于等于 kafka 的 partition 数量,我的kafka设置的为5,所以这里也设置5
canal.mq.partitionsNum=5
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
# .*\\..*:$pk$ 正则匹配,指定所有正则匹配的表对应的hash字段为表主键(自动查找)
canal.mq.partitionHash=.*\\..*:$pk$

相关推荐

4万多吨豪华游轮遇险 竟是因为这个原因……

(观察者网讯)4.7万吨豪华游轮搁浅,竟是因为油量太低?据观察者网此前报道,挪威游轮“维京天空”号上周六(23日)在挪威近海发生引擎故障搁浅。船上载有1300多人,其中28人受伤住院。经过数天的调...

“菜鸟黑客”必用兵器之“渗透测试篇二”

"菜鸟黑客"必用兵器之"渗透测试篇二"上篇文章主要针对伙伴们对"渗透测试"应该如何学习?"渗透测试"的基本流程?本篇文章继续上次的分享,接着介绍一下黑客们常用的渗透测试工具有哪些?以及用实验环境让大家...

科幻春晚丨《震动羽翼说“Hello”》两万年星间飞行,探测器对地球的最终告白

作者|藤井太洋译者|祝力新【编者按】2021年科幻春晚的最后一篇小说,来自大家喜爱的日本科幻作家藤井太洋。小说将视角放在一颗太空探测器上,延续了他一贯的浪漫风格。...

麦子陪你做作业(二):KEGG通路数据库的正确打开姿势

作者:麦子KEGG是通路数据库中最庞大的,涵盖基因组网络信息,主要注释基因的功能和调控关系。当我们选到了合适的候选分子,单变量研究也已做完,接着研究机制的时便可使用到它。你需要了解你的分子目前已有哪些...

知存科技王绍迪:突破存储墙瓶颈,详解存算一体架构优势

智东西(公众号:zhidxcom)编辑|韦世玮智东西6月5日消息,近日,在落幕不久的GTIC2021嵌入式AI创新峰会上,知存科技CEO王绍迪博士以《存算一体AI芯片:AIoT设备的算力新选择》...

每日新闻播报(September 14)_每日新闻播报英文

AnOscarstatuestandscoveredwithplasticduringpreparationsleadinguptothe87thAcademyAward...

香港新巴城巴开放实时到站数据 供科技界研发使用

中新网3月22日电据香港《明报》报道,香港特区政府致力推动智慧城市,鼓励公私营机构开放数据,以便科技界研发使用。香港运输署21日与新巴及城巴(两巴)公司签署谅解备忘录,两巴将于2019年第3季度,开...

5款不容错过的APP: Red Bull Alert,Flipagram,WifiMapper

本周有不少非常出色的app推出,鸵鸟电台做了一个小合集。亮相本周榜单的有WifiMapper's安卓版的app,其中包含了RedBull的一款新型闹钟,还有一款可爱的怪物主题益智游戏。一起来看看我...

Qt动画效果展示_qt显示图片

今天在这篇博文中,主要实践Qt动画,做一个实例来讲解Qt动画使用,其界面如下图所示(由于没有录制为gif动画图片,所以请各位下载查看效果):该程序使用应用程序单窗口,主窗口继承于QMainWindow...

如何从0到1设计实现一门自己的脚本语言

作者:dong...

三年级语文上册 仿写句子 需要的直接下载打印吧

描写秋天的好句好段1.秋天来了,山野变成了美丽的图画。苹果露出红红的脸庞,梨树挂起金黄的灯笼,高粱举起了燃烧的火把。大雁在天空一会儿写“人”字,一会儿写“一”字。2.花园里,菊花争奇斗艳,红的似火,粉...

C++|那些一看就很简洁、优雅、经典的小代码段

目录0等概率随机洗牌:1大小写转换2字符串复制...

二年级上册语文必考句子仿写,家长打印,孩子照着练

二年级上册语文必考句子仿写,家长打印,孩子照着练。具体如下:...

一年级语文上 句子专项练习(可打印)

...

亲自上阵!C++ 大佬深度“剧透”:C++26 将如何在代码生成上对抗 Rust?

...

取消回复欢迎 发表评论: