问题描述
我的输入是一个csv文件,如下所示:
My input is a csv file as given below:
USER_ID, USER_NAME, FREQUENCY, FREQUENCY_DETAIL
A123, AAA, ANNUALLY, 1-JUN
B123, BBB, INVALID_FREQUENCY, 21-JUN
C123, CCC, ANNUALLY, APR
D123, DDD, WEEKLY, 1-DEC
验证:
USER_ID -> alphanumeric
USERNAME -> alphabets only
FREQUENCY -> must be one of DAILY, WEEKLY, MONTHLY , ANNUALLY
FREQUENCY DETAIL -> Pattern \\d{1,2}-\\w{3}
我的Bean如下:
class UserBean {
String userID;
String userName;
String frequency;
String frequencyDetail;
String status = "SUCCESS"; //Code generated. If any of the above fields is not valid, set as "ERROR, <field that failed>". E.g.: ERROR, FREQUENCY_DETAIL
}
我的流程如下:
- 从文件夹读取csv文件
- 转换为UserBean(使用openCsv)
- 如果任何Bean的状态包含 ERROR,请写入一个单独的通道,称为 errorSummaryReportChannel。
此通道将写入userID +文件状态。
This channels writes userID + status to a file.
4。对于所有bean(状态为 SUCCESS或状态为 ERROR),转换为JSON并记录输出。
4. For all beans (with status = "SUCCESS" or status = "ERROR"), convert to JSON and log output.
我需要帮助的地方:
WHERE I NEED HELP:
第3步
如果userBean的状态为= ERROR,写入 errorSummaryReportChannel。
对于所有状态,请按正常流程进行。
If the userBean has status = "ERROR", write to "errorSummaryReportChannel".For all status, proceed with normal flow.
我认为输出通道&丢弃通道必须添加,但找不到任何示例。
I think an output channel & discard channel must be added, but could not find any example.
我的代码:
@Configuration
public class CreateUserConfiguration {
@Bean
public IntegrationFlow createUser() {
return IntegrationFlows.from(Files.inboundAdapter(new File(INPUT_DIR)))
.enrichHeaders(h -> h.header("errorChannel", "exceptionChannel", true))
.transform(csvToUserBeanTransformer, "convertCsvToUserBean")
.split(userBeanSplitter, "splitUserBeans")
.wireTap(flow -> flow.<PrimeAOBean>filter(primeAOBean -> primeAOBean.getStatus().equalsIgnoreCase("SUCCESS"), errorFlow -> errorFlow.discardChannel("errorSummaryReportGenerationChannel")))
.transform(userBeanToJSONTransformer, "convertUserBeanToJSON")
.handle(Files.outboundAdapter(new File(OUTPUT_SUCCESS_DIRECTORY)))
.get();
}
@Bean
public IntegrationFlow logErrorSummary() {
return IntegrationFlows.from("errorSummaryReportGenerationChannel")
.handle((p,h) -> {
return ((UserBean)(p)).getUserID() + "\t" + ((UserBean)(p)).getStatus();
})
.transform(Transformers.objectToString())
.handle(Files.outboundAdapter(new File(OUTPUT_FAILED_REPORT_FILE_NAME)))
.get();
}
@Bean
public IntegrationFlow logError() {
return IntegrationFlows.from("exceptionChannel")
.enrichHeaders(h -> h.headerExpression("errorFileName", "payload.failedMessage.headers.fileName"))
.transform(Transformers.toJson())
.handle(Files.outboundAdapter(new File(generateOutputDirectory(OUTPUT_FAILED_DIRECTORY))))
.get();
}
}
有人可以帮忙吗?
推荐答案
用于过滤器
的输出通道(或流中的任何其他端点)是在流中的端点之前自动创建的端点。因此,在您的情况下,是这样的:
The output channel for filter
(or any other endpoint) in the flow is the one created automatically before the endpoint in the flow. So, in your case it is like this:
.filter()
.transform()
每当消息通过过滤器时,它就会发送到该转换器中。我们可以将其视为 SUCCESS
进程。
Whenever message has passed the filter, it is sent into that transformer. We can treat it as a SUCCESS
process.
现在让我们首先编写一个过滤器函数以生成这些消息才能正常流动!
Now let's write a filter function first of all to make those message to flow properly!
有一种适合您的方法:
/**
* Populate a {@link MessageFilter} with {@link MethodInvokingSelector}
* for the provided {@link GenericSelector}.
* Typically used with a Java 8 Lambda expression:
* <pre class="code">
* {@code
* .filter("World"::equals)
* }
* </pre>
* Use {@link #filter(Class, GenericSelector)} if you need to access the entire
* message.
* @param genericSelector the {@link GenericSelector} to use.
* @param <P> the source payload type.
* @return the current {@link IntegrationFlowDefinition}.
*/
public <P> B filter(GenericSelector<P> genericSelector) {
因此,我们的任务是实现<$ c您的 UserBean
的$ c> GenericSelector :
So, our task is to implement that GenericSelector
for your UserBean
:
.<UserBean>filter(userBean -> userBean.getStatus().equalsIgnoreCase("SUCCESS"))
要将所有其他( ERROR
)消息发送到丢弃通道( errorSummaryReportGenerationChannel
),我们需要将该 discardChannel
选项添加到过滤器中。为此,Java DSL中有一个重载的 filter()
方法:
To send all other (ERROR
) message to the discard channel (errorSummaryReportGenerationChannel
), we need to add that discardChannel
option into the filter somehow. And for this purpose there is an overloaded filter()
method in Java DSL:
.<UserBean>filter(userBean -> userBean.getStatus().equalsIgnoreCase("SUCCESS"),
endpoint -> endpoint.discardChannel("errorSummaryReportGenerationChannel"))
有关更多信息,请参见其JavaDoc。
See their JavaDocs for more information.
这篇关于Spring Integration-过滤器-将消息发送到其他端点的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!