使用Canal实现ClickHouse实时同步MySQL数据

2021/02 作者:ihunter 0 0

ClickHouse作为OLAP分析引擎已经被广泛使用,数据的导入导出是用户面临的第一个问题。由于ClickHouse本身无法很好地支持单条大批量的写入,因此在实时同步数据方面需要借助其他服务协助。本文给出一种结合Canal+Kafka的方案,并且给出在多个MySQL实例分库分表的场景下,如何将多张MySQL数据表写入同一张ClickHouse表的方法,欢迎大家批评指正

使用Canal实现ClickHouse实时同步MySQL数据

我的需求就是将mysql多张表的数据实时同步到clickhouse中,参考了很多网上的资料,写的都不是很好,有的配置完成后,就会报错,今天终于实现了该功能,分享给大家。

我主要利用canal来实现ClickHouse实时同步MySQL数据,本次不考虑接入kafka等mq

配置mysql canal安装配置 canal-client安装配置 创建clickhouse的数据库和表 结果展示

1. 配置mysql

开启binlog

# mysql配置文件/etc/my.cnf添加下面配置
vi  /etc/my.cnf

#插入下面内容
server-id        = 1
log_bin          = /var/lib/mysql/bin.log
binlog-format    = row # very important if you want to receive write, update and delete row events
# optional
expire_logs_days = 30
max_binlog_size  = 768M
# setup listen address
bind-address     = 0.0.0.0
使用Canal实现ClickHouse实时同步MySQL数据

新增同步账号

#登陆mysql,执行下面命令,创建账号aaa,密码为123456
CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
flush privileges;

2. canal安装配置

下载 canal, 访问 release页面 , 选择需要的包下载, 如以 1.1.4 版本为例

#下载canalan安装包
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

#解压到指定目录
mkdir -p /usr/tool/canal
tar zxvf  canal.deployer-1.1.4.tar.gz  -C  /usr/tool/canal

#解压后进入目录,结构如下
drwxr-xr-x   7 awwzc  staff   238 12 14 23:34 bin
drwxr-xr-x   9 awwzc  staff   306 12 14 23:32 conf
drwxr-xr-x  83 awwzc  staff  2822 12 14 23:30 lib
drwxr-xr-x   4 awwzc  staff   136 12 14 23:34 logs

# canal启动时会读取conf目录下面的文件夹,当作instance,进入conf目录下复制example文件夹
cp -R  example/  maxwell/

#移除example文件夹
mv -rf example

#修改canal.properties(一定要修改)
# instance列表,conf目录下必须有同名的目录
canal.destinations = maxwell

#修改maxwell文件夹下面的instance.properties文件下面几项为你自己的数据库配置即可
vi conf/maxwell/instance.properties

# position info
canal.instance.master.address=192.168.0.102:3306

# username/password
canal.instance.dbUsername=maxwell
canal.instance.dbPassword=123456

# 启动,安装目录下执行以下命令,server,instance出现下面日记说明启动成功
bin/startup.sh
# 查看server日记,会出现以下日记
tail -200f  logs/canal/canal.log

2019-12-14 23:34:47.247 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2019-12-14 23:34:47.312 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2019-12-14 23:34:47.334 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2019-12-14 23:34:47.406 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.0.111(192.168.0.111):11111]
2019-12-14 23:34:49.026 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

# 查看instance日记,会出现以下日记
tail -200f  logs/maxwell/maxwell.log

2019-12-15 17:59:12.908 [destination = example , address = /192.168.0.102:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2019-12-15 17:59:12.913 [destination = example , address = /192.168.0.102:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just last position
 {"identity":{"slaveId":-1,"sourceAddress":{"address":"192.168.0.102","port":3306}},"postion":{"gtid":"","included":false,"journalName":"bin.000002","position":249315,"serverId":1,"timestamp":1576282583000}}
2019-12-15 17:59:13.015 [destination = example , address = /192.168.0.102:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=bin.000002,position=249315,serverId=1,gtid=,timestamp=1576282583000] cost : 105ms , the next step is binlog dump


#关闭
sh bin/stop.sh


3. canal-client安装配置

下载 canal-adapter, 访问 release页面 , 选择需要的包下载, 如以 1.1.4 版本为例

#下载
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz

#解压
mkdir -p /usr/tool/canal-adapter
tar zxvf canal.adapter-1.1.4.tar.gz  -C /usr/tool/canal-adapter
#解压后目录如下
drwxr-xr-x   7 awwzc  staff   238 12 15 13:19 bin
drwxr-xr-x   9 awwzc  staff   306 12 15 13:18 conf
drwxr-xr-x  87 awwzc  staff  2958 12 15 13:18 lib
drwxr-xr-x   3 awwzc  staff   102 12 15 13:19 logs
drwxr-xr-x   6 awwzc  staff   204 12 15 13:09 plugin

#在lib目录下面添加clickhouse连接驱动
httpcore-4.4.13.jar、httpclient-4.3.3.jar、clickhouse-jdbc-0.2.4.jar;

#修改配置文件conf/application.yml文件,修改canalServerHost、srcDataSources、canalAdapters的配置;
canal.conf:
    mode: tcp
    canalServerHost: 127.0.0.1:11111   # canal-server的服务地址
    secretKey:
    #同步数据源配置
    srcDataSources
    defaultDS:
    #mysql连接信息
          url: jdbc:mysql://192.168.0.102:3306/maxwell?useUnicode=true
          username: root
          password: 123456
      canalAdapters:
      - instance: maxwell # canal instance Name or mq topic name
        groups:
        - groupId: g1
          outerAdapters:
          - name: logger
          - name: rdb   #rdb类型
            key: mysql
            properties:
            #clickhouse数据看配置
              jdbc.driverClassName: ru.yandex.clickhouse.ClickHouseDriver
              jdbc.url: jdbc:clickhouse://127.0.0.1:8123/maxwell
              jdbc.username: default
              jdbc.password:
              
1. 源数据库与目标数据库名字不同,源表名与目标表名不同
  #修改adapter的conf/rdb/mytest_user.yml配置文件,指定源数据库和目标数据库
  dataSourceKey: defaultDS
  destination: maxwell
  groupId: g1
  outerAdapterKey: mysql1
  concurrent: true
  dbMapping:
    database: aqi_china
    table: tb_aqi_0
    targetTable: aqi_china.tb_aqi
    #targetPk:
    #  id: id
    mapAll: true
    #targetColumns:
    #  id:
    #  name:
    #  role_id:
    #  c_time:
    #  test1:
    #etlCondition: "where c_time>={}"
    commitBatch: 3000 # 批量提交的大小
    
 2. 多个源数据库表写入目的端的同一张表
   在conf/rdb 目录配置多个yml文件,分别指明不同的table名称。


启动

bin/startup.sh

#查看日记,出现以下日记说名启动成功
tail -200f logs/adapter/adapter.log


我在启动的时候出现了内存溢出的异常,可以通过修改startup.sh的配置:

使用Canal实现ClickHouse实时同步MySQL数据

启动后需要看日志,成功启动后会打印


4. clickhouse创建数据库和表

#创建数据库
CREATE DATABASE aqi_china;

#创建表
CREATE TABLE IF NOT EXISTS  aqi_china.tb_aqi (id Int32,uuid String,uid Int32,aqi Int32,url String,co String,dew String,h String,no2 String,o3 String,p String,pm10 String,pm25 String,so2 String,t String,w String,wg String,vtime String,ftime Int32) ENGINE=MergeTree() PARTITION BY uid ORDER BY id SETTINGS index_granularity = 8192;


5. 结果展示

我们来看一下展示结果:

这是canal客户端打印的日志:

使用Canal实现ClickHouse实时同步MySQL数据

展示clickhouse的数据:

使用Canal实现ClickHouse实时同步MySQL数据

数据已经同步成功了


赞(2) 更多分享

上篇: Linux系统部署redis集群
下篇: ClickHouse SQL基本语法和导入导出实战