玖叶教程网

前端编程开发入门

Java多线程开发进阶:ArrayBlockingQueue

本文作者:王一飞,叩丁狼高级讲师

在正式讲解ArrayBlockingQueue类前,先来科普一下线程中各类锁,只有了解这些锁之后,理解ArrayBlockingQueue那就更轻松了。

可重入锁

一种递归无阻塞的同步机制,也叫做递归锁。简单讲一个线程获取到锁对象之后,还是可以再次获取该锁对象时,不会发生阻塞。

java中 synchronized 跟ReentrantLock 都是可重入锁, synchronized 为隐性, 而ReentrantLock 为显示。 下面以synchronized 为例:

public class ThreadDemo {
 public synchronized void method1(){
 System.out.println(Thread.currentThread().getName() + "进入method1....");
 try {
 Thread.sleep(1000);
 method2();
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
 public synchronized void method2(){
 System.out.println(Thread.currentThread().getName() + "进入method2....");
 try {
 Thread.sleep(2000);
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
}
public class App {
 public static void main(String[] args) {
 ThreadDemo threadDemo = new ThreadDemo();
 new Thread(new Runnable() {
 @Override
 public void run() {
 threadDemo.method1();
 }
 }, "t1").start();
 new Thread(new Runnable() {
 @Override
 public void run() {
 threadDemo.method2();
 }
 }, "t2").start();
 new Thread(new Runnable() {
 @Override
 public void run() {
 threadDemo.method2();
 }
 }, "t3").start();
 }
}

上面代码, t1线程先进入method1, 1s之后进入method2, 因为使用synchronized 加锁,t2, t3线程无法进入method2,必须等,而t1线程执行完method1后,可以直接进入method2, 无需要重复获取锁的操作。

改进: 可以再多开几个线程访问method2方法,会发现结果是一致的。

不可重入锁

不可重入锁是跟重入锁是对立的,表示一线程获取到锁对象后,想再次获取该锁对象时,必须先释放之前获取锁对象,否则阻塞等待。

java中没有线程类实现不可重入锁,更多时候,需要我们编程实现。

//自定义锁对象模拟不可重入锁
public class MyLock {
 private boolean isLock = false;
 //模拟获取锁
 public synchronized void lock() throws InterruptedException {
 //自旋排除一些硬件执行干扰
 while(isLock){
 wait();
 }
 isLock = true;
 }
 //模拟释放锁
 public synchronized void unLock(){
 isLock = false;
 notify();
 }
}
public class ThreadDemo {
 private MyLock lock = new MyLock();
 public void method1() throws InterruptedException {
 lock.lock();
 System.out.println("method1...in");
 method2();
 System.out.println("method1...out");
 lock.unLock();
 }
 public void method2() throws InterruptedException {
 lock.lock();
 System.out.println("method2......");
 lock.unLock();
 }
}
public class App {
 public static void main(String[] args) throws InterruptedException {
 ThreadDemo demo = new ThreadDemo();
 demo.method1();
 }
}

运行之后, 打印只有method1…in, 在执行method1时,调用lock.lock()方法, isLock标记量改为true,表示锁被持有,跳过循环。 执行method2时,再次执行lock.lock(),isLock标签为true, 进入循环,线程等待。模拟拉当锁已经被持有,同一个线程第二次申请同一把锁,需要等待。

互斥锁

同一个时刻,只允许获取到所对象的线程执行。synchronized ReentrantLock 本身就是互斥锁。

public class ThreadDemo {
 //同一个时刻只允许一个线程进入
 public synchronized void method1(){
 System.out.println(Thread.currentThread().getName() + "进入....");
 }
}
public class ThreadDemo {
 private ReentrantLock lock = new ReentrantLock();
 //同一个时刻只允许一个线程进入
 public void method1(){
 lock.lock();
 try {
 System.out.println(Thread.currentThread().getName() + "进入...."); 
 }finally {
 lock.unlock();
 }
 }
}

自旋锁

一种非阻塞锁,也就是说,如果某线程需要获取锁,但锁已经被线程占用时,线程不阻塞等待,而是通过空循环来消耗CPU时间片,等待其他线程释放锁。注意,自旋锁中的循环也不是瞎循环, 一般会设置一定循环次数或者循环跳出条件。

自旋锁运用非常广泛, jdk中的juc包原子操作类中都是, 比如: AtomicInteger

public class AtomicInteger extends Number implements java.io.Serializable {
 public final int getAndSet(int newValue) {
 return unsafe.getAndSetInt(this, valueOffset, newValue);
 }
}

Unsafe类

public final int getAndSetInt(Object var1, long var2, int var4) {
 int var5;
 do {
 var5 = this.getIntVolatile(var1, var2);
 } while(!this.compareAndSwapInt(var1, var2, var5, var4));
 return var5;
 }

偏向锁 / 轻量级锁 / 重量级锁

偏向锁,轻量锁,重量锁并不是对线程加锁机制,而是jdk1.6之后提出的对线程加锁的优化策略。

偏向锁:当线程没有竞争的环境下,需要重复获取某个锁对象时,jvm为减少开销,让线程进入偏向模式,再次获取锁对象时,取消之前已经获取锁同步操作(即一系列的cas判断),直接取得锁对象。如果期间有其他线程参与竞争,则退出偏向模式。

当线程退出偏向模式最后,进入轻量级锁模式。此时,线程尝试使用自旋方式来获取锁,如果获取成功,继续逻辑执行, 如果获取失败,表示当前锁对象存在竞争,锁就会膨胀成重量级锁。

当线程进入重量级锁模式, 所有操作就跟我们所认知那样,争夺CUP,在操作过程中,争夺失败线程会被操作系统挂起,阻塞等待。那么线程间的切换和调用成本就会大大提高,性能也就对应下降了。

乐观锁

顾名思义,就是很乐观,在获取数据时认为别人不会对数据做修改,所以不上锁,但是在更新的时候会先判断别人有没有更新了此数据,最通用的实现是使用版本号判断方式,java的juc并发包中的原子操作类使用的CAS机制,其实也是一种乐观锁实现。

悲观锁

与乐观锁是相对的,操作前都假设最坏的情况,在获取数据的认为别人会对数据做修改,所以每次操作前都会上锁。而别人想操作此数据就会阻塞直到它拿到锁。Java中synchronized关键字ReentrantLock的实现便是悲观锁。

公平锁

公平锁,讲究公平,当锁对象被占用时,参与锁对象争夺的线程按照FIFO的顺序排序等待锁释放,人人有机会,不争不抢。

public class Resource implements Runnable {
 private ReentrantLock lock = new ReentrantLock(true); //公平锁
 public void run() {
 System.out.println(Thread.currentThread().getName() + " 进入了.....");
 lock.lock(); //争锁
 try {
 System.out.println(Thread.currentThread().getName() + " 获取锁并执行了.....");
 }finally {
 lock.unlock();
 }
 }
}
public class App {
 public static void main(String[] args) throws InterruptedException {
 Resource resource = new Resource(); //共享资源
 for (int i = 0; i <10; i++) {
 new Thread(resource, "t_" + i).start();
 }
 }
}
t_4 进入了.....
t_2 进入了.....
t_4 获取锁并执行了.....
t_7 进入了.....
t_6 进入了.....
t_3 进入了.....
t_5 进入了.....
t_2 获取锁并执行了.....
t_7 获取锁并执行了.....
t_6 获取锁并执行了.....
t_3 获取锁并执行了.....
t_5 获取锁并执行了.....
t_0 进入了.....
t_0 获取锁并执行了.....
t_1 进入了.....
t_1 获取锁并执行了.....
t_8 进入了.....
t_8 获取锁并执行了.....
t_9 进入了.....
t_9 获取锁并执行了.....

观察执行结果,当锁是公平锁时(new ReentrantLock(true))会发现进入顺序是4,2,7,6,3,5,0,1,8,9 而执行的顺序是4,2,7,6,3,5,0,1,8,9。两者顺序一样,这就是公平的体现,谁先来,谁先执行。

非公平锁

与公平锁相对,当锁对象被释放时,所有参与争夺锁对象的线程各凭本事,撑死胆大的,饿死胆小的。

其他代码不变,仅仅将参数改为false或者去掉
private ReentrantLock lock = new ReentrantLock(false); //非公平锁
t_4 进入了.....
t_2 进入了.....
t_5 进入了.....
t_3 进入了.....
t_4 获取锁并执行了.....
t_7 进入了.....
t_7 获取锁并执行了.....
t_0 进入了.....
t_1 进入了.....
t_0 获取锁并执行了.....
t_8 进入了.....
t_2 获取锁并执行了.....
t_9 进入了.....
t_9 获取锁并执行了.....
t_6 进入了.....
t_5 获取锁并执行了.....
t_3 获取锁并执行了.....
t_1 获取锁并执行了.....
t_8 获取锁并执行了.....
t_6 获取锁并执行了.....

当锁是非公平锁时(new ReentrantLock(false))进入的顺序与执行顺序不一样啦,这就是非公平锁,争夺CPU各凭本事。

概念

ArrayBlockingQueue 是一个有界阻塞的队列。有界原因是它底层维护了一个数组,初始化时,可以直接指定。要注意,一旦创建成功后,数组将无法进行再扩容。而阻塞是因为它对入列出列做了加锁处理,如果队列满了,再入列则需要阻塞等待, 如果队列是空的,出列时也需要阻塞等待。

ArrayBlockingQueue 底层是一个有界数组,遵循FIFO原则,对进入的元素进行排序,先进先出。

ArrayBlockingQueue 使用ReentrantLock锁,再配合两种Condition实现队列的线程安全操作。并发环境下ArrayBlockingQueue 使用频率较高

ArrayBlockingQueue 支持公平与非公平2种操作策略,在创建对象时通过构造函数将fair参数设置为true/false即可,需要注意的是,如果fair设置为false,表示持有公平锁,这种操作会降低系统吞吐量,慎用。

内部结构

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
 implements BlockingQueue<E>, java.io.Serializable {
 final Object[] items; //存放元素数组
 final ReentrantLock lock; //互斥锁对象
 private final Condition notEmpty; //非空条件变量
 private final Condition notFull; //非满条件变量
 ....
}

从内部结构源码上看,ArrayBlockingQueue 内部维护一个final数组,当队列初始化后将无法再进行拓展,保证队列的有界性。lock 互斥锁,在出队入队中保证线程的安全。而notEmpty 跟 notFull 条件变量保证队列在满队时入队等待, 当队列空列时,出队等待。

初始化

//参数1:队列初始长度 
//参数2:是否为公平队列 fasle: 是, true 不是
public ArrayBlockingQueue(int capacity, boolean fair) {
 if (capacity <= 0)
 throw new IllegalArgumentException();
 this.items = new Object[capacity];
 lock = new ReentrantLock(fair);
 notEmpty = lock.newCondition();
 notFull = lock.newCondition();
}
public ArrayBlockingQueue(int capacity) {
 this(capacity, false);
}
//参数3:队列初始化元素
public ArrayBlockingQueue(int capacity, boolean fair,
 Collection<? extends E> c) {
 .....
}

ArrayBlockingQueue 有3个构造器,核心是2个参数的构造器, capacity表示队列初始化长度, fair 指定ArrayBlockingQueue是公平队列还是非公平队列。

入列

ArrayBlockingQueue 入列方式有大体三种:

public class App {
 public static void main(String[] args) throws InterruptedException {
 //队列长度为2
 ArrayBlockingQueue queue = new ArrayBlockingQueue(2);
 //方式1:满列抛异常
 ///System.out.println(queue.add("add")); //true
 ///System.out.println(queue.add("add")); //true
 ///System.out.println(queue.add("add")); //满列异常
 //方式2:满列返回false,不阻塞
 //System.out.println(queue.offer("offer")); //true
 //System.out.println(queue.offer("offer")); //true
 //System.out.println(queue.offer("offer")); //false
 //方式3:满列阻塞(推荐)
 queue.put("put");
 queue.put("put");
 queue.put("put"); //满列阻塞等待
 }
}

这里我们以put方法为例

 public void put(E e) throws InterruptedException {
 checkNotNull(e);
 final ReentrantLock lock = this.lock;
 lock.lockInterruptibly(); //取锁: 线程运行中断
 try {
 while (count == items.length)
 notFull.await(); //队列满队,需要暂停等待
 enqueue(e); //入列
 } finally {
 lock.unlock(); //释放锁
 }
 }

在put方法开始前, 先获取可中断lock.lockInterruptibly(), 对put核心逻辑进行加锁,当判断到队列已满,阻塞当前线程。反之, 执行enqueue()实现入列逻辑。

 private void enqueue(E x) {
 final Object[] items = this.items;
 items[putIndex] = x; //入列
 //putIndex 表示下一个入列所以, 如果为队列长度, 下一个轮回
 //原因: 队列为数组, 操作所以从0开始
 if (++putIndex == items.length) 
 putIndex = 0; 
 count++; //总数+1
 notEmpty.signal(); //唤醒等待出列线程
 }

进入enqueue之后, 因为该方法已经持有锁,所以无法再进行锁重入,在enqueue方法之后, 执行notEmpty.signal(); 唤醒出列等待线程。

出列

ArrayBlockingQueue 出列也对应的有3中方式

public class App {
 public static void main(String[] args) throws InterruptedException {
 //队列长度为2
 ArrayBlockingQueue queue = new ArrayBlockingQueue(2);
 queue.put("admin");
 queue.put("admin");
 //方式1:空列出队时,抛异常
 //System.out.println(queue.remove());
 //System.out.println(queue.remove());
 //System.out.println(queue.remove()); //空列报异常
 //方式2:空列出队时,返回null
 System.out.println(queue.poll());
 System.out.println(queue.poll());
 System.out.println(queue.poll()); //空列返回null
 //方式3:空列出队时,阻塞(推荐)
 System.out.println(queue.take());
 System.out.println(queue.take());
 System.out.println(queue.take()); //空列阻塞
 }
}

这里我们以take方法为例

public E take() throws InterruptedException {
 final ReentrantLock lock = this.lock;
 lock.lockInterruptibly();
 try {
 while (count == 0)
 notEmpty.await(); // 队长为0,需要暂停等待
 return dequeue();
 } finally {
 lock.unlock();
 }
 }

跟put方法操作一样, 进入方法之后, 先获取锁,再判断队列长度是否为0, 如果为0, 当前线程进入阻塞。反之,进入dequeue 方法执行出列操作。

 private E dequeue() {
 final Object[] items = this.items;
 @SuppressWarnings("unchecked")
 E x = (E) items[takeIndex];
 items[takeIndex] = null; //出列之后,原先队列设置为null 
 //takeIndex 下一个出列的数据索引, 一个轮回后,设置为0
 if (++takeIndex == items.length) 
 takeIndex = 0;
 count--;
 if (itrs != null)
 itrs.elementDequeued();
 notFull.signal(); //唤醒等待入列线程
 return x;
 }

公平/非公平队列

ArrayBlockingQueue 可以实现公平与非公平2种队列, 公平队列表示在并发环境下,如果队列已经满列了,入列线程按照FIFO的顺序阻塞,等待召唤。非公平队列就没有这种规矩,谁先抢到,谁先入列。

来看一下例子:

需求:开启10个线程往边界为3的队列添加数据, 同时开始一个线程不断出列。

public class App {
 public static void main(String[] args) throws InterruptedException {
 //队列长度为3
 //公平队列
 ArrayBlockingQueue queue = new ArrayBlockingQueue(3, true);
 for (int i= 0; i < 10; i++){
 new Thread(new Runnable() {
 @Override
 public void run() {
 try {
 Thread.sleep((long) (Math.random() * 10)); //将问题放大
 //线程进入
 System.out.println("进入-"+ Thread.currentThread().getName());
 //阻塞等待入列
 queue.put("出列-" + Thread.currentThread().getName());
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
 },"t_" + i).start();
 }
 new Thread(new Runnable() {
 @Override
 public void run() {
 while(true){
 try {
 //按顺序出列
 System.out.println("------" + queue.take());
 } catch (Exception e) {
 e.printStackTrace();
 }
 }
 }
 }).start();
 }
}
进入-t_5
进入-t_1
------出列-t_5
------出列-t_1
进入-t_8
------出列-t_8
进入-t_7
------出列-t_7
进入-t_2
------出列-t_2
进入-t_9
------出列-t_9
进入-t_0
------出列-t_0
进入-t_3
进入-t_6
------出列-t_3
------出列-t_6
进入-t_4
------出列-t_4

观察结果,发现进入顺序跟出列顺序一样。公平队列讲究公平, 进入0到9线程启动后,执行run方法,都能执行 “进入” 代码,但是入列的操作是阻塞的,同一时间点只允许一个线程进入。其他线程必须等待,那么谁先打印 “进入” 代码,就表示谁先阻塞,依照公平FIFO原则,就应该谁先出列。 所以当进入顺序与出列一致就把表示公平原则生效。

将参数改为false,我们再看打印结果

ArrayBlockingQueue queue = new ArrayBlockingQueue(3, false);
进入-t_3
进入-t_7
进入-t_6
进入-t_0
进入-t_4
进入-t_8
进入-t_5
进入-t_1
------出列-t_3
------出列-t_7
------出列-t_6
------出列-t_8
------出列-t_4
------出列-t_5
------出列-t_1
------出列-t_0
进入-t_9
------出列-t_9
进入-t_2
------出列-t_2

观察, 很明显进入与出列顺序不一致,这就是非公平队列。

注意: 10个线程效果不是太明显,可以适当加大。

发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言