ArrayBlockingQueue,是数组实现的线程安全的有界的阻塞队列,它实现了多线程对资源的竞争访问。
在基础集合中,LinkedList采用链表实现了Queue接口,但它是线程不安全的,ArrayBlockingQueue相当于线程安全的LinkedList。
ArrayBlockingQueue的线程安全是通过ReentraintLock和Condition实现的,线程阻塞队列由AQS实现,共享资源队列则由ArrayBlockingQueue中的数组实现。
ArrayBlockingQueue的有界表现在它是通过数组实现,且其内部有两个Condition,notEmpty和notFunn,分别用来实现空队列或满队列的线程等待和唤醒。
下面来分析ArrayBlockingQueue的源码。
属性
/** The queued items */
// 共享资源队列
final Object[] items;
/** items index for next take, poll, peek or remove */
// 下一次取的索引位置
int takeIndex;
/** items index for next put, offer, or add */
// 下一次存的索引位置
int putIndex;
/** Number of elements in the queue */
// 队列中元素的个数
int count;
/** Main lock guarding all access */
// 可重入锁
final ReentrantLock lock;
/** Condition for waiting takes */
// 资源队列不为空条件
private final Condition notEmpty;
/** Condition for waiting puts */
// 资源队列未满条件
private final Condition notFull;
构造方法
/**
* Creates an {@code ArrayBlockingQueue} with the given (fixed)
* capacity and default access policy.
*
* @param capacity the capacity of this queue
* @throws IllegalArgumentException if {@code capacity < 1}
*/
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
/**
* Creates an {@code ArrayBlockingQueue} with the given (fixed)
* capacity and the specified access policy.
*
* @param capacity the capacity of this queue
* @param fair if {@code true} then queue accesses for threads blocked
* on insertion or removal, are processed in FIFO order;
* if {@code false} the access order is unspecified.
* @throws IllegalArgumentException if {@code capacity < 1}
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
// 初始化指定长度的队列
this.items = new Object[capacity];
// 公平锁/非公平锁
lock = new ReentrantLock(fair);
// 不为空条件
notEmpty = lock.newCondition();
// 未满条件
notFull = lock.newCondition();
}
/**
* Creates an {@code ArrayBlockingQueue} with the given (fixed)
* capacity, the specified access policy and initially containing the
* elements of the given collection,
* added in traversal order of the collection's iterator.
*
* @param capacity the capacity of this queue
* @param fair if {@code true} then queue accesses for threads blocked
* on insertion or removal, are processed in FIFO order;
* if {@code false} the access order is unspecified.
* @param c the collection of elements to initially contain
* @throws IllegalArgumentException if {@code capacity} is less than
* {@code c.size()}, or less than 1.
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
// 从其他集合中导入,所以需要对该集合加锁
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
入队
add(E e) 非阻塞式
该方法调用offer方法。
public boolean add(E e) {
return super.add(e);
}
offer(E e) 非阻塞式
该方法方法放入资源,如果队列满了,会返回false,线程不会阻塞
/**
* Inserts the specified element at the tail of this queue if it is
* possible to do so immediately without exceeding the queue's capacity,
* returning {@code true} upon success and {@code false} if this queue
* is full. This method is generally preferable to method {@link #add},
* which can fail to insert an element only by throwing an exception.
*
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
// 检查元素是否为null,为null则抛出异常
checkNotNull(e);
// 加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
// 如果队列已满则返回false
return false;
else {
enqueue(e);
return true;
}
} finally {
// 释放锁资源
lock.unlock();
}
}
enqueue(E x)
该方法放入资源,是在加锁同步下执行的,如果队列满了,移动队尾指针,唤醒notempty上的阻塞线程
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
// 放入资源
items[putIndex] = x;
// 移动索引指针
if (++putIndex == items.length)
putIndex = 0;
// 更新队列中实际元素数量
count++;
// 唤醒因为没有获取到资源被阻塞的线程
// notEmpty上阻塞的线程
notEmpty.signal();
}
put(E e) 阻塞式
/**
* Inserts the specified element at the tail of this queue, waiting
* for space to become available if the queue is full.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
// 检查元素是否为null,为null则抛出异常
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 加锁,可响应中断
lock.lockInterruptibly();
try {
while (count == items.length)
// 如果满了就要阻塞等待
notFull.await();
// 被唤醒后再放入
enqueue(e);
} finally {
// 释放锁
lock.unlock();
}
}
出队
E poll() 非阻塞式
public E poll() {
// 加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 有资源则取出资源并返回
return (count == 0) ? null : dequeue();
} finally {
// 释放锁资源
lock.unlock();
}
}
dequeue()
/**
* Extracts element at current take position, advances, and signals.
* Call only when holding lock.
*/
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
// 取出资源
E x = (E) items[takeIndex];
// 将位置置null
items[takeIndex] = null;
// 移动指针
if (++takeIndex == items.length)
takeIndex = 0;
// 元素个数减1
count--;
//
if (itrs != null)
// 如果队列为空的对迭代器的操作
itrs.elementDequeued();
// 唤醒因队列满无法放入资源的阻塞线程
// notFull上阻塞的线程
notFull.signal();
return x;
}
take() 阻塞式
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
E peek() 非阻塞式
只是获取队首元素,不出队。
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 根据索引直接返回数据
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}
总结
ArrayBlockingQueue队列中存放的是的共享资源,在多线程环境下,队列为空或队列为满将会使出队线程或入队线程阻塞;
- ArrayBlockingQueue初始化需要指定容量,但不可扩容,可选择公平/非公平线程阻塞队列,默认非公平队列;
- ArrayBlockingQueue的并发控制是通过ReentrantLock+Condition实现的;
- ArrayBlockingQueue内部使用数组实现的环型阻塞式队列,用来存放共享资源;
- ArrayBlockingQueue内部使用count变量,用来记录实际元素个数,元素的入队、出队都会和队列容量作比较,控制边界条件;
- ArrayBlockingQueue对入队、出队操作分为阻塞式和非阻塞式;
- 阻塞:put(E e) 、take() ,可响应中断;
- 非阻塞:add(E e) 、offer(E e) 、E poll()、E peek() ;
ArrayBlockingQueue的使用就是简单的入队、出队操作,就不作演示了。
LinkedBlockingQueue
LinkedBlockingQueue是用单向链表代替ArrayBlockingQueue的数组实现的有界单向阻塞队列,可以指定队列的容量,默认容量大小等于Integer.MAX_VALUE。其并发控制同样是靠ReentraintLock和Condition实现。LinkedBlockingQueue有两把锁,takeLock和putLock,入队和出队操作分别加不同的锁;count计数器为AtomicInteger类型;
LinkedBlockingDeque
LinkedBlockingQueue是用双向链表来实现的有界双向阻塞队列,支持FIFO、FILO,可以指定队列的容量,默认容量大小等于Integer.MAX_VALUE。其并发控制同样是靠ReentraintLock和Condition实现。LinkedBlockingDeque只有一把锁来控制队列操作,因为双向队列可以在队首获取元素,也可以在队首放入元素,所以只能用一把锁来控制并发。
ConcurrentLinkedQueue
ConcurrentLinkedQueue是用单向链表实现的无界单向阻塞队列,不能指定队列的容量。其并发访问是靠volatile+CAS实现。