canal 笔记
使用canal前需要准备以下几个内容
- 安装配置MySQL
1.1 安装 mysql,
1.2 配置 mysql binlog使用ROW模式
1.3 在MySQL添加对应的canal用户
1.4 检查canal用户生效- 下载canal并配置
2.1 下载canal
2.2 配置 canal
2.3 启动canal (需要JDK>=1.6.25)
(1) 配置MySQL
(1.1) 安装MySQL
(1.2) 修改MySQL配置文件
canal的原理是基于mysql binlog技术,所以需要开启mysql的binlog写入功能,并且配置binlog模式为row.
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction ,不能和 canal 的 slaveId 重复
(1.3) MySQL添加canal用户并授权
canal的原理是模拟自己为mysql slave,所以需要mysql slave的相关权限
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
(1.4) 校验用户对应权限
show master status ;
如果正常显示binlog,则没问题,如果提示Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s) for this operation
,则没有对应 REPLICATION CLIENT 权限
show slave status ;
如果提示Access denied; you need (at least one of) the REPLICATION SLAVE privilege(s) for this operation
,则没有对应 REPLICATION SLAVE 权限
(2) 下载并启动canal
执行 ./bin/startup.sh
即可启动
(2.1) 下载canal
到 https://github.com/alibaba/canal/releases 选择合适的版本
下载 wget https://github.com/alibaba/canal/releases/download/canal-1.1.14/canal.deployer-1.1.14.tar.gz
(2.2) 修改配置
修改
conf/example/instance.properties
以下只列出比较重要的配置
## mysql serverId 不能重复
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName = database_wkq
canal.instance.connectionCharset = UTF-8
#table regex 需要监控的表 通过,分隔 也可以使用正则 .*\\..*
canal.instance.filter.regex = table_wkq,table_2,table_3,
# table black regex
canal.instance.filter.black.regex =
(2.3) 启动canal
通过
sh bin/startup.sh
或者./bin/startup.sh
启动
启动后通过
jps -l
命令 可以看到com.alibaba.otter.canal.deployer.CanalLauncher
** canal启动时canal.log **
canal.deployer-1.0.24/logs/canal/canal.log
2018-07-23 20:27:46.449 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2018-07-23 20:27:46.625 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.0.62.130:11111]
2018-07-23 20:27:47.576 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
2018-07-23 20:27:47.721 [canal-instance-scan-0] INFO c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start xxx1 successful.
2018-07-23 20:27:47.802 [canal-instance-scan-0] INFO c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start xxx2 successful.
2018-07-23 20:27:47.862 [canal-instance-scan-0] INFO c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start xxx3 successful.
2018-07-23 20:27:47.921 [canal-instance-scan-0] INFO c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start xxx4 successful.
2018-07-23 20:27:47.987 [canal-instance-scan-0] INFO c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start xxx5 successful.
2018-07-23 20:27:48.044 [canal-instance-scan-0] INFO c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start xxx6 successful.
2018-07-23 20:27:48.094 [canal-instance-scan-0] INFO c.a.o.canal.deployer.monitor.SpringInstanceConfigMonitor - auto notify start xxx7 successful.
(2.2) canal正常启动时instance对应的日志
canal.deployer-1.0.24/logs/example/example.log
2018-07-23 20:27:47.429 [canal-instance-scan-0] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2018-07-23 20:27:47.436 [canal-instance-scan-0] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [xxx/instance.properties]
2018-07-23 20:27:47.444 [canal-instance-scan-0] WARN org.springframework.beans.TypeConverterDelegate - PropertyEditor [com.sun.beans.editors.EnumEditor] found through deprecated global PropertyEditorManager fallback - consider using a more isolated form of registration, e.g. on the BeanWrapper/BeanFactory!
2018-07-23 20:27:47.451 [canal-instance-scan-0] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-xxx
2018-07-23 20:27:47.453 [canal-instance-scan-0] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2018-07-23 20:27:47.666 [destination = xxx , address = /127.0.0.1:3306 , EventParser] WARN c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - prepare to find start position just show master status
停止canal
sh stop.sh
或./bin/stop.sh
2018-07-23 21:45:08.241 [Thread-5] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## stop the canal server
2018-07-23 21:45:08.296 [Thread-5] INFO com.alibaba.otter.canal.deployer.CanalController - ## stop the canal server[10.0.62.130:11111]
2018-07-23 21:45:08.296 [Thread-5] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## canal server is down.
(4) 程序中使用
以下代码仅作为示例
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* CanalTest
*
* @author: weikeqin.cn@gmail.com
* @date: 2020-05-30 08:26
**/
@Slf4j
public class CanalTest {
/**
* @param args
*/
public static void main(String args[]) {
String canalHost = "127.0.0.1";
int canalPort = 11111;
String destination = "example";
InetSocketAddress address = new InetSocketAddress(canalHost, canalPort);
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(address, destination, "", "");
// connector = CanalConnectors.newClusterConnector(addresses, destination, "", "");
int batchSize = 1000;
int emptyCount = 0;
try {
// 链接对应的canal server
connector.connect();
// 客户端订阅,不提交客户端filter,以服务端的filter为准
connector.subscribe();
// 回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有 ack 的地方开始拿
connector.rollback();
int totalEmptyCount = 12000000;
// 退出条件 一般是 while true
while (emptyCount < totalEmptyCount) {
// 获取指定数量的数据
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
log.info("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
log.info("", e);
}
} else {
emptyCount = 0;
log.info("message[batchId={},size={}] ", batchId, size);
// 消费
consumeMsg(message.getEntries());
}
// 提交确认
connector.ack(batchId);
// 处理失败, 回滚数据
// connector.rollback(batchId);
}
log.info("empty too many times, exit");
} finally {
// 释放链接
connector.disconnect();
}
}
/**
* 消费消息
*
* @param entries
*/
private static void consumeMsg(List<CanalEntry.Entry> entries) {
// 这里只打印
printEntry(entries);
// TODO 其它操作
}
/**
* @param entrys
*/
private static void printEntry(List<CanalEntry.Entry> entrys) {
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChage = null;
try {
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
CanalEntry.EventType eventType = rowChage.getEventType();
log.info(String.format("================ binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(),
entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(),
entry.getHeader().getTableName(),
eventType)
);
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
if (eventType == CanalEntry.EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
log.info("------- before");
printColumn(rowData.getBeforeColumnsList());
log.info("------- after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
/**
* @param columns
*/
private static void printColumn(List<CanalEntry.Column> columns) {
Map<String, String> map = new HashMap<>();
for (CanalEntry.Column column : columns) {
map.put(column.getName(), column.getValue());
//log.info(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
log.info("{}", map);
}
}
(5) canal复制原理
复制如何工作,整体上来说,复制有3个步骤:
(1) master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events);
(2) slave将master的binary log events复制到它的中继日志(relay log)中;
(3) slave读取中继日志中的事件,将其重放到备库数据之上。下图描述了复制的过程:
(6) 遇到的问题
(6.1) Error When doing Client Authentication:ErrorPacket
Caused by: java.io.IOException: connect /127.0.0.1:3306 failure:java.io.IOException: Error When doing Client Authentication:ErrorPacket [errorNumber=1045, fieldCount=-1, message=Access denied for user 'canal'@'localhost' (using password: YES), sqlState=28000, sqlStateMarker=#]
at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.negotiate(MysqlConnector.java:208)
at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:71)
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.connect(MysqlConnection.java:56)
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.preDump(MysqlEventParser.java:86)
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:157)
at java.lang.Thread.run(Thread.java:748)
原因 用户名密码不正确
(6.2) Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s) for this operation
ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:xx
[com.alibaba.otter.canal.parse.exception.CanalParseException: command : 'show master status' has an error!
Caused by: java.io.IOException: ErrorPacket [errorNumber=1227, fieldCount=-1, message=Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s) for this operation, sqlState=42000, sqlStateMarker=#]
with command: show master status
用canal账户登录后发现可以查看对应数据库对应表的数据,但是
show master status
提示 Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s) for this operation1、
instance.properties
配置文件里配置的用户没有REPLICATION权限
2、canal instance.properties 配置错误
3、配置文件里用户名密码不正确
4、MySQL对应用户不存在
5、MySQL配置不对给canal用户对应的replication权限
grant replication client on *.* to 'canal'@'%';
flush privileges
(6.3) Access denied; you need (at least one of) the REPLICATION SLAVE privilege(s) for this operation
[destination = xxx , address = /127.0.0.1:3306 , EventParser] ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:xx[java.io.IOException: Received error packet: errno = 1227, sqlstate = 42000 errmsg = Access denied; you need (at least one of) the REPLICATION SLAVE privilege(s) for this operation
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.fetch(DirectLogFetcher.java:95)
Access denied 没权限 需要给对应账户授权
REPLICATION SLAVE 常用于建立复制时所需要用到的用户权限,也就是slave server必须被master server授权具有该权限的用户,才能通过该用户复制。
并且”SHOW SLAVE HOSTS”这条命令和REPLICATION SLAVE权限有关,否则执行时会报错:REPLICATION CLIENT 不可用于建立复制,有该权限时,只是多了可以使用如”SHOW SLAVE STATUS”、”SHOW MASTER STATUS”等命令。
在5.6.6版本以后,也可以使用”SHOW BINARY LOGS”。
GRANT REPLICATION SLAVE ON *.* TO 'canal'@'%'
flush privileges
(6.4) canal用了UseConcMarkSweepGC不能用JDK14
Java HotSpot(TM) 64-Bit Server VM warning: Ignoring option PermSize; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: Ignoring option MaxPermSize; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: Ignoring option UseConcMarkSweepGC; support was removed in 14.0
Java HotSpot(TM) 64-Bit Server VM warning: Ignoring option CMSParallelRemarkEnabled; support was removed in 14.0
Unrecognized VM option 'UseCMSCompactAtFullCollection'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.
修改 bin/start.sh
文件,修改对应的JAVA路径
## set java path
if [ -z "$JAVA" ] ; then
#JAVA=$(which java)
JAVA="/Library/Java/JavaVirtualMachines/jdk1.8.0_211.jdk/Contents/Home/bin/java"
fi
(6.5) com.alibaba.otter.canal.protocol.exception.CanalClientException: java.net.ConnectException: Connection refused
Exception in thread "main" com.alibaba.otter.canal.protocol.exception.CanalClientException: java.net.ConnectException: Connection refused
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.doConnect(SimpleCanalConnector.java:198)
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.connect(SimpleCanalConnector.java:115)
canal没启动 或者 canal挂了
配置被删了,检查对应destination
和instance.properties
instance.properties
没配置
(7) canal-admin后台管理
canal-admin设计上是为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作
简单来说,canal-admin是一个后台维护系统,简化了配置canal的工作,提高了效率,终于不用到服务器上一个一个配了
(7.1) canal-admin的核心模型主要有
instance,对应canal-server里的instance,一个最小的订阅mysql的队列
server,对应canal-server,一个server里可以包含多个instance
集群,对应一组canal-server,组合在一起面向高可用HA的运维
References
[1] canal/wiki
[2] canal-AdminGuide
[3] ClientExample
[4] Canal-Admin-QuickStart
[5] Canal-Admin-Guide
[6] canal配置使用
[7] Mysql 普通账户授权replication client后登录失败问题
[8] REPLICATION SLAVE 与 REPLICATION CLIENT 权限
[9] 对replication slave,replication client的一点说明
[10] MySQL 5.6 Reference Manual – 6.2.1 Privileges Provided by MySQL
[11] SimpleCanalClientTest
[12] ClusterCanalClientTest