Future 示例-LMLPHP

public  static Object send(RequestClient request)
future.channel().writeAndFlush(JSONObject.toJSONString(request));
future.channel().writeAndFlush("\r\n");
DefaultFuture defaultFuture = new DefaultFuture(request);//请求未来的响应
Response response = defaultFuture.get(10);//阻塞获取响应
return response;
return null;
}
public class ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
Response res = JSONObject.parseObject(msg.toString(), Response.class);
DefaultFuture.recive(res);//拿到结果,通知阻塞解除
}
}
public class DefaultFuture {

    private long id;
public final static Map<Long,DefaultFuture> FUTURES= new ConcurrentHashMap<Long,DefaultFuture>();
private long timeout;
private final long start=System.currentTimeMillis(); //get方法和recive方法是不同线程调用的同一个对象,要volatile。
private volatile Response response;
private volatile Lock lock = new ReentrantLock();
private volatile Condition condition = lock.newCondition(); public DefaultFuture(){} public DefaultFuture(RequestClient request){
id=request.getId();//通过id进行异步判断,
FUTURES.put(id, this);//请求的id和响应对应。
} public Response get(){
lock.lock();
while(!hasDone()){
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
lock.unlock();
}
}
return response;
} //超时是防止服务器卡死了。
public Response get(long timeout){
long start = System.currentTimeMillis();
lock.lock();
while(!hasDone()){
try {//condition是依赖ReentrantLock
//此时当前线程释放lock锁,进入[等待状态],等待其他线程执行aCondition.signal()时才有可能执行
condition.await(timeout, TimeUnit.SECONDS);
if(System.currentTimeMillis()-start>=timeout){
break;
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
lock.unlock();
}
}
return response;
} //收到服务器响应
public static void recive(Response res){
//找到res相对应的DefaultFuture
DefaultFuture future = FUTURES.remove(res.getId());
if(future==null){
return ;
}
Lock lock= future.getLock();
lock.lock();
try{
future.setResponse(res);
Condition condition = future.getCondition();
if(condition!=null){
condition.signal();
} }catch(Exception e){
e.printStackTrace();
}finally{
lock.unlock();
}
} //开一条线程处理超时
static class FutureTimeOutThread extends Thread{
@Override
public void run() {
while(true){
for(long futureId : FUTURES.keySet()){
DefaultFuture f = FUTURES.get(futureId);
if(f==null){//等于空直接移除
FUTURES.remove(futureId);
continue;
}
if(f.getTimeout()>0){//不等于空判断是否超时
if((System.currentTimeMillis()-f.getStart())>f.getTimeout()){
Response res = new Response();
res.setContent(null);
res.setMsg("请求超时!");
res.setStatus(1);//响应异常处理
res.setId(f.getId());
DefaultFuture.recive(res);
}
}
}
}
}
} static{
FutureTimeOutThread timeOutThread = new FutureTimeOutThread();
timeOutThread.setDaemon(true);//守护线程,主线程在就在,主线程挂掉就挂掉,
timeOutThread.start();
} private boolean hasDone() {return response !=null? true:false;}
public long getId() {return id;}
public Response getResponse() {return response;}
public void setResponse(Response response) {this.response = response;}
public Lock getLock() {return lock;}
public void setLock(Lock lock) {this.lock = lock;}
public Condition getCondition() {return condition;}
public void setCondition(Condition condition) {this.condition = condition;}
public void setId(long id) {this.id = id;}
public long getTimeout() {return timeout;}
public void setTimeout(long timeout) {this.timeout = timeout;}
public long getStart() {return start;}
}
05-11 22:03