kafka 由于它自身的高性能发送与消费能力,而受到广大企业的喜欢,所以我们就先看看kafka 一些源码实现如下:  

  public void run() {
int messageNo = 1;
while (true) {
String messageStr = "Message_" + messageNo;
long startTime = System.currentTimeMillis();
if (isAsync) {
producer.send(new ProducerRecord<>(topic,
messageNo,
messageStr), new DemoCallBack(startTime, messageNo, messageStr));// 异步发送
} else {
try {
producer.send(new ProducerRecord<>(topic,
messageNo,
messageStr)).get();// 同步发送
System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
++messageNo;
}
}

  这段代码摘抄的是,kafka源码 生产者发送消息demo(kafka.examples.Producer) 里面的一个片段,主要是涉及到两个知识点,一个是异步发送消息,

回调函数的实现,另一个就是同步发送,多线程Future.get 模式的实现。现在分别阐述这两种实现方式。

  异步回调方式

  其实这种方式主要应用在调用多线程执行某个任务时,不用傻傻等到该线程完成后得到相应的反馈信息。举个例子Client端需要调用Server端来执行某个任务,并且希望Server端执行完成后

主动将相应的结果告诉Client端。这个过程就叫做回调了。如下代码:

  

 public class Client implements CSCallBack {

     private volatile boolean stopThread = false;
private Server server; public Client(Server server) {
this.server = server;
} public void sendMsg(final String msg){
System.out.println("ThreadName="+Thread.currentThread().getName()+" 客户端:发送的消息为:" + msg);
new Thread(new Runnable() {
@Override
public void run() {
server.getClientMsg(Client.this,msg);// 核心代码1:将被调用方自己当作参数(client)传递到调用方(Server) while(!stopThread) {// 模拟等待另服务器端代码完成
System.out.println("ThreadName="+Thread.currentThread().getName()+"客户端:模拟等待回调完成"); try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
System.out.println("ThreadName="+Thread.currentThread().getName()+" 客户端:异步发送成功");
} @Override
public void process(String status) {
stopThread = true;
System.out.println("ThreadName="+Thread.currentThread().getName()+" 客户端:收到服务端回调状态为:" + status);
}
}
 public class Server {

     public void getClientMsg(CSCallBack csCallBack , String msg) {

         // 模拟服务端需要对数据处理
try {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("ThreadName="+Thread.currentThread().getName()+" 服务端:服务端接收到客户端发送的消息为:" + msg);
while(true) {
int max=10,min=1;
int ranNum = (int) (Math.random()*(max-min)+min); if(ranNum >6) {// 当随机数大于5时认为任务完成
System.out.println("ThreadName="+Thread.currentThread().getName()+" 服务端:数据处理成功,返回成功状态 200");
String status = "200";
csCallBack.process(status);// 核心代码2:调用方(Server)任务处理完成相应的任务后,调用被调用方(Client)的方法告知任务完成
break;
} try {
Thread.sleep(80);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start(); } catch (Exception e) {
e.printStackTrace();
} }
}

其实核心代码就两个:

      client端:被调用方自己当作参数(client)传递到调用方(Server)。

      Server端:调用方(Server)任务处理完成相应的任务后,调用被调用方(Client)的方法告知任务完成。

   同步发送多线程 Future.get 模式实现 

        这种方式方式主要是用来等待某一项任务完成后,接着顺序执行某项任务。和上面的例子一样都是client 端 向server 端请求完成某项任务,并且期望server 端在完成任务后,返回结果

  实例代码如下:

  

 public class FutureDemo {

     protected RealData realdata = null;
protected boolean isReady = false;
public synchronized void requestData(RealData realdata) {// client请求server完成某项任务
if (isReady) {
return;
}
this.realdata = realdata;
isReady = true;
notifyAll();//核心代码2:当请求的任务处理完成时,唤醒等待中的线程
} public synchronized String getResult() {// client等待server完成任务后返回,此处就相当于 Future.get
while (!isReady) {
try {
wait();//核心代码1:发出请求后等待线程被激活
} catch (InterruptedException e) {
}
}
return realdata.result;
}
}

  核心实现代码其实就是多线程里面的,wait 和 notify 实现方式。异步回调 和 同步 Future get 模式最大的区别,举个例子吧,

老婆(client 端)很爱老公,老公(服务器端)每天完成加班很晚,老婆都会等到老公回家然后给他做夜宵(同步 Future get 模式)

          老婆(client 端)很爱老公,老公(服务器端)每天完成加班很晚,老婆觉得一直等太累了,就先睡觉,等老公回来后通知老婆(回调),然后老婆再给老公做夜宵(异步回调方式)。

所以大家都期望自己的老婆是, Future get 模式 还是 异步回调模式?

              

            

05-14 14:44