这个问题是关于Siddhi的Java库-CEP
说明:
我试图建立一个HTTP源来接收数据。创建运行时并启动它没有错误。
[nioEventLoopGroup-2-1] INFO org.wso2.transport.http.netty.listener.ServerConnectorBootstrap$HTTPServerConnector - HTTP(S) Interface starting on host localhost and port 9056
[main] INFO org.wso2.extension.siddhi.io.http.source.HttpConnectorPortBindingListener - siddhi: started HTTP server connector localhost:9056
[main] INFO org.wso2.extension.siddhi.io.http.source.HttpSourceListener - Source Listener has created for url http://localhost:9056/endpoints/
但是,当我向指定地址发送POST请求时。我收到一个错误:
[nioEventLoopGroup-3-1] ERROR org.wso2.extension.siddhi.io.http.source.HTTPConnectorListener - Error in http server connector
java.lang.NoSuchMethodError: io.netty.handler.codec.http.HttpRequest.method()Lio/netty/handler/codec/http/HttpMethod;
at org.wso2.transport.http.netty.listener.CustomHttpContentCompressor.decode(CustomHttpContentCompressor.java:44)
at org.wso2.transport.http.netty.listener.CustomHttpContentCompressor.decode(CustomHttpContentCompressor.java:14)
at io.netty.handler.codec.MessageToMessageCodec$2.decode(MessageToMessageCodec.java:81)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89)
at io.netty.handler.codec.MessageToMessageCodec.channelRead(MessageToMessageCodec.java:111)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:276)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:354)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:748)
谁能提出我做错事情的原因?先感谢您。
受影响的产品版本:
4.1.17
操作系统,数据库,其他环境详细信息和版本:
IntelliJ IDEA 2017.3.5(社区版)
Build#IC-173.4674.33,建于2018年3月6日
JRE:1.8.0_152-release-1024-b15 amd64
JVM:JetBrains s.r.o的OpenJDK 64位服务器VM
Windows 10 10.0
重现步骤:
我编写的测试代码:
import org.wso2.siddhi.core.SiddhiAppRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.stream.output.StreamCallback;
import org.wso2.siddhi.core.util.EventPrinter;
//import org.wso2.extension.siddhi.io.http.source.*;
public class httpTest
{
public static void main(String[] args) {
String siddhiString = "@App:name(\"haha\") " +
"@App:description(\"fasd\") " +
"@App:statistics(reporter = \"jmx\", interval = \"30\") " +
"@source(type=\"http\",receiver.url=\"http://localhost:9056/endpoints/\",@map(type=\"text\",fail.on.missing.attribute=\"true\",regex.A=\"(.*)\",@attributes(data=\"A\"))) " +
"@sink(type=\"mqtt\",url=\"tcp://120.78.71.179:1883\",topic=\"34\",@map(type=\"text\")) " +
"define stream a4P068X5YCK(data String);";
SiddhiManager siddhiManager = new SiddhiManager();
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiString);
siddhiAppRuntime.addCallback("a4P068X5YCK", new StreamCallback() {
@Override
public void receive(Event[] events) {
EventPrinter.print(events);
}
});
siddhiAppRuntime.start();
}
}
然后,我将POST请求发送到http://localhost:9056/endpoints/。它返回上面发布的异常。
更新:
我回去检查Siddhi-io-http github documentation page。我发现它说:
...此扩展仅在WSO2 Data Analytic Server内部起作用,不能与独立siddhi一起运行。
我想这可能表明siddhi库目前不支持http。我已在siddhi存储库页面上提交了问题以要求确认。
更新2:
我更改了Siddhi查询,以便将源流复制到其他接收器流。代码的其他部分保持不变:
String siddhiString = "@App:name(\"haha\") " +
"@App:description(\"fasd\") " +
"@App:statistics(reporter = \"jmx\", interval = \"30\") " +
"@source(type=\"http\",receiver.url=\"http://localhost:9056/endpoints/\",@map(type=\"text\",fail.on.missing.attribute=\"true\",regex.A=\"(.*)\",@attributes(data=\"A\"))) " +
"define stream a4P068X5YCK(data String); " +
"@sink(type=\"mqtt\",url=\"tcp://120.78.71.179:1883\",topic=\"34\",@map(type=\"text\")) " +
"define stream pout(data String); " +
"from a4P068X5YCK " +
"select * " +
"insert into pout; " +
"";
同样的问题仍然存在。我尝试了wso2处理器,它工作正常。现在我的猜测是:
1.版本不匹配
2. wso2处理器依赖项中缺少某些软件包。
我将尝试在这两个方向上进行标识,并在找到新内容后立即在此处和“发行”页面进行更新。
更新3:
当我不断添加更新时,格式似乎有问题,但是幸运的是这个问题也告一段落。我试图包括来自wso2处理器源代码的所有依赖关系,并且我的测试程序开始工作。因此,我假设wso2处理器中缺少siddhi库的组件。
我试图一个一个地删除依赖项,以查看我的测试程序是否仍然有效。终于我找到了那个包。有了这个软件包,我的代码运行良好。
<dependency>
<groupId>org.wso2.msf4j</groupId>
<artifactId>org.wso2.msf4j.feature</artifactId>
<version>${msf4j.version}</version>
<type>zip</type>
</dependency>
因为我是编码的初学者,所以我并不是问题所在。如果有人可以向我解释问题背后的原因,我将不胜感激。我感谢在此过程中获得的所有帮助,这对我来说也是一次很好的经历。
更新4: @Grainier我尝试了您发布的示例代码,它实际上有效!虽然我仍然不知道为什么。我试图将您的确切代码复制到项目中的新.java文件中。它仍然无法正常工作。因此,我想这与POM文件有关。
我注意到的是,当我运行示例代码时,在控制台上打印了一些警告:小更新:我发现出现警告是因为我正在使用JDK10。切换回1.8后,警告消失了,并且代码仍然有效。所以也许这不是原因。
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by io.netty.util.internal.ReflectionUtil (file:/C:/Users/ktz001/.m2/repository/io/netty/netty-common/4.1.16.Final/netty-common-4.1.16.Final.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of io.netty.util.internal.ReflectionUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
第二个区别是在POM文件中。与我相比,您在其中添加了另一个存储库。
<repository>
<id>wso2-nexus</id>
<name>WSO2 internal Repository</name>
<url>http://maven.wso2.org/nexus/content/groups/wso2-public/</url>
<releases>
<enabled>true</enabled>
<updatePolicy>daily</updatePolicy>
<checksumPolicy>ignore</checksumPolicy>
</releases>
</repository>
如果您可以提出任何理由,那就太好了。
感谢您的所有工作!这真的很有帮助。
最佳答案
文档似乎存在问题...这应该与独立的Siddhi一起使用。您所要做的就是在项目中添加以下依赖项(还有mqtt,我在下面没有包括);
<dependencies>
<dependency>
<groupId>org.wso2.siddhi</groupId>
<artifactId>siddhi-core</artifactId>
<version>${siddhi.version}</version>
</dependency>
<dependency>
<groupId>org.wso2.siddhi</groupId>
<artifactId>siddhi-annotations</artifactId>
<version>${siddhi.version}</version>
</dependency>
<dependency>
<groupId>org.wso2.siddhi</groupId>
<artifactId>siddhi-query-compiler</artifactId>
<version>${siddhi.version}</version>
</dependency>
<dependency>
<groupId>org.wso2.extension.siddhi.io.http</groupId>
<artifactId>siddhi-io-http</artifactId>
<version>${siddhi.io.http.version}</version>
</dependency>
<dependency>
<groupId>org.wso2.extension.siddhi.map.text</groupId>
<artifactId>siddhi-map-text</artifactId>
<version>${siddhi.mapper.text.version}</version>
</dependency>
</dependencies>
但是,查询存在一个问题,即您已为单个流定义了
@source
和@sink
。错了如果要使其成为直通,则必须定义两个流(一个用于源,一个用于接收器),并编写查询以将事件从源流插入到接收器流。更新:
可以找到一个样本here;请尝试一下,看看它是否有效。