Problem description
I want to stream result objects captured by a Spring JDBC RowCallbackHandler via a Kotlin Sequence.
The code basically looks like this:
fun findManyObjects(): Sequence<Thing> = sequence {
    val rowHandler = object : RowCallbackHandler {
        override fun processRow(resultSet: ResultSet) {
            val thing = // create from resultSet
            yield(thing) // ERROR! No coroutine scope
        }
    }
    jdbcTemplate.query("select * from ...", rowHandler)
}
But I get a compilation error at the yield call: "Suspension functions can be called only within coroutine body."
However, exactly this "coroutine body" should exist, because the whole block is wrapped in a sequence builder. It just doesn't seem to work with a nested object.
Minimal example showing that it doesn't compile with a nested object:
// compiles
sequence {
    yield(1)
}

// doesn't compile
sequence {
    object {
        fun doit() {
            yield(1) // Suspension functions can be called only within coroutine body.
        }
    }
}
How can I pass the objects from the ResultSet to the Sequence?
Recommended answer
Use Flow for asynchronous data streams
The reason you can't call yield inside your RowCallbackHandler object is twofold.
- The processRow function isn't a suspending function (and can't be, because it's declared in and called by Java). A suspending function like yield can only be called from another suspending function or from a coroutine body.
- A sequence always ends when the sequence { ... } builder block returns. Even if you and I know that the query method will invoke the RowCallbackHandler before the block returns, the Kotlin compiler has no way of knowing that. Yielding sequence values from functions and objects other than the body of the sequence itself is never allowed, because there's no way of knowing where or when they will run.
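To make the second point concrete, here is a minimal sketch using only the standard library (the function names are made up for illustration). A lambda passed to an inline function such as forEach is compiled into the sequence body itself, so yield is still allowed there. A lambda stored as a plain function value, like a callback, is not part of the body, and the same call is rejected.

```kotlin
// Compiles: forEach is inline, so the lambda's code becomes part of the
// sequence body and yield is called from the coroutine itself.
fun yieldFromInlineLambda(): Sequence<Int> = sequence<Int> {
    listOf(1, 2, 3).forEach { yield(it * 10) }
}

// Does not compile if uncommented: the lambda is an ordinary function value
// that could be invoked anywhere, at any time, just like processRow.
// fun yieldFromCallback(): Sequence<Int> = sequence<Int> {
//     val callback: (Int) -> Unit = { yield(it) }
//     callback(1)
// }
```

Calling yieldFromInlineLambda().toList() gives [10, 20, 30].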
To solve this problem, we need to introduce a different kind of coroutine: one that can suspend itself while it waits for the RowCallbackHandler to be invoked.
Unfortunately, because we're talking about JDBC here, there may not be much to gain by introducing full-blown coroutines. Under the hood, calls to the database will always be made in a blocking way, which removes a lot of the benefit. It might well be simpler not to try to 'stream' results, and just iterate over them in a boring, old-fashioned way. But let's explore the possibilities all the same.
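As a rough sketch of that boring approach: collect the rows into a list inside the callback and return the list once the query has finished. queryRows below is a hypothetical stand-in for jdbcTemplate.query(sql, rowHandler), so the example runs without Spring or a database.

```kotlin
// Hypothetical callback-style API standing in for jdbcTemplate.query(sql, rowHandler).
// It invokes the handler once per row and returns when all rows are processed.
fun queryRows(handler: (String) -> Unit) {
    listOf("a", "b", "c").forEach(handler)
}

// No coroutines involved: the callback only appends to a local list.
fun findAllEagerly(): List<String> {
    val results = mutableListOf<String>()
    queryRows { row -> results += row }
    return results
}
```

Every row is held in memory before the caller sees the first one, which is the price of not streaming.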
Sequences are designed for on-demand computation, and are not asynchronous. They can't wait for other asynchronous operations, such as callbacks. The sequence builder's yield function simply suspends while waiting for the caller to retrieve the next item, and it (along with its sibling yieldAll) is the only kind of suspending function a sequence is ever allowed to call. You can demonstrate this if you try to use a simple suspending call like delay inside a sequence. You'll get a compile error letting you know that you're operating in a restricted coroutine scope.
sequence<String> { delay(1000) } // doesn't compile
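The on-demand behaviour can be observed with a small sketch that records when each value is produced and when it is consumed (traceLazyEvaluation is a name made up for this example):

```kotlin
fun traceLazyEvaluation(): List<String> {
    val events = mutableListOf<String>()
    val numbers = sequence<Int> {
        for (i in 1..3) {
            events += "produce $i"
            yield(i) // suspends here until the consumer asks for the next item
        }
    }
    // Only two items are requested, so the third is never produced.
    for (n in numbers.take(2)) events += "consume $n"
    return events
}
```

The returned list is [produce 1, consume 1, produce 2, consume 2]: production and consumption alternate on the caller's thread, and nothing runs ahead of demand.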
Without the ability to call suspending functions, there's no way to wait for a callback to be invoked. Recognising this limitation, Kotlin provides an alternative mechanism for streams of on-demand values that do provide data in an asynchronous way. It's called a Flow.
The mechanism for using Flows to provide values from a callback interface is described very nicely by Roman Elizarov in his Medium article Callbacks and Kotlin Flows (https://medium.com/@elizarov/callbacks-and-kotlin-flows-2b53aa2525cf).
If you did want to use a callback flow, you'd simply replace sequence with callbackFlow, and replace yield with sendBlocking. (In kotlinx.coroutines 1.5 and later, sendBlocking is deprecated in favour of trySendBlocking.)
Your code might look something like this:
fun findManyObjects(): Flow<Thing> = callbackFlow {
    val rowHandler = object : RowCallbackHandler {
        override fun processRow(resultSet: ResultSet) {
            val thing = // create from resultSet
            sendBlocking(thing)
        }
    }
    jdbcTemplate.query("select * from ...", rowHandler)
    close() // the query is finished, so there are no more rows
}
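For a version that can be run without Spring, here is a sketch that assumes kotlinx-coroutines-core 1.5 or later is on the classpath. queryRows is a hypothetical stand-in for jdbcTemplate.query, and trySendBlocking is used in place of the deprecated sendBlocking. The flowOn(Dispatchers.IO) call is an addition of this sketch, not part of the code above: it moves the blocking query off the collector's thread.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical blocking, callback-style API standing in for jdbcTemplate.query.
fun queryRows(handler: (String) -> Unit) {
    listOf("a", "b", "c").forEach(handler)
}

fun findManyRows(): Flow<String> = callbackFlow<String> {
    // The handler is invoked synchronously by queryRows, once per row.
    // trySendBlocking blocks the producing thread only if the buffer is full.
    queryRows { row -> trySendBlocking(row) }
    close() // the query has returned, so there are no more rows
}.flowOn(Dispatchers.IO)

// Blocks the calling thread until every row has been collected.
fun collectCallbackRows(): List<String> = runBlocking { findManyRows().toList() }
```

Here collectCallbackRows() returns the three rows in the order the callback received them.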
A simpler flow
While that's the idiomatic way to stream values provided by a callback, it might not be the simplest approach to this problem. By avoiding callbacks altogether, you can use the much more common flow builder, passing each value to its emit function. But now that you have asynchrony in the form of coroutines, you can't just return a flow and then allow Spring to immediately close the result set. You need to be able to delay the closing of the result set until the flow has actually been consumed. That means peeling back the abstractions provided by RowCallbackHandler or ResultSetExtractor, which expect to process all the results in a blocking way, and instead providing your own implementation.
fun Connection.findManyObjects(): Flow<Thing> = flow {
    prepareStatement("select * from ...").use { statement ->
        statement.executeQuery().use { resultSet ->
            while (resultSet.next()) {
                val thing = // create from resultSet
                emit(thing)
            }
        }
    }
}
Note the use blocks, which will deal with closing the statement and result set. Because we don't reach the end of the use blocks until the while loop has completed and all the values have been emitted, the flow is free to suspend while the result set remains open.
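This lifetime behaviour can be observed without a database. The sketch below uses a hypothetical in-memory FakeCursor in place of a ResultSet, and the standard library's sequence builder in place of flow so that it needs no coroutine library. In both builders the use block is only left once the loop inside it has finished.

```kotlin
import java.io.Closeable

// Hypothetical in-memory stand-in for a JDBC ResultSet.
class FakeCursor(private val rows: List<String>) : Closeable {
    private var index = -1
    var closed = false
        private set

    fun next(): Boolean = ++index < rows.size
    fun current(): String = rows[index]
    override fun close() { closed = true }
}

fun readAll(cursor: FakeCursor): Sequence<String> = sequence<String> {
    cursor.use { c ->
        while (c.next()) yield(c.current())
    } // close() runs here, after the last row has been handed out
}

// Records whether the cursor is closed after each step of consumption.
fun traceCursorLifetime(): List<Boolean> {
    val cursor = FakeCursor(listOf("a", "b"))
    val rows = readAll(cursor).iterator()
    val observed = mutableListOf<Boolean>()
    rows.next(); observed += cursor.closed    // still open after the first row
    rows.next(); observed += cursor.closed    // still open after the second row
    rows.hasNext(); observed += cursor.closed // body has finished, cursor closed
    return observed
}
```

traceCursorLifetime() returns [false, false, true]. One caveat with the sequence form: if the consumer stops iterating early, the body never reaches the end of the use block, so the cursor is left open.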
You might notice that if you do it this way, you can actually replace flow and emit with sequence and yield. So have we come full circle? Well, sort of. The difference is that a flow can only be consumed from a coroutine, whereas with sequence, you can iterate over the resulting values without suspending at all. In this particular case, it's a hard call to make, because JDBC operations are always blocking.
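- If you use a sequence, the calling thread will block while it waits for the data. Values in a sequence are always computed by the thing that's consuming the sequence, so if the sequence invokes a blocking function, the consumer's thread will block waiting for the value. In a non-coroutine application that might be okay, but if you're using coroutines, you really want to avoid hiding blocking calls inside innocuous-looking sequences.
- If you use a flow, you can at least isolate the blocking calls by having the flow run on a particular dispatcher. For example, you could use the built-in IO dispatcher to perform the JDBC call, then switch back to the default dispatcher for further processing. If you definitely want to stream values, I think this is a better approach than using a sequence.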
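The dispatcher isolation described in the second bullet might look like the following sketch, which assumes kotlinx-coroutines-core is on the classpath. blockingFetch is a hypothetical stand-in for a blocking JDBC read, and Row is a type invented for the example. flowOn(Dispatchers.IO) makes the flow body, including the blocking call, run on the IO dispatcher's threads, while the collector stays in its own context.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Carries the name of the thread that produced it, for inspection.
data class Row(val value: String, val producedOn: String)

// Hypothetical stand-in for a blocking JDBC read.
fun blockingFetch(id: Int): Row {
    Thread.sleep(10) // simulates waiting on the database
    return Row("row-$id", Thread.currentThread().name)
}

fun findRows(): Flow<Row> = flow<Row> {
    for (id in 1..3) emit(blockingFetch(id))
}.flowOn(Dispatchers.IO) // applies to the flow body above, not to the collector

// Collects on the calling thread; the blocking reads happen elsewhere.
fun collectIsolatedRows(): List<Row> = runBlocking { findRows().toList() }
```

Inspecting producedOn on the collected rows shows which thread performed each blocking read, which should differ from the thread that called collectIsolatedRows().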
With all this in mind, you'll need to be careful with your use of coroutines and dispatchers if you do choose one of these solutions. If you'd rather not worry about that, there's nothing wrong with using a regular ResultSetExtractor and forgetting about both sequences and flows for now.