玖叶教程网

前端编程开发入门

JUC-BlockingQueue

ArrayBlockingQueue,是数组实现的线程安全的有界的阻塞队列,它实现了多线程对资源的竞争访问。

在基础集合中,LinkedList采用链表实现了Queue接口,但它是线程不安全的,ArrayBlockingQueue相当于线程安全的LinkedList。

ArrayBlockingQueue的线程安全是通过ReentraintLock和Condition实现的,线程阻塞队列由AQS实现,共享资源队列则由ArrayBlockingQueue中的数组实现。

ArrayBlockingQueue的有界表现在它是通过数组实现,且其内部有两个Condition,notEmpty和notFunn,分别用来实现空队列或满队列的线程等待和唤醒。

下面来分析ArrayBlockingQueue的源码。

属性

/** The queued items */
// 共享资源队列
final Object[] items;

/** items index for next take, poll, peek or remove */
// 下一次取的索引位置
int takeIndex;

/** items index for next put, offer, or add */
// 下一次存的索引位置
int putIndex;

/** Number of elements in the queue */
// 队列中元素的个数
int count;

/** Main lock guarding all access */
// 可重入锁
final ReentrantLock lock;

/** Condition for waiting takes */
// 资源队列不为空条件
private final Condition notEmpty;

/** Condition for waiting puts */
// 资源队列未满条件
private final Condition notFull;

构造方法

/**
 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
 * capacity and default access policy.
 *
 * @param capacity the capacity of this queue
 * @throws IllegalArgumentException if {@code capacity < 1}
 */
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

/**
 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
 * capacity and the specified access policy.
 *
 * @param capacity the capacity of this queue
 * @param fair if {@code true} then queue accesses for threads blocked
 *        on insertion or removal, are processed in FIFO order;
 *        if {@code false} the access order is unspecified.
 * @throws IllegalArgumentException if {@code capacity < 1}
 */
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    // 初始化指定长度的队列
    this.items = new Object[capacity];
    // 公平锁/非公平锁
    lock = new ReentrantLock(fair);
    // 不为空条件
    notEmpty = lock.newCondition();
    // 未满条件
    notFull =  lock.newCondition();
}

/**
 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
 * capacity, the specified access policy and initially containing the
 * elements of the given collection,
 * added in traversal order of the collection's iterator.
 *
 * @param capacity the capacity of this queue
 * @param fair if {@code true} then queue accesses for threads blocked
 *        on insertion or removal, are processed in FIFO order;
 *        if {@code false} the access order is unspecified.
 * @param c the collection of elements to initially contain
 * @throws IllegalArgumentException if {@code capacity} is less than
 *         {@code c.size()}, or less than 1.
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c) {
    this(capacity, fair);
    // 从其他集合中导入,所以需要对该集合加锁
    final ReentrantLock lock = this.lock;
    lock.lock(); // Lock only for visibility, not mutual exclusion
    try {
        int i = 0;
        try {
            for (E e : c) {
                checkNotNull(e);
                items[i++] = e;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            throw new IllegalArgumentException();
        }
        count = i;
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}

入队

add(E e) 非阻塞式

该方法调用offer方法。

public boolean add(E e) {
    return super.add(e);
}

offer(E e) 非阻塞式

该方法方法放入资源,如果队列满了,会返回false,线程不会阻塞

    /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning {@code true} upon success and {@code false} if this queue
     * is full.  This method is generally preferable to method {@link #add},
     * which can fail to insert an element only by throwing an exception.
     *
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        // 检查元素是否为null,为null则抛出异常
        checkNotNull(e);
        // 加锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                // 如果队列已满则返回false
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            // 释放锁资源
            lock.unlock();
        }
    }

enqueue(E x)

该方法放入资源,是在加锁同步下执行的,如果队列满了,移动队尾指针,唤醒notempty上的阻塞线程

/**
 * Inserts element at current put position, advances, and signals.
 * Call only when holding lock.
 */
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    // 放入资源
    items[putIndex] = x;
    // 移动索引指针
    if (++putIndex == items.length)
        putIndex = 0;
    // 更新队列中实际元素数量
    count++;
    // 唤醒因为没有获取到资源被阻塞的线程
    // notEmpty上阻塞的线程
    notEmpty.signal();
}

put(E e) 阻塞式

/**
 * Inserts the specified element at the tail of this queue, waiting
 * for space to become available if the queue is full.
 *
 * @throws InterruptedException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
public void put(E e) throws InterruptedException {
    // 检查元素是否为null,为null则抛出异常
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    // 加锁,可响应中断
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            // 如果满了就要阻塞等待
            notFull.await();
        // 被唤醒后再放入
        enqueue(e);
    } finally {
        // 释放锁
        lock.unlock();
    }
}

出队

E poll() 非阻塞式

public E poll() {
    // 加锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 有资源则取出资源并返回
        return (count == 0) ? null : dequeue();
    } finally {
        // 释放锁资源
        lock.unlock();
    }
}

dequeue()

/**
 * Extracts element at current take position, advances, and signals.
 * Call only when holding lock.
 */
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    // 取出资源
    E x = (E) items[takeIndex];
    // 将位置置null
    items[takeIndex] = null;
    // 移动指针
    if (++takeIndex == items.length)
        takeIndex = 0;
    // 元素个数减1
    count--;
    // 
    if (itrs != null)
        // 如果队列为空的对迭代器的操作
        itrs.elementDequeued();
    // 唤醒因队列满无法放入资源的阻塞线程
    // notFull上阻塞的线程
    notFull.signal();
    return x;
}

take() 阻塞式

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

E peek() 非阻塞式

只是获取队首元素,不出队。

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 根据索引直接返回数据
        return itemAt(takeIndex); // null when queue is empty
    } finally {
        lock.unlock();
    }
}

总结

ArrayBlockingQueue队列中存放的是的共享资源,在多线程环境下,队列为空或队列为满将会使出队线程或入队线程阻塞;

  • ArrayBlockingQueue初始化需要指定容量,但不可扩容,可选择公平/非公平线程阻塞队列,默认非公平队列;
  • ArrayBlockingQueue的并发控制是通过ReentrantLock+Condition实现的;
  • ArrayBlockingQueue内部使用数组实现的环型阻塞式队列,用来存放共享资源;
  • ArrayBlockingQueue内部使用count变量,用来记录实际元素个数,元素的入队、出队都会和队列容量作比较,控制边界条件;
  • ArrayBlockingQueue对入队、出队操作分为阻塞式和非阻塞式;
  • 阻塞:put(E e) 、take() ,可响应中断;
  • 非阻塞:add(E e) 、offer(E e) 、E poll()、E peek() ;

ArrayBlockingQueue的使用就是简单的入队、出队操作,就不作演示了。

LinkedBlockingQueue

LinkedBlockingQueue是用单向链表代替ArrayBlockingQueue的数组实现的有界单向阻塞队列,可以指定队列的容量,默认容量大小等于Integer.MAX_VALUE。其并发控制同样是靠ReentraintLock和Condition实现。LinkedBlockingQueue有两把锁,takeLock和putLock,入队和出队操作分别加不同的锁;count计数器为AtomicInteger类型;

LinkedBlockingDeque

LinkedBlockingQueue是用双向链表来实现的有界双向阻塞队列,支持FIFO、FILO,可以指定队列的容量,默认容量大小等于Integer.MAX_VALUE。其并发控制同样是靠ReentraintLock和Condition实现。LinkedBlockingDeque只有一把锁来控制队列操作,因为双向队列可以在队首获取元素,也可以在队首放入元素,所以只能用一把锁来控制并发。

ConcurrentLinkedQueue

ConcurrentLinkedQueue是用单向链表实现的无界单向阻塞队列,不能指定队列的容量。其并发访问是靠volatile+CAS实现。

发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言