欢迎访问 生活随笔!

生活随笔

当前位置: 首页 > 编程资源 > 编程问答 >内容正文

编程问答

聊聊elasticsearch的RoutingService

发布时间:2025/7/14 编程问答 31 豆豆
生活随笔 收集整理的这篇文章主要介绍了 聊聊elasticsearch的RoutingService 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

为什么80%的码农都做不了架构师?>>>   

本文主要研究一下elasticsearch的RoutingService

RoutingService

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java

public class RoutingService extends AbstractLifecycleComponent {private static final Logger logger = LogManager.getLogger(RoutingService.class);private static final String CLUSTER_UPDATE_TASK_SOURCE = "cluster_reroute";private final ClusterService clusterService;private final AllocationService allocationService;private AtomicBoolean rerouting = new AtomicBoolean();@Injectpublic RoutingService(ClusterService clusterService, AllocationService allocationService) {this.clusterService = clusterService;this.allocationService = allocationService;}@Overrideprotected void doStart() {}@Overrideprotected void doStop() {}@Overrideprotected void doClose() {}/*** Initiates a reroute.*/public final void reroute(String reason) {try {if (lifecycle.stopped()) {return;}if (rerouting.compareAndSet(false, true) == false) {logger.trace("already has pending reroute, ignoring {}", reason);return;}logger.trace("rerouting {}", reason);clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE + "(" + reason + ")",new ClusterStateUpdateTask(Priority.HIGH) {@Overridepublic ClusterState execute(ClusterState currentState) {rerouting.set(false);return allocationService.reroute(currentState, reason);}@Overridepublic void onNoLongerMaster(String source) {rerouting.set(false);// no biggie}@Overridepublic void onFailure(String source, Exception e) {rerouting.set(false);ClusterState state = clusterService.state();if (logger.isTraceEnabled()) {logger.error(() -> new ParameterizedMessage("unexpected failure during [{}], current state:\n{}",source, state), e);} else {logger.error(() -> new ParameterizedMessage("unexpected failure during [{}], current state version [{}]",source, state.version()), e);}}});} catch (Exception e) {rerouting.set(false);ClusterState state = clusterService.state();logger.warn(() -> new ParameterizedMessage("failed to reroute routing table, current state:\n{}", state), e);}} }
  • RoutingService的构造器要求输入clusterService及allocationService;其reroute方法主要是向clusterService提交ClusterStateUpdateTask,其execute方法是委托给allocationService.reroute

AllocationService.reroute

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java

public class AllocationService {//....../*** Reroutes the routing table based on the live nodes.* <p>* If the same instance of ClusterState is returned, then no change has been made.*/public ClusterState reroute(ClusterState clusterState, String reason) {return reroute(clusterState, reason, false);}/*** Reroutes the routing table based on the live nodes.* <p>* If the same instance of ClusterState is returned, then no change has been made.*/protected ClusterState reroute(ClusterState clusterState, String reason, boolean debug) {ClusterState fixedClusterState = adaptAutoExpandReplicas(clusterState);RoutingNodes routingNodes = getMutableRoutingNodes(fixedClusterState);// shuffle the unassigned nodes, just so we won't have things like poison failed shardsroutingNodes.unassigned().shuffle();RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, fixedClusterState,clusterInfoService.getClusterInfo(), currentNanoTime());allocation.debugDecision(debug);reroute(allocation);if (fixedClusterState == clusterState && allocation.routingNodesChanged() == false) {return clusterState;}return buildResultAndLogHealthChange(clusterState, allocation, reason);}private void reroute(RoutingAllocation allocation) {assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See disassociateDeadNodes";assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metaData(), allocation.nodes()).isEmpty() :"auto-expand replicas out of sync with number of nodes in the cluster";// now allocate all the unassigned to available nodesif (allocation.routingNodes().unassigned().size() > 0) {removeDelayMarkers(allocation);gatewayAllocator.allocateUnassigned(allocation);}shardsAllocator.allocate(allocation);assert RoutingNodes.assertShardStats(allocation.routingNodes());}//...... }
  • AllocationService的reroute方法主要是构建RoutingAllocation,然后在进行gatewayAllocator.allocateUnassigned及shardsAllocator.allocate(allocation)

BalancedShardsAllocator.allocate

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java

public class BalancedShardsAllocator implements ShardsAllocator {//......public void allocate(RoutingAllocation allocation) {if (allocation.routingNodes().size() == 0) {/* with no nodes this is pointless */return;}final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);balancer.allocateUnassigned();balancer.moveShards();balancer.balance();}//...... }
  • BalancedShardsAllocator的allocate方法,则创建Balancer,然后执行Balancer的allocateUnassigned()、moveShards()、balance()方法

小结

  • RoutingService的构造器要求输入clusterService及allocationService;其reroute方法主要是向clusterService提交ClusterStateUpdateTask,其execute方法是委托给allocationService.reroute
  • AllocationService的reroute方法主要是构建RoutingAllocation,然后在进行gatewayAllocator.allocateUnassigned及shardsAllocator.allocate(allocation)
  • BalancedShardsAllocator的allocate方法,则创建Balancer,然后执行Balancer的allocateUnassigned()、moveShards()、balance()方法

doc

  • RoutingService

转载于:https://my.oschina.net/go4it/blog/3049600

总结

以上是生活随笔为你收集整理的聊聊elasticsearch的RoutingService的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。