我想使用Siddhi流处理器从NATS服务器读取消息。我已经安装了以下软件:
NATS服务器v2.1.6(在独立服务器上)
Siddhi siddhi-runner-5.1.2(作为同一局域网中另一台服务器上的二进制独立服务器)
适用于NATS 2.0.10的Siddhi扩展,具有以下依赖性:
protobuf-java-3.9.1.jar,java-nats-streaming-2.2.2.jar,
jnats-2.6.5.jar(无论如何,我不使用流式传输选项)
我使用以下siddhi应用程序:
@App:name("TestNATS")
@source(type='nats', @map(type='text'), destination='test.message', bootstrap.servers='nats://192.168.50.173:4222')
define stream inputStream (name string, age int, country string);
@sink(type='log', prefix='LOGGER')
define stream OutputStream(name string, age int, country string);
@info(name='HelloWorldQueryNATS')
from inputStream
select name, age, country
insert into OutputStream;
当Siddhi启动时,似乎与
NATS server
连接没有问题,但是在初始握手之后,Siddhi抛出了以下exception
:[2020-04-10 10:55:00,441] INFO {org.wso2.msf4j.internal.websocket.WebSocketServerSC} - All required capabilities are available of WebSocket service component is available.
[2020-04-10 10:55:00,549] INFO {org.wso2.msf4j.analytics.metrics.MetricsComponent} - Metrics Component is activated
[2020-04-10 10:55:00,552] INFO {org.wso2.carbon.databridge.agent.internal.DataAgentDS} - Successfully deployed Agent Server
[2020-04-10 10:55:01,163] INFO {org.wso2.carbon.analytics.idp.client.core.utils.IdPServiceUtils} - Enabling default IdPClient Local User Store as configuration is not overridden.
[2020-04-10 10:55:01,167] INFO {org.wso2.carbon.analytics.idp.client.core.utils.IdPServiceUtils} - IdP client of type 'local' is started.
[2020-04-10 10:55:06,915] ERROR {io.siddhi.extension.io.nats.source.NATSSource} - Error while connecting to NATS server at destination: test.message
[2020-04-10 10:55:06,918] ERROR {io.siddhi.core.stream.input.source.Source} - Error on 'TestNATS'. Error while connecting to NATS server at destination: test.message Error while connecting at Source 'nats' at 'inputStream'. Will retry in '5 sec'. io.siddhi.core.exception.ConnectionUnavailableException: Error while connecting to NATS server at destination: test.message
Caused by: java.io.IOException: stan: connect request timeout
并连续重新启动连接过程,因此在
NATS server
日志中,我们可以看到以下消息:cid:1 - Client connection created
cid:1 - <<- [CONNECT"lang":"java","version":"2.6.0","name":"1586508901596_66035_83044","protocol":1,"verbose":false,"pedantic":false,"tls_required":false,"echo":true}]
cid:1 - <<- [PING]
cid:1 - ->> [PONG]
cid:1 - <<- [SUB _STAN.acks.TSHykWgo2B6Lv2xW8b1Bfd 1]
cid:1 - <<- [SUB _INBOX.TSHykWgo2B6Lv2xW8b1BaH 2]
cid:1 - <<- [SUB _INBOX.TSHykWgo2B6Lv2xW8b1Bcx 3]
cid:1 - <<- [SUB _INBOX.4bm5CWKphBwldfvnGCxxJd.* 4]
cid:1 - <<- [PUB _STAN.discover.test-cluster _INBOX.4bm5CWKphBwldfvnGCxxJd.4bm5CWKphBwldfvnGCxxa9 88]
cid:1 - <<- MSG_PAYLOAD: ["\n\x191586508901596_66035_83044\x12\x1d_INBOX.TSHykWgo2B6Lv2xW8b1BaH\x18\x01\"\x16TSHykWgo2B6Lv2xW8b1BXb(\x050\x03"]
cid:1 - Client Ping Timer
cid:1 - ->> [PING]
cid:1 - <<- [PONG]
cid:1 - <<- [UNSUB 1]
cid:1 - <-> [DELSUB 1]
cid:1 - <<- [UNSUB 2]
cid:1 - <-> [DELSUB 2]
cid:1 - <<- [UNSUB 3]
cid:1 - <-> [DELSUB 3]
cid:1 - Client connection closed
cid:1 - <-> [DELSUB 4]
cid:2 - Client connection created
NATS server
与使用Nodejs
的发布订阅应用程序使用的相同,并且工作正常。我也尝试使用最后一个Siddhi docker
软件包,但结果是相同的。错误可能在哪里?
最佳答案
使用nats-streaming-server而不是nats-server,连接问题不再发生。
谢谢。
关于java - 使用Siddhi与NATS服务器的连接问题,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/61137459/