NATS Streaming
NATS Streaming是一个以NATS为驱动的数据流系统且它的源码也是由Golang语言编写的。其中NATS Streaming服务是一个可执行的文件名为:nats-streaming-server。NATS Streaming与底层NATS服务平台无缝嵌入、扩展和互动。NATS Streaming服务作为开源软件在MIT许可下载,Apcera也积极的在维护和支持NATS Streaming 服务。
以下为NATS Streaming与NATS服务整体关系结构图:
功能
除了NATS平台核心的特性外,NATS Streaming提供了以下一些功能特性
增强消息协议
NATS Streaming使用谷歌协议缓冲区实现自己的增强型消息格式。这些消息通过二进制数据流在NATS核心平台进行传播,因此不需要改变NATS的基本协议。NATS Streaming信息包含以下字段:
序列 - 一个全局顺序序列号为主题的通道
主题 - 是NATS Streaming 交付对象
答复内容 - 对应"reply-to"对应的对象内容
数据 - 真是数据内容
时间戳 - 接收的时间戳,单位是纳秒
重复发送 - 标志这条数据是否需要服务再次发送
CRC32 - 一个循环冗余数据校验选项,在数据存储和数据通讯领域里,为了保证数据的正确性所采用的检错手段,这里使用的是 IEEE CRC32 算法
消息/事件的持久性
NATS Streaming提供了可配置的消息持久化,持久目的地可以为内存或者文件。另外,对应的存储子系统使用了一个公共接口允许我们开发自己自定义实现来持久化对应的消息
至少一次的发送
NATS Streaming提供了发布者和服务器之间的消息确认(发布操作) 和订阅者和服务器之间的消息确认(确认消息发送)。其中消息被保存在服务器端内存或者辅助存储(或其他外部存储器)用来为需要重新接受消息的订阅者进行重发消息。
发布者发送速率限定
NATS Streaming提供了一个连接选项叫 MaxPubAcksInFlight,它能有效的限制一个发布者可能随意的在任何时候发送的未被确认的消息。当达到这个配置的最大数量时,异步发送调用接口将会被阻塞,直到未确认消息降到指定数量之下。
每个订阅者的速率匹配/限制
NATS Streaming运行指定的订阅中设置一个参数为 MaxInFlight,它用来指定已确认但未消费的最大数据量,当达到这个限制时,NATS Streaming 将暂停发送消息给订阅者,直到未确认的数据量小于设定的量为止
以主题重发的历史数据
新订阅的可以在已经存储起来的订阅的主题频道指定起始位置消息流。通过使用这个选项,消息就可以开始发送传递了:
1. 订阅的主题存储的最早的信息
2. 与当前订阅主题之前的最近存储的数据,这通常被认为是 "最后的值" 或 "初值" 对应的缓存
3. 一个以纳秒为基准的 日期/时间
4. 一个历史的起始位置相对当前服务的 日期/时间,例如:最后30秒
5. 一个特定的消息序列号
持久订阅
订阅也可以指定一个“持久化的名称”可以在客户端重启时不受影响。持久订阅会使得对应服务跟踪客户端最后确认消息的序列号和持久名称。当这个客户端重启或者重新订阅的时候,使用相同的客户端ID 和 持久化的名称,对应的服务将会从最早的未被确认的消息处恢复
安装 NATS Streaming 服务
提供了多种安装方式
1. 从github上下载对应版本的二进制安装包进行安装
2. 直接使用 go 命令进行一键式安装
3. 如果是Docker环境下可以通过Docker hub上进行安装
4. windows 和 macOS上也有对应的安装方式,具体的请查看官网 install
我这里使用的是第二种方式即直接通过go命令进行安装
$ go get github.com/nats-io/nats-streaming-server
启动NATS Streaming服务
1. 直接启动:
$ nats-streaming-server
如果看到以下内容说明启动成功了:
[] // ::13.735047 [INF] STREAM: Starting nats-streaming-server[test-cluster] version 0.4.
[] // ::13.735131 [INF] STREAM: ServerID: P717ypV9bkgOlj0z4wNVl5
[] // ::13.737291 [INF] Starting nats-server version 0.9.
[] // ::13.737307 [INF] Listening for client connections on 0.0.0.0:
[] // ::13.738248 [INF] Server is ready
[] // ::14.024368 [INF] STREAM: Message store is MEMORY
[] // ::14.024401 [INF] STREAM: --------- Store Limits ---------
[] // ::14.024414 [INF] STREAM: Channels: *
[] // ::14.024417 [INF] STREAM: -------- channels limits -------
[] // ::14.024421 [INF] STREAM: Subscriptions: *
[] // ::14.024425 [INF] STREAM: Messages : *
[] // ::14.024442 [INF] STREAM: Bytes : 976.56 MB *
[] // ::14.024446 [INF] STREAM: Age : unlimited *
[] // ::14.024449 [INF] STREAM: --------------------------------
2. 启动NATS Streaming服务并开启NATS监控
$ nats-streaming-server -m 8222
注意:
NATS Streaming 主题名称是不支持通配符的,在NATS中主题名是可以使用 * 和 > 的,这里不支持。
其他的大部分和NATS一致,因为它就是基于NATS实现的
NATS Streaming使用
1. 首先进入已经下载好的 nats-streaming-server 对应的路径,并启动
$ cd $GOPATH/src/github.com/nats-io/nats-streaming-server
$ go run nats-streaming-server.go
2. 运行发布者(publisher)客户端并且发送消息到指定的主题上
$ cd $GOPATH/src/github.com/nats-io/go-nats-streaming/examples
$ go run stan-pub.go foo "msg one"
Published [foo] : 'msg one'
$ go run stan-pub.go foo "msg two"
Published [foo] : 'msg two'
$ go run stan-pub.go foo "msg three"
Published [foo] : 'msg three'
3. 运行订阅者(subscriber)客户端,使用 -all 参数来接受由发布者发布对应主题上的所有消息
$ go run stan-sub.go --all -c test-cluster -id myID foo
Connected to nats://localhost:4222 clusterID: [test-cluster] clientID: [myID]
subscribing with DeliverAllAvailable
Listening on [foo], clientID=[myID], qgroup=[] durable=[]
[#] Received on [foo]: 'sequence:1 subject:"foo" data:"msg one" timestamp:1465962202884478817 '
[#] Received on [foo]: 'sequence:2 subject:"foo" data:"msg two" timestamp:1465962208545003897 '
[#] Received on [foo]: 'sequence:3 subject:"foo" data:"msg three" timestamp:1465962215567601196
以上可以看到使用参数 -all 可以获取到所有发布的数据,其实还有一些其他的参数供我们使用,具体如下所示:
--seq <seqno> 从指定的序列号开始
--all 接受所有发送的数据
--last 从上一次最后一次发送的那条数据开始
--since <duration> 从当前往前的时间段开始接收(例如:1s, 1hr, 具体可以参看:https://golang.org/pkg/time/#ParseDuration)
--durable <name> 永久订阅者的名称
--unsubscribe 退出时解除永久订阅
如果想要使用 GoLang 语言自己代码开发的话,可以参看我的下一篇博客:NATS_13:NATS Streaming案例讲解