Producer线程模型分析

线程传递过程:网络eventloop线程接收请求->传给group线程处理->group处理完后->返还给网络线程去响应

eventloop线程找业务线程:通过查找operationMeta获取,operationMeta是存储处理业务逻辑元数据信息,代码AbstractRestInvocation中体现这一过程


    try {
      operationMeta.getExecutor().execute(() -> {
        synchronized (this.requestEx) {
          try {
            if (isInQueueTimeout()) {
              throw new InvocationException(Status.INTERNAL_SERVER_ERROR, "Timeout when processing the request.");
            }
            if (requestEx.getAttribute(RestConst.REST_REQUEST) != requestEx) {
              // already timeout
              // in this time, request maybe recycled and reused by web container, do not use requestEx
              LOGGER.error("Rest request already timeout, abandon execute, method {}, operation {}.",
                  operationMeta.getHttpMethod(),
                  operationMeta.getMicroserviceQualifiedName());
              return;
            }

            runOnExecutor();
          } catch (InvocationException e) {
            LOGGER.error("Invocation failed, cause={}", e.getMessage());
            sendFailResponse(e);
          } catch (Throwable e) {
            LOGGER.error("Processing rest server request error", e);
            sendFailResponse(e);
          }
        }
      });
    } catch (Throwable e) {
      LOGGER.error("failed to schedule invocation, message={}, executor={}.", e.getMessage(), e.getClass().getName());
      sendFailResponse(e);
    }

业务线程找eventloop线程:eventloop网络线程在放到responseEx上下文中,这个在VertxRestDispatcher可以看到responseEx预置了网络线程

    HttpServletResponseEx responseEx = new VertxServerResponseToHttpServletResponse(context.response());

//vertx context 设置
  public VertxServerResponseToHttpServletResponse(HttpServerResponse serverResponse) {
    this.context = Vertx.currentContext();
    this.serverResponse = serverResponse;

    Objects.requireNonNull(context, "must run in vertx context.");
  }

线程切换过程详细

1.接收请求的vertx eventloop线程

2.从vertx eventloop线程转入业务线程处理

3.进入ProducerOperationHandler的handler,仍然是业务线程group

4.进入ProducerOperationHandler的syncInvoke的,最终进入sendResponseQuietly,线程仍然是group

// ProducerOperationHandler的asyncResp
  public void syncInvoke(Invocation invocation, SwaggerProducerOperation producerOperation, AsyncResponse asyncResp) {
....
    asyncResp.handle(response);
  }

//上述的asyncResp正是AbstractRestInvocation传进去的sendResponseQuietly

  protected void doInvoke() throws Throwable {
    invocation.onStartHandlersRequest();
    invocation.next(resp -> sendResponseQuietly(resp));
  }


5.响应 group线程,通过驱动responseEx去响应,把响应交由eventloop线程去处理,eventloop线程在responseEx的context,即context.runOnContext(V -> internalFlushBuffer());

// responseEx
  protected void onExecuteHttpServerFiltersFinish(Response response, Throwable e) {
...
      responseEx.flushBuffer();
...
}

// 进入VertxServerResponseToHttpServletResponse的flushBuffer
  public void flushBuffer() {
    if (context == Vertx.currentContext()) {
      internalFlushBuffer();
      return;
    }
    context.runOnContext(V -> internalFlushBuffer());
  }

//通过serverResponse.end(bodyBuffer)发送响应,vertx框架做的事情

  public void internalFlushBuffer() {
...
    serverResponse.end(bodyBuffer);
  }

通过以上分析可知,Producer对client的请求处理链:VertxRestDispatcher->Invocation(VertxRestInvocation)->通过next方法不断遍历Handler,最后一个Handler是ProducerOperationHandler

业务执行线程创建

默认通过Spring xml初始化servicecomb.executor.groupThreadPool

ExecutorManager通过BeanUtils获取

OperationMeta 获取执行业务线程

OperationMeta executor初始化:
this.executor = schemaMeta.getMicroserviceMeta().getScbEngine().getExecutorManager().findExecutor(this);

03-05 23:39