一、创建event类 Order
public class Order {
private String id;
private String name;
private double price;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public double getPrice() {
return price;
}
public void setPrice(double price) {
this.price = price;
}
}
二、创建消费者类 Consumer
import com.lmax.disruptor.WorkHandler;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
public class Consumer implements WorkHandler<Order> {
private String consumerId;
private static AtomicInteger count = new AtomicInteger(0);
private Random random = new Random();
public Consumer(String consumerId) {
this.consumerId = consumerId;
}
@Override
public void onEvent(Order event) throws Exception {
Thread.sleep(1 * random.nextInt(5));
System.out.println("当前消费者:" + this.consumerId + ",消费信息ID:"+event.getId());
count.incrementAndGet();
}
public int getCount() {
return count.get();
}
}
三、创建生产者类 Producer
import com.lmax.disruptor.RingBuffer;
public class Producer {
private RingBuffer<Order> ringBuffer;
public Producer(RingBuffer<Order> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void sendData(String data) {
long sequnce = ringBuffer.next();
try {
Order order = ringBuffer.get(sequnce);
order.setId(data);
} finally {
ringBuffer.publish(sequnce);
}
}
四、创建测试类
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
public class TestMain {
public static void main(String[] args) throws Exception{
//1 创建ringbuffer
RingBuffer<Order> ringBuffer = RingBuffer.create(ProducerType.MULTI,
new EventFactory<Order>() {
@Override
public Order newInstance() {
return new Order();
}
},
1024 * 1024,
new YieldingWaitStrategy());
//2 通过ringbuffer 创建一个屏障
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
//3 创建多个消费者
Consumer[] consumers = new Consumer[10];
for (int i = 0; i < consumers.length; i++) {
consumers[i] = new Consumer("C" + i);
}
//4 构建多消费者工作池
WorkerPool<Order> workerPool = new WorkerPool<Order>(
ringBuffer,
sequenceBarrier,
new EventExceptionHandler(),
consumers);
//5 设置多个消费者的sequence 序号用于单独统计消费进度,并且设置到ringbuffer中
ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
//6 启动workPool
workerPool.start(Executors.newFixedThreadPool(10));
//设置异步生产 100个生产者
CountDownLatch latch = new CountDownLatch(1);
for (int i = 0; i < 100; i++) {
Producer producer = new Producer(ringBuffer);
new Thread(new Runnable() {
@Override
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int j = 0; j < 100; j++) {
producer.sendData(UUID.randomUUID().toString());
}
}
}).start();
}
Thread.sleep(2000);
System.out.println("--------线程创建完毕,开始生产数据----------");
latch.countDown();
Thread.sleep(10000);
System.out.println("消费者处理的任务总数:" + consumers[0].getCount());
}
//创建exception类
static class EventExceptionHandler implements ExceptionHandler<Order> {
@Override
public void handleEventException(Throwable ex, long sequence, Order event) {
}
@Override
public void handleOnStartException(Throwable ex) {
}
@Override
public void handleOnShutdownException(Throwable ex) {
}
}
}