4.Ray-Handler之CoreHandler编写

4.Handler之CoreHandler编写-LMLPHP

如图右上角所示,Ray中有两类Handler(SubHandler和PartSubHandler),在使用中,SubHandler派生Actor的CoreHandler,PartSubHandler派生SQLToReadHandler,SQLToReadHandler派生Actor的ToReadHandler,使用Ray主要写Actor的CoreHandler和ToReadHandler。

CoreHandler是复合消息路由器,包含的功能有:消息路由器、消息处理器、消息分离器、消息聚合器、消息过滤器、消息丰富器、副本同步协调器。

消息路由器:

Tell方法中的Switch块,针对不同的Event类型,将Event事件中的数据分发给不同的事件处理方法CoreHandler承担的是消息路由器的作用。如下代码:

public override Task Tell(byte[] bytes, IActorOwnMessage<string> data, MessageInfo msg)
{
switch (data)
{
case CoinAddressGeneratingResponse value: return AcceptCoinAddressAsync(value);
case CoinWithdrawWithholdingFailedMsg value: return RollbackWithholdingAsync(value);
case CoinWithdrawWithheldEvent value: return CreateWithdrawAsync(value);
case CoinOrderCreatedEvent value: return CreateOrderAsync(value);
case CoinOrderCreatedEventV1 value: return CreateOrderAsyncV1(value);
case CoinPlanOrderCreatedEvent value: return CreatePlanOrderAsync(value);
case CoinDepositIncreasedEvent value: return CoinDepositIncreased(value);
case CoinIcoEvent value: return CoinIcoEventHandle(value);
default: return Task.CompletedTask;
}
}
消息处理器:

针对不同的Event类型,在该Actor的CoreHandler里处理该事件,CoreHandler承担的是消息处理器的作用。如下代码中,AmountAddEventHandler方法处理AmountTransferEvent事件。

在事件处理代码中,可编写的代码如下:

  • 可以只针对当前事件处理。
  • 可以获得其他actor引用,调用其他actor的方法(包括actor的只读方法和写操作方法)。
  • 可以调用数据访问层进行数据库读写。(写方法建议在ToReadHandler中进行)。

public override Task Tell(byte[] bytes, IActorOwnMessage<string> data, MessageInfo msg)
{
switch (data)
{
case AmountTransferEvent value: return AmountAddEventHandler(value);
default: return Task.CompletedTask;
}
} public Task AmountAddEventHandler(AmountTransferEvent value)
{
var toActor = HandlerStart.Client.GetGrain<IAccount>(value.ToAccountId);
return toActor.AddAmount(value.Amount, value.Id);
}
消息分离器:

当需要将较大的消息分割成多个独立的部分,并将这些独立的部分作为其他actor处理的参数时,CoreHandler承担的是分离器的作用。

消息聚合器:

当需要对不同类型消息中的数据进行聚合统计时,CoreHandler承担的是消息聚合器的作用。
例如:在加密货币交易的场景中,有ETH、BTC、USDT不同的市场,EOS在三个市场中都有交易,现在要统计本周内每个用户EOS的交易量,可以如下操作:

public override Task Tell(byte[] bytes, IActorOwnMessage<string> data, MessageInfo msg)
{
switch (data)
{
case CoinTradeSoldEvent value: return ActivitySellStatistic(value);
case CoinTradeBoughtEvent value: return ActivityBuyStatistic(value);
default: return Task.CompletedTask;
}
}
消息过滤器:

CoreHandler有可能收到它不感兴趣额的消息,并且需要丢弃这些无用消息时,CoreHandler可以承担消息过滤器的作用。

public override Task Tell(byte[] bytes, IActorOwnMessage<string> data, MessageInfo msg)
{
switch (data)
{
case AmountTransferEvent value: return AmountAddEventHandler(value);
default: return Task.CompletedTask;//消息过滤
}
}
消息丰富器:

当需要将收到的消息分进一步丰富,并将丰富后的消息作为其他actor处理的参数时,CoreHandler承担的是消息丰富器的作用。

副本同步协调器:

Ray中有主actor和副本actor两类actor,副本actor用于分担主actor的压力,执行一些异步操作。当使用副本actor,需要主actor与副本actor保持同步时,需要CoreHandler将关注的事件交给副本actor,此时CoreHandler承担的是副本同步协调器的作用。

public override Task Tell(byte[] bytes, IActorOwnMessage<string> data, MessageInfo msg)
{
var replicatedRef = HandlerStart.Client.GetGrain<IAccountRep>(data.StateId);//获得副本actor
var task = replicatedRef.Tell(bytes);//通知副本同步
switch (data)
{
case AmountTransferEvent value: return Task.WhenAll(task, AmountAddEventHandler(value));
default: return task;
}
}
示例

参考Example中,Ray.Handler项目内的AccountCoreHandler

总结

CoreHandler实际编写中很简单,主要承担消息路由和消息处理器的作用,其他功能为特殊场景提供了切入点。

05-11 20:48