写在前面的话
企业实战开发中,事件监听的运用场景非常多,当某事件发生的时候,会触发某个响应处理,其主要优势体现在多负载实例的场景下。与前几篇博文《知识点扫盲 · 监听器 Listener》《后端程序猿 · 基于 Lettuce 实现缓存容错策略》提到的观察者模式、发布订阅模式等,有异曲同工之妙。
本篇文章先介绍一下,框架封装人员如何处理事件监听场景,默认基于 RedisMessageListenerContainer
实现,下面以此技术加以说明。
技术入门
【技术简介】
RedisMessageListenerContainer 是 Spring Data Redis 提供的一个类,用于异步处理 Redis 中的发布/订阅消息。它利用 Redis 的发布/订阅机制,通过消息通道(channel)或模式(pattern)订阅消息,并在消息到达时触发相应的监听器方法。该机制广泛应用于实时数据处理、消息广播等场景。
【使用入门】
Step1、引入 Maven 依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
Step2、注册消息监听 Bean,订阅事件
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setTaskExecutor(this.defaultTaskExecutor());
Topic websocket = new ChannelTopic("websocket");
Topic versionUpdate = new ChannelTopic("versionUpdate");
List<Topic> list = new ArrayList<>();
list.add(websocket);
list.add(versionUpdate);
WebsocketRedisMessageListener listener = new WebsocketRedisMessageListener();
listener.setRedisTemplate(redisTemplate);
container.addMessageListener(listener, list);
return container;
}
Step3、实现监听消费逻辑
public class WebsocketRedisMessageListener implements MessageListener {@Override
public void onMessage(Message message, byte[] pattern) {
try {
String msgChannel = new String(pattern);
String msgBody = (String) getRedisTemplate().getValueSerializer()
.deserialize(message.getBody());
switch (msgChannel) {
case TOPIC_VERSION:
VersionSocket versionSocket = new VersionSocket();
versionSocket.sendMessageAll(msgBody);
break;
default:
LOGGER.warn("处理redis主题, 找不到对应的主题,{}", msgChannel);
break;
}
} catch (Exception e) {
LOGGER.error("处理redis事件失败:{}", ExceptionUtil.stacktraceToString(e));
}
}
}
Step4、按需发布事件
redisTemplate.convertAndSend("versionUpdate", JSON.toJSONString(versionUpdate));
【拓展说明】
通过 RedisMessageListenerContainer 还可以实现针对 Redis-Key 增删改以及过期的监听。
这不是本篇文章重点,详情搜索:KeyExpirationEventMessageListener、 PatternTopic 等关键词。
实战分享
设计思路:
技术基础入门介绍完了,功能可以实现,但是步骤略多,作为框架封装开发人员,肯定要帮忙加工一下,不可能放任各业务部门的开发人员随意添加,那可能出现各种奇葩问题,还需要架构人员兜底。
接下来分享一下实战经验:
1、事件监听的实现方式有多种,框架集成了 Redis 监听方式,作为事件总线模块的默认底层实现。如果想使用其他中间件来替代默认实现,也预留了接口,方便替换,Redis 监听实现的关键技术依然是RedisMessageListenerContainer
2、将消息监听器Bean的定义工作放到框架核心包处理,包含设定默认线程池等
3、提供快速使用消费功能的接口,开发人员只需要按规约实现接口即可完成订阅工作
4、提供发布消息的API,统一操作入口
下面贴一下代码,展示封装后,开发人员如何使用:
//订阅事件
public class PortalEventListener implements ZhanshenEventListener {
/**
* 订阅事件KEY
*/
public static final Set<String> PATTERN_KEY = Set.of("zhanshen.portal");
@Override
public Set<String> patterns() {
return PATTERN_KEY;
}
@Override
public void handleEvent(ZhanshenEvent zhanshenEvent) {
log.info("收到事件消息,pattern:{},data:{} ",
zhanshenEvent.getPattern(), zhanshenEvent.getData());
}
}
//发布事件
ZhanshenEventListener.publishEvent("zhanshen.portal", "测试事件消息");
//补充:可以定义多个ZhanshenEventListener实现类,框架会统一帮忙触发。
题外话:注意事项
封装过程中,遇到一个小坑,分享一下:
RedisMessageListenerContainer 的默认使用线程池是SimpleAsyncTaskExecutor,每次消费都会创建一个线程来处理,这样就会有大量的新线程被创建。生产环境下建议使用自定义线程池,减少性能损耗。
运用场景
在实际开发中,每个后端服务都会有多个实例,在这种情况下,当一个接口触发的时候,需要所有实例都做出响应,那事件监听机制就非常有用了。
场景1:WebSocket 在线用户通知
博主所在公司采用WebSocket
技术实现了统一门户工作站的消息通知推送功能,用户登录的时候,需要调用后端接口,存储在线用户列表数据,这时候可能信息存储在某一个实例中。
当需要给用户发通知的时候,需要拉取到所在后端的用户 Session,进行 WebSocket 的 send操作。这时候由于后端是多实例,有可能没有存储相关用户信息,这时候可以通过事件监听方式,通知各个实例触发该操作。
场景2:static 静态变量的更新
某些情况,会使用静态变量维护一些数据,当要对这些数据进行修改,仅仅触发某个后端实例的接口,是不够的,可以通过事件监听机制,其他实例也订阅该动作,同步更新相关变量。
总结陈词
上文介绍了框架封装人员,如何处理事件监听逻辑,提供了一些思路分享。
事件监听更多情况下是订阅了事件,需要由外部主动发布事件,才能触发响应逻辑。其实还有数据监听、缓存监听等技术方案,即数据变化等情况下,会自动触发响应,比如:Redis-Key 的过期监听、OGG for Bigdata 的 Oracle 数据变化监听,关于这一点后面再开文章介绍。
本系列博文将介绍框架搭建人员如何以恰当的方式应对各式各样的情况,这也是此专栏的主题。
💗 后续将持续更新,请多多支持!