我正在使用Spring AWS Cloud Framework轮询队列中的S3事件通知。我正在使用QueueMessagingTemplate
来做到这一点。我希望能够设置最大消息数和等待时间以进行轮询:http://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html。
import com.amazonaws.services.s3.event.S3EventNotification;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
public class MyQueue {
private QueueMessagingTemplate queueMsgTemplate;
@Autowired
public MyQueue(QueueMessagingTemplate queueMsgTemplate) {
this.queueMsgTemplate = queueMsgTemplate;
}
@Override
public S3EventNotification poll() {
S3EventNotification s3Event = queueMsgTemplate
.receiveAndConvert("myQueueName", S3EventNotification.class);
}
}
上下文
@Bean
public AWSCredentialsProviderChain awsCredentialsProviderChain() {
return new AWSCredentialsProviderChain(
new DefaultAWSCredentialsProviderChain());
}
@Bean
public ClientConfiguration clientConfiguration() {
return new ClientConfiguration();
}
@Bean
public AmazonSQS sqsClient(ClientConfiguration clientConfiguration,// UserData userData,
AWSCredentialsProviderChain credentialsProvider) {
AmazonSQSClient amazonSQSClient = new AmazonSQSClient(credentialsProvider, clientConfiguration);
amazonSQSClient.setEndpoint("http://localhost:9324");
return amazonSQSClient;
}
@Bean
public QueueMessagingTemplate queueMessagingTemplate(AmazonSQS sqsClient) {
return new QueueMessagingTemplate(sqsClient);
}
任何想法如何配置这些?谢谢
最佳答案
QueueMessagingTemplate.receiveAndConvert()
基于QueueMessageChannel.receive()
方法,您可以在其中找到所需的代码:
@Override
public Message<String> receive() {
return this.receive(0);
}
@Override
public Message<String> receive(long timeout) {
ReceiveMessageResult receiveMessageResult = this.amazonSqs.receiveMessage(
new ReceiveMessageRequest(this.queueUrl).
withMaxNumberOfMessages(1).
withWaitTimeSeconds(Long.valueOf(timeout).intValue()).
withAttributeNames(ATTRIBUTE_NAMES).
withMessageAttributeNames(MESSAGE_ATTRIBUTE_NAMES));
if (receiveMessageResult.getMessages().isEmpty()) {
return null;
}
com.amazonaws.services.sqs.model.Message amazonMessage = receiveMessageResult.getMessages().get(0);
Message<String> message = createMessage(amazonMessage);
this.amazonSqs.deleteMessage(new DeleteMessageRequest(this.queueUrl, amazonMessage.getReceiptHandle()));
return message;
}
因此,如您所见,
withMaxNumberOfMessages(1)
被硬编码为1
。这是正确的,因为receive()
只能轮询一条消息。 withWaitTimeSeconds(Long.valueOf(timeout).intValue())
完全等于提供的timeout
。而且,如果是receiveAndConvert()
,则不能修改它。考虑使用
QueueMessageChannel.receive(long timeout)
中的messageConverter
和QueueMessagingTemplate
。就像是在完成:public <T> T receiveAndConvert(QueueMessageChannel destination, Class<T> targetClass) throws MessagingException {
Message<?> message = destination.receive();
if (message != null) {
return (T) getMessageConverter().fromMessage(message, targetClass);
} else {
return null;
}
}
您可以通过以下代码获取适当的
QueueMessageChannel
:String physicalResourceId = this.destinationResolver.resolveDestination(destination);
new QueueMessageChannel(this.amazonSqs, physicalResourceId);