canal notes

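Before using canal, prepare the following:

  1. Install and configure MySQL
    1.1 Install MySQL
    1.2 Configure the MySQL binlog to use ROW mode
    1.3 Add a canal user in MySQL
    1.4 Check that the canal user works
  2. Download and configure canal
    2.1 Download canal
    2.2 Configure canal
    2.3 Start canal (requires JDK >= 1.6.25)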

(1) Configure MySQL

(1.1) Install MySQL

See https://dev.mysql.com/doc/refman/5.7/en/installing.html

(1.2) Edit the MySQL configuration file

canal works by reading the MySQL binlog, so binlog writing must be enabled in MySQL and the binlog format must be set to ROW.

[mysqld]
log-bin=mysql-bin  # enable the binlog
binlog-format=ROW  # use ROW mode
server_id=1        # required for MySQL replication; must differ from canal's slaveId
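The three settings above can be sanity-checked before MySQL is restarted. The following is a minimal sketch, assuming the configuration is available as plain text; the class and method names (MyCnfCheck, mysqldOptions, readyForCanal) are made up for illustration and are not part of MySQL or canal. It treats '-' and '_' in option names as equivalent, as MySQL does.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class MyCnfCheck {

    /** Collects options from the [mysqld] section; '-' in option names is normalized to '_'. */
    static Map<String, String> mysqldOptions(String cnf) {
        Map<String, String> options = new LinkedHashMap<>();
        boolean inMysqld = false;
        for (String raw : cnf.split("\\R")) {
            // Drop trailing comments and surrounding whitespace.
            int hash = raw.indexOf('#');
            String line = (hash >= 0 ? raw.substring(0, hash) : raw).trim();
            if (line.isEmpty()) {
                continue;
            }
            if (line.startsWith("[")) {
                inMysqld = line.equalsIgnoreCase("[mysqld]");
                continue;
            }
            if (!inMysqld) {
                continue;
            }
            int eq = line.indexOf('=');
            String key = (eq >= 0 ? line.substring(0, eq) : line).trim().replace('-', '_');
            String value = eq >= 0 ? line.substring(eq + 1).trim() : "";
            options.put(key, value);
        }
        return options;
    }

    /** True when the binlog is enabled, its format is ROW, and a server id is set. */
    static boolean readyForCanal(String cnf) {
        Map<String, String> o = mysqldOptions(cnf);
        return o.containsKey("log_bin")
                && "ROW".equalsIgnoreCase(o.get("binlog_format"))
                && o.containsKey("server_id");
    }

    public static void main(String[] args) {
        String cnf = "[mysqld]\nlog-bin=mysql-bin\nbinlog-format=ROW\nserver_id=1\n";
        System.out.println(readyForCanal(cnf));
    }
}
```

The check only reads the text of the file; whether the running server has picked the values up still has to be confirmed on the server itself.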

(1.3) Add a canal user in MySQL and grant privileges

canal works by presenting itself as a MySQL slave, so its account needs the privileges of a MySQL slave.

CREATE USER canal IDENTIFIED BY 'canal';    

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';  

FLUSH PRIVILEGES; 

(1.4) Verify the user's privileges

  1. show master status ;
    If the binlog file and position are displayed, the privilege is in place. If you get Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s) for this operation , the user lacks the REPLICATION CLIENT privilege.

  2. show slave hosts ;
    If you get Access denied; you need (at least one of) the REPLICATION SLAVE privilege(s) for this operation , the user lacks the REPLICATION SLAVE privilege.
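Another way to verify the account is to read the output of SHOW GRANTS for the canal user and compare it with the three privileges granted in (1.3). The sketch below assumes the grant is global (ON *.*) and arrives as a single line of text; GrantCheck and missing are hypothetical names, and the parsing is deliberately naive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

class GrantCheck {

    // Privileges given to the canal user by the GRANT statement in (1.3).
    static final List<String> REQUIRED =
            Arrays.asList("SELECT", "REPLICATION SLAVE", "REPLICATION CLIENT");

    /** Returns the required privileges that one "GRANT ... ON ... TO ..." line does not contain. */
    static List<String> missing(String grantLine) {
        String upper = grantLine.toUpperCase(Locale.ROOT);
        int start = upper.indexOf("GRANT ");
        int on = upper.indexOf(" ON ");
        List<String> missing = new ArrayList<>();
        if (start < 0 || on < start + 6) {
            // Not a GRANT line we understand: report everything as missing.
            missing.addAll(REQUIRED);
            return missing;
        }
        List<String> granted = new ArrayList<>();
        for (String privilege : upper.substring(start + 6, on).split(",")) {
            granted.add(privilege.trim());
        }
        if (granted.contains("ALL PRIVILEGES")) {
            return missing; // empty: everything is covered
        }
        for (String required : REQUIRED) {
            if (!granted.contains(required)) {
                missing.add(required);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        System.out.println(missing("GRANT SELECT ON *.* TO 'canal'@'%'"));
    }
}
```

An empty result means the line covers all three privileges; anything else names what still has to be granted.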

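(2) Download and start canal

Run ./bin/startup.sh to start canal.

(2.1) Download canal

Pick a suitable version from https://github.com/alibaba/canal/releases
Download it, for example: wget https://github.com/alibaba/canal/releases/download/canal-1.1.14/canal.deployer-1.1.14.tar.gz

(2.2) Edit the configuration

Edit conf/example/instance.properties
Only the more important settings are listed below.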

## MySQL serverId; must be unique
canal.instance.mysql.slaveId = 1234
# position info; change to your own database address
canal.instance.master.address = 127.0.0.1:3306
# username/password; change to your own database credentials
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName = database_wkq
canal.instance.connectionCharset = UTF-8
# table regex: the tables to monitor, separated by commas; a regex such as .*\\..* also works
canal.instance.filter.regex = table_wkq,table_2,table_3,
# table black regex
canal.instance.filter.black.regex =
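The doubled backslash in .*\\..* comes from the properties file format: java.util.Properties turns \\ into a single \ when loading. The sketch below illustrates that, together with a rough approximation of comma-separated filter matching. It assumes each pattern is matched against the full name schema.table, as the examples in the canal documentation suggest; canal's real filter class may differ in details, and FilterRegexSketch, loadFilter, accepts and the schema-qualified sample values are illustrative only.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.regex.Pattern;

class FilterRegexSketch {

    /** Reads canal.instance.filter.regex the way java.util.Properties unescapes it. */
    static String loadFilter(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty("canal.instance.filter.regex", "").trim();
    }

    /** Approximation: accept a table when any comma-separated regex matches "schema.table". */
    static boolean accepts(String filter, String schema, String table) {
        String fullName = schema + "." + table;
        for (String regex : filter.split(",")) {
            String trimmed = regex.trim();
            if (!trimmed.isEmpty() && Pattern.matches(trimmed, fullName)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // File content: canal.instance.filter.regex = database_wkq\\.table_wkq,database_wkq\\.table_2
        String text = "canal.instance.filter.regex = database_wkq\\\\.table_wkq,database_wkq\\\\.table_2\n";
        String filter = loadFilter(text);
        System.out.println(filter);
        System.out.println(accepts(filter, "database_wkq", "table_wkq"));
        System.out.println(accepts(filter, "database_wkq", "other"));
    }
}
```

Under that assumption, a bare table name such as table_wkq would not match database_wkq.table_wkq, so it is worth checking the filter value against the schema-qualified names.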

(2.3) Start canal

Start it with sh bin/startup.sh or ./bin/startup.sh

After startup, jps -l should list com.alibaba.otter.canal.deployer.CanalLauncher

canal.log at canal startup

canal.deployer-1.0.24/logs/canal/canal.log

2018-07-23 20:27:46.449 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2018-07-23 20:27:46.625 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.0.62.130:11111]
2018-07-23 20:27:47.576 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
2018-07-23 20:27:47.721 [canal-instance-scan-0] INFO  c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start xxx1 successful.
2018-07-23 20:27:47.802 [canal-instance-scan-0] INFO  c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start xxx2 successful.
2018-07-23 20:27:47.862 [canal-instance-scan-0] INFO  c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start xxx3 successful.
2018-07-23 20:27:47.921 [canal-instance-scan-0] INFO  c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start xxx4 successful.
2018-07-23 20:27:47.987 [canal-instance-scan-0] INFO  c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start xxx5 successful.
2018-07-23 20:27:48.044 [canal-instance-scan-0] INFO  c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start xxx6 successful.
2018-07-23 20:27:48.094 [canal-instance-scan-0] INFO  c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start xxx7 successful.

(2.4) Instance log after a normal canal startup

canal.deployer-1.0.24/logs/example/example.log

2018-07-23 20:27:47.429 [canal-instance-scan-0] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2018-07-23 20:27:47.436 [canal-instance-scan-0] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [xxx/instance.properties]
2018-07-23 20:27:47.444 [canal-instance-scan-0] WARN  org.springframework.beans.TypeConverterDelegate - PropertyEditor [com.sun.beans.editors.EnumEditor] found through deprecated global PropertyEditorManager fallback - consider using a more isolated form of registration, e.g. on the BeanWrapper/BeanFactory!
2018-07-23 20:27:47.451 [canal-instance-scan-0] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-xxx 
2018-07-23 20:27:47.453 [canal-instance-scan-0] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2018-07-23 20:27:47.666 [destination = xxx , address = /127.0.0.1:3306 , EventParser] WARN  c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - prepare to find start position just show master status

Stop canal

Run ./bin/stop.sh (or sh stop.sh from inside the bin directory)

2018-07-23 21:45:08.241 [Thread-5] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## stop the canal server
2018-07-23 21:45:08.296 [Thread-5] INFO  com.alibaba.otter.canal.deployer.CanalController - ## stop the canal server[10.0.62.130:11111]
2018-07-23 21:45:08.296 [Thread-5] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## canal server is down.

(4) Using canal from a program

The code below is only an example.

<dependency>
	<groupId>com.alibaba.otter</groupId>
	<artifactId>canal.client</artifactId>
	<version>1.1.4</version>
</dependency>

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * CanalTest
 *
 * @author: weikeqin.cn@gmail.com
 * @date: 2020-05-30 08:26
 **/
@Slf4j
public class CanalTest {

    /**
     * @param args
     */
    public static void main(String args[]) {

        String canalHost = "127.0.0.1";
        int canalPort = 11111;
        String destination = "example";
        InetSocketAddress address = new InetSocketAddress(canalHost, canalPort);

        // Create the connection
        CanalConnector connector = CanalConnectors.newSingleConnector(address, destination, "", "");
        // connector = CanalConnectors.newClusterConnector(addresses, destination, "", "");

        int batchSize = 1000;
        int emptyCount = 0;
        try {
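            // Connect to the canal server
            connector.connect();
            // Subscribe without a client-side filter, so the server-side filter applies
            connector.subscribe();
            // Roll back to the last un-acked position so the next fetch resumes from there
            connector.rollback();

            int totalEmptyCount = 12000000;

            // Exit condition; usually this is simply while (true)
            while (emptyCount < totalEmptyCount) {
                // Fetch up to batchSize entries without acknowledging them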
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();

                if (batchId == -1 || size == 0) {

                    emptyCount++;
                    log.info("empty count : " + emptyCount);

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        log.info("", e);
                    }

                } else {
                    emptyCount = 0;
                    log.info("message[batchId={},size={}] ", batchId, size);

                    // Consume the batch
                    consumeMsg(message.getEntries());
                }

                // Acknowledge the batch
                connector.ack(batchId);
                // On processing failure, roll the batch back instead
                // connector.rollback(batchId);
            }

            log.info("empty too many times, exit");
        } finally {
            // Release the connection
            connector.disconnect();
        }
    }

    /**
     * Consume messages
     *
     * @param entries
     */
    private static void consumeMsg(List<CanalEntry.Entry> entries) {

        // Only print here
        printEntry(entries);
        // TODO other processing

    }

    /**
     * @param entrys
     */
    private static void printEntry(List<CanalEntry.Entry> entrys) {

        for (CanalEntry.Entry entry : entrys) {

            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChage = null;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            CanalEntry.EventType eventType = rowChage.getEventType();
            log.info(String.format("================  binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(),
                    entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(),
                    entry.getHeader().getTableName(),
                    eventType)
            );

            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {

                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    log.info("------- before");
                    printColumn(rowData.getBeforeColumnsList());
                    log.info("------- after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    /**
     * @param columns
     */
    private static void printColumn(List<CanalEntry.Column> columns) {
        Map<String, String> map = new HashMap<>();
        for (CanalEntry.Column column : columns) {
            map.put(column.getName(), column.getValue());
            //log.info(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
        log.info("{}", map);
    }


}
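The interplay of getWithoutAck, ack and rollback in the client above can be pictured with a small in-memory model. This is only a sketch of the idea: AckCursorSketch is a hypothetical stand-in, not a canal class, and it drops the batch id that the real connector uses, confirming or discarding everything handed out so far.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory stand-in for the server-side cursor; not part of the canal client API. */
class AckCursorSketch {
    private final List<String> events;
    private int ackedPos = 0; // everything before this index has been confirmed
    private int fetchPos = 0; // next index to hand out

    AckCursorSketch(List<String> events) {
        this.events = new ArrayList<>(events);
    }

    /** Hands out up to batchSize events without confirming them. */
    List<String> getWithoutAck(int batchSize) {
        int end = Math.min(events.size(), fetchPos + batchSize);
        List<String> batch = new ArrayList<>(events.subList(fetchPos, end));
        fetchPos = end;
        return batch;
    }

    /** Confirms everything handed out so far. */
    void ack() {
        ackedPos = fetchPos;
    }

    /** Forgets unconfirmed hand-outs, so the next fetch restarts right after the last ack. */
    void rollback() {
        fetchPos = ackedPos;
    }
}
```

In this model, a consumer that fails while processing a batch calls rollback() and receives the same events again on the next fetch, which is why processing should tolerate seeing an event more than once.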

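(5) How canal replication works

At a high level, MySQL replication takes three steps:
(1) The master records changes in its binary log (these records are called binary log events);
(2) The slave copies the master's binary log events into its relay log;
(3) The slave reads the events from the relay log and replays them against its own data.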
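The three steps can be mimicked with a toy in-memory model. This is purely illustrative and assumes nothing about MySQL internals: ReplicationSketch, Master, Slave and Event are invented names, and an event is reduced to a key with a new value (null meaning delete). canal takes the slave's place in step 2, receiving the binlog events instead of a real replica.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ReplicationSketch {

    /** One row change; a null value stands for a delete. */
    static final class Event {
        final String key;
        final String value;

        Event(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    static final class Master {
        final Map<String, String> data = new HashMap<>();
        final List<Event> binlog = new ArrayList<>();

        // Step 1: apply the change and record it in the binary log.
        void write(String key, String value) {
            Event event = new Event(key, value);
            apply(data, event);
            binlog.add(event);
        }
    }

    static final class Slave {
        final Map<String, String> data = new HashMap<>();
        final List<Event> relayLog = new ArrayList<>();
        int applied = 0;

        // Step 2: copy binlog events not yet seen into the relay log.
        void pull(Master master) {
            relayLog.addAll(master.binlog.subList(relayLog.size(), master.binlog.size()));
        }

        // Step 3: replay relay-log events onto the local data.
        void replay() {
            while (applied < relayLog.size()) {
                apply(data, relayLog.get(applied++));
            }
        }
    }

    static void apply(Map<String, String> data, Event event) {
        if (event.value == null) {
            data.remove(event.key);
        } else {
            data.put(event.key, event.value);
        }
    }
}
```

After pull and replay, the slave's data equals the master's, and the relay log holds one entry per change.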

The figure below illustrates the replication process:
[Figure: canal replication]

(6) Problems encountered

(6.1) Error When doing Client Authentication:ErrorPacket

Caused by: java.io.IOException: connect /127.0.0.1:3306 failure:java.io.IOException: Error When doing Client Authentication:ErrorPacket [errorNumber=1045, fieldCount=-1, message=Access denied for user 'canal'@'localhost' (using password: YES), sqlState=28000, sqlStateMarker=#]
	at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.negotiate(MysqlConnector.java:208)
	at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:71)
	at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.connect(MysqlConnection.java:56)
	at com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.preDump(MysqlEventParser.java:86)
	at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:157)
	at java.lang.Thread.run(Thread.java:748)

Cause: the username or password is incorrect.

(6.2) Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s) for this operation

ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:xx
[com.alibaba.otter.canal.parse.exception.CanalParseException: command : 'show master status' has an error!
Caused by: java.io.IOException: ErrorPacket [errorNumber=1227, fieldCount=-1, message=Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s) for this operation, sqlState=42000, sqlStateMarker=#]
 with command: show master status

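After logging in as the canal account, the data in the relevant databases and tables is readable, but show master status reports Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s) for this operation

Possible causes:
1. The user configured in instance.properties lacks the REPLICATION privileges
2. canal's instance.properties is misconfigured
3. The username or password in the configuration file is incorrect
4. The MySQL user does not exist
5. MySQL itself is misconfigured

Fix: grant the canal user the replication privilege
grant replication client on *.* to 'canal'@'%';
flush privileges;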

(6.3) Access denied; you need (at least one of) the REPLICATION SLAVE privilege(s) for this operation

[destination = xxx , address = /127.0.0.1:3306 , EventParser] ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:xx[java.io.IOException: Received error packet: errno = 1227, sqlstate = 42000 errmsg = Access denied; you need (at least one of) the REPLICATION SLAVE privilege(s) for this operation
	at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.fetch(DirectLogFetcher.java:95)

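Access denied means the account lacks the privilege and must be granted it.
REPLICATION SLAVE is the privilege needed by the account used to set up replication: the slave server can replicate only through a user to whom the master server has granted this privilege.
The "SHOW SLAVE HOSTS" command also depends on REPLICATION SLAVE; without it the command fails with an error.

REPLICATION CLIENT cannot be used to set up replication; it only adds the ability to run commands such as "SHOW SLAVE STATUS" and "SHOW MASTER STATUS".
From version 5.6.6 onward it also allows "SHOW BINARY LOGS".

GRANT REPLICATION SLAVE ON *.* TO 'canal'@'%';
flush privileges;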
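The errors in (6.1) to (6.3) can be told apart by the error number and the privilege named in the message. A small sketch of that triage follows; CanalErrorHint and its methods are hypothetical helpers, and the hints simply restate the fixes described in these sections.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class CanalErrorHint {

    // Matches both "errorNumber=1045" and "errno = 1227" as they appear in the logs above.
    private static final Pattern ERRNO = Pattern.compile("(?:errorNumber|errno)\\s*=\\s*(\\d+)");

    /** Extracts the MySQL error number from a log line, or -1 if none is present. */
    static int errorNumber(String logLine) {
        Matcher m = ERRNO.matcher(logLine);
        return m.find() ? Integer.parseInt(m.group(1)) : -1;
    }

    /** Maps a log line to the fix suggested in sections (6.1) to (6.3). */
    static String hint(String logLine) {
        int errno = errorNumber(logLine);
        if (errno == 1045) {
            return "check dbUsername/dbPassword in instance.properties";
        }
        if (errno == 1227 && logLine.contains("REPLICATION SLAVE")) {
            return "grant REPLICATION SLAVE";
        }
        if (errno == 1227 && logLine.contains("REPLICATION CLIENT")) {
            return "grant REPLICATION CLIENT";
        }
        return "unknown";
    }

    public static void main(String[] args) {
        System.out.println(hint("errno = 1227, sqlstate = 42000 errmsg = Access denied; "
                + "you need (at least one of) the REPLICATION SLAVE privilege(s) for this operation"));
    }
}
```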

(6.4) canal uses UseConcMarkSweepGC, so it cannot run on JDK 14

Java HotSpot(TM) 64-Bit Server VM warning: Ignoring option PermSize; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: Ignoring option MaxPermSize; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: Ignoring option UseConcMarkSweepGC; support was removed in 14.0
Java HotSpot(TM) 64-Bit Server VM warning: Ignoring option CMSParallelRemarkEnabled; support was removed in 14.0
Unrecognized VM option 'UseCMSCompactAtFullCollection'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.

Edit bin/startup.sh and change the JAVA path to an older JDK (JDK 8 here)

## set java path
if [ -z "$JAVA" ] ; then
  #JAVA=$(which java)
  JAVA="/Library/Java/JavaVirtualMachines/jdk1.8.0_211.jdk/Contents/Home/bin/java"
fi
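Whether a given JDK still accepts the CMS flags can be decided from its version string. The sketch below relies only on the warning shown above (support for UseConcMarkSweepGC was removed in 14) and on the java.specification.version format, where Java 8 reports 1.8 and later releases report a plain number; JvmFlagCheck and its methods are illustrative names.

```java
class JvmFlagCheck {

    /** Parses "1.8" as 8 and "14" as 14 (the java.specification.version format). */
    static int featureVersion(String specVersion) {
        String[] parts = specVersion.trim().split("\\.");
        int first = Integer.parseInt(parts[0]);
        return (first == 1 && parts.length > 1) ? Integer.parseInt(parts[1]) : first;
    }

    /** Per the JVM warning above, UseConcMarkSweepGC support was removed in 14. */
    static boolean acceptsCmsFlags(String specVersion) {
        return featureVersion(specVersion) < 14;
    }

    public static void main(String[] args) {
        String spec = System.getProperty("java.specification.version");
        System.out.println(spec + " accepts CMS flags: " + acceptsCmsFlags(spec));
    }
}
```

Running main with the JDK that startup.sh points at shows whether that JDK would reject the GC options.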

(6.5) com.alibaba.otter.canal.protocol.exception.CanalClientException: java.net.ConnectException: Connection refused

Exception in thread "main" com.alibaba.otter.canal.protocol.exception.CanalClientException: java.net.ConnectException: Connection refused
	at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.doConnect(SimpleCanalConnector.java:198)
	at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.connect(SimpleCanalConnector.java:115)

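Possible causes:
canal is not started, or it has crashed
The configuration was deleted; check the instance.properties of the corresponding destination
instance.properties is not configured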
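Before digging into the configuration, it can help to confirm that anything is listening on the canal port at all. A minimal probe using only the JDK might look like this; PortProbe and isReachable are made-up names, and 11111 is the port shown in the startup log and in the client example.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortProbe {

    /** True if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host could not be resolved.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("canal reachable: " + isReachable("127.0.0.1", 11111, 1000));
    }
}
```

A false result points at the server process or the network; a true result with a failing client points back at the destination and instance.properties.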


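(7) canal-admin management console

canal-admin is designed to give canal operations-oriented features such as centralized configuration management and node maintenance, with a reasonably friendly web UI so that more users can operate it quickly and safely.
In short, canal-admin is an admin backend that simplifies configuring canal and saves time: there is no longer any need to configure each server by hand.

URL: http://127.0.0.1:8089/

(7.1) Core models of canal-admin

instance: corresponds to an instance in canal-server, the smallest unit (a queue) that subscribes to MySQL
server: corresponds to a canal-server; one server can contain multiple instances
cluster: corresponds to a group of canal-servers combined for high availability (HA)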
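The containment between the three models can be written down as plain data classes. This is only a reading aid under the description above, not canal-admin's actual schema; every class and field name here is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class AdminModelSketch {

    /** The smallest subscription unit, identified by its destination name. */
    static final class Instance {
        final String destination;

        Instance(String destination) {
            this.destination = destination;
        }
    }

    /** One canal-server, which can hold several instances. */
    static final class Server {
        final String address;
        final List<Instance> instances = new ArrayList<>();

        Server(String address) {
            this.address = address;
        }
    }

    /** A group of canal-servers operated together for high availability. */
    static final class Cluster {
        final String name;
        final List<Server> servers = new ArrayList<>();

        Cluster(String name) {
            this.name = name;
        }

        /** Total number of instances across all servers in the cluster. */
        int instanceCount() {
            int count = 0;
            for (Server server : servers) {
                count += server.instances.size();
            }
            return count;
        }
    }
}
```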
