最新要闻

广告

手机

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

家电

焦点热文:异步批处理教程

来源:博客园

书接上回 大数据量、高并发业务怎么优化?(一) 文章中介绍了异步批处理的三种方式,本文继续深入针对前两种进行讲解,并给出代码示例:


(相关资料图)

一 普通版本,采用阻塞队列ArrayBlockingQueue

使用普通方式能够直接基于JDK中现成的并发包ArrayBlockingQueue提供的 offer(E e, long timeout, TimeUnit unit)(添加元素到队列尾部,如果队列已满则等待参数指定时间后返回false)方法 和 poll(long timeout, TimeUnit unit)(从队列头部获取元素,如果队列为空则等待参数指定时间后返回null)方法,来达到异步批处理效果

生产者代码:由于采用内存队列,最好在创建 ArrayBlockingQueue时指定队列大小,防止队列无界,导致内存溢出

/** * 生产者 */@Component@Slf4jpublic class MonitorQueue {    private BlockingQueue> queue = new ArrayBlockingQueue<>(10000000);    public void put(List list) {        try {            queue.put(list);        } catch (InterruptedException e) {            log.error(String.format("队列put异常:%s", e.getMessage()), e);        }    }    public void offer(List list, long timeout, TimeUnit unit) throws InterruptedException {        queue.offer(list, timeout, unit);    }    public List poll(long timeout, TimeUnit unit) throws InterruptedException {        return queue.poll(timeout, unit);    }}

消费者代码:在创建生产者时开启一个子线程在死循环中一直读取队列元素,直到队列元素超过我们的 maxNum时,将临时列表元素插入数据库中

/** * 消费者 */@Slf4j@Componentpublic class MonitorConsumer implements Runnable {    @Autowired    private MonitorQueue queue;    @Autowired    private MonitorService monitorService;    @PostConstruct    public void init() {        new Thread(this, "monitor-collect").start();    }    // 临时列表大小限制    private int maxNum = 2000;    @SuppressWarnings("InfiniteLoopStatement")    @Override    public void run() {        while (true) {            handler();        }    }    private void handler() {        try {            List temp = new ArrayList<>(maxNum);            while (temp.size() <= maxNum) {                List list = queue.poll(20, TimeUnit.SECONDS);                if (CollectionUtil.isNotEmpty(list)) {                    temp.addAll(list);                } else {                    break;                }            }            if (CollectionUtil.isEmpty(temp)) {                return;            }            int i = monitorService.batchSave(temp);            log.debug("----------------------------batchSave num:{}, collect.size:{}", i, collect.size());        } catch (Exception e) {            log.error(String.format("消费者异常: %s", e.getMessage()), e);        }    }}    

可以看到采用该种方式实现的异步批量入库代码比较简单,便于理解,在性能上,基本都能够满足日常普通业务存在的批量入库场景

二 进阶版,采用 Disruptor队列,本文基于 Disruptor最新4.0版本

先给出 Disruptor官网简介

Disruptor 是一个提供并发环形缓冲区数据结构的库。它旨在在异步事件处理架构中提供低延迟、高吞吐量的工作队列。为了理解 Disruptor 的好处,我们可以将它与一些很好理解且目的非常相似的东西进行比较。在 Disruptor 的情况下,这将是 Java 的 BlockingQueue。与队列一样,Disruptor 的目的是在同一进程内的线程之间移动数据(例如消息或事件)。然而,Disruptor 提供的一些关键特性使其有别于队列。他们是:向消费者多播事件,带有消费者依赖图。为事件预分配内存。可选无锁

Disruptor给我们在项目中实现异步批处理提供了另一种方式,一种无锁、延迟更低、吞吐量更高、提供消费者多播等等的内存队列

下面介绍如何使用

2.1 依赖安装

    com.lmax    disruptor    4.0.0.RC1

2.2 Disruptor使用代码如下:

public class LongEvent{    private long value;    public void set(long value){        this.value = value;    }    @Override    public String toString(){        return "LongEvent{" + "value=" + value + "}";    }}@Slf4jpublic class LongEventMain {    public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch) {        log.info("event: " + event + ", sequence:" + sequence + ", endOfBatch:" + endOfBatch);    }    public static void translate(LongEvent event, long sequence, ByteBuffer buffer) {        event.set(buffer.getLong(0));    }    public static void main(String[] args) throws Exception {        int bufferSize = 128;        // 1. 创建Disruptor对象        Disruptor disruptor =                new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BlockingWaitStrategy());        // 2. 添加事件处理类(消费者)        disruptor.handleEventsWith(LongEventMain::handleEvent);        // 3. 开启事件处理线程        disruptor.start();        // 4. 获取ringBuffer        RingBuffer ringBuffer = disruptor.getRingBuffer();        ByteBuffer bb = ByteBuffer.allocate(8);        for (long l = 0; true; l++) {            bb.putLong(0, l);            // 5. 发布事件(生产者)            ringBuffer.publishEvent(LongEventMain::translate, bb);            Thread.sleep(1);        }    }}

2.3 上面代码完成了一个事件发布后,事件处理类就能够收到对应事件信息的功能,但是我们想要的是能在消费者线程中批量处理生产者数据的逻辑,还得再修改一下事件处理类代码,如下:

@Slf4jpublic class LongEventBatch implements EventHandler {    private static final int MAX_BATCH_SIZE = 20;    private final List batch = new ArrayList<>();    public LongEventBatch() {        // 虚拟机关闭处理        Runtime.getRuntime().addShutdownHook(new Thread(() -> {            log.info("------------------ShutdownHook-DataEventHandler,上报tempList");            if (batch.size() > 0) {                // 批量入库伪代码                int i = xxxService.batchSave(temp);            }        }));    }        @Override    public void onEvent(final LongEvent event, final long sequence, final boolean endOfBatch) {        log.info("event: " + event + ", sequence:" + sequence + ", endOfBatch:" + endOfBatch);        batch.add(event);        if (batch.size() >= MAX_BATCH_SIZE) {            processBatch(batch);        }    }    private void processBatch(final List batch) {        // 批量入库伪代码        int i = xxxService.batchSave(temp);        // 记得清空batch列表        batch.clear();    }}

由此,我们就实现了基于 Disruptor的异步批处理逻辑,该方式会比普通版本性能高出一个数量级,大家在工作中可以尝试使用一番

最后

附博主 github地址 https://github.com/wayn111

关键词: 事件处理 指定时间 事件信息