Preface
This article takes a look at how message priority works in Artemis.
priority
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
    public class CoreMessage extends RefCountMessage implements ICoreMessage {

       //......

       protected byte priority;

       public byte getPriority() {
          return priority;
       }

       public CoreMessage setPriority(byte priority) {
          this.priority = priority;
          messageChanged();
          return this;
       }

       //......
    }
- CoreMessage defines a priority field (values range from 0 (less priority) to 9 (more priority) inclusive) and provides the getPriority and setPriority methods; setPriority also calls messageChanged() and returns this
messageReferences.add
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
    public class QueueImpl extends CriticalComponentImpl implements Queue {

       //......

       private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<>(QueueImpl.NUM_PRIORITIES, MessageReferenceImpl.getIDComparator());

       //......

       private synchronized void internalAddTail(final MessageReference ref) {
          refAdded(ref);
          messageReferences.addTail(ref, getPriority(ref));
          pendingMetrics.incrementMetrics(ref);
          enforceRing(false);
       }

       private void internalAddHead(final MessageReference ref) {
          queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
          pendingMetrics.incrementMetrics(ref);
          refAdded(ref);

          int priority = getPriority(ref);

          messageReferences.addHead(ref, priority);

          ref.setInDelivery(false);
       }

       private void internalAddSorted(final MessageReference ref) {
          queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
          pendingMetrics.incrementMetrics(ref);
          refAdded(ref);

          int priority = getPriority(ref);

          messageReferences.addSorted(ref, priority);
       }

       private int getPriority(MessageReference ref) {
          try {
             return ref.getMessage().getPriority();
          } catch (Throwable e) {
             ActiveMQServerLogger.LOGGER.unableToGetMessagePriority(e);
             return 4; // the default one in case of failure
          }
       }

       //......
    }
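- QueueImpl defines messageReferences, whose type is PriorityLinkedList<MessageReference>. Its internalAddTail, internalAddHead, and internalAddSorted methods each call getPriority to obtain the message priority (getPriority logs the error and returns 4 if an exception is thrown), then add the reference to the queue through the addTail, addHead, or addSorted method of messageReferences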
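The fallback behaviour of getPriority can be illustrated in isolation. The following is a minimal sketch, not Artemis code: PriorityResolver, its resolve method, and the IntSupplier parameter are hypothetical names introduced here, and the logging call is replaced by a plain System.err print.

```java
import java.util.function.IntSupplier;

// Hypothetical helper (not part of Artemis): resolves a priority and
// falls back to a default value when reading it fails.
final class PriorityResolver {
    // Same fallback value that QueueImpl.getPriority returns on failure
    static final int DEFAULT_PRIORITY = 4;

    static int resolve(IntSupplier source) {
        try {
            return source.getAsInt();
        } catch (Throwable e) {
            // QueueImpl logs through ActiveMQServerLogger; this sketch just prints
            System.err.println("unable to get message priority: " + e);
            return DEFAULT_PRIORITY;
        }
    }
}

class PriorityResolverDemo {
    public static void main(String[] args) {
        // A readable priority is returned unchanged
        System.out.println(PriorityResolver.resolve(() -> 7)); // prints 7
        // A failing read falls back to the default
        System.out.println(PriorityResolver.resolve(() -> {
            throw new IllegalStateException("corrupt message");
        })); // prints 4
    }
}
```

The effect is that a message whose priority cannot be read is still enqueued, at the middle level, rather than being rejected.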
PriorityLinkedList
activemq-artemis-2.11.0/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedList.java
    public interface PriorityLinkedList<T> {

       void addHead(T t, int priority);

       void addTail(T t, int priority);

       void addSorted(T t, int priority);

       T poll();

       void clear();

       /**
        * Returns the size of this list.<br>
        * It is safe to be called concurrently.
        */
       int size();

       LinkedListIterator<T> iterator();

       /**
        * Returns {@code true} if empty, {@code false} otherwise.<br>
        * It is safe to be called concurrently.
        */
       boolean isEmpty();
    }
- The PriorityLinkedList interface defines priority-aware addHead, addTail, and addSorted methods; its size and isEmpty methods are documented as safe to call concurrently
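One way to make size and isEmpty safe to read concurrently, when all mutations are already serialized by the caller, is a single-writer counter: a volatile int that is written with lazySet and read directly. PriorityLinkedListImpl in the next section uses an AtomicIntegerFieldUpdater in a similar way. The sketch below is only an illustration of that pattern; SingleWriterCounter and its method names are hypothetical, and the assumption that writers never run concurrently is the caller's responsibility.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Hypothetical class illustrating a single-writer counter.
final class SingleWriterCounter {
    private static final AtomicIntegerFieldUpdater<SingleWriterCounter> SIZE_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(SingleWriterCounter.class, "size");

    // volatile so that readers on other threads observe published values
    private volatile int size;

    // Assumption: callers guarantee that only one thread at a time writes.
    // The read-modify-write below is NOT atomic on its own.
    void exclusiveIncrement(int amount) {
        SIZE_UPDATER.lazySet(this, this.size + amount);
    }

    // Safe to call from any thread: a plain volatile read
    int size() {
        return size;
    }

    boolean isEmpty() {
        return size == 0;
    }
}

class SingleWriterCounterDemo {
    public static void main(String[] args) {
        SingleWriterCounter counter = new SingleWriterCounter();
        counter.exclusiveIncrement(3);
        counter.exclusiveIncrement(-1);
        System.out.println(counter.size());    // prints 2
        System.out.println(counter.isEmpty()); // prints false
    }
}
```

lazySet avoids the full fence of a volatile write, which is acceptable here because readers only need an eventually visible count, not one that is ordered against other writes.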
PriorityLinkedListImpl
activemq-artemis-2.11.0/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java
    public class PriorityLinkedListImpl<T> implements PriorityLinkedList<T> {

       private static final AtomicIntegerFieldUpdater<PriorityLinkedListImpl> SIZE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PriorityLinkedListImpl.class, "size");

       protected LinkedListImpl<T>[] levels;

       private volatile int size;

       private int lastReset;

       private int highestPriority = -1;

       private int lastPriority = -1;

       public PriorityLinkedListImpl(final int priorities) {
          this(priorities, null);
       }

       public PriorityLinkedListImpl(final int priorities, Comparator<T> comparator) {
          levels = (LinkedListImpl<T>[]) Array.newInstance(LinkedListImpl.class, priorities);

          for (int i = 0; i < priorities; i++) {
             levels[i] = new LinkedListImpl<>(comparator);
          }
       }

       private void checkHighest(final int priority) {
          if (lastPriority != priority || priority > highestPriority) {
             lastPriority = priority;
             if (lastReset == Integer.MAX_VALUE) {
                lastReset = 0;
             } else {
                lastReset++;
             }
          }

          if (priority > highestPriority) {
             highestPriority = priority;
          }
       }

       @Override
       public void addHead(final T t, final int priority) {
          checkHighest(priority);

          levels[priority].addHead(t);

          exclusiveIncrementSize(1);
       }

       @Override
       public void addTail(final T t, final int priority) {
          checkHighest(priority);

          levels[priority].addTail(t);

          exclusiveIncrementSize(1);
       }

       @Override
       public void addSorted(T t, int priority) {
          checkHighest(priority);

          levels[priority].addSorted(t);

          exclusiveIncrementSize(1);
       }

       @Override
       public T poll() {
          T t = null;

          // We are just using a simple prioritization algorithm:
          // Highest priority refs always get returned first.
          // This could cause starvation of lower priority refs.

          // TODO - A better prioritization algorithm

          for (int i = highestPriority; i >= 0; i--) {
             LinkedListImpl<T> ll = levels[i];

             if (ll.size() != 0) {
                t = ll.poll();

                if (t != null) {
                   exclusiveIncrementSize(-1);

                   if (ll.size() == 0) {
                      if (highestPriority == i) {
                         highestPriority--;
                      }
                   }
                }

                break;
             }
          }

          return t;
       }

       @Override
       public void clear() {
          for (LinkedListImpl<T> list : levels) {
             list.clear();
          }

          exclusiveSetSize(0);
       }

       private void exclusiveIncrementSize(int amount) {
          SIZE_UPDATER.lazySet(this, this.size + amount);
       }

       private void exclusiveSetSize(int value) {
          SIZE_UPDATER.lazySet(this, value);
       }

       @Override
       public int size() {
          return size;
       }

       @Override
       public boolean isEmpty() {
          return size == 0;
       }

       @Override
       public LinkedListIterator<T> iterator() {
          return new PriorityLinkedListIterator();
       }

       //......
    }
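- PriorityLinkedListImpl implements the PriorityLinkedList interface. Its constructor takes a priorities argument and uses Array.newInstance(LinkedListImpl.class, priorities) to create the levels array, then fills each slot with a new LinkedListImpl. Its addHead, addTail, and addSorted methods first call checkHighest(priority) to maintain highestPriority, then call the matching addHead, addTail, or addSorted method on the LinkedListImpl for that priority, and finally call exclusiveIncrementSize to increment size. Its poll method scans downward from the LinkedListImpl at highestPriority and polls the first non-empty level, so, as the code comment notes, lower-priority refs can be starved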
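The level-per-priority layout and the top-down poll can be modelled in a few lines. This is a simplified sketch under stated assumptions, not the Artemis implementation: SimplePriorityList is a hypothetical class, java.util.ArrayDeque stands in for LinkedListImpl, and addSorted, the iterator, lastReset tracking, and the concurrent size counter are all left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical simplified model: one FIFO deque per priority level.
final class SimplePriorityList<T> {
    private final List<Deque<T>> levels = new ArrayList<>();
    private int highestPriority = -1; // highest level that may hold elements
    private int size;

    SimplePriorityList(int priorities) {
        for (int i = 0; i < priorities; i++) {
            levels.add(new ArrayDeque<>());
        }
    }

    void addTail(T t, int priority) {
        highestPriority = Math.max(highestPriority, priority);
        levels.get(priority).addLast(t);
        size++;
    }

    void addHead(T t, int priority) {
        highestPriority = Math.max(highestPriority, priority);
        levels.get(priority).addFirst(t);
        size++;
    }

    // Scans from the highest known level downward and takes the first element found
    T poll() {
        for (int i = highestPriority; i >= 0; i--) {
            Deque<T> level = levels.get(i);
            if (!level.isEmpty()) {
                T t = level.pollFirst();
                size--;
                if (level.isEmpty() && highestPriority == i) {
                    highestPriority--; // next poll can skip this drained level
                }
                return t;
            }
        }
        return null;
    }

    int size() {
        return size;
    }
}

class SimplePriorityListDemo {
    public static void main(String[] args) {
        SimplePriorityList<String> list = new SimplePriorityList<>(10);
        list.addTail("low-1", 0);
        list.addTail("normal-1", 4);
        list.addTail("high-1", 9);
        list.addTail("normal-2", 4);
        list.addHead("normal-0", 4);

        String ref;
        while ((ref = list.poll()) != null) {
            System.out.println(ref);
        }
        // prints high-1, normal-0, normal-1, normal-2, low-1 (one per line)
    }
}
```

Within a level, order is FIFO unless addHead is used; across levels, a higher priority always wins, which is why a steady stream of high-priority messages can starve lower ones.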
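Summary
CoreMessage defines a priority field (values range from 0 (less priority) to 9 (more priority) inclusive) and provides the getPriority and setPriority methods. QueueImpl defines messageReferences, whose type is PriorityLinkedList<MessageReference>; its internalAddTail, internalAddHead, and internalAddSorted methods each call getPriority to obtain the priority (falling back to 4 if an exception is thrown) and then add the reference through the addTail, addHead, or addSorted method of messageReferences. PriorityLinkedListImpl implements the PriorityLinkedList interface; its constructor takes a priorities argument and uses Array.newInstance(LinkedListImpl.class, priorities) to create the levels array, whose elements are LinkedListImpl instances. The addHead, addTail, and addSorted methods of PriorityLinkedListImpl all delegate to the LinkedListImpl for the given priority.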
doc
- QueueImpl