博客
关于我
无锁并发框架-Disruptor的使用(二)
阅读量:366 次
发布时间:2019-03-05

本文共 4548 字,大约阅读时间需要 15 分钟。

文章目录

1、生产消费模型的应用

1.1、引入依赖

<dependency>	<groupId>com.lmax</groupId>	<artifactId>disruptor</artifactId>	<version>3.2.1</version></dependency><dependency>	<groupId>org.projectlombok</groupId>	<artifactId>lombok</artifactId>	<version>1.18.8</version>	<optional>true</optional></dependency>

1.2、定义Event

package com.zhz.disruptor.event;import lombok.Data;/** * @author :zhz * @date :Created in 2020/12/30 * @version: V1.0 * @slogan: 天下风云出我辈,一入代码岁月催 * @description: 定义事件event 通过Disruptor 进行交换的数据类型。 **/@Datapublic class LongEvent {       private Long value;}

1.3、定义EventFactory

package com.zhz.disruptor.event;import com.lmax.disruptor.EventFactory;/** * @author :zhz * @date :Created in 2020/12/30 * @version: V1.0 * @slogan: 天下风云出我辈,一入代码岁月催 * @description: 定义事件工厂 **/public class LongEventFactory implements EventFactory<LongEvent> {       @Override    public LongEvent newInstance() {           return new LongEvent();    }}

1.4、定义事件消费者

package com.zhz.disruptor;import com.lmax.disruptor.EventHandler;import com.zhz.disruptor.event.LongEvent;import lombok.extern.slf4j.Slf4j;/** * @author :zhz * @date :Created in 2020/12/30 * @version: V1.0 * @slogan: 天下风云出我辈,一入代码岁月催 * @description: 事件消费者 **/@Slf4jpublic class LongEventHandler implements EventHandler<LongEvent> {       private long serial = 0;    public LongEventHandler(long serial){           this.serial = serial;    }    @Override    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {           log.info("消费者-{}:{}",this.serial,event.getValue());    }}

1.5、定义生产者

package com.zhz.disruptor;import com.lmax.disruptor.RingBuffer;import com.zhz.disruptor.event.LongEvent;import lombok.extern.slf4j.Slf4j;import java.nio.ByteBuffer;/** * @author :zhz * @date :Created in 2020/12/30 * @version: V1.0 * @slogan: 天下风云出我辈,一入代码岁月催 * @description: 事件生产者 **/@Slf4jpublic class LongEventProducer {       public final RingBuffer<LongEvent> ringBuffer;    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {           this.ringBuffer = ringBuffer;    }    public void onData(ByteBuffer byteBuffer) {           // 1.ringBuffer 事件队列 下一个槽        long sequence = ringBuffer.next();        Long data = null;        try {               //2.取出空的事件队列            LongEvent event = ringBuffer.get(sequence);            //3.获取事件队列传递的数据            data = byteBuffer.getLong(0);            event.setValue(data);            try {                   Thread.sleep(10);            } catch (InterruptedException e) {                   e.printStackTrace();            }        } finally {               log.info("生产者准备发送数据");            //4.发布事件,            //注意,最后的 ringBuffer.publish 方法必须包含在 finally 中以确保必须得到调用;            // 如果某个请求的 sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。            ringBuffer.publish(sequence);        }    }}

1.6、定义Main入口

package com.zhz.disruptor;import com.lmax.disruptor.EventFactory;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.YieldingWaitStrategy;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.dsl.ProducerType;import com.zhz.disruptor.event.LongEvent;import com.zhz.disruptor.event.LongEventFactory;import java.nio.ByteBuffer;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;/** * @author :zhz * @date :Created in 2020/12/30 * @version: V1.0 * @slogan: 天下风云出我辈,一入代码岁月催 * @description: **/public class DisruptorMain {       public static void main(String[] args) {           // 1.创建一个可缓存的线程 提供线程来出发Consumer 的事件处理        ExecutorService executor = Executors.newCachedThreadPool();        // 2.创建工厂        EventFactory eventFactory = new LongEventFactory();        // 3.创建ringBuffer 大小,ringBufferSize大小一定要是2的N次方        int ringBufferSize = 1024;        // 4.创建Disruptor        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());        // 5.连接消费端方法        disruptor.handleEventsWith(new LongEventHandler(1),new LongEventHandler(2));        // 6.启动        disruptor.start();        // 7.创建RingBuffer容器        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();        // 8.创建生产者        LongEventProducer producer = new LongEventProducer(ringBuffer);        // 9.指定缓冲区大小        ByteBuffer byteBuffer = ByteBuffer.allocate(8);        for (int i = 1; i <= 10; i++) {               byteBuffer.putLong(0,i);            producer.onData(byteBuffer);        }        //10.关闭disruptor和executor        disruptor.shutdown();        executor.shutdown();    }}

我是小白弟弟,一个在互联网行业的小白,立志成为一名架构师

转载地址:http://fbbwz.baihongyu.com/

你可能感兴趣的文章
并发框架下的“基础类型”——浅析基本类型、ThreadLocal、原子类的线程安全机制
查看>>
Android Studio同步Gradle失败的解决办法
查看>>
VHDL代码风格
查看>>
C++系列7:回调函数
查看>>
图像处理系列1.skimage
查看>>
好用的拼图小程序,图作妖
查看>>
读取二进制存储信息,将低位二进制存储转换为高位存储
查看>>
Hibernate操作Blob,将Blob转换为String
查看>>
Object Clone
查看>>
Javascript中String支持使用正则表达式的四种方法
查看>>
2021年判断浏览器最新写法,你都掌握了吗?
查看>>
简易版Http请求工具
查看>>
【法律】如何保障企业的合法权益:保密协议模板
查看>>
【IoT】 产品设计之结构设计:材料工艺选择及特点(PP、PVC、PE、PS、ABS、PC)
查看>>
【IoT】 产品设计之结构设计:PMMA(亚克力)板、PC耐力板、PS有机板与MS板区别
查看>>
【IoT】蓝牙BLE基础:CC254x通信系列之模拟SPI协议
查看>>
【IoT】TI BLE CC2541 串口控制蓝牙详解
查看>>
【企业】走近华为,微观世界
查看>>
【产品】项目管理的五个过程和九大知识领域之二
查看>>
【项目管理】项目管理流程浅析
查看>>