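The code below throws an OutOfMemoryError (OOM) on an EC2 instance within 15 minutes of starting (Java config: Xms 1024, Xmx 2G), but it does not throw any error when run from IntelliJ.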
SqsSource(queueUrl,
  //parallelism = maxBufferSize / maxBatchSize 20 10
  SqsSourceSettings().withWaitTime(10 seconds)
    .withMaxBatchSize(10).withMaxBufferSize(20)
).map {
  msg => {
    val out = Source.single(msg)
      .via(messageToLambdaRequest)
      .via(lambdaRequestToLambdaResp)
      .via(lambdaRespToAggregationKeySet)
      .via(workFlow)
      .log("error while consuming events internally.")
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
      .runWith(Sink.seq)
    val reducedResponse = out.map(response => {
      response.foldLeft[Response](OK)((a, b) =>
        if (a == OK && b == OK) OK else NotOK)
    })
    val messageAction = reducedResponse
      .map(res =>
        if (res == OK) {
          //log.info("finally response is OK hence message action delete is prepared. {}.", msg.messageId())
          delete(msg)
        } else
          ignore(msg)
      )
    messageAction
  }
}
  .mapAsync(1)(identity)
  .withAttributes(ActorAttributes.supervisionStrategy(decider))
  // For the SQS Sinks and Sources the sum of all parallelism (Source) and maxInFlight (Sink)
  // must be less than or equal to the thread pool size.
  .log("error log")
  .runWith(SqsAckSink(queueUrl, SqsAckSettings(1)))
}
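As a side note on the per-message reduction in the snippet above: the foldLeft starts from OK, so an empty result sequence also reduces to OK and the message would be deleted. The following is a minimal, dependency-free sketch of that decision logic. The `Response` and `Action` types here are hypothetical stand-ins, not the real Alpakka or application types.

```scala
object ReductionSketch {
  // Hypothetical stand-ins for the application's response type.
  sealed trait Response
  case object OK extends Response
  case object NotOK extends Response

  // Hypothetical stand-ins for the SQS message actions (delete / ignore).
  sealed trait Action
  case object Delete extends Action
  case object Ignore extends Action

  // Same fold as in the question: the result is OK only if every element is OK.
  // Note that an empty sequence also yields OK, because OK is the start value.
  def reduce(responses: Seq[Response]): Response =
    responses.foldLeft[Response](OK)((a, b) => if (a == OK && b == OK) OK else NotOK)

  // Decide what to do with the message based on the reduced response.
  def actionFor(responses: Seq[Response]): Action =
    if (reduce(responses) == OK) Delete else Ignore
}
```

If the inner stream can complete without emitting anything (for example when a supervision strategy drops a failing element), it may be worth treating the empty case explicitly rather than letting it fall through to delete.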
I have tried both 1.0-M3 and 1.0-RC1.
Is there a workaround?
Histogram of the top 5 object types, created with jhat:
Class                                                       Instance Count   Total Size
class [C                                                           1376284   2068640582
class software.amazon.awssdk.services.sqs.model.Message             332718     18632208
class java.lang.String                                             1375675     16508100
class [Lakka.dispatch.forkjoin.ForkJoinTask;                           227     14880304
class scala.collection.immutable.$colon$colon                      334396      5350336
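To put the histogram above into perspective, here is a small sketch that does the arithmetic on those five rows. It only uses the numbers from the table and the `maxBufferSize` of 20 from the question; the `Row` class and the object name are made up for illustration.

```scala
object HistogramSketch {
  // One row of the jhat histogram; sizes are in bytes as reported by jhat.
  final case class Row(className: String, instances: Long, totalBytes: Long) {
    def bytesPerInstance: Long = totalBytes / instances // integer division
  }

  val rows: List[Row] = List(
    Row("[C", 1376284L, 2068640582L),
    Row("software.amazon.awssdk.services.sqs.model.Message", 332718L, 18632208L),
    Row("java.lang.String", 1375675L, 16508100L),
    Row("[Lakka.dispatch.forkjoin.ForkJoinTask;", 227L, 14880304L),
    Row("scala.collection.immutable.$colon$colon", 334396L, 5350336L)
  )

  // Sum of the five rows, for comparison with the 2G maximum heap.
  val totalBytes: Long = rows.map(_.totalBytes).sum

  // Configured buffer size from the question's SqsSourceSettings.
  val maxBufferSize: Long = 20L

  // How many times more Message instances are alive than the buffer should hold.
  val messagesPerBufferSlot: Long = rows(1).instances / maxBufferSize

  def main(args: Array[String]): Unit = {
    rows.foreach(r => println(s"${r.className}: ${r.bytesPerInstance} bytes/instance"))
    println(s"total: $totalBytes bytes, messages per buffer slot: $messagesPerBufferSlot")
  }
}
```

The five rows add up to roughly 2.12 GB, most of it in char arrays, and there are several thousand times more live Message instances than the configured buffer of 20 would account for. That pattern would be consistent with messages being retained somewhere before the buffer, although the histogram alone does not prove where.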
I also found a similar issue here: https://github.com/akka/alpakka/issues/1588
I would like to know whether there is any alternative way to work around this problem.
Best answer
You can wait for the RC2 / 1.0.0 Alpakka release, or in the meantime create your own SQS source, since it is not many lines of code:
object MyVeryOwnSqsSource {
  def apply(
      queueUrl: String,
      settings: SqsSourceSettings = SqsSourceSettings.Defaults
  )(implicit sqsClient: SqsAsyncClient): Source[Message, NotUsed] =
    Source
      .repeat {
        val requestBuilder =
          ReceiveMessageRequest
            .builder()
            .queueUrl(queueUrl)
            .attributeNames(settings.attributeNames.map(_.name).map(QueueAttributeName.fromValue).asJava)
            .messageAttributeNames(settings.messageAttributeNames.map(_.name).asJava)
            .maxNumberOfMessages(settings.maxBatchSize)
            .waitTimeSeconds(settings.waitTimeSeconds)
        settings.visibilityTimeout match {
          case None    => requestBuilder.build()
          case Some(t) => requestBuilder.visibilityTimeout(t.toSeconds.toInt).build()
        }
      }
      .mapAsync(settings.maxBufferSize / settings.maxBatchSize)(sqsClient.receiveMessage(_).toScala)
      .map(_.messages().asScala.toList)
      .takeWhile(messages => !settings.closeOnEmptyReceive || messages.nonEmpty)
      .mapConcat(identity)
      .buffer(settings.maxBufferSize, OverflowStrategy.backpressure)
}
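The shape of that source (repeat a request, receive a batch, stop on an empty batch if `closeOnEmptyReceive` is set, then flatten batches into single messages) can be tried out without Akka or AWS. The sketch below mimics it with a plain `Iterator`; `FakeQueue` and `messages` are hypothetical names, and a synchronous iterator is only an analogy here, since it has no asynchronous parallelism or backpressure buffer.

```scala
object PollingShapeSketch {
  // Hypothetical stand-in for one ReceiveMessage call: hands out canned
  // batches in order, then returns empty batches once they are used up.
  final class FakeQueue(batches: List[List[String]]) {
    private var remaining = batches
    def receive(): List[String] = remaining match {
      case head :: tail =>
        remaining = tail
        head
      case Nil => Nil
    }
  }

  def messages(queue: FakeQueue, closeOnEmptyReceive: Boolean): Iterator[String] =
    Iterator
      .continually(queue.receive())                               // plays the role of Source.repeat + mapAsync
      .takeWhile(batch => !closeOnEmptyReceive || batch.nonEmpty) // stop at the first empty batch, if requested
      .flatMap(batch => batch)                                    // plays the role of mapConcat(identity)
}
```

For example, with batches `List("a", "b")`, `List("c")`, an empty batch and then `List("d")`, calling `messages(queue, closeOnEmptyReceive = true).toList` stops at the empty batch and never reaches `"d"`.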
Then use it:
MyVeryOwnSqsSource(queueUrl,
  //parallelism = maxBufferSize / maxBatchSize 20 10
  SqsSourceSettings().withWaitTime(10 seconds)
    .withMaxBatchSize(10).withMaxBufferSize(20)
).map {
  msg => {
    val out = Source.single(msg)
      .via(messageToLambdaRequest)
      .via(lambdaRequestToLambdaResp)
      .via(lambdaRespToAggregationKeySet)
      .via(workFlow)
      .log("error while consuming events internally.")
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
      .runWith(Sink.seq)
    val reducedResponse = out.map(response => {
      response.foldLeft[Response](OK)((a, b) =>
        if (a == OK && b == OK) OK else NotOK)
    })
    val messageAction = reducedResponse
      .map(res =>
        if (res == OK) {
          //log.info("finally response is OK hence message action delete is prepared. {}.", msg.messageId())
          delete(msg)
        } else
          ignore(msg)
      )
    messageAction
  }
}
  .mapAsync(1)(identity)
  .withAttributes(ActorAttributes.supervisionStrategy(decider))
  // For the SQS Sinks and Sources the sum of all parallelism (Source) and maxInFlight (Sink)
  // must be less than or equal to the thread pool size.
  .log("error log")