上一节我们讲解了常用的事务,也提及了Saga,这是在分布式环境下被经常使用的一种处理复杂业务和分布式事务的设计模式。本章我们的主要目标是编写一个简单版本的Saga处理器,不同于Seata框架中那种可独立部署的事务服务,我们所编写的Saga和业务集成在一起也不支持通过手画流程的方式实现,因为我们的目标是将Saga作为一种设计模式(不是框架)来使用,类似于您经常使用的“工厂”、“策略”等,重点学习它的思想,在真实项目中使用肯定是需要根据需求做二次加工的。而且,简单版本的优势就是足够简单,投入虽然不多但能从中获取的收益却很大。在代码演示后我们还会重点描述一下如何解决Saga事务的隔离性问题。

一、Saga种类说明

  常用的Saga包含两类:协同式和编排式。前者把流程的走向与协调全盘由事务的参考者来完成,比如最简单的场景:下单同时对库存进行扣减,订单服务本地事务完成后就把事件消息发送给库存服务,库存服务如果本地事务处理失败则由它将回滚的消息发送给订单服务。虽然整个流程当中订单服务与库存服务并没有产生耦合,但由于没有一个总的事务协调者,一旦服务参与者多起来那业务流程的可理解性就非常差,出了问题也不好定位。第二类为编排型事务也是我们本章要主要介绍的,通过把Saga的执行顺序交由一个集中的Saga编排器,由它指挥并决策业务的流向,相对于协同式整个流程要清晰很多。除非您使用的是足够成熟的第三方的框架,要不然集中式Saga也可能会存在事务参与者不清晰的问题,比如本文我们要介绍的Saga就会有类似的问题,毕竟以个人的精力很多事情的确无法做到极致,有牺牲也很正常。基于消息式的Saga很多时候你并不知道谁订阅了消息,所有的消费情况都体现在代码中,非常不方便后续的业务扩展以及代码阅读。个人在使用的时候偷了一个懒:通过图形文档的方式说明整个事务的走向包括会由谁发送命令,由谁来订阅事件,也就是把流程的逻辑定义与代码进行了分离,好的方式当然是把这些信息作为元数据来用,谁都喜欢好的但付出的成本也很高的。况且我是在2015年开始使用Saga,我倒是想用Seata呢,没有啊。

二、基于编排式的设计思想

  基于编排的模式设计思路我们在前文中已经大概介绍,那要如何实现呢?可参考如下图所示。这里使用了四个设计原则:1)方便起见Saga会和全局事务发起的一起部署,这样就不用花费精力考虑如何独立部署Saga服务;2)Saga所有的服务遵循了这样的一个模式:发送命令,接受事件。也就是说Saga只向外发布领域命令,事件参与者执行完本地事务后发送领域事件并由Saga进行订阅,各服务间使用消息队列进行解耦;3)Saga的实现也被分为应用服务和领域实体。所有的命令其实都是由Saga领域实体在处理事件后生成的,在应用服务中调用“RabbitMQ”的客户端进行发送;4)只存储命令消息,不存储事件。

戏说领域驱动设计(廿七)——Saga设计模型-LMLPHP

三、代码实现

  根据上述的模式说明您会发现发布命令与事件并不需要进行代码的说明,通过RabbitMQ的客户端即可搞定。由于我们并不会将Saga实现为一个框架,所以也不涉及到框架内部复杂的逻辑代码,那么唯一需要介绍的是Saga如何与业务集成,这也是为什么我为本章所起的标题是“Saga设计模式”。

1、命令分发与处理

  我们把发送命令和事件的组件称之为“命令总线”或“事件总线”,这样的封装在使用的时候不用使用者(一般是工程师)考虑消息队列使用的各种细节也可以实现组件的复用,毕竟您使用Saga的场景可能不只一个。

  熟悉RabbitMQ的朋友都知道想要让消费者正确的收到消息我们可以使用“Topic”模式,相当于给消息一个路由的策略,Exchange会根据Topic的名称把消息投递到某个队列中,而消费者通过这与个队列进行绑定就可以进行消费了。针对命令总线我们仍然这用这个模式,我们把Topic命名为“command.bus”。由于命令的特性是只有一个消费者,所以不论消费者启动了多少个实例,反正只要有一个能消费就行。这样做以后……这样做就什么都做不了了,这个思路是错误的。以上图为例,因为所有命令型的消息的Topic都是“command.bus”,很有可能这些命令全被“事务参与者2”消费了,他处理“命令1”自然没问题,“命令2”他可真搞不定了。所以,很可惜,“Topic”模式根本不适用。

  让我们再翻翻其它的模式,“Direct”貌似也差点意思,搞不定!那只能使用“Fanout”做点文章了。所以正确的方式应该是使用这个模式,消息被发出去后让所有的事务参与者都去订阅,然后在每个事务参考者内部维护一个类似路由表的东西,可以完成“通过某个命令的名称便可知道应该哪段代码来处理这个命令”的需求,正好是一个键值对:键是命令的名称,值是命令处理器。当根据键找不到对应的处理器时则把消息直接扔掉,因为事件参与者使用的是广播模式,但是其只关心自己能够处理的命令。至此我们已经大致把方案确认了,那就让我们一点点搞起来,music……

  首先我们先说命令对象的构成。由于有“命令路由表”的存在且这个表是一个键值对的形式,其中键是命令的名称,程序会根据这个名称去寻找对应的命令处理器。所以我们就得给每个命令一个独一无二的名称(如果全称重复,那就出现Bug了),在我给出的实现中使用的是这个命令的类全名。此外,命令的路由过程我们直接写在消息的消费者里,这样我们在获得命令名称后就可以第一时间找到对应的处理器了,参考代码如下所示。之所以引入了一个“Message”的类是因为对事件的处理和命令是类似的,所以它实际上是事件与命令共同的父类。Command中的方法“from”用于将消息反序列为命令对象,其返回的结果是真实的命令对象而不是“Command”这个父类。友情提示一下:清注意“Message”是从什么对象继承的。

public abstract class Message extends EntityModel {
    private String name;

    public Message() {
        this.name = this.getClass().getName();
    }
}

public class Command extends Message {
    public static Command from(String json) {
        JsonNode jsonNode = objectMapper.readValue(json, JsonNode.class);
        String className = jsonNode.get("name").asText();
        Command command = (Command)objectMapper.readValue(json, Class.forName(className));
    }
}

  我们再展示一下消息监听的代码,如下所示。其实很简单就两行:反序列化命令同时使用“CommandDispatcher”进行命令的分发处理,分发过程其实就是根据命令的名称找到对应的命令处理器。这个类就是我们前方中提及的“命令路由器”

@RabbitListener(queues = “command.bus”)
public void listenCommand(String message, Message message1, Channel channel) {
    Command command = Command.from(message);
    CommandDispatcher.INSTANCE.dispatch(command);
}

  按一般的习惯我们应该开始介绍“CommandDispatcher”,但我们一般都喜欢不走寻常路,所以请移动您的尊驾我们先看看命令处理器要如何搞,代码如下所示  。“CommandHandlerUnite”是一个抽象类,所有的命令处理器都需要从它继承。“process”方法是通过反射的形式调用到实际接收这个命令的方法,这句好是不是很不好理解?那我们细说一下:1)通过前面的代码我们知道命令的消息虽然可以被成功的反序列化成真实的命令对象,但声明它的类型仍然是抽象类型“Command”;2)一个应用服务中可能会有多个命令的处理方法,比如下文中的“AccountService ”所示。那么我要如何根据一个声明为抽象类型的对象调用到能够以这个对象的实际类型为参数的方法呢?答案就是反射。

public abstract class CommandHandlerUnite {

    private static final String COMMAND_HANDLER_METHOD_NAME = "process";

    @Override
    public void process(Command command) {
        if (command == null) {
            return;
        }
        Class clazz = Command.from(command.getName());
        Method handler = this.getClass().getMethod(COMMAND_HANDLER_METHOD_NAME, new Class[]{clazz});
        handler.invoke(this, command);
    }
}

@Service
public class AccountService extends CommandHandlerUnite {
    public void handle(IncreaseRewardPoints command) {

    }

    public void handle(DecreaseRewardPoints command) {

    }
}

  到这里相信聪明的您应该还可以跟得上节奏,那咱们再把前面的坑埋上,也就是“CommandDispatcher”,这是一个命令路由表对象,代码好下所示。“dispatch”用于分发命令,调用的是“CommandHandlerUnite”中声明的方法“process”;“register”用于注册命令处理器,也就是把命令处理器放到HashMap中。

public class CommandDispatcher {

    //命令处理器对象列表
    private Map<String, CommandHandlerUnite> commandHandlers = new HashMap<>();

    /**
     * 命令分发器实例
     */
    public final static CommandDispatcher INSTANCE = new CommandDispatcher();


    /**
     * 分发消息方法
     */
    public void dispatch(Command command) {
        CommandHandler commandHandler = this.commandHandlers.get(command.getName());
        if (commandHandler != null) {
            commandHandler.process(command);
        }
    }

    /**
     * 注册处理分发器
     *
     */
    public void register(String commandName, CommandHandlerUnite commandHandler) {
        this.commandHandlers.put(commandName, commandHandler);
    }
}

  到此,命令相关的处理已经讲完了,虽然没有说到Saga但离我们的目标已经不远了。下面我们再讲一下事件的处理。

2、事件分发与处理

  事件的分发与处理其实和命令的处理是一样的,唯一的区别是一个事件可以被多个消费者同时消费 。如果事件的消费者分布在多个服务中,在使用了“Fanout”模式后消息可以被同时分发至对应的服务;如果是分布在同一个服务中则仍然使用类似上面的路由表的形式,只是我们给它一个新的名称“事件路由器”,代码和命令路由器略有不同,所下所示。

public class EventDispatcher {

    //事件处理器对象列表
    private Map<String, List<EventHandlerUnite>> eventHandlers = new HashMap<>();


    /**
     * 分发消息方法
     */
    public void dispatch(Event event) {
        List<EventHandler> eventHandlers = this.eventHandlers.get(event.getName());
        for (EventHandler eventHandler : eventHandlers) {
            if (eventHandler != null) {
                eventHandler.process(event);
            }
        }
    }

    /**
     * 注册处理分发器
     */
    public void register(String eventName, EventHandlerUnite eventHandler) {
        List<EventHandlerUnite> eventHandlers = this.eventHandlers.get(eventName);
        if (eventHandlers == null) {
            eventHandlers = new ArrayList<>();
        }
        eventHandlers.add(eventHandler);
        this.eventHandlers.put(eventName, eventHandlers);
    }
}

3、Saga

  我们的主角开始上场,不过讲Saga还得有案例才行,所以还得使用我们前面介绍过的那个一句话业务“订单支付后用户积分加10”。其实屏幕前的朋友不要觉得案例简单,我们学习的是模式不是表面的代码。书归正文,Saga要分成两个层次:应用服务和领域模型。在应用服务中我们注册自己为事件处理器并编写用于处理事件的方法,代码如下所示。再强调一下它的工作模式:发送命令——接收事件。

@Service
public class OrderProcessSagaService extends EventHandlerUnite {

    OrderProcessSaga {
        EventDispatcher.INSTANCE.register(OrderPaid.getClass().getName(), this);
        EventDispatcher.INSTANCE.register(RewardPointsIncreased.getClass().getName(), this);
    }

    public void handle(OrderPaid event) {
        OrderProcessSaga saga = this.orderProcessSagaRepository.findBy(event.getOrderId());
        if (saga == null) {
            saga = new OrderProcessSaga();
        }
        List<Command> commands = saga.handle(event);
        //命令和Saga对象一起进行保存
        this.orderProcessSagaRepository.updateOrSave(saga);
        this.commandBus.post(commands);
    }

    public void handle(RewardPointsIncreased event) {
        OrderProcessSaga saga = this.orderProcessSagaRepository.findBy(event.getOrderId());
        sava.handle(event);
        //命令和Saga对象一起进行保存
        this.orderProcessSagaRepository.update(saga);
        this.commandBus.post(commands);
    }
}

  上面代码中的“OrderProcessSaga”其实就是Saga的领域模型,所有的事件处理都在它里面进行处理,贴一段代码供参考。两个“handle”方法分别应对两个不同的事件,在这些事件中您可以按自己的需求决策业务的走向也可以进行业务的补偿。本案例只演示了正向的业务流程,其实反向(业务补偿)的也是一样的,只不过是处理不同的事件。注意一点:业务补偿的代码要写在“OrderProcessSagaService ”中,不可以写在“OrderProcessSaga ”里面,因为它只负责处理Saga的业务(只做业务的调度)不负责处理业务。什么?怎么决策业务的走向?命令啊,您通过发送不同的命令不就实现了业务流程的控制了吗?

public class OrderProcessSaga extends EntityModel {
    private String orderId;
    private String accountId;
    private Status status;
    private List<Command> commands = new ArrayList<>();

    //处理订单支付事件
    public List<Command> handle(OrderPaid event) {
        if (this.status == Status.FINISHED || this.status == Status.CLOSED) {
            throw new OrderProcessException();
        }
        this.status = Status.STARTING_REWARD_POINTS_PROCESS;
        //发送账户增加10积分的命令
        this.commands.add(new IncreaseRewardPoints(10L, this.accountId, this. orderId));

        return this.commands;
    }

    //处理积分增加事件
    public List<Command> handle(RewardPointsIncreased event) {
        if (this.status != Status.STARTING_REWARD_POINTS_PROCESS) {
            throw new OrderProcessException();
        }
        this.status = Status.FINISHED;
        return this.commands;
    }
}

  您知道为什么我每次都会把Saga对象存储起来吗?一是为了流程更好的观察,根据Saga的状态便可知晓每个流程的当前状态,加个页面当然也可以啦;二是流程伴随着命令信息在一个事务内共同存储不仅可以保障消息不丢失,当遇到命令无法被正确发送或发送后消息丢失、消息没有被路由到消费者的情况,我们只需要把命令查询出来再发送一下就解决了,随随便便就实现了断点续传。

总结

  本节主要讲了Saga模式的实现方式及相关的代码。本来还想讲一下如何处理Saga的隔离性问题,奈何最近精力有限,我们先发出一部分,后面我再把坑填上。对了,针对上面说过的命令或事件的名称 我使用了类全名其实并不是很好,在分布架构下反而容易泄露内部信息。您其实也可以使用“服务名+命令类名”的方式,反正只要保障名称不重复就行。此外,案例代码仅用于演示,真实环境中还需要做很多的验证处理,请务必注意。

05-23 16:02