Actor
是一种高并发处理模型,每个Actor
都有着自己的状态有序消息处理机制,所以在业务处理的情况并不需要制定锁的机制,从而达到更高效的处理能性。XRPC
是一个基于远程接口调用的RPC
组件,它可以简单地实现高性能的远程接口调用;XRPC
在创建远程接口时是支持针对接口创建对应的Actor
实例。当创建接口Actor
后,所有Client针对这一例实例Actor
的所有方法调用都是有序处理。以下介绍如何在XRPC
创建并使用Actor
什么场景适用于Actor
既然每个Actor
都有着自己的状态和顺序处理机制,那可以针对这优点进行相关业务的展开;在棋牌游戏中的桌子可以是一个Actor
,车票里的每一辆车可以是一个Actor
,秒杀里的每一种商品是一个Actor
。在这些Actor
所有的操作都是有序进行,不存在锁,也不需要事务(通过EventSourcing来保障)和不会产生死锁;数据变更全内存操作,通过这样可以大提高业务的处理性能。
引用XRPC
Install-Package BeetleX.XRPC
定义Actor
的RPC服务
XRPC支持的Actor
服务功能是建立在EventNext
之上,它的好处是直接接口行为的Actor
创建,而不是传统的消息加接收方法,这样在应用设计和调用上也是非常方便灵活。接下来定义一个简单的Actor
服务
- 接口定义以上是一个简单的帐记变更行为接口
1 public interface IAmountService 2 { 3 Task<int> Income(int amount); 4 Task<int> Payout(int amount); 5 Task<int> Get(); 6 }
实现接口
1 [Service(typeof(IAmountService))] 2 public class AmountService : ActorState, IAmountService 3 { 4 5 private int mAmount; 6 7 public override Task ActorInit(string id) 8 { 9 return base.ActorInit(id); 10 } 11 12 public Task<int> Get() 13 { 14 return mAmount.ToTask(); 15 } 16 17 public Task<int> Income(int amount) 18 { 19 mAmount += amount; 20 return mAmount.ToTask(); 21 } 22 23 public Task<int> Payout(int amount) 24 { 25 mAmount -= amount; 26 return mAmount.ToTask(); 27 } 28 }
启动对应的RPC服务
private static XRPCServer mXRPCServer; static void Main(string[] args) { mXRPCServer = new XRPCServer(); mXRPCServer.ServerOptions.LogLevel = LogType.Error; mXRPCServer.Register(typeof(Program).Assembly); mXRPCServer.Open(); Console.Read(); }
以上代码是在默认端口
9090
上绑定RPC
服务,可以通过运行日志查看服务启动情况
创建远程Actor
调用
- 创建RPC Client以上代码是创建一个
1 client = new XRPCClient("192.168.2.18", 9090); 2 client.Connect();
RPC
客户端,通过它的Create
可以创建接口代理 - 创建接口Actor实例以上是针对
IAmountService henry = client.Create<IAmountService>("henry"); IAmountService ken = client.Create<IAmountService>("ken");
IAmountService
创建了两个Actor
对象,这两个对象的操作都是相互隔离互不干扰;每个Actor
对象中的方法在并发下都是有序执行,并不会产生线程安全问题,所以在不同方法中操作对像的数据成员都不需要锁来保证数据安全性。
测试
为了更好地验证Actor
的隔离和并发安全性,简单地并发测试一下
1 for (int i = 0; i < concurrent; i++) 2 { 3 var task = Task.Run(async () => 4 { 5 for (int k = 0; k < requests; k++) 6 { 7 await henry.Income(10); 8 System.Threading.Interlocked.Increment(ref mCount); 9 } 10 }); 11 tasks.Add(task); 12 task = Task.Run(async () => 13 { 14 for (int k = 0; k < requests; k++) 15 { 16 await henry.Payout(10); 17 System.Threading.Interlocked.Increment(ref mCount); 18 } 19 }); 20 tasks.Add(task); 21 task = Task.Run(async () => 22 { 23 for (int k = 0; k < requests; k++) 24 { 25 await ken.Income(10); 26 System.Threading.Interlocked.Increment(ref mCount); 27 } 28 }); 29 tasks.Add(task); 30 task = Task.Run(async () => 31 { 32 for (int k = 0; k < requests; k++) 33 { 34 await ken.Payout(10); 35 System.Threading.Interlocked.Increment(ref mCount); 36 } 37 }); 38 tasks.Add(task); 39 } 40 await Task.WhenAll(tasks.ToArray()); 41 double useTime = EventCenter.Watch.ElapsedMilliseconds - start; 42 Console.WriteLine($"Completed count:{mCount}|use time:{useTime}|rps:{(mCount / useTime * 1000d):###.00} |henry:{await henry.Get()},ken:{await ken.Get()}");
两个程序同时在本机跑了一下,在50并发的情况大概是11万RPS
服务中的Actor
隔离性
服务是通过名称来实例化接口的不同Actor
,同一服务即使多个Client
同时对一名称的Actor
进行创建服务也可以保证它的唯一性。