Java基础学习之并发篇:LinkedBlockingQueue和LinkedBlockingDeque

学习目标

在并发编程中,阻塞队列在多线程中的场景特别有用,比如在生产和消费者模型中,生产者生产数据到队列,队列满时需要阻塞线程,停止往队列生产。消费者消费队列,对队列为空时阻塞线程停止消费,在Java中有提供不同场景的阻塞队列,那么接下来我们将学习:LinkedBlockingQueueLinkedBlockingDeque两种阻塞队列。


LinkedBlockingQueue

上次我们学习了基础的有界阻塞队列ArrayBlockingQueue👉有兴趣可以了解下,明白了有界阻塞队列基本实现原理,而LinkedBlockingQueue与ArrayBlockingQueue比较而言,我们知道ArrayBlockingQueue是通过一个有界的数组对象来存储数据,而LinkedBlockingQueue是用了单链表来实现数据的存储,且相较于ArrayBlockingQueue是用两个锁分别来处理数据的生产和消费。实现类如下:

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();
    }
  • capacity :提供可维护单向链表长度,不设置默认即Integer.MAX_VALUE
  • count :实际大小,即单向链表中当前节点个数
  • head:指向单向链表的表头
  • last :指向单向链表的表尾
  • takeLock :消费锁,保证线程安全
  • putLock :生产锁,保证线程安全
  • notEmpty :保证消费等待,数据队列空了线程进入等待
  • notFull:保证生产等待,数据队列满了线程进入等待

其Node节点定义是一个带头指针和尾指针的单向链表:
在这里插入图片描述
Node定义的结构如下

    static class Node<E> {
        E item;

        Node<E> next;

        Node(E x) { item = x; }
    }

可以看下生产put的过程

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }
    private void signalNotEmpty() {
       final ReentrantLock takeLock = this.takeLock;
       takeLock.lock();
       try {
           notEmpty.signal();
       } finally {
           takeLock.unlock();
       }
   }
   private void enqueue(Node<E> node) {
       // assert putLock.isHeldByCurrentThread();
       // assert last.next == null;
       last = last.next = node;
   }

大概流程如下
可以看到实现相差无几,通过两个等待条件实现相互等待。当前大小count在不超过总容量capacity情况下进入队列,即增加链表节点到链表尾部,last指针指向插入的节点。
在当前节点小于总容量capacity时,signal将当前的线程等待队列中第一个等待的线程唤醒。而当count为0时表示队列有一条数据了,可以让消费线程唤醒进行生产。


LinkedBlockingDeque

LinkedBlockingDeque这里乍一看好像和LinkedBlockingQueue连名字都没啥差别,实质上LinkedBlockingDeque后面是Deque,Deque是一个双端队列,是 “Double Ended Queue” 的缩写。双端队列是一个你可以从任意一端插入或者抽取元素的队列。实现了在队列头和队列尾的高效插入和移除。并且比传统的生产者-消费者模式具有更高的可伸缩性,这是因为工作者线程不会在单个共享的任务队列上发生竞争。在大多数时候,它们都只是访问自己的双端队列,从而极大地减少了竞争。
在这里插入图片描述
实现了
一个线程生产元素,并把它们插入到队列的任意一端。如果双端队列已满,插入线程将被阻塞,直到一个移除线程从该队列中移出了一个元素。如果双端队列为空,移除线程将被阻塞,直到一个插入线程向该队列插入了一个新元素。
定义的成员

public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {

    /** Doubly-linked list node class */
    static final class Node<E> {
        E item;

        Node<E> prev;

        Node<E> next;

        Node(E x) {
            item = x;
        }
    }

    transient Node<E> first;

    transient Node<E> last;

    /** Number of items in the deque */
    private transient int count;

    /** Maximum number of items in the deque */
    private final int capacity;

    /** Main lock guarding all access */
    final ReentrantLock lock = new ReentrantLock();

    /** Condition for waiting takes */
    private final Condition notEmpty = lock.newCondition();

    /** Condition for waiting puts */
    private final Condition notFull = lock.newCondition();
    }

和LinkedBlockingQueue成员变量相比,发现Node除了加了一个prev指针,并无二致。
但通过继承关系可以发现实现了BlockingDeque接口,而该接口定义了如下方法:
头部操作

抛异常特定值阻塞超时
插入addFirst(o)offerFirst(o)putFirst(o)offerFirst(o, timeout, timeunit)
移除removeFirst(o)pollFirst(o)takeFirst(o)pollFirst(timeout, timeunit)
检查getFirst(o)peekFirst(o)

尾部操作

抛异常特定值阻塞超时
插入addLast(o)offerLast(o)putLast(o)offerLast(o, timeout, timeunit)
移除removeLast(o)pollLast(o)takeLast(o)pollLast(timeout, timeunit)
检查getLast(o)peekLast(o)

所以针对上述方法LinkedBlockingQueue类都有实现,四组不同的行为方式解释

  1. 抛异常:如果试图的操作无法立即执行,抛一个异常。
  2. 特定值:如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
  3. 阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
  4. 超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,
  5. 等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。

并且BlockingDeque 接口继承自 BlockingQueue 接口。这就意味着你可以像使用一个 BlockingQueue 那样使用 BlockingDeque