基于JDK 1.8 版本
概念
ArrayBlockingQueue是基于数组实现的阻塞队列,通过先进先出的顺序来访问元素。ArrayBlockingQueue是一个固定大小的有界队列,一旦创建,它的容量就不能再变化。如果向一个已经满的ArrayBlockingQueue中添加元素,则会导致该操作阻塞,直到该操作成功为止,同样的,如果从一个空的ArrayBlockingQueue中获取元素,也会导致阻塞。
使用
public static void main(String[] args) throws InterruptedException {
// 创建一个容量为10的对象
ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<String>(10);
// 初始化数据
for (int i = 0; i < 10; i++) {
arrayBlockingQueue.put(String.valueOf(i));
}
// 创建一个消费线程
Thread consumer = new Thread(() -> {
String value = "";
try {
// 等待10秒,降低消费频率
Thread.sleep(10000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
while ((value = arrayBlockingQueue.poll()) != null) {
System.out.println(value);
}
});
consumer.start();
// 主线程在arrayBlockingQueue满的情况下,再增加数据
long now = System.currentTimeMillis();
System.out.println("put 第11个数据,当前时间:" + now);
arrayBlockingQueue.put("aaaa");
System.out.println("put 第11个数据,花费:" + (System.currentTimeMillis() - now));
}
该段代码的逻辑是首先创建一个ArrayBlockingQueue对象,容量设置为10,并初始化数据。开启一个消费线程来消费该队列的数据,同时主线程在队列满的情况下,继续添加元素,为了达到等待效果,消费线程休眠10s钟,这时可以看到主线程在调用put方法时,会被阻塞。具体的执行结果如下:
分析
首先看下ArrayBlockingQueue的类图:
ArrayBlockingQueue实现了 Queue、Collection接口,所以它既有集合,也有队列相关的行为。
数据结构
/** The queued items */
final Object[] items; // 队列数组
/** items index for next take, poll, peek or remove */
int takeIndex; // 当前出队列的索引
/** items index for next put, offer, or add */
int putIndex; // 当前入队列的索引
/** Number of elements in the queue */
int count; // 当前队列的数量
/** Main lock guarding all access */
final ReentrantLock lock; // 并发锁
/** Condition for waiting takes */
private final Condition notEmpty; // 队列不空的信号量
/** Condition for waiting puts */
private final Condition notFull; // 队列不满的信号量
构造函数
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
// 根据传入的参数,来决定是否创建公平锁
lock = new ReentrantLock(fair);
// 同一个锁创建的两个condition
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
在 ArrayBlockingQueue数据结构中,ReentrantLock 创建了两个 Condition对象:
notEmpty: 当队列不空的时候,会触发信号量,通知等待的线程
notFull: 当队列未满时,会触发信号量,通知等待的线程
注: Condition对象,是由指定的Lock对象创建, 当调用 Condition的await 方法时,会自动释放lock锁,并将当前线程阻塞,直到有其他线程调用该Condition的signal 方法、signalAll 方法、或是其他线程调用当前线程的interupt 方法,在await 方法返回当前线程时,当前线程必须重新获取和该Condition关联的锁。
put 操作
public void put(E e) throws InterruptedException {
checkNotNull(e);
// 可重入锁
final ReentrantLock lock = this.lock;
// 获取锁
lock.lockInterruptibly();
try {
// 如果当前已经满了,则等待信号量
while (count == items.length)
notFull.await();// 这里会释放锁,线程阻塞等待
// 如果成功,则元素入队
enqueue(e);
} finally {
// 解锁成功
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
// notEmpty的信号量通知
notEmpty.signal();
}
put方法处理:
- 获取锁
- 如果获取锁成功,判断当前的元素数量是否满了,如果未满,直接加入队列,否则等待未满的信号量
- 释放锁
poll 方法
public E poll() {
final ReentrantLock lock = this.lock;
// 获取锁
lock.lock();
try {
// 如果数量为空,则返回null,否则调用dequeue
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
// 返回taskIndex处的数据
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// notFull的信号量通知
notFull.signal();
return x;
}
处理逻辑:
- 获取锁
- 如果元素为空,返回 null, 如果不为空,返回takeIndex处理的元素,并处理notFull的信息量
- 释放锁
lock.lock() 与 lock.lockInterruptibly 的区别
在分析 offer与 put方法时,发现使用的锁定的方式是不同的
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// lock
lock.lock();
try {
...
} finally {
lock.unlock();
}
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
...
} finally {
lock.unlock();
}
}
lock.lock()与 lock.lockInterruptibly()的差异在于,调用锁定的线程是否允许被其他线程调用该线程的interupt方法,通常情况下,当锁被其他线程持有时,另一个线程去申请时,会被阻塞,一直等待在那边,如 synchronized 关键字、lock.lock() 方法等, 但当调用 lock.lockInterruptibly() 方法时,如果线程被其他的线程调用了interupt 方法时,则可以使被阻塞的线程跳出阻塞等待的逻辑。