生产者

package cn.lonecloud.procum.disruptor;

import cn.lonecloud.procum.Data;
import com.lmax.disruptor.RingBuffer; import java.nio.ByteBuffer; /**
* @author lonecloud
* @version v1.0
* @date 下午3:02 2018/5/7
*/
public class Producer { //队列
private final RingBuffer<Data> dataRingBuffer; public Producer(RingBuffer<Data> dataRingBuffer) {
this.dataRingBuffer = dataRingBuffer;
} /**
* 插入数据
* @param s
*/
public void pushData(String s) { //获取下一个位置
long next = dataRingBuffer.next();
try {
//获取容器
Data data = dataRingBuffer.get(next);
//设置数据
data.setData(s);
} finally {
//插入
dataRingBuffer.publish(next);
}
}
}

  消费者

package cn.lonecloud.procum.disruptor;

import cn.lonecloud.procum.Data;
import com.lmax.disruptor.WorkHandler; /**
* @author lonecloud
* @version v1.0
* @date 下午3:01 2018/5/7
*/
public class Customer implements WorkHandler<Data> {
@Override
public void onEvent(Data data) throws Exception {
System.out.println(Thread.currentThread().getName()+"---"+data.getData());
}
}

  数据工厂

package cn.lonecloud.procum.disruptor;

import cn.lonecloud.procum.Data;
import com.lmax.disruptor.EventFactory; /**
* @author lonecloud
* @version v1.0
* @date 下午3:02 2018/5/7
*/
public class DataFactory implements EventFactory<Data> { @Override
public Data newInstance() {
return new Data();
}
}

  主函数

package cn.lonecloud.procum.disruptor;

import cn.lonecloud.procum.Data;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor; import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; /**
* @author lonecloud
* @version v1.0
* @date 下午3:09 2018/5/7
*/
public class Main {
public static void main(String[] args) throws InterruptedException {
//创建线程池
ExecutorService service = Executors.newCachedThreadPool();
//创建数据工厂
DataFactory dataFactory = new DataFactory();
//设置缓冲区大小,必须为2的指数,否则会有异常
int buffersize = 1024;
Disruptor<Data> dataDisruptor = new Disruptor<Data>(dataFactory, buffersize,
service);
//创建消费者线程
dataDisruptor.handleEventsWithWorkerPool(
new Customer(),
new Customer(),
new Customer(),
new Customer(),
new Customer(),
new Customer(),
new Customer()
);
//启动
dataDisruptor.start();
//获取其队列
RingBuffer<Data> ringBuffer = dataDisruptor.getRingBuffer();
for (int i = 0; i < 100; i++) {
//创建生产者
Producer producer = new Producer(ringBuffer);
//设置内容
producer.pushData(UUID.randomUUID().toString());
//Thread.sleep(1000);
}
}
}

  其中策略有几种:

1. BlockingWaitStrategy:阻塞策略,最节省CPU,但是高并发条件下性能最糟糕

2 SleepingWaitStrategy:在循环中无限等待,处理数据会产生高延迟,对生产线程影响小,场景:异步日志

3. YieldingWaitStrategy:低延迟场合,使用必须保证剩余的消费者线程的逻辑CPU

4. BusySpinWaitStrategy:消费者线程会尽最大努力疯狂的监控缓冲区变化。

04-19 17:18
查看更多