Producer线程模型分析
线程传递过程:网络eventloop线程接收请求->传给group线程处理->group处理完后->返还给网络线程去响应
eventloop线程找业务线程:通过查找operationMeta获取,operationMeta是存储处理业务逻辑元数据信息,代码AbstractRestInvocation中体现这一过程
try {
operationMeta.getExecutor().execute(() -> {
synchronized (this.requestEx) {
try {
if (isInQueueTimeout()) {
throw new InvocationException(Status.INTERNAL_SERVER_ERROR, "Timeout when processing the request.");
}
if (requestEx.getAttribute(RestConst.REST_REQUEST) != requestEx) {
// already timeout
// in this time, request maybe recycled and reused by web container, do not use requestEx
LOGGER.error("Rest request already timeout, abandon execute, method {}, operation {}.",
operationMeta.getHttpMethod(),
operationMeta.getMicroserviceQualifiedName());
return;
}
runOnExecutor();
} catch (InvocationException e) {
LOGGER.error("Invocation failed, cause={}", e.getMessage());
sendFailResponse(e);
} catch (Throwable e) {
LOGGER.error("Processing rest server request error", e);
sendFailResponse(e);
}
}
});
} catch (Throwable e) {
LOGGER.error("failed to schedule invocation, message={}, executor={}.", e.getMessage(), e.getClass().getName());
sendFailResponse(e);
}
业务线程找eventloop线程:eventloop网络线程在放到responseEx上下文中,这个在VertxRestDispatcher可以看到responseEx预置了网络线程
HttpServletResponseEx responseEx = new VertxServerResponseToHttpServletResponse(context.response());
//vertx context 设置
public VertxServerResponseToHttpServletResponse(HttpServerResponse serverResponse) {
this.context = Vertx.currentContext();
this.serverResponse = serverResponse;
Objects.requireNonNull(context, "must run in vertx context.");
}
线程切换过程详细
1.接收请求的vertx eventloop线程
2.从vertx eventloop线程转入业务线程处理
3.进入ProducerOperationHandler的handler,仍然是业务线程group
4.进入ProducerOperationHandler的syncInvoke的,最终进入sendResponseQuietly,线程仍然是group
// ProducerOperationHandler的asyncResp
public void syncInvoke(Invocation invocation, SwaggerProducerOperation producerOperation, AsyncResponse asyncResp) {
....
asyncResp.handle(response);
}
//上述的asyncResp正是AbstractRestInvocation传进去的sendResponseQuietly
protected void doInvoke() throws Throwable {
invocation.onStartHandlersRequest();
invocation.next(resp -> sendResponseQuietly(resp));
}
5.响应 group线程,通过驱动responseEx去响应,把响应交由eventloop线程去处理,eventloop线程在responseEx的context,即context.runOnContext(V -> internalFlushBuffer());
// responseEx
protected void onExecuteHttpServerFiltersFinish(Response response, Throwable e) {
...
responseEx.flushBuffer();
...
}
// 进入VertxServerResponseToHttpServletResponse的flushBuffer
public void flushBuffer() {
if (context == Vertx.currentContext()) {
internalFlushBuffer();
return;
}
context.runOnContext(V -> internalFlushBuffer());
}
//通过serverResponse.end(bodyBuffer)发送响应,vertx框架做的事情
public void internalFlushBuffer() {
...
serverResponse.end(bodyBuffer);
}
通过以上分析可知,Producer对client的请求处理链:VertxRestDispatcher->Invocation(VertxRestInvocation)->通过next方法不断遍历Handler,最后一个Handler是ProducerOperationHandler
业务执行线程创建
默认通过Spring xml初始化servicecomb.executor.groupThreadPool
ExecutorManager通过BeanUtils获取
OperationMeta 获取执行业务线程
OperationMeta executor初始化:
this.executor = schemaMeta.getMicroserviceMeta().getScbEngine().getExecutorManager().findExecutor(this);