做犯法任务的网站西安东郊做网站
做犯法任务的网站,西安东郊做网站,百度账号中心,软件app开发公司业务员目录
Disruptor简介
Disruptor的设计方案
RingBuffer数据结构
一个生产者单线程写数据的流程
多个生产者写数据的流程
消费者读数据
多个生产者写数据
Disruptor核心概念
Disruptor的使用
单生产者单消费者模式
单生产者多消费者模式
多生产者多消费者模式
消费者…目录
Disruptor简介
Disruptor的设计方案
RingBuffer数据结构
一个生产者单线程写数据的流程
多个生产者写数据的流程
消费者读数据
多个生产者写数据
Disruptor核心概念
Disruptor的使用
单生产者单消费者模式
单生产者多消费者模式
多生产者多消费者模式
消费者优先级模式 Disruptor简介 Disruptor是英国外汇交易公司LMAX开发的一个高性能队列研发的初衷是解决内存队列的延迟问题在性能测试中发现竟然与I/O操作处于同样的数量级。基于Disruptor开发的系统单线程能支撑每秒600万订单2010年在QCon演讲后获得了业界关注。2011年企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。目前包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。注意这里所说的队列是系统内部的内存队列而不是Kafka这样的分布式队列。
GithubGitHub - LMAX-Exchange/disruptor: High Performance Inter-Thread Messaging Library
Disruptor实现了队列的功能并且是一个有界队列可以用于生产者-消费者模型。
juc下队列存在的问题 队列 描述 ArrayBlockingQueue 基于数组结构实现的一个有界阻塞队列 LinkedBlockingQueue 基于链表结构实现的一个无界阻塞队列指定容量为有界阻塞队列 PriorityBlockingQueue 支持按优先级排序的无界阻塞队列 DelayQueue 基于优先级队列PriorityBlockingQueue实现的无界阻塞队列 SynchronousQueue 不存储元素的阻塞队列 LinkedTransferQueue 基于链表结构实现的一个无界阻塞队列 LinkedBlockingDeque 基于链表结构实现的一个双端阻塞队列
1. juc下的队列大部分采用加ReentrantLock锁方式保证线程安全。在稳定性要求特别高的系统中为了防止生产者速度过快导致内存溢出只能选择有界队列。
2. 加锁的方式通常会严重影响性能。线程会因为竞争不到锁而被挂起等待其他线程释放锁而唤醒这个过程存在很大的开销而且存在死锁的隐患。
3. 有界队列通常采用数组实现。但是采用数组实现又会引发另外一个问题false sharing(伪共享)。
Disruptor的设计方案
Disruptor通过以下设计来解决队列速度慢的问题
环形数组结构
为了避免垃圾回收采用数组而非链表。同时数组对处理器的缓存机制更加友好空间局部性原理。
元素位置定位
数组长度2^n通过位运算加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型即使100万QPS的处理速度也需要30万年才能用完。
无锁设计
每个生产者或者消费者线程会先申请可以操作的元素在数组中的位置申请到之后直接在该位置写入或者读取数据。
利用缓存行填充解决了伪共享的问题实现了基于事件驱动的生产者消费者模型观察者模式
消费者时刻关注着队列里有没有消息一旦有新消息产生消费者线程就会立刻把它消费
RingBuffer数据结构
使用RingBuffer来作为队列的数据结构RingBuffer就是一个可自定义大小的环形数组。除数组外还有一个序列号(sequence)用以指向下一个可用的元素供生产者与消费者使用。原理图如下所示 Disruptor要求设置数组长度为2的n次幂。在知道索引(index)下标的情况下存与取数组上的元素时间复杂度只有O(1)而这个index我们可以通过序列号与数组的长度取模来计算得出indexsequence % entries.length。也可以用位运算来计算效率更高此时array.length必须是2的幂次方indexsequece(entries.length-1)当所有位置都放满了再放下一个时就会把0号位置覆盖掉
思考能覆盖数据是否会导致数据丢失呢
当需要覆盖数据时会执行一个策略Disruptor给提供多种策略比较常用的
BlockingWaitStrategy策略常见且默认的等待策略当这个队列里满了不执行覆盖而是阻塞等待。使用ReentrantLockCondition实现阻塞最节省cpu但高并发场景下性能最差。适合CPU资源紧缺吞吐量和延迟并不重要的场景SleepingWaitStrategy策略会在循环中不断等待数据。先进行自旋等待如果不成功则使用Thread.yield()让出CPU,并最终使用LockSupport.parkNanos(1L)进行线程休眠以确保不占用太多的CPU资源。因此这个策略会产生比较高的平均延时。典型的应用场景就是异步日志。YieldingWaitStrategy策略这个策略用于低延时的场合。消费者线程会不断循环监控缓冲区变化在循环内部使用Thread.yield()让出CPU给别的线程执行时间。如果需要一个高性能的系统并且对延时比较有严格的要求可以考虑这种策略。BusySpinWaitStrategy策略: 采用死循环消费者线程会尽最大努力监控缓冲区的变化。对延时非常苛刻的场景使用cpu核数必须大于消费者线程数量。推荐在线程绑定到固定的CPU的场景下使用
一个生产者单线程写数据的流程
申请写入m个元素若是有m个元素可以写入则返回最大的序列号。这里主要判断是否会覆盖未读的元素若是返回的正确则生产者开始写入元素。 多个生产者写数据的流程
多个生产者的情况下会遇到“如何防止多个线程重复写同一个元素”的问题。Disruptor的解决方法是每个线程获取不同的一段数组空间进行操作。这个通过CAS很容易达到。只需要在分配元素的时候通过CAS判断一下这段空间是否已经分配出去即可。
但是会遇到一个新问题如何防止读取的时候读到还未写的元素。Disruptor在多个生产者的情况下引入了一个与Ring Buffer大小相同的bufferavailable Buffer。当某个位置写入成功的时候便把availble Buffer相应的位置置位标记为写入成功。读取的时候会遍历available Buffer来判断元素是否已经就绪。
消费者读数据
生产者多线程写入的情况下读数据会复杂很多
申请读取到序号n若writer cursor n这时仍然无法确定连续可读的最大下标。从reader cursor开始读取available Buffer一直查到第一个不可用的元素然后返回最大连续可读元素的位置消费者读取元素。
如下图所示读线程读到下标为2的元素三个线程Writer1/Writer2/Writer3正在向RingBuffer相应位置写数据写线程被分配到的最大元素下标是11。读线程申请读取到下标从3到11的元素判断writer cursor11。然后开始读取availableBuffer从3开始往后读取发现下标为7的元素没有生产成功于是WaitFor(11)返回6。然后消费者读取下标从3到6共计4个元素。 多个生产者写数据
多个生产者写入的时候
申请写入m个元素若是有m个元素可以写入则返回最大的序列号。每个生产者会被分配一段独享的空间生产者写入元素写入元素的同时设置available Buffer里面相应的位置以标记自己哪些位置是已经写入成功的。
如下图所示Writer1和Writer2两个线程写入数组都申请可写的数组空间。Writer1被分配了下标3到下表5的空间Writer2被分配了下标6到下标9的空间。Writer1写入下标3位置的元素同时把available Buffer相应位置置位标记已经写入成功往后移一位开始写下标4位置的元素。Writer2同样的方式。最终都写入完成。 Disruptor核心概念
RingBuffer环形缓冲区基于数组的内存级别缓存是创建sequencer(序号)与定义WaitStrategy(拒绝策略)的入口。Disruptor总体执行入口对RingBuffer的封装持有RingBuffer、消费者线程池Executor、消费之集合ConsumerRepository等引用。Sequence序号分配器对RingBuffer中的元素进行序号标记通过顺序递增的方式来管理进行交换的数据(事件/Event)一个Sequence可以跟踪标识某个事件的处理进度同时还能消除伪共享。Sequencer数据传输器Sequencer里面包含了Sequence是Disruptor的核心Sequencer有两个实现类SingleProducerSequencer(单生产者实现)、MultiProducerSequencer(多生产者实现)Sequencer主要作用是实现生产者和消费者之间快速、正确传递数据的并发算法SequenceBarrier消费者屏障用于控制RingBuffer的Producer和Consumer之间的平衡关系并且决定了Consumer是否还有可处理的事件的逻辑。WaitStrategy消费者等待策略决定了消费者如何等待生产者将Event生产进DisruptorWaitStrategy有多种实现策略Event从生产者到消费者过程中所处理的数据单元Event由使用者自定义。
EventHandler由用户自定义实现就是我们写消费者逻辑的地方代表了Disruptor中的一个消费者的接口。EventProcessor这是个事件处理器接口实现了Runnable处理主要事件循环处理Event拥有消费者的Sequence Disruptor构造器
public Disruptor(final EventFactoryT eventFactory,final int ringBufferSize,final ThreadFactory threadFactory,final ProducerType producerType,...)
EventFactory创建事件任务的工厂类。ringBufferSize容器的长度。ThreadFactory 用于创建执行任务的线程。ProductType生产者类型单生产者、多生产者。WaitStrategy等待策略。
Disruptor的使用
引入依赖
!-- disruptor --
dependencygroupIdcom.lmax/groupIdartifactIddisruptor/artifactIdversion3.3.4/version
/dependency
单生产者单消费者模式
1.创建Event(消息载体/事件)和EventFactory事件工厂
创建 OrderEvent 类这个类将会被放入环形队列中作为消息内容。创建OrderEventFactory类用于创建OrderEvent事件
Data
public class OrderEvent {private long value;private String name;
}public class OrderEventFactory implements EventFactoryOrderEvent {Overridepublic OrderEvent newInstance() {return new OrderEvent();}
}
2. 创建消息事件生产者
创建 OrderEventProducer 类它将作为生产者使用
public class OrderEventProducer {//事件队列private RingBufferOrderEvent ringBuffer;public OrderEventProducer(RingBufferOrderEvent ringBuffer) {this.ringBuffer ringBuffer;}public void onData(long value,String name) {// 获取事件队列 的下一个槽long sequence ringBuffer.next();try {//获取消息事件OrderEvent orderEvent ringBuffer.get(sequence);// 写入消息数据orderEvent.setValue(value);orderEvent.setName(name);} catch (Exception e) {// TODO 异常处理e.printStackTrace();} finally {System.out.println(生产者发送数据value:value,name:name);//发布事件ringBuffer.publish(sequence);}}
3.创建消费者
创建 OrderEventHandler 类并实现 EventHandler 作为消费者。
public class OrderEventHandler implements EventHandlerOrderEvent {Overridepublic void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {// TODO 消费逻辑System.out.println(消费者获取数据value: event.getValue(),name:event.getName());}
4. 测试
public class DisruptorDemo {public static void main(String[] args) throws Exception {//创建disruptorDisruptorOrderEvent disruptor new Disruptor(new OrderEventFactory(),1024 * 1024,Executors.defaultThreadFactory(),ProducerType.SINGLE, //单生产者new YieldingWaitStrategy() //等待策略);//设置消费者用于处理RingBuffer的事件disruptor.handleEventsWith(new OrderEventHandler());disruptor.start();//创建ringbuffer容器RingBufferOrderEvent ringBuffer disruptor.getRingBuffer();//创建生产者OrderEventProducer eventProducer new OrderEventProducer(ringBuffer);//发送消息for(int i0;i100;i){eventProducer.onData(i,Foxi);}disruptor.shutdown();}
单生产者多消费者模式
如果消费者是多个只需要在调用 handleEventsWith 方法时将多个消费者传递进去。
- disruptor.handleEventsWith(new OrderEventHandler());
上面传入的两个消费者会重复消费每一条消息如果想实现一条消息在有多个消费者的情况下只会被一个消费者消费那么需要调用 handleEventsWithWorkerPool 方法。
- disruptor.handleEventsWith(new OrderEventHandler());
注意消费者要实现WorkHandler接口
public class OrderEventHandler implements EventHandlerOrderEvent, WorkHandlerOrderEvent {Overridepublic void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {// TODO 消费逻辑System.out.println(消费者 Thread.currentThread().getName()获取数据value: event.getValue(),name:event.getName());}Overridepublic void onEvent(OrderEvent event) throws Exception {// TODO 消费逻辑System.out.println(消费者 Thread.currentThread().getName()获取数据value: event.getValue(),name:event.getName());}
多生产者多消费者模式
在实际开发中多个生产者发送消息多个消费者处理消息才是常态。
public class DisruptorDemo2 {public static void main(String[] args) throws Exception {//创建disruptorDisruptorOrderEvent disruptor new Disruptor(new OrderEventFactory(),1024 * 1024,Executors.defaultThreadFactory(),ProducerType.MULTI, //多生产者new YieldingWaitStrategy() //等待策略);//设置消费者用于处理RingBuffer的事件//disruptor.handleEventsWith(new OrderEventHandler());//设置多消费者,消息会被重复消费//disruptor.handleEventsWith(new OrderEventHandler(),new OrderEventHandler());//设置多消费者,消费者要实现WorkHandler接口一条消息只会被一个消费者消费disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());//启动disruptordisruptor.start();//创建ringbuffer容器RingBufferOrderEvent ringBuffer disruptor.getRingBuffer();new Thread(()-{//创建生产者OrderEventProducer eventProducer new OrderEventProducer(ringBuffer);// 发送消息for(int i0;i100;i){eventProducer.onData(i,Foxi);}},producer1).start();new Thread(()-{//创建生产者OrderEventProducer eventProducer new OrderEventProducer(ringBuffer);// 发送消息for(int i0;i100;i){eventProducer.onData(i,monkeyi);}},producer2).start();//disruptor.shutdown();}
消费者优先级模式
在实际场景中我们通常会因为业务逻辑而形成一条消费链。比如一个消息必须由 消费者A - 消费者B - 消费者C 的顺序依次进行消费。在配置消费者时可以通过 .then 方法去实现顺序消费。
disruptor.handleEventsWith(new OrderEventHandler()).then(new OrderEventHandler())
handleEventsWith 与 handleEventsWithWorkerPool 都是支持 .then 的它们可以结合使用。比如可以按照 消费者A - (消费者B 消费者C) - 消费者D 的消费顺序
disruptor.handleEventsWith(new OrderEventHandler()).thenHandleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler()).then(new OrderEventHandler());
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/89914.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!