一、问题背景
最近离职来到了一家新的公司,原先是在乙方工作,这回到了甲方,在这一个月中,发现目前的业务很大一部分是靠轮询实现的,例如:通过轮询判断数据处于B状态了,则轮询到数据后执行某种动作,这个其实是非常浪费的,并且对于数据的实时性也会不怎么友好,基于以上的情况,在某天开车堵车时候,想到了之前偶然了解过的事件总线(EventBus),对比了公司当前的场景后,觉得事件总线应该是可以满足需求的(PS:只是我觉得这个有问题,很多人不觉得有问题),那既然想到了,那就想自己是否可以做个事件总线的轮子
二、什么是事件总线
我们知道事件是由一个Publisher跟一个或多个的Subsriber组成,但是在实际的使用过程中,我们会发现,Subsriber必须知道Publisher是谁才可以注册事件,进而达到目的,那这其实就是一种耦合,为了解决这个问题,就出现了事件总线的模式,事件总线允许不同的模块之间进行彼此通信而又不需要相互依赖,如下图所示,通过EventBus,让Publisher以及Subsriber都只需要对事件源(EventData)进行关注,不用管Publisher是谁,那么EventBus主要是做了一些什么事呢?
三、EventBus做了什么事?
1、EventBus实现了对于事件的注册以及取消注册的管理
2、EventBus内部维护了一份事件源与事件处理程序的对应关系,并且通过这个对应关系在事件发布的时候可以找到对应的处理程序去执行
3、EventBus应该要支持默认就注册事件源与处理程序的关系,而不需要开发人员手动去注册(这里可以让开发人员去控制自动还是手动)
四、具体实现思路
首先在事件总线中,存在注册、取消注册以及触发事件这三种行为,所以我们可以将这三种行为抽象一个接口出来,最终的接口代码如下:
using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; namespace MEventBus.Core { public interface IEventBus { #region 接口注册 void Register<TEventData>(Type handlerType) where TEventData : IEventData; void Register(Type eventType, Type handlerType); void Register(string eventType, Type handlerType); #endregion #region 接口取消注册 void Unregister<TEventData>(Type handler) where TEventData : IEventData; void Unregister(Type eventType, Type handlerType); void Unregister(string eventType, Type handlerType); #endregion void Trigger(string pubKey, IEventData eventData); Task TriggerAsync(string pubKey, IEventData eventData); Task TriggerAsync<TEventData>(TEventData eventData) where TEventData : IEventData; void Trigger<TEventData>(TEventData eventData) where TEventData : IEventData; } }
在以上代码中发现有些方法是有IEventData约束的,这边IEventData就是约束入参行为,原则上规定,每次触发的EventData都需要继承IEventData,而注册的行为也是直接跟入参类型相关,具体代码如下:
using System; using System.Collections.Generic; using System.Text; namespace MEventBus.Core { public interface IEventData { string Id { get; set; } DateTime EventTime { get; set; } object EventSource { get; set; } } }
接下来我们看下具体的实现代码
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Linq; using System.Reflection; using System.Text; using System.Threading.Tasks; namespace MEventBus.Core { public class EventBus : IEventBus { private static ConcurrentDictionary<string, List<Type>> dicEvent = new ConcurrentDictionary<string, List<Type>>(); private IResolve _iresolve { get; set; } public EventBus(IResolve resolve) { _iresolve = resolve; InitRegister(); } public void InitRegister() { if (dicEvent.Count > 0) { return; } //_iresolve = ioc_container; dicEvent = new ConcurrentDictionary<string, List<Type>>(); //自动扫描类型并且注册 foreach (var file in Directory.GetFiles(AppDomain.CurrentDomain.BaseDirectory, "*.dll")) { var ass = Assembly.LoadFrom(file); foreach (var item in ass.GetTypes().Where(p => p.GetInterfaces().Contains(typeof(IEventHandler)))) { if (item.IsClass) { foreach (var item1 in item.GetInterfaces()) { foreach (var item2 in item1.GetGenericArguments()) { if (item2.GetInterfaces().Contains(typeof(IEventData))) { Register(item2, item); } } } } } } } //注册以及取消注册的时候需要加锁处理 private static readonly object obj = new object(); #region 注册事件 public void Register<TEventData>(Type handlerType) where TEventData : IEventData { //将数据存储到mapDic var dataType = typeof(TEventData).FullName; Register(dataType, handlerType); } public void Register(Type eventType, Type handlerType) { var dataType = eventType.FullName; Register(dataType, handlerType); } public void Register(string pubKey, Type handlerType) { lock (obj) { //将数据存储到mapDic if (dicEvent.Keys.Contains(pubKey) == false) { dicEvent[pubKey] = new List<Type>(); } if (dicEvent[pubKey].Exists(p => p.GetType() == handlerType) == false) { //IEventHandler obj = Activator.CreateInstance(handlerType) as IEventHandler; dicEvent[pubKey].Add(handlerType); } } } #endregion #region 取消事件注册 public void Unregister<TEventData>(Type handler) where TEventData : IEventData { var dataType = typeof(TEventData); Unregister(dataType, handler); } public void Unregister(Type eventType, Type handlerType) { string _key = eventType.FullName; Unregister(_key, handlerType); } public void Unregister(string eventType, Type handlerType) { lock (obj) { if (dicEvent.Keys.Contains(eventType)) { if (dicEvent[eventType].Exists(p => p.GetType() == handlerType)) { dicEvent[eventType].Remove(dicEvent[eventType].Find(p => p.GetType() == handlerType)); } } } } #endregion #region Trigger触发 //trigger时候需要记录到数据库 public void Trigger<TEventData>(TEventData eventData) where TEventData : IEventData { var dataType = eventData.GetType().FullName; //获取当前的EventData绑定的所有Handler Notify(dataType, eventData); } public void Trigger(string pubKey, IEventData eventData) { //获取当前的EventData绑定的所有Handler Notify(pubKey, eventData); } public async Task TriggerAsync<TEventData>(TEventData eventData) where TEventData : IEventData { await Task.Factory.StartNew(new Action(()=> { var dataType = eventData.GetType().FullName; Notify(dataType, eventData); })); } public async Task TriggerAsync(string pubKey, IEventData eventData) { await Task.Factory.StartNew(new Action(() => { var dataType = eventData.GetType().FullName; Notify(pubKey, eventData); })); } //通知每成功执行一个就需要记录到数据库 private void Notify<TEventData>(string eventType, TEventData eventData) where TEventData : IEventData { //获取当前的EventData绑定的所有Handler var handlerTypes = dicEvent[eventType]; foreach (var handlerType in handlerTypes) { var resolveObj = _iresolve.Resolve(handlerType); IEventHandler<TEventData> handler = resolveObj as IEventHandler<TEventData>; handler.Handle(eventData); } } #endregion } }
代码说明:
1、如上的EventBus是继承了IEventBus后的具体实现,小伙伴可能看到在构造函数里,有一个接口参数IResolve,这个主要是为了将解析的过程进行解耦,由于在一些WebApi的项目中,更加多的是使用IOC的机制进行对象的创建,那基于IResolve就可以实现不同的对象创建方式(内置的是通过反射实现)
2、InitRegister方法通过遍历当前目录下的dll文件,去寻找所有实现了IEventHandler<IEventData>接口的信息,并且自动注册到EventBus中,所以在实际使用过程中,应该是没有机会去适用register注册的
3、触发机制实现了同步以及异步的调用,这个从方法命名中就可以看出来
五、程序Demo
TestHandler2(继承IEventHandler)
using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Windows.Forms; using MEventBus.Core; namespace MEventBusHandler.Test { public class TestHandler2 : IEventHandler<TestEventData> { public void Handle(TestEventData eventData) { Thread.Sleep(2000); MessageBox.Show(eventData.EventTime.ToString()); } } }
TestEventData(继承EventData,EventData是继承了IEventData的代码)
using MEventBus.Core; using System; using System.Collections.Generic; using System.Text; namespace MEventBusHandler.Test { public class TestEventData : EventData { } }
调用代码
using MEventBus.Core; using MEventBusHandler.Test; using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Windows.Forms; namespace MEventBus.Test { public partial class Form1 : Form { public Form1() { InitializeComponent(); TestHandler.OnOut += TestHandler_OnOut; } private void TestHandler_OnOut(object sender, EventArgs e) { MessageBox.Show("Hello World"); } private void button1_Click(object sender, EventArgs e) { var task = new MEventBus.Core.EventBus(new ReflectResolve()).TriggerAsync(new TestEventData()); task.ContinueWith((obj) => { MessageBox.Show("事情全部做完"); }); } private void button2_Click(object sender, EventArgs e) { new EventBus(new ReflectResolve()).Trigger(new TestEventData()); } } }
执行结果
我在真正的Demo中,其实是注册了2个handler,可以在后续公布的项目地址里看到
六、总结
从有这个想法开始,到最终实现这个事件总线,大概总共花了2,3天的时间(PS:晚上回家独自默默干活),目前只能说是有一个初步可以使用的版本,并且还存在着一些问题:
1、在.NetFrameWork下(目前公司还不想升级到.NetCore,吐血。。),如果使用AutoFac创建EventBus(单例模式下),如果Handler也使用AutoFac进行创建,会出现要么对象创建失败,要么handler里的对象与调用方的对象不是同一个实例,为了解决这个问题,我让EventBus不再是单例模式,将dicEvent变成了静态,暂时表面解决
2、未考虑跨进程的实现(感觉用savorboard大佬的CAP就可以了)
3、目前这个东西在一个小的新项目里使用,暂时在测试环境还算没啥问题,各位小伙伴如果有类似需求,可以做个参考
由于个人原因,在测试上可能会有所不够,如果有什么bug的话,还请站内信告知,感谢(ps:文字表达弱鸡,技术渣渣,各位多多包涵)
最后:附上项目地址:https://gitee.com/OneMango/MEventBus
作者: Mango
出处: http://www.cnblogs.com/OMango/
关于自己:专注.Net桌面开发以及Web后台开发,对.NetCore、微服务、DevOps,K8S等感兴趣,最近到了个甲方公司准备休养一段时间
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文链接,如有问题, 可站内信告知.