Bring up a Kafka-based Ordering Service

这篇文章假设读者对怎样设置Kafka集群和ZooKeeper集合已经初步了解。这篇文章的目的是讲解部署一个基于Kafka集群的ordering service的步骤,以对你的区块链网络提供ordering服务。

关于Kafka和Zookeeper的介绍摘取百度百科如下:

Kafka:Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。

Kafka相关术语:

Broker:Kafka集群包含一个或多个服务器,这种服务器被称为broker

Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)

Partition:Partition是物理上的概念,每个Topic包含一个或多个Partition.

Producer:负责发布消息到Kafka broker

Consumer:消息消费者,向Kafka broker读取消息的客户端。

Consumer Group:每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。

Zookeeper:ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

ZooKeeper的基本运转流程:

1、选举Leader。

2、同步数据。

3、选举Leader过程中算法有很多,但要达到的选举标准是一致的。

4、Leader要具有最高的执行ID,类似root权限。

5、集群中大多数的机器得到响应并follow选出的Leader。

=====================================分割线=============================================

每个channel对应Kafka中一个单独分区的主题。当一个OSN(ordering service node)通过broadcast调用接收到交易时,首先需要验证确认broadcast client有写这个channel的权限,然后转发这些交易到适当的分区。分区也会被OSN消费(生产-消费模型,针对转发的交易),OSN将收到的交易分组到本地的区块中,将区块持久化到本地的账本中同时通过deliver调用分发区块到peer。

我们使用K和Z分别代表Kafka集群和ZooKeeper集群的节点个数。

1)K的最小值应该被设置为4(我们将会在第4步中解释,这是为了满足crash容错的最小节点数。如果有4个代理,那么可以容错一个代理崩溃,一个代理停止服务后,channel仍然可以继续读写,新的channel可以被创建)

2)Z可以为3,5或是7。它的值需要是一个奇数避免脑裂(split-brain)情况,同时选择大于1的值为了避免单点故障。超过7个ZooKeeper servers会被认为overkill。

操作如下:

1、Orderers:将Kafka相关的信息编码进入到网络的genesis block如果你使用工具configtxgen,编辑configtx.yaml或者其他用于配置系统channel的Genesis block的预先设置文件。

A、Orderer.OrdererType设置为Kafka

B、Orderer.Kafka.Brokers包含Kafka集群中至少两个代理的地址信息(IP:port),这个list不需要是完全的(这些是你的种子代理),这个代理表示当前Order所要连接的Kafka代理

2、Orderers:设置最大的区块大小。每个区块最大有Orderer.AbsoluteMaxBytes个字节(不包括头部),这个值可以在configtx.yaml中设置。假定这里你设置的值为A,记住这个值,这会影响你在第4步怎样配置Kafka代理。

3、Orderers:创建genesis block使用configtxgen,其中第1步和第2步的设置都是系统级设置,它们应用于整个网络的所有OSNs。记住genesis block的位置。

4、Kafka集群:配置Kafka代理。确保每个Kafka代理都配置了如下属性:

A、unclean.leader.election.enable = false---数据一致性在区块链环境中是至关重要的。我们不能从in-sync replica(ISR)集合之外选取channel leader,否则我们将会面临对于之前的leader产生的offsets覆盖的风险,这样的结果是,orderers产生的区块可能会重新写入区块链。

B、min.insync.replicas = M---设置一个M值(例如1<M<N,查看下面的

default.replication.factor)。数据提交时会写入至少M个副本(这些数据然后会被同步并且归属到in-sync replica集合或ISR)。其它情况,写入操作会返回一个错误。接下来:

1)如果channel写入的数据多达N-M个副本变的不可用,操作可以正常执行。

2)如果有更多的副本不可用,Kafka不可以维护一个有M数量的ISR集合,因此Kafka停止接收写操作。Channel只有当同步M个副本后才可以重新可以写。

C、default.replication.factor = N---设置一个值N,N<K。设置replication factor参数为N代表着每个channel都保存N个副本的数据到Kafka的代理上。这些都是一个channel的ISR集合的候选。如同在上边min.insync.replicas section设置部分所描述的,不是所有的代理(orderer)在任何时候都是可用的。N的值必须小于K,如果少于N个代理的话,channel的创建是不能成功的。因此,如果设置N的值为K,一个代理失效后,那么区块链网络将不能再创建新的channel---orderering service的crash容错也就不存在了。

D、message.max.bytes 和replica.fetch.max.bytes应该设置一个大于A(第2步设置的Orderer.AbsoluteMaxBytes值)的值。为header增加一些缓冲区空间---1MB已经足够大。上述不同设置值之间满足如下关系:

Orderer.AbsoluteMaxBytes < replica.fetch.max.bytes <= message.max.bytes

(更完整的是,message.max.bytes应该严格小于socket.request.max.bytes的值,socket.request.max.bytes的值默认被设置为100MB。如果你想要区块的大小大于100MB,需要编辑fabric/orderer/kafka/config.go文件里硬编码的值brokerConfig.Producer.MaxMessageBytes,修改后重新编译源码得到二进制文件,这种设置是不建议的。)

E、log.retention.ms = -1。除非orderering service对Kafka日志的修剪增加支持,否则你需要关闭基于时间的日志保留方式并且避免分段到期(基于大小的日志保留方式log.retention.bytes在写本文章时在Kafka中已经默认关闭,因此不需要再次明确设置这个配置)。

基于上面的描述,M和N的最小值分别为2和3。这个配置允许新的channel创建,所有的channel也继续可写。

5、Orderers:指向每个OSN的genesis block编辑orderer.yaml中的General.GenesisFile配置设定指向第3步时所生成的genesis block。

6、Orderers:调整轮询间隔和超时(可选步骤)

A、orderer.yaml配置文件中的Kafka.Retry配置允许调整metadata/producer/consumer请求的频率,同样可以设置超时时间。(这些配置是作为一个Kafka生产者和消费者所需关注的所有配置)

B、另外,当一个新的channel被创建或者当一个已经存在的channel重新加载(假设一个刚刚重启的orderer),orderer和Kafka集群的交互方式如下:

1)orderer为该channel对应的Kafka分区创建一个producer(writer)

2)使用producer发一个空操作连接信息到这个分区

3)为这个分区创建一个consumer

如果上述步骤中任意一步失败了,你可以调整上述步骤重复的频率。特定的,上述过程会在每个Kafka.Retry.ShortInterval重新尝试,尝试共计Kafka.Retry.ShortTotal时间。然后每个Kafka.Retry.LongInterval重新尝试一次,尝试总计Kafka.Retry.LongTotal时间,直到上述步骤成功执行。注意在上述步骤成功执行之前,orderer是不能对一个channel进行读写操作的。

7、设置OSNs和Kafka集群通过SSL通信。(可选步骤,但是高度推荐。)参考 the Confluent guide(http://docs.confluent.io/2.0.0/kafka/ssl.html)Kafka集群的配置值,在每个OSN上设置orderer.yaml中Kafka.TLS下面的key的值。

8、按照如下顺序启动节点:启动Zookeeper集合,Kafka集群,orderering service nodes。

其他注意事项

1、建议消息大小。第2步中,可以通过设置Orderer.Batchsize.PreferredMaxBytes键的值设置每个区块建议的大小。Kafka对于相对小的消息提供更高的吞吐量;区块大小最好不要超过1MB。

2、通过环境变量覆盖设置。当使用Hyperledger提供的Kafka和Zookeeper的docker镜像时(查看images/kafka和images/zookeeper相应的)。可以使用环境变量覆盖一个Kafka代理(broker)和Zookeeper server的配置。环境变量设置举例如下:

KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE=false 将会修改默认值

unclean.leader.election.enable。ORDERER_KAFKA_RETRY_SHORTINTERVAL=1s 将会修改默认值 Orderer.Kafka.Retry.ShortInterval.

支持的Kafka版本和升级

支持V1的Kafka版本是0.9和0.10。(Hyperledger Fabric使用sarama client library,支持Kafka0.9和0.10)

默认的Kafka版本设置为0.9.0.1。Hyperledger Fabric提供的Kafka镜像和默认版本一致。如果你不是使用Hyperledger Fabric所提供的Kafka镜像,确保你在orderer.yaml文件中指定了Kafka.Version的Kafka版本。

目前支持的Kafka版本包括

Version : 0.9.0.1

Version : 0.10.0.0

Version : 0.10.0.1

Version : 0.10.1.0

调试

设置General.LogLevel的值为DEBUG,orderer.yaml中Kafka.Verbose的值为true。

实例

按照上述建议配置进行配置对应的docker compose配置文件可以在fabric/bddtests目录中找到。查看dc-orderer-kafka-base.yml和dc-orderer-kafka.yml

05-11 14:47