LinkedBlockingQueue的put,take方法

put操作:在LinkedBlockingQueue 中有putlcok和takelock俩把锁,put操作使用putlock这把锁,利用lockInterruptibly方法加锁,该方法的作用为:如果该线程被标记为中断,可抛出异常。加锁之后,判断count是否等于容量,相等的话条用await()方法线线程加到条件队列中去,直到唤醒,把元素加入到队列,然后判断一下count+1如果还没有达到容量值的话,再次调用notfull的signal方法,因为有可能多个线程阻塞到notfull条件队列中。最后释放锁。

释放完锁之后,如果原来count值为0的话,现在加了一个元素,调用signalNotEmpty方法,执行notEmpty.signal(),值得注意的是此过程得用takelock加锁,原因是防止其他线程消费了。

public void put(E e) throws InterruptedException {
    // 不允许null元素
    if (e == null) throw new NullPointerException();
    int c = -1;
    // 新建一个节点
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 使用put锁加锁
    putLock.lockInterruptibly();
    try {
        // 如果队列满了,就阻塞在notFull条件上
        // 等待被其它线程唤醒
        while (count.get() == capacity) {
            notFull.await();
        }
        // 队列不满了,就入队
        enqueue(node);
        // 队列长度加1
        c = count.getAndIncrement();
        // 如果现队列长度如果小于容量
        // 就再唤醒一个阻塞在notFull条件上的线程
        // 这里为啥要唤醒一下呢?
        // 因为可能有很多线程阻塞在notFull这个条件上的
        // 而取元素时只有取之前队列是满的才会唤醒notFull
        // 为什么队列满的才唤醒notFull呢?
        // 因为唤醒是需要加putLock的,这是为了减少锁的次数
        // 所以,这里索性在放完元素就检测一下,未满就唤醒其它notFull上的线程
        // 说白了,这也是锁分离带来的代价
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        // 释放锁
        putLock.unlock();
    }
    // 如果原队列长度为0,现在加了一个元素后立即唤醒notEmpty条件
    if (c == 0)
        signalNotEmpty();
}

private void enqueue(Node<E> node) {
    // 直接加到last后面
    last = last.next = node;
}    

private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    // 加take锁
    takeLock.lock();
    try {
        // 唤醒notEmpty条件
        notEmpty.signal();
    } finally {
        // 解锁
        takeLock.unlock();
    }
}

 take操作:其实和put的操作相反就行,利用takelock加锁,如果count值为0,那么notEmpty.await()加入到notEmpty的条件队列中去直至唤醒,唤醒之后取队头元素,如果此时count-1还大于0,那么调用signal()唤醒阻塞再notEmpty条件队列的线程。然后释放锁。最后调用signalNotFull,唤醒阻塞在notfull条件队列上的线程,此过程利用putlock锁来防止其他线程执行put操作进而可能导致队列又满了

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    // 使用takeLock加锁
    takeLock.lockInterruptibly();
    try {
        // 如果队列无元素,则阻塞在notEmpty条件上
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 否则,出队
        x = dequeue();
        // 获取出队前队列的长度
        c = count.getAndDecrement();
        // 如果取之前队列长度大于1,则唤醒notEmpty
        if (c > 1)
            notEmpty.signal();
    } finally {
        // 释放锁
        takeLock.unlock();
    }
    // 如果取之前队列长度等于容量
    // 则唤醒notFull
    if (c == capacity)
        signalNotFull();
    return x;
}

private E dequeue() {
    // head节点本身是不存储任何元素的
    // 这里把head删除,并把head下一个节点作为新的值
    // 并把其值置空,返回原来的值
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        // 唤醒notFull
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}