我正在使用Direct-VM在同一JVM中运行的两个骆驼上下文之间进行通信。但是它不起作用,我的生产者一直在寻找消费者:(我在这里错过了什么吗?感谢您的任何帮助,谢谢:)

这是我的生产者,将数据从文件夹中的文件发送到端点

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

public class OrderRouter {

    public static void main(String args[]) throws Exception {
   // create CamelContext
    CamelContext context = new DefaultCamelContext();

// add our route to the CamelContext
    context.addRoutes(new RouteBuilder() {
    @Override
    public void configure() {

        from("file:src/data?noop=true").to("direct-vm:pipeRequestDR91");

        }
    });

    // start the route and let it do its work
    context.start();
    Thread.sleep(10000);

    // stop the CamelContext
    context.stop();
    }
}


这是我的消费者在端点上的轮询。

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

public class App {
    public static void main(String[] args) throws Exception {

    CamelContext context = new DefaultCamelContext();

    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() {

            from("direct-vm:pipeRequestDR91").process(new Processor() {
                public void process(Exchange exchange) throws Exception {
                    System.out.println("Reached here");
                    System.out.println("Received XML order: "
                            + exchange.getIn().getHeader("CamelFileName"));
                    String strBody = exchange.getIn().getBody(String.class);
                    System.out.println(strBody);
                }
            });

        }
    });

    context.start();
    Thread.sleep(100000);
    }
}


这是错误:

    Message History

---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[route1            ] [route1            ] [file://src/data?noop=true                                                     ] [         5]
[route1            ] [to1               ] [direct-vm:pipeRequestDR91                                                     ] [         0]

Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
    Id                  ID-LP0G0085-56374-1502833874430-0-418
    ExchangePattern     InOnly
    Headers             {breadcrumbId=ID-LP0G0085-56374-1502833874430-0-417, CamelFileAbsolute=false, CamelFileAbsolutePath=C:\Prashant\camelinaction-master\chapter2\cbr\src\data\message2.csv, CamelFileContentType=application/vnd.ms-excel, CamelFileLastModified=1436197636000, CamelFileLength=53, CamelFileName=message2.csv, CamelFileNameConsumed=message2.csv, CamelFileNameOnly=message2.csv, CamelFileParent=src\data, CamelFilePath=src\data\message2.csv, CamelFileRelativePath=message2.csv, CamelRedelivered=false, CamelRedeliveryCounter=0}
    BodyType            org.apache.camel.component.file.GenericFile
    Body                [Body is file based: GenericFile[message2.csv]]
]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
org.apache.camel.component.directvm.DirectVmConsumerNotAvailableException: No consumers available on endpoint: Endpoint[direct-vm://pipeRequestDR91]. Exchange[message2.csv]
    at org.apache.camel.component.directvm.DirectVmProducer.process(DirectVmProducer.java:51)[camel-core-2.15.2.jar:2.15.2]
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:129)[camel-core-2.15.2.jar:2.15.2]
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)[camel-core-2.15.2.jar:2.15.2]
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)[camel-core-2.15.2.jar:2.15.2]
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[camel-core-2.15.2.jar:2.15.2]
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[camel-core-2.15.2.jar:2.15.2]
    at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:435)[camel-core-2.15.2.jar:2.15.2]
    at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:211)[camel-core-2.15.2.jar:2.15.2]
    at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:175)[camel-core-2.15.2.jar:2.15.2]
    at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:174)[camel-core-2.15.2.jar:2.15.2]
    at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:101)[camel-core-2.15.2.jar:2.15.2]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)[:1.7.0_71]
    at java.util.concurrent.FutureTask.runAndReset(Unknown Source)[:1.7.0_71]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source)[:1.7.0_71]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)[:1.7.0_71]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)[:1.7.0_71]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)[:1.7.0_71]
    at java.lang.Thread.run(Unknown Source)[:1.7.0_71]
[1) thread #0 - file://src/data] GenericFileOnCompletion        WARN  Rollback file strategy: org.apache.camel.component.file.strategy.GenericFileRenameProcessStrategy@281eff for file: GenericFile[message3.csl]
[1) thread #0 - file://src/data] DefaultErrorHandler

最佳答案

这是一个最小的示例(但是Thread.Sleep只是显示在没有osgi / spring等的同一个jvm中意味着两个上下文的快速方法):

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

public class Main {
    public static void main(String[] args) throws Exception {
        CamelContext first = new DefaultCamelContext();
        first.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("timer:start").to("direct-vm:test");
            }
        });

        CamelContext second = new DefaultCamelContext();
        second.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct-vm:test").log("Message received");
            }
        });

        first.start();
        second.start();

        Thread.sleep(100000);
    }
}


实际上,正如我所看到的,真正的要求是在不同应用程序中的不同骆驼上下文之间进行通信,因此JVM将有所不同。类似于“ direct”或“ direct-vm”的最快方法是使用camel-netty4并将选项“ transferExchange”设置为“ true”(http://camel.apache.org/netty4.html)。这是一个示例,其中包含两个使用这种方式进行通信的独立应用程序:

App1.java

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

public class App1 {
    public static void main(String[] args) throws Exception {
        CamelContext first = new DefaultCamelContext();
        first.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("timer:start").to("netty4:tcp://127.0.0.1:9999?transferExchange=true");
            }
        });

        first.start();
        Thread.sleep(100000);
    }
}


App2.java

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

public class App2 {
    public static void main(String[] args) throws Exception {
        CamelContext second = new DefaultCamelContext();
        second.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("netty4:tcp://127.0.0.1:9999?transferExchange=true").log("Message received");
            }
        });

        second.start();

        Thread.sleep(100000);
    }
}


通过更改IP地址,可以在不同的计算机等上运行应用程序。

09-26 09:17