canal client例子:

package com.study.canal;

import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.Header;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;

public class App {
	public static void main(String[] args) throws InterruptedException {

		// 第一步:与canal进行连接
		CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("xxx.xxx.xxx.xxx", 11111),
				"example", "", "");
		connector.connect();

		// 第二步:开启订阅
		connector.subscribe();

		connector.rollback();

		// 第三步:循环订阅
		while (true) {
			try {
				// 每次读取 1000 条
				Message message = connector.getWithoutAck(1000);

				long batchID = message.getId();

				int size = message.getEntries().size();

				if (batchID == -1 || size == 0) {
					System.out.println("当前暂时没有数据");
					Thread.sleep(1000); // 没有数据
				} else {
					System.out.println("-------------------------- 有数据啦 -----------------------");
					PrintEntry(message.getEntries());
				}

				// position id ack (方便处理下一条)
				connector.ack(batchID);

			} catch (Exception e) {
				e.printStackTrace();

			} finally {
				Thread.sleep(1000);
			}
		}
	}

	// 获取每条打印的记录
	@SuppressWarnings("static-access")
	public static void PrintEntry(List<Entry> entrys) {

		for (Entry entry : entrys) {

			// 第一步:拆解entry 实体
			Header header = entry.getHeader();
			EntryType entryType = entry.getEntryType();

			// 第二步: 如果当前是RowData,那就是我需要的数据
			if (entryType == EntryType.ROWDATA) {

				String tableName = header.getTableName();
				String schemaName = header.getSchemaName();

				RowChange rowChange = null;

				try {
					rowChange = RowChange.parseFrom(entry.getStoreValue());
				} catch (InvalidProtocolBufferException e) {
					e.printStackTrace();
				}

				EventType eventType = rowChange.getEventType();

				System.out.println(String.format("当前正在操作 %s.%s, Action= %s", schemaName, tableName, eventType));

				// 如果是‘查询’ 或者 是 ‘DDL’ 操作,那么sql直接打出来
				if (eventType == EventType.QUERY || rowChange.getIsDdl()) {
					System.out.println("rowchange sql ----->" + rowChange.getSql());
					return;
				}

				// 第三步:追踪到 columns 级别
				rowChange.getRowDatasList().forEach((rowData) -> {

					// 获取更新之前的column情况
					List<Column> beforeColumns = rowData.getBeforeColumnsList();

					// 获取更新之后的 column 情况
					List<Column> afterColumns = rowData.getAfterColumnsList();

					// 当前执行的是 删除操作
					if (eventType == EventType.DELETE) {
						PrintColumn(beforeColumns);
					}

					// 当前执行的是 插入操作
					if (eventType == eventType.INSERT) {
						PrintColumn(afterColumns);
					}

					// 当前执行的是 更新操作
					if (eventType == eventType.UPDATE) {
						PrintColumn(afterColumns);
					}
				});
			}
		}
	}

	// 每个row上面的每一个column 的更改情况
	public static void PrintColumn(List<Column> columns) {

		columns.forEach((column) -> {

			String columnName = column.getName();
			String columnValue = column.getValue();
			String columnType = column.getMysqlType();
			boolean isUpdated = column.getUpdated(); // 判断 该字段是否更新

			System.out.println(String.format("columnName=%s, columnValue=%s, columnType=%s, isUpdated=%s", columnName,
					columnValue, columnType, isUpdated));

		});
	}
}

pom.xml

<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.client</artifactId>
  <version>1.1.2</version>
</dependency>

修改数据库数据发现客户端没有反应,可能情况如下:

canal官方文档可以点击这里查看
按照官网的教程完成配置后会发现,在修改mysql时java客户端还是没有反应。暂时发现有以下两种原因:

1.需要修改canal.properties配置,但是官网没有讲解。(大概率)
进入canal解压文件 ,编辑conf/canal.properties文件

vim conf/canal.properties

有这么一行

canal.instance.parser.parallelThreadSize = 16

默认是被注释掉的,需要打开注释,然后重启canal

cd bin
./restart.sh

这种是第一次配置时,大概率碰到的情况。

2.修改conf/example/instance.properties

第一个红框全部注释掉,第二个红框值修改为登录mysql的账号和密码,第三个框注释掉(注释掉意味着监听整个库),然后重启canal

canal 修改mysql数据后Java客户端无反应的问题解决方案-LMLPHP

12-19 10:41