我正在使用Spring AWS Cloud Framework轮询队列中的S3事件通知。我正在使用QueueMessagingTemplate来做到这一点。我希望能够设置最大消息数和等待时间以进行轮询:http://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html

import com.amazonaws.services.s3.event.S3EventNotification;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;

public class MyQueue {

    private QueueMessagingTemplate queueMsgTemplate;

    @Autowired
    public MyQueue(QueueMessagingTemplate queueMsgTemplate) {
        this.queueMsgTemplate = queueMsgTemplate;

    }

    @Override
    public S3EventNotification poll() {
        S3EventNotification s3Event = queueMsgTemplate
            .receiveAndConvert("myQueueName", S3EventNotification.class);
    }
}

上下文
@Bean
public AWSCredentialsProviderChain awsCredentialsProviderChain() {
    return new AWSCredentialsProviderChain(
            new DefaultAWSCredentialsProviderChain());
}

@Bean
public ClientConfiguration clientConfiguration() {
    return new ClientConfiguration();
}

@Bean
public AmazonSQS sqsClient(ClientConfiguration clientConfiguration,// UserData userData,
                           AWSCredentialsProviderChain credentialsProvider) {
    AmazonSQSClient amazonSQSClient = new AmazonSQSClient(credentialsProvider, clientConfiguration);
    amazonSQSClient.setEndpoint("http://localhost:9324");
    return amazonSQSClient;
}

@Bean
public QueueMessagingTemplate queueMessagingTemplate(AmazonSQS sqsClient) {
    return new QueueMessagingTemplate(sqsClient);
}

任何想法如何配置这些?谢谢

最佳答案

QueueMessagingTemplate.receiveAndConvert()基于QueueMessageChannel.receive()方法,您可以在其中找到所需的代码:

@Override
public Message<String> receive() {
    return this.receive(0);
}

@Override
public Message<String> receive(long timeout) {
    ReceiveMessageResult receiveMessageResult = this.amazonSqs.receiveMessage(
            new ReceiveMessageRequest(this.queueUrl).
                    withMaxNumberOfMessages(1).
                    withWaitTimeSeconds(Long.valueOf(timeout).intValue()).
                    withAttributeNames(ATTRIBUTE_NAMES).
                    withMessageAttributeNames(MESSAGE_ATTRIBUTE_NAMES));
    if (receiveMessageResult.getMessages().isEmpty()) {
        return null;
    }
    com.amazonaws.services.sqs.model.Message amazonMessage = receiveMessageResult.getMessages().get(0);
    Message<String> message = createMessage(amazonMessage);
    this.amazonSqs.deleteMessage(new DeleteMessageRequest(this.queueUrl, amazonMessage.getReceiptHandle()));
    return message;
}

因此,如您所见,withMaxNumberOfMessages(1)被硬编码为1。这是正确的,因为receive()只能轮询一条消息。 withWaitTimeSeconds(Long.valueOf(timeout).intValue())完全等于提供的timeout。而且,如果是receiveAndConvert(),则不能修改它。

考虑使用QueueMessageChannel.receive(long timeout)中的messageConverterQueueMessagingTemplate。就像是在完成:
public <T> T receiveAndConvert(QueueMessageChannel destination, Class<T> targetClass) throws MessagingException {
    Message<?> message = destination.receive();
    if (message != null) {
        return (T) getMessageConverter().fromMessage(message, targetClass);
    } else {
        return null;
    }
}

您可以通过以下代码获取适当的QueueMessageChannel:
String physicalResourceId = this.destinationResolver.resolveDestination(destination);
new QueueMessageChannel(this.amazonSqs, physicalResourceId);

08-17 18:05