TThreadPoolServer直接继承自TServer,实现类serve和stop操作。

在serve中可以接受多个连接,每个连接单独开一个线程进行处理,在每个线程中,按顺序处理该线程所绑定连接的请求,因此对同一个连接来说,是同步的。

serve函数主要代码:

while (!stopped_) {
int failureCount = 0;
try {
TTransport client = serverTransport_.accept();
WorkerProcess wp = new WorkerProcess(client);
executorService_.execute(wp);
} catch (TTransportException ttx) {
if (!stopped_) {
++failureCount;
LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
}
}
}

其中线程池以及最大线程数通过ThreadPoolExecutor类来实现。

在每个WorkerProcess中,处理请求关键代码如下:

    public void run() {
TProcessor processor = null;
TTransport inputTransport = null;
TTransport outputTransport = null;
TProtocol inputProtocol = null;
TProtocol outputProtocol = null; TServerEventHandler eventHandler = null;
ServerContext connectionContext = null; try {
processor = processorFactory_.getProcessor(client_);
inputTransport = inputTransportFactory_.getTransport(client_);
outputTransport = outputTransportFactory_.getTransport(client_);
inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_.getProtocol(outputTransport); eventHandler = getEventHandler();
if (eventHandler != null) {
connectionContext = eventHandler.createContext(inputProtocol, outputProtocol);
}
// we check stopped_ first to make sure we're not supposed to be shutting
// down. this is necessary for graceful shutdown.
while (true) { if (eventHandler != null) {
eventHandler.processContext(connectionContext, inputTransport, outputTransport);
} if(stopped_ || !processor.process(inputProtocol, outputProtocol)) {
break;
}
}
} catch (TTransportException ttx) {
// Assume the client died and continue silently
} catch (TException tx) {
LOGGER.error("Thrift error occurred during processing of message.", tx);
} catch (Exception x) {
LOGGER.error("Error occurred during processing of message.", x);
} if (eventHandler != null) {
eventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol);
} if (inputTransport != null) {
inputTransport.close();
} if (outputTransport != null) {
outputTransport.close();
}
}

注意一下类型为ServerContext的connectionContext变量的生命周期以及角色:
1)在处理所有请求之前,通过eventHandler.createContext(inputProtocol, outputProtocol)创建;

2)在处理每个请求前,通过eventHandler.processContext(connectionContext, inputTransport, outputTransport)进行处理;

3)在停止处理时,通过eventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol) 释放相关资源,生命宣告停止。

使用者可以通过eventHandler插入自己的代码。

【Tips】

根据上面对connectionContext生命周期的分析,我们可以自己实现TServerEventHandler接口,然后在实现类中保存ServerContext信息以及其它附属信息,因为TServerEventHandler可以访问传输层级别,因此可以在其中保存一些连接的底层信息,比如客户端ip,端口等;在processContext函数中,可以更新每次请求前的临时信息供该次请求使用,这样,在我们的IDL中定义的实现函数中,可以访问TServerEventHandler对象,取得保存在其中的信息进行处理,这是典型的【黑板报】设计模式。

04-15 07:40