CountDownLatch的使用

1. 他是什么?

A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.

是一个多功能的同步辅助器, 允许一个或者多个线程等待, 直到其他线程的操作完成.

可以给定一个count进行初始化, 用await方法进行阻塞, 直到当前计数通过countDown方法减少至0之后, 所有等待线程被释放. 

2. 他的作用

可以用于让一个线程等待N个线程完成某个动作(某个动作已经完成了N次)

 初始化方法,其中count代表计数次数:

final CountDownLatch countDownLatch = new CountDownLatch(12);

 计数方法,每调用一次计数一次:

countDownLatch.countDown();

阻塞方法(带时间参数,代表兜底方案, 最多阻塞多少时间)

countDownLatch.await(5,TimeUnit.SECONDS);
countDownLatch.await();

3. 实战

3.1  需求

批处理每日推送消息: 每天某个时间轮训扫表3000万个用户进行推送消息

主页获取展示页面接口: 一次性调用12个接口封装前端参数, 一个接口返回

 

3.2 代码展示

批处理每日推送消息:

            // 批次结束完才能进行下一波
            final CountDownLatch countDownLatch = new CountDownLatch(users.size());

            if (CollectionUtil.isNotEmpty(users)) {
                for (UserDo userDo : users) {
                    ThreadPoolUtil.execute(() -> {
                        try {
                            pushMessage(userDo );
                        } catch (Exception e) {
                            slsLogger.error(String.format("推送消息发生异常{%s}", userDo ), e);
                        }
                        // 计数
                        countDownLatch.countDown();
                    });

                }
            }
            // 最长等待20秒钟时间, 释放阻塞
            countDownLatch.await(20, TimeUnit.SECONDS);

主页获取展示页面接口:

package com.gupao.springbootdemo.test;

import com.gupao.springbootdemo.util.ThreadPoolUtil;
import org.apache.commons.compress.utils.Lists;
import org.junit.Test;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * 功能描述:
 *
 * @Author: zhouzhou
 * @Date: 2021/6/16$ 16:04$
 */
public class CDL {


    @Test
    public void test1() throws Exception {
        Long start = System.currentTimeMillis();
        List<String> show = Lists.newArrayList();
        final CountDownLatch countDownLatch = new CountDownLatch(12);

        // ...... 共十二次

        ThreadPoolUtil.execute(() -> doShowOne(countDownLatch,show));
        ThreadPoolUtil.execute(() -> doShowOne(countDownLatch,show));
        ThreadPoolUtil.execute(() -> doShowOne(countDownLatch,show));
        ThreadPoolUtil.execute(() -> doShowOne(countDownLatch,show));

        ThreadPoolUtil.execute(() -> doShowTwo(countDownLatch,show));
        ThreadPoolUtil.execute(() -> doShowTwo(countDownLatch,show));
        ThreadPoolUtil.execute(() -> doShowTwo(countDownLatch,show));
        ThreadPoolUtil.execute(() -> doShowTwo(countDownLatch,show));

        ThreadPoolUtil.execute(() -> doShowThree(countDownLatch,show));
        ThreadPoolUtil.execute(() -> doShowThree(countDownLatch,show));
        ThreadPoolUtil.execute(() -> doShowThree(countDownLatch,show));
        ThreadPoolUtil.execute(() -> doShowThree(countDownLatch,show));

        countDownLatch.await(5,TimeUnit.SECONDS);

        System.out.println(show);
        System.out.println(String.format("cost{%s}ms", System.currentTimeMillis() - start));
    }

    private void doShowOne(CountDownLatch countDownLatch, List<String> show) {
        // doSomething
        show.add("haha" + System.currentTimeMillis());
        try {
            TimeUnit.SECONDS.sleep(1L);
        } catch (InterruptedException e) {
            // xxx
        }
        countDownLatch.countDown();
    }

    private void doShowTwo(CountDownLatch countDownLatch, List<String> show) {
        // doSomething
        show.add("hehe" + System.currentTimeMillis());

        try {
            TimeUnit.SECONDS.sleep(1L);
        } catch (InterruptedException e) {
            // xxx
        }
        countDownLatch.countDown();
    }

    private void doShowThree(CountDownLatch countDownLatch, List<String> show) {
        // doSomething
        show.add("enen" + System.currentTimeMillis());

        try {
            TimeUnit.SECONDS.sleep(1L);
        } catch (InterruptedException e) {
            // xxx
        }
        countDownLatch.countDown();
    }
}