问题描述
如何从处理程序外部为RPC生成流响应值? (特别是从IObservable),我目前正在执行以下操作,但这会造成跨线程问题,因为AnRxObservable
在RPC处理程序之间共享...
How would I generate streaming response values for an RPC from outside the handler? (specifically, from a IObservable) I'm currently doing the following, but this is creating cross-thread issues because AnRxObservable
is shared between RPC handlers...
public override Task GetTicker(RequestProto request, ServerCallContext context)
{
var subscription = AnRxObservable.Subscribe(value =>
{
responseStream.WriteAsync(new ResponseProto
{
Value = value
});
});
// Wait for the RPC to be canceled (my extension method
// that returns a task that completes when the CancellationToken
// is cancelled)
await context.CancellationToken.WhenCancelled();
// Dispose of the buffered stream
bufferedStream.Dispose();
// Dispose subscriber (tells rx that we aren't subscribed anymore)
subscription.Dispose();
return Task.FromResult(1);
}
这段代码感觉不合适...但是我看不到任何其他方法来从RPC处理程序外部创建的共享源流式传输RPC响应.
This code doesn't feel right... but I can't see any other way of streaming RPC responses from a shared source created outside the RPC handler.
推荐答案
通常来说,当您尝试从推模型(IObservable)转换为拉模型(枚举响应以进行写和写)时,您需要一个中间消息的缓冲区-例如一个blockingQueue.然后,处理程序主体可以是一个异步循环,该循环试图为队列获取下一条消息(最好以异步方式)并将其写入responseStream.
Generally speaking, when you are trying to convert from push model (IObservable) into pull model (enumerating the responses to write and writing them), you need an intermediate buffer for the message - e.g. a blockingQueue. The handler body can then be an async loop that tries to fetch the next message for the queue (preferably in an async fashion) and writes it to the responseStream.
此外,请注意,gRPC API仅允许您在任何给定时间获得1个进行中的响应-您的代码段不尊重该响应.因此,您需要在开始另一个写操作之前等待WriteAsync()(这就是您需要中间队列的另一个原因).
Also, be aware that gRPC API only allows you to have 1 in-flight response at any given time - and your snippet doesn't respect that. So you need to await the WriteAsync() before starting another write (an that's another reason why you need an intermediate queue).
此链接对于解释推式与拉式范例可能很有用:何时使用IEnumerable与IObservable?
This link might be useful in explaining the push vs pull paradigms: When to use IEnumerable vs IObservable?
这篇关于GRPC异步响应流C#的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!