我的团队正在设计一种具有微服务架构的可伸缩解决方案,并计划将gRPC用作层之间的传输通信。我们已经决定使用异步grpc模型。如果我缩放RPC方法的数量,example(greeter_async_server.cc)提供的设计似乎不可行,因为那样我就必须为每个RPC方法创建一个新类,并像这样在HandleRpcs()中创建它们的对象。
Pastebin(简短示例代码)。

   void HandleRpcs() {
            new CallDataForRPC1(&service_, cq_.get());
            new CallDataForRPC2(&service_, cq_.get());
            new CallDataForRPC3(&service, cq_.get());
            // so on...
    }
它将被硬编码,所有的灵活性都将丢失。
我大约有300-400RPC的方法要实现,而当我不得不每秒处理100K RPC请求以上时,具有300-400的类将是繁琐且效率低下的,而该解决方案是一个非常糟糕的设计。我无法承受在每个单个请求上以这种方式创建对象的开销。有人可以为我提供解决方法吗?异步grpc c++能否不像其同步伴侣那样简单?
编辑:为了使情况更加清晰,对于那些可能难以理解这个异步示例流程的人,我正在写我到目前为止所了解的内容,如果在某些地方出错,请让我正确。
在异步grpc中,每次我们必须将唯一标记与完成队列绑定(bind)在一起时,以便在我们轮询时,当特定的RPC将被客户端命中时,服务器可以将其返回给我们,然后我们从返回的内容中进行推断。有关通话类型的唯一标签。service_->RequestRPC2(&ctx_, &request_, &responder_, cq_, cq_,this);在这里,我们将当前对象的地址用作唯一标记。这就像在完成队列上注册RPC调用一样。然后,我们轮询HandleRPCs()以查看客户端是否命中RPC,如果是,则cq_->Next(&tag, &OK)将填充该标记。轮询代码段:
while (true) {
          GPR_ASSERT(cq_->Next(&tag, &ok));
          GPR_ASSERT(ok);
          static_cast<CallData*>(tag)->Proceed();
        }
由于我们注册到队列中的唯一标签是CallData对象的地址,因此我们可以调用Proceed()。这对于RPC来说很好,它的逻辑位于Proceed()中。但是每次有了更多的RPC时,我们都会将其全部包含在CallData中,然后在轮询时,我们将调用唯一的Proceed(),该GenericCallData包含逻辑,例如RPC1(postgres调用),RPC2(mongodb调用), .. 很快。这就像在一个函数中编写所有程序一样。因此,为避免这种情况,我将virtual void Proceed()类与Proceed()一起使用,并派生出派生类,每个RPC一个类,并在自己的proceed()中包含自己的逻辑。这是一个可行的解决方案,但我想避免编写许多类。
我尝试的另一种解决方案是将所有RPC函数逻辑都保留在std::map<long, std::function</*some params*/>>中,并保留在其自己的函数中,并维护全局&tag。因此,每当我将带有唯一标记的RPC注册到队列中时,我都会存储其对应的逻辑功能(我肯定会将其硬编码到语句中并绑定(bind)所有必需的参数),然后将唯一标记作为键。轮询时,当我获得ojit_code时,我会在 map 中为此键进行查找并调用相应的已保存函数。现在,还有一个障碍,我将必须在函数逻辑中执行此操作:
// pseudo code
void function(reply, responder, context, service)
{
    // register this RPC with another unique tag so to serve new incoming request of the same type on the completion queue
     service_->RequestRPC1(/*params*/, new_unique_id);
    // now again save this new_unique_id and current function into the map, so when tag will be returned we can do lookup
     map.emplace(new_unique_id, function);

    // now you're free to do your logic
    // do your logic
}
您会看到,代码现在已经扩展到另一个模块中,并且是基于RPC的。
希望它能清除情况。
我想如果有人可以以更简单的方式实现这种类型的服务器。

最佳答案

到目前为止,这篇文章已经很老了,但是我还没有看到任何答案或示例,因此我将向其他读者展示如何解决它。我大约有30个RPC调用,并且正在寻找一种减少添加和删除RPC调用时占用空间的方法。我花了一些迭代才能找到解决它的好方法。
因此,从我的(g)RPC库获取RPC请求的接口(interface)是接收方需要实现的回调接口(interface)。该界面如下所示:

class IRpcRequestHandler
{
public:
    virtual ~IRpcRequestHandler() = default;
    virtual void onZigbeeOpenNetworkRequest(const smarthome::ZigbeeOpenNetworkRequest& req,
                                            smarthome::Response& res) = 0;
    virtual void onZigbeeTouchlinkDeviceRequest(const smarthome::ZigbeeTouchlinkDeviceRequest& req,
                                                smarthome::Response& res) = 0;
    ...
};
还有一些用于在gRPC服务器启动后设置/注册每个RPC方法的代码:
void ready()
{
    SETUP_SMARTHOME_CALL("ZigbeeOpenNetwork", // Alias that is used for debug messages
                         smarthome::Command::AsyncService::RequestZigbeeOpenNetwork,  // Generated gRPC service method for async.
                         smarthome::ZigbeeOpenNetworkRequest, // Generated gRPC service request message
                         smarthome::Response, // Generated gRPC service response message
                         IRpcRequestHandler::onZigbeeOpenNetworkRequest); // The callback method to call when request has arrived.

    SETUP_SMARTHOME_CALL("ZigbeeTouchlinkDevice",
                         smarthome::Command::AsyncService::RequestZigbeeTouchlinkDevice,
                         smarthome::ZigbeeTouchlinkDeviceRequest,
                         smarthome::Response,
                         IRpcRequestHandler::onZigbeeTouchlinkDeviceRequest);
    ...
}
这是添加和删除RPC方法时需要关心的全部。
SETUP_SMARTHOME_CALL是一个自制的宏,如下所示:
#define SETUP_SMARTHOME_CALL(ALIAS, SERVICE, REQ, RES, CALLBACK_FUNC) \
  new ServerCallData<REQ, RES>(                                       \
      ALIAS,                                                          \
      std::bind(&SERVICE,                                             \
                &mCommandService,                                     \
                std::placeholders::_1,                                \
                std::placeholders::_2,                                \
                std::placeholders::_3,                                \
                std::placeholders::_4,                                \
                std::placeholders::_5,                                \
                std::placeholders::_6),                               \
      mCompletionQueue.get(),                                         \
      std::bind(&CALLBACK_FUNC, requestHandler, std::placeholders::_1, std::placeholders::_2))
我认为ServerCallData类看起来像gRPCs示例中的类,但进行了一些修改。 ServerCallData派生自具有抽象函数void proceed(bool ok)的非templete类,用于CompletionQueue::Next()处理。创建ServerCallData时,它将调用SERVICE方法在CompletionQueue上进行注册,并在每个首次proceed(ok)调用中进行克隆,从而将自己注册另一个实例。如果有人感兴趣,我也可以发布一些示例代码。
编辑:在下面添加了更多示例代码。
GrpcServer
class GrpcServer
{
 public:
  explicit GrpcServer(std::vector<grpc::Service*> services);
  virtual ~GrpcServer();

  void run(const std::string& sslKey,
           const std::string& sslCert,
           const std::string& password,
           const std::string& listenAddr,
           uint32_t port,
           uint32_t threads = 1);

 private:
  virtual void ready();  // Called after gRPC server is created and before polling CQ.
  void handleRpcs();  // Function that polls from CQ, can be run by multiple threads. Casts object to CallData and calls CallData::proceed().

  std::unique_ptr<ServerCompletionQueue> mCompletionQueue;
  std::unique_ptr<Server> mServer;
  std::vector<grpc::Service*> mServices;
  std::list<std::shared_ptr<std::thread>> mThreads;
  ...
}
以及CallData对象的主要部分:
template <typename TREQUEST, typename TREPLY>
class ServerCallData : public ServerCallMethod
{
 public:
  explicit ServerCallData(const std::string& methodName,
                          std::function<void(ServerContext*,
                                             TREQUEST*,
                                             ::grpc::ServerAsyncResponseWriter<TREPLY>*,
                                             ::grpc::CompletionQueue*,
                                             ::grpc::ServerCompletionQueue*,
                                             void*)> serviceFunc,
                          grpc::ServerCompletionQueue* completionQueue,
                          std::function<void(const TREQUEST&, TREPLY&)> callback,
                          bool first = false)
      : ServerCallMethod(methodName),
        mResponder(&mContext),
        serviceFunc(serviceFunc),
        completionQueue(completionQueue),
        callback(callback)
  {
    requestNewCall();
  }

  void proceed(bool ok) override
  {
    if (!ok)
    {
      delete this;
      return;
    }

    if (callStatus() == ServerCallMethod::PROCESS)
    {
      callStatus() = ServerCallMethod::FINISH;
      new ServerCallData<TREQUEST, TREPLY>(callMethodName(), serviceFunc, completionQueue, callback);

      try
      {
        callback(mRequest, mReply);
      }
      catch (const std::exception& e)
      {
        mResponder.Finish(mReply, Status::CANCELLED, this);
        return;
      }

      mResponder.Finish(mReply, Status::OK, this);
    }
    else
    {
      delete this;
    }
  }

 private:
  void requestNewCall()
  {
    serviceFunc(
        &mContext, &mRequest, &mResponder, completionQueue, completionQueue, this);
  }

  ServerContext mContext;
  TREQUEST mRequest;
  TREPLY mReply;
  ServerAsyncResponseWriter<TREPLY> mResponder;
  std::function<void(ServerContext*,
                     TREQUEST*,
                     ::grpc::ServerAsyncResponseWriter<TREPLY>*,
                     ::grpc::CompletionQueue*,
                     ::grpc::ServerCompletionQueue*,
                     void*)>
      serviceFunc;
  std::function<void(const TREQUEST&, TREPLY&)> callback;
  grpc::ServerCompletionQueue* completionQueue;
};

关于c++ - grpc C++中的异步模型,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/49318889/

10-11 01:22
查看更多