上篇博文,我们简单的介绍了多线程之Semaphore,这篇我们重点介绍阻塞队列BlockingQueue。
什么是阻塞队列
阻塞队列,顾名思义,首先它是一个队列,而一个阻塞队列在数据结构中所起的作用大致如下图所示:
线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素
1、当阻塞队列是空时,从队列中获取元素的操作将会被阻塞。
2、当阻塞队列是满时,从队列里添加元素的操作将会被阻塞。
上面阻塞队列的特点,也是阻塞队列与普通队列的区别。
为什么用阻塞队列,有什么好处?
在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒。
好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为一切BlockingQueue都给你一手包办了。
阻塞队列核心方法
相比Queue接口有两种形式的api,BlockingQueue则有四种形式的api,阻塞队列定义如果调用了某个函数可能当时不能立即满足结果 ,但很有可能在未来的某个时刻会满足。
四种api定义:
(1)调用函数失败,抛出异常
(2)调用失败,返回null或者false
(3)调用失败,当前线程无限阻塞直到成功
(4)阻塞指定是一段时间,如果还不能满足,就放弃该次操作。
具体看如下表格:
BlockingQueue队列的实现有无界限队列和有界限队列,任何时候如果put操作的值如果大于了界限值,那么put操作将会阻塞,其实严格意义的说阻塞队列也是有界限的内部默认最大值是 Integer.MAX_VALUE.
架构梳理和种类分析
对于队列的架构图就下图所示:
上图能够展示队列的几个核心的顶层接口和具体实现,其他中间接口有省略。队列的基类接口Queue它定义了所有实现队列的类必须拥有的方法行为而BlockingQueue阻塞队列接口继承了Queue接口,此外BlockingQueue队列接口是Java并发包里面所有实现线程安全队列的基类接口。基类接口Queue又继承了Collection接口,说明队列中的元素像操作集合一样,能够进行添加,删除,排序等等。
Java并发包里面实现BlockingQueue队列的的子类有如下7种:
对于阻塞队列,我们在日常的开发中重点关注三个:ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue。
阻塞队列用在哪里
- 生产者消费者模式
- 线程池
- 消息中间件
使用阻塞队列实现生产者和消费者模式
class MyData{ private volatile boolean FLAG = true;//默认开启,进行生产与消费 private AtomicInteger atomicInteger = new AtomicInteger(); BlockingQueue<String> blockingQueue = null; public MyData(BlockingQueue<String> blockingQueue) { this.blockingQueue = blockingQueue; System.out.println(blockingQueue.getClass().getName()); } public void myProd() throws Exception{ String data = null; Boolean retValue = null; while (FLAG){ data = atomicInteger.incrementAndGet() + ""; retValue = blockingQueue.offer(data, 2, TimeUnit.SECONDS); if(retValue){ System.out.println(Thread.currentThread().getName() +"插入队列中数据" + data +"成功"); }else { System.out.println(Thread.currentThread().getName() +"插入队列中数据" + data +"失败"); } TimeUnit.MILLISECONDS.sleep(100); } System.out.println(Thread.currentThread().getName() +"生产停止"); } public void myConsumer() throws Exception{ String result = null; while (FLAG){ result = blockingQueue.poll(2, TimeUnit.SECONDS); if(null == result || result.equalsIgnoreCase("")){ FLAG = false; System.out.println(Thread.currentThread().getName() +"超过2s,没有取到数据,消费退出"); return; } System.out.println(Thread.currentThread().getName() +"消费队列,消费" + result +"成功"); } } //叫停生产 public void stop(){ this.FLAG = false; } } public class ProConsumer_BlockQueueDemo { public static void main(String[] args) { MyData myData = new MyData(new ArrayBlockingQueue<>(3)); //生产线程 new Thread(() -> { System.out.println("生产线程开启"); try { myData.myProd(); }catch (Exception exception){ exception.printStackTrace(); } }, "Product").start(); //消费线程 new Thread(() -> { System.out.println("消费线程开启"); try { System.out.println("================"); myData.myConsumer(); System.out.println("================"); }catch (Exception exception){ exception.printStackTrace(); } }, "Consumer").start(); try{ TimeUnit.SECONDS.sleep(1); }catch (InterruptedException exception){ exception.printStackTrace(); } System.out.println(); System.out.println("大老板叫停"); myData.stop(); } } 执行结果如下图所示:
从上图可以看到,只有生产者开始生产数据,消息者才能消息数据,否则队列为空,线程会阻塞2s。
总结
阻塞队列,理解起来还是比较简单的,对于阻塞队列更加底层总结,在我们介绍了AQS之后,会继续跟大家分享。
下篇博文给大家分享,JUC包中另外一个比较重要的接口,Condition的使用及原理解析。