并发编程-CPU缓存架构详解 Disruptor的高性能设计方案

1.CPU缓存架构详解

1.1 CPU高速缓存概念

        CPU缓存即高速缓冲存储器,是位于CPU与主内存间的一种容量较小但速度很高的存储器。CPU高 速缓存可以分为一级缓存,二级缓存,部分高端CPU还具有三级缓存,每一级缓存中所储存的全部数 据都是下一级缓存的一部分,这三种缓存的技术难度和制造成本是相对递减的,所以其容量也是相对递增的。
        由于CPU的速度远高于主内存,CPU直接从内存中存取数据要等待一定时间周期,Cache中保存着CPU刚用过或循环使用的一部分数据,当CPU再次使用该部分数据时可从Cache中直接调用,减少CPU 的等待时间,提高了系统的效率。
在CPU访问存储设备时,无论是存取数据抑或存取指令,都趋于聚集在一片连续的区域中,这就
局部性原理
时间局部性( Temporal Locality :如果一个信息项正在被访问,那么在近期它很可能还会被再次访问。 比如循环、递归、方法的反复调用等。
空间局部性( Spatial Locality :如果一个存储器的位置被引用,那么将来他附近的位置也会被引用。 比如顺序执行的代码、连续创建的两个对象、数组等。

1.2 CPU多核缓存架构

        现代CPU为了提升执行效率,减少CPU与内存的交互,一般在CPU上集成了多级缓存架构,常见的为三级缓存结构。如下图:
CPU寄存器是位于CPU内部的存储器,数据的读写速度非常快,比缓存和主存更快。CPU会把常
用的数据放到寄存器中进行处理,而不是直接从主存中读取,这样可以加速数据的访问和处理。但
是寄存器的容量非常有限,一般只有几十个或者几百个字节,因此只能存储少量的数据。
        当CPU读取一个地址中的数据时,会先在 L1 Cache 中查找。如果数据在 L1 Cache 中找到,CPU 会直接从 L1 Cache 中读取数据。如果没有找到,则会将这个请求发送给 L2 Cache,然后在 L2 Cache 中查找,如果 L2 Cache 中也没有找到,则会继续将请求发送到 L3 Cache 中。如果在 L3 Cache 中还是没有找到数据,则最后会从主内存中读取数据并将其存储到 CPU 的缓存中。
        当CPU写入一个地址中的数据时,同样会先将数据写入 L1 Cache 中,然后再根据缓存一致性协议将数据写入 L2 Cache、L3 Cache 以及主内存中。具体写入过程与缓存一致性协议相关,有可能只写入 L1 Cache 中,也有可能需要将数据写入 L2 Cache、L3 Cache 以及主内存中。写入过程中也可能会使用缓存行失效、写回等技术来提高效率。
思考:这种缓存架构在多线程访问的时候存在什么问题?
CPU多核缓存架构缓存一致性问题分析
场景一

场景二

        在CPU多核缓存架构中,每个处理器都有一个单独的缓存,共享数据可能有多个副本:一个副本在主内存中,一个副本在请求它的每个处理器的本地缓存中。当数据的一个副本发生更改时,其他副本必须反映该更改。也就是说, CPU多核缓存架构要保证缓存一致性。

1.3 CPU缓存架构缓存一致性的解决方案

        32位的IA-32处理器支持对系统内存中的位置进行锁定的原子操作。这些操作通常用于管理共享的 数据结构(如信号量、段描述符、系统段或页表),在这些结构中,两个或多个处理器可能同时试图修改相同的字段或标志。处理器使用三种相互依赖的机制来执行锁定的原子操作:
1.有保证的原子操作。
        
        处理器提供一些特殊的指令或者机制,可以保证在多个处理器同时执行原子操作时,它们不会相互干扰,从而保 证原子性。这些指令或者机制的实现通常需要硬件支持。例如x86架构中提供了一系列的原子操作指令, 如XADD、XCHG、CMPXCHG等,可以保证在多个处理器同时执行这些指令时,它们不会相互干扰,从而保证原子性。
2.总线锁定,使用LOCK#信号和LOCK指令前缀。
        总线锁定是一种用于确保原子操作的机制,通常会在LOCK指令前缀和一些特殊指令中使用。在执行LOCK指令前缀时,处理器会将LOCK#信号拉低,这个信号会通知其他处理器当前总线上的数据已经被锁定,从而确保原子性。
3.缓存一致性协议,确保原子操作可以在缓存的数据结构上执行(缓存锁定);这种机制出现在Pentium 4、IntelXeon和P6系列处理器中。
缓存一致性协议是一种用于确保处理器缓存中的数据和主存中的数据一致的机制。 缓存一致性协议会 通过处理器之间的通信,确保在一个处理器修改了某个数据后,其他处理器缓存中的该数据会被更新 或者失效,从而保证在多个处理器同时对同一个数据进行操作时,它们所看到的数据始终是一致的。 缓存锁定则是在缓存一致性协议的基础上实现原子操作的机制 ,它会利用缓存一致性协议来确保在多 个处理器同时修改同一个缓存行中的数据时,只有一个处理器能够获得锁定,从而保证原子性。缓存锁定的实现也需要硬件的支持,而且不同的处理器架构可能会有不同的实现方式。
缓存一致性协议不能使用的特殊情况:
        当操作的数据不能被缓存在处理器内部,或操作的数据跨多个缓存行时 ,则处理器会调用总线锁定。 不能被缓存在处理器内部的数据通常指的是不可缓存的设备内存(如显存、网络接口卡的缓存 等),这些设备内存一般不受缓存一致性协议的管辖,处理器无法将其缓存到自己的缓存行中。
        有些处理器不支持缓存锁定。 早期的Pentium系列处理器并不支持缓存锁定机制。在这些处理器上,只能使用总 线锁定来实现原子操作。但现代的处理器通常都支持缓存锁定机制,因此应该尽量使用缓存锁定来实现原子操作,以获得更好的性能。

1.4 缓存一致性协议实现原理

总线窥探
        总线窥探(Bus snooping)是缓存中的一致性控制器(snoopy cache)监视或窥探总线事务的一种方 案,其目标是在分布式共享内存系统中维护缓存一致性。包含一致性控制器(snooper)的缓存称为 snoopy缓存。该方案由Ravishankar和Goodman于1983年提出。
在计算机中,数据通过总线在处理器和内存之间传递。每次处理器和内存之间的数据传递都是通过
一系列步骤来完成的,这一系列步骤称之为 总线事务(Bus Transaction)
工作原理
        当特定数据被多个缓存共享时,处理器修改了共享数据的值,更改必须传播到所有其他具有该数据 副本的缓存中。这种更改传播可以防止系统违反缓存一致性。 数据变更的通知可以通过总线窥探来完成。所有的窥探者都在监视总线上的每一个事务。如果一个修改共享缓存块的事务出现在总线上,所有的窥探者都会检查他们的缓存是否有共享块的相同副本。 如果缓存中有共享块的副本,则相应的窥探者执行一个动作以确保缓存一致性。 这个动作可以是刷新缓存块或使缓存块失效。它还涉及到缓存块状态的改变,这取决于缓存一致性协议(cache coherence protocol)。
窥探协议类型
根据管理写操作的本地副本的方式,有两种窥探协议:
写失效(Write-invalidate)
        当处理器写入一个共享缓存块时,其他缓存中的所有共享副本都会通过总线窥探失效。 这种方法确 保处理器只能读写一个数据的一个副本。其他缓存中的所有其他副本都无效。这是最常用的窥探协议。MSI、MESI、MOSI、MOESI和MESIF协议属于该类型。
写更新(Write-update)
        当处理器写入一个共享缓存块时,其他缓存的所有共享副本都会通过总线窥探更新。这个方法将写数据广播到总线上的所有缓存中。它比write-invalidate协议引起更大的总线流量。这就是为什么这种方法不常见。Dragon和firefly协议属于此类别。
缓存一致性协议
        缓存一致性协议在多处理器系统中应用于高速缓存一致性。为了保持一致性,人们设计了各种模型和协议,如MSI、MESI(又名Illinois)、MOSI、MOESI、MERSI、MESIF、write-once、Synapse、 Berkeley、Firefly和Dragon协议。
MESI协议
        MESI协议 是一个基于写失效的缓存一致性协议,是支持回写(write-back)缓存的最常用协议。 也称作伊利诺伊协议 (Illinois protocol,因为是在伊利诺伊大学厄巴纳-香槟分校被发明的)。
缓存行有4种不同的状态:
已修改Modified (M)
        缓存行是脏的(dirty ),与主存的值不同。如果别的CPU内核要读主存这块数据,该缓存行必须回写到主存,状态变为共享(S).
独占Exclusive (E)
        缓存行只在当前缓存中,但是干净的——缓存数据同于主存数据。当别的缓存读取它时,状态变为共享;当前写数据时,变为已修改状态。
共享Shared (S)
        缓存行也存在于其它缓存中且是未修改的。缓存行可以在任意时刻抛弃。
无效Invalid (I)
        缓存行是无效的
        MESI协议用于确保多个处理器之间共享的内存数据的一致性。 当一个处理器需要访问某个内存数 据时,它首先会检查自己的缓存中是否有该数据的副本。如果缓存中没有该数据的副本,则会发出一 个缓存不命中(miss)请求,从主内存中获取该数据的副本,并将该数据的副本存储到自己的缓存 中。
        当一个处理器发出一个缓存不命中请求时,如果该数据的副本已经存在于另一个处理器或核心的缓 存中(即处于共享状态),则该处理器可以从另一个处理器的缓存中复制该数据的副本。这个过程称 为缓存到缓存复制(cache-to-cache transfer)。
        缓存到缓存复制可以减少对主内存的访问,从而提高系统的性能。但是,需要确保数据的一致性, 否则会出现数据错误或不一致的情况。因此,在进行缓存到缓存复制时,需要使用MESI协议中的其他 状态转换来确保数据的一致性。例如,如果两个缓存都处于修改状态,那么必须先将其中一个缓存的 数据写回到主内存,然后才能进行缓存到缓存复制。
1.5 伪共享的问题
如果多个核上的线程在操作同一个缓存行中的不同变量数据,那么就会出现频繁的缓存失效,即使在 代码层面看这两个线程操作的数据之间完全没有关系。 这种不合理的资源竞争情况就是伪共享(False Sharing)。 ArrayBlockingQueue有三个成员变量:
         takeIndex:需要被取走的元素下标
        putIndex:可被元素插入的位置的下标
        count:队列中元素的数量
这三个变量很容易放到一个缓存行中,但是之间修改没有太多的关联。所以每次修改,都会使之前缓存的数据失效,从而不能完全达到共享的效果。
如上图所示,当生产者线程put一个元素到ArrayBlockingQueue时,putIndex会修改,从而导致消费者线程的缓存中的缓存行无效,需要从主存中重新读取。
linux下查看Cache Line大小
Cache Line大小是64Byte

或者执行 cat /proc/cpuinfo 命令

避免伪共享方案
方案1:缓存行填充
class Pointer {
 volatile long x;
 //避免伪共享: 缓存行填充
 long p1, p2, p3, p4, p5, p6, p7;
 volatile long y;
 }
方案2:使用 @sun.misc.Contended 注解(java8)
注意需要配置jvm参数:-XX:-RestrictContended
public class FalseSharingTest {

    public static void main(String[] args) throws InterruptedException {
        testPointer(new Pointer());
    }

    private static void testPointer(Pointer pointer) throws InterruptedException {
        long start = System.currentTimeMillis();
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 100000000; i++) {
                pointer.x++;
            }
        });

        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 100000000; i++) {
                pointer.y++;
            }
        });

        t1.start();
        t2.start();
        t1.join();
        t2.join();

        System.out.println(pointer.x + "," + pointer.y);

        System.out.println(System.currentTimeMillis() - start);


    }
}


class Pointer {
    // 避免伪共享: @Contended + jvm参数:-XX:-RestrictContended jdk8支持
    //@Contended
    volatile long x;
    //避免伪共享: 缓存行填充
    //long p1, p2, p3, p4, p5, p6, p7;
    volatile long y;
}









方案3: 使用线程的本地内存,比如ThreadLocal

2.高性能内存队列Disruptor详解

2.1 juc包下阻塞队列的缺陷

1) juc下的队列大部分采用加ReentrantLock锁方式保证线程安全。 在稳定性要求特别高的系统中, 为了防止生产者速度过快,导致内存溢出,只能选择有界队列。
2) 加锁的方式通常会严重影响性能。 线程会因为竞争不到锁而被挂起,等待其他线程释放锁而唤醒, 这个过程存在很大的开销,而且存在死锁的隐患。
3) 有界队列通常采用数组实现。但是 采用数组实现又会引发另外一个问题false sharing(伪共享)。

2.2 Disruptor介绍

Disruptor是英国外汇交易公司LMAX开发的一个高性能队列 ,研发的初衷是解决内存队列的延迟
问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。 基于Disruptor开发的系统单线程能支撑每秒600万订单 ,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。
目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性
能。
Github: https://github.com/LMAX-Exchange/disruptor
Disruptor实现了队列的功能并且是一个有界队列,可以用于生产者-消费者模型。

2.3 Disruptor的高性能设计方案

Disruptor通过以下设计来解决队列速度慢的问题:
环形数组结构
        为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好(空间局部性原 理)。
元素位置定位
        数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index溢出的问 题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。
无锁设计
        每个生产者或者消费者线程,会通过先申请可以操作的元素在数组中的位置,申请到之后,直接在 该位置写入或者读取数据。 整个过程通过原子变量CAS,保证操作的线程安全。
利用缓存行填充解决了伪共享的问题
实现了基于事件驱动的生产者消费者模型(观察者模式)
        消费者时刻关注着队列里有没有消息,一旦有新消息产生,消费者线程就会立刻把它消费。
RingBuffer数据结构
        使用RingBuffer来作为队列的数据结构, RingBuffer就是一个可自定义大小的环形数组 。除数组外 还有一个 序列号(sequence),用以指向下一个可用的元素 ,供生产者与消费者使用。原理图如下所 示:

Disruptor要求设置数组长度为2的n次幂。在知道索引(index)下标的情况下,存与取数组上的元素时间复杂度只 有O(1),而这个index我们可以通过序列号与数组的长度取模来计算得出,index=sequence % entries.length。也可以用位运算来计算效率更高,此时array.length必须是2的幂次方,index=sequece& (entries.length-1)
当所有位置都放满了,再放下一个时,就会把0号位置覆盖掉
思考:覆盖数据是否会导致数据丢失呢?
等待策略
Disruptor在日志框架中的应用
Log4j 2相对于Log4j 1最大的优势在于多线程并发场景下性能更优。 该特性源自于Log4j 2的异步模式 采用了Disruptor来处理。 在Log4j 2的配置文件中可以配置WaitStrategy,默认是Timeout策略。
        loggers all async采用的是Disruptor,而Async Appender采用的是ArrayBlockingQueue队列。 由图可见,单线程情况下,loggers all async与Async Appender吞吐量相差不大,但是在64个线程 的时候,loggers all async的吞吐量比Async Appender增加了12倍,是Sync模式的68倍。

2.4 Disruptor实战

引入依赖
<!-- disruptor -->
 <dependency>
 <groupId>com.lmax</groupId>
 <artifactId>disruptor</artifactId>
 <version>3.3.4</version>
 </dependency>
Disruptor构造器
public Disruptor(
 final EventFactory<T> eventFactory,
 final int ringBufferSize,
 final ThreadFactory threadFactory,
 final ProducerType producerType,
 final WaitStrategy waitStrategy)
EventFactory:创建事件(任务)的工厂类。
ringBufferSize:容器的长度。
ThreadFactory :用于创建执行任务的线程。
ProductType:生产者类型:单生产者、多生产者。
WaitStrategy:等待策略。
使用流程:
1)构建消息载体(事件)
2) 构建生产者
3)构建消费者
4) 生产消息,消费消息的测试
单生产者单消费者模式
1)创建Event(消息载体/事件)和EventFactory(事件工厂)
创建 OrderEvent 类,这个类将会被放入环形队列中作为消息内容。创建 OrderEventFactory类,用于创 建OrderEvent事件
@Data
 public class OrderEvent {
 private long value;
 private String name;
 }

 public class OrderEventFactory implements EventFactory<OrderEvent> {
 
 @Override
 public OrderEvent newInstance() {
 return new OrderEvent();
 }
 }
2) 创建消息(事件)生产者
创建 OrderEventProducer 类,它将作为生产者使用
 public class OrderEventProducer {
 //事件队列
 private RingBuffer<OrderEvent> ringBuffer;

 public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
 this.ringBuffer = ringBuffer;
 }

 public void onData(long value,String name) {
 // 获取事件队列 的下一个槽
 long sequence = ringBuffer.next();
 try {
 //获取消息(事件)
 OrderEvent orderEvent = ringBuffer.get(sequence);
 // 写入消息数据
 orderEvent.setValue(value);
 orderEvent.setName(name);
 } catch (Exception e) {
 // TODO 异常处理
 e.printStackTrace();
 } finally {
 System.out.println("生产者发送数据value:"+value+",name:"+name);
 //发布事件
 ringBuffer.publish(sequence);
 }
}
 }
3)创建消费者
创建 OrderEventHandler 类,并实现 EventHandler<T> ,作为消费者。
public class OrderEventHandler implements EventHandler<OrderEvent> {

 @Override
 public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws
Exception {
 // TODO 消费逻辑
 System.out.println("消费者获取数据value:"+
event.getValue()+",name:"+event.getName());
 }
 }
4) 测试
public class DisruptorDemo {

    public static void main(String[] args) throws Exception {

        //创建disruptor
        Disruptor<OrderEvent> disruptor = new Disruptor<>(
                new OrderEventFactory(),
                1024 * 1024,
                Executors.defaultThreadFactory(),
                ProducerType.SINGLE, //单生产者
                new YieldingWaitStrategy() //等待策略
        );
        //设置消费者用于处理RingBuffer的事件
        disruptor.handleEventsWith(new OrderEventHandler());
        disruptor.start();

        //创建ringbuffer容器
        RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
        //创建生产者
        OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
        //发送消息
        for (int i = 0; i < 100; i++) {
            eventProducer.onData(i, "Fox" + i);
        }

        disruptor.shutdown();
    }
}
单生产者多消费者模式
如果消费者是多个,只需要在调用 handleEventsWith 方法时将多个消费者传递进去
//设置多消费者,消息会被重复消费
 disruptor.handleEventsWith(new OrderEventHandler(), new OrderEventHandler());
上面传入的两个消费者会重复消费每一条消息,如果想实现一条消息在有多个消费者的情况下,只会 被一个消费者消费,那么需要调用 handleEventsWithWorkerPool 方法。
//设置多消费者,消费者要实现WorkHandler接口,一条消息只会被一个消费者消费
 disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());
注意:消费者要实现WorkHandler接口
public class OrderEventHandler implements EventHandler<OrderEvent>,
        WorkHandler<OrderEvent> {

    @Override
    public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws
            Exception {
        // TODO 消费逻辑
        System.out.println("消费者"+ Thread.currentThread().getName()
                +"获取数据value:"+ event.getValue()+",name:"+event.getName());
    }

    @Override
    public void onEvent(OrderEvent event) throws Exception {
        // TODO 消费逻辑
        System.out.println("消费者"+ Thread.currentThread().getName()
                +"获取数据value:"+ event.getValue()+",name:"+event.getName());
    }
}
多生产者多消费者模式
在实际开发中,多个生产者发送消息,多个消费者处理消息才是常态。

public class DisruptorDemo2 {

    public static void main(String[] args) throws Exception {

        //创建disruptor
        Disruptor<OrderEvent> disruptor = new Disruptor<>(
                new OrderEventFactory(),
                1024 * 1024,
                Executors.defaultThreadFactory(),
                ProducerType.MULTI, //多生产者
                new YieldingWaitStrategy() //等待策略
        );

        //设置消费者用于处理RingBuffer的事件
        //disruptor.handleEventsWith(new OrderEventHandler());
        //设置多消费者,消息会被重复消费
        //disruptor.handleEventsWith(new OrderEventHandler(),new OrderEventHandler());
        //设置多消费者,消费者要实现WorkHandler接口,一条消息只会被一个消费者消费
        disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new
                OrderEventHandler());

        //启动disruptor
        disruptor.start();

        //创建ringbuffer容器
        RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();

        new Thread(() -> {
            //创建生产者
            OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
            // 发送消息
            for (int i = 0; i < 100; i++) {
                eventProducer.onData(i, "Fox" + i);
            }
        }, "producer1").start();

        new Thread(() -> {
            //创建生产者
            OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
            // 发送消息
            for (int i = 0; i < 100; i++) {
                eventProducer.onData(i, "monkey" + i);
            }
        }, "producer2").start();


        //disruptor.shutdown();

    }
}

3.RingBuffer

3.1 ringbuffer到底是什么

它是一个环(首尾相接的环),你可以把它用做在不同上下文(线程)间传递数据的buffer。

基本来说,ringbuffer拥有一个序号,这个序号指向数组中下一个可用的元素。(如下图右边的图片表示序号,这个序号指向数组的索引4的位置。)


随着你不停地填充这个buffer(可能也会有相应的读取),这个序号会一直增长,直到绕过这个环。

要找到数组中当前序号指向的元素,可以通过sequence & (array length-1) = array index,比如一共有8槽,3&(8-1)=3,HashMap就是用这个方式来定位数组元素的,这种方式比取模的速度更快。

常用的队列之间的区别

  • 没有尾指针。只维护了一个指向下一个可用位置的序号。
  • 不删除buffer中的数据,也就是说这些数据一直存放在buffer中,直到新的数据覆盖他们

3.2 ringbuffer采用这种数据结构原因

  • 因为它是数组,所以要比链表快,数组内元素的内存地址的连续性存储的。这是对CPU缓存友好的—也就是说,在硬件级别,数组中的元素是会被预加载的,因此在ringbuffer当中,cpu无需时不时去主存加载数组中的下一个元素。因为只要一个元素被加载到缓存行,其他相邻的几个元素也会被加载进同一个缓存行。
  • 其次,你可以为数组预先分配内存,使得数组对象一直存在(除非程序终止)。这就意味着不需要花大量的时间用于垃圾回收。此外,不像链表那样,需要为每一个添加到其上面的对象创造节点对象—对应的,当删除节点时,需要执行相应的内存清理操作。

3.3 如何从Ringbuffer读取

消费者(Consumer)是一个想从Ring Buffer里读取数据的线程,它可以访问ConsumerBarrier对象——这个对象由RingBuffer创建并且代表消费者与RingBuffer进行交互。就像Ring Buffer显然需要一个序号才能找到下一个可用节点一样,消费者也需要知道它将要处理的序号——每个消费者都需要找到下一个它要访问的序号。在上面的例子中,消费者处理完了Ring Buffer里序号8之前(包括8)的所有数据,那么它期待访问的下一个序号是9。

消费者可以调用ConsumerBarrier对象的waitFor()方法,传递它所需要的下一个序号.

final long availableSeq = consumerBarrier.waitFor(nextSequence);

ConsumerBarrier返回RingBuffer的最大可访问序号——在上面的例子中是12。ConsumerBarrier有一个WaitStrategy方法来决定它如何等待这个序号.

接下来

接下来,消费者会一直逛来逛去,等待更多数据被写入 Ring Buffer。并且,写入数据后消费者会收到通知——节点 9,10,11 和 12 已写入。现在序号 12 到了,消费者可以指示 ConsumerBarrier 去拿这些序号里的数据了。

在Disruptor中采用了数组的方式保存了我们的数据,上面我们也介绍了采用数组保存我们访问时很好的利用缓存,但是在Disruptor中进一步选择采用了环形数组进行保存数据,也就是RingBuffer。在这里先说明一下环形数组并不是真正的环形数组,在RingBuffer中是采用取余的方式进行访问的,比如数组大小为 10,0访问的是数组下标为0这个位置,其实10,20等访问的也是数组的下标为0的这个位置。

实际上,在这些框架中取余并不是使用%运算,都是使用的&与运算,这就要求你设置的大小一般是2的N次方也就是,10,100,1000等等,这样减去1的话就是,1,11,111,就能很好的使用index & (size -1),这样利用位运算就增加了访问速度
如果在Disruptor中你不用2的N次方进行大小设置,他会抛出buffersize必须为2的N次方异常。

  • Producer会向这个RingBuffer中填充元素,填充元素的流程是首先从RingBuffer读取下一个Sequence,之后在这个Sequence位置的槽填充数据,之后发布。
  • Consumer消费RingBuffer中的数据,通过SequenceBarrier来协调不同的Consumer的消费先后顺序,以及获取下一个消费位置Sequence。
  • Producer在RingBuffer写满时,会从头开始继续写替换掉以前的数据。但是如果有SequenceBarrier指向下一个位置,则不会覆盖这个位置,阻塞到这个位置被消费完成。Consumer同理,在所有Barrier被消费完之后,会阻塞到有新的数据进来。

3.4 一个生产者

生产者单线程写数据的流程比较简单:

  1. 申请写入m个元素;
  2. 若是有m个元素可以写入, 则返回最大的序列号. 这儿主要判断是否会覆盖未读的元素
  3. 若是返回的正确, 则生产者开始写入元素.

3.5 多个生产者

        多个生产者的情况下, 会遇到“如何防止多个线程重复写同一个元素”的问题. Disruptor的解决方法是, 每个线程获取不同的一段数组空间进行操作. 这个通过CAS很容易达到. 只需要在分配元素的时候, 通过CAS判断一下这段空间是否已经分配出去即可.

但是会遇到一个新问题: 如何防止读取的时候, 读到还未写的元素. Disruptor在多个生产者的情况下, 引入了一个与Ring Buffer大小相同的buffer: available Buffer. 当某个位置写入成功的时候, 便把availble Buffer相应的位置置位, 标记为写入成功. 读取的时候, 会遍历available Buffer, 来判断元素是否已经就绪.

3.6 读数据

生产者多线程写入的情况会复杂很多:

  1. 申请读取到序号n;
  2. 若writer cursor >= n, 这时仍然无法确定连续可读的最大下标. 从reader cursor开始读取available Buffer, 一直查到第一个不可用的元素, 然后返回最大连续可读元素的位置;
  3. 消费者读取元素.

如下图所示, 读线程读到下标为2的元素, 三个线程Writer1/Writer2/Writer3正在向RingBuffer相应位置写数据, 写线程被分配到的最大元素下标是11.
读线程申请读取到下标从3到11的元素, 判断writer cursor>=11. 然后开始读取availableBuffer, 从3开始, 往后读取, 发现下标为7的元素没有生产成功, 于是WaitFor(11)返回6.

然后, 消费者读取下标从3到6共计4个元素.

3.7 写数据

多个生产者写入的时候:

  1. 申请写入m个元素;
  2. 若是有m个元素可以写入, 则返回最大的序列号. 每个生产者会被分配一段独享的空间;
  3. 生产者写入元素, 写入元素的同时设置available Buffer里面相应的位置, 以标记自己哪些位置是已经写入成功的.
    如下图所示, Writer1和Writer2两个线程写入数组, 都申请可写的数组空间. Writer1被分配了下标3到下表5的空间, Writer2被分配了下标6到下标9的空间.

Writer1写入下标3位置的元素, 同时把available Buffer相应位置置位, 标记已经写入成功, 往后移一位, 开始写下标4位置的元素. Writer2同样的方式. 最终都写入完成.

 防止不同生产者对同一段空间写入的代码, 如下所示:


public long tryNext(int n) throws InsufficientCapacityException
{
    if (n < 1)
    {
        throw new IllegalArgumentException("n must be > 0");
    }
 
    long current;
    long next;
 
    do
    {
        current = cursor.get();
        next = current + n;
 
        if (!hasAvailableCapacity(gatingSequences, n, current))
        {
            throw InsufficientCapacityException.INSTANCE;
        }
    }
    while (!cursor.compareAndSet(current, next));
 
    return next;
}

通过do/while循环的条件cursor.compareAndSet(current, next), 来判断每次申请的空间是否已经被其他生产者占据. 假如已经被占据, 该函数会返回失败, While循环重新执行, 申请写入空间.

消费者的流程与生产者非常类似, 这儿就不多描述了. Disruptor通过精巧的无锁设计实现了在高并发情形下的高性能.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/113737.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

嵌入式linux常用的文件传输方式

做嵌入式就避免不了移植工作&#xff0c;所谓移植就是将交叉编译生成的可执行程序&#xff0c;库&#xff0c;配置文件等传输到开发板上进行工作。 常用传输方式有以下几种&#xff1a;1.串口传输 就是使用串口传输工具rz/sz; 该工具通过串口传输在SRT串口工具…

SRAM型FPGA空间应用的抗单粒子翻转

1 SRAM型FPGA内部结构 目前&#xff0c;SRAM 型 FPGA 在星载电子产品中应用较多的是Xilinx公司的FPGA。 图1所示为Xilinx系列FPGA器件的内部结构。从图中可以看出&#xff0c;FPGA内部由3 部分组成&#xff0c;分别为&#xff1a; 1&#xff09;用于实现用户逻辑的可编程阵…

【MySQL】MVCC机制(undo log,read view)

文章目录 前言一. 预备知识二. 模拟MVCC三. Read View四. RC与RR的本质区别结束语 前言 MVCC&#xff08;多版本并发控制&#xff09;是一种用来解决读-写冲突的无锁并发控制 MVCC为事务分配单向增长的事务ID&#xff0c;为每个修改保存一个版本&#xff0c;版本与事物ID相关联…

74X138元件怎么找——错误解决方法

1.在做74X138的时候根据课本&#xff0c;无法在现有的库中找到74X138&#xff0c;搜索了老师发的库中&#xff0c;都是集成库打不开&#xff0c;那我该怎么办? 根据这个课本P343&#xff0c;&#xff08;即机械工业出版社&#xff0c;刘超&#xff0c;包建荣&#xff0c;俞优姝…

CTF-Reverse---VM虚拟机逆向[HGAME 2023 week4]vm题目复现【详解】

文章目录 前言0x1[HGAME 2023 week4]vm提取汇编指令mov指令push指令&pop指令运算操作cmp指令jmp指令je 和 jne exp 前言 没有前言。终于搞定第二题了。费劲是真的费。 题目在nssctf上有。 这一题写完才看到有提示&#xff0c;是关于构建结构体的&#xff0c;下次试试。 0x…

1360. 日期之间隔几天

1360. 日期之间隔几天 Java代码&#xff1a; 【DateFormat】DateFormat用于实现日期的格式化 import java.text.DateFormat; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; // 好像已过时class Solution {public int daysBet…

vue3+ts 提取公共方法

因为好多页面都会使用到这个效验规则&#xff0c;封装一个校检规则&#xff0c;方便维护 封装前 封装后

【机器学习】四、计算学习理论

1 基础知识 计算学习理论&#xff08;computational learning theory&#xff09;&#xff1a;关于通过“计算”来进行“学习”的理论&#xff0c;即关于机器学习的理论基础&#xff0c;其目的是分析学习任务的困难本质&#xff0c;为学习算法体统理论保证&#xff0c;并根据结…

frp-内网穿透部署-ubuntu22服务器-windows server-详细教程

文章目录 1.下载frp2.Ubuntu修改root用户3.配置服务器3.1.配置frps.ini文件3.2.设置服务文件3.3.设置开机自启和服务操作3.4.后台验证3.5.服务器重启 4.配置本地window4.1.frpc配置4.2.添加开机计划启动4.3.控制台启动隐藏窗口 5.centos防火墙和端口5.1.开放端口5.2.查看端口 6…

【电路笔记】-相量图和相量代数

相量图和相量代数 文章目录 相量图和相量代数1、概述2、相量图3、相量代数3.1 加减3.2 差异化与整合 4、总结 1、概述 交流电信号可以用三种不同的方法来表示&#xff0c;以便表征和实现代数运算。 前面的文章中已经介绍了两种方法&#xff0c;本文稍后将介绍一种新的图形方法…

Pytorch 注意力机制解析与代码实现

目录 什么是注意力机制1、SENet的实现2、CBAM的实现3、ECA的实现4、CA的实现 什么是注意力机制 注意力机制是深度学习常用的一个小技巧&#xff0c;它有多种多样的实现形式&#xff0c;尽管实现方式多样&#xff0c;但是每一种注意力机制的实现的核心都是类似的&#xff0c;就…

【计算机网络】(谢希仁第八版)第一章课后习题答案

1.计算机网络可以向用户提供哪些服务&#xff1f; 答&#xff1a;例如音频&#xff0c;视频&#xff0c;游戏等&#xff0c;但本质是提供连通性和共享这两个功能。 连通性&#xff1a;计算机网络使上网用户之间可以交换信息&#xff0c;好像这些用户的计算机都可以彼此直接连…

036-第三代软件开发-系统时间设置

第三代软件开发-系统时间设置 文章目录 第三代软件开发-系统时间设置项目介绍系统时间设置演示效果QML 实现小伙伴自创 TumblerQt 家 Tumbler C 端实现 总结一下 关键字&#xff1a; Qt、 Qml、 Time、 时间、 系统 项目介绍 欢迎来到我们的 QML & C 项目&#xff01;…

【VPX630】青翼 基于KU115 FPGA+C6678 DSP的6U VPX通用超宽带实时信号处理平台

板卡概述 VPX630是一款基于6U VPX总线架构的高速信号处理平台&#xff0c;该平台采用一片Xilinx的Kintex UltraScale系列FPGA&#xff08;XCKU115&#xff09;作为主处理器&#xff0c;完成复杂的数据采集、回放以及实时信号处理算法。采用一片带有ARM内核的高性能嵌入式处理器…

Nginx负载均衡 以及Linux前后端项目部署

一、Nginx简介 Nginx是一款高性能的开源Web服务器和反向代理服务器。它由俄罗斯的程序设计师Igor Sysoev创建&#xff0c;旨在解决传统Web服务器的性能限制问题。 Nginx采用事件驱动的架构和异步非阻塞的处理方式&#xff0c;使其能够处理大量并发连接&#xff0c;并具备良好…

【分享】winterm ssh登录报错Unkown error

非软文哈&#xff0c;实测Winterm非常好用&#xff0c;唯一的障碍是 某些特定服务器ssh登录报错Unkown error 后经github issue得知&#xff0c;关闭会话设置-ssh选项卡中的 尝试键盘交互认证的勾即可。 https://github.com/kingToolbox/WindTerm/issues/1922

二叉树题目:在二叉树中增加一行

文章目录 题目标题和出处难度题目描述要求示例数据范围 解法一思路和算法代码复杂度分析 解法二思路和算法代码复杂度分析 题目 标题和出处 标题&#xff1a;在二叉树中增加一行 出处&#xff1a;623. 在二叉树中增加一行 难度 5 级 题目描述 要求 给定一个二叉树的根结…

Vue 数据绑定 和 数据渲染

目录 一、Vue快速入门 1.简介 : 2.MVVM : 3.准备工作 : 二、数据绑定 1.实例 : 2.验证 : 三、数据渲染 1.单向渲染 : 2.双向渲染 : 一、Vue快速入门 1.简介 : (1) Vue[/vju/]&#xff0c;是Vue.js的简称&#xff0c;是一个前端框架&#xff0c;常用于构建前端用户…

C++二分查找算法的应用:俄罗斯套娃信封问题

本文涉及的基础知识点 二分查找 题目 给你一个二维整数数组 envelopes &#xff0c;其中 envelopes[i] [wi, hi] &#xff0c;表示第 i 个信封的宽度和高度。 当另一个信封的宽度和高度都比这个信封大的时候&#xff0c;这个信封就可以放进另一个信封里&#xff0c;如同俄罗…

assert断言与const修饰指针的妙用(模拟实现strcpy函数)

assert断言 目录 assert断言的妙用&#xff1a; 头文件&#xff1a; 使用方法&#xff1a; const修饰指针的妙用 主要用法 const在*左边 const在*右边 断言和const修饰指针的应用 模拟实现C语言strcpy函数 1、若字符串str1,str2有空指针怎么办&#xff1f; 2.str2改变…
最新文章