一、创建event类 Order

public class Order {

    private String id;
private String name;
private double price; public String getId() {
return id;
} public void setId(String id) {
this.id = id;
} public String getName() {
return name;
} public void setName(String name) {
this.name = name;
} public double getPrice() {
return price;
} public void setPrice(double price) {
this.price = price;
}
}

二、创建消费者类 Consumer

import com.lmax.disruptor.WorkHandler;

import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger; public class Consumer implements WorkHandler<Order> { private String consumerId; private static AtomicInteger count = new AtomicInteger(0); private Random random = new Random(); public Consumer(String consumerId) {
this.consumerId = consumerId;
} @Override
public void onEvent(Order event) throws Exception {
Thread.sleep(1 * random.nextInt(5));
System.out.println("当前消费者:" + this.consumerId + ",消费信息ID:"+event.getId());
count.incrementAndGet();
} public int getCount() {
return count.get();
}
}

三、创建生产者类 Producer

import com.lmax.disruptor.RingBuffer;

public class Producer {

    private RingBuffer<Order> ringBuffer;

    public Producer(RingBuffer<Order> ringBuffer) {
this.ringBuffer = ringBuffer;
} public void sendData(String data) {
long sequnce = ringBuffer.next(); try {
Order order = ringBuffer.get(sequnce);
order.setId(data);
} finally {
ringBuffer.publish(sequnce);
}
}

四、创建测试类

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.ProducerType; import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors; public class TestMain {
public static void main(String[] args) throws Exception{ //1 创建ringbuffer
RingBuffer<Order> ringBuffer = RingBuffer.create(ProducerType.MULTI,
new EventFactory<Order>() {
@Override
public Order newInstance() {
return new Order();
}
},
1024 * 1024,
new YieldingWaitStrategy()); //2 通过ringbuffer 创建一个屏障
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); //3 创建多个消费者
Consumer[] consumers = new Consumer[10];
for (int i = 0; i < consumers.length; i++) {
consumers[i] = new Consumer("C" + i);
} //4 构建多消费者工作池
WorkerPool<Order> workerPool = new WorkerPool<Order>(
ringBuffer,
sequenceBarrier,
new EventExceptionHandler(),
consumers); //5 设置多个消费者的sequence 序号用于单独统计消费进度,并且设置到ringbuffer中
ringBuffer.addGatingSequences(workerPool.getWorkerSequences()); //6 启动workPool
workerPool.start(Executors.newFixedThreadPool(10)); //设置异步生产 100个生产者
CountDownLatch latch = new CountDownLatch(1); for (int i = 0; i < 100; i++) {
Producer producer = new Producer(ringBuffer);
new Thread(new Runnable() {
@Override
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
} for (int j = 0; j < 100; j++) {
producer.sendData(UUID.randomUUID().toString());
}
}
}).start();
} Thread.sleep(2000);
System.out.println("--------线程创建完毕,开始生产数据----------");
latch.countDown(); Thread.sleep(10000);
System.out.println("消费者处理的任务总数:" + consumers[0].getCount());
} //创建exception类
static class EventExceptionHandler implements ExceptionHandler<Order> { @Override
public void handleEventException(Throwable ex, long sequence, Order event) { } @Override
public void handleOnStartException(Throwable ex) { } @Override
public void handleOnShutdownException(Throwable ex) { }
}
}
05-15 06:05