Exchanger并发辅助类,允许在并发任务之间交换数据。具体来说Exchanger类在两个线程之间定义同步点。当两个线程到达同步点时,它们交换数据结构。需要注意的是Exchanger类只能同步两个线程。

  内存一致性效果:对于通过Exchanger成功交换对象的每对线程,每个线程中在exchanger()之前的操作 happen-before从另一线程中相应的exchanger()返回的后续操作。

下面用一对一的生产者-消费者例子进行说明。

  生产者代码如下:

public class Producer implements Runnable {
private List<String> buffer;
private Exchanger<List<String>> exchanger;
public Producer(List<String> buffer, Exchanger<List<String>> exchanger) {
this.buffer = buffer;
this.exchanger = exchanger;
}
@Override
public void run() {
int cycle=1;
for (int i=0; i<10; i++) {
System.out.println("Producer:Cycle "+cycle);
for (int j = 1; j <=10; j++) {
String msg="Event "+(i*10+j);
System.out.println("Producer: "+msg);
buffer.add(msg);
}
try {
buffer=exchanger.exchange(buffer);//和消费者交换数据结构
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Producer size: "+buffer.size());
cycle++;
}
} }

  消费者代码:

public class Consumer implements Runnable {
private List<String> buffer;
private Exchanger<List<String>> exchanger;
public Consumer(List<String> buffer, Exchanger<List<String>> exchanger) {
this.buffer = buffer;
this.exchanger = exchanger;
}
@Override
public void run() {
int cycle=1;
for (int i=0; i<=9; i++) {
System.out.println("Consumer: Cycle "+cycle);
try {
buffer=exchanger.exchange(buffer);//he生产者交换数据结构
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Consumer size: "+buffer.size());
for (int j = 1; j <=10; j++) {
String msg=buffer.get(0);
System.out.println("Consumer: "+msg);
buffer.remove(0);
}
cycle++;
}
}
}

  运行:

public class Core {
public static void main(String[] args) {
List<String> buffer1=new ArrayList<String>();
List<String> buffer2=new ArrayList<String>();
Exchanger<List<String>> exchanger=new Exchanger<List<String>>();
Producer producer=new Producer(buffer1, exchanger);
Consumer consumer=new Consumer(buffer2, exchanger);
Thread thread1=new Thread(producer);
Thread thread2=new Thread(consumer);
thread1.start();
thread2.start();
}
}
05-08 08:20