高性能内存队列Disruptor初探

这段时间在学习Disruptor的使用,由于其使用略微复杂,所以记录一下防止忘记:

一、定义数据结构

这个数据可以是任何你需要用到的数据,作为一个类封装起来:

1
2
3
4
5
@Data
public class OrderEvent {
private Long id;
private String name;
}

二、定义工厂类

工厂类负责提供事件对象:

1
2
3
4
5
6
import com.lmax.disruptor.EventFactory;
public class OrderEventFactory implements EventFactory<OrderEvent> {
public OrderEvent newInstance() {
return new OrderEvent();
}
}

三、定义消息Handler

消息handler负责处理从队列中弹出的消息。

1
2
3
4
5
6
import com.lmax.disruptor.EventHandler;
public class OrderEventHandler implements EventHandler<OrderEvent> {
public void onEvent(OrderEvent orderEvent, long sequence, boolean endOfBatch) throws Exception {
System.out.printf("Handler : %d, %s, sequence: %d, endOfBatch: %b \n", orderEvent.getId(), orderEvent.getName(), sequence, endOfBatch);
}
}

四、创建disruptor对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
OrderEventFactory orderEventFactory = new OrderEventFactory(); // 工厂对象
int ringBufferSize = 1024 * 1024; // disruptor环大小
ThreadFactory threadFactory = Executors.defaultThreadFactory(); // 线程工厂,使用默认
BlockingWaitStrategy blockingWaitStrategy = new BlockingWaitStrategy(); // 等待策略
Disruptor<OrderEvent> orderEventDisruptor = new Disruptor<OrderEvent>(
orderEventFactory,
ringBufferSize,
threadFactory,
ProducerType.SINGLE,
blockingWaitStrategy
);
OrderEventHandler orderEventHandler = new OrderEventHandler(); // 消息处理handler对象
orderEventDisruptor.handleEventsWith(orderEventHandler); // 绑定消息处理handler
orderEventDisruptor.start(); // 启动消息处理

五、发送消息

1
2
3
4
5
6
7
8
9
10
11
12
RingBuffer<OrderEvent> ringBuffer = orderEventDisruptor.getRingBuffer(); // 获取环
for (long i = 0; i < 100L; i++) {
long sequence = ringBuffer.next(); // 得到坐标sequence
try {
OrderEvent orderEvent = ringBuffer.get(sequence); // 获得消息对象
orderEvent.setId(i); // 填充消息对象参数
orderEvent.setName("Event:" + i); // 填充消息对象参数
System.out.printf("publish:%d\n", i);
} finally {
ringBuffer.publish(sequence); // 发布消息
}
}

六、总结

通过以上几步即可发送消息,disruptor是一个性能非常高的内存队列,值得深入研究。