尝试接收自定义对象后,我只能听字符串,它会引发以下错误。看来我需要教Spring处理我的自定义对象(B2BOrder)org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [br.com.b2breservas.api.model.B2BOrder] for GenericMessage [payload={"comments":"95d29059-8552-42fa-8fd9-a1d776416269"},
我的SQSConfig
@Configuration
@EnableSqs
public class SqsConfig {
private static final String DEFAULT_THREAD_NAME_PREFIX = ClassUtils.getShortName(SimpleMessageListenerContainer.class) + "-";
@Bean
public QueueMessagingTemplate myMessagingTemplate(AmazonSQSAsync amazonSqs, ResourceIdResolver resolver) {
ObjectMapper mapper = new ObjectMapper()
.registerModule(new ParameterNamesModule())
.registerModule(new Jdk8Module())
.registerModule(new JodaModule())
.registerModule(new JavaTimeModule());
// configure the Jackson mapper as needed
// maybe I need to do something here!
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setSerializedPayloadClass(String.class);
converter.setStrictContentTypeMatch(false);
converter.setObjectMapper(mapper);
return new QueueMessagingTemplate(amazonSqs, resolver, converter);
}
@Bean
public ClientConfiguration sqsClientConfiguration() {
return new ClientConfiguration()
.withConnectionTimeout(30000)
.withRequestTimeout(30000)
.withClientExecutionTimeout(30000);
}
@Bean
public ExecutorFactory sqsExecutorFactory() {
return new ExecutorFactory() {
@Override
public ExecutorService newExecutor() {
return new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
};
}
@Value("${b2b.b2b.accesstoken}")
public String accesstoken;
@Value("${b2b.b2b.secretkey}")
public String secretkey;
@Bean
public AmazonSQSAsync amazonSqs(ClientConfiguration sqsClientConfiguration, ExecutorFactory sqsExecutorFactory) {
BasicAWSCredentials credential = new BasicAWSCredentials(accesstoken, secretkey);
return AmazonSQSAsyncClientBuilder.standard()
.withClientConfiguration(sqsClientConfiguration)
.withExecutorFactory(sqsExecutorFactory)
// .withEndpointConfiguration(sqsEndpointConfiguration)
// .withCredentials(credentialsProvider)
.withCredentials(new AWSStaticCredentialsProvider(credential))
.build();
}
@Bean
public AsyncTaskExecutor queueContainerTaskEecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setThreadNamePrefix(DEFAULT_THREAD_NAME_PREFIX);
threadPoolTaskExecutor.setCorePoolSize(2);
threadPoolTaskExecutor.setMaxPoolSize(2);
// No use of a thread pool executor queue to avoid retaining message to long in memory
threadPoolTaskExecutor.setQueueCapacity(0);
threadPoolTaskExecutor.afterPropertiesSet();
return threadPoolTaskExecutor;
}
@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSqs, AsyncTaskExecutor queueContainerTaskEecutor) {
SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
factory.setAmazonSqs(amazonSqs);
factory.setAutoStartup(true);
// factory.setQueueMessageHandler();
factory.setMaxNumberOfMessages(1);
factory.setWaitTimeOut(20);
factory.setTaskExecutor(queueContainerTaskEecutor);
return factory;
}
}
听众
@Component
public class SqsHub {
@SqsListener(
"https://sqs.us-west-2.amazonaws.com/3234/32443-checkout.fifo"
)
public void listen(B2BOrder message) {
// public void listen(String message) { THIS WORKS!!
System.out.println("!!!! received message {} {}" + message.toString());
}
}
正在发送
....
@Autowired
AmazonSQSAsync amazonSqs;
@GetMapping("/yay")
public String yay() {
try {
B2BOrder pendingOrder = new B2BOrder();
pendingOrder.setComments(UUID.randomUUID().toString());
String pendingOrderJson = objectMapper.writeValueAsString(pendingOrder);
QueueMessagingTemplate queueMessagingTemplate = new QueueMessagingTemplate(amazonSqs);
Map<String, Object> headers = new HashMap<>();
headers.put(SqsMessageHeaders.SQS_GROUP_ID_HEADER, "my-application");
headers.put(SqsMessageHeaders.SQS_DEDUPLICATION_ID_HEADER, UUID.randomUUID().toString());
queueMessagingTemplate.convertAndSend("booking-checkout.fifo", pendingOrderJson, headers);
} catch (final AmazonClientException | JsonProcessingException ase) {
System.out.println("Error Message: " + ase.getMessage());
}
return "sdkjfn";
}
....
简单的自定义对象
public class B2BOrder implements Serializable {
@JsonProperty
private String comments;
}
更新
@Michiel的答案把我带到了这里,但仍然遇到相同的错误。
@Autowired
public ObjectMapper objectMapper;
@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSqs, AsyncTaskExecutor queueContainerTaskEecutor) {
SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
factory.setAmazonSqs(amazonSqs);
factory.setAutoStartup(true);
QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
queueMessageHandlerFactory.setAmazonSqs(amazonSqs);
MappingJackson2MessageConverter jsonMessageConverter = new MappingJackson2MessageConverter();
jsonMessageConverter.setObjectMapper(objectMapper);
queueMessageHandlerFactory.setMessageConverters(Collections.singletonList(jsonMessageConverter));
factory.setQueueMessageHandler(queueMessageHandlerFactory.createQueueMessageHandler());
// factory.setMaxNumberOfMessages(1);
factory.setWaitTimeOut(20);
factory.setTaskExecutor(queueContainerTaskEecutor);
return factory;
}
```
最佳答案
尽管您已注册MessageConverter
,但仅将其配置为在传出请求中使用(使用QueueMessagingTemplate
)。您的MessageListener
没有MessageConverter
配置。因此,传入消息只能以“原始”类型(如字符串)进行检索。
在您的代码段中,您已注释了以下代码行:
// factory.setQueueMessageHandler();
在这里可以设置本身已附加一个或多个
QueueMessageHandler
的MessageConverters
。[编辑]
当然:
QueueMessageHandlerFactory handlerFactory = new QueueMessageHandlerFactory();
handlerFactory.setMessageConverters(yourJacksonConfig);
QueueMessageHandler messageHandler = handlerFactory.createQueueMessageHandler();
factory.setQueueMessageHandler(messageHandler);
这Spring documentation可能有帮助。