Test demo, taken from the official example
NATS docker-compose file
version: '3.3'
services:
  nats:
    image: nats
    ports:
      - "4222:4222"
      - "8222:8222"
      - "6222:6222"
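Benthos stream configuration
Overview
In this example, data is read from a file and published to NATS. Several consumers then read the NATS messages and forward them to a webhook, which writes them to standard output.
- File reader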
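input:
  type: read_until
  read_until:
    # A static false condition never matches, so the input is never closed.
    condition:
      type: static
      static: false
    input:
      type: file
      file:
        path: ./sample.json
    # Restart the file input whenever it reaches the end of the file.
    restart_input: true
pipeline:
  processors:
  # Limit throughput to one message batch every 3 seconds.
  - type: throttle
    throttle:
      period: 3s
output:
  type: nats
  nats:
    subject: benthos_messages
    urls:
    - nats://localhost:4222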
- NATS consumers (three)
input:
  type: nats
  nats:
    subject: benthos_messages
    urls:
    - nats://localhost:4222
pipeline:
  processors:
  # Keep only messages whose top-level keys include 'title'.
  - type: filter
    filter:
      type: jmespath
      jmespath:
        query: |
          keys(@) | contains(@, 'title')
output:
  type: http_client
  http_client:
    url: http://localhost:4195/webhooks/post/customer1
    verb: POST
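To make the filter step easier to reason about, here is a minimal Python sketch that approximates the JMESPath query `keys(@) | contains(@, 'title')` without Benthos. The function name and the sample messages are hypothetical, and the handling of non-object or unparseable input is an assumption rather than documented Benthos behaviour.

```python
import json


def passes_title_filter(raw_message: str) -> bool:
    """Approximate `keys(@) | contains(@, 'title')` for one JSON message."""
    try:
        doc = json.loads(raw_message)
    except json.JSONDecodeError:
        # Assumption: a message that is not valid JSON is treated as dropped.
        return False
    if not isinstance(doc, dict):
        # Assumption: keys(@) is only meaningful for JSON objects.
        return False
    # The query checks for the key itself, regardless of its value.
    return "title" in doc.keys()


# Hypothetical messages, not taken from the official sample.json.
messages = [
    '{"title": "hello", "body": "world"}',
    '{"name": "no title here"}',
]
kept = [m for m in messages if passes_title_filter(m)]
```

Only the first message would be forwarded to the customer1 webhook under this approximation. The second has no `title` key and would be dropped.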
- Webhook configuration
input:
  type: broker
  broker:
    inputs:
    - type: http_server
      http_server:
        path: /post/customer1
      processors:
      - type: text
        text:
          operator: prepend
          value: "Customer 1 received: "
    - type: http_server
      http_server:
        path: /post/customer2
      processors:
      - type: text
        text:
          operator: prepend
          value: "Customer 2 received: "
    - type: http_server
      http_server:
        path: /post/customer3
      processors:
      - type: text
        text:
          operator: prepend
          value: "Customer 3 received: "
output:
  type: stdout
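The webhook stream can also be exercised directly, without going through NATS. The sketch below builds a POST request for one of the three endpoints. The base URL (port 4195 and the `/webhooks` prefix) is copied from the consumer config's `http_client` URL; the helper names and the payload are hypothetical.

```python
import json
import urllib.request

# Assumption: Benthos serves HTTP on localhost:4195 and the webhook stream is
# reachable under the /webhooks prefix, as in the consumer config's URL.
BASE_URL = "http://localhost:4195/webhooks"


def build_webhook_request(customer: int, payload: dict) -> urllib.request.Request:
    """Build (but do not send) a POST request for /post/customer<N>."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url=f"{BASE_URL}/post/customer{customer}",
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


def post_to_webhook(customer: int, payload: dict) -> int:
    """Send the request and return the HTTP status code.

    Requires the Benthos streams to be running locally.
    """
    request = build_webhook_request(customer, payload)
    with urllib.request.urlopen(request, timeout=5) as response:
        return response.status
```

With the streams running, calling `post_to_webhook(1, {"title": "hello"})` should make the Benthos process print a line starting with `Customer 1 received: `, since the `text` processor prepends that value to each message.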
Run
- docker-compose
docker-compose up -d
- Start Benthos in streams mode, loading the stream configs from a directory
benthos --streams --streams-dir ./configs
Result
Edit sample.json to test
References
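One way to edit sample.json repeatably is to append messages from a small script. This is a sketch that assumes the file input treats each line of sample.json as one message; the helper name and the example fields are hypothetical.

```python
import json
from pathlib import Path


def append_message(path: str, doc: dict) -> None:
    """Append one JSON document as a single line to the given file."""
    target = Path(path)
    existing = target.read_text(encoding="utf-8") if target.exists() else ""
    # Make sure the new message starts on its own line.
    separator = "" if existing == "" or existing.endswith("\n") else "\n"
    with target.open("a", encoding="utf-8") as handle:
        handle.write(separator + json.dumps(doc, ensure_ascii=False) + "\n")
```

For example, `append_message("./sample.json", {"title": "new item"})` adds a message containing a `title` key, which the customer1 filter would let through. Because the file input is restarted after each pass, the new line should be picked up on the next read.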
https://github.com/Jeffail/benthos/tree/master/resources/docker/streams