Java集合(八)BlockingQueue、ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue
文章目录
本系列文章:
Java集合(一)集合框架概述
Java集合(二)List、ArrayList、LinkedList
Java集合(三)CopyOnWriteArrayList、Vector、Stack
Java集合(四)Map、HashMap
Java集合(五)LinkedHashMap、TreeMap
Java集合(六)Hashtable、ConcurrentHashMap
Java集合(七)Set、HashSet、LinkedHashSet、TreeSet
Java集合(八)BlockingQueue、ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue
【BlockingQueue】
一、Queue和BlockingQueue
Queue,队列。Queue通常但不一定是以FIFO(先进先出)方式排序元素。
Queue通常不允许插入null元素。
Queue一般用作高并发集合容器。
BlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如下两种:
- 当一个线程试图对一个已经满了的队列进行入队列操作时,它将会被阻塞,除非有另一个线程做了出队列操作;
- 当一个线程试图对一个空队列进行出队列操作时,它将会被阻塞,除非有另一个线程进行了入队列操作。
BlockingQueue的接口位于JUC包中,表明了BlockingQueue是线程安全的
。
阻塞队列被广泛使用在“生产者-消费者”问题中,其原因是BlockingQueue提供了可阻塞的插入和移除的方法。当队列容器已满,生产者线程会被阻塞,直到队列未满;当队列容器为空时,消费者线程会被阻塞,直至队列非空时为止。
BlockingQueue提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空。并发包下很多高级同步类的实现都是基于BlockingQueue实现的。BlockingQueue适合用于作为数据共享的通道。
使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现。
阻塞队列和一般的队列的区别就在于:
- 多线程支持,多个线程可以安全的访问队列。
- 阻塞操作,当队列为空的时候,消费线程会阻塞等待队列不为空;当队列满了的时候,生产线程就会阻塞直到队列不满。
- 生产者消费者模式
生产者消费者模式会经常使用阻塞队列来实现:
负责生产的线程不断的制造新对象并插入到阻塞队列中,直到达到这个队列的上限值。队列达到上限值之后生产线程将会被阻塞,直到消费的线程对这个队列进行消费。同理,负责消费的线程不断的从队列中消费对象,直到这个队列为空,当队列为空时,消费线程将会被阻塞,除非队列中有新的对象被插入。
阻塞队列有:
- ArrayBlockingQueue :由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue :由链表结构组成的有界阻塞队列。
- PriorityBlockingQueue :支持优先级排序的无界阻塞队列。
- DelayQueue:使用优先级队列实现的无界阻塞队列。
- SynchronousQueue:不存储元素的阻塞队列。
- LinkedTransferQueue:由链表结构组成的无界阻塞队列。
- LinkedBlockingDeque:由链表结构组成的双向阻塞队列。
二、BlockingQueue接口中的方法
阻塞队列一共有四套方法分别用来进行insert、remove,当每套方法对应的操作不能马上执行时会有不同的结果:
抛异常 | 特殊值 | 阻塞 | 超时 | |
---|---|---|---|---|
元素入队 | add(o) | offer(o) | put(o) | offer(o,timeout,timeunit) |
删除元素 | remove(o) 删除某个指定的元素 | poll() 删除并返回队首元素 | take() 删除并返回队首元素 | poll(timeout,timeunit) |
元素出队 | element() 队首元素出队 | peek() 队首元素出队 |
这四套方法对应的特点分别是:
抛异常
:如果操作不能马上进行,则抛出异常;
特殊值
:如果操作不能马上进行,将会返回一个特殊的值,一般是true或者false;
阻塞
:如果操作不能马上进行,操作会被阻塞;
超时
:如果操作不能马上进行,操作会被阻塞指定的时间,如果指定时间没执行,则返回一个特殊值,一般是true或者false。
阻塞队列区别于其他类型的队列的最主要的特点就是阻塞
这两个字,阻塞功能使得生产者和消费者两端的能力得以平衡:当有任何一端速度过快时,阻塞队列便会把过快的速度给降下来。实现阻塞最重要的两个方法是take
方法和put
方法。
- 1、take 方法
take方法的功能是获取并移除队列的头结点,通常在队列里有数据的时候是可以正常移除的。可是一旦执行take方法的时候,队列里无数据,则阻塞,直到队列里有数据。一旦队列里有数据了,就会立刻解除阻塞状态,并且取到数据。过程:
- 2、put 方法
put 方法插入元素时,如果队列没有满,那就和普通的插入一样是正常的插入,但是如果队列已满,那么就无法继续插入,则阻塞,直到队列里有了空闲空间。如果后续队列有了空闲空间,比如消费者消费了一个元素,那么此时队列就会解除阻塞状态,并把需要添加的数据添加到队列中。过程:
阻塞队列分类:
- 1、ArrayBlockingQueue
有界阻塞队列,底层采用数组实现。ArrayBlockingQueue 一旦创建,容量不能改变。其并发控制采用可重入锁来控制,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不能保证线程访问队列的公平性,参数 fair 可用于设置线程是否公平访问队列。为了保证公平性,通常会降低吞吐量。
private static ArrayBlockingQueue<Integer> blockingQueue =
new ArrayBlockingQueue<Integer>(10,true);//fair
- 2、LinkedBlockingQueue
LinkedBlockingQueue是一个用单向链表实现的有界阻塞队列,可以当做无界队列也可以当做有界队列来使用。通常在创建 LinkedBlockingQueue 对象时,会指定队列最大的容量。此队列的默认和最大长度为 Integer.MAX_VALUE 。此队列按照先进先出的原则对元素进行排序。与 ArrayBlockingQueue 相比起来具有更高的吞吐量。 - 3、PriorityBlockingQueue
支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排列。也可以自定义类实现compareTo() 方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator 来进行排序。
PriorityBlockingQueue 只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容。
PriorityQueue 的线程安全版本。不可以插入 null 值,同时,插入队列的对象必须是可比较大小的(comparable),否则报 ClassCastException 异常。它的插入操作 put 方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞). - 4、DelayQueue
支持延时获取元素的无界阻塞队列。队列使用PriorityBlockingQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。 - 5、SynchronousQueue
不存储元素的阻塞队列,每一个put必须等待一个take操作,否则不能继续添加元素。支持公平访问队列。
SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身不存储任何元素,非常适合传递性场景。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue。 - 6、LinkedTransferQueue
由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,多了tryTransfer和transfer方法。
transfer方法:如果当前有消费者正在等待接收元素(take或者待时间限制的poll方法),transfer可以把生产者传入的元素立刻传给消费者。如果没有消费者等待接收元素,则将元素放在队列的tail节点,并等到该元素被消费者消费了才返回。
tryTransfer方法:用来试探生产者传入的元素能否直接传给消费者。如果没有消费者在等待,则返回false。和上述方法的区别是该方法无论消费者是否接收,方法立即返回。而transfer方法是必须等到消费者消费了才返回。
三、ArrayBlockingQueue和LinkedBlockingQueue的比较*
ArrayBlockingQueue | LinkedBlockingQueue | |
---|---|---|
锁的实现 | 锁是没有分离的,生产和消费用的是同一个锁 | 锁是分离的,生产用的是putLock,消费是takeLock |
底层实现 | 数组 | 链表 |
队列边界 | 必须指定队列的大小,是有界队列 | 不指定队列的大小,但是默认是Integer.MAX_VALUE,是无界队列 |
LinkedBlockingQueue用了两把锁,所以并发性能更好。
由于ArrayBlockingQueue采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。这可能在长时间内需要高效并发地处理大批量数据的时,对于GC可能存在较大影响。
【ArrayBlockingQueue】
一、ArrayBlockingQueue介绍
ArrayBlockingQueue是一个由数组支持的有界阻塞队列。它的本质是一个基于数组的BlockingQueue的实现。
ArrayBlockingQueue结构图示:
1.1 ArrayBlockingQueue特点*
- 1、ArrayBlockingQueue是一个
底层是数组
的有界
队列,FIFO(先进先出
)。 - 2、新元素插入到队列的尾部,队列获取操作则是从队列头部开始获得元素。
- 3、ArrayBlockingQueue是一个简单的“有界缓存区”,
一旦创建,就不能在增加其容量
。 - 4、在向已满的队列中添加元素会导致操作阻塞,从空队列中提取元素也将导致阻塞。
- 5、此类支持对等待的生产者线程和使用者线程进行排序的可选公平策略。默认情况下,不保证是这种排序的(即采用非公平策略)。可以将公平性(fair)设置为true,而构造的队列允许按照FIFO顺序访问线程。公平性通常会降低吞吐量,但可以保证线程的“先来后到”。
- 6、
线程安全
。
ArrayBlockingQueue一旦创建,容量不能改变。其并发控制采用可重入锁来控制,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。当队列容量满时,尝试将元素放入队列将导致操作阻塞;尝试从一个空队列中取一个元素也会同样阻塞。
ArrayBlockingQueue默认情况下不能保证线程访问队列的公平性,所谓公平性是指严格按照线程等待的绝对时间顺序,即最先等待的线程能够最先访问ArrayBlockingQueue。而非公平性则是指访问ArrayBlockingQueue的顺序不是遵守严格的时间顺序,因此可能存在当ArrayBlockingQueue可以被访问时,长时间阻塞的线程依然无法访问到ArrayBlockingQueue的情况。如果保证公平性,通常会降低吞吐量。
1.2 ArrayBlockingQueue的使用
- 1、构造方法
//创建具有指定容量和默认(非公平)锁策略的 ArrayBlockingQueue
public ArrayBlockingQueue(int capacity)
//创建一个具有指定容量和指定锁策略的 ArrayBlockingQueue
public ArrayBlockingQueue(int capacity, boolean fair)
- 2、添加元素
//添加元素到队列的尾部
public boolean add(E e)
//添加元素到队列尾部
public boolean offer(E e)
//在该队列的尾部插入指定的元素,如果队列已满就等待指定的时间
public boolean offer(E e, long timeout, TimeUnit unit)
//在队列的尾部插入指定的元素
public void put(E e) throws InterruptedException
- 3、删除元素
//检索并删除此队列的头元素,如果此队列为空,则返回Null
public E poll()
//检索并删除此队列的头元素,如果队列为空,则等待指定的时间
public E poll(long timeout, TimeUnit unit) throws InterruptedException
//从该队列中删除指定元素的单个实例
public boolean remove(Object o)
//检索并删除此队列的头元素
public E take() throws InterruptedException
//删除所有的元素
public void clear()
- 4、获取队首元素
//检索但不删除此队列的头元素,如果此队列为空,则返回null
public E peek()
- 5、其他方法
//判断是否包含指定元素
public boolean contains(Object o)
//获取迭代器
public Iterator<E> iterator()
//返回此队列中的元素数
public int size()
二、从源码理解ArrayBlockingQueue
变量:
private static final long serialVersionUID = -817911632652898426L;
//存储对象的数组
final Object[] items;
//用来为下一个take/poll/remove的索引(出队)
int takeIndex;
//用来为下一个put/offer/add的索引(入队)
int putIndex;
//队列中元素的个数
int count;
//重入锁,出队和入队持有这一把锁
final ReentrantLock lock;
//出队的条件
private final Condition notEmpty;
//入队的条件
private final Condition notFull;
//Itrs 维护一个 Itr 链表,用于在一个队列下的多个 Itr 迭代器中共享队列元素,
//保证多个迭代器中的元素数据的一致性
transient Itrs itrs = null;
2.1 创建对象
在创建ArrayBlockingQueue对象时需要指定容量和是否公平访问的策略。
//构造一个指定初始容量和指定公平锁策略的ArrayBlockingQueue
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
// 初始化底层数组
this.items = new Object[capacity];
//若fair为false,则为非公平锁;若fair为true,则为公平锁
lock = new ReentrantLock(fair);
//阻塞出队条件
notEmpty = lock.newCondition();
//阻塞入队条件
notFull = lock.newCondition();
}
//指定初始容量,采用非公平锁策略
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
2.2 添加元素
- 1、元素入队
public boolean offer(E e) {
//检测是否为空,若为空则抛出NullPointerException
checkNotNull(e);
final ReentrantLock lock = this.lock;
//加锁
lock.lock();
try {
//若队列已满,不能再添加
if (count == items.length)
return false;
//队列未满时,就让元素入队
else {
enqueue(e);
return true;
}
} finally {
//释放锁
lock.unlock();
}
}
元素入队用的具体方法是enqueue(E x):
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
//当putIndex 等于数组长度(队列中元素已满)时,将 putIndex 重置为 0
if (++putIndex == items.length)
putIndex = 0;
//队列元素个数+1
count++;
//唤醒处于等待状态下的线程,表示当前队列中的元素不为空,如果存在
//消费者线程阻塞,就可以开始取出元素
notEmpty.signal();
}
putIndex为什么会在等于数组长度的时候重新设置为0?因为ArrayBlockingQueue是一个FIFO的队列,队列添加元素时,是从队尾获取putIndex来存储元素。当putIndex等于数组长度时,下次就需要从数组头部开始添加了。
- 2、具有超时功能的元素入队方法
这个方法的作用和上一个方法的作用相似,不过带有超时等待功能。
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
//检测元素是否为空
checkNotNull(e);
//将参数中的时间转化为纳秒
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
//加锁,该锁可被打断
lock.lockInterruptibly();
try {
//队列已满
while (count == items.length) {
//如果等待时间耗尽,返回添加元素失败
if (nanos <= 0)
return false;
//更新等待的超时时间
nanos = notFull.awaitNanos(nanos);
}
//元素入队
enqueue(e);
return true;
} finally {
//释放锁
lock.unlock();
}
}
- 3、带有阻塞功能的元素入队
//put 方法和 add 方法功能一样(添加元素),差异是 put 方法如果队列满了,会阻塞
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
//队列满了的情况下,当前线程将会被 notFull 条件对象挂起加到等待队列中
notFull.await();
//元素入队
enqueue(e);
} finally {
lock.unlock();
}
}
2.3 删除元素
- 1、删除队列中的指定元素
public boolean remove(Object o) {
//如果o为null,不删除任何元素
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
//如果队列中有元素
if (count > 0) {
//获取下一个要添加元素时的索引
final int putIndex = this.putIndex;
//获取当前要被移除的元素的索引
int i = takeIndex;
do {
//从takeIndex 下标开始,寻找要被删除的元素
if (o.equals(items[i])) {
//根据下标,删除元素
removeAt(i);
return true;
}
//当前删除索引执行加 1 后判断是否与数组长度相等
//若为 true,说明索引已到数组尽头,将 i 设置为 0
if (++i == items.length)
i = 0;
//继续查找,直到找到最后一个元素
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
//根据下标删除元素
void removeAt(final int removeIndex) {
final Object[] items = this.items;
//要删除的元素在takeIndex位置
if (removeIndex == takeIndex) {
//将takeIndex位置的元素置为null
items[takeIndex] = null;
//类似于putIndex,当takeIndex到队列尾部时,从头开始
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
//要删除的元素不在takeIndex位置
} else {
final int putIndex = this.putIndex;
for (int i = removeIndex;;) {
int next = i + 1;
if (next == items.length)
next = 0;
if (next != putIndex) {
items[i] = items[next];
i = next;
} else {
items[i] = null;
this.putIndex = i;
break;
}
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
//触发 因为队列满了以后导致的被阻塞的线程
notFull.signal();
}
- 2、元素出队
//当队列中存在元素,则从队列中取出一个元素,如果队列为空,则直接返回 null
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//当队列中有元素时,元素出队
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
- 3、带有超时功能的元素出队
//带超时机制的获取数据,如果队列为空,则会等待指定的时间再去获取元素返回
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//当队列中没有元素时,进行超时时间等待
while (count == 0) {
if (nanos <= 0)
return null;
//更新等待超时时间
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
//元素出队
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
//出队位置的元素置为null
items[takeIndex] = null;
//如果takeIndex到了末尾,从队列头部开始
if (++takeIndex == items.length)
takeIndex = 0;
//元素个数-1
count--;
if (itrs != null)
itrs.elementDequeued();
//唤醒因队列满而被阻塞的线程
notFull.signal();
return x;
}
- 4、带有阻塞功能的元素出队
//基于阻塞的方式获取队列中的元素,如果队列为空,则 take 方法会一直阻塞,
//直到队列中有新的数据可以消费
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//当队列中没有元素时,就一直等待
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
2.4 获取元素
获取队首元素。
//获取头元素如果没有获取到则返回null。仅仅是获取不移除
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//根据takeIndex,获取队首元素
return itemAt(takeIndex);
} finally {
lock.unlock();
}
}
2.5 清空队列
public void clear() {
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int k = count;
//队列不为空
if (k > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
//将数组元素逐个值为null
items[i] = null;
if (++i == items.length)
i = 0;
} while (i != putIndex);
//数组中元素清空时,入队索引和出队索引处在了同一个位置上
takeIndex = putIndex;
count = 0;
if (itrs != null)
itrs.queueIsEmpty();
for (; k > 0 && lock.hasWaiters(notFull); k--)
//检测当前是否有线程已调用condition.await()并且处于await状态
notFull.signal();
}
} finally {
lock.unlock();
}
}
【LinkedBlockingQueue】
一、LinkedBlockingQueue介绍
LinkedBlockingQueue是一个基于单向链表实现的可选容量的阻塞队列,内部是由节点Node构成。在LinkedBlockingQueue中,队列的头节点head是不存元素的,它的item是Null,next指向的元素才是真正的第一个元素,它也有两个用于阻塞等待的Condition条件对象。
LinkedBlockingQueue的结构:
1.1 LinkedBlockingQueue的特点*
- 1、基于链表实现的队列,从头部获取元素,在尾部插入元素,比基于数组的队列吞吐量更高。
- 2、双锁队列的变种实现,一把写锁,一把读锁。
使用两把锁,可以保证元素的插入和删除并不互斥,从而能够同时进行,达到提高吞吐量的的效果。 - 3、默认队列的大小是Integer的最大值,如果添加速度大于读取速度的话,有可能造成内存溢出。
LinkedBlockingQueue,底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用,同样满足 FIFO 的特性。与 ArrayBlockingQueue 相比起来具有更高的吞吐量,为了防止LinkedBlockingQueue 容量迅速增,损耗大量内存。通常在创建 LinkedBlockingQueue 对象时,会指定其大小,如果未指定,容量等于 Integer.MAX_VALUE。 - 4、因为是两把锁,所以元素的个数使用了一个原子类型的变量来维护(AtomicInteger)。
1.3 LinkedBlockingQueue的使用
- 1、构造方法
//创建一个 LinkedBlockingQueue ,容量为Integer.MAX_VALUE
public LinkedBlockingQueue()
//创建一个具有指定容量的 LinkedBlockingQueue
public LinkedBlockingQueue(int capacity)
- 2、插入元素
//如果可以在不超过队列的容量的情况下立即将其指定的元素插入到
//队列的尾部,如果队列已满,则不能插入元素
public boolean offer(E e)
//在该队列的尾部插入指定的元素,如果队列已满则等待,可以等待指定的时间
public boolean offer(E e, long timeout, TimeUnit unit)
//在该队列的尾部插入指定的元素,如果队列满,则阻塞
public void put(E e)
- 3、删除元素
//从这个队列中原子地删除所有的元素
public void clear()
//检索并删除此队列的头元素,如果此队列为空,则返回Null
public E poll()
//检索并删除此队列的头元素,队列为空时则等待指定的时间
public E poll(long timeout, TimeUnit unit)
//从该队列中删除指定元素的单个实例
public boolean remove(Object o)
//检索并删除此队列的头元素,如果队列为空,则阻塞
public E take()
- 4、获取队首元素
//检索但不删除此队列的头元素,如果此队列为空,则返回Null
public E peek()
二、从源码理解LinkedBlockingQueue
2.1 节点
先看LinkedBlockingQueue中所存储的节点的定义:
static class Node<E> {
//当前节点value
E item;
//下一个节点
Node<E> next;
Node(E x) { item = x; }
}
从这个节点定义只有next指针可以看出,LinkedBlockingQueue是一个单向链表构成的队列
。
LinkedBlockingQueue中的变量:
//队列容量
private final int capacity;
//当前队列元素数量
private final AtomicInteger count = new AtomicInteger();
//头结点,不存数据
transient Node<E> head;
//尾节点
private transient Node<E> last;
//出队锁,只有take,poll方法会持有
private final ReentrantLock takeLock = new ReentrantLock();
//出队等待条件
private final Condition notEmpty = takeLock.newCondition();
//入队锁,只有put,offer会持有
private final ReentrantLock putLock = new ReentrantLock();
//入队等待条件
private final Condition notFull = putLock.newCondition();
2.2 构造方法
创建LinkedBlockingQueue对象时,可以不指定初始容量,也可以指定初始容量。
//未指定队列容量时,容量为Integer.MAX_VALUE
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
//指定初始容量的队列
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
//初始化首尾节点
last = head = new Node<E>(null);
}
2.3 添加元素
即元素入队。
public boolean offer(E e) {
//不能添加非null元素
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
//如果队列已满,不能再往队列中添加元素
if (count.get() == capacity)
return false;
int c = -1;
//构建新节点
Node<E> node = new Node<E>(e);
//获取入队锁putLock
final ReentrantLock putLock = this.putLock;
//加锁
putLock.lock();
try {
//如果队列未满,元素入队
if (count.get() < capacity) {
enqueue(node);
//更新队列中元素数量
c = count.getAndIncrement();
//如果队列未满
if (c + 1 < capacity)
//唤醒一个入队条件队列中阻塞的线程
notFull.signal();
}
} finally {
//释放锁
putLock.unlock();
}
//节点数量为0,说明队列是空的
if (c == 0)
//唤醒一个出队条件队列阻塞的线程
signalNotEmpty();
return c >= 0;
}
//唤醒一个出队条件队列阻塞的线程
private void signalNotEmpty() {
//获取出队锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
元素入队的具体方法是:enqueue(node),实现:
private void enqueue(Node<E> node) {
//将当前尾节点的next指针指向新节点,再把last指向新节点
last = last.next = node;
}
图示:
- 2、带超时功能的入队
//在队尾插入一个元素,超时则不尝试,支持中断。当前节点入队后,如果
//队列没满,就唤醒一个入队的线程让其入队
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
if (e == null) throw new NullPointerException();
//转换为纳秒
long nanos = unit.toNanos(timeout);
int c = -1;
//获取入队锁,支持中断
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//队列已满
while (count.get() == capacity) {
//如果超时,返回false
if (nanos <= 0)
return false;
//更新时间
nanos = notFull.awaitNanos(nanos);
}
//元素入队
enqueue(new Node<E>(e));
//更新队列中元素数量
c = count.getAndIncrement();
//说明当前元素后面还能再插入一个
//就唤醒一个入队条件队列中阻塞的线程
if (c + 1 < capacity)
notFull.signal();
} finally {
//释放锁
putLock.unlock();
}
//节点数量为0,说明队列是空的
if (c == 0)
//唤醒一个出队条件队列阻塞的线程
signalNotEmpty();
return true;
}
- 3、带阻塞功能的入队
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
//创建节点
Node<E> node = new Node<E>(e);
//获取入队锁,支持中断
final ReentrantLock putLock = this.putLock;
//获取队列中元素数量
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//队列满了,进入阻塞状态
while (count.get() == capacity) {
notFull.await();
}
//元素入队
enqueue(node);
c = count.getAndIncrement();
//如果还可以插入元素,那么释放等待的入队线程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
//通知出队线程队列非空
if (c == 0)
signalNotEmpty();
}
2.4 删除元素
- 1、元素出队
//检索并删除此队列的头元素
public E poll() {
final AtomicInteger count = this.count;
//队列未空,返回null
if (count.get() == 0)
return null;
E x = null;
int c = -1;
//获取独占锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//队列不空则出队
if (count.get() > 0) {
x = dequeue();
//队列中数量-1
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
//如果出队前,队列是满的,则唤醒一个被take()阻塞的线程
if (c == capacity)
signalNotFull();
return x;
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
- 2、带超时功能的元素出队
//检索并删除此队列的头元素,可以超时等待
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
//转化为纳秒
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
//获取独占锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
//超时时间到,返回null
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
//元素出队
x = dequeue();
c = count.getAndDecrement();
//如果队列中还有数据可取,释放notEmpty条件等待队列中的第一个线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//如果出队前,队列是满的,则唤醒一个被take()阻塞的线程
if (c == capacity)
signalNotFull();
return x;
}
- 3、带阻塞功能的元素出队
//检索并删除此队列的头元素
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
//获取独占锁,可中断
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//队列为空,进入阻塞状态
while (count.get() == 0) {
notEmpty.await();
}
//队列不为空,元素出队
x = dequeue();
//队列中元素个数-1
c = count.getAndDecrement();
//如果队列中还有数据可取,释放notEmpty条件等待队列中的第一个线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//如果出队前,队列是满的,则唤醒一个被take()阻塞的线程
if (c == capacity)
signalNotFull();
return x;
}
- 4、删除队列中的指定元素
public boolean remove(Object o) {
//因为队列不包含null元素,返回false
if (o == null) return false;
//获取两把锁
fullyLock();
try {
//从头的下一个节点开始遍历
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
//如果匹配,那么将节点从队列中移除,trail表示前驱节点
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
//释放两把锁
fullyUnlock();
}
}
2.5 查找元素
获取队首元素。
//检索但不删除此队列的头元素
public E peek() {
//队列未空,返回null
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//因为head实际上是个空节点,所以要返回的是head的下一个节点
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
【PriorityBlockingQueue 】
一、PriorityBlockingQueue介绍
PriorityBlockingQueue是一个带有优先级的阻塞队列。底层数组实现二叉堆,数组可变,所以是支持优先级无界阻塞队列,ReentrantLock、Condition实现线程安全。
PriorityBlockingQueue 是一个支持优先级的无界阻塞队列。默认情况下元素采用自然顺序进行排序,也可以通过自定义类实现 compareTo() 方法来指定元素排序规则,或者初始化时通过构造器参数Comparator 来指定排序规则。
PriorityBlockingQueue并发控制采用的是 ReentrantLock,队列为无界队列(ArrayBlockingQueue是有界队列,LinkedBlockingQueue也可以通过在构造函数中传入 capacity 指定队列最大的容量,但PriorityBlockingQueue只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容)。
简单地说,它就是PriorityQueue的线程安全版本。不可以插入null值。同时,插入队列的对象必须是可比较大小的,否则报ClassCastException异常。它的插入操作put方法不会阻塞,因为它是无界队列(take方法在队列为空的时候会阻塞)。
1.1 PriorityBlockingQueue的特点*
- 1、初始化时,可指定队列的大小。未指定容量大小时,初始容量为11,可以自动扩容。
- 2、扩容的规则:如果旧的容量小于64,那么新的容量为:
2*旧的容量+2
,如果旧容量大于64,那么新的容量为1.5*旧的容量
。 - 3、并不是FIFO,而是经过排序后,默认优先级最高的元素(默认是小顶堆,即最小值)先出队。
- 4、线程安全。
- 5、元素按照其自然顺序进行升序排序(即存储的对象必须是实现了Comparable接口的),当然也可以指定Comparator进行排序。
- 6、PriorityBlockingQueue是无界的阻塞队列,它并不会阻塞生产者插入元素,当生产速率大于消费速率时,时间一长,可能导致内存溢出。
- 7、PriorityBlockingQueue类似于ArrayBlockingQueue,其内部都使用了一个独占锁来控制同时只有一个线程可以进行入队和出队的操作,另外PriorityBlockingQueue只使用了notEmpty条件变量,而没有使用notFull变量,这是因为前者是无界队列,当put的时候永远都不会处于await。
1.2 PriorityBlockingQueue的使用
- 1、构造方法
//创建一个初始容量为11,按照自然顺序排序的PriorityBlockingQueue对象
public PriorityBlockingQueue()
//创建一个初始容量为initialCapacity,按照自然顺序排序的PriorityBlockingQueue对象
public PriorityBlockingQueue(int initialCapacity)
//创建一个初始容量为initialCapacity,按照自然顺序排序的PriorityBlockingQueue对象
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator)
- 2、添加元素
public boolean add(E e)
public void put(E e)
public boolean offer(E e)
- 3、删除元素
//如果队列为空,则阻塞等待有可用元素后重试,否则移除并返回优先级最高的元素
public E take() throws InterruptedException
//如果队列为空,则立即返回Null;否则移除并返回优先级最高的元素
public E poll()
//在指定的超时时间内尝试移除并返回优先级最高的元素,如果已经超时,则返回Null
public E poll(long timeout, TimeUnit unit) throws InterruptedException
- 4、获取元素
//返回优先级最高的元素,但不移除,如果队列为空则返回Null
public E peek()
二、从源码理解PriorityBlockingQueue
变量:
//队列的默认初始容量
private static final int DEFAULT_INITIAL_CAPACITY = 11;
//队列的最大容量
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//放元素的数组
private transient Object[] queue;
//当前队列中元素的数量
private transient int size;
//元素比较器,如果按照自然顺序进行排序,则为null
private transient Comparator<? super E> comparator;
//全局锁
private final ReentrantLock lock;
//Condition对象
private final Condition notEmpty;
//扩容时用的CAS锁标记
private transient volatile int allocationSpinLock;
//优先级队列
private PriorityQueue<E> q;
2.1 构造方法
可以指定初始容量,也可以不指定。
//创建一个初始容量为11,按照自然顺序排序的PriorityBlockingQueue实例
public PriorityBlockingQueue() {
this(PriorityBlockingQueue.DEFAULT_INITIAL_CAPACITY, null);
}
//创建一个初始容量为initialCapacity,按照自然顺序排序的PriorityBlockingQueue实例
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
2.2 添加元素
扩容基于CAS+Lock
。CAS控制创建新的数组原子执行,Lock控制数组替换原子执行。
public boolean add(E e) {
return offer(e);
}
/**
* 将目标元素插入到队列中,由于是无界的,不会被阻塞
*/
@Override
public void put(E e) {
offer(e); // never need to block
}
@Override
public boolean offer(E e) {
if (e == null) {
throw new NullPointerException();
}
final ReentrantLock lock = this.lock;
lock.lock();
/**
* n:length
* cap:capacity
*/
int n, cap;
Object[] array;
// 元素个数超出队列长度,则进行扩容
while ((n = size) >= (cap = (array = queue).length)) {
tryGrow(array, cap);
}
try {
final Comparator<? super E> cmp = comparator;
if (cmp == null) {
// 自然顺序的插入
PriorityBlockingQueue.siftUpComparable(n, e, array);
} else {
// 使用指定比较器的插入
PriorityBlockingQueue.siftUpUsingComparator(n, e, array, cmp);
}
size = n + 1;
// 唤醒在非空条件上阻塞等待的线程
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
// 当前没有线程在执行扩容 && 原子更新扩容标识为 1
if (allocationSpinLock == 0 &&
PriorityBlockingQueue.ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) {
try {
/**
* 1)旧容量小于 64,执行【双倍+2】扩容
* 2)旧容量大于等于 64,执行1.5倍向下取整扩容
*/
int newCap = oldCap + (oldCap < 64 ?
oldCap + 2 : // grow faster if small
oldCap >> 1);
// 新容量超出最大容量
if (newCap - PriorityBlockingQueue.MAX_ARRAY_SIZE > 0) { // possible overflow
final int minCap = oldCap + 1;
// 如果已经溢出,则抛出 OutOfMemoryError 异常
if (minCap < 0 || minCap > PriorityBlockingQueue.MAX_ARRAY_SIZE) {
throw new OutOfMemoryError();
}
// 写入最大容量
newCap = PriorityBlockingQueue.MAX_ARRAY_SIZE;
}
// 创建新的对象数组
if (newCap > oldCap && queue == array) {
newArray = new Object[newCap];
}
} finally {
// 重置扩容标记
allocationSpinLock = 0;
}
}
// 说明已经有线程在执行扩容,则等待其扩容完成
if (newArray == null) {
Thread.yield();
}
lock.lock();
if (newArray != null && queue == array) {
// 扩容成功的线程会将元素从旧数组拷贝到新数组中
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
/**
* 这里主要就是向堆中插入元素,并且插入的同时要调整堆,保持最小堆状态
*/
private static <T> void siftUpComparable(int k, T x, Object[] array) {
final Comparable<? super T> key = (Comparable<? super T>) x;
// 插入元素的目标索引
while (k > 0) {
// 计算父节点索引
final int parent = k - 1 >>> 1;
// 读取父节点值
final Object e = array[parent];
// 新增元素已经 >= 当前节点,则无需上移
if (key.compareTo((T) e) >= 0) {
break;
}
// 父节点元素下移
array[k] = e;
// 递归比较祖父节点
k = parent;
}
// 插入目标元素
array[k] = key;
}
/**
* 实现逻辑和 siftUpComparable 一致
*/
private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
Comparator<? super T> cmp) {
while (k > 0) {
final int parent = k - 1 >>> 1;
final Object e = array[parent];
if (cmp.compare(x, (T) e) >= 0) {
break;
}
array[k] = e;
k = parent;
}
array[k] = x;
}
2.3 获取元素
返回优先级最高的元素,但不移除,如果队列为空则返回Null。
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (size == 0) ? null : (E) queue[0];
} finally {
lock.unlock();
}
}
2.4 删除元素
- 1、带有阻塞功能地获取优先级最高的元素
如果队列为空,则阻塞等待有可用元素后重试,否则移除并返回优先级最高的元素。
/**
* 如果队列为空,则阻塞等待有可用元素后重试,否则移除并返回优先级最高的元素
*/
@Override
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
// 尝试移除并返回优先级最高的元素
while ( (result = dequeue()) == null) {
// 当前线程在非空条件上阻塞等待,被唤醒后重试
notEmpty.await();
}
} finally {
lock.unlock();
}
return result;
}
private E dequeue() {
// 计算尾部元素索引
final int n = size - 1;
// 队列为空,则返回 null
if (n < 0) {
return null;
} else {
final Object[] array = queue;
// 读取优先级最高的元素
final E result = (E) array[0];
// 读取尾部元素
final E x = (E) array[n];
// 清空尾部元素
array[n] = null;
final Comparator<? super E> cmp = comparator;
if (cmp == null) {
PriorityBlockingQueue.siftDownComparable(0, x, array, n);
} else {
PriorityBlockingQueue.siftDownUsingComparator(0, x, array, n, cmp);
}
size = n;
return result;
}
}
/**
*
* @param k 需要填充的目标索引
* @param x 需要插入的目标元素
* @param array 持有对象的数组
* @param n 堆大小
*/
private static <T> void siftDownComparable(int k, T x, Object[] array,
int n) {
if (n > 0) {
final Comparable<? super T> key = (Comparable<? super T>)x;
// 计算二分索引
final int half = n >>> 1; // loop while a non-leaf
while (k < half) {
// 计算左子节点索引
int child = (k << 1) + 1; // assume left child is least
// 读取节点值
Object c = array[child];
// 计算右子节点索引
final int right = child + 1;
/**
* 右子节点索引小于目标堆大小 &&
* 左子节点值 > 右子节点值
*/
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0) {
// 读取右子节点值,更新查找节点索引
c = array[child = right];
}
// 目标键已经小于查找节点,则可以直接插入
if (key.compareTo((T) c) <= 0) {
break;
}
// 否则,提升子节点为父节点
array[k] = c;
// 迭代处理子节点
k = child;
}
// 插入目标值
array[k] = key;
}
}
private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
int n,
Comparator<? super T> cmp) {
if (n > 0) {
final int half = n >>> 1;
while (k < half) {
int child = (k << 1) + 1;
Object c = array[child];
final int right = child + 1;
if (right < n && cmp.compare((T) c, (T) array[right]) > 0) {
c = array[child = right];
}
if (cmp.compare(x, (T) c) <= 0) {
break;
}
array[k] = c;
k = child;
}
array[k] = x;
}
}
- 2、获取优先级最高的元素
/**
* 如果队列为空,则立即返回Null;否则移除并返回优先级最高的元素。
*/
@Override
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}
- 3、带有超时功能地获取优先级最高的元素
/**
* 在指定的超时时间内尝试移除并返回优先级最高的元素,如果已经超时,则返回 null.
*/
@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
// 尝试移除并返回优先级最高的元素 && 未超时
while ( (result = dequeue()) == null && nanos > 0) {
// 当前线程在非空条件上阻塞等待,被唤醒后重试
nanos = notEmpty.awaitNanos(nanos);
}
} finally {
lock.unlock();
}
return result;
}
【DelayQueue】
一、DelayQueue介绍
DelayQueue是一个支持延时获取元素的无界阻塞队列,队列使用PriorityQueue来实现,队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素,只有在延时期满(也就是过期元素),才能从队列中提取元素。
PriorityQueue是一种优先级的队列,队列中的元素会按照优先级进行排序。
1.1 DelayQueue的特点*
- 1、DelayQueue内部通过组合PriorityQueue,来实现存储和维护元素顺序的。
- 2、DelayQueue存储元素必须实现Delayed接口,通过实现Delayed接口,可以获取到元素延迟时间,以及可以比较元素大小(Delayed 继承Comparable)。
- 3、DelayQueue通过一个可重入锁来控制元素的入队出队行为。
- 4、PriorityQueue只负责存储数据以及维护元素的顺序,对于延迟时间取数据是在DelayQueue中进行判断控制的。
- 5、DelayQueue没有实现序列化接口。
1.2 DelayQueue的使用
- 1、构造方法
//默认构造方法
public DelayQueue()
//通过集合初始化
public DelayQueue(Collection<? extends E> c)
- 2、入队
//将指定的元素插入到此队列中,在成功时返回true
public boolean add(E e)
//将指定的元素插入到此队列中,在成功时返回 true
public boolean offer(E e)
//将指定的元素插入到此队列中,在成功时返回 true
public boolean offer(E e, long timeout, TimeUnit unit)
//将指定的元素插入到此队列中
public void put(E e)
- 3、出队
//获取并移除此队列的头,如果此队列为空,则返回Null
public E poll()
//获取并移除此队列的头部,在指定的等待时间前等待
public E poll(long timeout, TimeUnit unit) throws InterruptedException
//获取并移除此队列的头部,在元素变得可用之前一直等待
public E take() throws InterruptedException
- 4、获取元素
//调用此方法,可以返回队头元素,但是元素并不出队
public E peek()
二、从源码理解DelayQueue
2.1 Delayed接口
public interface Delayed extends Comparable<Delayed> {
//返回剩余时间
long getDelay(TimeUnit unit);
}
public interface Comparable<T> {
// 定义比较方法
public int compareTo(T o);
}
DelayQueue中的元素都要实现Delayed接口,Delayed接口有一个getDelay 方法,该方法用来告知延迟到期有多长的时间,或者延迟在多长时间之前已经到期。
为了排序,Delayed接口还继承了Comparable接口,因此必须实现compareTo(),使其可以进行元素的比较。
2.2 成员变量
//可重入锁
private final transient ReentrantLock lock = new ReentrantLock();
//存储元素的优先级队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
//基于Leader-Follower模式的变体,用于尽量减少不必要的线程等待
private Thread leader = null;
//条件控制,表示是否可以从队列中取数据
private final Condition available = lock.newCondition();
使用ReentrantLock独占锁实现线程同步,使用Condition实现等待通知机制。
基于Leader-Follower模式的变体,减少不必要的线程等待。
内部使用PriorityQueue优先级队列存储元素,且队列中元素必须实现Delayed接口。
2.3 构造方法
public DelayQueue() {}
//通过集合初始化
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
DelayQueue内部实现是PriorityQueue,对元素的操作都是通过PriorityQueue来实现的。
DelayQueue的构造方法中未指定什么参数,对于PriorityQueue都是使用的默认参数。因此,既不能通过DelayQueue来指定PriorityQueue的初始大小,也不能使用指定的Comparator(元素本身就需要实现了Comparable)。
2.4 入队
将指定的元素插入到此队列中,在成功时返回 true。
//将指定的元素插入到此队列中,在成功时返回true
public boolean add(E e) {
return offer(e);
}
public void put(E e) {
offer(e);
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
//获取锁
lock.lock();
try {
//通过PriorityQueue 来将元素入队
q.offer(e);
//peek 是获取的队头元素,唤醒阻塞在available 条件上的一个线程,表示可以从队列中取数据了
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
- 2、带有超时功能地元素入队
因为是无界队列,因此不会出现”队满”(超出最大值会抛异常),指定一个等待时间将元素放入队列中并没有意义,队列没有达到最大值那么会入队成功,达到最大值,则失败,不会进行等待。
public boolean offer(E e, long timeout, TimeUnit unit) {
//调用offer 方法
return offer(e);
}
2.5 出队
- 1、元素出队
//获取并移除此队列的头,如果此队列为空,则返回Null
public E poll() {
final ReentrantLock lock = this.lock;
//获取同步锁
lock.lock();
try {
//获取队头
E first = q.peek();
//如果队头为null 或者 延时还没有到,则返回null
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll(); //元素出队
} finally {
lock.unlock();
}
}
- 2、带有超时功能地元素出队
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
//超时等待时间
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
//可中断的获取锁
lock.lockInterruptibly();
try {
//无限循环
for (;;) {
//获取队头元素
E first = q.peek();
//队头为空,也就是队列为空
if (first == null) {
//达到超时指定时间,返回null
if (nanos <= 0)
return null;
else
// 如果还没有超时,那么再available条件上进行等待nanos时间
nanos = available.awaitNanos(nanos);
} else {
//获取元素延迟时间
long delay = first.getDelay(NANOSECONDS);
//延时到期
if (delay <= 0)
return q.poll(); //返回出队元素
//延时未到期,超时到期,返回null
if (nanos <= 0)
return null;
first = null; // don't retain ref while waiting
// 超时等待时间 < 延迟时间 或者有其它线程再取数据
if (nanos < delay || leader != null)
//在available 条件上进行等待nanos 时间
nanos = available.awaitNanos(nanos);
else {
//超时等待时间 > 延迟时间 并且没有其它线程在等待,那么当前元素成为leader,表示leader 线程最早 正在等待获取元素
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
//等待 延迟时间 超时
long timeLeft = available.awaitNanos(delay);
//还需要继续等待 nanos
nanos -= delay - timeLeft;
} finally {
//清除 leader
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
//唤醒阻塞在available 的一个线程,表示可以取数据了
if (leader == null && q.peek() != null)
available.signal();
//释放锁
lock.unlock();
}
}
此方法的逻辑:
- 如果队列为空,如果超时时间未到,则进行等待,否则返回null。
- 队列不空,取出队头元素,如果延迟时间到,则返回元素,否则 如果超时 时间到 返回null。
- 超时时间未到,并且超时时间< 延迟时间或者有线程正在获取元素,那么进行等待。
- 超时时间> 延迟时间,那么肯定可以取到元素,设置leader为当前线程,等待延迟时间到期。
Condition条件在阻塞时会释放锁,在被唤醒时会再次获取锁,获取成功才会返回。
当进行超时等待时,阻塞在Condition上后会释放锁,一旦释放了锁,那么其它线程就有可能参与竞争,某一个线程就可能会成为leader(参与竞争的时间早,并且能在等待时间内能获取到队头元素那么就可能成为leader)。
leader是用来减少不必要的竞争,如果leader不为空说明已经有线程在取了,设置当前线程等待即可。(leade 就是一个信号,告诉其它线程:你们不要再去获取元素了,它们延迟时间还没到期,我都还没有取到数据呢,你们要取数据,等我取了再说)
- 3、带有等待功能的元素出队
该方法就是相当于在前面的超时等待中,把超时时间设置为无限大,那么这样只要队列中有元素,要是元素延迟时间要求,那么就可以取出元素,否则就直接等待元素延迟时间到期,再取出元素,最先参与等待的线程会成为leader。
//获取并移除此队列的头部,在元素变得可用之前一直等待
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
//延迟到期
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
//如果有其它线程在等待获取元素,则当前线程不用去竞争,直接等待
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
//等待延迟时间到期
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
2.6 获取元素
//调用此方法,可以返回队头元素,但是元素并不出队
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//返回队列头部元素,元素不出队
return q.peek();
} finally {
lock.unlock();
}
}