本文作者:王一飞,叩丁狼高级讲师
在正式讲解ArrayBlockingQueue类前,先来科普一下线程中各类锁,只有了解这些锁之后,理解ArrayBlockingQueue那就更轻松了。
可重入锁
一种递归无阻塞的同步机制,也叫做递归锁。简单讲一个线程获取到锁对象之后,还是可以再次获取该锁对象时,不会发生阻塞。
java中 synchronized 跟ReentrantLock 都是可重入锁, synchronized 为隐性, 而ReentrantLock 为显示。 下面以synchronized 为例:
public class ThreadDemo { public synchronized void method1(){ System.out.println(Thread.currentThread().getName() + "进入method1...."); try { Thread.sleep(1000); method2(); } catch (InterruptedException e) { e.printStackTrace(); } } public synchronized void method2(){ System.out.println(Thread.currentThread().getName() + "进入method2...."); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } } public class App { public static void main(String[] args) { ThreadDemo threadDemo = new ThreadDemo(); new Thread(new Runnable() { @Override public void run() { threadDemo.method1(); } }, "t1").start(); new Thread(new Runnable() { @Override public void run() { threadDemo.method2(); } }, "t2").start(); new Thread(new Runnable() { @Override public void run() { threadDemo.method2(); } }, "t3").start(); } }
上面代码, t1线程先进入method1, 1s之后进入method2, 因为使用synchronized 加锁,t2, t3线程无法进入method2,必须等,而t1线程执行完method1后,可以直接进入method2, 无需要重复获取锁的操作。
改进: 可以再多开几个线程访问method2方法,会发现结果是一致的。
不可重入锁
不可重入锁是跟重入锁是对立的,表示一线程获取到锁对象后,想再次获取该锁对象时,必须先释放之前获取锁对象,否则阻塞等待。
java中没有线程类实现不可重入锁,更多时候,需要我们编程实现。
//自定义锁对象模拟不可重入锁 public class MyLock { private boolean isLock = false; //模拟获取锁 public synchronized void lock() throws InterruptedException { //自旋排除一些硬件执行干扰 while(isLock){ wait(); } isLock = true; } //模拟释放锁 public synchronized void unLock(){ isLock = false; notify(); } } public class ThreadDemo { private MyLock lock = new MyLock(); public void method1() throws InterruptedException { lock.lock(); System.out.println("method1...in"); method2(); System.out.println("method1...out"); lock.unLock(); } public void method2() throws InterruptedException { lock.lock(); System.out.println("method2......"); lock.unLock(); } } public class App { public static void main(String[] args) throws InterruptedException { ThreadDemo demo = new ThreadDemo(); demo.method1(); } }
运行之后, 打印只有method1…in, 在执行method1时,调用lock.lock()方法, isLock标记量改为true,表示锁被持有,跳过循环。 执行method2时,再次执行lock.lock(),isLock标签为true, 进入循环,线程等待。模拟拉当锁已经被持有,同一个线程第二次申请同一把锁,需要等待。
互斥锁
同一个时刻,只允许获取到所对象的线程执行。synchronized ReentrantLock 本身就是互斥锁。
public class ThreadDemo { //同一个时刻只允许一个线程进入 public synchronized void method1(){ System.out.println(Thread.currentThread().getName() + "进入...."); } } public class ThreadDemo { private ReentrantLock lock = new ReentrantLock(); //同一个时刻只允许一个线程进入 public void method1(){ lock.lock(); try { System.out.println(Thread.currentThread().getName() + "进入...."); }finally { lock.unlock(); } } }
自旋锁
一种非阻塞锁,也就是说,如果某线程需要获取锁,但锁已经被线程占用时,线程不阻塞等待,而是通过空循环来消耗CPU时间片,等待其他线程释放锁。注意,自旋锁中的循环也不是瞎循环, 一般会设置一定循环次数或者循环跳出条件。
自旋锁运用非常广泛, jdk中的juc包原子操作类中都是, 比如: AtomicInteger
public class AtomicInteger extends Number implements java.io.Serializable { public final int getAndSet(int newValue) { return unsafe.getAndSetInt(this, valueOffset, newValue); } }
Unsafe类
public final int getAndSetInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var4)); return var5; }
偏向锁 / 轻量级锁 / 重量级锁
偏向锁,轻量锁,重量锁并不是对线程加锁机制,而是jdk1.6之后提出的对线程加锁的优化策略。
偏向锁:当线程没有竞争的环境下,需要重复获取某个锁对象时,jvm为减少开销,让线程进入偏向模式,再次获取锁对象时,取消之前已经获取锁同步操作(即一系列的cas判断),直接取得锁对象。如果期间有其他线程参与竞争,则退出偏向模式。
当线程退出偏向模式最后,进入轻量级锁模式。此时,线程尝试使用自旋方式来获取锁,如果获取成功,继续逻辑执行, 如果获取失败,表示当前锁对象存在竞争,锁就会膨胀成重量级锁。
当线程进入重量级锁模式, 所有操作就跟我们所认知那样,争夺CUP,在操作过程中,争夺失败线程会被操作系统挂起,阻塞等待。那么线程间的切换和调用成本就会大大提高,性能也就对应下降了。
乐观锁
顾名思义,就是很乐观,在获取数据时认为别人不会对数据做修改,所以不上锁,但是在更新的时候会先判断别人有没有更新了此数据,最通用的实现是使用版本号判断方式,java的juc并发包中的原子操作类使用的CAS机制,其实也是一种乐观锁实现。
悲观锁
与乐观锁是相对的,操作前都假设最坏的情况,在获取数据的认为别人会对数据做修改,所以每次操作前都会上锁。而别人想操作此数据就会阻塞直到它拿到锁。Java中synchronized关键字ReentrantLock的实现便是悲观锁。
公平锁
公平锁,讲究公平,当锁对象被占用时,参与锁对象争夺的线程按照FIFO的顺序排序等待锁释放,人人有机会,不争不抢。
public class Resource implements Runnable { private ReentrantLock lock = new ReentrantLock(true); //公平锁 public void run() { System.out.println(Thread.currentThread().getName() + " 进入了....."); lock.lock(); //争锁 try { System.out.println(Thread.currentThread().getName() + " 获取锁并执行了....."); }finally { lock.unlock(); } } } public class App { public static void main(String[] args) throws InterruptedException { Resource resource = new Resource(); //共享资源 for (int i = 0; i <10; i++) { new Thread(resource, "t_" + i).start(); } } } t_4 进入了..... t_2 进入了..... t_4 获取锁并执行了..... t_7 进入了..... t_6 进入了..... t_3 进入了..... t_5 进入了..... t_2 获取锁并执行了..... t_7 获取锁并执行了..... t_6 获取锁并执行了..... t_3 获取锁并执行了..... t_5 获取锁并执行了..... t_0 进入了..... t_0 获取锁并执行了..... t_1 进入了..... t_1 获取锁并执行了..... t_8 进入了..... t_8 获取锁并执行了..... t_9 进入了..... t_9 获取锁并执行了.....
观察执行结果,当锁是公平锁时(new ReentrantLock(true))会发现进入顺序是4,2,7,6,3,5,0,1,8,9 而执行的顺序是4,2,7,6,3,5,0,1,8,9。两者顺序一样,这就是公平的体现,谁先来,谁先执行。
非公平锁
与公平锁相对,当锁对象被释放时,所有参与争夺锁对象的线程各凭本事,撑死胆大的,饿死胆小的。
其他代码不变,仅仅将参数改为false或者去掉 private ReentrantLock lock = new ReentrantLock(false); //非公平锁 t_4 进入了..... t_2 进入了..... t_5 进入了..... t_3 进入了..... t_4 获取锁并执行了..... t_7 进入了..... t_7 获取锁并执行了..... t_0 进入了..... t_1 进入了..... t_0 获取锁并执行了..... t_8 进入了..... t_2 获取锁并执行了..... t_9 进入了..... t_9 获取锁并执行了..... t_6 进入了..... t_5 获取锁并执行了..... t_3 获取锁并执行了..... t_1 获取锁并执行了..... t_8 获取锁并执行了..... t_6 获取锁并执行了.....
当锁是非公平锁时(new ReentrantLock(false))进入的顺序与执行顺序不一样啦,这就是非公平锁,争夺CPU各凭本事。
概念
ArrayBlockingQueue 是一个有界阻塞的队列。有界原因是它底层维护了一个数组,初始化时,可以直接指定。要注意,一旦创建成功后,数组将无法进行再扩容。而阻塞是因为它对入列出列做了加锁处理,如果队列满了,再入列则需要阻塞等待, 如果队列是空的,出列时也需要阻塞等待。
ArrayBlockingQueue 底层是一个有界数组,遵循FIFO原则,对进入的元素进行排序,先进先出。
ArrayBlockingQueue 使用ReentrantLock锁,再配合两种Condition实现队列的线程安全操作。并发环境下ArrayBlockingQueue 使用频率较高
ArrayBlockingQueue 支持公平与非公平2种操作策略,在创建对象时通过构造函数将fair参数设置为true/false即可,需要注意的是,如果fair设置为false,表示持有公平锁,这种操作会降低系统吞吐量,慎用。
内部结构
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { final Object[] items; //存放元素数组 final ReentrantLock lock; //互斥锁对象 private final Condition notEmpty; //非空条件变量 private final Condition notFull; //非满条件变量 .... }
从内部结构源码上看,ArrayBlockingQueue 内部维护一个final数组,当队列初始化后将无法再进行拓展,保证队列的有界性。lock 互斥锁,在出队入队中保证线程的安全。而notEmpty 跟 notFull 条件变量保证队列在满队时入队等待, 当队列空列时,出队等待。
初始化
//参数1:队列初始长度 //参数2:是否为公平队列 fasle: 是, true 不是 public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public ArrayBlockingQueue(int capacity) { this(capacity, false); } //参数3:队列初始化元素 public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { ..... }
ArrayBlockingQueue 有3个构造器,核心是2个参数的构造器, capacity表示队列初始化长度, fair 指定ArrayBlockingQueue是公平队列还是非公平队列。
入列
ArrayBlockingQueue 入列方式有大体三种:
public class App { public static void main(String[] args) throws InterruptedException { //队列长度为2 ArrayBlockingQueue queue = new ArrayBlockingQueue(2); //方式1:满列抛异常 ///System.out.println(queue.add("add")); //true ///System.out.println(queue.add("add")); //true ///System.out.println(queue.add("add")); //满列异常 //方式2:满列返回false,不阻塞 //System.out.println(queue.offer("offer")); //true //System.out.println(queue.offer("offer")); //true //System.out.println(queue.offer("offer")); //false //方式3:满列阻塞(推荐) queue.put("put"); queue.put("put"); queue.put("put"); //满列阻塞等待 } }
这里我们以put方法为例
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); //取锁: 线程运行中断 try { while (count == items.length) notFull.await(); //队列满队,需要暂停等待 enqueue(e); //入列 } finally { lock.unlock(); //释放锁 } }
在put方法开始前, 先获取可中断lock.lockInterruptibly(), 对put核心逻辑进行加锁,当判断到队列已满,阻塞当前线程。反之, 执行enqueue()实现入列逻辑。
private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; //入列 //putIndex 表示下一个入列所以, 如果为队列长度, 下一个轮回 //原因: 队列为数组, 操作所以从0开始 if (++putIndex == items.length) putIndex = 0; count++; //总数+1 notEmpty.signal(); //唤醒等待出列线程 }
进入enqueue之后, 因为该方法已经持有锁,所以无法再进行锁重入,在enqueue方法之后, 执行notEmpty.signal(); 唤醒出列等待线程。
出列
ArrayBlockingQueue 出列也对应的有3中方式
public class App { public static void main(String[] args) throws InterruptedException { //队列长度为2 ArrayBlockingQueue queue = new ArrayBlockingQueue(2); queue.put("admin"); queue.put("admin"); //方式1:空列出队时,抛异常 //System.out.println(queue.remove()); //System.out.println(queue.remove()); //System.out.println(queue.remove()); //空列报异常 //方式2:空列出队时,返回null System.out.println(queue.poll()); System.out.println(queue.poll()); System.out.println(queue.poll()); //空列返回null //方式3:空列出队时,阻塞(推荐) System.out.println(queue.take()); System.out.println(queue.take()); System.out.println(queue.take()); //空列阻塞 } }
这里我们以take方法为例
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); // 队长为0,需要暂停等待 return dequeue(); } finally { lock.unlock(); } }
跟put方法操作一样, 进入方法之后, 先获取锁,再判断队列长度是否为0, 如果为0, 当前线程进入阻塞。反之,进入dequeue 方法执行出列操作。
private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; //出列之后,原先队列设置为null //takeIndex 下一个出列的数据索引, 一个轮回后,设置为0 if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); //唤醒等待入列线程 return x; }
公平/非公平队列
ArrayBlockingQueue 可以实现公平与非公平2种队列, 公平队列表示在并发环境下,如果队列已经满列了,入列线程按照FIFO的顺序阻塞,等待召唤。非公平队列就没有这种规矩,谁先抢到,谁先入列。
来看一下例子:
需求:开启10个线程往边界为3的队列添加数据, 同时开始一个线程不断出列。
public class App { public static void main(String[] args) throws InterruptedException { //队列长度为3 //公平队列 ArrayBlockingQueue queue = new ArrayBlockingQueue(3, true); for (int i= 0; i < 10; i++){ new Thread(new Runnable() { @Override public void run() { try { Thread.sleep((long) (Math.random() * 10)); //将问题放大 //线程进入 System.out.println("进入-"+ Thread.currentThread().getName()); //阻塞等待入列 queue.put("出列-" + Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } } },"t_" + i).start(); } new Thread(new Runnable() { @Override public void run() { while(true){ try { //按顺序出列 System.out.println("------" + queue.take()); } catch (Exception e) { e.printStackTrace(); } } } }).start(); } } 进入-t_5 进入-t_1 ------出列-t_5 ------出列-t_1 进入-t_8 ------出列-t_8 进入-t_7 ------出列-t_7 进入-t_2 ------出列-t_2 进入-t_9 ------出列-t_9 进入-t_0 ------出列-t_0 进入-t_3 进入-t_6 ------出列-t_3 ------出列-t_6 进入-t_4 ------出列-t_4
观察结果,发现进入顺序跟出列顺序一样。公平队列讲究公平, 进入0到9线程启动后,执行run方法,都能执行 “进入” 代码,但是入列的操作是阻塞的,同一时间点只允许一个线程进入。其他线程必须等待,那么谁先打印 “进入” 代码,就表示谁先阻塞,依照公平FIFO原则,就应该谁先出列。 所以当进入顺序与出列一致就把表示公平原则生效。
将参数改为false,我们再看打印结果
ArrayBlockingQueue queue = new ArrayBlockingQueue(3, false); 进入-t_3 进入-t_7 进入-t_6 进入-t_0 进入-t_4 进入-t_8 进入-t_5 进入-t_1 ------出列-t_3 ------出列-t_7 ------出列-t_6 ------出列-t_8 ------出列-t_4 ------出列-t_5 ------出列-t_1 ------出列-t_0 进入-t_9 ------出列-t_9 进入-t_2 ------出列-t_2
观察, 很明显进入与出列顺序不一致,这就是非公平队列。
注意: 10个线程效果不是太明显,可以适当加大。