canal client例子:
package com.study.canal;
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.Header;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
public class App {
public static void main(String[] args) throws InterruptedException {
// 第一步:与canal进行连接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("xxx.xxx.xxx.xxx", 11111),
"example", "", "");
connector.connect();
// 第二步:开启订阅
connector.subscribe();
connector.rollback();
// 第三步:循环订阅
while (true) {
try {
// 每次读取 1000 条
Message message = connector.getWithoutAck(1000);
long batchID = message.getId();
int size = message.getEntries().size();
if (batchID == -1 || size == 0) {
System.out.println("当前暂时没有数据");
Thread.sleep(1000); // 没有数据
} else {
System.out.println("-------------------------- 有数据啦 -----------------------");
PrintEntry(message.getEntries());
}
// position id ack (方便处理下一条)
connector.ack(batchID);
} catch (Exception e) {
e.printStackTrace();
} finally {
Thread.sleep(1000);
}
}
}
// 获取每条打印的记录
@SuppressWarnings("static-access")
public static void PrintEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
// 第一步:拆解entry 实体
Header header = entry.getHeader();
EntryType entryType = entry.getEntryType();
// 第二步: 如果当前是RowData,那就是我需要的数据
if (entryType == EntryType.ROWDATA) {
String tableName = header.getTableName();
String schemaName = header.getSchemaName();
RowChange rowChange = null;
try {
rowChange = RowChange.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
EventType eventType = rowChange.getEventType();
System.out.println(String.format("当前正在操作 %s.%s, Action= %s", schemaName, tableName, eventType));
// 如果是‘查询’ 或者 是 ‘DDL’ 操作,那么sql直接打出来
if (eventType == EventType.QUERY || rowChange.getIsDdl()) {
System.out.println("rowchange sql ----->" + rowChange.getSql());
return;
}
// 第三步:追踪到 columns 级别
rowChange.getRowDatasList().forEach((rowData) -> {
// 获取更新之前的column情况
List<Column> beforeColumns = rowData.getBeforeColumnsList();
// 获取更新之后的 column 情况
List<Column> afterColumns = rowData.getAfterColumnsList();
// 当前执行的是 删除操作
if (eventType == EventType.DELETE) {
PrintColumn(beforeColumns);
}
// 当前执行的是 插入操作
if (eventType == eventType.INSERT) {
PrintColumn(afterColumns);
}
// 当前执行的是 更新操作
if (eventType == eventType.UPDATE) {
PrintColumn(afterColumns);
}
});
}
}
}
// 每个row上面的每一个column 的更改情况
public static void PrintColumn(List<Column> columns) {
columns.forEach((column) -> {
String columnName = column.getName();
String columnValue = column.getValue();
String columnType = column.getMysqlType();
boolean isUpdated = column.getUpdated(); // 判断 该字段是否更新
System.out.println(String.format("columnName=%s, columnValue=%s, columnType=%s, isUpdated=%s", columnName,
columnValue, columnType, isUpdated));
});
}
}
pom.xml
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.2</version>
</dependency>
修改数据库数据发现客户端没有反应,可能情况如下:
canal官方文档可以点击这里查看。
按照官网的教程完成配置后会发现,在修改mysql时java客户端还是没有反应。暂时发现有以下两种原因:
1.需要修改canal.properties配置,但是官网没有讲解。(大概率)
进入canal解压文件 ,编辑conf/canal.properties文件
vim conf/canal.properties
有这么一行
canal.instance.parser.parallelThreadSize = 16
默认是被注释掉的,需要打开注释,然后重启canal
cd bin
./restart.sh
这种是第一次配置时,大概率碰到的情况。
2.修改conf/example/instance.properties
第一个红框全部注释掉,第二个红框值修改为登录mysql的账号和密码,第三个框注释掉(注释掉意味着监听整个库),然后重启canal