CountDownLatch的使用
1. 他是什么?
A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
是一个多功能的同步辅助器, 允许一个或者多个线程等待, 直到其他线程的操作完成.
可以给定一个count进行初始化, 用await方法进行阻塞, 直到当前计数通过countDown方法减少至0之后, 所有等待线程被释放.
2. 他的作用
可以用于让一个线程等待N个线程完成某个动作(某个动作已经完成了N次)
初始化方法,其中count代表计数次数:
final CountDownLatch countDownLatch = new CountDownLatch(12);
计数方法,每调用一次计数一次:
countDownLatch.countDown();
阻塞方法(带时间参数,代表兜底方案, 最多阻塞多少时间)
countDownLatch.await(5,TimeUnit.SECONDS);
countDownLatch.await();
3. 实战
3.1 需求
批处理每日推送消息: 每天某个时间轮训扫表3000万个用户进行推送消息
主页获取展示页面接口: 一次性调用12个接口封装前端参数, 一个接口返回
3.2 代码展示
批处理每日推送消息:
// 批次结束完才能进行下一波
final CountDownLatch countDownLatch = new CountDownLatch(users.size());
if (CollectionUtil.isNotEmpty(users)) {
for (UserDo userDo : users) {
ThreadPoolUtil.execute(() -> {
try {
pushMessage(userDo );
} catch (Exception e) {
slsLogger.error(String.format("推送消息发生异常{%s}", userDo ), e);
}
// 计数
countDownLatch.countDown();
});
}
}
// 最长等待20秒钟时间, 释放阻塞
countDownLatch.await(20, TimeUnit.SECONDS);
主页获取展示页面接口:
package com.gupao.springbootdemo.test;
import com.gupao.springbootdemo.util.ThreadPoolUtil;
import org.apache.commons.compress.utils.Lists;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* 功能描述:
*
* @Author: zhouzhou
* @Date: 2021/6/16$ 16:04$
*/
public class CDL {
@Test
public void test1() throws Exception {
Long start = System.currentTimeMillis();
List<String> show = Lists.newArrayList();
final CountDownLatch countDownLatch = new CountDownLatch(12);
// ...... 共十二次
ThreadPoolUtil.execute(() -> doShowOne(countDownLatch,show));
ThreadPoolUtil.execute(() -> doShowOne(countDownLatch,show));
ThreadPoolUtil.execute(() -> doShowOne(countDownLatch,show));
ThreadPoolUtil.execute(() -> doShowOne(countDownLatch,show));
ThreadPoolUtil.execute(() -> doShowTwo(countDownLatch,show));
ThreadPoolUtil.execute(() -> doShowTwo(countDownLatch,show));
ThreadPoolUtil.execute(() -> doShowTwo(countDownLatch,show));
ThreadPoolUtil.execute(() -> doShowTwo(countDownLatch,show));
ThreadPoolUtil.execute(() -> doShowThree(countDownLatch,show));
ThreadPoolUtil.execute(() -> doShowThree(countDownLatch,show));
ThreadPoolUtil.execute(() -> doShowThree(countDownLatch,show));
ThreadPoolUtil.execute(() -> doShowThree(countDownLatch,show));
countDownLatch.await(5,TimeUnit.SECONDS);
System.out.println(show);
System.out.println(String.format("cost{%s}ms", System.currentTimeMillis() - start));
}
private void doShowOne(CountDownLatch countDownLatch, List<String> show) {
// doSomething
show.add("haha" + System.currentTimeMillis());
try {
TimeUnit.SECONDS.sleep(1L);
} catch (InterruptedException e) {
// xxx
}
countDownLatch.countDown();
}
private void doShowTwo(CountDownLatch countDownLatch, List<String> show) {
// doSomething
show.add("hehe" + System.currentTimeMillis());
try {
TimeUnit.SECONDS.sleep(1L);
} catch (InterruptedException e) {
// xxx
}
countDownLatch.countDown();
}
private void doShowThree(CountDownLatch countDownLatch, List<String> show) {
// doSomething
show.add("enen" + System.currentTimeMillis());
try {
TimeUnit.SECONDS.sleep(1L);
} catch (InterruptedException e) {
// xxx
}
countDownLatch.countDown();
}
}