问题描述
我现在正在 gRPC 中创建一个小型聊天应用程序,但我遇到了一个问题,如果用户想要作为客户端连接到 gRPC 服务器,我想广播该事件已发生到所有其他连接的客户端.
I'm creating a small chat application in gRPC right now and I've run into the issue where if a user wants to connect to the gRPC server as a client, I'd like to broadcast that the event has occurred to all other connected clients.
我正在考虑使用某种观察者,但我对服务器如何知道谁连接了以及我如何将事件广播给所有客户端而不仅仅是一两个客户端感到困惑.
I'm thinking of using some sort of observer but I"m confused as to how the server knows of who is connected and how I would broadcast the event to all clients and not just one or two.
我知道使用流是答案的一部分,但是因为每个客户端都在使用服务器创建自己的流,所以我不确定它如何订阅其他服务器-客户端流.
I know using streams is part of the answer, but because each client is creating it's own stream with the server, I'm unsure of how it can subscribe to other server-client streams.
推荐答案
另一种选择是使用长轮询方法.那就是尝试像下面这样的东西(Python 中的代码,因为这是我最熟悉的,但 go 应该非常相似).这未经测试,只是为了让您了解如何在 gRPC 中进行长轮询:
Another option would be to use a long-polling approach. That is try something like below (code in Python, since that is what I'm most familiar with, but go should be very similar). This was not tested, and is meant to just give you an idea of how to do long-polling in gRPC:
.PROTO defs
-------------------------------------------------
service Updater {
rpc GetUpdates(GetUpdatesRequest) returns (GetUpdatesResponse);
}
message GetUpdatesRequest {
int64 last_received_update = 1;
}
message GetUpdatesResponse {
repeated Update updates = 1;
int64 update_index = 2;
}
message Update {
// your update structure
}
SERVER
-----------------------------------------------------------
class UpdaterServer(UpdaterServicer):
def __init__(self):
self.condition = threading.Condition()
self.updates = []
def post_update(self, update):
"""
Used whenever the clients should be updated about something. It will
trigger their long-poll calls to return
"""
with self.condition:
# TODO: You should probably remove old updates after some time
self.updates.append(updates)
self.condition.notify_all()
def GetUpdates(self, req, context):
with self.condition:
while self.updates[req.last_received_update + 1:] == []:
self.condition.wait()
new_updates = self.updates[req.last_received_update + 1:]
response = GetUpdatesResponse()
for update in new_updates:
response.updates.add().CopyFrom(update)
response.update_index = req.last_received_update + len(new_updates)
return response
SEPARATE THREAD IN THE CLIENT
----------------------------------------------
request = GetUpdatesRequest()
request.last_received_update = -1
while True:
stub = UpdaterStub(channel)
try:
response = stub.GetUpdates(request, timeout=60*10)
handle_updates(response.updates)
request.last_received_update = response.update_index
except grpc.FutureTimeoutError:
pass
这篇关于如何在 gRPC 中从服务器到客户端广播?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!