我有一个关于爬虫的简单测试,该爬虫应该调用回购协议(protocol)40次:
@Test
fun testX() {
// ...
runBlocking {
crawlYelp.concurrentCrawl()
// Thread.sleep(5000) // works if I un-comment
}
verify(restaurantsRepository, times(40)).saveAll(restaurants)
// ...
}
和此实现:
suspend fun concurrentCrawl() {
cities.map { loc ->
1.rangeTo(10).map { start ->
GlobalScope.async {
val rests = scrapYelp.scrap(loc, start * 10)
restaurantsRepository.saveAll(rests)
}
}
}
}
但是...我明白了:
Wanted 40 times:
-> at ....testConcurrentCrawl(CrawlYelpTest.kt:46)
But was 30 times:
(30一直在变化;因此似乎测试没有在等待……)
为什么我睡觉时会过去?鉴于我遇到了封锁,因此不需要。
顺便说一句,我有一个应该保持异步的 Controller :
@PostMapping("crawl")
suspend fun crawl(): String {
crawlYelp.concurrentCrawl()
return "crawling" // this is supposed to be returned right away
}
谢谢
最佳答案
runBlocking
等待所有挂起函数完成,但是concurrentCrawl
基本上只是使用GlobalScope.async
currentCrawl
在新线程中启动新作业,因此runBlocking
是在所有作业启动之后而不是在所有这些作业完成之后完成的。
您必须等待所有以GlobalScope.async
开头的作业完成,如下所示:
suspend fun concurrentCrawl() {
cities.map { loc ->
1.rangeTo(10).map { start ->
GlobalScope.async {
val rests = scrapYelp.scrap(loc, start * 10)
restaurantsRepository.saveAll(rests)
}
}.awaitAll()
}
}
如果要等待
concurrentCrawl()
在concurrentCrawl()
之外完成,则必须将Deferred
结果传递给调用函数,如以下示例所示。在这种情况下,可以从suspend
中删除concurrentCrawl()
关键字。fun concurrentCrawl(): List<Deferred<Unit>> {
return cities.map { loc ->
1.rangeTo(10).map { start ->
GlobalScope.async {
println("hallo world $start")
}
}
}.flatten()
}
runBlocking {
concurrentCrawl().awaitAll()
}
如注释中所述:在这种情况下,
async
方法不返回任何值,因此最好使用launch:fun concurrentCrawl(): List<Job> {
return cities.map { loc ->
1.rangeTo(10).map { start ->
GlobalScope.launch {
println("hallo world $start")
}
}
}.flatten()
}
runBlocking {
concurrentCrawl().joinAll()
}