A Look at Elasticsearch's RoutingService

Published: 2025/7/14

This article looks at Elasticsearch's RoutingService.

RoutingService

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java

public class RoutingService extends AbstractLifecycleComponent {
    private static final Logger logger = LogManager.getLogger(RoutingService.class);

    private static final String CLUSTER_UPDATE_TASK_SOURCE = "cluster_reroute";

    private final ClusterService clusterService;
    private final AllocationService allocationService;

    private AtomicBoolean rerouting = new AtomicBoolean();

    @Inject
    public RoutingService(ClusterService clusterService, AllocationService allocationService) {
        this.clusterService = clusterService;
        this.allocationService = allocationService;
    }

    @Override
    protected void doStart() {
    }

    @Override
    protected void doStop() {
    }

    @Override
    protected void doClose() {
    }

    /**
     * Initiates a reroute.
     */
    public final void reroute(String reason) {
        try {
            if (lifecycle.stopped()) {
                return;
            }
            if (rerouting.compareAndSet(false, true) == false) {
                logger.trace("already has pending reroute, ignoring {}", reason);
                return;
            }
            logger.trace("rerouting {}", reason);
            clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE + "(" + reason + ")",
                new ClusterStateUpdateTask(Priority.HIGH) {
                    @Override
                    public ClusterState execute(ClusterState currentState) {
                        rerouting.set(false);
                        return allocationService.reroute(currentState, reason);
                    }

                    @Override
                    public void onNoLongerMaster(String source) {
                        rerouting.set(false);
                        // no biggie
                    }

                    @Override
                    public void onFailure(String source, Exception e) {
                        rerouting.set(false);
                        ClusterState state = clusterService.state();
                        if (logger.isTraceEnabled()) {
                            logger.error(() -> new ParameterizedMessage("unexpected failure during [{}], current state:\n{}",
                                source, state), e);
                        } else {
                            logger.error(() -> new ParameterizedMessage("unexpected failure during [{}], current state version [{}]",
                                source, state.version()), e);
                        }
                    }
                });
        } catch (Exception e) {
            rerouting.set(false);
            ClusterState state = clusterService.state();
            logger.warn(() -> new ParameterizedMessage("failed to reroute routing table, current state:\n{}", state), e);
        }
    }
}

  • The RoutingService constructor takes a ClusterService and an AllocationService. Its reroute method submits a ClusterStateUpdateTask (priority HIGH) to clusterService, and the task's execute method delegates to allocationService.reroute. The rerouting AtomicBoolean ensures that at most one reroute task is pending at a time: a request that arrives while the flag is set is logged at trace level and ignored, and the flag is cleared when the task executes, fails, or the node is no longer master.
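The pending-flag pattern used by reroute can be tried in isolation. The following is a minimal sketch, not Elasticsearch code: RerouteCoalescingSketch, pendingTasks, and executedReasons are hypothetical names, and a plain in-memory queue stands in for the cluster service's task queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for RoutingService; none of these names are Elasticsearch APIs.
final class RerouteCoalescingSketch {
    // Assumption: a simple queue plays the role of the cluster state update task queue.
    private final Queue<Runnable> pendingTasks = new ArrayDeque<>();
    private final AtomicBoolean rerouting = new AtomicBoolean();
    private final List<String> executedReasons = new ArrayList<>();

    /** Returns true if a task was queued, false if one was already pending. */
    boolean reroute(String reason) {
        // Only the caller that flips the flag from false to true may queue a task.
        if (rerouting.compareAndSet(false, true) == false) {
            return false;
        }
        pendingTasks.add(() -> {
            // Clear the flag first, so requests arriving from now on queue a new task.
            rerouting.set(false);
            executedReasons.add(reason);
        });
        return true;
    }

    /** Runs every queued task, like the master draining its task queue. */
    void runPendingTasks() {
        Runnable task;
        while ((task = pendingTasks.poll()) != null) {
            task.run();
        }
    }

    List<String> executedReasons() {
        return executedReasons;
    }

    public static void main(String[] args) {
        RerouteCoalescingSketch sketch = new RerouteCoalescingSketch();
        System.out.println("first request queued:  " + sketch.reroute("node joined"));
        System.out.println("second request queued: " + sketch.reroute("shard failed"));
        sketch.runPendingTasks();
        System.out.println("executed reasons:      " + sketch.executedReasons());
    }
}
```

Dropping the second request is safe here because a reroute recomputes allocation from the cluster state at execution time, so one pending task covers every request made before it runs.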

AllocationService.reroute

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java

public class AllocationService {

    //......

    /**
     * Reroutes the routing table based on the live nodes.
     * <p>
     * If the same instance of ClusterState is returned, then no change has been made.
     */
    public ClusterState reroute(ClusterState clusterState, String reason) {
        return reroute(clusterState, reason, false);
    }

    /**
     * Reroutes the routing table based on the live nodes.
     * <p>
     * If the same instance of ClusterState is returned, then no change has been made.
     */
    protected ClusterState reroute(ClusterState clusterState, String reason, boolean debug) {
        ClusterState fixedClusterState = adaptAutoExpandReplicas(clusterState);

        RoutingNodes routingNodes = getMutableRoutingNodes(fixedClusterState);
        // shuffle the unassigned nodes, just so we won't have things like poison failed shards
        routingNodes.unassigned().shuffle();
        RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, fixedClusterState,
            clusterInfoService.getClusterInfo(), currentNanoTime());
        allocation.debugDecision(debug);
        reroute(allocation);
        if (fixedClusterState == clusterState && allocation.routingNodesChanged() == false) {
            return clusterState;
        }
        return buildResultAndLogHealthChange(clusterState, allocation, reason);
    }

    private void reroute(RoutingAllocation allocation) {
        assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See disassociateDeadNodes";
        assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metaData(), allocation.nodes()).isEmpty() :
            "auto-expand replicas out of sync with number of nodes in the cluster";

        // now allocate all the unassigned to available nodes
        if (allocation.routingNodes().unassigned().size() > 0) {
            removeDelayMarkers(allocation);
            gatewayAllocator.allocateUnassigned(allocation);
        }

        shardsAllocator.allocate(allocation);
        assert RoutingNodes.assertShardStats(allocation.routingNodes());
    }

    //......
}

  • AllocationService's reroute method builds a RoutingAllocation, calls gatewayAllocator.allocateUnassigned if there are unassigned shards, and then calls shardsAllocator.allocate(allocation). If neither the auto-expand-replicas adjustment nor the allocation changed anything, it returns the same ClusterState instance it was given.
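The Javadoc above states that returning the same ClusterState instance means no change was made. The following minimal sketch illustrates that convention with a hypothetical immutable State class. SameInstanceSketch, State, and changed are assumptions made for illustration, not Elasticsearch types.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration of the "same instance means no change" convention.
final class SameInstanceSketch {

    /** Hypothetical immutable state: names of assigned and unassigned shards. */
    static final class State {
        final List<String> assigned;
        final List<String> unassigned;

        State(List<String> assigned, List<String> unassigned) {
            this.assigned = Collections.unmodifiableList(new ArrayList<>(assigned));
            this.unassigned = Collections.unmodifiableList(new ArrayList<>(unassigned));
        }
    }

    /** Assigns every unassigned shard; returns the input instance if there is nothing to do. */
    static State reroute(State current) {
        if (current.unassigned.isEmpty()) {
            return current; // no change: hand back the very same object
        }
        List<String> assigned = new ArrayList<>(current.assigned);
        assigned.addAll(current.unassigned);
        return new State(assigned, Collections.<String>emptyList());
    }

    /** Callers detect a change with a reference comparison, not equals(). */
    static boolean changed(State before, State after) {
        return before != after;
    }

    public static void main(String[] args) {
        State dirty = new State(Collections.singletonList("s0"), Collections.singletonList("s1"));
        State settled = reroute(dirty);
        System.out.println("first pass changed:  " + changed(dirty, settled));
        System.out.println("second pass changed: " + changed(settled, reroute(settled)));
    }
}
```

A reference comparison lets the caller skip follow-up work cheaply when nothing changed, without comparing two large state objects field by field.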

BalancedShardsAllocator.allocate

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java

public class BalancedShardsAllocator implements ShardsAllocator {

    //......

    public void allocate(RoutingAllocation allocation) {
        if (allocation.routingNodes().size() == 0) {
            /* with no nodes this is pointless */
            return;
        }
        final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
        balancer.allocateUnassigned();
        balancer.moveShards();
        balancer.balance();
    }

    //......
}

  • BalancedShardsAllocator's allocate method returns immediately if there are no nodes. Otherwise it creates a Balancer and calls its allocateUnassigned(), moveShards(), and balance() methods, in that order.
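To give a feel for the phase ordering, here is a toy sketch of the first and last steps only. It is not the real Balancer: ToyBalancerSketch and all of its members are hypothetical, plain shard counts stand in for the weightFunction, the integer threshold is an assumed simplification, and moveShards() is omitted entirely.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: shard counts replace the weight function; moveShards() is not modelled.
final class ToyBalancerSketch {
    private final Map<String, List<String>> shardsByNode = new LinkedHashMap<>();
    private final List<String> unassigned = new ArrayList<>();
    private final int threshold;

    ToyBalancerSketch(List<String> nodes, List<String> unassignedShards, int threshold) {
        if (nodes.isEmpty() || threshold < 1) {
            throw new IllegalArgumentException("need at least one node and threshold >= 1");
        }
        for (String node : nodes) {
            shardsByNode.put(node, new ArrayList<>());
        }
        unassigned.addAll(unassignedShards);
        this.threshold = threshold;
    }

    void addNode(String node) {
        shardsByNode.putIfAbsent(node, new ArrayList<>());
    }

    int shardCount(String node) {
        return shardsByNode.get(node).size();
    }

    /** Picks the node with the fewest (or most) shards; ties go to the earliest node. */
    private String pick(boolean least) {
        String best = null;
        for (String node : shardsByNode.keySet()) {
            if (best == null
                || (least && shardCount(node) < shardCount(best))
                || (!least && shardCount(node) > shardCount(best))) {
                best = node;
            }
        }
        return best;
    }

    /** Step 1: place each unassigned shard on the currently least-loaded node. */
    void allocateUnassigned() {
        for (String shard : unassigned) {
            shardsByNode.get(pick(true)).add(shard);
        }
        unassigned.clear();
    }

    /** Step 3: move shards from the fullest to the emptiest node until within threshold. */
    void balance() {
        while (true) {
            List<String> from = shardsByNode.get(pick(false));
            List<String> to = shardsByNode.get(pick(true));
            if (from.size() - to.size() <= threshold) {
                return;
            }
            to.add(from.remove(from.size() - 1));
        }
    }

    public static void main(String[] args) {
        ToyBalancerSketch balancer = new ToyBalancerSketch(
            List.of("n1", "n2"), List.of("s0", "s1", "s2", "s3", "s4", "s5"), 1);
        balancer.allocateUnassigned();
        balancer.addNode("n3"); // a new, empty node joins
        balancer.balance();
        for (String node : List.of("n1", "n2", "n3")) {
            System.out.println(node + " holds " + balancer.shardCount(node) + " shards");
        }
    }
}
```

Allocating before balancing matters even in this toy model: unassigned shards are placed first, so the balancing pass sees the load that will actually exist and does not move shards based on a picture that is about to change.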

Summary

  • The RoutingService constructor takes a ClusterService and an AllocationService. Its reroute method submits a ClusterStateUpdateTask to clusterService, and the task's execute method delegates to allocationService.reroute.
  • AllocationService's reroute method builds a RoutingAllocation, then calls gatewayAllocator.allocateUnassigned and shardsAllocator.allocate(allocation).
  • BalancedShardsAllocator's allocate method creates a Balancer, then calls its allocateUnassigned(), moveShards(), and balance() methods.

doc

  • RoutingService

Reposted from: https://my.oschina.net/go4it/blog/3049600
