Java基础学习之并发篇:LinkedBlockingQueue和LinkedBlockingDeque
学习目标
在并发编程中,阻塞队列在多线程中的场景特别有用,比如在生产和消费者模型中,生产者生产数据到队列,队列满时需要阻塞线程,停止往队列生产。消费者消费队列,对队列为空时阻塞线程停止消费,在Java中有提供不同场景的阻塞队列,那么接下来我们将学习:LinkedBlockingQueue和LinkedBlockingDeque两种阻塞队列。
LinkedBlockingQueue
上次我们学习了基础的有界阻塞队列ArrayBlockingQueue,👉有兴趣可以了解下,明白了有界阻塞队列基本实现原理,而LinkedBlockingQueue与ArrayBlockingQueue比较而言,我们知道ArrayBlockingQueue是通过一个有界的数组对象来存储数据,而LinkedBlockingQueue是用了单链表来实现数据的存储,且相较于ArrayBlockingQueue是用两个锁分别来处理数据的生产和消费。实现类如下:
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
/**
* Head of linked list.
* Invariant: head.item == null
*/
transient Node<E> head;
/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
}
- capacity :提供可维护单向链表长度,不设置默认即Integer.MAX_VALUE
- count :实际大小,即单向链表中当前节点个数
- head:指向单向链表的表头
- last :指向单向链表的表尾
- takeLock :消费锁,保证线程安全
- putLock :生产锁,保证线程安全
- notEmpty :保证消费等待,数据队列空了线程进入等待
- notFull:保证生产等待,数据队列满了线程进入等待
其Node节点定义是一个带头指针和尾指针的单向链表:

Node定义的结构如下:
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
可以看下生产put的过程:
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
大概流程如下:
可以看到实现相差无几,通过两个等待条件实现相互等待。当前大小count在不超过总容量capacity情况下进入队列,即增加链表节点到链表尾部,last指针指向插入的节点。
在当前节点小于总容量capacity时,signal将当前的线程等待队列中第一个等待的线程唤醒。而当count为0时表示队列有一条数据了,可以让消费线程唤醒进行生产。
LinkedBlockingDeque
而LinkedBlockingDeque这里乍一看好像和LinkedBlockingQueue连名字都没啥差别,实质上LinkedBlockingDeque后面是Deque,Deque是一个双端队列,是 “Double Ended Queue” 的缩写。双端队列是一个你可以从任意一端插入或者抽取元素的队列。实现了在队列头和队列尾的高效插入和移除。并且比传统的生产者-消费者模式具有更高的可伸缩性,这是因为工作者线程不会在单个共享的任务队列上发生竞争。在大多数时候,它们都只是访问自己的双端队列,从而极大地减少了竞争。

实现了:
一个线程生产元素,并把它们插入到队列的任意一端。如果双端队列已满,插入线程将被阻塞,直到一个移除线程从该队列中移出了一个元素。如果双端队列为空,移除线程将被阻塞,直到一个插入线程向该队列插入了一个新元素。
定义的成员:
public class LinkedBlockingDeque<E>
extends AbstractQueue<E>
implements BlockingDeque<E>, java.io.Serializable {
/** Doubly-linked list node class */
static final class Node<E> {
E item;
Node<E> prev;
Node<E> next;
Node(E x) {
item = x;
}
}
transient Node<E> first;
transient Node<E> last;
/** Number of items in the deque */
private transient int count;
/** Maximum number of items in the deque */
private final int capacity;
/** Main lock guarding all access */
final ReentrantLock lock = new ReentrantLock();
/** Condition for waiting takes */
private final Condition notEmpty = lock.newCondition();
/** Condition for waiting puts */
private final Condition notFull = lock.newCondition();
}
和LinkedBlockingQueue成员变量相比,发现Node除了加了一个prev指针,并无二致。
但通过继承关系可以发现实现了BlockingDeque接口,而该接口定义了如下方法:
头部操作
| 抛异常 | 特定值 | 阻塞 | 超时 | |
|---|---|---|---|---|
| 插入 | addFirst(o) | offerFirst(o) | putFirst(o) | offerFirst(o, timeout, timeunit) |
| 移除 | removeFirst(o) | pollFirst(o) | takeFirst(o) | pollFirst(timeout, timeunit) |
| 检查 | getFirst(o) | peekFirst(o) |
尾部操作
| 抛异常 | 特定值 | 阻塞 | 超时 | |
|---|---|---|---|---|
| 插入 | addLast(o) | offerLast(o) | putLast(o) | offerLast(o, timeout, timeunit) |
| 移除 | removeLast(o) | pollLast(o) | takeLast(o) | pollLast(timeout, timeunit) |
| 检查 | getLast(o) | peekLast(o) |
所以针对上述方法LinkedBlockingQueue类都有实现,四组不同的行为方式解释:
- 抛异常:如果试图的操作无法立即执行,抛一个异常。
- 特定值:如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
- 阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
- 超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,
- 等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。
并且BlockingDeque 接口继承自 BlockingQueue 接口。这就意味着你可以像使用一个 BlockingQueue 那样使用 BlockingDeque