sofa-pbrpc高级用法
sofa-pbrpc开源地址: https://github.com/baidu/sofa-pbrpc
sofa-pbrpc高级用法包括:
- 超时控制
- 压缩控制
- 带宽限制
- 日志打印
- RpcController使用
- RPC失败处理
- Server/Client配置参数
- 多Server负载均衡与容错
- 辅助开发工具类
- Mock测试
- sofa-pbrpc-client工具
- 统计与监控
- HTTP支持
- Web监控
超时控制
sofa-pbrpc-clienta-pbrpc支持三个级别的超时控制,按照生效的优先级由高到低依次为:
- Request超时:在RpcController中使用SetTimeout()函数设置超时时间(毫秒),该超时只对本次请求有效;如果不设置,则使用“Method超时”或者“Service超时”;
- Method超时:在proto文件中指定method的超时时间(毫秒),该超时对该方法的所有请求有效;如果不设置,则使用Service超时;
- Service超时:在proto文件中指定service的超时时间(毫秒),该超时对该服务的所有方法的所有请求都有效;如果不设置,则使用默认值(10000毫秒)。
在proto文件中指定“method超时”和“Service超时”的样例如下:
import "sofa/pbrpc/rpc_option.proto";
package sofa.pbrpc.test;
option cc_generic_services = true;
message SleepRequest {
required int32 sleep_time = 1; // in seconds
}
message SleepResponse {
required string message = 1;
}
service SleepServer {
// The service timeout is 2 seconds.
option (sofa.pbrpc.service_timeout) = 2000;
rpc SleepWithServiceTimeout(SleepRequest) returns(SleepResponse);
// The method timeout is 4 seconds.
rpc SleepWithMethodTimeout(SleepRequest) returns(SleepResponse) {
option (sofa.pbrpc.method_timeout) = 4000;
}
}
RpcController中设置超时时间的接口如下:
void SetTimeout(int64 timeout_in_ms);
压缩控制
sofa-pbrpc支持两个级别的数据压缩控制,按照生效优先级由高到低依次为:
- Request压缩控制:在RpcController中使用SetRequestCompressType()和SetResponseCompressType()函数设置请求和回复的数据压缩类型,该设置只对本次请求有效;如果不设置,则使用“Method压缩控制”;
- Method压缩控制:在proto文件中使用扩展option来指定method的Request或者Response的压缩类型,该超时对该方法的所有请求有效;如果不设置,则使用默认值(不压缩);
目前支持的压缩类型包括:
- None
- Gzip
- Zlib
- Snappy
- LZ4
在proto文件中指定压缩类型的样例如下:
import "sofa/pbrpc/rpc_option.proto";
package sofa.pbrpc.test;
option cc_generic_services = true;
message EchoRequest {
required string message = 1;
}
message EchoResponse {
required string message = 1;
}
service EchoServer {
rpc Echo(EchoRequest) returns(EchoResponse) {
option (sofa.pbrpc.request_compress_type) = CompressTypeGzip;
option (sofa.pbrpc.response_compress_type) = CompressTypeGzip;
}
}
RpcController中设置压缩类型的接口如下:
// Set compress type of the request message.
// Supported types:
// CompressTypeNone
// CompressTypeGzip
// CompressTypeZlib
// CompressTypeSnappy
// CompressTypeLZ4
void SetRequestCompressType(CompressType compress_type);
// Set expected compress type of the response message.
// Supported types:
// CompressTypeNone
// CompressTypeGzip
// CompressTypeZlib
// CompressTypeSnappy
// CompressTypeLZ4
void SetResponseCompressType(CompressType compress_type);
带宽限制
sofa-pbrpc支持在RpcServer和RpcClient级别在Options中指定参数就可以控制网络带宽限制,包括“出带宽”、“入带宽”,使流量控制对应用层透明,简化应用层的工作。 在RpcServerOptions和RpcClientOptions中可以分别指定Server端和Client端的最大入带宽“max_throughput_in”和最大出带宽“max_throughput_out”,参数用法与语义如下(默认为-1,表示不控制):
// Network throughput limit.
// The network bandwidth is shared by all connections:
// * busy connections get more bandwidth.
// * the total bandwidth of all connections will not exceed the limit.
int max_throughput_in; // max network in throughput for all connections.
// in MB/s, should >= -1, -1 means no limit, default -1.
int max_throughput_out; // max network out throughput for all connections.
// in MB/s, should >= -1, -1 means no limit, default -1.
对于client端,如果限定了带宽流量,同时请求发送速度过快,来不及发出去的请求就会积压在Request Pending Buffer中。用户可以在Options指定该buffer的大小,如果buffer满了,请求就会立即返回RPC_ERROR_SEND_BUFFER_FULL的错误,用户可以根据需要进行处理,譬如减缓发送速度。在 RpcServerOptions 和 RpcClientOptions 中都可以指定“max_pending_buffer_size”:
int max_pending_buffer_size; // max size of pending buffer for one connection.
// in MB, should >= 0, 0 means no buffer, default 2.
另外需注意:流量控制依据的是用户业务数据的数据量,不包含socket底层的包头协议数据的数据量,所以一般实际出入的数据量会比指定的略大。
以下是某实际系统中的网络流量监控图(带宽限制设置为15MB/s):
日志打印
sofa-pbrpc提供日志打印函数SLOG(),将日志打印到标准错误输出,使用方式类似于printf()。
日志级别由高到低为:
- FATAL
- ERROR
- WARNING
- INFO
- TRACE
- DEBUG
通过打印级别控制日志输出,只会输出高于或者等于打印级别的日志,默认级别为ERROR。
用户可通过SOFA_PBRPC_SET_LOG_LEVEL()设置打印级别,譬如设置为INFO:
SOFA_PBRPC_SET_LOG_LEVEL(INFO);
日志使用样例:
SLOG(ERROR, "error message: %s", message);
RpcController使用
RpcController用于控制单次请求的调用过程。
在Client端调用方法时,都需要传入一个RpcController,其作用是:
- 在发送请求前:控制请求的超时时间,设置Request或者Response的压缩类型;
- 在请求完成后:获取本地和远程的网络地址,获取(成功/失败)状态,如果失败可获取错误信息,检查请求是否已发送至远端; 在Server端服务实现中,外层也会传入一个RpcController,其作用是:
获取本地和远程的网络地址; 如果服务失败,设置失败标志和错误信息;
RPC失败处理
RPC请求失败时,RpcController的Failed()接口会返回true,同时错误信息可以从ErrorCode()和ErrorText()中获得。
错误的原因主要有两大类:
- RPC类的错误:包括缓冲区满、请求没有发出去(网络错误)、请求发出去了但没有在规定时间内得到回复(超时);其中网络错误的原因又有多种,譬如连接不上、网络异常、请求的服务不存在等。
- 服务执行本身的错误(非RPC错误):Server端在服务执行过程中遇到了错误,使用SetFailed(reason)接口设置错误标识和失败原因;Client端的RpcController的操作如下:
- Failed()会返回true;
- ErrorCode()返回的错误码是RPC_ERROR_FROM_USER(错误码的更多定义可参见文件"sofa/pbrpc/rpc_error_code.h");
- ErrorText()返回的错误描述就是Server端通过SetFailed()传入的reason;
Server/Client配置参数
RpcServerOptions
work_thread_num | 工作线程数 |
max_pending_buffer_size | pengding buffer 大小 (MB) |
max_throughput_in | 最大入带宽限制 (MB/s) |
max_throughput_out | 最大出带宽限制 (MB/s) |
keep_alive_time | 空闲连接维持时间 (s) |
RpcClientOptions
work_thread_num | 工作线程数 |
callback_thread_num | 回调线程数 |
max_pending_buffer_size | pengding buffer 大小 (MB) |
max_throughput_in | 最大入带宽限制 (MB/s) |
max_throughput_out | 最大出带宽限制 (MB/s) |
多Server负载均衡与容错
在创建RpcChannel时,可以指定多个功能对等的Server地址。在调用服务时,会根据负载均衡策略选择合适的Server发送请求,并自动进行容错和探活处理。
RpcChannel支持点对多点的构造函数:
RpcChannel(RpcClient* rpc_client,
const std::vector<std::string>& address_list,
const RpcChannelOptions& options = RpcChannelOptions());
// Create multiple server points by address provider.
// The "rpc_client" is owned by the caller.
// The "address_provider" is owned by the caller.
RpcChannel(RpcClient* rpc_client,
AddressProvider* address_provider,
const RpcChannelOptions& options = RpcChannelOptions());
在上面第二个函数中,用户可以提供一个AddressProvider,支持动态增加或者删除server地址。
负载均衡策略:
- 对于每个单点channel,记录其“未完成的调用数(Not Done Calling Count)”,数量越少,表示负载越轻;
- 下次选择server的时候,优先选择“未完成的调用数”最少的机器;
- 在“未完成的调用数”相同的情况下,优先选择“最近最长时间未使用的机器”。
容错与探活策略:
- 每个server内置一个BuiltinService,提供Health()方法用于探活;
- 在Client端维护两个队列:“活动队列”和“待探活队列”;
- RPC调用选择server的时候,优先从“活动队列”中选择;
- 一旦某个server的RPC调用出错,则将其加入“待探活队列”;
- Client周期性地(每隔5秒)向“待探活队列”中的机器发送探活请求,如果探活成功,则重新放回到“活动队列”。
具体策略实现可以参考 src/sofa/pbrpc/dynamic_rpc_channel_impl.cc
使用样例代码可以参考 test/perf_test/client_multi_server.cc
AddressProvider的高级使用可以参考 sample/multi_server_sample
辅助开发工具类
sofa-pbrpc提供了一些方便用户开发的工具类,包括:
智能指针 | sofa/pbrpc/smart_ptr/smart_ptr.hpp | 包括scoped_ptr,shared_ptr,weak_ptr等 |
原子操作 | sofa/pbrpc/atomic.h | 支持fetch,inc,dec,cas等 |
锁操作 | sofa/pbrpc/locks.h | 提供了互斥锁,自旋锁,读写锁的封装 |
定时管理 | sofa/pbrpc/timeout_manager.h | 高效的提供了定时器功能 |
具体使用方法请直接参考头文件。
Mock测试
sofa-pbrpc提供了Mock测试支持,便于用户编写测试程序。
Mock功能的接口主要参考"sofa/pbrpc/mock_test_helper.h"文件,使用样例可参考"sample/mock_sample/"。
主要提供了五个宏:
// Enable or disable the mock feature. Default disabled.
// The mock channel and mock methods will take effect iff mock enabled.
#define SOFA_PBRPC_ENABLE_MOCK() ::sofa::pbrpc::enable_mock()
#define SOFA_PBRPC_DISABLE_MOCK() ::sofa::pbrpc::disable_mock()
// If you create a channel with address of SOFA_PBRPC_MOCK_CHANNEL_ADDRESS, then the channel is a mock
// channel. The mock channel will not create real socket connection, but just uses mock methods.
#define SOFA_PBRPC_MOCK_CHANNEL_ADDRESS "/mock/"
// All mock method implements should use this function signature.
typedef ExtClosure<void(
::google::protobuf::RpcController*,
const ::google::protobuf::Message*,
::google::protobuf::Message*,
::google::protobuf::Closure*)> MockMethodHookFunction;
// Register a mock method implement. If mock enabled, all channels will prefer to call mock
// method first. If the corresponding mock method is not registered, then call the real method.
//
// "method_name" is the full name of the method to be mocked, should be a c-style string.
// "mock_method" is the mock method hook function, should be type of "MockMethodHookFunction*".
//
// For example:
// MockMethodHookFunction* mock_method = sofa::pbrpc::NewPermanentExtClosure(&MockEcho);
// SOFA_PBRPC_REGISTER_MOCK_METHOD("sofa.pbrpc.test.EchoServer.Echo", mock_method);
#define SOFA_PBRPC_REGISTER_MOCK_METHOD(method_name, mock_method) \
::sofa::pbrpc::MockTestHelper::GlobalInstance()->RegisterMockMethod(method_name, (mock_method))
// Clear all registered mock methods. This will not delete the cleared hook functions, which
// are take ownership by user.
#define SOFA_PBRPC_CLEAR_MOCK_METHOD() \
::sofa::pbrpc::MockTestHelper::GlobalInstance()->ClearMockMethod()#
sofa-pbrpc-client工具
sofa-pbrpc提供了一个客户端工具sofa-pbrpc-client(位于"bin/"),用于查询server状态、获取服务列表及proto描述、获取服务调用统计、构建文本快速发送rpc请求等。
具体功能:
- 查询server的健康状况(health)、配置参数(option)、负载情况(status);
- 查询server对外提供的服务列表(list);
- 获取服务的protobuf描述信息(desc);
- 获取服务调用统计(stat),包括处理请求数、平均处理时间、最大处理时间等;
- 使用text格式的请求数据,以通用的方式向server的某个method发送rpc请求调用(call); 使用帮助:
Usage: sofa-pbrpc-client <server-address> <sub-command> [args]
Available subcommands:
* help : print this usage help.
* health :check if the server is healthy.
* status :get status of the server.
* option : get RpcServerOptions of the server.
* list : list all services provided by the server.
* desc <protobuf-type-name> :get descriptor of a protobuf type (service/message/enum).
* call <method-full-name> <request-message-text> [timeout-in-ms] : call a method using the text format of request message.The "timeout-in-ms" is optional, default is 3000 milli-seconds.
* stat [service-full-name] [period-in-seconds] : get the service statistics in the latest period of seconds.The "service-full-name" is optional, default is "all".The "period-in-seconds" is optional, default is 60 seconds.
使用样例: 向监听地址为127.0.0.1:12321的server发送rpc调用"sofa.pbrpc.test.EchoServer.Echo"
$ ./sofa-pbrpc-client 127.0.0.1:12321 call sofa.pbrpc.test.EchoServer.Echo \
'message:"hello from qinzuoyan01"'
Response:
------------------------------
message: "echo message: hello from qinzuoyan01"
------------------------------
统计与监控
sofa-pbrpc支持对服务的调用进行统计,并以内建服务的方式导出统计信息。内建服务的proto定义参见"sofa/pbrpc/builtin_service.proto"。
包含如下统计项:
- 服务在最近N秒中调用的总成功次数、总失败次数;
- 服务各个方法在最近N秒中调用的总成功次数、总失败次数、成功请求的平均耗时和最大耗时、失败请求的平均耗时和最大耗时;
- server端最多存储最近600秒的统计数据; 使用sofa-pbrpc-client工具,可以很容易地获取统计信息,譬如:
$ ./sofa-pbrpc-client 127.0.0.1:12321 stat sofa.pbrpc.test.EchoServer 10
上述命令用于获取"sofa.pbrpc.test.EchoServer.Echo"在最近10秒的调用统计,结果样例如下:
Response:
------------------------------
service_stats {
service_name: "sofa.pbrpc.test.EchoServer"
period_seconds: 10
succeed_count: 1151699
failed_count: 0
method_stats {
method_name: "sofa.pbrpc.test.EchoServer.Echo"
succeed_count: 1151699
succeed_avg_time_us: 0.98021704
succeed_max_time_us: 1108
failed_count: 0
failed_avg_time_us: 0
failed_max_time_us: 0
}
}
------------------------------
除此之外,内建服务还提供了一些监控server状态的接口。sofa-pbrpc-client工具也提供了这些子命令:
- 使用"health"子命令检查server是否存在;
- 使用"status"子命令查看server的一些状态信息,包括连接数、服务数、以及Pending Buffer的一些信息;
- 使用"option"子命令查看server当前的配置参数;
- 使用"list"子命令查看server注册了哪些服务;
- 使用"desc"子命令查询服务相关类型的proto描述;
HTTP支持
sofa-pbrpc从1.0.1开始支持HTTP方式调用服务,同时支持原生POST,GET以及扩展的POST PROTOBUF三种方式提交。另外还提供了Web监控页面,方便开发和测试。
样例可以参见sample/echo/client_http.sh。
POST请求
curl -d '{"message":"Hello, world!"}' http://localhost:12321/sofa.pbrpc.test.EchoServer.Echo
规则:
- URL:http://host:port/method_full_name
- Body:Request Message的json字符串;
- 返回:Content-Type: application/json;Body: Response Message的json字符串;
GET请求
curl http://localhost:12321/sofa.pbrpc.test.EchoServer.Echo?request=%7B%22message%22%3A%22Hello%2C%20world%21%22%7D
规则:
- URL:http://host:port/method_full_name?request=request_json_string
- request_json_string需要对特殊字符({}[]:",)进行url encode;
- 返回:同POST;
POST PROTOBUF请求
普通的GET POST请求使用JSON传递数据,在字段增多的情况下性能下降严重。 所以在条件许可的情况下请使用HTTP POST PROTOBUF的接口,样例可以参见:python/sample/client_http_protobuf.py
Web监控
提供的页面(假设server监听端口为8080):
- 主页:http://localhost:8080/
- 配置参数: http://localhost:8080/options
- 状态:http://localhost:8080/status
- 服务列表:http://localhost:8080/services
主页示例