玖叶教程网

前端编程开发入门

聊聊elasticsearch的SeedHostsResolver

本文主要研究一下elasticsearch的SeedHostsResolver

ConfiguredHostsResolver

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java

 public interface ConfiguredHostsResolver {
     /**
      * Tries to turn the configured unicast hosts list into a list of transport addresses.
      *
      * @param consumer receives the resolved addresses; it may never be invoked if an error occurs or if another
      *                 resolution attempt is already in progress
      */
     void resolveConfiguredHosts(Consumer<List<TransportAddress>> consumer);
 }
  • ConfiguredHostsResolver接口定义了resolveConfiguredHosts方法用于解析配置的transport address列表

SeedHostsResolver

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/SeedHostsResolver.java

public class SeedHostsResolver extends AbstractLifecycleComponent implements ConfiguredHostsResolver {
 public static final Setting<Integer> LEGACY_DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING =
 Setting.intSetting("discovery.zen.ping.unicast.concurrent_connects", 10, 0, Setting.Property.NodeScope,
 Setting.Property.Deprecated);
 public static final Setting<TimeValue> LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT =
 Setting.positiveTimeSetting("discovery.zen.ping.unicast.hosts.resolve_timeout", TimeValue.timeValueSeconds(5),
 Setting.Property.NodeScope, Setting.Property.Deprecated);
 public static final Setting<Integer> DISCOVERY_SEED_RESOLVER_MAX_CONCURRENT_RESOLVERS_SETTING =
 Setting.intSetting("discovery.seed_resolver.max_concurrent_resolvers", 10, 0, Setting.Property.NodeScope);
 public static final Setting<TimeValue> DISCOVERY_SEED_RESOLVER_TIMEOUT_SETTING =
 Setting.positiveTimeSetting("discovery.seed_resolver.timeout", TimeValue.timeValueSeconds(5), Setting.Property.NodeScope);
?
 private static final Logger logger = LogManager.getLogger(SeedHostsResolver.class);
?
 private final Settings settings;
 private final AtomicBoolean resolveInProgress = new AtomicBoolean();
 private final TransportService transportService;
 private final SeedHostsProvider hostsProvider;
 private final SetOnce<ExecutorService> executorService = new SetOnce<>();
 private final TimeValue resolveTimeout;
 private final String nodeName;
 private final int concurrentConnects;
?
 public SeedHostsResolver(String nodeName, Settings settings, TransportService transportService,
 SeedHostsProvider seedProvider) {
 this.settings = settings;
 this.nodeName = nodeName;
 this.transportService = transportService;
 this.hostsProvider = seedProvider;
 resolveTimeout = getResolveTimeout(settings);
 concurrentConnects = getMaxConcurrentResolvers(settings);
 }
?
 public static int getMaxConcurrentResolvers(Settings settings) {
 if (LEGACY_DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.exists(settings)) {
 if (DISCOVERY_SEED_RESOLVER_MAX_CONCURRENT_RESOLVERS_SETTING.exists(settings)) {
 throw new IllegalArgumentException("it is forbidden to set both ["
 + DISCOVERY_SEED_RESOLVER_MAX_CONCURRENT_RESOLVERS_SETTING.getKey() + "] and ["
 + LEGACY_DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.getKey() + "]");
 }
 return LEGACY_DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings);
 }
 return DISCOVERY_SEED_RESOLVER_MAX_CONCURRENT_RESOLVERS_SETTING.get(settings);
 }
?
 public static TimeValue getResolveTimeout(Settings settings) {
 if (LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.exists(settings)) {
 if (DISCOVERY_SEED_RESOLVER_TIMEOUT_SETTING.exists(settings)) {
 throw new IllegalArgumentException("it is forbidden to set both ["
 + DISCOVERY_SEED_RESOLVER_TIMEOUT_SETTING.getKey() + "] and ["
 + LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.getKey() + "]");
 }
 return LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings);
 }
 return DISCOVERY_SEED_RESOLVER_TIMEOUT_SETTING.get(settings);
 }
?
 /**
 * Resolves a list of hosts to a list of transport addresses. Each host is resolved into a transport address (or a collection of
 * addresses if the number of ports is greater than one). Host lookups are done in parallel using specified executor service up
 * to the specified resolve timeout.
 *
 * @param executorService the executor service used to parallelize hostname lookups
 * @param logger logger used for logging messages regarding hostname lookups
 * @param hosts the hosts to resolve
 * @param limitPortCounts the number of ports to resolve (should be 1 for non-local transport)
 * @param transportService the transport service
 * @param resolveTimeout the timeout before returning from hostname lookups
 * @return a list of resolved transport addresses
 */
 public static List<TransportAddress> resolveHostsLists(
 final ExecutorService executorService,
 final Logger logger,
 final List<String> hosts,
 final int limitPortCounts,
 final TransportService transportService,
 final TimeValue resolveTimeout) {
 Objects.requireNonNull(executorService);
 Objects.requireNonNull(logger);
 Objects.requireNonNull(hosts);
 Objects.requireNonNull(transportService);
 Objects.requireNonNull(resolveTimeout);
 if (resolveTimeout.nanos() < 0) {
 throw new IllegalArgumentException("resolve timeout must be non-negative but was [" + resolveTimeout + "]");
 }
 // create tasks to submit to the executor service; we will wait up to resolveTimeout for these tasks to complete
 final List<Callable<TransportAddress[]>> callables =
 hosts
 .stream()
 .map(hn -> (Callable<TransportAddress[]>) () -> transportService.addressesFromString(hn, limitPortCounts))
 .collect(Collectors.toList());
 final List<Future<TransportAddress[]>> futures;
 try {
 futures = executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS);
 } catch (InterruptedException e) {
 Thread.currentThread().interrupt();
 return Collections.emptyList();
 }
 final List<TransportAddress> transportAddresses = new ArrayList<>();
 final Set<TransportAddress> localAddresses = new HashSet<>();
 localAddresses.add(transportService.boundAddress().publishAddress());
 localAddresses.addAll(Arrays.asList(transportService.boundAddress().boundAddresses()));
 // ExecutorService#invokeAll guarantees that the futures are returned in the iteration order of the tasks so we can associate the
 // hostname with the corresponding task by iterating together
 final Iterator<String> it = hosts.iterator();
 for (final Future<TransportAddress[]> future : futures) {
 final String hostname = it.next();
 if (!future.isCancelled()) {
 assert future.isDone();
 try {
 final TransportAddress[] addresses = future.get();
 logger.trace("resolved host [{}] to {}", hostname, addresses);
 for (int addressId = 0; addressId < addresses.length; addressId++) {
 final TransportAddress address = addresses[addressId];
 // no point in pinging ourselves
 if (localAddresses.contains(address) == false) {
 transportAddresses.add(address);
 }
 }
 } catch (final ExecutionException e) {
 assert e.getCause() != null;
 final String message = "failed to resolve host [" + hostname + "]";
 logger.warn(message, e.getCause());
 } catch (InterruptedException e) {
 Thread.currentThread().interrupt();
 // ignore
 }
 } else {
 logger.warn("timed out after [{}] resolving host [{}]", resolveTimeout, hostname);
 }
 }
 return Collections.unmodifiableList(transportAddresses);
 }
?
 @Override
 protected void doStart() {
 logger.debug("using max_concurrent_resolvers [{}], resolver timeout [{}]", concurrentConnects, resolveTimeout);
 final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_configured_hosts_resolver]");
 executorService.set(EsExecutors.newScaling(nodeName + "/" + "unicast_configured_hosts_resolver",
 0, concurrentConnects, 60, TimeUnit.SECONDS, threadFactory, transportService.getThreadPool().getThreadContext()));
 }
?
 @Override
 protected void doStop() {
 ThreadPool.terminate(executorService.get(), 10, TimeUnit.SECONDS);
 }
?
 @Override
 protected void doClose() {
 }
?
 @Override
 public void resolveConfiguredHosts(Consumer<List<TransportAddress>> consumer) {
 if (lifecycle.started() == false) {
 logger.debug("resolveConfiguredHosts: lifecycle is {}, not proceeding", lifecycle);
 return;
 }
?
 if (resolveInProgress.compareAndSet(false, true)) {
 transportService.getThreadPool().generic().execute(new AbstractRunnable() {
 @Override
 public void onFailure(Exception e) {
 logger.debug("failure when resolving unicast hosts list", e);
 }
?
 @Override
 protected void doRun() {
 if (lifecycle.started() == false) {
 logger.debug("resolveConfiguredHosts.doRun: lifecycle is {}, not proceeding", lifecycle);
 return;
 }
?
 List<TransportAddress> providedAddresses
 = hostsProvider.getSeedAddresses((hosts, limitPortCounts)
 -> resolveHostsLists(executorService.get(), logger, hosts, limitPortCounts,
 transportService, resolveTimeout));
?
 consumer.accept(providedAddresses);
 }
?
 @Override
 public void onAfter() {
 resolveInProgress.set(false);
 }
?
 @Override
 public String toString() {
 return "SeedHostsResolver resolving unicast hosts list";
 }
 });
 }
 }
}
  • SeedHostsResolver继承了AbstractLifecycleComponent,同时实现了ConfiguredHostsResolver接口;它提供了getMaxConcurrentResolvers、getResolveTimeout、resolveHostsLists(使用线程池并发执行transportService.addressesFromString)这几个静态方法
  • doStart方法使用EsExecutors.newScaling创建了EsThreadPoolExecutor;doStop方法则使用ThreadPool.terminate来终止线程池
  • resolveConfiguredHosts方法首先检查lifecycle是否处于started状态,再通过compareAndSet将resolveInProgress从false设置为true(若已有解析在进行中则直接返回);之后在transportService.getThreadPool().generic()线程池中执行hostsProvider.getSeedAddresses并将结果传给consumer,最后在onAfter中将resolveInProgress重置为false

小结

  • ConfiguredHostsResolver接口定义了resolveConfiguredHosts方法用于解析配置的transport address列表
  • SeedHostsResolver继承了AbstractLifecycleComponent,同时实现了ConfiguredHostsResolver接口;它提供了getMaxConcurrentResolvers、getResolveTimeout、resolveHostsLists(使用线程池并发执行transportService.addressesFromString)这几个静态方法
  • doStart方法使用EsExecutors.newScaling创建了EsThreadPoolExecutor;doStop方法则使用ThreadPool.terminate来终止线程池;resolveConfiguredHosts方法首先检查lifecycle是否处于started状态,再通过compareAndSet将resolveInProgress从false设置为true(若已有解析在进行中则直接返回),之后在transportService.getThreadPool().generic()线程池中执行hostsProvider.getSeedAddresses并将结果传给consumer,最后在onAfter中将resolveInProgress重置为false

doc

  • SeedHostsResolver

发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言