我有一个正在处理的CSV格式,如下所示:
01,H2,H3
02,B2,B3,B4,B5
02,B2,B3,B4,B5
02,B2,B3,B4,B5
02,B2,B3,B4,B5
01,H2,H3
02,B2,B3,B4,B5
02,B2,B3,B4,B5
01,H2,H3
02,B2,B3,B4,B5
02,B2,B3,B4,B5
02,B2,B3,B4,B5
01指定标题行,而02指定正文行。
我需要获取标头数据并将其添加到正文消息中,因此最终发送这样的消息:
H2,H3,B2,B3,B4,B5
H2,H3,B2,B3,B4,B5
H2,H3,B2,B3,B4,B5
我已经尝试进行汇总,但是在这种情况下这似乎不是正确的EIP,因为我只是一次又一次地将同一条消息而不是多条消息组合在一起...从根本上讲,我需要访问标头数据以处理身体(实际上这只是一个字段)。我只是不知道如何设置变量,因为每次交换都会清除标头和属性。有小费吗?提前致谢。让我知道骆驼路线的现状是否会有所帮助。
这是骆驼路线,可能会有所帮助:
from("direct:inventory")
.split(body().tokenize("\n")).streaming()
.throttle(100)
.choice()
.when(property("CamelSplitComplete").isEqualTo(true))
.log("Processed ${property.CamelSplitSize} updates")
.end()
.unmarshal(csv)
.log("${body}")
.aggregate(header("CamelFileLastModified"), new InventoryAggregationStrategy())
.completionPredicate(header("aggregationComplete").isEqualTo(true))
.to("freemarker://templates/inventory.ftl")
.unmarshal().string("UTF-8")
.unmarshal().json(JsonLibrary.Jackson)
.convertBodyTo(JsonObject.class)
.to("endpoint");
Here is the spec for the data
最佳答案
您总是可以使用简单的方法并使用bean。
public class CamelHeadersAndRows {
public static class HeaderBean {
String header = null;
public void setHeader(String body) {
header = body.substring("01,".length());
}
public String useHeader(String body) {
return header + "," + body.substring("02,".length());
}
}
public static void main(String[] args) throws Exception {
final HeaderBean headerBean = new HeaderBean();
Main camelMain = new Main();
camelMain.addRouteBuilder(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("timer:foo?period=1s&repeatCount=1")
.setBody(constant(
"01,H2,H3\n" +
"02,B2,B3,B4,B5\n" +
"02,B2,B3,B4,B5\n" +
"02,B2,B3,B4,B5\n" +
"02,B2,B3,B4,B5\n" +
"01,H2,H3\n" +
"02,B2,B3,B4,B5\n" +
"02,B2,B3,B4,B5\n" +
"01,H2,H3\n" +
"02,B2,B3,B4,B5\n" +
"02,B2,B3,B4,B5\n" +
"02,B2,B3,B4,B5"
))
.to("direct:inventory");
from("direct:inventory")
.split(body().tokenize("\n")).streaming()
.choice()
.when().simple("${body} regex '^01.*'")
.bean(headerBean, "setHeader")
.stop()
.otherwise()
.bean(headerBean, "useHeader")
.end()
.log("message: ${body}")
;
}
});
camelMain.run();
}
}