这两天同时在几个地方被安利了这两个BlockingQueue的实现类,号称能很好的解决OOM问题,本着好奇的目光点进去看了后,感觉甚是有趣
BlockingQueue介绍
BlockingQueue是jdk中自带的一个接口,它继承了Queue接口,并新增了put(e),take()两个方法
他是一个带阻塞功能的队列,当入队列时,若队列已满,则阻塞调 用者;当出队列时,若队列为空,则阻塞调用者。
主要方法有
//入队,如果队列满,等待直到队列有空间
void put(E e) throws InterruptedException;
//出队,如果队列空,等待直到队列不为空,返回头部元素
E take() throws InterruptedException;
//入队,如果队列满,最多等待指定的时间,如果超时还是满,返回false
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
//出队,如果队列空,最多等待指定的时间,如果超时还是空,返回null
E poll(long timeout, TimeUnit unit) throws InterruptedException;
在java的Concurrent包中,有众多的实现类
- ArrayBlockingQueue、
- LinkedBlockingQueue、
- DelayQueue。
- PriorityBlockingQueue
- SynchronousQueue
等,大家或多或少地见过用过。
主角登场
这次讲的主角是MemoryLimitedLinkedBlockingQueue和MemorySafeLinkedBlockingQueue,也是BlockingQueue的实现类,据说最初出自开源Apache ShenYu项目。
队列实现类的名字很长,但是也让人很好理解,最大特点都是跟内存的使用有关,一个是内存的使用限制,一个是内存的使用安全,感觉起来是不是很相似?
那为什么要这两个实现呢,都可以在哪些场景下使用呢
我们可能都看过阿里巴巴的java开发规范,其中有一项就是线程池不能通过Executors来创建,有一个理由是像FixedThreadPool和SingleThreadPool这种方式创建的线程池允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,进而导致OOM异常。
我们查看Executors的源码:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
用的是LinkedBlockingQueue,并没有指定长度,也就是说可以向队列里无限添加数据了。怎么一想,是不是真的挺危险。
那怎么办呢,一般来说,需要我们在初始化的时候指定一个合理的值,但是这个值到底是多少,可能我们需要根据使用的场景和过往的经验来给值了。
这样一琢磨,这里面就确实值得玩味,或许换个思路用上面两主角来尝试解决这个问题
MemoryLimitedLinkedBlockingQueue
根据提交者的描述,它可以用来Executors中代替LinkedBlockingQueue队列,可避免OOM的问题,比如,限制这个队列可以使用的最大内存为 100M,超过使用内存限制就不允许继续往里添加。这样是不是就解决问题了
说起来容易,那就把家伙亮出来让我们开开眼吧
以下是完整类代码
package org.apache.shenyu.common.concurrent;
import java.lang.instrument.Instrumentation;
import java.util.Collection;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* Can completely solve the OOM problem caused by {@link java.util.concurrent.LinkedBlockingQueue}.
*
* @see org.apache.shenyu.common.concurrent.MemoryLimiter
* @see org.apache.shenyu.common.concurrent.MemoryLimitCalculator
*/
public class MemoryLimitedLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {
private static final long serialVersionUID = -6106022470621447542L;
private final MemoryLimiter memoryLimiter;
public MemoryLimitedLinkedBlockingQueue(final Instrumentation inst) {
this(Integer.MAX_VALUE, inst);
}
public MemoryLimitedLinkedBlockingQueue(final long memoryLimit,
final Instrumentation inst) {
super(Integer.MAX_VALUE);
this.memoryLimiter = new MemoryLimiter(memoryLimit, inst);
}
public MemoryLimitedLinkedBlockingQueue(final Collection<? extends E> c,
final long memoryLimit,
final Instrumentation inst) {
super(c);
this.memoryLimiter = new MemoryLimiter(memoryLimit, inst);
}
/**
* set the memory limit.
*
* @param memoryLimit the memory limit
*/
public void setMemoryLimit(final long memoryLimit) {
memoryLimiter.setMemoryLimit(memoryLimit);
}
/**
* get the memory limit.
*
* @return the memory limit
*/
public long getMemoryLimit() {
return memoryLimiter.getMemoryLimit();
}
/**
* get the current memory.
*
* @return the current memory
*/
public long getCurrentMemory() {
return memoryLimiter.getCurrentMemory();
}
/**
* get the current remain memory.
*
* @return the current remain memory
*/
public long getCurrentRemainMemory() {
return memoryLimiter.getCurrentRemainMemory();
}
@Override
public void put(final E e) throws InterruptedException {
memoryLimiter.acquireInterruptibly(e);
super.put(e);
}
@Override
public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {
return memoryLimiter.acquire(e, timeout, unit) && super.offer(e, timeout, unit);
}
@Override
public boolean offer(final E e) {
return memoryLimiter.acquire(e) && super.offer(e);
}
@Override
public E take() throws InterruptedException {
final E e = super.take();
memoryLimiter.releaseInterruptibly(e);
return e;
}
@Override
public E poll(final long timeout, final TimeUnit unit) throws InterruptedException {
final E e = super.poll(timeout, unit);
memoryLimiter.releaseInterruptibly(e, timeout, unit);
return e;
}
@Override
public E poll() {
final E e = super.poll();
memoryLimiter.release(e);
return e;
}
@Override
public boolean remove(final Object o) {
final boolean success = super.remove(o);
if (success) {
memoryLimiter.release(o);
}
return success;
}
@Override
public void clear() {
super.clear();
memoryLimiter.reset();
}
}
MemoryLimitedLinkedBlockingQueue继承自 LinkedBlockingQueue,然后重写了它的几个核心方法。内部定义了一个MemoryLimiter对象,在每一次对队列的操作都会使用到这个对象。
MemoryLimiter源码截图:
里面有两个成员变量类型比较陌生
LongAdder:作用和AtomicLong是一样,都是一个实现了原子操作的累加器。在阿里巴巴的java开发手册也推荐用LongAdder来替代AtomicLong,可以减少乐观锁的重试次数
Instrumentation: 查询百度介绍是这样的,
开发者可以构建一个独立于应用程序的代理程序(Agent),用来监测和协助运行在 JVM 上的程序,甚至能够替换和修改某些类的定义。有了这样的功能,开发者就可以实现更为灵活的运行时虚拟机监控和 Java 类操作了,这样的特性实际上提供了一种虚拟机级别支持的 AOP 实现方式,使得开发者无需对 JDK 做任何升级和改动,就可以实现某些 AOP 的功能了。
在 Java SE 6 里面,instrumentation 包被赋予了更强大的功能:启动后的 instrument、本地代码(native code)instrument,以及动态改变 classpath 等等。这些改变,意味着 Java 具有了更强的动态控制、解释能力,它使得 Java 语言变得更加灵活多变。
看起来感觉很牛逼也比较难懂,我们接着看在put操作队列时都具体做了些什么
getObjectSize看起来是获取对象e的大小,有个这个神器,再结合这个队列的描述,就比较容易理解作者的意图了。
往队列里put时,先检查当前队列已使用内存和要put的对象内存大小,如果两者的和超过内存限制大小,就一直阻塞等待,否则就放行并更新队列已使用内存大小
从队列中take时,从已使用内存大小中减去对象e的内存大小值
需要注意的是,上述代码其实是有一个bug的,此处我们略过不谈
MemoryLimitedLinkedBlockingQueue整个思路就是这么简单粗暴,接下来我们看下MemorySafeLinkedBlockingQueue是什么鬼
MemorySafeLinkedBlockingQueue
从作者的提交说明来看,它也能解决OOM的问题,并且不依赖Instrumentation,还比MemoryLimitedLinkedBlockingQueue要更简单好用。
以下是完整类代码
/**
* Can completely solve the OOM problem caused by {@link java.util.concurrent.LinkedBlockingQueue},
* does not depend on {@link java.lang.instrument.Instrumentation} and is easier to use than
* {@link org.apache.shenyu.common.concurrent.MemoryLimitedLinkedBlockingQueue}.
*/
public class MemorySafeLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {
private static final long serialVersionUID = 8032578371749960142L;
private int maxFreeMemory;
private Rejector<E> rejector;
public MemorySafeLinkedBlockingQueue(final int maxFreeMemory) {
super(Integer.MAX_VALUE);
this.maxFreeMemory = maxFreeMemory;
//default as DiscardPolicy to ensure compatibility with the old version
this.rejector = new DiscardPolicy<>();
}
public MemorySafeLinkedBlockingQueue(final Collection<? extends E> c,
final int maxFreeMemory) {
super(c);
this.maxFreeMemory = maxFreeMemory;
//default as DiscardPolicy to ensure compatibility with the old version
this.rejector = new DiscardPolicy<>();
}
/**
* set the max free memory.
*
* @param maxFreeMemory the max free memory
*/
public void setMaxFreeMemory(final int maxFreeMemory) {
this.maxFreeMemory = maxFreeMemory;
}
/**
* get the max free memory.
*
* @return the max free memory limit
*/
public int getMaxFreeMemory() {
return maxFreeMemory;
}
/**
* set the rejector.
*
* @param rejector the rejector
*/
public void setRejector(final Rejector<E> rejector) {
this.rejector = rejector;
}
/**
* determine if there is any remaining free memory.
*
* @return true if has free memory
*/
public boolean hasRemainedMemory() {
return MemoryLimitCalculator.maxAvailable() > maxFreeMemory;
}
@Override
public void put(final E e) throws InterruptedException {
if (hasRemainedMemory()) {
super.put(e);
}
rejector.reject(e, this);
}
@Override
public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {
if (!hasRemainedMemory()) {
rejector.reject(e, this);
return false;
}
return super.offer(e, timeout, unit);
}
@Override
public boolean offer(final E e) {
if (!hasRemainedMemory()) {
rejector.reject(e, this);
return false;
}
return super.offer(e);
}
}
MemorySafeLinkedBlockingQueue也是继承自 LinkedBlockingQueue,有一成员变量maxFreeMemory,默认256M。
琢磨一下,莫非它会获取整个JVM 里面的剩余空间,当put操作计算剩余空间不足maxFreeMemory就会阻塞?
继续看,计算内存使用大小时,依赖了一个MemoryMXBean 对象。好像这个类也比较陌生,继续百度:提供管理接口,用于监视和管理 Java 虚拟机以及 Java 虚拟机在其上运行的操作系统。
MemorySafeLinkedBlockingQueue的核心方法是:hasRemainedMemory() ,即时判断JVM可用空间是否超过maxFreeMemory的限制。
继续看MemoryLimitCalculator源码:
核心方法是refresh,更新剩余可用空间;
另有一定时器,以50ms每次的频率调用refresh()方法更新剩余可用空间值。
再来梳理一下作者的思路:
put和offer时,检查是否有剩余足够的可用空间,是则允许添加
从作者的提供的示例来看,还支持动态设置maxFreeMemory的值
这样一比较,好家伙,看起来确实比MemoryLimitedLinkedBlockingQueue从设计上来说要好那么一些
总结
上面的这两个实现类都不依赖第三方框架,其代码直接拿来就用,而且代码也没几行。其实不管是用 Instrumentation 还是 ManagementFactory,本质上都是要限制内存
MemoryLimitedLinkedBlockingQueue算是正向思维,我需要使用队列时不报OOM,就解决队列自身的使用内存问题
MemorySafeLinkedBlockingQueue算是逆向思维,通过感知外界的内存使用情况,始终JVM预留一定大小的可用内存空间,来避免OOM问题。
其实从这两个BlockingQueue的实现类的设计思路,也可以用在我们的项目中呢,比方说使用的各种容器,也可能存在OOM 的风险,我们也可以借鉴上面的思路来解决呀。这次的写文章主要目的不是为了推广这两个类的使用,更重要的是学会拓宽自己的思维,有时换个方向也能取得更好的结果
最后,关于这两个类,其实在实现也有不少的缺陷。比如有大佬提出了自己的看法,比如 Instrumetation.getObjectSize()可能并不能准确的获取对象的实际内存大小,MemoryMXBean 获取的可用内存没有考虑到GC的情况