从(四)server代码跟进

public static void simple(MultiplicationService.Processor processor) {
try {
TServerTransport serverTransport = new TServerSocket(9090);
TServer server = new TSimpleServer(new Args(serverTransport).processor(processor)); System.out.println("Starting the simple server...");
server.serve();
} catch (Exception e) {
e.printStackTrace();
}
}

跟进server.serve()

public void serve() {
try {
serverTransport_.listen();
} catch (TTransportException ttx) {
LOGGER.error("Error occurred during listening.", ttx);
return;
} // Run the preServe event
if (eventHandler_ != null) {
eventHandler_.preServe();
} setServing(true); while (!stopped_) {
TTransport client = null;
TProcessor processor = null;
TTransport inputTransport = null;
TTransport outputTransport = null;
TProtocol inputProtocol = null;
TProtocol outputProtocol = null;
ServerContext connectionContext = null;
try {
client = serverTransport_.accept();
if (client != null) {
processor = processorFactory_.getProcessor(client);
inputTransport = inputTransportFactory_.getTransport(client);
outputTransport = outputTransportFactory_.getTransport(client);
inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
if (eventHandler_ != null) {
connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol);
}
while (true) {
if (eventHandler_ != null) {
eventHandler_.processContext(connectionContext, inputTransport, outputTransport);
}
if(!processor.process(inputProtocol, outputProtocol)) {
break;
}
}
}
} catch (TTransportException ttx) {
// Client died, just move on
} catch (TException tx) {
if (!stopped_) {
LOGGER.error("Thrift error occurred during processing of message.", tx);
}
} catch (Exception x) {
if (!stopped_) {
LOGGER.error("Error occurred during processing of message.", x);
}
} if (eventHandler_ != null) {
eventHandler_.deleteContext(connectionContext, inputProtocol, outputProtocol);
} if (inputTransport != null) {
inputTransport.close();
} if (outputTransport != null) {
outputTransport.close();
} }
setServing(false);
}

跟进accept方法

protected TSocket acceptImpl() throws TTransportException {
if (serverSocket_ == null) {
throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket.");
}
try {
Socket result = serverSocket_.accept();
TSocket result2 = new TSocket(result);
result2.setTimeout(clientTimeout_);
return result2;
} catch (IOException iox) {
throw new TTransportException(iox);
}
}

由于(四)使用的是阻塞IO, 代码也可以看到阻塞直到有客户端连接

跟进process()方法

public boolean process(TProtocol in, TProtocol out) throws TException {
TMessage msg = in.readMessageBegin();
ProcessFunction fn = processMap.get(msg.name);
if (fn == null) {
TProtocolUtil.skip(in, TType.STRUCT);
in.readMessageEnd();
TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");
out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
x.write(out);
out.writeMessageEnd();
out.getTransport().flush();
return true;
}
fn.process(msg.seqid, in, out, iface);
return true;
}

整体流程,readMessageBegin读客户端请求方法。请求方法在服务端没有注册,返回异常给客户端。如果有方法,对应方法处理。跟进process方法

public final void process(int seqid, TProtocol iprot, TProtocol oprot, I iface) throws TException {
T args = getEmptyArgsInstance();
try {
args.read(iprot);
} catch (TProtocolException e) {
iprot.readMessageEnd();
TApplicationException x = new TApplicationException(TApplicationException.PROTOCOL_ERROR, e.getMessage());
oprot.writeMessageBegin(new TMessage(getMethodName(), TMessageType.EXCEPTION, seqid));
x.write(oprot);
oprot.writeMessageEnd();
oprot.getTransport().flush();
return;
}
iprot.readMessageEnd();
TSerializable result = null;
byte msgType = TMessageType.REPLY; try {
result = getResult(iface, args);
} catch (TTransportException ex) {
LOGGER.error("Transport error while processing " + getMethodName(), ex);
throw ex;
} catch (TApplicationException ex) {
LOGGER.error("Internal application error processing " + getMethodName(), ex);
result = ex;
msgType = TMessageType.EXCEPTION;
} catch (Exception ex) {
LOGGER.error("Internal error processing " + getMethodName(), ex);
if(!isOneway()) {
result = new TApplicationException(TApplicationException.INTERNAL_ERROR,
"Internal error processing " + getMethodName());
msgType = TMessageType.EXCEPTION;
}
} if(!isOneway()) {
oprot.writeMessageBegin(new TMessage(getMethodName(), msgType, seqid));
result.write(oprot);
oprot.writeMessageEnd();
oprot.getTransport().flush();
}
}

跟进读取参数方法

public void read(org.apache.thrift.protocol.TProtocol iprot, multiply_args struct) throws org.apache.thrift.TException {
org.apache.thrift.protocol.TField schemeField;
iprot.readStructBegin();
while (true)
{
schemeField = iprot.readFieldBegin();
if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
break;
}
switch (schemeField.id) {
case 1: // N1
if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
struct.n1 = iprot.readI32();
struct.setN1IsSet(true);
} else {
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
case 2: // N2
if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
struct.n2 = iprot.readI32();
struct.setN2IsSet(true);
} else {
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
iprot.readFieldEnd();
}
iprot.readStructEnd(); // check for required fields of primitive type, which can't be checked in the validate method
struct.validate();
}

服务端跟进处理

result = getResult(iface, args);

继续跟进

public multiply_result getResult(I iface, multiply_args args) throws org.apache.thrift.TException {
multiply_result result = new multiply_result();
result.success = iface.multiply(args.n1, args.n2);
result.setSuccessIsSet(true);
return result;
}

这里拿到结果, 最后写回给客户端

if(!isOneway()) {
oprot.writeMessageBegin(new TMessage(getMethodName(), msgType, seqid));
result.write(oprot);
oprot.writeMessageEnd();
oprot.getTransport().flush();
}

这是最简单的一种方式,使用阻塞io, 二进制协议序列化。还有分阻塞, 压缩协议等。

05-11 21:58