LinkedBlockingQueue 和 LinkedBlockingDeque 概览

LinkedBlockingQueue和LinkedBlockingDeque,两个都是队列,只不过前者只能一端出一端入,后者则可以两端同时出入,并且都是结构改变线程安全的队列。其实两个队列从实现思想上比较容易理解,有以下特点:

链表结构(动态数组)
通过ReentrantLock实现锁
利用Condition实现队列的阻塞等待,唤醒

 

LinkedBlockingQueue

LinkedBlockingQueue 也是使用单向链表实现的,其也有两个 Node,分别用来存放首、尾节点, 并且还有一个初始值为0的原子变量count,用来记录 队列元素个数 。 另外还有两个 ReentrantLock 的实例,分别用来控制元素入队和出队的原 子性,其中 takeLock 用来控制同时只有 一个线程可以从队列头获取元素 ,其他线程必须 等待, putLock控制同时只能有一个线程可以获取锁,在队列尾部添加元素,其他线程必 须等待。另外, notEmpty 和 notFull 是条件变量 ,它们内部都有 一个条件队列用来存放进队和出队时被阻塞的线程,其实这是生产者一消费者模型。

类图:

 

常量代码:

  /**
     * Linked list node class
     */
    static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

 

当调 用 线程在 LinkedBlockingQueue 实例上执行 take、 poll 等操作 时 需要获取到takeLock锁,从而保证同时只有一个线程可以操作链表头节点。 另外由于条件变量 notEmpty 内部的条件队列的维护使用的是 takeLock 的锁状态管理机制,所以在调 用 notEmpty 的 await 和 signal 方法前调用线程必须先获取到 takeLock 锁,否则会抛 出 Illega!MonitorStateException异常。 notEmpty 内部则维护着一个条件队列,当线 程获取到 takeLock 锁后调用 notEmpty 的 await 方法时,调用线程会被阻塞,然后 该线程会被放到 notEmpty 内部的条件队列进行等待,直到有线程调用了 notEmpty 的 signal 方法。

 

 

在 LinkedBlockingQueue 实例上执行 put、 offer 等操作时需要获取到 putLock 锁,从而保证 同 时只有一 个 线程可以操作链表尾节点。同样由于条件变量 notFull 内部的条件队列的维护使用的是 putLock的锁状态管理机制,所以在调用 notFull 的 await 和 signal 方法前调用线程必须先获取到 putLock 锁,否 则 会抛出 Illega!MonitorStateException 异常。 notFull 内部 则 维护着一 个 条件队列,当线程获 取到 putLock锁后调用 notFull 的 await方法时,调用线程会被阻塞, 然后该线程会 被放到 notFull 内部的条件队列进行等待,直到有线程调用了 notFull 的 signal方法。

 

 

初始容量, Integer.MAX_VALUE    =  2^31-1 

初始化首 、尾节 点 , 让它们指向哨兵节点

/**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity} is not greater
     *         than zero
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

原理简介:

LinkedBlockingQueue 的内部是通过单向链表实现的,使用头、尾节点来进行入队和出队操作,也就是入队操作都是对尾节点进行操作,出队操作都是对头节点进行操作.

对头、尾节点的操作分别使用了单独的独占锁从而保证了原子性,所以出队和入队操作是可以同时进行的 。 另外对头 、 尾节点的独占锁都配备了一个条件队 列,用来存放被阻塞的线程,并结合入队、出队操作实现了 一个生产消费模型 。

 

 

 

size操作

由于进行出队、入队操作时的 count是加了锁的 ,所以结果相比 ConcurrentLinkedQueue的 size 方法比较准确 。 这里考虑为何在 ConcurrentLinkedQueue 中需要遍历链表来获取 size 而不使用 一个原子变量呢?这是因为使用原子变量保存队列元素个数需要保证入队、出队 操作和原子变量操作是原子性操作, 而 ConcurrentLinkedQueue使用的是 CAS 无锁算法, 所以无法做到这样 。

 

remove操作

删除队列里面指定的元素,有则删除并返回 true,没有则返回 falsea

由于 remove方法在删除指定元素前加了两把锁,所以在遍历队列查找指定元 素的过程中是线程安全的,并且此时其他调用入队、出队操作 的线程全部会被阻塞 。 另外, 获取多个资源锁的顺序与释放的顺序是相反的 。

 

 

 

LinkedBlockingDeque

双向链表结构 , 只有一个锁用来获取数据 . 是一个双端队列,任何一端都可以进行元素的出入 .