sofa-pbrpc高级用法

sofa-pbrpc高级用法-LMLPHP

sofa-pbrpc开源地址: https://github.com/baidu/sofa-pbrpc

sofa-pbrpc高级用法包括:

  • 超时控制
  • 压缩控制
  • 带宽限制
  • 日志打印
  • RpcController使用
  • RPC失败处理
  • Server/Client配置参数
  • 多Server负载均衡与容错
  • 辅助开发工具类
  • Mock测试
  • sofa-pbrpc-client工具
  • 统计与监控
  • HTTP支持
  • Web监控

超时控制

sofa-pbrpc-clienta-pbrpc支持三个级别的超时控制,按照生效的优先级由高到低依次为:

  • Request超时:在RpcController中使用SetTimeout()函数设置超时时间(毫秒),该超时只对本次请求有效;如果不设置,则使用“Method超时”或者“Service超时”;
  • Method超时:在proto文件中指定method的超时时间(毫秒),该超时对该方法的所有请求有效;如果不设置,则使用Service超时;
  • Service超时:在proto文件中指定service的超时时间(毫秒),该超时对该服务的所有方法的所有请求都有效;如果不设置,则使用默认值(10000毫秒)。

在proto文件中指定“method超时”和“Service超时”的样例如下:

import "sofa/pbrpc/rpc_option.proto";

package sofa.pbrpc.test;

option cc_generic_services = true;

message SleepRequest {
        required int32 sleep_time = 1; // in seconds
}

message SleepResponse {
        required string message = 1;
}

service SleepServer {
        // The service timeout is 2 seconds.
        option (sofa.pbrpc.service_timeout) = 2000;

        rpc SleepWithServiceTimeout(SleepRequest) returns(SleepResponse);

        // The method timeout is 4 seconds.
        rpc SleepWithMethodTimeout(SleepRequest) returns(SleepResponse) {
                option (sofa.pbrpc.method_timeout) = 4000;
        }
}

RpcController中设置超时时间的接口如下:

void SetTimeout(int64 timeout_in_ms);

压缩控制

sofa-pbrpc支持两个级别的数据压缩控制,按照生效优先级由高到低依次为:

  • Request压缩控制:在RpcController中使用SetRequestCompressType()和SetResponseCompressType()函数设置请求和回复的数据压缩类型,该设置只对本次请求有效;如果不设置,则使用“Method压缩控制”;
  • Method压缩控制:在proto文件中使用扩展option来指定method的Request或者Response的压缩类型,该超时对该方法的所有请求有效;如果不设置,则使用默认值(不压缩);

目前支持的压缩类型包括:

  • None
  • Gzip
  • Zlib
  • Snappy
  • LZ4

在proto文件中指定压缩类型的样例如下:

import "sofa/pbrpc/rpc_option.proto";

package sofa.pbrpc.test;

option cc_generic_services = true;

message EchoRequest {
        required string message = 1;
}

message EchoResponse {
        required string message = 1;
}

service EchoServer {
        rpc Echo(EchoRequest) returns(EchoResponse) {
                option (sofa.pbrpc.request_compress_type) = CompressTypeGzip;
                option (sofa.pbrpc.response_compress_type) = CompressTypeGzip;
        }
}

RpcController中设置压缩类型的接口如下:

// Set compress type of the request message.
// Supported types:
//   CompressTypeNone
//   CompressTypeGzip
//   CompressTypeZlib
//   CompressTypeSnappy
//   CompressTypeLZ4
void SetRequestCompressType(CompressType compress_type);

// Set expected compress type of the response message.
// Supported types:
//   CompressTypeNone
//   CompressTypeGzip
//   CompressTypeZlib
//   CompressTypeSnappy
//   CompressTypeLZ4
void SetResponseCompressType(CompressType compress_type);

带宽限制

sofa-pbrpc支持在RpcServer和RpcClient级别在Options中指定参数就可以控制网络带宽限制,包括“出带宽”、“入带宽”,使流量控制对应用层透明,简化应用层的工作。 在RpcServerOptions和RpcClientOptions中可以分别指定Server端和Client端的最大入带宽“max_throughput_in”和最大出带宽“max_throughput_out”,参数用法与语义如下(默认为-1,表示不控制):


// Network throughput limit.
// The network bandwidth is shared by all connections:
// * busy connections get more bandwidth.
// * the total bandwidth of all connections will not exceed the limit.
int max_throughput_in;     // max network in throughput for all connections.
                          // in MB/s, should >= -1, -1 means no limit, default -1.
int max_throughput_out;    // max network out throughput for all connections.
                          // in MB/s, should >= -1, -1 means no limit, default -1.

对于client端,如果限定了带宽流量,同时请求发送速度过快,来不及发出去的请求就会积压在Request Pending Buffer中。用户可以在Options指定该buffer的大小,如果buffer满了,请求就会立即返回RPC_ERROR_SEND_BUFFER_FULL的错误,用户可以根据需要进行处理,譬如减缓发送速度。在 RpcServerOptions 和 RpcClientOptions 中都可以指定“max_pending_buffer_size”:

int max_pending_buffer_size; // max size of pending buffer for one connection.
                            // in MB, should >= 0, 0 means no buffer, default 2.

另外需注意:流量控制依据的是用户业务数据的数据量,不包含socket底层的包头协议数据的数据量,所以一般实际出入的数据量会比指定的略大。

以下是某实际系统中的网络流量监控图(带宽限制设置为15MB/s): sofa-pbrpc高级用法-LMLPHP

日志打印

sofa-pbrpc提供日志打印函数SLOG(),将日志打印到标准错误输出,使用方式类似于printf()。

日志级别由高到低为:

  • FATAL
  • ERROR
  • WARNING
  • INFO
  • TRACE
  • DEBUG

通过打印级别控制日志输出,只会输出高于或者等于打印级别的日志,默认级别为ERROR。

用户可通过SOFA_PBRPC_SET_LOG_LEVEL()设置打印级别,譬如设置为INFO:

SOFA_PBRPC_SET_LOG_LEVEL(INFO);

日志使用样例:

SLOG(ERROR, "error message: %s", message);

RpcController使用

RpcController用于控制单次请求的调用过程。

在Client端调用方法时,都需要传入一个RpcController,其作用是:

  • 在发送请求前:控制请求的超时时间,设置Request或者Response的压缩类型;
  • 在请求完成后:获取本地和远程的网络地址,获取(成功/失败)状态,如果失败可获取错误信息,检查请求是否已发送至远端; 在Server端服务实现中,外层也会传入一个RpcController,其作用是:

获取本地和远程的网络地址; 如果服务失败,设置失败标志和错误信息;

RPC失败处理

RPC请求失败时,RpcController的Failed()接口会返回true,同时错误信息可以从ErrorCode()和ErrorText()中获得。

错误的原因主要有两大类:

  • RPC类的错误:包括缓冲区满、请求没有发出去(网络错误)、请求发出去了但没有在规定时间内得到回复(超时);其中网络错误的原因又有多种,譬如连接不上、网络异常、请求的服务不存在等。
  • 服务执行本身的错误(非RPC错误):Server端在服务执行过程中遇到了错误,使用SetFailed(reason)接口设置错误标识和失败原因;Client端的RpcController的操作如下:
    1. Failed()会返回true;
    2. ErrorCode()返回的错误码是RPC_ERROR_FROM_USER(错误码的更多定义可参见文件"sofa/pbrpc/rpc_error_code.h");
    3. ErrorText()返回的错误描述就是Server端通过SetFailed()传入的reason;

Server/Client配置参数

RpcServerOptions

work_thread_num 工作线程数
max_pending_buffer_size pengding buffer 大小 (MB)
max_throughput_in 最大入带宽限制 (MB/s)
max_throughput_out 最大出带宽限制 (MB/s)
keep_alive_time 空闲连接维持时间 (s)

RpcClientOptions

work_thread_num 工作线程数
callback_thread_num 回调线程数
max_pending_buffer_size pengding buffer 大小 (MB)
max_throughput_in 最大入带宽限制 (MB/s)
max_throughput_out 最大出带宽限制 (MB/s)

多Server负载均衡与容错

在创建RpcChannel时,可以指定多个功能对等的Server地址。在调用服务时,会根据负载均衡策略选择合适的Server发送请求,并自动进行容错和探活处理。

RpcChannel支持点对多点的构造函数:


    RpcChannel(RpcClient* rpc_client,
               const std::vector<std::string>& address_list,
               const RpcChannelOptions& options = RpcChannelOptions());

    // Create multiple server points by address provider.
    // The "rpc_client" is owned by the caller.
    // The "address_provider" is owned by the caller.
    RpcChannel(RpcClient* rpc_client,
               AddressProvider* address_provider,
               const RpcChannelOptions& options = RpcChannelOptions());

在上面第二个函数中,用户可以提供一个AddressProvider,支持动态增加或者删除server地址。

负载均衡策略:

  • 对于每个单点channel,记录其“未完成的调用数(Not Done Calling Count)”,数量越少,表示负载越轻;
  • 下次选择server的时候,优先选择“未完成的调用数”最少的机器;
  • 在“未完成的调用数”相同的情况下,优先选择“最近最长时间未使用的机器”。

容错与探活策略:

  • 每个server内置一个BuiltinService,提供Health()方法用于探活;
  • 在Client端维护两个队列:“活动队列”和“待探活队列”;
  • RPC调用选择server的时候,优先从“活动队列”中选择;
  • 一旦某个server的RPC调用出错,则将其加入“待探活队列”;
  • Client周期性地(每隔5秒)向“待探活队列”中的机器发送探活请求,如果探活成功,则重新放回到“活动队列”。

具体策略实现可以参考 src/sofa/pbrpc/dynamic_rpc_channel_impl.cc

使用样例代码可以参考 test/perf_test/client_multi_server.cc

AddressProvider的高级使用可以参考 sample/multi_server_sample

辅助开发工具类

sofa-pbrpc提供了一些方便用户开发的工具类,包括:

智能指针 sofa/pbrpc/smart_ptr/smart_ptr.hpp 包括scoped_ptr,shared_ptr,weak_ptr等
原子操作 sofa/pbrpc/atomic.h 支持fetch,inc,dec,cas等
锁操作 sofa/pbrpc/locks.h 提供了互斥锁,自旋锁,读写锁的封装
定时管理 sofa/pbrpc/timeout_manager.h 高效的提供了定时器功能

具体使用方法请直接参考头文件。

Mock测试

sofa-pbrpc提供了Mock测试支持,便于用户编写测试程序。

Mock功能的接口主要参考"sofa/pbrpc/mock_test_helper.h"文件,使用样例可参考"sample/mock_sample/"。

主要提供了五个宏:

// Enable or disable the mock feature.  Default disabled.
// The mock channel and mock methods will take effect iff mock enabled.
#define SOFA_PBRPC_ENABLE_MOCK() ::sofa::pbrpc::enable_mock()
#define SOFA_PBRPC_DISABLE_MOCK() ::sofa::pbrpc::disable_mock()

// If you create a channel with address of SOFA_PBRPC_MOCK_CHANNEL_ADDRESS, then the channel is a mock
// channel.  The mock channel will not create real socket connection, but just uses mock methods.
#define SOFA_PBRPC_MOCK_CHANNEL_ADDRESS "/mock/"

// All mock method implements should use this function signature.
typedef ExtClosure<void(
::google::protobuf::RpcController*,
const ::google::protobuf::Message*,
::google::protobuf::Message*,
::google::protobuf::Closure*)> MockMethodHookFunction;

// Register a mock method implement.  If mock enabled, all channels will prefer to call mock
// method first. If the corresponding mock method is not registered, then call the real method.
//
// "method_name" is the full name of the method to be mocked, should be a c-style string.
// "mock_method" is the mock method hook function, should be type of "MockMethodHookFunction*".
//
// For example:
//     MockMethodHookFunction* mock_method = sofa::pbrpc::NewPermanentExtClosure(&MockEcho);
//     SOFA_PBRPC_REGISTER_MOCK_METHOD("sofa.pbrpc.test.EchoServer.Echo", mock_method);
#define SOFA_PBRPC_REGISTER_MOCK_METHOD(method_name, mock_method) \
::sofa::pbrpc::MockTestHelper::GlobalInstance()->RegisterMockMethod(method_name, (mock_method))

// Clear all registered mock methods.  This will not delete the cleared hook functions, which
// are take ownership by user.
#define SOFA_PBRPC_CLEAR_MOCK_METHOD() \
::sofa::pbrpc::MockTestHelper::GlobalInstance()->ClearMockMethod()#

sofa-pbrpc-client工具

sofa-pbrpc提供了一个客户端工具sofa-pbrpc-client(位于"bin/"),用于查询server状态、获取服务列表及proto描述、获取服务调用统计、构建文本快速发送rpc请求等。

具体功能:

  • 查询server的健康状况(health)、配置参数(option)、负载情况(status);
  • 查询server对外提供的服务列表(list);
  • 获取服务的protobuf描述信息(desc);
  • 获取服务调用统计(stat),包括处理请求数、平均处理时间、最大处理时间等;
  • 使用text格式的请求数据,以通用的方式向server的某个method发送rpc请求调用(call); 使用帮助:

Usage: sofa-pbrpc-client <server-address> <sub-command> [args]
Available subcommands:
  * help : print this usage help.
  * health :check if the server is healthy.
  * status :get status of the server.
  * option : get RpcServerOptions of the server.
  * list : list all services provided by the server.
  * desc  <protobuf-type-name> :get descriptor of a protobuf type (service/message/enum).
  * call  <method-full-name> <request-message-text> [timeout-in-ms] : call a method using the text format of request message.The "timeout-in-ms" is optional, default is 3000 milli-seconds.
  * stat  [service-full-name] [period-in-seconds] :  get the service statistics in the latest period of seconds.The "service-full-name" is optional, default is "all".The "period-in-seconds" is optional, default is 60 seconds.

使用样例: 向监听地址为127.0.0.1:12321的server发送rpc调用"sofa.pbrpc.test.EchoServer.Echo"

$ ./sofa-pbrpc-client 127.0.0.1:12321 call sofa.pbrpc.test.EchoServer.Echo \
'message:"hello from qinzuoyan01"'
Response:
------------------------------
message: "echo message: hello from qinzuoyan01"
------------------------------

统计与监控

sofa-pbrpc支持对服务的调用进行统计,并以内建服务的方式导出统计信息。内建服务的proto定义参见"sofa/pbrpc/builtin_service.proto"。

包含如下统计项:

  • 服务在最近N秒中调用的总成功次数、总失败次数;
  • 服务各个方法在最近N秒中调用的总成功次数、总失败次数、成功请求的平均耗时和最大耗时、失败请求的平均耗时和最大耗时;
  • server端最多存储最近600秒的统计数据; 使用sofa-pbrpc-client工具,可以很容易地获取统计信息,譬如:
$ ./sofa-pbrpc-client 127.0.0.1:12321 stat sofa.pbrpc.test.EchoServer 10

上述命令用于获取"sofa.pbrpc.test.EchoServer.Echo"在最近10秒的调用统计,结果样例如下:

Response:
------------------------------
service_stats {
  service_name: "sofa.pbrpc.test.EchoServer"
  period_seconds: 10
  succeed_count: 1151699
  failed_count: 0
  method_stats {
    method_name: "sofa.pbrpc.test.EchoServer.Echo"
    succeed_count: 1151699
    succeed_avg_time_us: 0.98021704
    succeed_max_time_us: 1108
    failed_count: 0
    failed_avg_time_us: 0
    failed_max_time_us: 0
  }
}
------------------------------

除此之外,内建服务还提供了一些监控server状态的接口。sofa-pbrpc-client工具也提供了这些子命令:

  • 使用"health"子命令检查server是否存在;
  • 使用"status"子命令查看server的一些状态信息,包括连接数、服务数、以及Pending Buffer的一些信息;
  • 使用"option"子命令查看server当前的配置参数;
  • 使用"list"子命令查看server注册了哪些服务;
  • 使用"desc"子命令查询服务相关类型的proto描述;

HTTP支持

sofa-pbrpc从1.0.1开始支持HTTP方式调用服务,同时支持原生POST,GET以及扩展的POST PROTOBUF三种方式提交。另外还提供了Web监控页面,方便开发和测试。

样例可以参见sample/echo/client_http.sh。

POST请求

curl -d '{"message":"Hello, world!"}' http://localhost:12321/sofa.pbrpc.test.EchoServer.Echo

规则:

  • URL:http://host:port/method_full_name
  • Body:Request Message的json字符串;
  • 返回:Content-Type: application/json;Body: Response Message的json字符串;

GET请求

curl http://localhost:12321/sofa.pbrpc.test.EchoServer.Echo?request=%7B%22message%22%3A%22Hello%2C%20world%21%22%7D

规则:

POST PROTOBUF请求

普通的GET POST请求使用JSON传递数据,在字段增多的情况下性能下降严重。 所以在条件许可的情况下请使用HTTP POST PROTOBUF的接口,样例可以参见:python/sample/client_http_protobuf.py

Web监控

提供的页面(假设server监听端口为8080):

主页示例

sofa-pbrpc高级用法-LMLPHP

04-18 13:35