玖叶教程网

前端编程开发入门

多线程之阻塞队列BlockingQueue详解

上篇博文,我们简单的介绍了多线程之Semaphore,这篇我们重点介绍阻塞队列BlockingQueue。

什么是阻塞队列

阻塞队列,顾名思义,首先它是一个队列,而一个阻塞队列在数据结构中所起的作用大致如下图所示:

线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素

1、当阻塞队列是空时,从队列中获取元素的操作将会被阻塞。

2、当阻塞队列是满时,从队列里添加元素的操作将会被阻塞。

上面阻塞队列的特点,也是阻塞队列与普通队列的区别。

为什么用阻塞队列,有什么好处?

在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒。

好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为一切BlockingQueue都给你一手包办了。

阻塞队列核心方法

相比Queue接口有两种形式的api,BlockingQueue则有四种形式的api,阻塞队列定义如果调用了某个函数可能当时不能立即满足结果 ,但很有可能在未来的某个时刻会满足。

四种api定义:

(1)调用函数失败,抛出异常

(2)调用失败,返回null或者false

(3)调用失败,当前线程无限阻塞直到成功

(4)阻塞指定是一段时间,如果还不能满足,就放弃该次操作。

具体看如下表格:

BlockingQueue队列的实现有无界限队列和有界限队列,任何时候如果put操作的值如果大于了界限值,那么put操作将会阻塞,其实严格意义的说阻塞队列也是有界限的内部默认最大值是 Integer.MAX_VALUE.

架构梳理和种类分析

对于队列的架构图就下图所示:

上图能够展示队列的几个核心的顶层接口和具体实现,其他中间接口有省略。队列的基类接口Queue它定义了所有实现队列的类必须拥有的方法行为而BlockingQueue阻塞队列接口继承了Queue接口,此外BlockingQueue队列接口是Java并发包里面所有实现线程安全队列的基类接口。基类接口Queue又继承了Collection接口,说明队列中的元素像操作集合一样,能够进行添加,删除,排序等等。

Java并发包里面实现BlockingQueue队列的的子类有如下7种:

对于阻塞队列,我们在日常的开发中重点关注三个:ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue。

阻塞队列用在哪里

  • 生产者消费者模式
  • 线程池
  • 消息中间件

使用阻塞队列实现生产者和消费者模式

class MyData{
 private volatile boolean FLAG = true;//默认开启,进行生产与消费
 private AtomicInteger atomicInteger = new AtomicInteger();
 BlockingQueue<String> blockingQueue = null;
 public MyData(BlockingQueue<String> blockingQueue) {
 this.blockingQueue = blockingQueue;
 System.out.println(blockingQueue.getClass().getName());
 }
 public void myProd() throws Exception{
 String data = null;
 Boolean retValue = null;
 while (FLAG){
 data = atomicInteger.incrementAndGet() + "";
 retValue = blockingQueue.offer(data, 2, TimeUnit.SECONDS);
 if(retValue){
 System.out.println(Thread.currentThread().getName() +"插入队列中数据" + data +"成功");
 }else {
 System.out.println(Thread.currentThread().getName() +"插入队列中数据" + data +"失败");
 }
 TimeUnit.MILLISECONDS.sleep(100);
 }
 System.out.println(Thread.currentThread().getName() +"生产停止");
 }
 public void myConsumer() throws Exception{
 String result = null;
 while (FLAG){
 result = blockingQueue.poll(2, TimeUnit.SECONDS);
 if(null == result || result.equalsIgnoreCase("")){
 FLAG = false;
 System.out.println(Thread.currentThread().getName() +"超过2s,没有取到数据,消费退出");
 return;
 }
 System.out.println(Thread.currentThread().getName() +"消费队列,消费" + result +"成功");
 }
 }
 //叫停生产
 public void stop(){
 this.FLAG = false;
 }
}
public class ProConsumer_BlockQueueDemo {
 public static void main(String[] args) {
 MyData myData = new MyData(new ArrayBlockingQueue<>(3));
 //生产线程
 new Thread(() -> {
 System.out.println("生产线程开启");
 try {
 myData.myProd();
 }catch (Exception exception){
 exception.printStackTrace();
 }
 }, "Product").start();
 //消费线程
 new Thread(() -> {
 System.out.println("消费线程开启");
 try {
 System.out.println("================");
 myData.myConsumer();
 System.out.println("================");
 }catch (Exception exception){
 exception.printStackTrace();
 }
 }, "Consumer").start();
 try{
 TimeUnit.SECONDS.sleep(1);
 }catch (InterruptedException exception){
 exception.printStackTrace();
 }
 System.out.println();
 System.out.println("大老板叫停");
 myData.stop();
 }
}
执行结果如下图所示:

从上图可以看到,只有生产者开始生产数据,消息者才能消息数据,否则队列为空,线程会阻塞2s。

总结

阻塞队列,理解起来还是比较简单的,对于阻塞队列更加底层总结,在我们介绍了AQS之后,会继续跟大家分享。

下篇博文给大家分享,JUC包中另外一个比较重要的接口,Condition的使用及原理解析。

发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言