一 概述

1.Disruptor

Disruptor是一个高性能的异步处理框架,实现了“生产者-消费者”模型。

2.RingBuffer

RingBuffer是一种环形数据结构,包含一个指向下一个可用槽位的序号,可以在线程间传递数据。

3.Event

在Disruptor框架中,生产者生产的数据叫做Event。

二 Disruptor框架基本构成

1.MyEvent:自定义对象,充当“生产者-消费者”模型中的数据。
2.MyEventFactory:实现EventFactory接口,用于创建Event对象。
3.MyEventProducerWithTranslator:将数据存储到自定义对象中并发布。
4.MyEventHandler:自定义消费者。

三 Demo

初次接触Disruptor,认识停留在表面,零散,模糊,在此记一个简单的示例,以便日后深入研究。

1.自定义数据类

package com.disruptor.basic;

/**
 * Mutable holder for a single {@code long} value.
 */
public class LongEvent {

    /** The value currently carried by this event. */
    private long value;

    /**
     * Replaces the value carried by this event.
     *
     * @param value the new value to store
     */
    public void setValue(long value) {
        this.value = value;
    }

    /**
     * Returns the value carried by this event.
     *
     * @return the most recently stored value, or {@code 0} if none was set
     */
    public long getValue() {
        return this.value;
    }
}

2.数据生产工厂(创建数据类对象)

package com.disruptor.basic;

import com.lmax.disruptor.EventFactory;

public class LongEventFactory implements EventFactory<LongEvent> {

    public LongEvent newInstance() {
// TODO Auto-generated method stub
return new LongEvent();
} }

3.数据源(初始化数据对象并发布)

package com.disruptor.basic;

import java.nio.ByteBuffer;

import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer; public class LongEventProducerWithTranslator { private final RingBuffer<LongEvent> ringBuffer; public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
} private final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
/**
* event:包含有消费数据的对象; sequence:分配给目标对象的RingBuffer空间序号;
* bb:包含有将要被存储到目标对象中的数据的容器
*/
public void translateTo(LongEvent event, long sequence, ByteBuffer bb) {
// TODO Auto-generated method stub
event.setValue(bb.getLong(0));// 将数据存储到目标对象中
}
}; public void onData(ByteBuffer bb) {
ringBuffer.publishEvent(TRANSLATOR, bb);// 发布,将数据推送给消费者
} }

4.消费者

package com.disruptor.basic;

import com.lmax.disruptor.EventHandler;

public class LongEventHandler implements EventHandler<LongEvent> {

    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
// TODO Auto-generated method stub
System.out.println("当前消费的数据="+event.getValue());
} }

5.测试类

package com.disruptor.basic;

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import org.junit.Test; import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

/**
 * End-to-end demo: wires a {@link Disruptor} with one {@link LongEventHandler}
 * and publishes a sequence of values through a
 * {@link LongEventProducerWithTranslator}.
 */
public class LongEventTest {

    /**
     * Publishes the values 0..99, one every 100 ms, and lets the handler print them.
     *
     * <p>The disruptor and the executor are shut down in {@code finally} blocks so
     * the worker threads are released even if publishing fails or the sleeping
     * thread is interrupted.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @SuppressWarnings({ "unchecked", "deprecation" })
    @Test
    public void test01() throws InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            EventFactory<LongEvent> factory = new LongEventFactory();
            int bufferSize = 1024; // ring buffer capacity; the Disruptor requires a power of 2
            Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, executor,
                    ProducerType.SINGLE, new YieldingWaitStrategy());
            disruptor.handleEventsWith(new LongEventHandler());
            disruptor.start();
            try {
                RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
                LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer);
                // One 8-byte buffer is reused: onData copies the value into the event
                // before returning, so overwriting it on the next iteration is safe.
                ByteBuffer bb = ByteBuffer.allocate(8);
                for (long a = 0; a < 100; a++) {
                    bb.putLong(0, a);
                    producer.onData(bb);
                    Thread.sleep(100);
                }
            } finally {
                disruptor.shutdown();
            }
        } finally {
            executor.shutdown();
        }
    }
}
05-13 14:26