我正在使用Direct-VM在同一JVM中运行的两个骆驼上下文之间进行通信。但是它不起作用,我的生产者一直在寻找消费者:(我在这里错过了什么吗?感谢您的任何帮助,谢谢:)
这是我的生产者,将数据从文件夹中的文件发送到端点
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
public class OrderRouter {
public static void main(String args[]) throws Exception {
// create CamelContext
CamelContext context = new DefaultCamelContext();
// add our route to the CamelContext
context.addRoutes(new RouteBuilder() {
@Override
public void configure() {
from("file:src/data?noop=true").to("direct-vm:pipeRequestDR91");
}
});
// start the route and let it do its work
context.start();
Thread.sleep(10000);
// stop the CamelContext
context.stop();
}
}
这是我的消费者在端点上的轮询。
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
public class App {
public static void main(String[] args) throws Exception {
CamelContext context = new DefaultCamelContext();
context.addRoutes(new RouteBuilder() {
@Override
public void configure() {
from("direct-vm:pipeRequestDR91").process(new Processor() {
public void process(Exchange exchange) throws Exception {
System.out.println("Reached here");
System.out.println("Received XML order: "
+ exchange.getIn().getHeader("CamelFileName"));
String strBody = exchange.getIn().getBody(String.class);
System.out.println(strBody);
}
});
}
});
context.start();
Thread.sleep(100000);
}
}
这是错误:
Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[route1 ] [route1 ] [file://src/data?noop=true ] [ 5]
[route1 ] [to1 ] [direct-vm:pipeRequestDR91 ] [ 0]
Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
Id ID-LP0G0085-56374-1502833874430-0-418
ExchangePattern InOnly
Headers {breadcrumbId=ID-LP0G0085-56374-1502833874430-0-417, CamelFileAbsolute=false, CamelFileAbsolutePath=C:\Prashant\camelinaction-master\chapter2\cbr\src\data\message2.csv, CamelFileContentType=application/vnd.ms-excel, CamelFileLastModified=1436197636000, CamelFileLength=53, CamelFileName=message2.csv, CamelFileNameConsumed=message2.csv, CamelFileNameOnly=message2.csv, CamelFileParent=src\data, CamelFilePath=src\data\message2.csv, CamelFileRelativePath=message2.csv, CamelRedelivered=false, CamelRedeliveryCounter=0}
BodyType org.apache.camel.component.file.GenericFile
Body [Body is file based: GenericFile[message2.csv]]
]
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
org.apache.camel.component.directvm.DirectVmConsumerNotAvailableException: No consumers available on endpoint: Endpoint[direct-vm://pipeRequestDR91]. Exchange[message2.csv]
at org.apache.camel.component.directvm.DirectVmProducer.process(DirectVmProducer.java:51)[camel-core-2.15.2.jar:2.15.2]
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:129)[camel-core-2.15.2.jar:2.15.2]
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)[camel-core-2.15.2.jar:2.15.2]
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)[camel-core-2.15.2.jar:2.15.2]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[camel-core-2.15.2.jar:2.15.2]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[camel-core-2.15.2.jar:2.15.2]
at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:435)[camel-core-2.15.2.jar:2.15.2]
at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:211)[camel-core-2.15.2.jar:2.15.2]
at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:175)[camel-core-2.15.2.jar:2.15.2]
at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:174)[camel-core-2.15.2.jar:2.15.2]
at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:101)[camel-core-2.15.2.jar:2.15.2]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)[:1.7.0_71]
at java.util.concurrent.FutureTask.runAndReset(Unknown Source)[:1.7.0_71]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source)[:1.7.0_71]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)[:1.7.0_71]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)[:1.7.0_71]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)[:1.7.0_71]
at java.lang.Thread.run(Unknown Source)[:1.7.0_71]
[1) thread #0 - file://src/data] GenericFileOnCompletion WARN Rollback file strategy: org.apache.camel.component.file.strategy.GenericFileRenameProcessStrategy@281eff for file: GenericFile[message3.csl]
[1) thread #0 - file://src/data] DefaultErrorHandler
最佳答案
这是一个最小的示例(但是Thread.Sleep只是显示在没有osgi / spring等的同一个jvm中意味着两个上下文的快速方法):
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
public class Main {
public static void main(String[] args) throws Exception {
CamelContext first = new DefaultCamelContext();
first.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("timer:start").to("direct-vm:test");
}
});
CamelContext second = new DefaultCamelContext();
second.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct-vm:test").log("Message received");
}
});
first.start();
second.start();
Thread.sleep(100000);
}
}
实际上,正如我所看到的,真正的要求是在不同应用程序中的不同骆驼上下文之间进行通信,因此JVM将有所不同。类似于“ direct”或“ direct-vm”的最快方法是使用camel-netty4并将选项“ transferExchange”设置为“ true”(http://camel.apache.org/netty4.html)。这是一个示例,其中包含两个使用这种方式进行通信的独立应用程序:
App1.java
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
public class App1 {
public static void main(String[] args) throws Exception {
CamelContext first = new DefaultCamelContext();
first.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("timer:start").to("netty4:tcp://127.0.0.1:9999?transferExchange=true");
}
});
first.start();
Thread.sleep(100000);
}
}
App2.java
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
public class App2 {
public static void main(String[] args) throws Exception {
CamelContext second = new DefaultCamelContext();
second.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("netty4:tcp://127.0.0.1:9999?transferExchange=true").log("Message received");
}
});
second.start();
Thread.sleep(100000);
}
}
通过更改IP地址,可以在不同的计算机等上运行应用程序。