玖叶教程网

前端编程开发入门

聊聊chronos的pullFromDefaultCFAndPush

本文主要研究一下chronos的pullFromDefaultCFAndPush



pullFromDefaultCFAndPush

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/services/MqPushService.java

public class MqPushService {
?
    //......
?
 ?  public void pullFromDefaultCFAndPush() {
 ? ? ?  final long seekTimestamp = MetaService.getSeekTimestamp();
 ? ? ?  final long zkSeekTimestamp = MetaService.getZkSeekTimestamp();
?
 ? ? ?  // backup的seekTimestamp不能超过master的seekTimestamp
 ? ? ?  if (MasterElection.isBackup()) {
 ? ? ? ? ?  if (seekTimestamp >= zkSeekTimestamp) {
 ? ? ? ? ? ? ?  LOGGER.debug("backup's pull from db should stop for seekTimestamp > zkSeekTimestamp, seekTimestamp:{}, zkSeekTimestamp:{}, Thread:{}",
 ? ? ? ? ? ? ? ? ? ? ?  seekTimestamp, zkSeekTimestamp, Thread.currentThread().getName());
 ? ? ? ? ? ? ?  try {
 ? ? ? ? ? ? ? ? ?  TimeUnit.SECONDS.sleep(2);
 ? ? ? ? ? ? ?  } catch (InterruptedException e) {
 ? ? ? ? ? ? ?  }
 ? ? ? ? ? ? ?  return;
 ? ? ? ? ?  }
 ? ? ?  }
?
 ? ? ?  // seekTimestamp不能超过当前时间
 ? ? ?  final long now = TsUtils.genTS();
 ? ? ?  if (seekTimestamp > now) {
 ? ? ? ? ?  LOGGER.debug("pull from db should stop for seekTimestamp > now, seekTimestamp:{}, now:{}, Thread:{}",
 ? ? ? ? ? ? ? ? ?  seekTimestamp, now, round, Thread.currentThread().getName());
 ? ? ? ? ?  try {
 ? ? ? ? ? ? ?  TimeUnit.MILLISECONDS.sleep(100);
 ? ? ? ? ?  } catch (InterruptedException e) {
 ? ? ? ? ?  }
 ? ? ? ? ?  return;
 ? ? ?  }
?
 ? ? ?  round++;
 ? ? ?  final long start = System.currentTimeMillis();
 ? ? ?  final long diff = start / 1000 - seekTimestamp;
 ? ? ?  LOGGER.info("pull from db start, seekTimestamp:{}, currTimestamp:{}, diff:{} round:{}",
 ? ? ? ? ? ? ?  seekTimestamp, start / 1000, diff, round);
 ? ? ?  MetricService.putSeekLatency(MasterElection.getState().toString(), diff + 10); // 因为0上传到metric之后不显示
?
 ? ? ?  // 迭代出当前 seekTimestamp 下所有数据
 ? ? ?  int count = 0;
 ? ? ?  try (RocksIterator it = RDB.newIterator(CFManager.CFH_DEFAULT)) {
 ? ? ? ? ?  for (it.seek(KeyUtils.genSeekKey(seekTimestamp)); it.isValid(); it.next()) {
 ? ? ? ? ? ? ?  final String dMsgId = new String(it.key());
 ? ? ? ? ? ? ?  final InternalKey internalKey = new InternalKey(dMsgId);
?
 ? ? ? ? ? ? ?  //......
?
 ? ? ? ? ? ? ?  boolean needMetricWriteQpsAfterSplit = false;
?
 ? ? ? ? ? ? ?  // 循环消息需要插入一条新的消息, 如果失效, 则不再插入
 ? ? ? ? ? ? ?  if (internalKey.getType() == MsgTypes.LOOP_DELAY.getValue()
 ? ? ? ? ? ? ? ? ? ? ?  || internalKey.getType() == MsgTypes.LOOP_EXPONENT_DELAY.getValue()) {
 ? ? ? ? ? ? ? ? ?  final InternalKey nextInternalKey = new InternalKey(internalKey).nextUniqDelayMsgId();
 ? ? ? ? ? ? ? ? ?  if (!KeyUtils.isInvalidMsg(nextInternalKey)) {
 ? ? ? ? ? ? ? ? ? ? ?  batcher.putToDefaultCF(nextInternalKey.genUniqDelayMsgIdWithSegmentInfoIfHas(), it.value(), null, nextInternalKey, Actions.ADD.getValue());
 ? ? ? ? ? ? ? ? ? ? ?  needMetricWriteQpsAfterSplit = true;
 ? ? ? ? ? ? ? ? ?  }
 ? ? ? ? ? ? ?  }
?
 ? ? ? ? ? ? ?  byte[] bytes = it.value();
 ? ? ? ? ? ? ?  if (internalKey.getSegmentNum() > 0) {
 ? ? ? ? ? ? ? ? ?  try {
 ? ? ? ? ? ? ? ? ? ? ?  OUTPUT.write(it.value());
 ? ? ? ? ? ? ? ? ? ? ?  LOGGER.info("segment merge, dMsgId:{}, value.len:{}, value.acc.len:{}", internalKey.genUniqDelayMsgIdWithSegmentInfoIfHas(), it.value().length, OUTPUT.size());
 ? ? ? ? ? ? ? ? ? ? ?  if (internalKey.getSegmentNum() != (internalKey.getSegmentIndex() - Constants.SEGMENT_INDEX_BASE + 1)) {
 ? ? ? ? ? ? ? ? ? ? ? ? ?  continue;
 ? ? ? ? ? ? ? ? ? ? ?  }
 ? ? ? ? ? ? ? ? ? ? ?  bytes = OUTPUT.toByteArray();
 ? ? ? ? ? ? ? ? ? ? ?  OUTPUT.reset();
 ? ? ? ? ? ? ? ? ?  } catch (IOException e) {
 ? ? ? ? ? ? ? ? ? ? ?  LOGGER.error("error while output.write byte array, msg:{}", e.getMessage(), e);
 ? ? ? ? ? ? ? ? ?  }
 ? ? ? ? ? ? ?  }
?
 ? ? ? ? ? ? ?  // 如果解析不出来, 说明格式有问题, 抛弃掉该条消息, 不阻塞
 ? ? ? ? ? ? ?  final InternalValue internalValue = JsonUtils.fromJsonString(bytes, InternalValue.class);
 ? ? ? ? ? ? ?  if (internalValue == null) {
 ? ? ? ? ? ? ? ? ?  continue;
 ? ? ? ? ? ? ?  }
?
 ? ? ? ? ? ? ?  //......
?
 ? ? ? ? ? ? ?  count++;
?
 ? ? ? ? ? ? ?  try {
 ? ? ? ? ? ? ? ? ?  blockingQueue.put(new InternalPair(internalKey, internalValue));
 ? ? ? ? ? ? ?  } catch (InterruptedException e) {
 ? ? ? ? ? ? ? ? ?  LOGGER.error("error while put to blockingQueue, dMsgId:{}", dMsgId);
 ? ? ? ? ? ? ?  }
?
 ? ? ? ? ? ? ?  if (count % INTERNAL_PAIR_COUNT == 0) {
 ? ? ? ? ? ? ? ? ?  sendConcurrent(blockingQueue, round);
 ? ? ? ? ? ? ?  }
 ? ? ? ? ?  }
?
 ? ? ? ? ?  sendConcurrent(blockingQueue, round);
 ? ? ?  }
?
 ? ? ?  needCancelMap.forEach((uniqDelayMsgId, tombstoneKey) -> {
 ? ? ? ? ?  final InternalKey internalKey = new InternalKey(uniqDelayMsgId);
 ? ? ? ? ?  final InternalKey tombstoneInternalKey = new InternalKey(tombstoneKey);
?
 ? ? ? ? ?  // 残留的循环消息取消需要重新添加进去, 否则会删除不掉
 ? ? ? ? ?  if (internalKey.getType() == MsgTypes.LOOP_DELAY.getValue()
 ? ? ? ? ? ? ? ? ?  || internalKey.getType() == MsgTypes.LOOP_EXPONENT_DELAY.getValue()) {
 ? ? ? ? ? ? ?  final InternalKey nextTombstoneKey = tombstoneInternalKey.nextUniqDelayMsgId();
 ? ? ? ? ? ? ?  final InternalKey nextInternalKey = internalKey.nextUniqDelayMsgId();
 ? ? ? ? ? ? ?  if (!KeyUtils.isInvalidMsg(nextTombstoneKey)) {
 ? ? ? ? ? ? ? ? ?  String topic = needCancelTopicMap.get(uniqDelayMsgId);
 ? ? ? ? ? ? ? ? ?  batcher.putToDefaultCF(nextTombstoneKey.genUniqDelayMsgId(),
 ? ? ? ? ? ? ? ? ? ? ? ? ?  new CancelWrap(nextInternalKey.genUniqDelayMsgId(), topic).toJsonString(),
 ? ? ? ? ? ? ? ? ? ? ? ? ?  topic, nextInternalKey, Actions.CANCEL.getValue());
?
 ? ? ? ? ? ? ?  } else {
 ? ? ? ? ? ? ? ? ?  LOGGER.info("pull from db succ cancel message of tombstone key, tombstone dMsgId:{}",
 ? ? ? ? ? ? ? ? ? ? ? ? ?  nextTombstoneKey.genUniqDelayMsgId());
 ? ? ? ? ? ? ?  }
 ? ? ? ? ?  }
 ? ? ?  });
 ? ? ?  batcher.flush();
?
 ? ? ?  needCancelMap.clear();
 ? ? ?  needCancelTopicMap.clear();
?
 ? ? ?  // 更新offset
 ? ? ?  MetaService.nextSeekTimestamp();
?
 ? ? ?  LOGGER.info("pull from db finish push, pushCost:{}ms, count:{}, seekTimestamp:{}, round:{}",
 ? ? ? ? ? ? ?  System.currentTimeMillis() - start, count, seekTimestamp, round);
 ?  }
?
 ?  //......
}
  • pullFromDefaultCFAndPush方法先从metaService获取seekTimestamp及zkSeekTimestamp,若seekTimestamp超过当前时间则提前返回;之后从RDB.newIterator(CFManager.CFH_DEFAULT)获取RocksIterator进行遍历,读取dMsgId构造internalKey,若其type是LOOP_DELAY或者LOOP_EXPONENT_DELAY则通过batcher.putToDefaultCF重新放入rocksdb;之后读取it.value()构造internalValue,紧接着构造InternalPair放入到blockingQueue,之后在count % INTERNAL_PAIR_COUNT == 0时执行sendConcurrent,在循环结束之后再执行一次sendConcurrent;最后更新MetaService.nextSeekTimestamp()

sendConcurrent

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/services/MqPushService.java

public class MqPushService {
?
    //......
?
 ?  private void sendConcurrent(final BlockingQueue<InternalPair> blockingQueue, final long round) {
 ? ? ?  if (blockingQueue.size() == 0) {
 ? ? ? ? ?  LOGGER.info("pull from db sendConcurrent start, return for no message to send, round:{}", round);
 ? ? ? ? ?  return;
 ? ? ?  }
?
 ? ? ?  final long sendCount = blockingQueue.size();
 ? ? ?  LOGGER.info("pull from db sendConcurrent start, send count:{}, round:{}", sendCount, round);
 ? ? ?  final long start = System.currentTimeMillis();
 ? ? ?  final CountDownLatch cdl = new CountDownLatch(blockingQueue.size());
 ? ? ?  InternalPair internalPair;
 ? ? ?  while ((internalPair = blockingQueue.poll()) != null) {
 ? ? ? ? ?  final InternalPair immutableInternalPair = internalPair;
 ? ? ? ? ?  pushThreadPool.execute(() -> {
 ? ? ? ? ? ? ?  while (!send(
 ? ? ? ? ? ? ? ? ? ? ?  immutableInternalPair.getInternalValue().getTopic(),
 ? ? ? ? ? ? ? ? ? ? ?  immutableInternalPair.getInternalValue().getBody().getBytes(Charsets.UTF_8),
 ? ? ? ? ? ? ? ? ? ? ?  immutableInternalPair.getInternalKey(),
 ? ? ? ? ? ? ? ? ? ? ?  immutableInternalPair.getInternalValue().getTags(),
 ? ? ? ? ? ? ? ? ? ? ?  immutableInternalPair.getInternalValue().getProperties(),
 ? ? ? ? ? ? ? ? ? ? ?  false)) {
 ? ? ? ? ? ? ? ? ?  try {
 ? ? ? ? ? ? ? ? ? ? ?  TimeUnit.MILLISECONDS.sleep(100);
 ? ? ? ? ? ? ? ? ?  } catch (InterruptedException e) {
 ? ? ? ? ? ? ? ? ?  }
 ? ? ? ? ? ? ?  }
 ? ? ? ? ? ? ?  cdl.countDown();
 ? ? ? ? ?  });
 ? ? ?  }
?
 ? ? ?  try {
 ? ? ? ? ?  cdl.await();
 ? ? ?  } catch (InterruptedException e) {
 ? ? ? ? ?  e.printStackTrace();
 ? ? ?  }
 ? ? ?  final long cost = System.currentTimeMillis() - start;
 ? ? ?  LOGGER.info("pull from db sendConcurrent end, send count:{}, round:{}, cost:{}ms", sendCount, round, cost);
 ?  }
?
 ?  //......
}
  • sendConcurrent方法会执行blockingQueue.poll(),然后执行send方法

send

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/services/MqPushService.java

public class MqPushService {
?
    //......
?
 ?  private boolean send(final String topic, final byte[] body, final InternalKey internalKey, final String tags, final Map<String, String> properties,
 ? ? ? ? ? ? ? ? ? ? ?  final boolean direct) {
 ? ? ?  final long start = System.nanoTime();
 ? ? ?  final String key = internalKey.genUniqDelayMsgId();
 ? ? ?  MetricMsgType metricMsgType;
?
 ? ? ?  if (internalKey.getType() == MsgTypes.DELAY.getValue()) {
 ? ? ? ? ?  metricMsgType = MetricMsgType.DELAY;
 ? ? ?  } else if (internalKey.getType() == MsgTypes.LOOP_DELAY.getValue()) {
 ? ? ? ? ?  metricMsgType = MetricMsgType.LOOP_DELAY;
 ? ? ?  } else {
 ? ? ? ? ?  metricMsgType = MetricMsgType.UNKNOWN;
 ? ? ?  }
?
 ? ? ?  int len = 0;
 ? ? ?  if (body != null) {
 ? ? ? ? ?  len = body.length;
 ? ? ?  }
?
 ? ? ?  if (MasterElection.isBackup()) {
 ? ? ? ? ?  if (direct) {
 ? ? ? ? ? ? ?  LOGGER.info("succ send message(but cancel for backup) directly, topic:{}, dMsgId:{}, len:{}", topic, key, len);
 ? ? ? ? ? ? ?  MetricService.incPushQps(topic, metricMsgType, MetricMsgToOrFrom.SEND, MetricPushMsgResult.BACKUP);
 ? ? ? ? ?  } else {
 ? ? ? ? ? ? ?  LOGGER.info("succ send message(but cancel for backup) from db, topic:{}, dMsgId:{}, len:{}", topic, key, len);
 ? ? ? ? ? ? ?  MetricService.incPushQps(topic, metricMsgType, MetricMsgToOrFrom.DB, MetricPushMsgResult.BACKUP);
 ? ? ? ? ?  }
 ? ? ? ? ?  return true;
 ? ? ?  }
?
 ? ? ?  if (ConfigManager.getConfig().isFakeSend()) {
 ? ? ? ? ?  try {
 ? ? ? ? ? ? ?  TimeUnit.MILLISECONDS.sleep(1);
 ? ? ? ? ?  } catch (InterruptedException e) {
 ? ? ? ? ?  }
 ? ? ? ? ?  if (direct) {
 ? ? ? ? ? ? ?  LOGGER.info("succ send message directly(fakeSend), topic:{}, dMsgId:{}, len:{}", topic, key, len);
 ? ? ? ? ?  } else {
 ? ? ? ? ? ? ?  LOGGER.info("succ send message from db(fakeSend), topic:{}, dMsgId:{}, len:{}", topic, key, len);
 ? ? ? ? ?  }
 ? ? ? ? ?  return true;
 ? ? ?  }
?
 ? ? ?  MessageBuilder messageBuilder = producer.messageBuilder().setTopic(topic).setBody(body).setKey(key).setTags(tags).setRandomPartition();
 ? ? ?  if (properties != null && properties.size() > 0) {
 ? ? ? ? ?  for (Map.Entry<String, String> entry : properties.entrySet()) {
 ? ? ? ? ? ? ?  LOGGER.debug("properties, topic:{}, dMsgId:{}, key:{}, value:{}", topic, key, entry.getKey(), entry.getValue());
 ? ? ? ? ? ? ?  // IMPORTANT: If use addProperty for isPressureTraffic, the property will be ignored
 ? ? ? ? ? ? ?  if (PRESSURE_TRAFFIC_KEY.equals(entry.getKey())) {
 ? ? ? ? ? ? ? ? ?  messageBuilder.setPressureTraffic(Boolean.parseBoolean(entry.getValue()));
 ? ? ? ? ? ? ?  } else {
 ? ? ? ? ? ? ? ? ?  messageBuilder.addProperty(entry.getKey(), entry.getValue());
 ? ? ? ? ? ? ?  }
 ? ? ? ? ?  }
 ? ? ?  }
 ? ? ?  messageBuilder.addProperty(PROPERTY_KEY_FROM_CHRONOS, PROPERTY_KEY_FROM_CHRONOS);
?
 ? ? ?  final Result result = messageBuilder.send();
 ? ? ?  final long cost = (System.nanoTime() - start) / 1000;
 ? ? ?  MetricService.putPushLatency(topic, cost);
?
 ? ? ?  if (result.getCode() == CarreraReturnCode.OK) {
 ? ? ? ? ?  if (direct) {
 ? ? ? ? ? ? ?  LOGGER.info("succ send message directly, topic:{}, dMsgId:{}, len:{}, result:{}, cost:{}us", topic, key, len, result, cost);
 ? ? ? ? ? ? ?  MetricService.incPushQps(topic, metricMsgType, MetricMsgToOrFrom.SEND, MetricPushMsgResult.OK);
 ? ? ? ? ?  } else {
 ? ? ? ? ? ? ?  LOGGER.info("succ send message from db, topic:{}, dMsgId:{}, len:{}, result:{}, cost:{}us", topic, key, len, result, cost);
 ? ? ? ? ? ? ?  MetricService.incPushQps(topic, metricMsgType, MetricMsgToOrFrom.DB, MetricPushMsgResult.OK);
 ? ? ? ? ?  }
 ? ? ? ? ?  return true;
 ? ? ?  } else if (result.getCode() == CarreraReturnCode.FAIL_TOPIC_NOT_EXIST
 ? ? ? ? ? ? ?  || result.getCode() == CarreraReturnCode.FAIL_TOPIC_NOT_ALLOWED
 ? ? ? ? ? ? ?  || result.getCode() == CarreraReturnCode.FAIL_ILLEGAL_MSG
 ? ? ? ? ? ? ?  || result.getCode() == CarreraReturnCode.MISSING_PARAMETERS) {
 ? ? ? ? ?  if (direct) {
 ? ? ? ? ? ? ?  LOGGER.error("fail send message directly, topic:{}, dMsgId:{}, len:{}, result:{}, cost:{}us", topic, key, len, result, cost);
 ? ? ? ? ? ? ?  MetricService.incPushQps(topic, metricMsgType, MetricMsgToOrFrom.SEND, MetricPushMsgResult.FAIL);
 ? ? ? ? ?  } else {
 ? ? ? ? ? ? ?  LOGGER.error("fail send message from db, topic:{}, dMsgId:{}, len:{}, result:{}, cost:{}us", topic, key, len, result, cost);
 ? ? ? ? ? ? ?  MetricService.incPushQps(topic, metricMsgType, MetricMsgToOrFrom.DB, MetricPushMsgResult.FAIL);
 ? ? ? ? ?  }
 ? ? ? ? ?  return true;
 ? ? ?  } else {
 ? ? ? ? ?  if (direct) {
 ? ? ? ? ? ? ?  LOGGER.error("error while send message directly, topic:{}, dMsgId:{}, len:{}, result:{}, cost:{}us", topic, key, len, result, cost);
 ? ? ? ? ? ? ?  MetricService.incPushQps(topic, metricMsgType, MetricMsgToOrFrom.SEND, MetricPushMsgResult.FAIL);
 ? ? ? ? ?  } else {
 ? ? ? ? ? ? ?  LOGGER.error("error while send message from db, topic:{}, dMsgId:{}, len:{}, result:{}, cost:{}us", topic, key, len, result, cost);
 ? ? ? ? ? ? ?  MetricService.incPushQps(topic, metricMsgType, MetricMsgToOrFrom.DB, MetricPushMsgResult.FAIL);
 ? ? ? ? ?  }
 ? ? ? ? ?  return false;
 ? ? ?  }
 ?  }
?
 ?  //......
}
  • send方法主要是构造messageBuilder,然后执行messageBuilder.send()

小结

pullFromDefaultCFAndPush方法先从metaService获取seekTimestamp及zkSeekTimestamp,若seekTimestamp超过当前时间则提前返回;之后从RDB.newIterator(CFManager.CFH_DEFAULT)获取RocksIterator进行遍历,读取dMsgId构造internalKey,若其type是LOOP_DELAY或者LOOP_EXPONENT_DELAY则通过batcher.putToDefaultCF重新放入rocksdb;之后读取it.value()构造internalValue,紧接着构造InternalPair放入到blockingQueue,之后在count % INTERNAL_PAIR_COUNT == 0时执行sendConcurrent,在循环结束之后再执行一次sendConcurrent;最后更新MetaService.nextSeekTimestamp()

doc

  • carrera-chronos

发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言