1.CPU缓存架构详解
1.1 CPU高速缓存概念
1.2 CPU多核缓存架构
场景二
1.3 CPU缓存架构缓存一致性的解决方案
1.4 缓存一致性协议实现原理
或者执行 cat /proc/cpuinfo 命令
class Pointer {
volatile long x;
//避免伪共享: 缓存行填充
long p1, p2, p3, p4, p5, p6, p7;
volatile long y;
}
public class FalseSharingTest {
public static void main(String[] args) throws InterruptedException {
testPointer(new Pointer());
}
private static void testPointer(Pointer pointer) throws InterruptedException {
long start = System.currentTimeMillis();
Thread t1 = new Thread(() -> {
for (int i = 0; i < 100000000; i++) {
pointer.x++;
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < 100000000; i++) {
pointer.y++;
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(pointer.x + "," + pointer.y);
System.out.println(System.currentTimeMillis() - start);
}
}
class Pointer {
// 避免伪共享: @Contended + jvm参数:-XX:-RestrictContended jdk8支持
//@Contended
volatile long x;
//避免伪共享: 缓存行填充
//long p1, p2, p3, p4, p5, p6, p7;
volatile long y;
}
2.高性能内存队列Disruptor详解
2.1 juc包下阻塞队列的缺陷
2.2 Disruptor介绍
2.3 Disruptor的高性能设计方案
2.4 Disruptor实战
<!-- disruptor -->
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.4</version>
</dependency>
public Disruptor(
final EventFactory<T> eventFactory,
final int ringBufferSize,
final ThreadFactory threadFactory,
final ProducerType producerType,
final WaitStrategy waitStrategy)
单生产者单消费者模式
1)创建Event(消息载体/事件)和EventFactory(事件工厂)
@Data
public class OrderEvent {
private long value;
private String name;
}
public class OrderEventFactory implements EventFactory<OrderEvent> {
@Override
public OrderEvent newInstance() {
return new OrderEvent();
}
}
2) 创建消息(事件)生产者
public class OrderEventProducer {
//事件队列
private RingBuffer<OrderEvent> ringBuffer;
public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(long value,String name) {
// 获取事件队列 的下一个槽
long sequence = ringBuffer.next();
try {
//获取消息(事件)
OrderEvent orderEvent = ringBuffer.get(sequence);
// 写入消息数据
orderEvent.setValue(value);
orderEvent.setName(name);
} catch (Exception e) {
// TODO 异常处理
e.printStackTrace();
} finally {
System.out.println("生产者发送数据value:"+value+",name:"+name);
//发布事件
ringBuffer.publish(sequence);
}
}
}
3)创建消费者
public class OrderEventHandler implements EventHandler<OrderEvent> {
@Override
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws
Exception {
// TODO 消费逻辑
System.out.println("消费者获取数据value:"+
event.getValue()+",name:"+event.getName());
}
}
4) 测试
public class DisruptorDemo {
public static void main(String[] args) throws Exception {
//创建disruptor
Disruptor<OrderEvent> disruptor = new Disruptor<>(
new OrderEventFactory(),
1024 * 1024,
Executors.defaultThreadFactory(),
ProducerType.SINGLE, //单生产者
new YieldingWaitStrategy() //等待策略
);
//设置消费者用于处理RingBuffer的事件
disruptor.handleEventsWith(new OrderEventHandler());
disruptor.start();
//创建ringbuffer容器
RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
//创建生产者
OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
//发送消息
for (int i = 0; i < 100; i++) {
eventProducer.onData(i, "Fox" + i);
}
disruptor.shutdown();
}
}
单生产者多消费者模式
//设置多消费者,消息会被重复消费
disruptor.handleEventsWith(new OrderEventHandler(), new OrderEventHandler());
//设置多消费者,消费者要实现WorkHandler接口,一条消息只会被一个消费者消费
disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());
public class OrderEventHandler implements EventHandler<OrderEvent>,
WorkHandler<OrderEvent> {
@Override
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws
Exception {
// TODO 消费逻辑
System.out.println("消费者"+ Thread.currentThread().getName()
+"获取数据value:"+ event.getValue()+",name:"+event.getName());
}
@Override
public void onEvent(OrderEvent event) throws Exception {
// TODO 消费逻辑
System.out.println("消费者"+ Thread.currentThread().getName()
+"获取数据value:"+ event.getValue()+",name:"+event.getName());
}
}
多生产者多消费者模式
public class DisruptorDemo2 {
public static void main(String[] args) throws Exception {
//创建disruptor
Disruptor<OrderEvent> disruptor = new Disruptor<>(
new OrderEventFactory(),
1024 * 1024,
Executors.defaultThreadFactory(),
ProducerType.MULTI, //多生产者
new YieldingWaitStrategy() //等待策略
);
//设置消费者用于处理RingBuffer的事件
//disruptor.handleEventsWith(new OrderEventHandler());
//设置多消费者,消息会被重复消费
//disruptor.handleEventsWith(new OrderEventHandler(),new OrderEventHandler());
//设置多消费者,消费者要实现WorkHandler接口,一条消息只会被一个消费者消费
disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new
OrderEventHandler());
//启动disruptor
disruptor.start();
//创建ringbuffer容器
RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
new Thread(() -> {
//创建生产者
OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
// 发送消息
for (int i = 0; i < 100; i++) {
eventProducer.onData(i, "Fox" + i);
}
}, "producer1").start();
new Thread(() -> {
//创建生产者
OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
// 发送消息
for (int i = 0; i < 100; i++) {
eventProducer.onData(i, "monkey" + i);
}
}, "producer2").start();
//disruptor.shutdown();
}
}
3.RingBuffer
3.1 ringbuffer到底是什么
它是一个环(首尾相接的环),你可以把它用做在不同上下文(线程)间传递数据的buffer。
基本来说,ringbuffer拥有一个序号,这个序号指向数组中下一个可用的元素。(如下图右边的图片表示序号,这个序号指向数组的索引4的位置。)
随着你不停地填充这个buffer(可能也会有相应的读取),这个序号会一直增长,直到绕过这个环。
要找到数组中当前序号指向的元素,可以通过sequence & (array length-1) = array index,比如一共有8槽,3&(8-1)=3,HashMap就是用这个方式来定位数组元素的,这种方式比取模的速度更快。
常用的队列之间的区别
- 没有尾指针。只维护了一个指向下一个可用位置的序号。
- 不删除buffer中的数据,也就是说这些数据一直存放在buffer中,直到新的数据覆盖他们
3.2 ringbuffer采用这种数据结构原因
- 因为它是数组,所以要比链表快,数组内元素的内存地址的连续性存储的。这是对CPU缓存友好的—也就是说,在硬件级别,数组中的元素是会被预加载的,因此在ringbuffer当中,cpu无需时不时去主存加载数组中的下一个元素。因为只要一个元素被加载到缓存行,其他相邻的几个元素也会被加载进同一个缓存行。
- 其次,你可以为数组预先分配内存,使得数组对象一直存在(除非程序终止)。这就意味着不需要花大量的时间用于垃圾回收。此外,不像链表那样,需要为每一个添加到其上面的对象创造节点对象—对应的,当删除节点时,需要执行相应的内存清理操作。
3.3 如何从Ringbuffer读取
消费者(Consumer)是一个想从Ring Buffer里读取数据的线程,它可以访问ConsumerBarrier对象——这个对象由RingBuffer创建并且代表消费者与RingBuffer进行交互。就像Ring Buffer显然需要一个序号才能找到下一个可用节点一样,消费者也需要知道它将要处理的序号——每个消费者都需要找到下一个它要访问的序号。在上面的例子中,消费者处理完了Ring Buffer里序号8之前(包括8)的所有数据,那么它期待访问的下一个序号是9。
消费者可以调用ConsumerBarrier对象的waitFor()方法,传递它所需要的下一个序号.
final long availableSeq = consumerBarrier.waitFor(nextSequence);
ConsumerBarrier返回RingBuffer的最大可访问序号——在上面的例子中是12。ConsumerBarrier有一个WaitStrategy方法来决定它如何等待这个序号.
接下来
接下来,消费者会一直逛来逛去,等待更多数据被写入 Ring Buffer。并且,写入数据后消费者会收到通知——节点 9,10,11 和 12 已写入。现在序号 12 到了,消费者可以指示 ConsumerBarrier 去拿这些序号里的数据了。
在Disruptor中采用了数组的方式保存了我们的数据,上面我们也介绍了采用数组保存我们访问时很好的利用缓存,但是在Disruptor中进一步选择采用了环形数组进行保存数据,也就是RingBuffer。在这里先说明一下环形数组并不是真正的环形数组,在RingBuffer中是采用取余的方式进行访问的,比如数组大小为 10,0访问的是数组下标为0这个位置,其实10,20等访问的也是数组的下标为0的这个位置。
实际上,在这些框架中取余并不是使用%运算,都是使用的&与运算,这就要求你设置的大小一般是2的N次方也就是,10,100,1000等等,这样减去1的话就是,1,11,111,就能很好的使用index & (size -1),这样利用位运算就增加了访问速度。
如果在Disruptor中你不用2的N次方进行大小设置,他会抛出buffersize必须为2的N次方异常。
- Producer会向这个RingBuffer中填充元素,填充元素的流程是首先从RingBuffer读取下一个Sequence,之后在这个Sequence位置的槽填充数据,之后发布。
- Consumer消费RingBuffer中的数据,通过SequenceBarrier来协调不同的Consumer的消费先后顺序,以及获取下一个消费位置Sequence。
- Producer在RingBuffer写满时,会从头开始继续写替换掉以前的数据。但是如果有SequenceBarrier指向下一个位置,则不会覆盖这个位置,阻塞到这个位置被消费完成。Consumer同理,在所有Barrier被消费完之后,会阻塞到有新的数据进来。
3.4 一个生产者
生产者单线程写数据的流程比较简单:
- 申请写入m个元素;
- 若是有m个元素可以写入, 则返回最大的序列号. 这儿主要判断是否会覆盖未读的元素
-
若是返回的正确, 则生产者开始写入元素.
3.5 多个生产者
多个生产者的情况下, 会遇到“如何防止多个线程重复写同一个元素”的问题. Disruptor的解决方法是, 每个线程获取不同的一段数组空间进行操作. 这个通过CAS很容易达到. 只需要在分配元素的时候, 通过CAS判断一下这段空间是否已经分配出去即可.
但是会遇到一个新问题: 如何防止读取的时候, 读到还未写的元素. Disruptor在多个生产者的情况下, 引入了一个与Ring Buffer大小相同的buffer: available Buffer. 当某个位置写入成功的时候, 便把availble Buffer相应的位置置位, 标记为写入成功. 读取的时候, 会遍历available Buffer, 来判断元素是否已经就绪.
3.6 读数据
生产者多线程写入的情况会复杂很多:
- 申请读取到序号n;
- 若writer cursor >= n, 这时仍然无法确定连续可读的最大下标. 从reader cursor开始读取available Buffer, 一直查到第一个不可用的元素, 然后返回最大连续可读元素的位置;
- 消费者读取元素.
如下图所示, 读线程读到下标为2的元素, 三个线程Writer1/Writer2/Writer3正在向RingBuffer相应位置写数据, 写线程被分配到的最大元素下标是11.
读线程申请读取到下标从3到11的元素, 判断writer cursor>=11. 然后开始读取availableBuffer, 从3开始, 往后读取, 发现下标为7的元素没有生产成功, 于是WaitFor(11)返回6.
然后, 消费者读取下标从3到6共计4个元素.
3.7 写数据
多个生产者写入的时候:
- 申请写入m个元素;
- 若是有m个元素可以写入, 则返回最大的序列号. 每个生产者会被分配一段独享的空间;
- 生产者写入元素, 写入元素的同时设置available Buffer里面相应的位置, 以标记自己哪些位置是已经写入成功的.
如下图所示, Writer1和Writer2两个线程写入数组, 都申请可写的数组空间. Writer1被分配了下标3到下表5的空间, Writer2被分配了下标6到下标9的空间.
Writer1写入下标3位置的元素, 同时把available Buffer相应位置置位, 标记已经写入成功, 往后移一位, 开始写下标4位置的元素. Writer2同样的方式. 最终都写入完成.
防止不同生产者对同一段空间写入的代码, 如下所示:
public long tryNext(int n) throws InsufficientCapacityException
{
if (n < 1)
{
throw new IllegalArgumentException("n must be > 0");
}
long current;
long next;
do
{
current = cursor.get();
next = current + n;
if (!hasAvailableCapacity(gatingSequences, n, current))
{
throw InsufficientCapacityException.INSTANCE;
}
}
while (!cursor.compareAndSet(current, next));
return next;
}
通过do/while循环的条件cursor.compareAndSet(current, next), 来判断每次申请的空间是否已经被其他生产者占据. 假如已经被占据, 该函数会返回失败, While循环重新执行, 申请写入空间.
消费者的流程与生产者非常类似, 这儿就不多描述了. Disruptor通过精巧的无锁设计实现了在高并发情形下的高性能.