玖叶教程网

前端编程开发入门

Java集合之ArrayBlockingQueue

基于JDK 1.8 版本

概念

ArrayBlockingQueue是基于数组实现的阻塞队列,通过先进先出的顺序来访问元素。ArrayBlockingQueue是一个固定大小的有界队列,一旦创建,它的容量就不能再变化。如果向一个已经满的ArrayBlockingQueue中添加元素,则会导致该操作阻塞,直到该操作成功为止,同样的,如果从一个空的ArrayBlockingQueue中获取元素,也会导致阻塞。

使用

public static void main(String[] args) throws InterruptedException {
    // 创建一个容量为10的对象
    ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<String>(10);
    // 初始化数据
    for (int i = 0; i < 10; i++) {
        arrayBlockingQueue.put(String.valueOf(i));
    }
    // 创建一个消费线程
    Thread consumer = new Thread(() -> {
        String value = "";
        try {
            // 等待10秒,降低消费频率
            Thread.sleep(10000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        while ((value = arrayBlockingQueue.poll()) != null) {
            System.out.println(value);
        }
    });

    consumer.start();
    // 主线程在arrayBlockingQueue满的情况下,再增加数据
    long now = System.currentTimeMillis();
    System.out.println("put 第11个数据,当前时间:" + now);
    arrayBlockingQueue.put("aaaa");
    System.out.println("put 第11个数据,花费:" + (System.currentTimeMillis() - now));
}

该段代码的逻辑是首先创建一个ArrayBlockingQueue对象,容量设置为10,并初始化数据。开启一个消费线程来消费该队列的数据,同时主线程在队列满的情况下,继续添加元素,为了达到等待效果,消费线程休眠10s钟,这时可以看到主线程在调用put方法时,会被阻塞。具体的执行结果如下:

分析

首先看下ArrayBlockingQueue的类图:

ArrayBlockingQueue实现了 Queue、Collection接口,所以它既有集合,也有队列相关的行为。

数据结构

    /** The queued items */
    final Object[] items; // 队列数组

    /** items index for next take, poll, peek or remove */
    int takeIndex; // 当前出队列的索引

    /** items index for next put, offer, or add */
    int putIndex; // 当前入队列的索引

    /** Number of elements in the queue */
    int count; // 当前队列的数量


    /** Main lock guarding all access */
    final ReentrantLock lock; // 并发锁

    /** Condition for waiting takes */
    private final Condition notEmpty; // 队列不空的信号量

    /** Condition for waiting puts */
    private final Condition notFull; // 队列不满的信号量

构造函数

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        // 根据传入的参数,来决定是否创建公平锁
        lock = new ReentrantLock(fair);
        // 同一个锁创建的两个condition
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

在 ArrayBlockingQueue数据结构中,ReentrantLock 创建了两个 Condition对象:

notEmpty: 当队列不空的时候,会触发信号量,通知等待的线程

notFull: 当队列未满时,会触发信号量,通知等待的线程

注: Condition对象,是由指定的Lock对象创建, 当调用 Condition的await 方法时,会自动释放lock锁,并将当前线程阻塞,直到有其他线程调用该Condition的signal 方法、signalAll 方法、或是其他线程调用当前线程的interupt 方法,在await 方法返回当前线程时,当前线程必须重新获取和该Condition关联的锁。

put 操作

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        // 可重入锁
        final ReentrantLock lock = this.lock;
        // 获取锁
        lock.lockInterruptibly();
        try {
            // 如果当前已经满了,则等待信号量
            while (count == items.length)
                notFull.await();// 这里会释放锁,线程阻塞等待
            // 如果成功,则元素入队
            enqueue(e);
        } finally {
            // 解锁成功
            lock.unlock();
        }
    }

    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        // notEmpty的信号量通知
        notEmpty.signal();
    }

put方法处理:

  1. 获取锁
  2. 如果获取锁成功,判断当前的元素数量是否满了,如果未满,直接加入队列,否则等待未满的信号量
  3. 释放锁

poll 方法

    public E poll() {
        final ReentrantLock lock = this.lock;
        // 获取锁
        lock.lock();
        try {
            // 如果数量为空,则返回null,否则调用dequeue
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }

    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        // 返回taskIndex处的数据
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        // notFull的信号量通知
        notFull.signal();
        return x;
    }

处理逻辑:

  1. 获取锁
  2. 如果元素为空,返回 null, 如果不为空,返回takeIndex处理的元素,并处理notFull的信息量
  3. 释放锁

lock.lock() 与 lock.lockInterruptibly 的区别

在分析 offer与 put方法时,发现使用的锁定的方式是不同的

    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
         // lock 
        lock.lock();
        try {
         ...
        } finally {
            lock.unlock();
        }
    }

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
           ...
        } finally {
            lock.unlock();
        }
    }

lock.lock()与 lock.lockInterruptibly()的差异在于,调用锁定的线程是否允许被其他线程调用该线程的interupt方法,通常情况下,当锁被其他线程持有时,另一个线程去申请时,会被阻塞,一直等待在那边,如 synchronized 关键字、lock.lock() 方法等, 但当调用 lock.lockInterruptibly() 方法时,如果线程被其他的线程调用了interupt 方法时,则可以使被阻塞的线程跳出阻塞等待的逻辑。

发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言