玖叶教程网

前端编程开发入门

这两个BlockingQueue,一下子就打开了我防范OOM异常的思路

这两天同时在几个地方被安利了这两个BlockingQueue的实现类,号称能很好的解决OOM问题,本着好奇的目光点进去看了后,感觉甚是有趣

BlockingQueue介绍

BlockingQueue是jdk中自带的一个接口,它继承了Queue接口,并新增了put(e),take()两个方法

他是一个带阻塞功能的队列,当入队列时,若队列已满,则阻塞调 用者;当出队列时,若队列为空,则阻塞调用者。

主要方法有

//入队,如果队列满,等待直到队列有空间
void put(E e) throws InterruptedException;
//出队,如果队列空,等待直到队列不为空,返回头部元素
E take() throws InterruptedException;
//入队,如果队列满,最多等待指定的时间,如果超时还是满,返回false
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
//出队,如果队列空,最多等待指定的时间,如果超时还是空,返回null
E poll(long timeout, TimeUnit unit) throws InterruptedException;

在java的Concurrent包中,有众多的实现类

  • ArrayBlockingQueue、
  • LinkedBlockingQueue、
  • DelayQueue。
  • PriorityBlockingQueue
  • SynchronousQueue

等,大家或多或少地见过用过。

主角登场

这次讲的主角是MemoryLimitedLinkedBlockingQueueMemorySafeLinkedBlockingQueue,也是BlockingQueue的实现类,据说最初出自开源Apache ShenYu项目。

队列实现类的名字很长,但是也让人很好理解,最大特点都是跟内存的使用有关,一个是内存的使用限制,一个是内存的使用安全,感觉起来是不是很相似?

那为什么要这两个实现呢,都可以在哪些场景下使用呢

我们可能都看过阿里巴巴的java开发规范,其中有一项就是线程池不能通过Executors来创建,有一个理由是像FixedThreadPool和SingleThreadPool这种方式创建的线程池允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,进而导致OOM异常。

我们查看Executors的源码:

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
							0L, TimeUnit.MILLISECONDS,
							new LinkedBlockingQueue<Runnable>());

用的是LinkedBlockingQueue,并没有指定长度,也就是说可以向队列里无限添加数据了。怎么一想,是不是真的挺危险。

那怎么办呢,一般来说,需要我们在初始化的时候指定一个合理的值,但是这个值到底是多少,可能我们需要根据使用的场景和过往的经验来给值了。

这样一琢磨,这里面就确实值得玩味,或许换个思路用上面两主角来尝试解决这个问题

MemoryLimitedLinkedBlockingQueue

根据提交者的描述,它可以用来Executors中代替LinkedBlockingQueue队列,可避免OOM的问题,比如,限制这个队列可以使用的最大内存为 100M,超过使用内存限制就不允许继续往里添加。这样是不是就解决问题了

说起来容易,那就把家伙亮出来让我们开开眼吧

以下是完整类代码

package org.apache.shenyu.common.concurrent;

import java.lang.instrument.Instrumentation;
import java.util.Collection;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Can completely solve the OOM problem caused by {@link java.util.concurrent.LinkedBlockingQueue}.
 *
 * @see org.apache.shenyu.common.concurrent.MemoryLimiter
 * @see org.apache.shenyu.common.concurrent.MemoryLimitCalculator
 */
public class MemoryLimitedLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {

    private static final long serialVersionUID = -6106022470621447542L;

    private final MemoryLimiter memoryLimiter;

    public MemoryLimitedLinkedBlockingQueue(final Instrumentation inst) {
        this(Integer.MAX_VALUE, inst);
    }

    public MemoryLimitedLinkedBlockingQueue(final long memoryLimit,
                                            final Instrumentation inst) {
        super(Integer.MAX_VALUE);
        this.memoryLimiter = new MemoryLimiter(memoryLimit, inst);
    }

    public MemoryLimitedLinkedBlockingQueue(final Collection<? extends E> c,
                                            final long memoryLimit,
                                            final Instrumentation inst) {
        super(c);
        this.memoryLimiter = new MemoryLimiter(memoryLimit, inst);
    }

    /**
     * set the memory limit.
     *
     * @param memoryLimit the memory limit
     */
    public void setMemoryLimit(final long memoryLimit) {
        memoryLimiter.setMemoryLimit(memoryLimit);
    }

    /**
     * get the memory limit.
     *
     * @return the memory limit
     */
    public long getMemoryLimit() {
        return memoryLimiter.getMemoryLimit();
    }

    /**
     * get the current memory.
     *
     * @return the current memory
     */
    public long getCurrentMemory() {
        return memoryLimiter.getCurrentMemory();
    }

    /**
     * get the current remain memory.
     *
     * @return the current remain memory
     */
    public long getCurrentRemainMemory() {
        return memoryLimiter.getCurrentRemainMemory();
    }

    @Override
    public void put(final E e) throws InterruptedException {
        memoryLimiter.acquireInterruptibly(e);
        super.put(e);
    }

    @Override
    public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {
        return memoryLimiter.acquire(e, timeout, unit) && super.offer(e, timeout, unit);
    }

    @Override
    public boolean offer(final E e) {
        return memoryLimiter.acquire(e) && super.offer(e);
    }

    @Override
    public E take() throws InterruptedException {
        final E e = super.take();
        memoryLimiter.releaseInterruptibly(e);
        return e;
    }

    @Override
    public E poll(final long timeout, final TimeUnit unit) throws InterruptedException {
        final E e = super.poll(timeout, unit);
        memoryLimiter.releaseInterruptibly(e, timeout, unit);
        return e;
    }

    @Override
    public E poll() {
        final E e = super.poll();
        memoryLimiter.release(e);
        return e;
    }

    @Override
    public boolean remove(final Object o) {
        final boolean success = super.remove(o);
        if (success) {
            memoryLimiter.release(o);
        }
        return success;
    }

    @Override
    public void clear() {
        super.clear();
        memoryLimiter.reset();
    }
}

MemoryLimitedLinkedBlockingQueue继承自 LinkedBlockingQueue,然后重写了它的几个核心方法。内部定义了一个MemoryLimiter对象,在每一次对队列的操作都会使用到这个对象。

MemoryLimiter源码截图:

里面有两个成员变量类型比较陌生

LongAdder:作用和AtomicLong是一样,都是一个实现了原子操作的累加器。在阿里巴巴的java开发手册也推荐用LongAdder来替代AtomicLong,可以减少乐观锁的重试次数

Instrumentation: 查询百度介绍是这样的,

开发者可以构建一个独立于应用程序的代理程序(Agent),用来监测和协助运行在 JVM 上的程序,甚至能够替换和修改某些类的定义。有了这样的功能,开发者就可以实现更为灵活的运行时虚拟机监控和 Java 类操作了,这样的特性实际上提供了一种虚拟机级别支持的 AOP 实现方式,使得开发者无需对 JDK 做任何升级和改动,就可以实现某些 AOP 的功能了。

在 Java SE 6 里面,instrumentation 包被赋予了更强大的功能:启动后的 instrument、本地代码(native code)instrument,以及动态改变 classpath 等等。这些改变,意味着 Java 具有了更强的动态控制、解释能力,它使得 Java 语言变得更加灵活多变。

看起来感觉很牛逼也比较难懂,我们接着看在put操作队列时都具体做了些什么


getObjectSize看起来是获取对象e的大小,有个这个神器,再结合这个队列的描述,就比较容易理解作者的意图了。

往队列里put时,先检查当前队列已使用内存和要put的对象内存大小,如果两者的和超过内存限制大小,就一直阻塞等待,否则就放行并更新队列已使用内存大小

从队列中take时,从已使用内存大小中减去对象e的内存大小值

需要注意的是,上述代码其实是有一个bug的,此处我们略过不谈

MemoryLimitedLinkedBlockingQueue整个思路就是这么简单粗暴,接下来我们看下MemorySafeLinkedBlockingQueue是什么鬼

MemorySafeLinkedBlockingQueue

从作者的提交说明来看,它也能解决OOM的问题,并且不依赖Instrumentation,还比MemoryLimitedLinkedBlockingQueue要更简单好用。

以下是完整类代码

/**
 * Can completely solve the OOM problem caused by {@link java.util.concurrent.LinkedBlockingQueue},
 * does not depend on {@link java.lang.instrument.Instrumentation} and is easier to use than
 * {@link org.apache.shenyu.common.concurrent.MemoryLimitedLinkedBlockingQueue}.
 */
public class MemorySafeLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {

    private static final long serialVersionUID = 8032578371749960142L;

    private int maxFreeMemory;

    private Rejector<E> rejector;

    public MemorySafeLinkedBlockingQueue(final int maxFreeMemory) {
        super(Integer.MAX_VALUE);
        this.maxFreeMemory = maxFreeMemory;
        //default as DiscardPolicy to ensure compatibility with the old version
        this.rejector = new DiscardPolicy<>();
    }

    public MemorySafeLinkedBlockingQueue(final Collection<? extends E> c,
                                         final int maxFreeMemory) {
        super(c);
        this.maxFreeMemory = maxFreeMemory;
        //default as DiscardPolicy to ensure compatibility with the old version
        this.rejector = new DiscardPolicy<>();
    }

    /**
     * set the max free memory.
     *
     * @param maxFreeMemory the max free memory
     */
    public void setMaxFreeMemory(final int maxFreeMemory) {
        this.maxFreeMemory = maxFreeMemory;
    }

    /**
     * get the max free memory.
     *
     * @return the max free memory limit
     */
    public int getMaxFreeMemory() {
        return maxFreeMemory;
    }

    /**
     * set the rejector.
     *
     * @param rejector the rejector
     */
    public void setRejector(final Rejector<E> rejector) {
        this.rejector = rejector;
    }

    /**
     * determine if there is any remaining free memory.
     *
     * @return true if has free memory
     */
    public boolean hasRemainedMemory() {
        return MemoryLimitCalculator.maxAvailable() > maxFreeMemory;
    }

    @Override
    public void put(final E e) throws InterruptedException {
        if (hasRemainedMemory()) {
            super.put(e);
        }
        rejector.reject(e, this);
    }

    @Override
    public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {
        if (!hasRemainedMemory()) {
            rejector.reject(e, this);
            return false;
        }
        return super.offer(e, timeout, unit);
    }

    @Override
    public boolean offer(final E e) {
        if (!hasRemainedMemory()) {
            rejector.reject(e, this);
            return false;
        }
        return super.offer(e);
    }
}

MemorySafeLinkedBlockingQueue也是继承自 LinkedBlockingQueue,有一成员变量maxFreeMemory,默认256M。

琢磨一下,莫非它会获取整个JVM 里面的剩余空间,当put操作计算剩余空间不足maxFreeMemory就会阻塞?

继续看,计算内存使用大小时,依赖了一个MemoryMXBean 对象。好像这个类也比较陌生,继续百度:提供管理接口,用于监视和管理 Java 虚拟机以及 Java 虚拟机在其上运行的操作系统。

MemorySafeLinkedBlockingQueue的核心方法是:hasRemainedMemory() ,即时判断JVM可用空间是否超过maxFreeMemory的限制。

继续看MemoryLimitCalculator源码:

核心方法是refresh,更新剩余可用空间;

另有一定时器,以50ms每次的频率调用refresh()方法更新剩余可用空间值。

再来梳理一下作者的思路:

put和offer时,检查是否有剩余足够的可用空间,是则允许添加

从作者的提供的示例来看,还支持动态设置maxFreeMemory的值

这样一比较,好家伙,看起来确实比MemoryLimitedLinkedBlockingQueue从设计上来说要好那么一些

总结

上面的这两个实现类都不依赖第三方框架,其代码直接拿来就用,而且代码也没几行。其实不管是用 Instrumentation 还是 ManagementFactory,本质上都是要限制内存

MemoryLimitedLinkedBlockingQueue算是正向思维,我需要使用队列时不报OOM,就解决队列自身的使用内存问题

MemorySafeLinkedBlockingQueue算是逆向思维,通过感知外界的内存使用情况,始终JVM预留一定大小的可用内存空间,来避免OOM问题。

其实从这两个BlockingQueue的实现类的设计思路,也可以用在我们的项目中呢,比方说使用的各种容器,也可能存在OOM 的风险,我们也可以借鉴上面的思路来解决呀。这次的写文章主要目的不是为了推广这两个类的使用,更重要的是学会拓宽自己的思维,有时换个方向也能取得更好的结果

最后,关于这两个类,其实在实现也有不少的缺陷。比如有大佬提出了自己的看法,比如 Instrumetation.getObjectSize()可能并不能准确的获取对象的实际内存大小,MemoryMXBean 获取的可用内存没有考虑到GC的情况

发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言