在前面的的讨论里已经介绍了CQRS读写分离模式的一些原理和在akka-typed应用中的实现方式。通过一段时间akka-typed的具体使用对一些经典akka应用的迁移升级,感觉最深的是EvenSourcedBehavior和akka-cluster-sharding了。前者是经典akka中persistenceActor的替换,后者是在原有组件基础上在使用方面的升级版。两者都在使用便捷性方面提供了大幅度的提升。在我看来,cluster-sharding是分布式应用的核心,如果能够比较容易掌握,对开发正确的分布式系统有着莫大的裨益。但这篇讨论的重点将会集中在EventSourcedBehavior上,因为它是实现CQRS的关键。而CQRS又是大数据应用数据采集(输入)管理最新的一个重要模式。

EventSourcedBehaviro是akka-typed为event-sourcing事件源模式提供的开发支持。具体的原理和使用方法在前面的博客里都介绍过了,在这篇就不再重复。我们把时间精力放到对event-sourcing的了解和应用上。

可以说,event-sourcing是一种数据库操作的模式。简单来说:event-sourcing的工作原理是把对数据库的操作动作保存起来,不直接对数据库进行即时更新,而是在一个阶段之后通过回溯replay这些动作才对数据库进行实质的更新。event-sourcing与传统数据库操作模式的最大分别就是:event-sourcing对数据库的更新过程可以重复,在一个既定的原点开始重演所有动作可以得出同样的结果,即同样的数据库状态。在大数据、高并发应用中最难控制的应该就是用户操作了。用户可能在任何时间同时对同一项数据进行更新。通用的传统方式是通过“锁”来保证数据的正确性,但“锁”会给系统带来更多的麻烦如响应慢甚至系统锁死。而一旦出现系统锁死重启后并无有效办法恢复数据库正确状态。event-sourcing恰恰就能有针对性的解决这些问题。

感觉到,event-sourcing模式应该可以避免对“锁”的使用:在高并发环境里,event-sourcing系统的每个用户在任何时间都有可能对数据库进行操作。但他们并不直接改变数据库内容,而是将这些对数据库操作的动作保存起来。因为用户保存的是各自的动作,互不关联,所以不需要任何锁机制。当系统完成一个阶段的工作后,从这个阶段的起点开始,把所有用户的动作按发生时间顺序重演并对数据库进行实质的更新。可以看到,这个具体的数据库更新过程是单一用户的,所以不需要“锁”了。阶段的起点是由数据库状态快照来表示。在完成了这个阶段所有动作重演后数据库状态一次性更新。整个过程即是CQRS读写分离模式了,其中:保存动作为写部分,动作重演是读部分。动作重演可以在之后的任何时间进行,因而读、写是完全分离的。实际上CQRS就是一个数据库更新管理的状态机器:从数据起始状态到终结状态的一种过程管理方法。下面就用一个实际的应用设计例子来介绍CQRS在应用系统中的具体使用。

下面讨论一个超市收款机pos软件的例子:

收款流程比较简单:收款员登录=>扫码录入销售项目=>录入折扣=>其它操作=>支付=>打小票

最终结果是在数据库产生了一张销售单,即一组交易数据,是实际反映在交易数据库里的。从CQRS流程来解释:这组销售数据在开单时为空,然后在完成所有单据操作后一次性产生,也就是在CQRS模式的读部分产生的。在这个过程中一直是写部分的操作,不影响交易数据库状态。当然,我们还必须在内存里维护一个模拟的状态来对每项操作进行控制,如:用户未登录时不容许任何操作动作。所以必须有个状态能代表用户登录的,而这个状态应该可以通过动作重演来重现,所以用户登录也是一个必须保存的动作。如此,每张销售单在内存里都应该有一个状态,这个状态包括了单据状态和一个动态的交易项目集合。这个项目集合就代表即将产生的数据库交易数据。下面是单据状态的定义:

  case class VchStates(
                        opr: String = "", //收款员
                        num: Int = 1, //当前单号
                        seq: Int = 1, //当前序号
                        void: Boolean = false, //取消模式
                        refd: Boolean = false, //退款模式
                        susp: Boolean = false, //挂单
                        canc: Boolean = false, //废单
                        due: Boolean = false, //当前余额
                        su: String = "",
                        mbr: String = "",
                        disc: Int = 0, //预设折扣,如:会员折扣
                        mode: Int = 0 //当前操作流程:0=logOff, 1=LogOn, 2=Payment
                      ) extends CborSerializable { ... }

交易项目是交易数据的直接对应:

  case class TxnItem(
                      txndate: String = LocalDate.now.format(DateTimeFormatter.ofPattern("yyyyMMdd"))
                      , txntime: String = LocalDateTime.now.format(dateTimeFormatter).substring(11)
                      , opr: String = "" //工号
                      , num: Int = 0 //销售单号
                      , seq: Int = 1 //交易序号
                      , txntype: Int = TXNTYPE.sales //交易类型
                      , salestype: Int = SALESTYPE.nul //销售类型
                      , qty: Int = 1 //交易数量
                      , price: Int = 0 //单价(分)
                      , amount: Int = 0 //码洋(分)
                      , disc: Int = 0 //折扣率 (%) 100% = 1
                      , dscamt: Int = 0 //折扣额:负值  net实洋 = amount + dscamt
                      , member: String = "" //会员卡号
                      , code: String = "" //编号(商品、部类编号、账户编号、卡号...)
                      , refnum: String = "" //参考号,如退货单号
                      , acct: String = "" //账号
                      , dpt: String = "" //部类
                    ) extends CborSerializable {

为了提高系统效率,根据操作动作实时对交易项目进行了更新,如遇到折扣动作时需要更新上一条交易项目的优惠金额等。这也是在读部分动作重演必须的,因为CQRS的读部分目的是把正确的交易数据写到数据库里。所以,CQRS的写部分就代表对内存中这个交易项目集的动态更新过程。

单据状态在结单时用EventSourcedBehavior拿了个snapshot作为下一单的起始状态。销售中途出现异常退出后可以在上一单状态快照的基础上实施动作重演把状态恢复到出现异常之前。

由于每个阶段都可以清晰的用一张销售单的生命周期来代表,所以在整单操作完成后就可以进行CQRS的读部分了。操作结束的方式最明显的是单据完成支付操作了,如下:

      case PaymentMade(acct, dpt, num, ref,amount) =>
        if (curItem.txntype != TXNTYPE.voided) {
          val due = items.totalSales - items.totalPaid
          val bal = if (items.totalSales > 0) due - curItem.amount else due + curItem.amount
          log.step(s"#${vchState.num} PaymentMade with input totalSales[${items.totalSales}], totalPaid[${items.totalPaid}], txnItems[${items}].")
          val vchs = vchState.copy(
            seq = vchState.seq + 1,
            due = (if ((items.totalPaid.abs + curItem.amount.abs) >= items.totalSales.abs) false else true),
            mode = (if (items.totalPaid.abs > 0) 2 else 1)
          )
          val vItems = items.addItem(curItem.copy(
            salestype = SALESTYPE.ttl,
            price = due,
            amount = curItem.amount,
            dscamt = bal
          )).txnitems

          if (replay) {
            Voucher(vchs, vItems)
          } else {
            if (vchs.due) {
              val vch = Voucher(vchs,vItems)
              log.step(s"#${vchState.num} PaymentMade with current item: ${vch.items.head}")
              vch
            }
            else {
              writerInternal.lastVoucher = Voucher(vchs, vItems)
              if (!writerInternal.afterRecovery)
                endVoucher(Voucher(vchs,vItems),TXNTYPE.sales)
              Voucher(vchs.nextVoucher, List())
            }
          }
        }
        else {
          log.step(s"#${vchState.num} PaymentMade with current item: $curItem")
          Voucher(vchState.copy(
            seq = vchState.seq + 1)
            , items.addItem(curItem).txnitems)
        }

确认了完成支付调用endVoucher. endVoucher启动读部分reader, 如下:

  def endVoucher(voucher: Voucher, txntype: Int)(implicit writerInternal: WriterInternal,pid:Messages.PID) = {

    log.step(s"#${writerInternal.lastVoucher.header.num } ending voucher with state: ${writerInternal.lastVoucher.header}, txns: ${writerInternal.lastVoucher.items}")

    val readerShard = writerInternal.optSharding.get   //ClusterSharding(writerInternal.actorContext.system)
    val readerRef = readerShard.entityRefFor(POSReader.EntityKey, s"$pid.shopId:$pid.posId")
    val eseq = EventSourcedBehavior.lastSequenceNumber(writerInternal.optContext.get)
    val bseq = eseq - writerInternal.listOfActions.size + 1

    log.step(s"#${writerInternal.lastVoucher.header.num } sending PerformRead(${pid.shopid}, ${pid.posid},${writerInternal.lastVoucher.header.num},${writerInternal.lastVoucher.header.opr},$bseq,$eseq,$txntype,${writerInternal.expurl},${writerInternal.expacct},${writerInternal.exppass}) ...")

//    log.step(s"#${writerInternal.lastVoucher.header.num } ending voucher with actions: ${writerInternal.listOfActions}")
    readerRef ! Messages.PerformRead(pid.shopid, pid.posid,writerInternal.lastVoucher.header.num,writerInternal.lastVoucher.header.opr,bseq,eseq,txntype,writerInternal.expurl,writerInternal.expacct,writerInternal.exppass)
    writerInternal.clearListOfAction()
    log.step(s"#${writerInternal.lastVoucher.header.num } ending voucher with actions: ${writerInternal.listOfActions}")
  }

reader是在一个sharding上即时构建的一个actor。这个actor的主要功能就是从journal里读出这张单所有动作进行重演得出交易项目集后写进交易数据库:

 def readActions(ctx: ActorContext[Command],vchnum: Int, cshr: String, startSeq: Long, endSeq: Long, trace: Boolean, nodeAddress: String, shopId: String, posId: String, txntype: Int): Future[List[TxnItem]] = {
    implicit val classicSystem = ctx.system.toClassic
    implicit val ec = classicSystem.dispatcher
    implicit var vchState = VchStates().copy(num = vchnum, opr = cshr)
    implicit var vchItems = VchItems()
    implicit var curTxnItem = TxnItem()
    implicit val pid = PID(shopId,posId)
    implicit val writerInternal = new Messages.WriterInternal(nodeAddress = nodeAddress, pid = pid, trace=trace)

    log.stepOn = trace

    log.step(s"POSReader: readActions($vchnum,$cshr,$startSeq,$endSeq,$trace,$nodeAddress,$shopId,$posId), txntype=$txntype")

    def buildVoucher(actions: List[Any]): List[TxnItem] = {
      log.step(s"POSReader: read actions: $actions")
      val (voidtxns,onlytxns) = actions.asInstanceOf[Seq[Action]].pickOut(_.isInstanceOf[Voided])
      val listOfActions = actions.reverse zip (LazyList from 1)   //zipWithIndex
      listOfActions.foreach { case (txn,idx) =>
        txn.asInstanceOf[Action] match {
          case ti@_ =>
            curTxnItem = EventHandlers.buildTxnItem(ti.asInstanceOf[Action],vchState).copy(opr=cshr)
            if (!ti.isInstanceOf[Voided]) {
              if (voidtxns.exists(a => a.asInstanceOf[Voided].seq == idx)) {
                curTxnItem = curTxnItem.copy(txntype = TXNTYPE.voided, opr = cshr)
                log.step(s"POSReader: voided txnitem: $curTxnItem")
              }
            }
            val vch = EventHandlers.updateState(ti.asInstanceOf[Action],vchState,vchItems,curTxnItem,true)
            vchState = vch.header
            vchItems = vch.txnItems
            log.step(s"POSReader: built txnitem: ${vchItems.txnitems.head}")
        }
      }
      log.step(s"POSReader: voucher built with state: $vchState, items: ${vchItems.txnitems}")
      vchItems.txnitems
    }

    val query =
    PersistenceQuery(classicSystem).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
    implicit val session = CassandraSessionRegistry(classicSystem).sessionFor("alpakka.cassandra")

    // issue query to journal
    val source: Source[EventEnvelope, NotUsed] =
      query.currentEventsByPersistenceId(s"${pid.shopid}:${pid.posid}", startSeq, endSeq)

    // materialize stream, consuming events
    val readActions: Future[List[Any]] = source.runFold(List[Any]()) { (lstAny, evl) => evl.event :: lstAny }

    for {
      lst1 <- readActions    //read list from Source
      lstTxns <- if (lst1.length < (endSeq -startSeq))    //if imcomplete list read again
        readActions
        else FastFuture.successful(lst1)
      items <- FastFuture.successful( buildVoucher(lstTxns) )
      _ <- JournalTxns.writeTxnsToDB(vchnum,txntype,startSeq,endSeq,items)
      _ <- session.close(ec)
    } yield items

   }
08-16 14:34