Copycat - Status and State
Member.Status
Changes to a member's status are driven by heartbeats.
A heartbeat is simply an append with empty entries.

    /**
     * Triggers a heartbeat to a majority of the cluster.
     * <p>
     * For followers to which no AppendRequest is currently being sent, a new empty AppendRequest will be
     * created and sent. For followers to which an AppendRequest is already being sent, the appendEntries()
     * call will piggyback on the *next* AppendRequest. Thus, multiple calls to this method will only ever
     * result in a single AppendRequest to each follower at any given time, and the returned future will be
     * shared by all concurrent calls.
     *
     * @return A completable future to be completed the next time a heartbeat is received by a majority of the cluster.
     */
    public CompletableFuture<Long> appendEntries() {
      // If there are no other active members in the cluster, simply complete the append operation.
      if (context.getClusterState().getRemoteMemberStates().isEmpty())
        return CompletableFuture.completedFuture(null);

      // If no heartbeat future already exists, that indicates there's no heartbeat currently under way.
      // Create a new heartbeat future and commit to all members in the cluster.
      if (heartbeatFuture == null) {
        CompletableFuture<Long> newHeartbeatFuture = new CompletableFuture<>();
        heartbeatFuture = newHeartbeatFuture;
        heartbeatTime = System.currentTimeMillis();
        for (MemberState member : context.getClusterState().getRemoteMemberStates()) {
          appendEntries(member); // issue appendEntries to every member
        }
        return newHeartbeatFuture;
      }
      //......

The heartbeat logic sends a heartbeat to every member returned by getRemoteMemberStates().
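The Javadoc above says that concurrent callers share one future. Here is a minimal sketch of that idea, not Copycat's actual implementation: the class name `SharedHeartbeat` and the methods `heartbeat()`, `majorityAcknowledged()` and `roundsStarted()` are hypothetical names chosen for illustration, and the network send is replaced by a counter.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch: concurrent callers share one in-flight heartbeat future. */
final class SharedHeartbeat {
  private CompletableFuture<Long> heartbeatFuture; // null when no heartbeat is in flight
  private int roundsStarted;                       // how many heartbeat rounds were actually started

  /** Returns the in-flight future, starting a new round only if none exists. */
  synchronized CompletableFuture<Long> heartbeat() {
    if (heartbeatFuture == null) {
      heartbeatFuture = new CompletableFuture<>();
      roundsStarted++; // a real appender would send empty AppendRequests here
    }
    return heartbeatFuture;
  }

  /** Completes and clears the shared future once a majority has acknowledged (assumed trigger). */
  synchronized void majorityAcknowledged(long timestamp) {
    CompletableFuture<Long> future = heartbeatFuture;
    heartbeatFuture = null;
    if (future != null) {
      future.complete(timestamp);
    }
  }

  synchronized int roundsStarted() {
    return roundsStarted;
  }
}
```

Two calls to `heartbeat()` before an acknowledgement return the same future and start only one round; the next call after `majorityAcknowledged()` starts a fresh one.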
AVAILABLE
At initialization, every ServerMember defaults to Status.AVAILABLE:

    public final class ServerMember implements Member, CatalystSerializable, AutoCloseable {
      private Member.Type type;
      private Status status = Status.AVAILABLE;
      //......
LeaderAppender
    @Override
    protected void succeedAttempt(MemberState member) {
      super.succeedAttempt(member);

      // If the member is currently marked as UNAVAILABLE, change its status to AVAILABLE and update the configuration.
      if (member.getMember().status() == ServerMember.Status.UNAVAILABLE && !leader.configuring()) {
        member.getMember().update(ServerMember.Status.AVAILABLE, Instant.now());
        leader.configure(context.getCluster().members());
      }
    }

succeedAttempt switches an UNAVAILABLE member back to AVAILABLE; super.succeedAttempt resets the failure count.

It is called when an AppendResponse with status OK is received:

    protected void handleAppendResponseOk(MemberState member, AppendRequest request, AppendResponse response) {
      // Reset the member failure count and update the member's availability status if necessary.
      succeedAttempt(member);
      //......

The leader's heartbeat is implemented as an empty AppendRequest, so receiving an OK response means the member is available.
UNAVAILABLE
This status is set in failAttempt:

    @Override
    protected void failAttempt(MemberState member, Throwable error) {
      super.failAttempt(member, error);

      // Verify that the leader has contacted a majority of the cluster within the last two election timeouts.
      // If the leader is not able to contact a majority of the cluster within two election timeouts, assume
      // that a partition occurred and transition back to the FOLLOWER state.
      if (System.currentTimeMillis() - Math.max(heartbeatTime(), leaderTime) > context.getElectionTimeout().toMillis() * 2) {
        LOGGER.warn("{} - Suspected network partition. Stepping down", context.getCluster().member().address());
        context.setLeader(0);
        context.transition(CopycatServer.State.FOLLOWER);
      }
      // If the number of failures has increased above 3 and the member hasn't been marked as UNAVAILABLE, do so.
      else if (member.getFailureCount() >= 3) {
        // If the member is currently marked as AVAILABLE, change its status to UNAVAILABLE and update the configuration.
        if (member.getMember().status() == ServerMember.Status.AVAILABLE && !leader.configuring()) {
          member.getMember().update(ServerMember.Status.UNAVAILABLE, Instant.now());
          leader.configure(context.getCluster().members());
        }
      }
    }

super.failAttempt resets the connection and increments the failure count:

    member.incrementFailureCount();

The first check uses Math.max(heartbeatTime(), leaderTime).
heartbeatTime

    /**
     * Returns the last time a majority of the cluster was contacted.
     * <p>
     * This is calculated by sorting the list of active members and getting the last time the majority of
     * the cluster was contacted based on the index of a majority of the members. So, in a list of 3 ACTIVE
     * members, index 1 (the second member) will be used to determine the commit time in a sorted members list.
     */
    private long heartbeatTime() {
      int quorumIndex = quorumIndex();
      if (quorumIndex >= 0) {
        return context.getClusterState().getActiveMemberStates((m1, m2) -> Long.compare(m2.getHeartbeatTime(), m1.getHeartbeatTime())).get(quorumIndex).getHeartbeatTime();
      }
      return System.currentTimeMillis();
    }

This sorts the active members by heartbeat time, newest first, and takes the heartbeat time at quorumIndex, that is, the earliest heartbeat within the majority.
If even that time (or leaderTime, whichever is later) lies more than two election timeouts in the past, the leader has not heard from a majority recently, which suggests a network partition (a possible split brain).
In that case the leader steps down to follower.
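The sort-and-index step can be tried in isolation. The sketch below is an illustration only: `QuorumHeartbeat` and its parameters are hypothetical, plain timestamps stand in for MemberState objects, and the quorum index is passed in by the caller because the excerpt does not show how quorumIndex() is computed. The bounds check on the upper end is an added safety assumption, not something shown in the source.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch of picking the "majority" heartbeat time from a list of timestamps. */
final class QuorumHeartbeat {
  /**
   * Sorts heartbeat timestamps newest-first and returns the one at quorumIndex,
   * i.e. the oldest timestamp among the quorumIndex + 1 most recently contacted members.
   */
  static long heartbeatTime(List<Long> heartbeatTimes, int quorumIndex, long now) {
    // The excerpt falls back to the current time for a negative index;
    // the upper-bound check is an extra guard added for this sketch.
    if (quorumIndex < 0 || quorumIndex >= heartbeatTimes.size()) {
      return now;
    }
    List<Long> sorted = new ArrayList<>(heartbeatTimes);
    sorted.sort(Comparator.reverseOrder());
    return sorted.get(quorumIndex);
  }
}
```

With heartbeat times 100, 300 and 200 and a quorum index of 1, the sorted list is 300, 200, 100, so the method returns 200.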
The second check: once a member's failure count reaches 3 (getFailureCount() >= 3), the member is marked UNAVAILABLE.
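Taken together with succeedAttempt, the status behaves like a small counter-driven state machine. The following is a rough sketch under that reading; `AvailabilityTracker` is a hypothetical class, and it leaves out the leader.configuring() guard and the configuration update that the real code performs.

```java
/** Hypothetical sketch of the AVAILABLE/UNAVAILABLE bookkeeping described above. */
final class AvailabilityTracker {
  enum Status { AVAILABLE, UNAVAILABLE }

  private static final int FAILURE_THRESHOLD = 3; // mirrors the ">= 3" check in the excerpt

  private int failureCount;
  private Status status = Status.AVAILABLE; // members start out AVAILABLE

  /** Records a failed attempt; marks the member UNAVAILABLE once the threshold is reached. */
  void fail() {
    failureCount++;
    if (failureCount >= FAILURE_THRESHOLD) {
      status = Status.UNAVAILABLE;
    }
  }

  /** Records a successful attempt; resets the counter and restores AVAILABLE. */
  void succeed() {
    failureCount = 0;
    status = Status.AVAILABLE;
  }

  Status status() {
    return status;
  }
}
```

Two failures leave the member AVAILABLE, the third flips it to UNAVAILABLE, and a single success flips it back.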
failAttempt itself is called from the various failure handlers in AbstractAppender:
handleAppendRequestFailure, handleAppendResponseFailure, handleConfigureRequestFailure, handleInstallRequestFailure
CopycatServer.State
    public enum State {

      /**
       * Represents the state of an inactive server.
       * <p>
       * All servers start in this state and return to this state when {@link #leave() stopped}.
       */
      INACTIVE,

      /**
       * Represents the state of a server that is a reserve member of the cluster.
       * <p>
       * Reserve servers only receive notification of leader, term, and configuration changes.
       */
      RESERVE,

      /**
       * Represents the state of a server in the process of catching up its log.
       * <p>
       * Upon successfully joining an existing cluster, the server will transition to the passive state and remain there
       * until the leader determines that the server has caught up enough to be promoted to a full member.
       */
      PASSIVE,

      /**
       * Represents the state of a server participating in normal log replication.
       * <p>
       * The follower state is a standard Raft state in which the server receives replicated log entries from the leader.
       */
      FOLLOWER,

      /**
       * Represents the state of a server attempting to become the leader.
       * <p>
       * When a server in the follower state fails to receive communication from a valid leader for some time period,
       * the follower will transition to the candidate state. During this period, the candidate requests votes from
       * each of the other servers in the cluster. If the candidate wins the election by receiving votes from a majority
       * of the cluster, it will transition to the leader state.
       */
      CANDIDATE,

      /**
       * Represents the state of a server which is actively coordinating and replicating logs with other servers.
       * <p>
       * Leaders are responsible for handling and replicating writes from clients. Note that more than one leader can
       * exist at any given time, but Raft guarantees that no two leaders will exist for the same {@link Cluster#term()}.
       */
      LEADER

    }

When ServerContext is initialized, its state is InactiveState:

    public class ServerContext implements AutoCloseable {
      //......
      protected ServerState state = new InactiveState(this);
The tricky part is that Member has its own Type enum:

    enum Type {

      /**
       * Represents an inactive member.
       * <p>
       * The {@code INACTIVE} member type represents a member which does not participate in any communication
       * and is not an active member of the cluster. This is typically the state of a member prior to joining
       * or after leaving a cluster.
       */
      INACTIVE,

      /**
       * Represents a member which does not participate in replication.
       * <p>
       * The {@code RESERVE} member type is representative of a member that does not participate in any
       * replication of state but only maintains contact with the cluster leader and is an active member
       * of the {@link Cluster}. Typically, reserve members act as standby nodes which can be
       * {@link #promote() promoted} to a {@link #PASSIVE} or {@link #ACTIVE} role when needed.
       */
      RESERVE,

      /**
       * Represents a member which participates in asynchronous replication but does not vote in elections
       * or otherwise participate in the Raft consensus algorithm.
       * <p>
       * The {@code PASSIVE} member type is representative of a member that receives state changes from
       * follower nodes asynchronously. As state changes are committed via the {@link #ACTIVE} Raft nodes,
       * committed state changes are asynchronously replicated by followers to passive members. This allows
       * passive members to maintain nearly up-to-date state with minimal impact on the performance of the
       * Raft algorithm itself, and allows passive members to be quickly promoted to {@link #ACTIVE} voting
       * members if necessary.
       */
      PASSIVE,

      /**
       * Represents a full voting member of the Raft cluster which participates fully in leader election
       * and replication algorithms.
       * <p>
       * The {@code ACTIVE} member type represents a full voting member of the Raft cluster. Active members
       * participate in the Raft leader election and replication algorithms and can themselves be elected
       * leaders.
       */
      ACTIVE,

    }

Note the difference: Type has ACTIVE, which State does not.
Apart from that, every other Type value also appears in State.
In other words, a member can be INACTIVE, RESERVE, PASSIVE or ACTIVE.
When the member is INACTIVE, RESERVE or PASSIVE, the server's state is the one with the same name.
When the member is ACTIVE, the server's state is one of FOLLOWER, CANDIDATE or LEADER.
In CopycatServer.Builder, the member type defaults to ACTIVE:

    public static class Builder implements io.atomix.catalyst.util.Builder<CopycatServer> {
      //......
      private Member.Type type = Member.Type.ACTIVE;

Note that this overload of transition chooses the target state from Member.Type:

    /**
     * Transitions the server to the base state for the given member type.
     */
    protected void transition(Member.Type type) {
      switch (type) {
        case ACTIVE:
          if (!(state instanceof ActiveState)) {
            transition(CopycatServer.State.FOLLOWER);
          }
          break;
        case PASSIVE:
          if (this.state.type() != CopycatServer.State.PASSIVE) {
            transition(CopycatServer.State.PASSIVE);
          }
          break;
        case RESERVE:
          if (this.state.type() != CopycatServer.State.RESERVE) {
            transition(CopycatServer.State.RESERVE);
          }
          break;
        default:
          if (this.state.type() != CopycatServer.State.INACTIVE) {
            transition(CopycatServer.State.INACTIVE);
          }
          break;
      }
    }

Pay attention to how ACTIVE is handled.
When Member.Type is ACTIVE and the current state is not an ActiveState, the server transitions to FOLLOWER; candidate and leader obviously cannot be entered directly this way.
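The relationship between the two enums can be condensed into a pure function. This is only a sketch of the mapping described above: `BaseStates`, its nested enums and `baseState()` are hypothetical stand-ins for Member.Type, CopycatServer.State and the transition method, and the `instanceof ActiveState` test is approximated by checking for FOLLOWER, CANDIDATE or LEADER.

```java
/** Hypothetical sketch: which state a server ends up in for a given member type. */
final class BaseStates {
  enum Type { INACTIVE, RESERVE, PASSIVE, ACTIVE }

  enum State { INACTIVE, RESERVE, PASSIVE, FOLLOWER, CANDIDATE, LEADER }

  /** Approximates "state instanceof ActiveState" from the excerpt. */
  static boolean isActiveState(State state) {
    return state == State.FOLLOWER || state == State.CANDIDATE || state == State.LEADER;
  }

  /** Returns the state after transitioning to the base state for the given member type. */
  static State baseState(Type type, State current) {
    switch (type) {
      case ACTIVE:
        // An ACTIVE member keeps its Raft role; otherwise it always starts as a follower.
        return isActiveState(current) ? current : State.FOLLOWER;
      case PASSIVE:
        return State.PASSIVE;
      case RESERVE:
        return State.RESERVE;
      default:
        return State.INACTIVE;
    }
  }
}
```

For example, an ACTIVE member whose server is still INACTIVE lands in FOLLOWER, while an ACTIVE member that is already LEADER stays LEADER.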
As shown above, ServerContext starts out in the inactive state.
When does it become active?
When a server bootstraps a cluster or joins one, ClusterState.join is called, and that is where the state transition happens:

    @Override
    public CompletableFuture<Void> bootstrap(Collection<Address> cluster) {
      if (configuration == null) {
        if (member.type() != Member.Type.ACTIVE) {
          return Futures.exceptionalFuture(new IllegalStateException("only ACTIVE members can bootstrap the cluster"));
        } else {
          // Create a set of active members.
          Set<Member> activeMembers = cluster.stream()
            .filter(m -> !m.equals(member.serverAddress()))
            .map(m -> new ServerMember(Member.Type.ACTIVE, m, null, member.updated()))
            .collect(Collectors.toSet());

          // Add the local member to the set of active members.
          activeMembers.add(member);

          // Create a new configuration and store it on disk to ensure the cluster can fall back to the configuration.
          configure(new Configuration(0, 0, member.updated().toEpochMilli(), activeMembers));
        }
      }
      return join();
    }

    @Override
    public synchronized CompletableFuture<Void> join(Collection<Address> cluster) {
      // If no configuration was loaded from disk, create a new configuration.
      if (configuration == null) {
        // Create a set of cluster members, excluding the local member which is joining a cluster.
        Set<Member> activeMembers = cluster.stream()
          .filter(m -> !m.equals(member.serverAddress()))
          .map(m -> new ServerMember(Member.Type.ACTIVE, m, null, member.updated()))
          .collect(Collectors.toSet());

        // Create a new configuration and configure the cluster. Once the cluster is configured, the configuration
        // will be stored on disk to ensure the cluster can fall back to the provided configuration if necessary.
        configure(new Configuration(0, 0, member.updated().toEpochMilli(), activeMembers)); // update the configuration
      }
      return join();
    }

    /**
     * Starts the join to the cluster.
     */
    private synchronized CompletableFuture<Void> join() {
      joinFuture = new CompletableFuture<>();

      context.getThreadContext().executor().execute(() -> {
        // Transition the server to the appropriate state for the local member type.
        context.transition(member.type()); // transition state

        // Attempt to join the cluster. If the local member is ACTIVE then failing to join the cluster
        // will result in the member attempting to get elected. This allows initial clusters to form.
        List<MemberState> activeMembers = getActiveMemberStates();
        if (!activeMembers.isEmpty()) {
          join(getActiveMemberStates().iterator());
        } else {
          joinFuture.complete(null);
        }
      });
      //......
Next, the conditions for moving between leader, candidate and follower.
Leader
A server becomes leader only when, as a candidate, it requests votes and a majority agrees:

    context.transition(CopycatServer.State.LEADER)

    /**
     * Resets the election timer.
     */
    private void sendVoteRequests() {
      //.........

      // Send vote requests to all nodes. The vote request that is sent
      // to this node will be automatically successful.
      // First check if the quorum is null. If the quorum isn't null then that
      // indicates that another vote is already going on.
      final Quorum quorum = new Quorum(context.getClusterState().getQuorum(), (elected) -> {
        complete.set(true);
        if (elected) {
          context.transition(CopycatServer.State.LEADER); // invoked from checkComplete()
        } else {
          context.transition(CopycatServer.State.FOLLOWER);
        }
      });

      // Once we got the last log term, iterate through each current member
      // of the cluster and vote each member for a vote.
      for (ServerMember member : votingMembers) {
        LOGGER.debug("{} - Requesting vote from {} for term {}", context.getCluster().member().address(), member, context.getTerm());
        VoteRequest request = VoteRequest.builder()
          .withTerm(context.getTerm())
          .withCandidate(context.getCluster().member().id())
          .withLogIndex(lastIndex)
          .withLogTerm(lastTerm)
          .build();

        context.getConnections().getConnection(member.serverAddress()).thenAccept(connection -> {
          connection.<VoteRequest, VoteResponse>send(request).whenCompleteAsync((response, error) -> {
            context.checkThread();
            if (isOpen() && !complete.get()) {
              if (error != null) {
                LOGGER.warn(error.getMessage());
                quorum.fail();
              } else {
                //........
                } else {
                  LOGGER.debug("{} - Received successful vote from {}", context.getCluster().member().address(), member);
                  quorum.succeed(); // the member agreed: succeeded++; checkComplete();
                }
              }
            }
          }, context.getThreadContext().executor());
        });
      //......
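The vote counting itself is simple enough to sketch. The class below is a hypothetical `SimpleQuorum`, not Copycat's Quorum class: it assumes the candidate's own vote counts as one success up front, and that the election is declared lost once the number of failures reaches the quorum size. Both are assumptions made for illustration, consistent with the "succeeded++; checkComplete()" note above but not verified against the library.

```java
import java.util.function.Consumer;

/** Hypothetical sketch of a quorum counter that fires a callback exactly once. */
final class SimpleQuorum {
  private final int quorum;                 // number of successes needed to win
  private final Consumer<Boolean> callback; // receives true if elected, false otherwise
  private int succeeded = 1;                // assumption: the local vote is an automatic success
  private int failed;
  private boolean complete;

  SimpleQuorum(int quorum, Consumer<Boolean> callback) {
    this.quorum = quorum;
    this.callback = callback;
  }

  /** Records a granted vote. */
  void succeed() {
    succeeded++;
    checkComplete();
  }

  /** Records a rejected or failed vote. */
  void fail() {
    failed++;
    checkComplete();
  }

  private void checkComplete() {
    if (complete) {
      return; // the outcome has already been reported
    }
    if (succeeded >= quorum) {
      complete = true;
      callback.accept(true);
    } else if (failed >= quorum) {
      complete = true;
      callback.accept(false);
    }
  }
}
```

In a three-node cluster the quorum is 2, so a single call to `succeed()` on top of the implicit local vote is enough to invoke the callback with `true`; later votes are ignored.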
Candidate
A server becomes a candidate only after, as a follower, it sends poll requests and a majority agrees:

    /**
     * Polls all members of the cluster to determine whether this member should transition to the CANDIDATE state.
     */
    private void sendPollRequests() {
      final Quorum quorum = new Quorum(context.getClusterState().getQuorum(), (elected) -> {
        // If a majority of the cluster indicated they would vote for us then transition to candidate.
        complete.set(true);
        if (elected) {
          context.transition(CopycatServer.State.CANDIDATE);
        } else {
          resetHeartbeatTimeout();
        }
      });
      //......
Follower
Leader –> Follower
In LeaderAppender, this is triggered by the heartbeat:

    /**
     * Handles a {@link Response.Status#OK} response.
     */
    protected void handleAppendResponseOk(MemberState member, AppendRequest request, AppendResponse response) {
      //......
      // If we've received a greater term, update the term and transition back to follower.
      else if (response.term() > context.getTerm()) {
        context.setTerm(response.term()).setLeader(0);
        context.transition(CopycatServer.State.FOLLOWER);
      }

If an OK response arrives but its term is greater than the local term, this node is no longer the leader,
so it has to step down to follower.
The same applies to an error response.

    @Override
    protected void failAttempt(MemberState member, Throwable error) {
      super.failAttempt(member, error);

      // Verify that the leader has contacted a majority of the cluster within the last two election timeouts.
      // If the leader is not able to contact a majority of the cluster within two election timeouts, assume
      // that a partition occurred and transition back to the FOLLOWER state.
      if (System.currentTimeMillis() - Math.max(heartbeatTime(), leaderTime) > context.getElectionTimeout().toMillis() * 2) {
        LOGGER.warn("{} - Suspected network partition. Stepping down", context.getCluster().member().address());
        context.setLeader(0);
        context.transition(CopycatServer.State.FOLLOWER);
      }
      //......

In failAttempt, if no heartbeat from a majority has been received within two election timeouts, a partition is assumed
and the leader steps down to follower.
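The timing condition is easy to misread, so here it is as a standalone predicate. This is a sketch with hypothetical names (`PartitionCheck`, `shouldStepDown`); it assumes leaderTime is the moment the node became leader, which the excerpt does not show, and it takes the clock value as a parameter instead of calling System.currentTimeMillis().

```java
import java.time.Duration;

/** Hypothetical sketch of the "two election timeouts without a majority" check. */
final class PartitionCheck {
  /**
   * Returns true when neither a majority heartbeat nor the start of leadership
   * (assumed meaning of leaderTime) falls within the last two election timeouts.
   */
  static boolean shouldStepDown(long nowMillis, long quorumHeartbeatMillis,
                                long leaderSinceMillis, Duration electionTimeout) {
    long lastContact = Math.max(quorumHeartbeatMillis, leaderSinceMillis);
    return nowMillis - lastContact > electionTimeout.toMillis() * 2;
  }
}
```

With a 500 ms election timeout, a majority heartbeat that is 1100 ms old triggers the step-down, while one that is exactly 1000 ms old does not, because the comparison is strict. Taking the maximum with leaderTime means a freshly elected leader gets a grace period before it has heard from anyone.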
In LeaderState, when the leader's initialization fails:

    /**
     * Commits a no-op entry to the log, ensuring any entries from a previous term are committed.
     */
    private CompletableFuture<Void> commitInitialEntries() {
      // The Raft protocol dictates that leaders cannot commit entries from previous terms until
      // at least one entry from their current term has been stored on a majority of servers. Thus,
      // we force entries to be appended up to the leader's no-op entry. The LeaderAppender will ensure
      // that the commitIndex is not increased until the no-op entry (appender.index()) is committed.
      CompletableFuture<Void> future = new CompletableFuture<>();
      appender.appendEntries(appender.index()).whenComplete((resultIndex, error) -> {
        context.checkThread();
        if (isOpen()) {
          if (error == null) {
            context.getStateMachine().apply(resultIndex);
            future.complete(null);
          } else {
            context.setLeader(0);
            context.transition(CopycatServer.State.FOLLOWER);
          }
        }
      });
      return future;
    }

it also steps down to follower.
Candidate –> Follower
When the vote fails, the candidate falls back to follower:

    /**
     * Resets the election timer.
     */
    private void sendVoteRequests() {
      //......

      // Send vote requests to all nodes. The vote request that is sent
      // to this node will be automatically successful.
      // First check if the quorum is null. If the quorum isn't null then that
      // indicates that another vote is already going on.
      final Quorum quorum = new Quorum(context.getClusterState().getQuorum(), (elected) -> {
        complete.set(true);
        if (elected) {
          context.transition(CopycatServer.State.LEADER);
        } else {
          context.transition(CopycatServer.State.FOLLOWER); // not elected
        }
      });
ActiveState –> Follower
ActiveState covers LeaderState and CandidateState; when handling vote and append requests, they all run the following logic:

    // If the request indicates a term that is greater than the current term then
    // assign that term and leader to the current context and transition to follower.
    boolean transition = updateTermAndLeader(request.term(), request.leader());

    // If a transition is required then transition back to the follower state.
    // If the node is already a follower then the transition will be ignored.
    if (transition) {
      context.transition(CopycatServer.State.FOLLOWER);
    }

    /**
     * Updates the term and leader.
     */
    protected boolean updateTermAndLeader(long term, int leader) {
      // If the request indicates a term that is greater than the current term or no leader has been
      // set for the current term, update leader and term.
      if (term > context.getTerm() || (term == context.getTerm() && context.getLeader() == null && leader != 0)) {
        context.setTerm(term);
        context.setLeader(leader);

        // Reset the current cluster configuration to the last committed configuration when a leader change occurs.
        context.getClusterState().reset();
        return true;
      }
      return false;
    }

Reposted from: https://www.cnblogs.com/fxjwind/p/6519940.html