问题描述
Spring的反应堆具有一个有趣的功能:对冲.这意味着产生许多请求并获得第一个返回的结果,并自动清除其他上下文. Josh Long 最近一直在积极推广此功能.谷歌搜索春季反应堆套期显示了相对结果.如果有人好奇,请此处是示例代码.简而言之,Flux.first()
简化了所有底层的麻烦,这令人印象深刻.
Spring's reactor has an interesting feature : Hedging . It means spawning many requests and get the first returned result , and automatically clean other contexts. Josh Long recently has been actively promoting this feature. Googling Spring reactor hedging shows relative results. If anybody is curious , here is the sample code . In short , Flux.first()
simplifies all the underlaying hassles , which is very impressive.
我想知道如何使用Kotlin的协程和多线程(以及也许使用Flow
或Channel
)实现这一点.我想到了一个简单的场景:一个服务接受longUrl并将其生成为许多URL缩短服务(例如IsGd,TinyUrl ...),然后返回第一个返回的URL ...(并终止/清除其他线程/协程资源)
I wonder how this can be achieved with Kotlin's coroutine and multithread , (and maybe with Flow
or Channel
) . I thought of a simple scenario : One service accepts longUrl and spawns the longUrl to many URL shorten service ( such as IsGd , TinyUrl ...) , and returns the first returned URL ... (and terminates / cleans other thread / coroutine resources)
有一个定义此工作的接口UrlShorter
:
There is an interface UrlShorter
that defines this work :
interface UrlShorter {
fun getShortUrl(longUrl: String): String?
}
共有三种实现,一种用于 is.gd ,另一种用于 tinyUrl ,第三个是Dumb实现,它会阻止10秒并返回null:
And there are three implementations , one for is.gd , another for tinyUrl , and the third is a Dumb implementation that blocks 10 seconds and return null :
class IsgdImpl : UrlShorter {
override fun getShortUrl(longUrl: String): String? {
logger.info("running : {}", Thread.currentThread().name)
// isGd api url blocked by SO , it sucks . see the underlaying gist for full code
val url = "https://is.gd/_create.php?format=simple&url=%s".format(URLEncoder.encode(longUrl, "UTF-8"))
return Request.Get(url).execute().returnContent().asString().also {
logger.info("returning {}", it)
}
}
}
class TinyImpl : UrlShorter {
override fun getShortUrl(longUrl: String): String? {
logger.info("running : {}", Thread.currentThread().name)
val url = "http://tinyurl.com/_api-create.php?url=$longUrl" // sorry the URL is blocked by stackoverflow , see the underlaying gist for full code
return Request.Get(url).execute().returnContent().asString().also {
logger.info("returning {}", it)
}
}
}
class DumbImpl : UrlShorter {
override fun getShortUrl(longUrl: String): String? {
logger.info("running : {}", Thread.currentThread().name)
TimeUnit.SECONDS.sleep(10)
return null
}
}
有一个UrlShorterService
接受所有UrlShorter
实现,并尝试生成协程并获得第一个结果.
And there is a UrlShorterService
that takes all the UrlShorter
implementations , and try to spawn coroutines and get the first result .
这就是我想到的:
@ExperimentalCoroutinesApi
@FlowPreview
class UrlShorterService(private val impls: List<UrlShorter>) {
private val es: ExecutorService = Executors.newFixedThreadPool(impls.size)
private val esDispatcher = es.asCoroutineDispatcher()
suspend fun getShortUrl(longUrl: String): String {
return method1(longUrl) // there are other methods , with different ways...
}
private inline fun <T, R : Any> Iterable<T>.firstNotNullResult(transform: (T) -> R?): R? {
for (element in this) {
val result = transform(element)
if (result != null) return result
}
return null
}
客户端也很简单:
@ExperimentalCoroutinesApi
@FlowPreview
class UrlShorterServiceTest {
@Test
fun testHedging() {
val impls = listOf(DumbImpl(), IsgdImpl(), TinyImpl()) // Dumb first
val service = UrlShorterService(impls)
runBlocking {
service.getShortUrl("https://www.google.com").also {
logger.info("result = {}", it)
}
}
}
}
注意,我将DumbImpl
放在第一位,因为我希望它可以首先产生并阻塞其线程.其他两种实现方式都可以得到结果.
Notice I put the DumbImpl
first , because I hope it may spawn first and blocking in its thread. And other two implementations can get result.
好的,这是问题所在,如何在Kotlin中实现套期保值?我尝试以下方法:
OK , here is the problem , how to achieve hedging in kotlin ? I try the following methods :
private suspend fun method1(longUrl: String): String {
return impls.asSequence().asFlow().flatMapMerge(impls.size) { impl ->
flow {
impl.getShortUrl(longUrl)?.also {
emit(it)
}
}.flowOn(esDispatcher)
}.first()
.also { esDispatcher.cancelChildren() } // doesn't impact the result
}
我希望method1
应该工作,但它完全可以执行10秒:
I hope method1
should work , but it totally executes 10 seconds :
00:56:09,253 INFO TinyImpl - running : pool-1-thread-3
00:56:09,254 INFO DumbImpl - running : pool-1-thread-1
00:56:09,253 INFO IsgdImpl - running : pool-1-thread-2
00:56:11,150 INFO TinyImpl - returning // tiny url blocked by SO , it sucks
00:56:13,604 INFO IsgdImpl - returning // idGd url blocked by SO , it sucks
00:56:19,261 INFO UrlShorterServiceTest$testHedging$1 - result = // tiny url blocked by SO , it sucks
然后,我以为其他method2,method3,method4,method5 ...都不能用:
Then , I thought other method2 , method3 , method4 , method5 ... but all not work :
/**
* 00:54:29,035 INFO IsgdImpl - running : pool-1-thread-3
* 00:54:29,036 INFO DumbImpl - running : pool-1-thread-2
* 00:54:29,035 INFO TinyImpl - running : pool-1-thread-1
* 00:54:30,228 INFO TinyImpl - returning // tiny url blocked by SO , it sucks
* 00:54:30,797 INFO IsgdImpl - returning // idGd url blocked by SO , it sucks
* 00:54:39,046 INFO UrlShorterServiceTest$testHedging$1 - result = // idGd url blocked by SO , it sucks
*/
private suspend fun method2(longUrl: String): String {
return withContext(esDispatcher) {
impls.map { impl ->
async(esDispatcher) {
impl.getShortUrl(longUrl)
}
}.firstNotNullResult { it.await() } ?: longUrl
}
}
/**
* 00:52:30,681 INFO IsgdImpl - running : pool-1-thread-2
* 00:52:30,682 INFO DumbImpl - running : pool-1-thread-1
* 00:52:30,681 INFO TinyImpl - running : pool-1-thread-3
* 00:52:31,838 INFO TinyImpl - returning // tiny url blocked by SO , it sucks
* 00:52:33,721 INFO IsgdImpl - returning // idGd url blocked by SO , it sucks
* 00:52:40,691 INFO UrlShorterServiceTest$testHedging$1 - result = // idGd url blocked by SO , it sucks
*/
private suspend fun method3(longUrl: String): String {
return coroutineScope {
impls.map { impl ->
async(esDispatcher) {
impl.getShortUrl(longUrl)
}
}.firstNotNullResult { it.await() } ?: longUrl
}
}
/**
* 01:58:56,930 INFO TinyImpl - running : pool-1-thread-1
* 01:58:56,933 INFO DumbImpl - running : pool-1-thread-2
* 01:58:56,930 INFO IsgdImpl - running : pool-1-thread-3
* 01:58:58,411 INFO TinyImpl - returning // tiny url blocked by SO , it sucks
* 01:58:59,026 INFO IsgdImpl - returning // idGd url blocked by SO , it sucks
* 01:59:06,942 INFO UrlShorterServiceTest$testHedging$1 - result = // idGd url blocked by SO , it sucks
*/
private suspend fun method4(longUrl: String): String {
return withContext(esDispatcher) {
impls.map { impl ->
async {
impl.getShortUrl(longUrl)
}
}.firstNotNullResult { it.await() } ?: longUrl
}
}
我不熟悉Channel
,对不起↓
/**
* 01:29:44,460 INFO UrlShorterService$method5$2 - channel closed
* 01:29:44,461 INFO DumbImpl - running : pool-1-thread-2
* 01:29:44,460 INFO IsgdImpl - running : pool-1-thread-3
* 01:29:44,466 INFO TinyImpl - running : pool-1-thread-1
* 01:29:45,765 INFO TinyImpl - returning // tiny url blocked by SO , it sucks
* 01:29:46,339 INFO IsgdImpl - returning // idGd url blocked by SO , it sucks
*
* kotlinx.coroutines.channels.ClosedSendChannelException: Channel was closed
*
*/
private suspend fun method5(longUrl: String): String {
val channel = Channel<String>()
withContext(esDispatcher) {
impls.forEach { impl ->
launch {
impl.getShortUrl(longUrl)?.also {
channel.send(it)
}
}
}
channel.close()
logger.info("channel closed")
}
return channel.consumeAsFlow().first()
}
好的,我不知道是否还有其他方法...但是以上所有方法均无效...所有阻止至少10秒钟(被DumbImpl
阻止).
OK , I don't know if there are any other ways ... but all above are not working... All blocks at least 10 seconds ( blocked by DumbImpl
) .
完整的源代码可以在 github gist 上找到.
The whole source code can be found on github gist .
如何在Kotlin中实现套期保值?通过Deferred
或Flow
或Channel
还是其他更好的主意?谢谢.
How can hedging be achieved in kotlin ? By Deferred
or Flow
or Channel
or any other better ideas ? Thank you.
提交问题后,我发现所有tinyurl,isGd url被SO阻止了.真的很烂!
After submitting the question , I found all tinyurl , isGd url are blocked by SO . It really sucks !
推荐答案
如果要并行执行的实际工作包括网络获取,则应选择一个异步网络库,以便可以与其一起正确使用非阻塞协程.例如,从版本11开始,JDK提供了一个异步HTTP客户端,您可以按以下方式使用它:
If the actual work you want to do in parallel consists of network fetches, you should choose an async networking library so you can properly use non-blocking coroutines with it. For example, as of version 11 the JDK provides an async HTTP client which you can use as follows:
val httpClient: HttpClient = HttpClient.newHttpClient()
suspend fun httpGet(url: String): String = httpClient
.sendAsync(
HttpRequest.newBuilder().uri(URI.create(url)).build(),
BodyHandlers.ofString())
.await()
.body()
在上述的可暂停实现中,这是一个完成请求对冲的功能:
Here's a function that accomplishes request hedging given a suspendable implementation like above:
class UrlShortenerService(
private val impls: List<UrlShortener>
) {
suspend fun getShortUrl(longUrl: String): String? = impls
.asFlow()
.flatMapMerge(impls.size) { impl ->
flow<String?> {
try {
impl.getShortUrl(longUrl)?.also { emit(it) }
}
catch (e: Exception) {
// maybe log it, but don't let it propagate
}
}
}
.onCompletion { emit(null) }
.first()
}
请注意,没有任何自定义调度程序,您不需要它们即可进行可挂起的工作.任何调度程序都会这样做,并且所有工作都可以在单个线程中运行.
Note the absence of any custom dispatchers, you don't need them for suspendable work. Any dispatcher will do, and all the work can run in a single thread.
当所有URL缩短程序均失败时,onCompletion
部分将开始执行.在这种情况下,flatMapMerge
阶段什么也不发出,并且first()
会在没有向流中注入额外的null
的情况下死锁.
The onCompletion
parts steps into action when your all URL shorteners fail. In that case the flatMapMerge
stage doesn't emit anything and first()
would deadlock without the extra null
injected into the flow.
要测试它,我使用了以下代码:
To test it I used the following code:
class Shortener(
private val delay: Long
) : UrlShortener {
override suspend fun getShortUrl(longUrl: String): String? {
delay(delay * 1000)
println("Shortener $delay completing")
if (delay == 1L) {
throw Exception("failed service")
}
if (delay == 2L) {
return null
}
return "shortened after $delay seconds"
}
}
suspend fun main() {
val shorteners = listOf(
Shortener(4),
Shortener(3),
Shortener(2),
Shortener(1)
)
measureTimeMillis {
UrlShortenerService(shorteners).getShortUrl("bla").also {
println(it)
}
}.also {
println("Took $it ms")
}
}
这会执行各种失败情况,例如返回null或出现异常失败.对于此代码,我得到以下输出:
This exercises the various failure cases like returning null or failing with an exception. For this code I get the following output:
Shortener 1 completing
Shortener 2 completing
Shortener 3 completing
shortened after 3 seconds
Took 3080 ms
我们可以看到,起酥油1和2已完成但有故障,起酥油3返回了有效的响应,而起酥油4已取消,直到完成.我认为这符合要求.
We can see that the shorteners 1 and 2 completed but with a failure, shortener 3 returned a valid response, and shortener 4 was cancelled before completing. I think this matches the requirements.
如果您不能摆脱阻止请求,则您的实现将不得不启动num_impls * num_concurrent_requests
线程,这不是很好.但是,如果这是您所能拥有的最好的解决方案,那么这是一个对冲阻止请求,但可以挂起或取消等待的请求的实现.它将向运行请求的工作线程发送中断信号,但是如果您的库的IO代码不可中断,则这些线程将挂起,等待其请求完成或超时.
If you can't move away from blocking requests, your implementation will have to start num_impls * num_concurrent_requests
threads, which is not great. However, if that's the best you can have, here's an implementation that hedges blocking requests but awaits on them suspendably and cancellably. It will send an interrupt signal to the worker threads running the requests, but if your library's IO code is non-interruptible, these threads will hang waiting for their requests to complete or time out.
val es = Executors.newCachedThreadPool()
interface UrlShortener {
fun getShortUrl(longUrl: String): String? // not suspendable!
}
class UrlShortenerService(
private val impls: List<UrlShortener>
) {
suspend fun getShortUrl(longUrl: String): String {
val chan = Channel<String?>()
val futures = impls.map { impl -> es.submit {
try {
impl.getShortUrl(longUrl)
} catch (e: Exception) {
null
}.also { runBlocking { chan.send(it) } }
} }
try {
(1..impls.size).forEach { _ ->
chan.receive()?.also { return it }
}
throw Exception("All services failed")
} finally {
chan.close()
futures.forEach { it.cancel(true) }
}
}
}
这篇关于Kotlin实现多线程请求套期?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!