A look at Artemis message priority

This article takes a look at how Artemis handles message priority.



priority

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java

    public class CoreMessage extends RefCountMessage implements ICoreMessage {

       //......

       protected byte priority;

       public byte getPriority() {
          return priority;
       }

       public CoreMessage setPriority(byte priority) {
          this.priority = priority;
          messageChanged();
          return this;
       }

       //......
    }

  • CoreMessage defines a priority field (values range from 0, the lowest priority, to 9, the highest, inclusive) and provides the getPriority and setPriority methods
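To make the 0 to 9 range and the fluent setter concrete, here is a minimal sketch. PrioritySketch is a hypothetical stand-in, not the Artemis class. The range check and the default of 4 are assumptions of this sketch: the excerpt above stores whatever byte it is given, and 4 is simply the fallback value that QueueImpl uses further down.

```java
// Hypothetical stand-in for a message type; this is NOT org.apache.activemq.artemis CoreMessage.
final class PrioritySketch {
    static final int MIN_PRIORITY = 0;
    static final int MAX_PRIORITY = 9;
    static final byte DEFAULT_PRIORITY = 4; // assumption: same value as the QueueImpl fallback

    private byte priority = DEFAULT_PRIORITY;

    public byte getPriority() {
        return priority;
    }

    // Fluent setter in the same "return this" style as CoreMessage.setPriority.
    // The range check is an addition of this sketch, not behavior shown in the excerpt.
    public PrioritySketch setPriority(int priority) {
        if (priority < MIN_PRIORITY || priority > MAX_PRIORITY) {
            throw new IllegalArgumentException("priority must be in [0, 9]: " + priority);
        }
        this.priority = (byte) priority;
        return this;
    }

    public static void main(String[] args) {
        PrioritySketch message = new PrioritySketch().setPriority(9);
        System.out.println(message.getPriority()); // prints 9
    }
}
```

Returning this from the setter allows calls to be chained when a message is being built.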

messageReferences.add

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

    public class QueueImpl extends CriticalComponentImpl implements Queue {

       //......

       private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<>(QueueImpl.NUM_PRIORITIES, MessageReferenceImpl.getIDComparator());

       //......

       private synchronized void internalAddTail(final MessageReference ref) {
          refAdded(ref);
          messageReferences.addTail(ref, getPriority(ref));
          pendingMetrics.incrementMetrics(ref);
          enforceRing(false);
       }

       private void internalAddHead(final MessageReference ref) {
          queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
          pendingMetrics.incrementMetrics(ref);
          refAdded(ref);

          int priority = getPriority(ref);

          messageReferences.addHead(ref, priority);

          ref.setInDelivery(false);
       }

       private void internalAddSorted(final MessageReference ref) {
          queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
          pendingMetrics.incrementMetrics(ref);
          refAdded(ref);

          int priority = getPriority(ref);

          messageReferences.addSorted(ref, priority);
       }

       private int getPriority(MessageReference ref) {
          try {
             return ref.getMessage().getPriority();
          } catch (Throwable e) {
             ActiveMQServerLogger.LOGGER.unableToGetMessagePriority(e);
             return 4; // the default one in case of failure
          }
       }

       //......
    }

  • QueueImpl declares messageReferences, whose type is PriorityLinkedList<MessageReference>. Its internalAddTail, internalAddHead and internalAddSorted methods each call getPriority to read the message priority (falling back to 4 if reading it throws), and then add the reference to the queue through the addTail, addHead or addSorted method of messageReferences
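The fallback in getPriority can be isolated into a small sketch. PriorityFallbackSketch and priorityOrDefault are hypothetical names, the IntSupplier stands in for ref.getMessage().getPriority(), and logging goes to stderr instead of ActiveMQServerLogger.

```java
import java.util.function.IntSupplier;

// Hypothetical helper illustrating the "default to 4 on failure" pattern from QueueImpl.getPriority.
final class PriorityFallbackSketch {
    static final int DEFAULT_PRIORITY = 4;

    // Returns the priority produced by the reader, or DEFAULT_PRIORITY if reading it throws.
    public static int priorityOrDefault(IntSupplier reader) {
        try {
            return reader.getAsInt();
        } catch (Throwable e) {
            // The real code logs through ActiveMQServerLogger; this sketch just writes to stderr.
            System.err.println("unable to read message priority: " + e);
            return DEFAULT_PRIORITY;
        }
    }

    public static void main(String[] args) {
        System.out.println(priorityOrDefault(() -> 7)); // prints 7
        System.out.println(priorityOrDefault(() -> {
            throw new IllegalStateException("corrupt message");
        })); // prints 4
    }
}
```

The effect is that a message whose priority cannot be read is still enqueued, at a mid-range level, rather than causing the add to fail.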

PriorityLinkedList

activemq-artemis-2.11.0/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedList.java

    public interface PriorityLinkedList<T> {

       void addHead(T t, int priority);

       void addTail(T t, int priority);

       void addSorted(T t, int priority);

       T poll();

       void clear();

       /**
        * Returns the size of this list.<br>
        * It is safe to be called concurrently.
        */
       int size();

       LinkedListIterator<T> iterator();

       /**
        * Returns {@code true} if empty, {@code false} otherwise.<br>
        * It is safe to be called concurrently.
        */
       boolean isEmpty();
    }

  • The PriorityLinkedList interface defines priority-aware addHead, addTail and addSorted methods; its Javadoc states that size and isEmpty are safe to call concurrently
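One way to make size and isEmpty safe to read from other threads is a volatile counter that is updated by a single writer at a time, which is the approach PriorityLinkedListImpl takes with AtomicIntegerFieldUpdater.lazySet. The following is a minimal sketch of that idea. SingleWriterCounter and exclusiveAdd are hypothetical names, and the sketch assumes that callers serialize all writes themselves (for example under a lock).

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Hypothetical single-writer counter: many threads may read it, only one may update it at a time.
final class SingleWriterCounter {
    private static final AtomicIntegerFieldUpdater<SingleWriterCounter> SIZE_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(SingleWriterCounter.class, "size");

    // volatile so that readers on other threads see a recently written value.
    private volatile int size;

    // NOT atomic: this is a read followed by a write. It is only correct if writers are
    // serialized externally, which is what the "exclusive" prefix signals.
    public void exclusiveAdd(int amount) {
        SIZE_UPDATER.lazySet(this, size + amount);
    }

    public int size() {
        return size;
    }

    public boolean isEmpty() {
        return size == 0;
    }

    public static void main(String[] args) throws InterruptedException {
        SingleWriterCounter counter = new SingleWriterCounter();
        Thread writer = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                counter.exclusiveAdd(1);
            }
        });
        writer.start();
        writer.join();
        System.out.println(counter.size()); // prints 1000
    }
}
```

lazySet is an ordered store that is cheaper than a full volatile write; readers may see the new value slightly later, which is acceptable for a size that is only used as a hint by concurrent callers.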

PriorityLinkedListImpl

activemq-artemis-2.11.0/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java

    public class PriorityLinkedListImpl<T> implements PriorityLinkedList<T> {

       private static final AtomicIntegerFieldUpdater<PriorityLinkedListImpl> SIZE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PriorityLinkedListImpl.class, "size");

       protected LinkedListImpl<T>[] levels;

       private volatile int size;

       private int lastReset;

       private int highestPriority = -1;

       private int lastPriority = -1;

       public PriorityLinkedListImpl(final int priorities) {
          this(priorities, null);
       }

       public PriorityLinkedListImpl(final int priorities, Comparator<T> comparator) {
          levels = (LinkedListImpl<T>[]) Array.newInstance(LinkedListImpl.class, priorities);

          for (int i = 0; i < priorities; i++) {
             levels[i] = new LinkedListImpl<>(comparator);
          }
       }

       private void checkHighest(final int priority) {
          if (lastPriority != priority || priority > highestPriority) {
             lastPriority = priority;
             if (lastReset == Integer.MAX_VALUE) {
                lastReset = 0;
             } else {
                lastReset++;
             }
          }

          if (priority > highestPriority) {
             highestPriority = priority;
          }
       }

       @Override
       public void addHead(final T t, final int priority) {
          checkHighest(priority);

          levels[priority].addHead(t);

          exclusiveIncrementSize(1);
       }

       @Override
       public void addTail(final T t, final int priority) {
          checkHighest(priority);

          levels[priority].addTail(t);

          exclusiveIncrementSize(1);
       }

       @Override
       public void addSorted(T t, int priority) {
          checkHighest(priority);

          levels[priority].addSorted(t);

          exclusiveIncrementSize(1);
       }

       @Override
       public T poll() {
          T t = null;

          // We are just using a simple prioritization algorithm:
          // Highest priority refs always get returned first.
          // This could cause starvation of lower priority refs.

          // TODO - A better prioritization algorithm

          for (int i = highestPriority; i >= 0; i--) {
             LinkedListImpl<T> ll = levels[i];

             if (ll.size() != 0) {
                t = ll.poll();

                if (t != null) {
                   exclusiveIncrementSize(-1);

                   if (ll.size() == 0) {
                      if (highestPriority == i) {
                         highestPriority--;
                      }
                   }
                }

                break;
             }
          }

          return t;
       }

       @Override
       public void clear() {
          for (LinkedListImpl<T> list : levels) {
             list.clear();
          }

          exclusiveSetSize(0);
       }

       private void exclusiveIncrementSize(int amount) {
          SIZE_UPDATER.lazySet(this, this.size + amount);
       }

       private void exclusiveSetSize(int value) {
          SIZE_UPDATER.lazySet(this, value);
       }

       @Override
       public int size() {
          return size;
       }

       @Override
       public boolean isEmpty() {
          return size == 0;
       }

       @Override
       public LinkedListIterator<T> iterator() {
          return new PriorityLinkedListIterator();
       }

       //......
    }

  • PriorityLinkedListImpl implements the PriorityLinkedList interface. Its constructor takes a priorities argument, uses Array.newInstance(LinkedListImpl.class, priorities) to create the levels array, and fills each slot with a LinkedListImpl. Its addHead, addTail and addSorted methods first call checkHighest(priority) to maintain highestPriority, then call the matching addHead, addTail or addSorted method on the LinkedListImpl for that priority, and finally call exclusiveIncrementSize to increment size. Its poll method scans downward from the highestPriority level and returns the first element it finds, so higher-priority references are always returned first and, as the comment in the source notes, lower-priority references can be starved
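The bucket-per-priority idea can be shown in a much smaller, self-contained form. BucketedPriorityQueue below is a hypothetical simplification, not the Artemis class: it uses java.util.ArrayDeque instead of LinkedListImpl, omits addSorted, iterators and the lastReset bookkeeping, is not thread-safe, and recomputes highestPriority on every poll instead of decrementing it by one.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a priority-bucketed list: one FIFO deque per priority level.
final class BucketedPriorityQueue<T> {
    private final List<ArrayDeque<T>> levels = new ArrayList<>();
    private int highestPriority = -1; // highest level that may hold elements, -1 if none
    private int size;

    public BucketedPriorityQueue(int priorities) {
        for (int i = 0; i < priorities; i++) {
            levels.add(new ArrayDeque<>());
        }
    }

    public void addTail(T t, int priority) {
        levels.get(priority).addLast(t);
        noteAdded(priority);
    }

    public void addHead(T t, int priority) {
        levels.get(priority).addFirst(t);
        noteAdded(priority);
    }

    private void noteAdded(int priority) {
        if (priority > highestPriority) {
            highestPriority = priority;
        }
        size++;
    }

    // Scans from the highest non-empty level downward; within a level, order is head to tail.
    public T poll() {
        for (int i = highestPriority; i >= 0; i--) {
            ArrayDeque<T> level = levels.get(i);
            T t = level.pollFirst();
            if (t != null) {
                size--;
                // Every level above i was empty, so i (or i - 1 if it just drained) is the new top.
                highestPriority = level.isEmpty() ? i - 1 : i;
                return t;
            }
        }
        highestPriority = -1;
        return null;
    }

    public int size() {
        return size;
    }

    public static void main(String[] args) {
        BucketedPriorityQueue<String> queue = new BucketedPriorityQueue<>(10);
        queue.addTail("low", 1);
        queue.addTail("high-1", 9);
        queue.addTail("mid", 4);
        queue.addTail("high-2", 9);
        String next;
        while ((next = queue.poll()) != null) {
            System.out.println(next); // prints high-1, high-2, mid, low (one per line)
        }
    }
}
```

Because poll always starts at the top level, a steady stream of priority 9 entries would keep "mid" and "low" waiting indefinitely, which is the starvation risk flagged by the TODO in the original poll method.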

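Summary

CoreMessage defines a priority field (values range from 0, the lowest priority, to 9, the highest, inclusive) and provides the getPriority and setPriority methods. QueueImpl declares messageReferences, whose type is PriorityLinkedList<MessageReference>; its internalAddTail, internalAddHead and internalAddSorted methods each call getPriority to read the priority (falling back to 4 if reading it throws) and then add the reference to the queue through the addTail, addHead or addSorted method of messageReferences. PriorityLinkedListImpl implements the PriorityLinkedList interface; its constructor takes a priorities argument and uses Array.newInstance(LinkedListImpl.class, priorities) to create the levels array, whose elements are LinkedListImpl instances. The addHead, addTail and addSorted methods of PriorityLinkedListImpl all delegate to the LinkedListImpl for the given priority.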

doc

  • QueueImpl
