Copycat - States

Published 2023/12/18 by 豆豆
Member.Status

Status changes are driven by heartbeats.

A heartbeat is an append with empty entries:

/**
 * Triggers a heartbeat to a majority of the cluster.
 * <p>
 * For followers to which no AppendRequest is currently being sent, a new empty AppendRequest will be
 * created and sent. For followers to which an AppendRequest is already being sent, the appendEntries()
 * call will piggyback on the *next* AppendRequest. Thus, multiple calls to this method will only ever
 * result in a single AppendRequest to each follower at any given time, and the returned future will be
 * shared by all concurrent calls.
 *
 * @return A completable future to be completed the next time a heartbeat is received by a majority of the cluster.
 */
public CompletableFuture<Long> appendEntries() {
  // If there are no other active members in the cluster, simply complete the append operation.
  if (context.getClusterState().getRemoteMemberStates().isEmpty())
    return CompletableFuture.completedFuture(null);

  // If no heartbeat future already exists, that indicates there's no heartbeat currently under way.
  // Create a new heartbeat future and commit to all members in the cluster.
  if (heartbeatFuture == null) {
    CompletableFuture<Long> newHeartbeatFuture = new CompletableFuture<>();
    heartbeatFuture = newHeartbeatFuture;
    heartbeatTime = System.currentTimeMillis();
    for (MemberState member : context.getClusterState().getRemoteMemberStates()) {
      appendEntries(member); // send appendEntries to every member
    }
    return newHeartbeatFuture;
  }
  // ......
}

The heartbeat logic sends a heartbeat to every member returned by getRemoteMemberStates().
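To make the "one outstanding heartbeat, shared future" behaviour described in the Javadoc concrete, here is a minimal sketch. It is not Copycat code: the class HeartbeatCoalescer and its methods are hypothetical, the number of remote members is just an int, and "sending" a round is modelled by a counter. Callers that arrive while a round is in flight share one future for the next round, as the Javadoc describes.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the coalescing described for appendEntries(): at most one heartbeat
// round is in flight; callers arriving during a round share one future for the *next* round.
class HeartbeatCoalescer {
    private final int remoteMembers;
    private CompletableFuture<Long> current;  // round currently in flight, or null
    private CompletableFuture<Long> next;     // shared by callers that arrived mid-round
    private int roundsStarted;                // number of rounds actually sent

    HeartbeatCoalescer(int remoteMembers) {
        this.remoteMembers = remoteMembers;
    }

    CompletableFuture<Long> appendEntries() {
        // No remote members: complete immediately, as in the Copycat code above.
        if (remoteMembers == 0) {
            return CompletableFuture.completedFuture(null);
        }
        // No round in flight: start one (Copycat sends an AppendRequest to every remote member here).
        if (current == null) {
            current = new CompletableFuture<>();
            roundsStarted++;
            return current;
        }
        // A round is in flight: piggyback on the next round instead of sending another request now.
        if (next == null) {
            next = new CompletableFuture<>();
        }
        return next;
    }

    // Called when a majority has acknowledged the current round, with the round's heartbeat time.
    void completeRound(long heartbeatTime) {
        if (current == null) {
            return;
        }
        CompletableFuture<Long> done = current;
        // Promote the queued future so that exactly one round is outstanding again.
        current = next;
        next = null;
        if (current != null) {
            roundsStarted++;
        }
        done.complete(heartbeatTime);
    }

    int roundsStarted() {
        return roundsStarted;
    }
}
```

Three back-to-back calls therefore start only one round; the second and third callers get the same future, which completes when the following round is acknowledged.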

AVAILABLE

At initialization, every ServerMember defaults to Status.AVAILABLE:

public final class ServerMember implements Member, CatalystSerializable, AutoCloseable {
  private Member.Type type;
  private Status status = Status.AVAILABLE;
  // ......
}

LeaderAppender

@Override
protected void succeedAttempt(MemberState member) {
  super.succeedAttempt(member);

  // If the member is currently marked as UNAVAILABLE, change its status to AVAILABLE and update the configuration.
  if (member.getMember().status() == ServerMember.Status.UNAVAILABLE && !leader.configuring()) {
    member.getMember().update(ServerMember.Status.AVAILABLE, Instant.now());
    leader.configure(context.getCluster().members());
  }
}

succeedAttempt turns UNAVAILABLE back into AVAILABLE; super.succeedAttempt resets the failure count.

It is called whenever an AppendResponse with status OK is received:

protected void handleAppendResponseOk(MemberState member, AppendRequest request, AppendResponse response) {
  // Reset the member failure count and update the member's availability status if necessary.
  succeedAttempt(member);
  // ......
}

The leader's heartbeat is an empty AppendRequest, so receiving an OK response shows that the member is reachable, i.e. AVAILABLE.

UNAVAILABLE

It is set in failAttempt:

@Override
protected void failAttempt(MemberState member, Throwable error) {
  super.failAttempt(member, error);

  // Verify that the leader has contacted a majority of the cluster within the last two election timeouts.
  // If the leader is not able to contact a majority of the cluster within two election timeouts, assume
  // that a partition occurred and transition back to the FOLLOWER state.
  if (System.currentTimeMillis() - Math.max(heartbeatTime(), leaderTime) > context.getElectionTimeout().toMillis() * 2) {
    LOGGER.warn("{} - Suspected network partition. Stepping down", context.getCluster().member().address());
    context.setLeader(0);
    context.transition(CopycatServer.State.FOLLOWER);
  }
  // If the number of failures has increased above 3 and the member hasn't been marked as UNAVAILABLE, do so.
  else if (member.getFailureCount() >= 3) {
    // If the member is currently marked as AVAILABLE, change its status to UNAVAILABLE and update the configuration.
    if (member.getMember().status() == ServerMember.Status.AVAILABLE && !leader.configuring()) {
      member.getMember().update(ServerMember.Status.UNAVAILABLE, Instant.now());
      leader.configure(context.getCluster().members());
    }
  }
}

super.failAttempt resets the connection and increments the failure count:

member.incrementFailureCount();

The first check uses Math.max(heartbeatTime(), leaderTime).

heartbeatTime

/**
 * Returns the last time a majority of the cluster was contacted.
 * <p>
 * This is calculated by sorting the list of active members and getting the last time the majority of
 * the cluster was contacted based on the index of a majority of the members. So, in a list of 3 ACTIVE
 * members, index 1 (the second member) will be used to determine the commit time in a sorted members list.
 */
private long heartbeatTime() {
  int quorumIndex = quorumIndex();
  if (quorumIndex >= 0) {
    return context.getClusterState()
      .getActiveMemberStates((m1, m2) -> Long.compare(m2.getHeartbeatTime(), m1.getHeartbeatTime()))
      .get(quorumIndex)
      .getHeartbeatTime();
  }
  return System.currentTimeMillis();
}

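heartbeatTime() sorts the ACTIVE members by heartbeat time, most recent first, and returns the heartbeat time at quorumIndex, i.e. the oldest heartbeat within the most recently contacted majority.
If neither that time nor leaderTime falls within the last two election timeouts, the leader has not heard from a majority recently, so a network partition (split brain) is suspected.

The leader then steps down to follower.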
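The two pieces of arithmetic above (pick the heartbeat at quorumIndex from a newest-first sort, then compare against two election timeouts) can be sketched with plain numbers. This is an illustration, not Copycat code: PartitionCheck and its methods are hypothetical, heartbeat times are bare longs instead of MemberState objects, and quorumIndex is passed in explicitly because how Copycat computes it is not shown above.

```java
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helpers mirroring heartbeatTime() and the step-down condition in failAttempt().
final class PartitionCheck {
    private PartitionCheck() {}

    // Time by which a majority had been heard from: sort newest-first, take the entry at quorumIndex.
    static long quorumHeartbeatTime(List<Long> heartbeatTimes, int quorumIndex, long now) {
        if (quorumIndex < 0) {
            return now;  // same fallback as heartbeatTime() when there is no quorum index
        }
        List<Long> newestFirst = heartbeatTimes.stream()
                .sorted(Collections.reverseOrder())
                .collect(Collectors.toList());
        return newestFirst.get(quorumIndex);
    }

    // The step-down condition: nothing recent enough within two election timeouts.
    static boolean suspectPartition(long now, long quorumHeartbeatTime, long leaderTime, long electionTimeoutMillis) {
        return now - Math.max(quorumHeartbeatTime, leaderTime) > electionTimeoutMillis * 2;
    }
}
```

With heartbeat times 1000, 400 and 900 and quorumIndex 1, the sorted list is 1000, 900, 400, so the quorum time is 900; at now = 2000 with a 500 ms election timeout, 2000 - 900 = 1100 exceeds 1000 and a partition is suspected.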

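The second check: once a member's failure count reaches 3 (failureCount >= 3), the member is marked UNAVAILABLE, provided it is currently AVAILABLE and the leader is not in the middle of a configuration change.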
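The AVAILABLE/UNAVAILABLE rules from succeedAttempt and failAttempt fit in a small state holder. This is a hypothetical sketch (MemberAvailability is not a Copycat class): it leaves out the partition check and the connection reset, and models leader.configuring() as a boolean parameter. A boolean return value stands in for the point where Copycat would call leader.configure(...).

```java
// Hypothetical model of the availability rules: success resets the failure count and restores
// AVAILABLE; once the count reaches 3 an AVAILABLE member becomes UNAVAILABLE. Neither change
// happens while the leader is reconfiguring (the `configuring` parameter).
class MemberAvailability {
    enum Status { AVAILABLE, UNAVAILABLE }

    static final int FAILURE_THRESHOLD = 3;

    private Status status = Status.AVAILABLE;  // ServerMember starts as AVAILABLE
    private int failureCount;

    // Returns true if the status changed (where Copycat would update the configuration).
    boolean succeed(boolean configuring) {
        failureCount = 0;  // what super.succeedAttempt does
        if (status == Status.UNAVAILABLE && !configuring) {
            status = Status.AVAILABLE;
            return true;
        }
        return false;
    }

    boolean fail(boolean configuring) {
        failureCount++;  // what super.failAttempt does via incrementFailureCount()
        if (failureCount >= FAILURE_THRESHOLD && status == Status.AVAILABLE && !configuring) {
            status = Status.UNAVAILABLE;
            return true;
        }
        return false;
    }

    Status status() { return status; }

    int failureCount() { return failureCount; }
}
```

Because the count is only reset on success, the third consecutive failed attempt is the one that flips the status.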

failAttempt is called from the various failure handlers in AbstractAppender: handleAppendRequestFailure, handleAppendResponseFailure, handleConfigureRequestFailure and handleInstallRequestFailure.

CopycatServer.State

public enum State {

  /**
   * Represents the state of an inactive server.
   * <p>
   * All servers start in this state and return to this state when {@link #leave() stopped}.
   */
  INACTIVE,

  /**
   * Represents the state of a server that is a reserve member of the cluster.
   * <p>
   * Reserve servers only receive notification of leader, term, and configuration changes.
   */
  RESERVE,

  /**
   * Represents the state of a server in the process of catching up its log.
   * <p>
   * Upon successfully joining an existing cluster, the server will transition to the passive state and remain there
   * until the leader determines that the server has caught up enough to be promoted to a full member.
   */
  PASSIVE,

  /**
   * Represents the state of a server participating in normal log replication.
   * <p>
   * The follower state is a standard Raft state in which the server receives replicated log entries from the leader.
   */
  FOLLOWER,

  /**
   * Represents the state of a server attempting to become the leader.
   * <p>
   * When a server in the follower state fails to receive communication from a valid leader for some time period,
   * the follower will transition to the candidate state. During this period, the candidate requests votes from
   * each of the other servers in the cluster. If the candidate wins the election by receiving votes from a majority
   * of the cluster, it will transition to the leader state.
   */
  CANDIDATE,

  /**
   * Represents the state of a server which is actively coordinating and replicating logs with other servers.
   * <p>
   * Leaders are responsible for handling and replicating writes from clients. Note that more than one leader can
   * exist at any given time, but Raft guarantees that no two leaders will exist for the same {@link Cluster#term()}.
   */
  LEADER

}

When ServerContext is created, its initial state is INACTIVE:

public class ServerContext implements AutoCloseable {
  //......
  protected ServerState state = new InactiveState(this);
  //......
}

The tricky part is that Member also defines a type:

enum Type {

  /**
   * Represents an inactive member.
   * <p>
   * The {@code INACTIVE} member type represents a member which does not participate in any communication
   * and is not an active member of the cluster. This is typically the state of a member prior to joining
   * or after leaving a cluster.
   */
  INACTIVE,

  /**
   * Represents a member which does not participate in replication.
   * <p>
   * The {@code RESERVE} member type is representative of a member that does not participate in any
   * replication of state but only maintains contact with the cluster leader and is an active member
   * of the {@link Cluster}. Typically, reserve members act as standby nodes which can be
   * {@link #promote() promoted} to a {@link #PASSIVE} or {@link #ACTIVE} role when needed.
   */
  RESERVE,

  /**
   * Represents a member which participates in asynchronous replication but does not vote in elections
   * or otherwise participate in the Raft consensus algorithm.
   * <p>
   * The {@code PASSIVE} member type is representative of a member that receives state changes from
   * follower nodes asynchronously. As state changes are committed via the {@link #ACTIVE} Raft nodes,
   * committed state changes are asynchronously replicated by followers to passive members. This allows
   * passive members to maintain nearly up-to-date state with minimal impact on the performance of the
   * Raft algorithm itself, and allows passive members to be quickly promoted to {@link #ACTIVE} voting
   * members if necessary.
   */
  PASSIVE,

  /**
   * Represents a full voting member of the Raft cluster which participates fully in leader election
   * and replication algorithms.
   * <p>
   * The {@code ACTIVE} member type represents a full voting member of the Raft cluster. Active members
   * participate in the Raft leader election and replication algorithms and can themselves be elected
   * leaders.
   */
  ACTIVE,

}

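Note the difference: Type has ACTIVE, which State lacks.

Apart from that, State contains every value of Type.

In other words, a member can be INACTIVE, RESERVE, PASSIVE or ACTIVE.

When the member is INACTIVE, RESERVE or PASSIVE, the server's state is the matching state.

When the member is ACTIVE, the server's state is one of FOLLOWER, CANDIDATE or LEADER.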
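The Type-to-State relationship just described can be written down as a lookup. This is a hypothetical sketch: the enum constants mirror the two enums quoted above, but TypeStateMapping and possibleStates are illustrative names, not part of Copycat.

```java
import java.util.EnumSet;
import java.util.Set;

// Which server states are possible for each member type, per the description above.
final class TypeStateMapping {
    enum Type { INACTIVE, RESERVE, PASSIVE, ACTIVE }

    enum State { INACTIVE, RESERVE, PASSIVE, FOLLOWER, CANDIDATE, LEADER }

    private TypeStateMapping() {}

    static Set<State> possibleStates(Type type) {
        switch (type) {
            case ACTIVE:
                // ACTIVE has no state of its own: it is one of the three Raft roles.
                return EnumSet.of(State.FOLLOWER, State.CANDIDATE, State.LEADER);
            case PASSIVE:
                return EnumSet.of(State.PASSIVE);
            case RESERVE:
                return EnumSet.of(State.RESERVE);
            default:
                return EnumSet.of(State.INACTIVE);
        }
    }
}
```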

In CopycatServer.Builder, the member type defaults to ACTIVE:

public static class Builder implements io.atomix.catalyst.util.Builder<CopycatServer> {
  //......
  private Member.Type type = Member.Type.ACTIVE;
  //......
}

Note that transition(Member.Type) picks the server state from the member type:

/**
 * Transitions the server to the base state for the given member type.
 */
protected void transition(Member.Type type) {
  switch (type) {
    case ACTIVE:
      if (!(state instanceof ActiveState)) {
        transition(CopycatServer.State.FOLLOWER);
      }
      break;
    case PASSIVE:
      if (this.state.type() != CopycatServer.State.PASSIVE) {
        transition(CopycatServer.State.PASSIVE);
      }
      break;
    case RESERVE:
      if (this.state.type() != CopycatServer.State.RESERVE) {
        transition(CopycatServer.State.RESERVE);
      }
      break;
    default:
      if (this.state.type() != CopycatServer.State.INACTIVE) {
        transition(CopycatServer.State.INACTIVE);
      }
      break;
  }
}

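Note how ACTIVE is handled.

When the member type is ACTIVE and the current state is not an ActiveState, the server transitions to FOLLOWER; a server that is already in an ActiveState is left where it is. CANDIDATE and LEADER can therefore never be entered directly through this method.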
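The effect of transition(Member.Type) can be reproduced on plain enums. In this hypothetical sketch (TypeTransition is not a Copycat class), FOLLOWER, CANDIDATE and LEADER stand in for the ActiveState subclasses, and the method returns the resulting state instead of switching ServerState objects.

```java
// Simplified model of transition(Member.Type): returns the state the server ends up in.
final class TypeTransition {
    enum Type { INACTIVE, RESERVE, PASSIVE, ACTIVE }

    enum State { INACTIVE, RESERVE, PASSIVE, FOLLOWER, CANDIDATE, LEADER }

    private TypeTransition() {}

    // Stand-in for `state instanceof ActiveState`.
    static boolean isActive(State s) {
        return s == State.FOLLOWER || s == State.CANDIDATE || s == State.LEADER;
    }

    static State transition(Type type, State current) {
        switch (type) {
            case ACTIVE:
                // Already active (even CANDIDATE or LEADER): unchanged. Otherwise always FOLLOWER.
                return isActive(current) ? current : State.FOLLOWER;
            case PASSIVE:
                return State.PASSIVE;
            case RESERVE:
                return State.RESERVE;
            default:
                return State.INACTIVE;
        }
    }
}
```

So an INACTIVE server whose member type is ACTIVE becomes FOLLOWER, while a LEADER stays LEADER; reaching CANDIDATE or LEADER requires the election paths described below.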

As shown above, ServerContext starts in the INACTIVE state.
So when does it become active?

Both bootstrapping a new cluster and joining an existing one end in ClusterState.join(), which performs the state transition:

@Override
public CompletableFuture<Void> bootstrap(Collection<Address> cluster) {
  if (configuration == null) {
    if (member.type() != Member.Type.ACTIVE) {
      return Futures.exceptionalFuture(new IllegalStateException("only ACTIVE members can bootstrap the cluster"));
    } else {
      // Create a set of active members.
      Set<Member> activeMembers = cluster.stream()
        .filter(m -> !m.equals(member.serverAddress()))
        .map(m -> new ServerMember(Member.Type.ACTIVE, m, null, member.updated()))
        .collect(Collectors.toSet());

      // Add the local member to the set of active members.
      activeMembers.add(member);

      // Create a new configuration and store it on disk to ensure the cluster can fall back to the configuration.
      configure(new Configuration(0, 0, member.updated().toEpochMilli(), activeMembers));
    }
  }
  return join();
}

@Override
public synchronized CompletableFuture<Void> join(Collection<Address> cluster) {
  // If no configuration was loaded from disk, create a new configuration.
  if (configuration == null) {
    // Create a set of cluster members, excluding the local member which is joining a cluster.
    Set<Member> activeMembers = cluster.stream()
      .filter(m -> !m.equals(member.serverAddress()))
      .map(m -> new ServerMember(Member.Type.ACTIVE, m, null, member.updated()))
      .collect(Collectors.toSet());

    // Create a new configuration and configure the cluster. Once the cluster is configured, the configuration
    // will be stored on disk to ensure the cluster can fall back to the provided configuration if necessary.
    configure(new Configuration(0, 0, member.updated().toEpochMilli(), activeMembers)); // update the configuration
  }
  return join();
}

/**
 * Starts the join to the cluster.
 */
private synchronized CompletableFuture<Void> join() {
  joinFuture = new CompletableFuture<>();

  context.getThreadContext().executor().execute(() -> {
    // Transition the server to the appropriate state for the local member type.
    context.transition(member.type()); // transition the server state

    // Attempt to join the cluster. If the local member is ACTIVE then failing to join the cluster
    // will result in the member attempting to get elected. This allows initial clusters to form.
    List<MemberState> activeMembers = getActiveMemberStates();
    if (!activeMembers.isEmpty()) {
      join(getActiveMemberStates().iterator());
    } else {
      joinFuture.complete(null);
    }
  });
  // ......
}
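The only difference between the two initial configurations is whether the local member is included: bootstrap() adds it, join() leaves it out. A minimal sketch of that difference, with members reduced to address strings (a simplification; Copycat builds ServerMember objects) and a hypothetical class name InitialConfiguration:

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch of the member sets built by bootstrap() and join() above.
final class InitialConfiguration {
    private InitialConfiguration() {}

    static Set<String> bootstrapMembers(Collection<String> cluster, String localAddress, boolean localIsActive) {
        if (!localIsActive) {
            throw new IllegalStateException("only ACTIVE members can bootstrap the cluster");
        }
        Set<String> members = othersOf(cluster, localAddress);
        members.add(localAddress);  // the bootstrapping node is part of its own initial configuration
        return members;
    }

    static Set<String> joinMembers(Collection<String> cluster, String localAddress) {
        // The joining node itself is excluded, matching the comment in join() above.
        return othersOf(cluster, localAddress);
    }

    private static Set<String> othersOf(Collection<String> cluster, String localAddress) {
        Set<String> others = new LinkedHashSet<>();
        for (String address : cluster) {
            if (!address.equals(localAddress)) {
                others.add(address);
            }
        }
        return others;
    }
}
```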

Next, the conditions for transitions among leader, candidate and follower.

Leader

A server becomes leader only when, as a candidate, it requests votes and a majority grants them; the quorum callback then calls context.transition(CopycatServer.State.LEADER):

/**
 * Resets the election timer.
 */
private void sendVoteRequests() {
  //.........

  // Send vote requests to all nodes. The vote request that is sent
  // to this node will be automatically successful.
  // First check if the quorum is null. If the quorum isn't null then that
  // indicates that another vote is already going on.
  final Quorum quorum = new Quorum(context.getClusterState().getQuorum(), (elected) -> {
    complete.set(true);
    if (elected) {
      context.transition(CopycatServer.State.LEADER); // invoked via checkComplete()
    } else {
      context.transition(CopycatServer.State.FOLLOWER);
    }
  });

  // Once we got the last log term, iterate through each current member
  // of the cluster and vote each member for a vote.
  for (ServerMember member : votingMembers) {
    LOGGER.debug("{} - Requesting vote from {} for term {}", context.getCluster().member().address(), member, context.getTerm());
    VoteRequest request = VoteRequest.builder()
      .withTerm(context.getTerm())
      .withCandidate(context.getCluster().member().id())
      .withLogIndex(lastIndex)
      .withLogTerm(lastTerm)
      .build();

    context.getConnections().getConnection(member.serverAddress()).thenAccept(connection -> {
      connection.<VoteRequest, VoteResponse>send(request).whenCompleteAsync((response, error) -> {
        context.checkThread();
        if (isOpen() && !complete.get()) {
          if (error != null) {
            LOGGER.warn(error.getMessage());
            quorum.fail();
          } else {
            //........
            } else {
              LOGGER.debug("{} - Received successful vote from {}", context.getCluster().member().address(), member);
              quorum.succeed(); // member voted yes: succeeded++, then checkComplete()
            }
          }
        }
      }, context.getThreadContext().executor());
    });
  }
  //......
}
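The Quorum object above counts vote outcomes and fires its callback once. The following is a hypothetical stand-in, not Copycat's Quorum: it takes an extra voters parameter, and its failure rule (decide "not elected" as soon as the remaining possible votes can no longer reach the quorum) is an assumption of this sketch rather than something shown in the source.

```java
import java.util.function.Consumer;

// Hypothetical quorum counter: fires the callback exactly once, with true when `quorum`
// successes arrive, or false once too many failures make a quorum unreachable (assumed rule).
class SimpleQuorum {
    private final int quorum;
    private final int voters;
    private final Consumer<Boolean> callback;
    private int succeeded;
    private int failed;
    private boolean complete;

    SimpleQuorum(int quorum, int voters, Consumer<Boolean> callback) {
        this.quorum = quorum;
        this.voters = voters;
        this.callback = callback;
    }

    void succeed() {
        succeeded++;
        checkComplete();
    }

    void fail() {
        failed++;
        checkComplete();
    }

    private void checkComplete() {
        if (complete) {
            return;  // later responses are ignored, like the !complete.get() guard above
        }
        if (succeeded >= quorum) {
            complete = true;
            callback.accept(true);   // e.g. transition(LEADER)
        } else if (voters - failed < quorum) {
            complete = true;
            callback.accept(false);  // e.g. transition(FOLLOWER)
        }
    }
}
```

With five voters and a quorum of three, the third yes vote elects the candidate and any later failures are ignored.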

Candidate

A follower becomes a candidate only after sending poll requests and receiving approval from a majority:

/**
 * Polls all members of the cluster to determine whether this member should transition to the CANDIDATE state.
 */
private void sendPollRequests() {
  final Quorum quorum = new Quorum(context.getClusterState().getQuorum(), (elected) -> {
    // If a majority of the cluster indicated they would vote for us then transition to candidate.
    complete.set(true);
    if (elected) {
      context.transition(CopycatServer.State.CANDIDATE);
    } else {
      resetHeartbeatTimeout();
    }
  });
  //......
}
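Taken together with the vote step in the Leader section, an election is a two-phase process: poll first, then vote. A hypothetical sketch reduced to yes-vote counts (ElectionFlow is not a Copycat class; it assumes counts include the node's own vote and that a majority means more than half of the cluster):

```java
// Two-phase election reduced to counts: poll (FOLLOWER -> CANDIDATE), then vote (CANDIDATE -> LEADER).
final class ElectionFlow {
    enum State { FOLLOWER, CANDIDATE, LEADER }

    private ElectionFlow() {}

    static int majority(int clusterSize) {
        return clusterSize / 2 + 1;
    }

    // Poll lost: stay FOLLOWER (Copycat resets the heartbeat timeout in that case).
    static State afterPoll(int pollYes, int clusterSize) {
        return pollYes >= majority(clusterSize) ? State.CANDIDATE : State.FOLLOWER;
    }

    // Vote lost: fall back to FOLLOWER, as in the vote quorum callback.
    static State afterVote(int voteYes, int clusterSize) {
        return voteYes >= majority(clusterSize) ? State.LEADER : State.FOLLOWER;
    }

    static State elect(int pollYes, int voteYes, int clusterSize) {
        State afterPoll = afterPoll(pollYes, clusterSize);
        return afterPoll == State.CANDIDATE ? afterVote(voteYes, clusterSize) : afterPoll;
    }
}
```

A node that loses the poll never asks for real votes, however many it might have received.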

Follower

Leader –> Follower

In LeaderAppender, triggered by heartbeats:

/**
 * Handles a {@link Response.Status#OK} response.
 */
protected void handleAppendResponseOk(MemberState member, AppendRequest request, AppendResponse response) {
  //......
  // If we've received a greater term, update the term and transition back to follower.
  else if (response.term() > context.getTerm()) {
    context.setTerm(response.term()).setLeader(0);
    context.transition(CopycatServer.State.FOLLOWER);
  }
  //......
}

If the response is OK but carries a term greater than the leader's own term, this node is no longer the leader,
so it must step down to follower.

/**
 * Handles a {@link Response.Status#ERROR} response.
 */
protected void handleAppendResponseError(MemberState member, AppendRequest request, AppendResponse response) {
  // If we've received a greater term, update the term and transition back to follower.
  if (response.term() > context.getTerm()) {
    context.setTerm(response.term()).setLeader(0);
    context.transition(CopycatServer.State.FOLLOWER);
  }
  //......
}

The same applies to an ERROR response.

@Override
protected void failAttempt(MemberState member, Throwable error) {
  super.failAttempt(member, error);

  // Verify that the leader has contacted a majority of the cluster within the last two election timeouts.
  // If the leader is not able to contact a majority of the cluster within two election timeouts, assume
  // that a partition occurred and transition back to the FOLLOWER state.
  if (System.currentTimeMillis() - Math.max(heartbeatTime(), leaderTime) > context.getElectionTimeout().toMillis() * 2) {
    LOGGER.warn("{} - Suspected network partition. Stepping down", context.getCluster().member().address());
    context.setLeader(0);
    context.transition(CopycatServer.State.FOLLOWER);
  }
  //......
}

In failAttempt, if the leader has not heard back from a majority within two election timeouts, it assumes a partition has occurred
and steps down to follower.

In LeaderState, when leader initialization fails (the initial no-op entry cannot be committed):

/**
 * Commits a no-op entry to the log, ensuring any entries from a previous term are committed.
 */
private CompletableFuture<Void> commitInitialEntries() {
  // The Raft protocol dictates that leaders cannot commit entries from previous terms until
  // at least one entry from their current term has been stored on a majority of servers. Thus,
  // we force entries to be appended up to the leader's no-op entry. The LeaderAppender will ensure
  // that the commitIndex is not increased until the no-op entry (appender.index()) is committed.
  CompletableFuture<Void> future = new CompletableFuture<>();
  appender.appendEntries(appender.index()).whenComplete((resultIndex, error) -> {
    context.checkThread();
    if (isOpen()) {
      if (error == null) {
        context.getStateMachine().apply(resultIndex);
        future.complete(null);
      } else {
        context.setLeader(0);
        context.transition(CopycatServer.State.FOLLOWER);
      }
    }
  });
  return future;
}

the leader also steps down to follower.

Candidate –> Follower

When the vote fails, the candidate falls back to follower:

/**
 * Resets the election timer.
 */
private void sendVoteRequests() {
  //......
  // Send vote requests to all nodes. The vote request that is sent
  // to this node will be automatically successful.
  // First check if the quorum is null. If the quorum isn't null then that
  // indicates that another vote is already going on.
  final Quorum quorum = new Quorum(context.getClusterState().getQuorum(), (elected) -> {
    complete.set(true);
    if (elected) {
      context.transition(CopycatServer.State.LEADER);
    } else {
      context.transition(CopycatServer.State.FOLLOWER); // not elected
    }
  });
  //......
}

ActiveState –> Follower

ActiveState, which covers LeaderState and CandidateState, runs the following logic when handling vote and append requests:

// If the request indicates a term that is greater than the current term then
// assign that term and leader to the current context and transition to follower.
boolean transition = updateTermAndLeader(request.term(), request.leader());

// If a transition is required then transition back to the follower state.
// If the node is already a follower then the transition will be ignored.
if (transition) {
  context.transition(CopycatServer.State.FOLLOWER);
}

/**
 * Updates the term and leader.
 */
protected boolean updateTermAndLeader(long term, int leader) {
  // If the request indicates a term that is greater than the current term or no leader has been
  // set for the current term, update leader and term.
  if (term > context.getTerm() || (term == context.getTerm() && context.getLeader() == null && leader != 0)) {
    context.setTerm(term);
    context.setLeader(leader);

    // Reset the current cluster configuration to the last committed configuration when a leader change occurs.
    context.getClusterState().reset();
    return true;
  }
  return false;
}
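The condition in updateTermAndLeader() has two triggers: a strictly newer term, or the same term with no leader known yet and a non-zero leader id in the request. A hypothetical plain-Java rendering (TermAndLeader is illustrative; it assumes, as setLeader(0) elsewhere suggests, that id 0 means "no leader", and it does not model the cluster-state reset):

```java
// Plain-Java model of updateTermAndLeader(); returns true when the caller should become FOLLOWER.
class TermAndLeader {
    long term;
    Integer leader;  // null = no leader known for the current term

    TermAndLeader(long term, Integer leader) {
        this.term = term;
        this.leader = leader;
    }

    boolean update(long requestTerm, int requestLeader) {
        boolean newerTerm = requestTerm > term;
        boolean leaderLearned = requestTerm == term && leader == null && requestLeader != 0;
        if (newerTerm || leaderLearned) {
            term = requestTerm;
            // Assumption: id 0 means "no leader".
            leader = requestLeader == 0 ? null : requestLeader;
            // Copycat also resets the cluster configuration here; not modelled.
            return true;
        }
        return false;
    }
}
```

A request from the current term can therefore install a leader only once; after that, only a higher term causes another transition.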

Reposted from: https://www.cnblogs.com/fxjwind/p/6519940.html
