1、阻塞队列的作用
- 当阻塞队列是空时, 从队列中获取元素的操作将会被阻塞;当阻塞队列是满时, 往队列中添加元素的操作将会被阻塞
- 试图从空的阻塞队列中获取元素的线程也将会被阻塞,直到其他的线程往空的队列插入新的元素。同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他线程从队列中移除一个或者多个元素或者全清空队列后使队列重新变得空闲并后续新增
2、为什么使用阻塞队列
在多线程领域,所谓阻塞,在某些情况下会挂起线程,一旦满足条件,被挂起的线程又会自动被唤醒。在juc并发包中没有提供队列的实现时,我们需要自行关注细节,如何时唤醒、何时阻塞,资源和性能的平衡等问题。而现在JUC包中提供了BlockingQueue,提供了一系列api,可以很方便地实现多线程下高效、安全地处理资源的场景。
3、BlockingQueue的核心API
核心方法:插入、移除、检查
- 抛出异常:当阻塞队列满时, 再往队列里面add插入元素会抛IllegalStateException: Queue full;当阻塞队列空时, 再往队列Remove元素时候回抛出NoSuchElementException
- 特殊值:插入方法,成功返回true 失败返回false;移除方法,成功返回元素,队列为空,就返回null
- 一直阻塞:当阻塞队列满时,生产者继续往队列里面put元素,队列会一直阻塞,直到put数据or响应中断退出;当阻塞队列空时,消费者试图从队列take元素,队列会一直阻塞消费者线程直到队列可用
- 超时退出:当阻塞队列满时,队列会阻塞生产者线程一定时间, 超过限时后生产者线程就会退出
示例
package com.panda00hi.juc;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* @author panda00hi
* @date 2022/4/11
*/
public class BlockingQueueDemo {
public static void main(String[] args) {
// List list = new ArrayList();
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
// 超过限制大小,报异常
// System.out.println(blockingQueue.add("d"));
System.out.println("=============");
// 队列首部元素
System.out.println(blockingQueue.element());
// 删掉队首元素
System.out.println(blockingQueue.remove());
// 再次取队首元素
System.out.println(blockingQueue.element());
}
}
4、架构简介
体系图(建议大家自行通过idea的UML功能查看)
- ArrayBlockingQueue: 由数组结构组成的有界阻塞队列
- LinkedBlockingDeque: 由链表结构组成的有界(但大小默认值Integer>MAX_VALUE)阻塞队列
- PriorityBlockingQueue: 支持优先级排序的无界阻塞队列
- DelayQueue: 使用优先级队列实现的延迟无界阻塞队列
- SynchronousQueue: 不存储元素的阻塞队列,也即是单个元素的队列(有且只有一个,生产一个消费一个)
注意:
1)SynchronousQueue没有容量与其他BlcokingQueue不同,SynchronousQueue是一个不存储元素的BlcokingQueue。每个put操作必须要等待一个take操作,否则阻塞,不能继续添加元素,反之亦然。
2)LinkedTransferQueue: 由链表结构组成的无界阻塞队列
3)LinkedBlockingDeque:由链表结构组成的双向阻塞队列
5、使用场景
- 生产者消费者模式
- 线程池
- 消息中间件
5.1 生产者消费者(传统实现)
场景:一个初始值为0的变量,两个线程对其进行交替操作,一个加1,一个减1,共 5 轮
关键点:wait、notify
package com.panda00hi.juc;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 生产者消费者模型(传统方式实现)
* 传统方式:通过wait、notify去实现
* 场景:一个初始值为0的变量,两个线程对其进行交替操作,一个加1,一个减1,来 5 轮
*
* @author panda00hi
* @date 2022/4/11
*/
public class ProdConsumerTraditionDemo {
public static void main(String[] args) {
ShareData shareData = new ShareData();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
shareData.increment();
}
}, "AAA 生产").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
shareData.decrement();
}
}, "BBB 消费").start();
}
}
/**
* 资源类
*/
class ShareData {
private int num = 0;
private final Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
/**
* 加操作,模拟生产过程
*/
public void increment() {
// 相当于同步代码块加锁
lock.lock();
try {
// 1 判断
while (num != 0) {
// 等待,不能生产
condition.await();
}
// 2 干活。当前num为0了,需要进行生产,即加操作
num++;
System.out.println(Thread.currentThread().getName() + " " + num);
// 3 通知唤醒
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放锁
lock.unlock();
}
}
/**
* 减操作,模拟生产过程
*/
public void decrement() {
// 相当于同步代码块加锁
lock.lock();
try {
// 1 判断
while (num == 0) {
// 等待,不能消费
condition.await();
}
// 2 干活。消费完成,num减为0
num--;
System.out.println(Thread.currentThread().getName() + " " + num);
// 3 通知唤醒
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放锁
lock.unlock();
}
}
}
运行结果:
AAA 生产 1
BBB 消费 0
AAA 生产 1
BBB 消费 0
AAA 生产 1
BBB 消费 0
AAA 生产 1
BBB 消费 0
AAA 生产 1
BBB 消费 0
5.2 生产者消费者(阻塞队列版)
关键:volatile、CAS、atomicInteger、BlockQueue、线程交互、原子引用
package com.panda00hi.juc;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 生产者消费者模型(阻塞队列实现)
*
* @author panda00hi
* @date 2022/4/11
*/
public class ProdConsumerBlockQueueDemo {
public static void main(String[] args) {
MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 生产者线程启动");
try {
myResource.myProd();
} catch (Exception e) {
e.printStackTrace();
}
}, "Prod").start();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 消费者线程启动");
try {
myResource.myConsumer();
} catch (Exception e) {
e.printStackTrace();
}
}, "Consumer").start();
// 暂停一会线程
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("\n5秒时间到,main线程调用停止方法,结束生产消费活动");
myResource.stop();
}
}
class MyResource {
// 默认开启,进行生产+消费过程
private volatile boolean FLAG = true;
private final AtomicInteger atomicInteger = new AtomicInteger();
BlockingQueue<String> blockingQueue = null;
public MyResource(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
// 通过反射,方便后台排错时,查询传入的实际类的名字
System.out.println(blockingQueue.getClass().getName());
}
/**
* 生产动作
*
* @throws Exception
*/
public void myProd() throws Exception {
// while外声明,实现复用
String data = null;
boolean retValue;
while (FLAG) {
System.out.println(Thread.currentThread().getName() + "\tFLAG为true,生产进行中……");
data = atomicInteger.incrementAndGet() + "";
retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
if (retValue) {
System.out.println(Thread.currentThread().getName() + "\t插入队列值:" + data + " 成功");
} else {
System.out.println(Thread.currentThread().getName() + "\t插入队列值:" + data + " 失败");
}
TimeUnit.SECONDS.sleep(1);
}
System.out.println(Thread.currentThread().getName() + "\tFLAG为false,生产动作结束");
}
/**
* 消费动作
*
* @throws Exception
*/
public void myConsumer() throws Exception {
// 生产和消费受同一个FLAG控制,要么都进行,要么都结束
String result = null;
while (FLAG) {
System.out.println((Thread.currentThread().getName() + "\tFLAG为true,消费进行中……"));
result = blockingQueue.poll(2L, TimeUnit.SECONDS);
if (null == result || result.equalsIgnoreCase("")) {
FLAG = false;
System.out.println(Thread.currentThread().getName() + "\t超过2秒钟没有取到值,FLAG改为false,消费退出");
return;
}
System.out.println((Thread.currentThread().getName() + "\t消费队列中的值:" + result + " 成功"));
}
}
/**
* 终止生产和消费
*/
public void stop() {
FLAG = false;
System.out.println("FLAG的值被修改为false,终止生产和消费");
}
}
运行结果:(注意由于是打印,所以打印的顺序可能不是最理想的顺序)
Prod 生产者线程启动
Prod FLAG为true,生产进行中……
Consumer 消费者线程启动
Consumer FLAG为true,消费进行中……
Prod 插入队列值:1 成功
Consumer 消费队列中的值:1 成功
Consumer FLAG为true,消费进行中……
Prod FLAG为true,生产进行中……
Consumer 消费队列中的值:2 成功
Consumer FLAG为true,消费进行中……
Prod 插入队列值:2 成功
Prod FLAG为true,生产进行中……
Prod 插入队列值:3 成功
Consumer 消费队列中的值:3 成功
Consumer FLAG为true,消费进行中……
Prod FLAG为true,生产进行中……
Prod 插入队列值:4 成功
Consumer 消费队列中的值:4 成功
Consumer FLAG为true,消费进行中……
5秒时间到,main线程调用停止方法,结束生产消费活动
FLAG的值被修改为false,终止生产和消费
Prod FLAG为false,生产动作结束
Consumer 超过2秒钟没有取到值,FLAG改为false,消费退出
更多使用场景,可参考这篇博客 https://cloud.tencent.com/developer/article/1636024