阿里Canal项目请先了解:canal

考虑到binlog可能出现大批量变更,如果直接通过Canal订阅binlog变动,CanalClient可能会瞬间被压垮。为了解决这个问题,我们可以引入kafka做一层封装。

公司实现了一套框架,拿来分享给大家。感谢原作者。

1. 服务端-封装Canal订阅binlog消息并推送到kafka

binlogService server 启动端:
import java.util.concurrent.Executors

import com.today.data.transfer.UTIL._
import com.today.data.transfer.canal.CanalClient
import com.today.data.transfer.kafka.BinlogKafkaProducer
import com.today.data.transfer.util.SysEnvUtil
import com.typesafe.config.ConfigFactory
import org.slf4j.LoggerFactory

/**
  * Entry point of the binlog server: wires a [[CanalClient]] to a
  * [[BinlogKafkaProducer]] and runs the client on a single worker thread.
  *
  * @author hz.lei
  * @since 2018-03-07 01:08
  */
object BinLogServer {
  val logger = LoggerFactory.getLogger(getClass)

  /** JVM entry point; `args` is not read. Delegates to [[startServer]]. */
  def main(args: Array[String]): Unit = startServer()

  /**
    * Starts the server with settings taken from the constants exposed by
    * `SysEnvUtil` (the original comment describes this as the "Java
    * environment variable" start mode).
    */
  def startServer(): Unit = {
    logger.info(s"启动服务 binlogServer...")

    // All settings are read up front, before anything is constructed.
    val brokers = SysEnvUtil.CANAL_KAFKA_HOST
    val kafkaTopic = SysEnvUtil.CANAL_KAFKA_TOPIC
    val canalHost = SysEnvUtil.CANAL_SERVER_IP
    val canalPort = SysEnvUtil.CANAL_SERVER_PORT.toInt
    val canalDestination = SysEnvUtil.CANAL_DESTINATION
    val canalUser = SysEnvUtil.CANAL_USERNAME
    val canalPassword = SysEnvUtil.CANAL_PASSWORD

    launch(brokers, kafkaTopic, canalHost, canalPort, canalDestination, canalUser, canalPassword)
  }

  /**
    * Starts the server with settings taken from the configuration returned by
    * `ConfigFactory.load()`.
    *
    * `getStringProxy` is not a method of the Typesafe `Config` API; presumably
    * it is an extension brought in by the `UTIL._` import -- confirm.
    */
  def startServerWithScala(): Unit = {
    logger.info(s"启动服务 binlogServer...")

    val config = ConfigFactory.load()

    val brokers = config.getStringProxy("kafka.producerBrokerHost")
    val kafkaTopic = config.getStringProxy("kafka.topic")
    val canalHost = config.getStringProxy("canal.canalServerIp")
    val canalPort = config.getStringProxy("canal.canalServerPort").toInt
    val canalDestination = config.getStringProxy("canal.destination")
    val canalUser = config.getStringProxy("canal.username")
    val canalPassword = config.getStringProxy("canal.password")

    launch(brokers, kafkaTopic, canalHost, canalPort, canalDestination, canalUser, canalPassword)
  }

  /**
    * Shared wiring used by both start methods: creates and initialises the
    * Kafka producer, creates the Canal client, registers the producer as a
    * binlog listener and submits the client to a one-thread executor.
    */
  private def launch(brokers: String,
                     kafkaTopic: String,
                     canalHost: String,
                     canalPort: Int,
                     canalDestination: String,
                     canalUser: String,
                     canalPassword: String): Unit = {
    val kafkaProducer = new BinlogKafkaProducer(brokers, kafkaTopic)
    kafkaProducer.init()

    val canalClient = new CanalClient(canalHost, canalPort, canalDestination, canalUser, canalPassword)
    canalClient.registerBinlogListener(kafkaProducer)

    // The executor is never shut down here; the client runs on its single thread.
    val workerPool = Executors.newFixedThreadPool(1)
    workerPool.execute(canalClient)

    logger.info("启动服务 binlogService 成功...")
  }
}
将收到的canal消息发送到kafka:
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.today.data.transfer.listener.CanalBinaryListener;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;

/**
 * Forwards Canal binlog entries to a Kafka topic.
 *
 * <p>Each entry received through {@link #onBinlog(CanalEntry.Entry)} is
 * serialised with {@code toByteArray()} and sent, without a key, to the topic
 * given at construction time. {@link #init()} must be called before the first
 * send because it is what creates the underlying producer.
 *
 * @author hz.lei
 * @date 2018-03-07 00:44
 */
public class BinlogKafkaProducer implements CanalBinaryListener {
    private static final Logger logger = LoggerFactory.getLogger(BinlogKafkaProducer.class);

    /** Topic that {@link #onBinlog(CanalEntry.Entry)} publishes to. */
    private final String topic;
    /** Kafka bootstrap servers passed to the producer configuration. */
    private final String host;

    /** Created by {@link #init()}; {@code null} until then. */
    protected Producer<Integer, byte[]> producer;

    /**
     * @param kafkaHost Kafka bootstrap server list
     * @param topic     topic that binlog entries are published to
     */
    public BinlogKafkaProducer(String kafkaHost, String topic) {
        this.topic = topic;
        this.host = kafkaHost;
    }

    /**
     * Builds the producer configuration (Integer keys, byte[] values) and
     * creates the Kafka producer.
     */
    public void init() {
        logger.info("[KafkaStringProducer] [init] " +
                ") broker-list(" + host + " )");

        Properties properties = KafkaConfigBuilder.defaultProducer().bootstrapServers(host)
                .withKeySerializer(IntegerSerializer.class)
                .withValueSerializer(ByteArraySerializer.class)
                .build();

        producer = new KafkaProducer<>(properties);
    }

    /**
     * Sends a message asynchronously and logs the outcome from the callback.
     *
     * @param topic   destination topic
     * @param message raw message bytes
     */
    public void send(String topic, byte[] message) {
        producer.send(new ProducerRecord<>(topic, message), (metadata, e) -> {
            if (e != null) {
                logger.error("[" + getClass().getSimpleName() + "]: 消息发送失败,cause: " + e.getMessage(), e);
                // A failed send must not fall through to the success log: the
                // metadata is null or a placeholder when an error occurred.
                return;
            }
            logger.info("[binlog]:消息发送成功,topic:{}, offset:{}, partition:{}, time:{}",
                    metadata.topic(), metadata.offset(), metadata.partition(), metadata.timestamp());
        });
    }

    /**
     * Listener callback: publishes the serialised entry to the configured topic.
     *
     * @param entry binlog entry delivered by the Canal client
     */
    @Override
    public void onBinlog(CanalEntry.Entry entry) {
        send(topic, entry.toByteArray());
    }
}
Canal 客户端,监听处理逻辑:
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.Header;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.Message;
import com.today.data.transfer.listener.CanalBinaryListener;
import com.today.data.transfer.listener.CanalGsonListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import java.net.InetSocketAddress;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * Canal client: polls a Canal server for binlog entries and hands every
 * row-change entry to the registered listeners.
 *
 * <p>The instance connects in its constructor and is meant to be executed on
 * its own thread. {@link #run()} keeps polling and reconnecting until the
 * executing thread is interrupted.
 *
 * @author hz.lei
 * @date 2018-03-06 20:21
 */
public class CanalClient implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(CanalClient.class);

    private String hostname;
    private int port;
    private String destination;
    private String username;
    private String password;

    private CanalConnector connector;

    /** Maximum number of entries requested per poll. */
    private final static int BatchSize = 1000;
    /** Pause, in milliseconds, after a poll that returned nothing. */
    private final static long Sleep = 1000;
    /** True while the polling loop should keep going; cleared to force a reconnect. */
    private boolean runing = false;

    private List<CanalGsonListener> gsonListeners = new ArrayList<>();
    private List<CanalBinaryListener> binaryListeners = new ArrayList<>();

    /**
     * Stores the connection settings and connects immediately via {@link #init()}.
     *
     * @param hostname    Canal server address
     * @param port        Canal server port
     * @param destination Canal instance (destination) name
     * @param username    Canal user name
     * @param password    Canal password
     */
    public CanalClient(String hostname, int port, String destination, String username, String password) {
        this.hostname = hostname;
        this.port = port;
        this.destination = destination;
        this.username = username;
        this.password = password;
        init();
    }

    /**
     * Creates the connector, connects and subscribes to every schema and table.
     * Failures are logged, not thrown; the first failing poll in
     * {@link #work()} then triggers {@link #reconnect()}.
     */
    public void init() {
        try {
            // The password is masked: credentials must not end up in log files.
            logger.info(new StringBuilder("[Canal实例信息 CanalClient] [start] ")
                    .append("hostname: (").append(hostname)
                    .append("), port: (").append(port)
                    .append("), destination: (").append(destination)
                    .append("), username: (").append(username)
                    .append("), password: (").append("******").append(")").toString());

            connector = CanalConnectors.newSingleConnector(new InetSocketAddress(hostname, port), destination, username, password);

            connector.connect();
            connector.subscribe(".*\\..*");
        } catch (Exception e) {
            logger.error("[CanalClient] [init] " + e.getMessage(), e);
        }
    }

    /**
     * Adds a listener that receives the raw entry of every row change.
     *
     * @param listener listener to add; {@code null} is ignored
     */
    public void registerBinlogListener(CanalBinaryListener listener) {
        if (listener != null) {
            binaryListeners.add(listener);
        }
    }

    /**
     * Removes a previously registered listener.
     *
     * @param listener listener to remove; {@code null} is ignored
     */
    public void unregisterBinlogListener(CanalBinaryListener listener) {
        if (listener != null) {
            binaryListeners.remove(listener);
        }
    }

    /**
     * Polls until the executing thread is interrupted, reconnecting whenever
     * the polling loop ends.
     *
     * <p>This is an explicit loop because the previous
     * work() -> reconnect() -> work() recursion added stack frames on every
     * reconnect and could never unwind.
     */
    @Override
    public void run() {
        logger.info("[CanalClient] [run] ");
        runing = true;

        while (!Thread.currentThread().isInterrupted()) {
            work();
            if (Thread.currentThread().isInterrupted()) {
                break;
            }
            reconnect();
        }

        disconnectQuietly();
        logger.info("[CanalClient] [run] interrupted, stopping");
    }

    /**
     * Polling loop: fetches a batch without acknowledging it, processes it and
     * then acknowledges it. Returns when polling fails (after disconnecting)
     * or when the thread is interrupted; the caller decides whether to
     * reconnect.
     */
    private void work() {
        try {
            while (runing) {
                Message message = connector.getWithoutAck(BatchSize);

                long batchId = message.getId();
                int size = message.getEntries().size();

                if (batchId == -1 || size == 0) {
                    // Nothing to process: back off before polling again.
                    try {
                        Thread.sleep(Sleep);
                    } catch (InterruptedException e) {
                        logger.error(e.getMessage(), e);
                        // Restore the flag so run() can see the stop request.
                        Thread.currentThread().interrupt();
                        return;
                    }
                } else {
                    if (logger.isDebugEnabled()) {
                        logger.debug("读取binlog日志 batchId: {}, size: {}, name: {}, offsets:{}", batchId, size,
                                message.getEntries().get(0).getHeader().getLogfileName(),
                                message.getEntries().get(0).getHeader().getLogfileOffset());
                    }
                    // Dispatch the batch to the listeners.
                    process(message.getEntries());
                }
                // Acknowledge the batch (also called for the empty batch id -1, as before).
                connector.ack(batchId);
            }
        } catch (Exception e) {
            disconnectQuietly();
            logger.error("[CanalClient] [run] " + e.getMessage(), e);
        }
    }

    /**
     * Reconnect strategy: retries every 3 seconds until a new connector is
     * connected, subscribed and rolled back, or until the thread is
     * interrupted (in which case {@code runing} stays false).
     */
    private void reconnect() {
        logger.info("[CanalClient reconnect] 重新连接 ...");

        runing = false;

        while (!runing) {
            try {
                connector = CanalConnectors.newSingleConnector(new InetSocketAddress(hostname, port), destination, username, password);
                connector.connect();
                connector.subscribe(".*\\..*");
                connector.rollback();

                runing = true;
            } catch (Exception e) {
                disconnectQuietly();
                logger.error("[CanalClient] [reconnect] " + e.getMessage(), e);
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e1) {
                    logger.error(e1.getMessage(), e1);
                    // Restore the flag and give up; run() stops the client.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
        logger.info("[CanalClient reconnect] 重新连接成功!");
    }

    /**
     * Disconnects the current connector, logging any failure so that cleanup
     * never masks the error that caused it.
     */
    private void disconnectQuietly() {
        if (connector == null) {
            return;
        }
        try {
            connector.disconnect();
        } catch (Exception e) {
            logger.error("[CanalClient] [disconnect] " + e.getMessage(), e);
        }
    }

    /**
     * Dispatches one batch of entries to the listeners. Transaction begin/end
     * entries and QUERY events are skipped, as are entries whose store value
     * cannot be parsed as a RowChange.
     *
     * <p>NOTE(review): any other exception, including one thrown by a
     * listener, is only logged here, so the rest of the batch is skipped and
     * the caller still acknowledges the batch. Confirm this loss is acceptable.
     *
     * @param entries entries of one fetched batch
     */
    private void process(List<Entry> entries) {
        try {
            // SimpleDateFormat is not thread-safe, so it stays local; one per batch is enough.
            final SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

            for (Entry entry : entries) {
                final Header header = entry.getHeader();

                if (logger.isDebugEnabled()) {
                    logger.debug("mysql binlog : " + header.getLogfileName() + "=>" + header.getLogfileOffset());
                }

                // Ignore transaction begin/end markers and QUERY events.
                if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND || header.getEventType() == EventType.QUERY) {
                    continue;
                }

                logger.info("解析偏移量:" + header.getLogfileName() + "=>" + header.getLogfileOffset() + " ," +
                        "操作表[" + header.getSchemaName() + "." + header.getTableName() + "]," +
                        "变更类型[" + header.getEventType() + "]," +
                        "执行时间:" + timeFormat.format(new Date(header.getExecuteTime())));

                RowChange rowChange;
                try {
                    rowChange = RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    // Placeholders added so the entry is actually printed; the trailing
                    // throwable makes SLF4J log the stack trace as well.
                    logger.error("[CanalClient] [process] 解析RowChange事件错误: {}, entry: {}", e.getMessage(), entry, e);
                    continue;
                }

                log(header, rowChange);

                if (!gsonListeners.isEmpty()) {
                    GsonEntry binlog = new GsonEntry(header, rowChange);

                    for (CanalGsonListener listener : gsonListeners) {
                        listener.onBinlog(binlog);
                    }
                }

                for (CanalBinaryListener listener : binaryListeners) {
                    listener.onBinlog(entry);
                }
            }
        } catch (Exception e) {
            logger.error("[CanalClient] [process] " + e.getMessage(), e);
        }
    }

    /**
     * Debug-logs a row change: a summary line, then the before and/or after
     * columns depending on the event type. Does nothing unless debug logging
     * is enabled.
     */
    private void log(Header header, RowChange rowChange) {
        if (!logger.isDebugEnabled()) {
            return;
        }

        EventType eventType = rowChange.getEventType();

        logger.debug(String.format("binlog[%s:%s], name[%s,%s], eventType : %s",
                header.getLogfileName(), header.getLogfileOffset(),
                header.getSchemaName(), header.getTableName(),
                eventType));

        for (RowData rowData : rowChange.getRowDatasList()) {
            if (eventType == EventType.DELETE) {
                log(rowData.getBeforeColumnsList());
            } else if (eventType == EventType.INSERT) {
                log(rowData.getAfterColumnsList());
            } else {
                log(rowData.getBeforeColumnsList());
                log(rowData.getAfterColumnsList());
            }
        }
    }

    /** Debug-logs the name, value and updated flag of every column. */
    private void log(List<Column> columns) {
        if (!logger.isDebugEnabled()) {
            return;
        }
        for (Column column : columns) {
            logger.debug(new StringBuilder()
                    .append(column.getName()).append(" = ").append(column.getValue())
                    .append(" update[").append(column.getUpdated()).append("]").toString());
        }
    }
}

kafka消息实体定义:

import java.sql.Timestamp

import com.alibaba.otter.canal.protocol.CanalEntry.EventType

/**
  * Bean describing a single binlog change as delivered to binlog listeners.
  *
  * @param schema    schema (database) name
  * @param tableName table name
  * @param eventType Canal event type of the change
  * @param timestamp timestamp attached to the event
  * @param before    row content before the change; presumably a JSON string,
  *                  since GoodsOnBinlogAction matches it with `json` patterns -- confirm
  * @param after     row content after the change; same caveat as `before`
  * @author hz.lei, 2018-03-07 15:43
  */
case class BinlogEvent(
  schema: String,
  tableName: String,
  eventType: EventType,
  timestamp: Timestamp,
  before: String,
  after: String
)

2. 客户端-订阅kafka消息获取想要的binlog变更

binlog监听类:

import com.today.binlog.BinlogEvent
import com.today.eventbus.annotation.{BinlogListener, KafkaConsumer}
import com.today.service.binlog.action._
import org.springframework.transaction.annotation.Transactional
import scala.collection.JavaConverters._

/**
  * Binlog listener for the goods service.
  *
  * The class-level `@KafkaConsumer` names the consumer group and topic, and
  * the method-level `@BinlogListener` marks the method that receives events.
  *
  * @author hz.lei
  * @since 2018-03-08 19:18
  */
@KafkaConsumer(groupId = "GOODS_0.0.1_EVENT", topic = "Binlog")
@Transactional(readOnly = true)
class GoodsBinlogListener {

  /**
    * Runs a [[GoodsOnBinlogAction]] for every event in the list, in list order.
    *
    * @param event binlog events to handle
    */
  @BinlogListener
  def onBinlog(event: java.util.List[BinlogEvent]): Unit = {
    for (binlogEvent <- event.asScala) {
      new GoodsOnBinlogAction(binlogEvent).action()
    }
  }
}

注意必须添加如下两个注解:

@KafkaConsumer(groupId = "GOODS_0.0.1_EVENT", topic = "Binlog")
@BinlogListener
添加注解扫描类:
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Spring bean definitions for the binlog/Kafka consumer side.
  The soa, context and tx namespaces are declared below, but no element in
  this file uses them.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:soa="http://soa-springtag.dapeng.com/schema/service"
xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://soa-springtag.dapeng.com/schema/service
http://soa-springtag.dapeng.com/schema/service/service.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">
<!-- Registers the bean post-processor that scans beans for @KafkaConsumer and its method-level listener annotations. -->
<bean id="postProcessor" class="com.today.eventbus.spring.MsgAnnotationBeanPostProcessor"/>
</beans>

注解扫描类定义:

import com.today.eventbus.ConsumerEndpoint;
import com.today.eventbus.annotation.BinlogListener;
import com.today.eventbus.annotation.KafkaConsumer;
import com.today.eventbus.annotation.KafkaListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.Advised;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.core.MethodIntrospector;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.util.ReflectionUtils;
import com.today.eventbus.utils.Constant; import java.lang.reflect.Method;
import java.util.*;

/**
 * Bean post-processor that scans beans for the custom {@code @KafkaConsumer}
 * class annotation and the {@code @KafkaListener} / {@code @BinlogListener}
 * method annotations, and registers one {@link ConsumerEndpoint} per
 * annotated method with the {@link KafkaListenerRegistrar}.
 *
 * @author hz.lei
 * @see KafkaListenerRegistrar
 * @see BeanFactory
 * @see BeanPostProcessor
 * @see SmartInitializingSingleton
 * @since 2018-03-01 21:36
 */
public class MsgAnnotationBeanPostProcessor implements BeanPostProcessor, BeanFactoryAware, Ordered, SmartInitializingSingleton {
    /**
     * Logger bound to the runtime class.
     */
    private final Logger logger = LoggerFactory.getLogger(getClass());
    /**
     * Owning bean factory; cast to {@link DefaultListableBeanFactory} in
     * {@link #createKafkaRegistryBean()} so the {@link KafkaListenerRegistrar}
     * bean can be registered dynamically.
     */
    private BeanFactory beanFactory;
    /**
     * Handles registration and creation of the Kafka consumers.
     */
    private KafkaListenerRegistrar registrar;

    /**
     * Bean factory callback: keeps a reference to the container and
     * registers the registrar bean straight away.
     *
     * @param beanFactory owning bean factory
     * @throws BeansException if the registrar bean cannot be created
     */
    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
        createKafkaRegistryBean();
    }

    /**
     * Dynamically registers a {@link KafkaListenerRegistrar} bean definition
     * and fetches the resulting bean.
     */
    private void createKafkaRegistryBean() {
        // The factory must be a DefaultListableBeanFactory; any other type fails the cast.
        DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) beanFactory;
        // Build the bean definition for the registrar.
        BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(KafkaListenerRegistrar.class);
        // Register it under the well-known bean name.
        defaultListableBeanFactory.registerBeanDefinition(Constant.KAFKA_LISTENER_REGISTRAR_BEAN_NAME, beanDefinitionBuilder.getRawBeanDefinition());

        this.registrar = (KafkaListenerRegistrar) beanFactory.getBean(Constant.KAFKA_LISTENER_REGISTRAR_BEAN_NAME);
    }

    /**
     * Called once all singleton beans have been instantiated; triggers the
     * registrar's {@code afterPropertiesSet()}.
     */
    @Override
    public void afterSingletonsInstantiated() {
        this.registrar.afterPropertiesSet();
    }

    /**
     * Invoked before any bean initialisation callback; this implementation
     * returns the bean untouched.
     *
     * @param bean     bean instance
     * @param beanName bean name
     * @return the same bean
     * @throws BeansException never thrown by this implementation
     */
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    /**
     * Invoked after bean initialisation callbacks. If the bean's target class
     * carries {@code @KafkaConsumer}, its annotated methods are registered as
     * consumer endpoints.
     *
     * @param bean     bean instance (possibly a proxy)
     * @param beanName bean name
     * @return the same bean
     * @throws BeansException           declared by the interface
     * @throws IllegalArgumentException if a {@code @KafkaConsumer} class has no
     *                                  annotated method, or mixes
     *                                  {@code @KafkaListener} and
     *                                  {@code @BinlogListener} methods
     */
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        logger.debug("access to postProcessAfterInitialization bean {}, beanName {}", bean, beanName);

        Class<?> targetClass = AopUtils.getTargetClass(bean);
        // Look for @KafkaConsumer on the class.
        Optional<KafkaConsumer> kafkaConsumer = findListenerAnnotations(targetClass);
        final boolean hasKafkaConsumer = kafkaConsumer.isPresent();

        if (hasKafkaConsumer) {
            // Methods annotated with @KafkaListener.
            Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    (MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
                        Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
                        return (!listenerMethods.isEmpty() ? listenerMethods : null);
                    });

            // Methods annotated with @BinlogListener.
            Map<Method, Set<BinlogListener>> binlogMethods = MethodIntrospector.selectMethods(targetClass,
                    (MethodIntrospector.MetadataLookup<Set<BinlogListener>>) method -> {
                        Set<BinlogListener> listenerMethods = findBinlogListenerAnnotations(method);
                        return (!listenerMethods.isEmpty() ? listenerMethods : null);
                    });

            if (annotatedMethods.isEmpty() && binlogMethods.isEmpty()) {
                throw new IllegalArgumentException("@KafkaConsumer found on class type , " +
                        "but no @KafkaListener or @BinlogListener found on the method ,please set it on the method");
            }

            if (!annotatedMethods.isEmpty() && !binlogMethods.isEmpty()) {
                throw new IllegalArgumentException("@KafkaListener or @BinlogListener only one could on the same bean class");
            }

            if (!annotatedMethods.isEmpty()) {
                for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
                    Method method = entry.getKey();
                    for (KafkaListener listener : entry.getValue()) {
                        // Turn the annotation metadata into a registered endpoint.
                        processKafkaListener(kafkaConsumer.get(), listener, method, bean, beanName);
                    }
                }
                // Report the @KafkaListener count; binlogMethods is always empty in this branch.
                logger.info("there are {} methods have @KafkaListener on This bean ", annotatedMethods.size());
            }

            if (!binlogMethods.isEmpty()) {
                for (Method method : binlogMethods.keySet()) {
                    // Turn the annotation metadata into a registered endpoint.
                    processBinlogListener(kafkaConsumer.get(), method, bean);
                }
                logger.info("there are {} methods have @BinlogListener on This bean ", binlogMethods.size());
            }

            if (this.logger.isDebugEnabled()) {
                this.logger.debug(annotatedMethods.size() + " @KafkaListener methods processed on bean '"
                        + beanName + "': " + annotatedMethods);
            }
        } else {
            this.logger.info("No @KafkaConsumer annotations found on bean type: " + bean.getClass());
        }
        return bean;
    }

    /**
     * Looks for {@code @KafkaConsumer} on the class; only classes carrying it
     * are treated as Kafka message consumers.
     *
     * @param clazz class to inspect
     * @return the annotation, or an empty Optional when absent
     */
    private Optional<KafkaConsumer> findListenerAnnotations(Class<?> clazz) {
        KafkaConsumer ann = AnnotationUtils.findAnnotation(clazz, KafkaConsumer.class);
        return Optional.ofNullable(ann);
    }

    /**
     * Looks for {@code @KafkaListener} on the method.
     *
     * @param method method to inspect
     * @return a set holding the annotation, or an empty set when absent
     */
    private Set<KafkaListener> findListenerAnnotations(Method method) {
        Set<KafkaListener> listeners = new HashSet<>();
        KafkaListener ann = AnnotationUtils.findAnnotation(method, KafkaListener.class);
        if (ann != null) {
            listeners.add(ann);
        }
        return listeners;
    }

    /**
     * Looks for {@code @BinlogListener} on the method.
     *
     * @param method method to inspect
     * @return a set holding the annotation, or an empty set when absent
     */
    private Set<BinlogListener> findBinlogListenerAnnotations(Method method) {
        Set<BinlogListener> listeners = new HashSet<>();
        BinlogListener ann = AnnotationUtils.findAnnotation(method, BinlogListener.class);
        if (ann != null) {
            listeners.add(ann);
        }
        return listeners;
    }

    /**
     * Builds a {@link ConsumerEndpoint} from a {@code @KafkaListener} method
     * and its class-level {@code @KafkaConsumer}, then registers it.
     *
     * @param consumer class-level annotation
     * @param listener method-level annotation
     * @param method   annotated method found on the target class
     * @param bean     bean instance (possibly a proxy)
     * @param beanName bean name (not used by this implementation)
     * @throws RuntimeException if the configured session timeout is below
     *                          {@code Constant.DEFAULT_SESSION_TIMEOUT}
     */
    protected void processKafkaListener(KafkaConsumer consumer, KafkaListener listener, Method method, Object bean, String beanName) {
        Method methodToUse = checkProxy(method, bean);
        ConsumerEndpoint endpoint = new ConsumerEndpoint();
        endpoint.setMethod(methodToUse);
        endpoint.setBean(bean);
        endpoint.setParameterTypes(Arrays.asList(method.getParameterTypes()));
        // Class annotation information.
        endpoint.setGroupId(consumer.groupId());
        endpoint.setTopic(consumer.topic());
        endpoint.setKafkaHostKey(consumer.kafkaHostKey());
        // Method annotation information.
        endpoint.setSerializer(listener.serializer());
        // Reject session timeouts below the supported minimum.
        if (consumer.sessionTimeout() < Constant.DEFAULT_SESSION_TIMEOUT) {
            throw new RuntimeException("抛出该异常原因为: kafkaConsumer session 超时时间设置太小 ,请设置至少为 10000L 以上,单位为 ms(毫秒)");
        }
        endpoint.setTimeout(consumer.sessionTimeout());

        this.registrar.registerEndpoint(endpoint);
    }

    /**
     * Builds a binlog {@link ConsumerEndpoint} from a {@code @BinlogListener}
     * method and its class-level {@code @KafkaConsumer}, then registers it.
     *
     * @param consumer class-level annotation
     * @param method   annotated method found on the target class
     * @param bean     bean instance (possibly a proxy)
     */
    private void processBinlogListener(KafkaConsumer consumer, Method method, Object bean) {
        Method methodToUse = checkProxy(method, bean);

        ConsumerEndpoint endpoint = new ConsumerEndpoint();
        endpoint.setMethod(methodToUse);
        endpoint.setBean(bean);
        // Class annotation information.
        endpoint.setGroupId(consumer.groupId());
        endpoint.setTopic(consumer.topic());
        endpoint.setKafkaHostKey(consumer.kafkaHostKey());
        // Mark the endpoint as a binlog consumer.
        endpoint.setBinlog(true);

        this.registrar.registerEndpoint(endpoint);
    }

    /**
     * Resolves the method to invoke. For a JDK dynamic proxy the method is
     * looked up on the proxy and then on its proxied interfaces; otherwise the
     * method is returned unchanged.
     *
     * @param methodArg method found on the target class
     * @param bean      bean instance (possibly a proxy)
     * @return the method to invoke on {@code bean}
     * @throws IllegalStateException if the bean is a JDK proxy that does not
     *                               expose the method
     */
    private Method checkProxy(Method methodArg, Object bean) {
        Method method = methodArg;
        if (AopUtils.isJdkDynamicProxy(bean)) {
            try {
                // Found a @KafkaListener method on the target class for this JDK proxy ->
                // is it also present on the proxy itself?
                method = bean.getClass().getMethod(method.getName(), method.getParameterTypes());
                Class<?>[] proxiedInterfaces = ((Advised) bean).getProxiedInterfaces();
                for (Class<?> iface : proxiedInterfaces) {
                    try {
                        method = iface.getMethod(method.getName(), method.getParameterTypes());
                        break;
                    } catch (NoSuchMethodException noMethod) {
                        // Not declared on this interface; try the next one.
                    }
                }
            } catch (SecurityException ex) {
                ReflectionUtils.handleReflectionException(ex);
            } catch (NoSuchMethodException ex) {
                throw new IllegalStateException(String.format(
                        "@KafkaListener method '%s' found on bean target class '%s', " +
                                "but not found in any interface(s) for bean JDK proxy. Either " +
                                "pull the method up to an interface or switch to subclass (CGLIB) " +
                                "proxies by setting proxy-target-class/proxyTargetClass " +
                                "attribute to 'true'", method.getName(), method.getDeclaringClass().getSimpleName()), ex);
            }
        }
        return method;
    }

    /**
     * @return {@code LOWEST_PRECEDENCE}
     */
    @Override
    public int getOrder() {
        return LOWEST_PRECEDENCE;
    }
}

处理 binlog 缓存监听事件:
import com.github.dapeng.core.SoaException;
import com.today.eventbus.common.MsgConsumer;
import com.today.eventbus.common.retry.BinlogRetryStrategy;
import com.today.eventbus.ConsumerEndpoint;
import com.today.eventbus.config.KafkaConfigBuilder;
import com.today.eventbus.serializer.KafkaIntDeserializer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer; import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors; /**
* 描述: 处理 binlog 缓存 监听 事件
*
* @author hz.lei
* @since 2018年03月07日 上午1:42
*/
public class BinlogKafkaConsumer extends MsgConsumer<Integer, byte[], ConsumerEndpoint> { /**
* @param kafkaHost host1:port1,host2:port2,...
* @param groupId
* @param topic
*/
public BinlogKafkaConsumer(String kafkaHost, String groupId, String topic) {
super(kafkaHost, groupId, topic);
} @Override
protected void init() {
logger.info("[KafkaConsumer] [init] " +
"kafkaConnect(" + kafkaConnect +
") groupId(" + groupId +
") topic(" + topic + ")"); KafkaConfigBuilder.ConsumerConfiguration builder = KafkaConfigBuilder.defaultConsumer(); final Properties props = builder.bootstrapServers(kafkaConnect)
.group(groupId)
.withKeyDeserializer(KafkaIntDeserializer.class)
.withValueDeserializer(ByteArrayDeserializer.class)
.withOffsetCommitted(false)
.excludeInternalTopic(false)
.maxPollSize("100")
.build(); consumer = new KafkaConsumer<>(props);
} @Override
protected void buildRetryStrategy() {
retryStrategy = new BinlogRetryStrategy();
} @Override
protected void dealMessage(ConsumerEndpoint consumer, byte[] value, Integer keyId) throws SoaException {
List<BinlogEvent> binlogEvents = BinlogMsgProcessor.process(value);
// > 0 才处理
if (binlogEvents.size() > 0) {
try {
consumer.getMethod().invoke(consumer.getBean(), binlogEvents);
} catch (IllegalAccessException e) {
logger.error("BinlogConsumer::实例化@BinlogListener 注解的方法 出错", e);
} catch (InvocationTargetException e) {
throwRealException(e, consumer.getMethod().getName());
} logger.info("BinlogConsumer::[dealMessage(id: {})] end, method: {}, groupId: {}, topic: {}, bean: {}",
keyId, consumer.getMethod().getName(), groupId, topic, consumer.getBean());
}
}
}

匹配binlog变动,获取变更前后信息:

import com.alibaba.otter.canal.protocol.CanalEntry
import com.today.binlog.BinlogEvent
import com.today.service.GoodsDataSource
import com.today.service.binlog.bean.SkuBean
import com.today.service.commons.cache.MemcacheProcessor
import com.today.service.commons.cache.dto.RedisBean
import redis.clients.jedis.JedisPool
import spray.json._
import wangzx.scala_commons.sql._
import com.today.service.commons.help.BizHelp.withJedis
import com.today.service.commons.help.SqlHelp
import com.today.service.commons.util.DateTools
import com.today.service.commons.`implicit`.Implicits._
import scala.collection.JavaConverters._

/**
  * Handles one binlog event for table `goods_db.sku` by calling the sorted-set
  * helpers `insertSortSet`, `updateSortSet` and `deleteSortSet`.
  *
  * Those helpers, `reloadBySkuNo`, `toStringValue` and `logger` are not
  * defined in this class; presumably they come from [[MemcacheProcessor]] or
  * the imported helpers -- confirm.
  *
  * @param binlogEvent the event to react to
  */
class GoodsOnBinlogAction(binlogEvent: BinlogEvent) extends MemcacheProcessor("goods_db_sku") {

  /**
    * Dispatches on the event's schema, table and type, destructuring the
    * before/after payloads with `json` patterns. Events that match none of
    * the cases are ignored.
    */
  def action(): Unit = {
    logger.info(s"${getClass.getSimpleName} onBinlog ")
    logger.info(s"binlogEvent:$binlogEvent")

    binlogEvent match {
      // INSERT: reload the row by sku_no and insert it if it can be loaded.
      case BinlogEvent("goods_db", "sku", CanalEntry.EventType.INSERT, _, _,
      json"""{"id": $id,"sku_no":${skuNoJsValue}}"""
      ) =>
        logger.info(s"${getClass.getSimpleName} onInsert...")
        val skuNo = toStringValue(skuNoJsValue)
        reloadBySkuNo(skuNo).foreach(skuBean => insertSortSet(skuBean))

      // UPDATE: drop the old entry when sku_no changed, then refresh the new one.
      case BinlogEvent("goods_db", "sku", CanalEntry.EventType.UPDATE, _, json"""{"id": $beforeSkuId,"sku_no":${beforeSkuNoJsValue}}""",
      json"""{"id": $afterSkuId,"sku_no":${afterSkuNoJsValue}}"""
      ) =>
        logger.info(s"${getClass.getSimpleName} onUpdate...")
        val beforeSkuNo = toStringValue(beforeSkuNoJsValue)
        val afterSkuNo = toStringValue(afterSkuNoJsValue)
        if (beforeSkuNo != afterSkuNo) {
          deleteSortSet(beforeSkuNo)
        }
        reloadBySkuNo(afterSkuNo).foreach(skuBean => updateSortSet(skuBean.primaryKey, skuBean))

      // DELETE: remove the entry keyed by the sku_no of the deleted row.
      case BinlogEvent("goods_db", "sku", CanalEntry.EventType.DELETE, _, json"""{"id": $id,"sku_no":${skuNo}}""", _) =>
        logger.info(s"${getClass.getSimpleName} onDelete...")
        deleteSortSet(toStringValue(skuNo))

      // Any other table, event type or payload shape is not relevant here.
      case _ =>
    }
  }
}
05-11 15:36