/*
 * Decompiled with CFR 0.152.
 */
package com.alipay.sofa.jraft.core;

import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.closure.CatchUpClosure;
import com.alipay.sofa.jraft.core.NodeImpl;
import com.alipay.sofa.jraft.core.NodeMetrics;
import com.alipay.sofa.jraft.core.TimerManager;
import com.alipay.sofa.jraft.entity.EnumOutter;
import com.alipay.sofa.jraft.entity.LogEntry;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.entity.RaftOutter;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.error.RaftException;
import com.alipay.sofa.jraft.option.RaftOptions;
import com.alipay.sofa.jraft.option.ReplicatorOptions;
import com.alipay.sofa.jraft.rpc.RaftClientService;
import com.alipay.sofa.jraft.rpc.RpcRequests;
import com.alipay.sofa.jraft.rpc.RpcResponseClosure;
import com.alipay.sofa.jraft.rpc.RpcResponseClosureAdapter;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.util.ByteBufferCollector;
import com.alipay.sofa.jraft.util.OnlyForTest;
import com.alipay.sofa.jraft.util.RecyclableByteBufferList;
import com.alipay.sofa.jraft.util.RecycleUtil;
import com.alipay.sofa.jraft.util.Requires;
import com.alipay.sofa.jraft.util.ThreadId;
import com.alipay.sofa.jraft.util.Utils;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.MetricSet;
import com.google.protobuf.Message;
import com.google.protobuf.ZeroByteStringHelper;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public class Replicator
implements ThreadId.OnError {
    private static final Logger LOG = LoggerFactory.getLogger(Replicator.class);
    /** Client used to issue the append-entries and install-snapshot RPCs. */
    private final RaftClientService rpcService;
    /** Index of the next log entry to send; starts at the log manager's last log index + 1. */
    private volatile long nextIndex;
    /** Incremented on failed RPCs; reset to 0 when a heartbeat RPC returns an OK status. */
    private int consecutiveErrorTimes = 0;
    /** Set to true once a snapshot installation succeeded (and by the test-only setter). */
    private boolean hasSucceeded;
    /** When positive and below nextIndex after a snapshot install, a timeout-now request is sent. */
    private long timeoutNowIndex;
    /** Monotonic send time of the latest acknowledged heartbeat RPC. */
    private volatile long lastRpcSendTimestamp;
    /** Number of heartbeat requests issued. */
    private volatile long heartbeatCounter = 0L;
    /** Number of non-heartbeat append-entries requests issued. */
    private volatile long appendEntriesCounter = 0L;
    /** Number of install-snapshot requests issued. */
    private volatile long installSnapshotCounter = 0L;
    /** Running state and log range bookkeeping, included in {@link #toString()}. */
    protected Stat statInfo = new Stat();
    /** Timer armed by {@link #block(long, int)}; cancelled on stop or by unBlockAndSendNow. */
    private ScheduledFuture<?> blockTimer;
    /** The most recently added in-flight request (see addInflight). */
    private Inflight rpcInFly;
    /** Future of the latest heartbeat RPC; cancelled on stop. */
    private Future<Message> heartbeatInFly;
    /** Future of the timeout-now RPC; cancelled on stop. */
    private Future<Message> timeoutNowInFly;
    /** In-flight requests in the order they were added. */
    private final ArrayDeque<Inflight> inflights = new ArrayDeque<>();
    /** Log-manager waiter id; -1 when no waiter is registered. */
    private long waitId = -1L;
    /** Lock/identity handle of this replicator; set to null by {@link #destroy()}. */
    protected ThreadId id;
    private final ReplicatorOptions options;
    private final RaftOptions raftOptions;
    /** Timer armed by startHeartbeatTimer; cancelled on stop. */
    private ScheduledFuture<?> heartbeatTimer;
    /** Snapshot reader held while a snapshot is being installed; closed by releaseReader. */
    private volatile SnapshotReader reader;
    /** Pending catch-up callback registered through waitForCaughtUp, or null. */
    private CatchUpClosure catchUpClosure;
    private final TimerManager timerManager;
    private final NodeMetrics nodeMetrics;
    private volatile State state;
    /** Sequence number assigned to the next request that is sent. */
    private int reqSeq = 0;
    /** Sequence number of the response that must be processed next. */
    private int requiredNextSeq = 0;
    /** Compared with the version captured at send time; responses with a different version are ignored. */
    private int version = 0;
    /** Responses that arrived but are waiting for their turn in sequence order. */
    private final PriorityQueue<RpcResponse> pendingResponses = new PriorityQueue<>(50);

    /**
     * Returns the current request sequence and advances it by one, wrapping back to
     * zero when the incremented value becomes negative (int overflow).
     */
    private int getAndIncrementReqSeq() {
        final int current = this.reqSeq;
        final int advanced = current + 1;
        this.reqSeq = advanced < 0 ? 0 : advanced;
        return current;
    }

    /**
     * Returns the sequence number currently expected for the next response and
     * advances it by one, wrapping back to zero when the incremented value becomes negative.
     */
    private int getAndIncrementRequiredNextSeq() {
        final int current = this.requiredNextSeq;
        final int advanced = current + 1;
        this.requiredNextSeq = advanced < 0 ? 0 : advanced;
        return current;
    }

    public Replicator(ReplicatorOptions replicatorOptions, RaftOptions raftOptions) {
        this.options = replicatorOptions;
        this.nodeMetrics = this.options.getNode().getNodeMetrics();
        this.nextIndex = this.options.getLogManager().getLastLogIndex() + 1L;
        this.timerManager = replicatorOptions.getTimerManager();
        this.raftOptions = raftOptions;
        this.rpcService = replicatorOptions.getRaftRpcService();
    }

    /** Returns the live in-flight request queue itself (not a copy). */
    @OnlyForTest
    ArrayDeque<Inflight> getInflights() {
        return inflights;
    }

    /** Returns the current replicator state. */
    @OnlyForTest
    State getState() {
        return state;
    }

    /**
     * Overwrites the replicator state.
     *
     * @param state the new state
     */
    @OnlyForTest
    void setState(State state) {
        this.state = state;
    }

    /** Returns the sequence number that will be assigned to the next request. */
    @OnlyForTest
    int getReqSeq() {
        return reqSeq;
    }

    /** Returns the sequence number of the response expected next. */
    @OnlyForTest
    int getRequiredNextSeq() {
        return requiredNextSeq;
    }

    /** Returns the current in-flight state version. */
    @OnlyForTest
    int getVersion() {
        return version;
    }

    /** Returns the live queue of responses waiting to be processed (not a copy). */
    @OnlyForTest
    public PriorityQueue<RpcResponse> getPendingResponses() {
        return pendingResponses;
    }

    /** Returns the current waiter id; -1 means no waiter is registered. */
    @OnlyForTest
    long getWaitId() {
        return waitId;
    }

    /** Returns the timer armed by {@code block}, or null when none is set. */
    @OnlyForTest
    ScheduledFuture<?> getBlockTimer() {
        return blockTimer;
    }

    /** Returns the stored timeout-now index. */
    @OnlyForTest
    long getTimeoutNowIndex() {
        return timeoutNowIndex;
    }

    /** Returns the options this replicator was created with. */
    @OnlyForTest
    ReplicatorOptions getOpts() {
        return options;
    }

    /** Returns the raw {@code nextIndex} field, regardless of any in-flight requests. */
    @OnlyForTest
    long getRealNextIndex() {
        return nextIndex;
    }

    /** Returns the RPC future of the most recently added in-flight request, or null if there is none. */
    @OnlyForTest
    Future<Message> getRpcInFly() {
        final Inflight latest = this.rpcInFly;
        return latest == null ? null : latest.rpcFuture;
    }

    /** Returns the future of the latest heartbeat RPC, or null. */
    @OnlyForTest
    Future<Message> getHeartbeatInFly() {
        return heartbeatInFly;
    }

    /** Returns the currently scheduled heartbeat timer, or null. */
    @OnlyForTest
    ScheduledFuture<?> getHeartbeatTimer() {
        return heartbeatTimer;
    }

    /** Marks this replicator as having succeeded at least once. */
    @OnlyForTest
    void setHasSucceeded() {
        hasSucceeded = true;
    }

    /** Returns the future of the timeout-now RPC, or null. */
    @OnlyForTest
    Future<Message> getTimeoutNowInFly() {
        return timeoutNowInFly;
    }

    /**
     * Records a newly sent request: it becomes {@code rpcInFly}, is appended to the
     * in-flight queue, and the resulting queue size is reported to the node metrics.
     */
    private void addInflight(RequestType reqType, long startIndex, int count, int size, int seq, Future<Message> rpcInfly) {
        final Inflight sent = new Inflight(reqType, startIndex, count, size, seq, rpcInfly);
        this.rpcInFly = sent;
        this.inflights.add(sent);
        final int pending = this.inflights.size();
        this.nodeMetrics.recordSize("replicate-inflights-count", pending);
    }

    /**
     * Computes the index from which the next batch of entries should be sent.
     *
     * @return {@code nextIndex} when nothing is in flight; the index following the latest
     *         in-flight batch when that request is sending log entries; {@code -1} when the
     *         in-flight count exceeds {@code getMaxReplicatorInflightMsgs()} or the latest
     *         in-flight request is missing or not sending log entries
     */
    long getNextSendIndex() {
        if (this.inflights.isEmpty()) {
            return this.nextIndex;
        }
        final boolean tooManyInFlight = this.inflights.size() > this.raftOptions.getMaxReplicatorInflightMsgs();
        if (tooManyInFlight) {
            return -1L;
        }
        final Inflight latest = this.rpcInFly;
        final boolean sendingEntries = latest != null && latest.isSendingLogEntries();
        return sendingEntries ? latest.startIndex + (long) latest.count : -1L;
    }

    /** Removes and returns the oldest in-flight request, or null when the queue is empty. */
    private Inflight pollInflight() {
        return this.inflights.pollFirst();
    }

    /**
     * Schedules the heartbeat timeout to fire {@code getDynamicHeartBeatTimeoutMs()}
     * milliseconds after {@code startMs}. If scheduling throws, the failure is logged and
     * the timeout handler is invoked immediately.
     */
    private void startHeartbeatTimer(long startMs) {
        final long deadlineMs = startMs + (long) this.options.getDynamicHeartBeatTimeoutMs();
        try {
            final long delayMs = deadlineMs - Utils.nowMs();
            // this.id is read when the task runs, not when it is scheduled.
            this.heartbeatTimer = this.timerManager.schedule(() -> onTimeout(this.id), delayMs, TimeUnit.MILLISECONDS);
        } catch (final Exception e) {
            LOG.error("Fail to schedule heartbeat timer", e);
            onTimeout(this.id);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Opens the latest snapshot and sends an install-snapshot request to the peer.
     *
     * <p>Every normal path through this method calls {@code id.unlock()} once, so the caller
     * is expected to hold the id lock on entry. Failures to open the snapshot, generate its
     * copy URI, or load its meta are reported to the node via {@code node.onError} after the
     * lock has been released.
     */
    void installSnapshot() {
        if (this.state == State.Snapshot) {
            LOG.warn("Replicator {} is installing snapshot, ignore the new request.", (Object)this.options.getPeerId());
            this.id.unlock();
            return;
        }
        // Tracks whether the finally block still has to release the lock.
        boolean doUnlock = true;
        try {
            Requires.requireTrue(this.reader == null, "Replicator %s already has a snapshot reader, current state is %s", new Object[]{this.options.getPeerId(), this.state});
            this.reader = this.options.getSnapshotStorage().open();
            if (this.reader == null) {
                NodeImpl node = this.options.getNode();
                RaftException error = new RaftException(EnumOutter.ErrorType.ERROR_TYPE_SNAPSHOT);
                error.setStatus(new Status(RaftError.EIO, "Fail to open snapshot", new Object[0]));
                // Release the lock before notifying the node.
                this.id.unlock();
                doUnlock = false;
                node.onError(error);
                return;
            }
            String uri = this.reader.generateURIForCopy();
            if (uri == null) {
                NodeImpl node = this.options.getNode();
                RaftException error = new RaftException(EnumOutter.ErrorType.ERROR_TYPE_SNAPSHOT);
                error.setStatus(new Status(RaftError.EIO, "Fail to generate uri for snapshot reader", new Object[0]));
                this.releaseReader();
                this.id.unlock();
                doUnlock = false;
                node.onError(error);
                return;
            }
            RaftOutter.SnapshotMeta meta = this.reader.load();
            if (meta == null) {
                // Capture the path before the reader is released below.
                String snapshotPath = this.reader.getPath();
                NodeImpl node = this.options.getNode();
                RaftException error = new RaftException(EnumOutter.ErrorType.ERROR_TYPE_SNAPSHOT);
                error.setStatus(new Status(RaftError.EIO, "Fail to load meta from %s", snapshotPath));
                this.releaseReader();
                this.id.unlock();
                doUnlock = false;
                node.onError(error);
                return;
            }
            RpcRequests.InstallSnapshotRequest.Builder rb = RpcRequests.InstallSnapshotRequest.newBuilder();
            rb.setTerm(this.options.getTerm());
            rb.setGroupId(this.options.getGroupId());
            rb.setServerId(this.options.getServerId().toString());
            rb.setPeerId(this.options.getPeerId().toString());
            rb.setMeta(meta);
            rb.setUri(uri);
            this.statInfo.runningState = RunningState.INSTALLING_SNAPSHOT;
            this.statInfo.lastLogIncluded = meta.getLastIncludedIndex();
            this.statInfo.lastTermIncluded = meta.getLastIncludedTerm();
            final RpcRequests.InstallSnapshotRequest request = rb.build();
            this.state = State.Snapshot;
            ++this.installSnapshotCounter;
            // Values captured here are handed back to onRpcReturned with the response.
            final long monotonicSendTimeMs = Utils.monotonicMs();
            final int stateVersion = this.version;
            final int seq = this.getAndIncrementReqSeq();
            Future<Message> rpcFuture = this.rpcService.installSnapshot(this.options.getPeerId().getEndpoint(), request, (RpcResponseClosure<RpcRequests.InstallSnapshotResponse>)new RpcResponseClosureAdapter<RpcRequests.InstallSnapshotResponse>(){

                @Override
                public void run(Status status) {
                    Replicator.onRpcReturned(Replicator.this.id, RequestType.Snapshot, status, (Message)request, this.getResponse(), seq, stateVersion, monotonicSendTimeMs);
                }
            });
            // The snapshot request is tracked as an in-flight entry with count 0 and size 0.
            this.addInflight(RequestType.Snapshot, this.nextIndex, 0, 0, seq, rpcFuture);
        }
        finally {
            if (doUnlock) {
                this.id.unlock();
            }
        }
    }

    /**
     * Handles the response of an install-snapshot request.
     *
     * <p>The snapshot reader is always released first. On failure (RPC error or
     * {@code success=false} in the response) the in-flight state is reset, the replicator
     * goes back to {@code Probe} and is blocked via {@link #block(long, int)}. On success
     * {@code nextIndex} moves past the snapshot's last included index and the state becomes
     * {@code Replicate}.
     *
     * @return true when the snapshot was installed successfully, false otherwise
     */
    static boolean onInstallSnapshotReturned(ThreadId id, Replicator r, Status status, RpcRequests.InstallSnapshotRequest request, RpcRequests.InstallSnapshotResponse response) {
        r.releaseReader();
        final StringBuilder msg = new StringBuilder("Node ");
        msg.append(r.options.getGroupId()).append(":").append(r.options.getServerId());
        msg.append(" received InstallSnapshotResponse from ").append(r.options.getPeerId());
        msg.append(" lastIncludedIndex=").append(request.getMeta().getLastIncludedIndex());
        msg.append(" lastIncludedTerm=").append(request.getMeta().getLastIncludedTerm());

        final boolean installed;
        if (!status.isOk()) {
            msg.append(" error:").append(status);
            LOG.info(msg.toString());
            r.consecutiveErrorTimes++;
            if (r.consecutiveErrorTimes % 10 == 0) {
                LOG.warn("Fail to install snapshot at peer={}, error={}", r.options.getPeerId(), status);
            }
            installed = false;
        } else if (response.getSuccess()) {
            r.nextIndex = request.getMeta().getLastIncludedIndex() + 1L;
            msg.append(" success=true");
            LOG.info(msg.toString());
            installed = true;
        } else {
            msg.append(" success=false");
            LOG.info(msg.toString());
            installed = false;
        }

        if (installed) {
            r.hasSucceeded = true;
            r.notifyOnCaughtUp(RaftError.SUCCESS.getNumber(), false);
            final boolean timeoutNowPending = r.timeoutNowIndex > 0L && r.timeoutNowIndex < r.nextIndex;
            if (timeoutNowPending) {
                r.sendTimeoutNow(false, false);
            }
            r.state = State.Replicate;
            return true;
        }

        r.resetInflights();
        r.state = State.Probe;
        // block() releases the id lock.
        r.block(Utils.nowMs(), status.getCode());
        return false;
    }

    /** Sends an empty append-entries request without a caller-supplied heartbeat closure. */
    private void sendEmptyEntries(boolean isHeartbeat) {
        sendEmptyEntries(isHeartbeat, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sends an append-entries request that carries no log entries, either as a heartbeat
     * or as a probe.
     *
     * <p>When the common request fields cannot be filled, a snapshot installation is started
     * instead and, for a heartbeat with a caller-supplied closure, that closure is completed
     * with {@code EAGAIN}. The id lock is released on every path: by
     * {@link #installSnapshot()} in the fallback case, otherwise by the finally block here.
     *
     * @param isHeartbeat      true to send a heartbeat, false to send a probe request
     * @param heartBeatClosure optional callback for the heartbeat response; when null the
     *                         response is routed to {@code onHeartbeatReturned}
     */
    private void sendEmptyEntries(boolean isHeartbeat, RpcResponseClosure<RpcRequests.AppendEntriesResponse> heartBeatClosure) {
        final RpcRequests.AppendEntriesRequest.Builder rb = RpcRequests.AppendEntriesRequest.newBuilder();
        if (!this.fillCommonFields(rb, this.nextIndex - 1L, isHeartbeat)) {
            // installSnapshot() releases the id lock.
            this.installSnapshot();
            if (isHeartbeat && heartBeatClosure != null) {
                Utils.runClosureInThread(heartBeatClosure, new Status(RaftError.EAGAIN, "Fail to send heartbeat to peer %s", this.options.getPeerId()));
            }
            return;
        }
        try {
            final long monotonicSendTimeMs = Utils.monotonicMs();
            final RpcRequests.AppendEntriesRequest request = rb.build();
            if (isHeartbeat) {
                ++this.heartbeatCounter;
                // Declared with the interface type so that both the caller-supplied closure
                // and the default adapter can be assigned to it.
                final RpcResponseClosure<RpcRequests.AppendEntriesResponse> heartbeatDone;
                if (heartBeatClosure != null) {
                    heartbeatDone = heartBeatClosure;
                } else {
                    heartbeatDone = new RpcResponseClosureAdapter<RpcRequests.AppendEntriesResponse>() {

                        @Override
                        public void run(Status status) {
                            Replicator.onHeartbeatReturned(Replicator.this.id, status, request, (RpcRequests.AppendEntriesResponse) this.getResponse(), monotonicSendTimeMs);
                        }
                    };
                }
                this.heartbeatInFly = this.rpcService.appendEntries(this.options.getPeerId().getEndpoint(), request, this.options.getElectionTimeoutMs() / 2, heartbeatDone);
            } else {
                // Probe request: tracked as an in-flight entry with count 0 and size 0.
                this.statInfo.runningState = RunningState.APPENDING_ENTRIES;
                this.statInfo.firstLogIndex = this.nextIndex;
                this.statInfo.lastLogIndex = this.nextIndex - 1L;
                ++this.appendEntriesCounter;
                this.state = State.Probe;
                final int stateVersion = this.version;
                final int seq = this.getAndIncrementReqSeq();
                final Future<Message> rpcFuture = this.rpcService.appendEntries(this.options.getPeerId().getEndpoint(), request, -1, new RpcResponseClosureAdapter<RpcRequests.AppendEntriesResponse>() {

                    @Override
                    public void run(Status status) {
                        Replicator.onRpcReturned(Replicator.this.id, RequestType.AppendEntries, status, (Message) request, this.getResponse(), seq, stateVersion, monotonicSendTimeMs);
                    }
                });
                this.addInflight(RequestType.AppendEntries, this.nextIndex, 0, 0, seq, rpcFuture);
            }
            LOG.debug("Node {} send HeartbeatRequest to {} term {} lastCommittedIndex {}", new Object[]{this.options.getNode().getNodeId(), this.options.getPeerId(), this.options.getTerm(), request.getCommittedIndex()});
        } finally {
            this.id.unlock();
        }
    }

    /**
     * Fills {@code emb} with the meta of the log entry at {@code nextSendingIndex + offset}
     * and appends a slice of the entry's data (if any) to {@code dateBuffer}.
     *
     * @return false when {@code dateBuffer} has already reached the configured max body size
     *         or the log manager has no entry at that index; true otherwise
     */
    boolean prepareEntry(long nextSendingIndex, int offset, RaftOutter.EntryMeta.Builder emb, RecyclableByteBufferList dateBuffer) {
        final boolean bodyFull = dateBuffer.getCapacity() >= this.raftOptions.getMaxBodySize();
        if (bodyFull) {
            return false;
        }
        final long logIndex = nextSendingIndex + (long) offset;
        final LogEntry entry = this.options.getLogManager().getEntry(logIndex);
        if (entry == null) {
            return false;
        }

        emb.setTerm(entry.getId().getTerm());
        if (entry.hasChecksum()) {
            emb.setChecksum(entry.getChecksum());
        }
        emb.setType(entry.getType());

        if (entry.getPeers() == null) {
            Requires.requireTrue(entry.getType() != EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION, "Empty peers but is ENTRY_TYPE_CONFIGURATION type at logIndex=%d", logIndex);
        } else {
            Requires.requireTrue(!entry.getPeers().isEmpty(), "Empty peers at logIndex=%d", logIndex);
            for (final PeerId member : entry.getPeers()) {
                emb.addPeers(member.toString());
            }
            if (entry.getOldPeers() != null) {
                for (final PeerId oldMember : entry.getOldPeers()) {
                    emb.addOldPeers(oldMember.toString());
                }
            }
        }

        final ByteBuffer payload = entry.getData();
        if (payload == null) {
            emb.setDataLen(0);
        } else {
            emb.setDataLen(payload.remaining());
            dateBuffer.add(payload.slice());
        }
        return true;
    }

    /**
     * Creates and starts a replicator for the peer described by {@code opts}.
     *
     * <p>After connecting to the peer, a metric set is registered under the replicator's
     * metric name (unless that name is already present), the heartbeat timer is armed and an
     * initial probe request is sent.
     *
     * @return the replicator's {@link ThreadId}, or null when the connection to the peer
     *         could not be established
     * @throws IllegalArgumentException when the log manager, ballot box or node is missing
     */
    public static ThreadId start(ReplicatorOptions opts, RaftOptions raftOptions) {
        final boolean incomplete = opts.getLogManager() == null || opts.getBallotBox() == null || opts.getNode() == null;
        if (incomplete) {
            throw new IllegalArgumentException("Invalid ReplicatorOptions.");
        }
        final Replicator replicator = new Replicator(opts, raftOptions);
        final boolean connected = replicator.rpcService.connect(opts.getPeerId().getEndpoint());
        if (!connected) {
            LOG.error("Fail to init sending channel to {}", opts.getPeerId());
            return null;
        }

        final MetricRegistry registry = opts.getNode().getNodeMetrics().getMetricRegistry();
        if (registry != null) {
            try {
                final String metricName = getReplicatorMetricName(opts);
                final boolean alreadyRegistered = registry.getNames().contains(metricName);
                if (!alreadyRegistered) {
                    registry.register(metricName, (Metric) new ReplicatorMetricSet(opts, replicator));
                }
            } catch (final IllegalArgumentException ignored) {
                // Deliberately ignored, as in the original code.
            }
        }

        replicator.id = new ThreadId(replicator, replicator);
        replicator.id.lock();
        LOG.info("Replicator={}@{} is started", replicator.id, replicator.options.getPeerId());
        replicator.catchUpClosure = null;
        replicator.lastRpcSendTimestamp = Utils.monotonicMs();
        replicator.startHeartbeatTimer(Utils.nowMs());
        // sendEmptyEntries releases the lock taken above.
        replicator.sendEmptyEntries(false);
        return replicator.id;
    }

    /** Builds the metric name {@code replicator-<groupId>/<peerId>} for the given options. */
    private static String getReplicatorMetricName(ReplicatorOptions opts) {
        final StringBuilder name = new StringBuilder("replicator-");
        name.append(opts.getNode().getGroupId());
        name.append("/");
        name.append(opts.getPeerId());
        return name.toString();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers {@code done} to be notified when the replicator has caught up.
     *
     * <p>The closure is completed with {@code EINVAL} right away when the replicator no
     * longer exists or another catch-up closure is still pending.
     *
     * @param id        the replicator handle
     * @param maxMargin stored on the closure via {@code setMaxMargin}
     * @param dueTime   when positive, a timer is scheduled to fire at this time
     *                  (delay is {@code dueTime - Utils.nowMs()})
     * @param done      the closure to register
     */
    public static void waitForCaughtUp(ThreadId id, long maxMargin, long dueTime, CatchUpClosure done) {
        final Replicator replicator = (Replicator) id.lock();
        if (replicator == null) {
            Utils.runClosureInThread(done, new Status(RaftError.EINVAL, "No such replicator", new Object[0]));
            return;
        }
        try {
            final boolean alreadyWaiting = replicator.catchUpClosure != null;
            if (alreadyWaiting) {
                LOG.error("Previous wait_for_caught_up is not over");
                Utils.runClosureInThread(done, new Status(RaftError.EINVAL, "Duplicated call", new Object[0]));
                return;
            }
            done.setMaxMargin(maxMargin);
            if (dueTime > 0L) {
                done.setTimer(replicator.timerManager.schedule(() -> onCatchUpTimedOut(id), dueTime - Utils.nowMs(), TimeUnit.MILLISECONDS));
            }
            replicator.catchUpClosure = done;
        } finally {
            id.unlock();
        }
    }

    /** Returns a description containing the state, the stat info and the peer id. */
    @Override
    public String toString() {
        return "Replicator [state=" + this.state + ", statInfo=" + this.statInfo + ",peerId=" + this.options.getPeerId() + "]";
    }

    /** Resumes sending with an {@code ETIMEDOUT} code; does nothing when {@code id} is null. */
    static void onBlockTimeoutInNewThread(ThreadId id) {
        if (id == null) {
            return;
        }
        continueSending(id, RaftError.ETIMEDOUT.getNumber());
    }

    /**
     * Cancels the pending block timer, if any, and triggers the block-timeout handling
     * immediately when the cancellation succeeded. No-op when {@code id} is null or the
     * replicator no longer exists.
     */
    static void unBlockAndSendNow(ThreadId id) {
        if (id == null) {
            return;
        }
        final Replicator replicator = (Replicator) id.lock();
        if (replicator == null) {
            return;
        }
        try {
            final ScheduledFuture<?> timer = replicator.blockTimer;
            final boolean cancelled = timer != null && timer.cancel(true);
            if (cancelled) {
                onBlockTimeout(id);
            }
        } finally {
            id.unlock();
        }
    }

    /**
     * Resumes replication after a wait or a block.
     *
     * <p>{@code ETIMEDOUT} sends a probe request, {@code ESTOP} only logs and releases the
     * lock, and any other code continues sending entries. The wait id is reset to -1 first.
     *
     * @return true when {@code id} is null or the replicator was found; false when the lock
     *         returned no replicator
     */
    static boolean continueSending(ThreadId id, int errCode) {
        if (id == null) {
            return true;
        }
        final Replicator replicator = (Replicator) id.lock();
        if (replicator == null) {
            return false;
        }
        replicator.waitId = -1L;
        if (errCode == RaftError.ETIMEDOUT.getNumber()) {
            replicator.sendEmptyEntries(false);
            return true;
        }
        if (errCode == RaftError.ESTOP.getNumber()) {
            LOG.warn("Replicator {} stops sending entries.", id);
            id.unlock();
            return true;
        }
        replicator.sendEntries();
        return true;
    }

    /** Hands the block-timeout handling for {@code arg} over to {@code Utils.runInThread}. */
    static void onBlockTimeout(ThreadId arg) {
        Utils.runInThread(() -> {
            onBlockTimeoutInNewThread(arg);
        });
    }

    /**
     * Pauses replication: schedules a block timer that fires
     * {@code getDynamicHeartBeatTimeoutMs()} milliseconds after {@code startTimeMs}, marks the
     * running state as {@code BLOCKING} and releases the id lock. If anything in that
     * sequence throws, the failure is logged and a probe request is sent instead.
     *
     * @param startTimeMs base time for the timer, in milliseconds
     * @param errorCode   not used by this method
     */
    void block(long startTimeMs, int errorCode) {
        final long deadlineMs = startTimeMs + (long) this.options.getDynamicHeartBeatTimeoutMs();
        try {
            LOG.debug("Blocking {} for {} ms", this.options.getPeerId(), this.options.getDynamicHeartBeatTimeoutMs());
            final long delayMs = deadlineMs - Utils.nowMs();
            // this.id is read when the task runs, not when it is scheduled.
            this.blockTimer = this.timerManager.schedule(() -> onBlockTimeout(this.id), delayMs, TimeUnit.MILLISECONDS);
            this.statInfo.runningState = RunningState.BLOCKING;
            this.id.unlock();
        } catch (final Exception e) {
            LOG.error("Fail to add timer", e);
            this.sendEmptyEntries(false);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reacts to an error code set on the replicator's {@link ThreadId}.
     *
     * <ul>
     * <li>{@code ESTOP}: cancels all in-flight RPCs and timers, removes the log-manager
     * waiter if one is registered, notifies the catch-up closure and destroys the replicator.</li>
     * <li>{@code ETIMEDOUT}: releases the lock and sends a heartbeat via {@code Utils.runInThread}.</li>
     * <li>anything else: releases the lock and fails the {@code Requires} check.</li>
     * </ul>
     */
    @Override
    public void onError(ThreadId id, Object data, int errorCode) {
        final Replicator replicator = (Replicator) data;
        if (errorCode == RaftError.ESTOP.getNumber()) {
            try {
                final Inflight latest = replicator.rpcInFly;
                for (final Inflight pending : replicator.inflights) {
                    if (pending != latest) {
                        pending.rpcFuture.cancel(true);
                    }
                }
                if (latest != null) {
                    latest.rpcFuture.cancel(true);
                    replicator.rpcInFly = null;
                }
                if (replicator.heartbeatInFly != null) {
                    replicator.heartbeatInFly.cancel(true);
                    replicator.heartbeatInFly = null;
                }
                if (replicator.timeoutNowInFly != null) {
                    replicator.timeoutNowInFly.cancel(true);
                    replicator.timeoutNowInFly = null;
                }
                if (replicator.heartbeatTimer != null) {
                    replicator.heartbeatTimer.cancel(true);
                    replicator.heartbeatTimer = null;
                }
                if (replicator.blockTimer != null) {
                    replicator.blockTimer.cancel(true);
                    replicator.blockTimer = null;
                }
                if (replicator.waitId >= 0L) {
                    replicator.options.getLogManager().removeWaiter(replicator.waitId);
                }
                replicator.notifyOnCaughtUp(errorCode, true);
            } finally {
                // destroy() runs even when the cleanup above throws.
                replicator.destroy();
            }
            return;
        }
        if (errorCode == RaftError.ETIMEDOUT.getNumber()) {
            id.unlock();
            Utils.runInThread(() -> sendHeartbeat(id));
            return;
        }
        id.unlock();
        Requires.requireTrue(false, "Unknown error code for replicator: " + errorCode);
    }

    /** Timer callback: notifies the pending catch-up closure with {@code ETIMEDOUT}. */
    private static void onCatchUpTimedOut(ThreadId id) {
        final Replicator replicator = (Replicator) id.lock();
        if (replicator != null) {
            try {
                replicator.notifyOnCaughtUp(RaftError.ETIMEDOUT.getNumber(), false);
            } finally {
                id.unlock();
            }
        }
    }

    /**
     * Completes the pending catch-up closure, if there is one and the conditions allow it.
     *
     * <p>For a timeout the closure's status is set to the timeout error unless an error was
     * already recorded. For any other code the closure is left pending when the peer is
     * still more than {@code maxMargin} entries behind, when an error was already recorded,
     * or when its timer could not be cancelled (the cancel is skipped when
     * {@code beforeDestroy} is true).
     *
     * @param code          the result code to report
     * @param beforeDestroy true when called right before the replicator is destroyed
     */
    private void notifyOnCaughtUp(int code, boolean beforeDestroy) {
        final CatchUpClosure pending = this.catchUpClosure;
        if (pending == null) {
            return;
        }
        if (code == RaftError.ETIMEDOUT.getNumber()) {
            if (!pending.isErrorWasSet()) {
                pending.getStatus().setError(code, RaftError.describeCode(code), new Object[0]);
            }
        } else {
            final long reachable = this.nextIndex - 1L + pending.getMaxMargin();
            if (reachable < this.options.getLogManager().getLastLogIndex()) {
                return;
            }
            if (pending.isErrorWasSet()) {
                return;
            }
            pending.setErrorWasSet(true);
            if (code != RaftError.SUCCESS.getNumber()) {
                pending.getStatus().setError(code, RaftError.describeCode(code), new Object[0]);
            }
            if (pending.hasTimer() && !beforeDestroy && !pending.getTimer().cancel(true)) {
                return;
            }
        }
        this.catchUpClosure = null;
        Utils.runClosureInThread(pending, pending.getStatus());
    }

    /** Heartbeat timer callback: sets {@code ETIMEDOUT} on the id, or logs when the id is null. */
    private static void onTimeout(ThreadId id) {
        if (id == null) {
            LOG.warn("Replicator id is null when timeout, maybe it's destroyed.");
            return;
        }
        id.setError(RaftError.ETIMEDOUT.getNumber());
    }

    /**
     * Tears the replicator down: clears the {@code id} field, releases the snapshot reader,
     * removes the replicator's metric when node metrics are enabled, marks the state as
     * {@code Destroyed} and finally calls {@code unlockAndDestroy()} on the saved id.
     */
    void destroy() {
        final ThreadId handle = this.id;
        LOG.info("Replicator {} is going to quit", handle);
        this.id = null;
        releaseReader();
        final NodeMetrics metrics = this.options.getNode().getNodeMetrics();
        if (metrics.isEnabled()) {
            metrics.getMetricRegistry().remove(getReplicatorMetricName(this.options));
        }
        this.state = State.Destroyed;
        handle.unlockAndDestroy();
    }

    /** Closes the snapshot reader quietly and clears the field; no-op when no reader is held. */
    private void releaseReader() {
        final SnapshotReader held = this.reader;
        if (held == null) {
            return;
        }
        Utils.closeQuietly(held);
        this.reader = null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Handles the response of a heartbeat request.
     *
     * <p>On an RPC failure the replicator goes back to {@code Probe} and the heartbeat timer
     * is re-armed. When the peer reports a higher term, the replicator is destroyed and the
     * node is asked to increase its term. Otherwise the last RPC send timestamp is advanced
     * (if newer) and the heartbeat timer is re-armed.
     *
     * @param id          the replicator handle; ignored when null
     * @param status      result status of the RPC
     * @param request     the heartbeat request that was sent
     * @param response    the peer's response
     * @param rpcSendTime monotonic time at which the request was sent
     */
    static void onHeartbeatReturned(ThreadId id, Status status, RpcRequests.AppendEntriesRequest request, RpcRequests.AppendEntriesResponse response, long rpcSendTime) {
        if (id == null) {
            return;
        }
        long startTimeMs = Utils.nowMs();
        Replicator r = (Replicator)id.lock();
        if (r == null) {
            return;
        }
        try {
            boolean isLogDebugEnabled = LOG.isDebugEnabled();
            // The message is only built when debug logging is enabled.
            StringBuilder sb = null;
            if (isLogDebugEnabled) {
                sb = new StringBuilder("Node ").append(r.options.getGroupId()).append(":").append(r.options.getServerId()).append(" received HeartbeatResponse from ").append(r.options.getPeerId()).append(" prevLogIndex=").append(request.getPrevLogIndex()).append(" prevLogTerm=").append(request.getPrevLogTerm());
            }
            if (!status.isOk()) {
                if (isLogDebugEnabled) {
                    sb.append(" fail, sleep.");
                    LOG.debug(sb.toString());
                }
                r.state = State.Probe;
                // Warn only on every 10th consecutive failure.
                if (++r.consecutiveErrorTimes % 10 == 0) {
                    LOG.warn("Fail to issue RPC to {}, consecutiveErrorTimes={}, error={}", new Object[]{r.options.getPeerId(), r.consecutiveErrorTimes, status});
                }
                r.startHeartbeatTimer(startTimeMs);
                return;
            }
            r.consecutiveErrorTimes = 0;
            if (response.getTerm() > r.options.getTerm()) {
                if (isLogDebugEnabled) {
                    sb.append(" fail, greater term ").append(response.getTerm()).append(" expect term ").append(r.options.getTerm());
                    LOG.debug(sb.toString());
                }
                // Fetch the node before destroy() tears the replicator down.
                NodeImpl node = r.options.getNode();
                r.notifyOnCaughtUp(RaftError.EPERM.getNumber(), true);
                // NOTE(review): destroy() calls unlockAndDestroy() on the id, and the finally
                // block below then calls id.unlock() again - confirm ThreadId tolerates this.
                r.destroy();
                node.increaseTermTo(response.getTerm(), new Status(RaftError.EHIGHERTERMRESPONSE, "Leader receives higher term heartbeat_response from peer:%s", r.options.getPeerId()));
                return;
            }
            if (isLogDebugEnabled) {
                LOG.debug(sb.toString());
            }
            // Only move the timestamp forward.
            if (rpcSendTime > r.lastRpcSendTimestamp) {
                r.lastRpcSendTimestamp = rpcSendTime;
            }
            r.startHeartbeatTimer(startTimeMs);
        }
        finally {
            id.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Unable to fully structure code
     */
    static void onRpcReturned(ThreadId id, RequestType reqType, Status status, Message request, Message response, int seq, int stateVersion, long rpcSendTime) {
        if (id == null) {
            return;
        }
        startTimeMs = Utils.nowMs();
        r = (Replicator)id.lock();
        if (r == null) {
            return;
        }
        if (stateVersion != r.version) {
            Replicator.LOG.debug("Replicator {} ignored old version response {}, current version is {}, request is {}\n, and response is {}\n, status is {}.", new Object[]{r, stateVersion, r.version, request, response, status});
            id.unlock();
            return;
        }
        holdingQueue = r.pendingResponses;
        holdingQueue.add(new RpcResponse(reqType, seq, status, request, response, rpcSendTime));
        if (holdingQueue.size() > r.raftOptions.getMaxReplicatorInflightMsgs()) {
            Replicator.LOG.warn("Too many pending responses {} for replicator {}, maxReplicatorInflightMsgs={}", new Object[]{holdingQueue.size(), r.options.getPeerId(), r.raftOptions.getMaxReplicatorInflightMsgs()});
            r.resetInflights();
            r.state = State.Probe;
            r.sendEmptyEntries(false);
            return;
        }
        continueSendEntries = false;
        isLogDebugEnabled = Replicator.LOG.isDebugEnabled();
        sb = null;
        if (isLogDebugEnabled) {
            sb = new StringBuilder("Replicator ").append(r).append(" is processing RPC responses,");
        }
        try {
            processed = 0;
lbl27:
            // 7 sources

            block12: while (!holdingQueue.isEmpty()) {
                queuedPipelinedResponse = holdingQueue.peek();
                if (queuedPipelinedResponse.seq != r.requiredNextSeq) {
                    if (processed > 0) {
                        if (isLogDebugEnabled) {
                            sb.append("has processed ").append(processed).append(" responses,");
                        }
                        break;
                    }
                    continueSendEntries = false;
                    id.unlock();
                    return;
                }
                holdingQueue.remove();
                ++processed;
                inflight = r.pollInflight();
                if (inflight == null) {
                    if (!isLogDebugEnabled) continue;
                    sb.append("ignore response because request not found:").append(queuedPipelinedResponse).append(",\n");
                    continue;
                }
                if (inflight.seq != queuedPipelinedResponse.seq) {
                    Replicator.LOG.warn("Replicator {} response sequence out of order, expect {}, but it is {}, reset state to try again.", new Object[]{r, inflight.seq, queuedPipelinedResponse.seq});
                    r.resetInflights();
                    r.state = State.Probe;
                    continueSendEntries = false;
                    r.block(Utils.nowMs(), RaftError.EREQUEST.getNumber());
                    return;
                }
                try {
                    switch (6.$SwitchMap$com$alipay$sofa$jraft$core$Replicator$RequestType[queuedPipelinedResponse.requestType.ordinal()]) {
                        case 1: {
                            continueSendEntries = Replicator.onAppendEntriesReturned(id, inflight, queuedPipelinedResponse.status, (RpcRequests.AppendEntriesRequest)queuedPipelinedResponse.request, (RpcRequests.AppendEntriesResponse)queuedPipelinedResponse.response, rpcSendTime, startTimeMs, r);
                            ** break;
                        }
                        case 2: {
                            continueSendEntries = Replicator.onInstallSnapshotReturned(id, r, queuedPipelinedResponse.status, (RpcRequests.InstallSnapshotRequest)queuedPipelinedResponse.request, (RpcRequests.InstallSnapshotResponse)queuedPipelinedResponse.response);
                            continue block12;
                        }
                        ** default:
lbl63:
                        // 1 sources

                        continue block12;
                    }
                }
                finally {
                    if (continueSendEntries) {
                        r.getAndIncrementRequiredNextSeq();
                        continue;
                    }
                    break;
                }
            }
        }
        finally {
            if (isLogDebugEnabled) {
                sb.append(", after processed, continue to send entries: ").append(continueSendEntries);
                Replicator.LOG.debug(sb.toString());
            }
            if (continueSendEntries) {
                r.sendEntries();
            }
        }
    }

    /**
     * Discards all in-flight bookkeeping and realigns the two sequence counters.
     *
     * <p>Increments {@code version}, clears {@code inflights} and
     * {@code pendingResponses}, sets both {@code requiredNextSeq} and
     * {@code reqSeq} to the larger of their current values, and finally calls
     * {@code releaseReader()}.
     */
    void resetInflights() {
        this.version++;
        this.inflights.clear();
        this.pendingResponses.clear();
        // Both counters restart from whichever of the two is further ahead.
        final int alignedSeq = Math.max(this.reqSeq, this.requiredNextSeq);
        this.requiredNextSeq = alignedSeq;
        this.reqSeq = alignedSeq;
        this.releaseReader();
    }

    /**
     * Processes the outcome of one AppendEntries RPC for replicator {@code r}.
     *
     * <p>The visible caller stores the return value in its
     * {@code continueSendEntries} flag: {@code true} lets it advance the required
     * sequence number and call {@code r.sendEntries()}; {@code false} stops the
     * response-processing loop.
     *
     * <p>NOTE(review): every {@code false} path except the term-mismatch one
     * hands control to a callee ({@code sendEmptyEntries}, {@code block},
     * {@code destroy}) instead of calling {@code id.unlock()} here; presumably
     * those callees release the replicator lock - confirm before reordering.
     *
     * @param id           lock handle of the replicator; unlocked directly only on the term-mismatch path
     * @param inflight     bookkeeping entry matched to this response
     * @param status       transport-level result of the RPC
     * @param request      the request that was sent
     * @param response     the peer's reply; only read after {@code status.isOk()} has been checked
     * @param rpcSendTime  send timestamp, compared against {@code Utils.monotonicMs()} and {@code r.lastRpcSendTimestamp}
     * @param startTimeMs  passed to {@code r.block(...)} when the RPC failed
     * @param r            the replicator that issued the request
     * @return {@code true} only when the response was a success in the expected term
     */
    private static boolean onAppendEntriesReturned(ThreadId id, Inflight inflight, Status status, RpcRequests.AppendEntriesRequest request, RpcRequests.AppendEntriesResponse response, long rpcSendTime, long startTimeMs, Replicator r) {
        int entriesSize;
        // The in-flight record and the request must describe the same starting index.
        if (inflight.startIndex != request.getPrevLogIndex() + 1L) {
            LOG.warn("Replicator {} received invalid AppendEntriesResponse, in-flight startIndex={}, requset prevLogIndex={}, reset the replicator state and probe again.", new Object[]{r, inflight.startIndex, request.getPrevLogIndex()});
            r.resetInflights();
            r.state = State.Probe;
            r.sendEmptyEntries(false);
            return false;
        }
        // Replication metrics are recorded only for requests that carried entries.
        if (request.getEntriesCount() > 0) {
            r.nodeMetrics.recordLatency("replicate-entries", Utils.monotonicMs() - rpcSendTime);
            r.nodeMetrics.recordSize("replicate-entries-count", request.getEntriesCount());
            r.nodeMetrics.recordSize("replicate-entries-bytes", request.getData() != null ? (long)request.getData().size() : 0L);
        }
        boolean isLogDebugEnabled = LOG.isDebugEnabled();
        // sb is non-null only when debug logging is enabled; every use below is guarded.
        StringBuilder sb = null;
        if (isLogDebugEnabled) {
            sb = new StringBuilder("Node ").append(r.options.getGroupId()).append(":").append(r.options.getServerId()).append(" received AppendEntriesResponse from ").append(r.options.getPeerId()).append(" prevLogIndex=").append(request.getPrevLogIndex()).append(" prevLogTerm=").append(request.getPrevLogTerm()).append(" count=").append(request.getEntriesCount());
        }
        // RPC-level failure: count it, reset to Probe and block.
        if (!status.isOk()) {
            if (isLogDebugEnabled) {
                sb.append(" fail, sleep.");
                LOG.debug(sb.toString());
            }
            // Warn only on every 10th consecutive failure.
            if (++r.consecutiveErrorTimes % 10 == 0) {
                LOG.warn("Fail to issue RPC to {}, consecutiveErrorTimes={}, error={}", new Object[]{r.options.getPeerId(), r.consecutiveErrorTimes, status});
            }
            r.resetInflights();
            r.state = State.Probe;
            r.block(startTimeMs, status.getCode());
            return false;
        }
        r.consecutiveErrorTimes = 0;
        // The RPC went through but the peer rejected the request.
        if (!response.getSuccess()) {
            // Peer reports a higher term: notify catch-up waiters with EPERM, destroy
            // this replicator and ask the node to raise its term.
            if (response.getTerm() > r.options.getTerm()) {
                if (isLogDebugEnabled) {
                    sb.append(" fail, greater term ").append(response.getTerm()).append(" expect term ").append(r.options.getTerm());
                    LOG.debug(sb.toString());
                }
                // Fetch the node before destroy(); it is used afterwards.
                NodeImpl node = r.options.getNode();
                r.notifyOnCaughtUp(RaftError.EPERM.getNumber(), true);
                r.destroy();
                node.increaseTermTo(response.getTerm(), new Status(RaftError.EHIGHERTERMRESPONSE, "Leader receives higher term heartbeat_response from peer:%s", r.options.getPeerId()));
                return false;
            }
            if (isLogDebugEnabled) {
                sb.append(" fail, find nextIndex remote lastLogIndex ").append(response.getLastLogIndex()).append(" local nextIndex ").append(r.nextIndex);
                LOG.debug(sb.toString());
            }
            // The timestamp only ever moves forward.
            if (rpcSendTime > r.lastRpcSendTimestamp) {
                r.lastRpcSendTimestamp = rpcSendTime;
            }
            r.resetInflights();
            // Pick the next index to probe: jump back to just past the peer's last log
            // index when that is lower, otherwise step back one entry at a time.
            if (response.getLastLogIndex() + 1L < r.nextIndex) {
                LOG.debug("LastLogIndex at peer={} is {}", (Object)r.options.getPeerId(), (Object)response.getLastLogIndex());
                r.nextIndex = response.getLastLogIndex() + 1L;
            } else if (r.nextIndex > 1L) {
                LOG.debug("logIndex={} dismatch", (Object)r.nextIndex);
                --r.nextIndex;
            } else {
                LOG.error("Peer={} declares that log at index=0 doesn't match, which is not supposed to happen", (Object)r.options.getPeerId());
            }
            r.sendEmptyEntries(false);
            return false;
        }
        if (isLogDebugEnabled) {
            sb.append(", success");
            LOG.debug(sb.toString());
        }
        // A success in a different term is not accepted; this is the only path in
        // this method that calls id.unlock() itself.
        if (response.getTerm() != r.options.getTerm()) {
            r.resetInflights();
            r.state = State.Probe;
            LOG.error("Fail, response term {} dismatch, expect term {}", (Object)response.getTerm(), (Object)r.options.getTerm());
            id.unlock();
            return false;
        }
        if (rpcSendTime > r.lastRpcSendTimestamp) {
            r.lastRpcSendTimestamp = rpcSendTime;
        }
        if ((entriesSize = request.getEntriesCount()) > 0) {
            // Report the inclusive range [nextIndex, nextIndex + entriesSize - 1] to the ballot box.
            r.options.getBallotBox().commitAt(r.nextIndex, r.nextIndex + (long)entriesSize - 1L, r.options.getPeerId());
            if (LOG.isDebugEnabled()) {
                LOG.debug("Replicated logs in [{}, {}] to peer {}", new Object[]{r.nextIndex, r.nextIndex + (long)entriesSize - 1L, r.options.getPeerId()});
            }
        } else {
            // A successful request without entries switches the replicator to Replicate.
            r.state = State.Replicate;
        }
        r.nextIndex += (long)entriesSize;
        r.hasSucceeded = true;
        r.notifyOnCaughtUp(RaftError.SUCCESS.getNumber(), false);
        // A recorded timeoutNowIndex below the new nextIndex triggers TimeoutNow,
        // without unlocking and without stopping afterwards.
        if (r.timeoutNowIndex > 0L && r.timeoutNowIndex < r.nextIndex) {
            r.sendTimeoutNow(false, false);
        }
        return true;
    }

    /**
     * Fills the header fields of an AppendEntries request builder.
     *
     * <p>The term of {@code prevLogIndex} is looked up in the log manager. A
     * term of {@code 0} for a non-zero index is handled in two ways:
     * <ul>
     *   <li>non-heartbeat: asserts that the index is below the first log index,
     *       logs that it was compacted and returns {@code false} without
     *       touching {@code rb};</li>
     *   <li>heartbeat: the request is still built, with a previous log index of
     *       {@code 0} (and the looked-up term, which is {@code 0}).</li>
     * </ul>
     *
     * @param rb           builder to populate
     * @param prevLogIndex index of the entry preceding the ones to send
     * @param isHeartbeat  whether the request is a heartbeat
     * @return {@code true} if {@code rb} was populated, {@code false} otherwise
     */
    private boolean fillCommonFields(final RpcRequests.AppendEntriesRequest.Builder rb, long prevLogIndex, final boolean isHeartbeat) {
        final long prevLogTerm = this.options.getLogManager().getTerm(prevLogIndex);
        final boolean termMissing = prevLogTerm == 0L && prevLogIndex != 0L;

        if (termMissing && !isHeartbeat) {
            Requires.requireTrue(prevLogIndex < this.options.getLogManager().getFirstLogIndex());
            LOG.debug("logIndex={} was compacted", prevLogIndex);
            return false;
        }

        // Heartbeats with a missing term fall back to index 0.
        final long effectivePrevLogIndex = termMissing ? 0L : prevLogIndex;

        rb.setTerm(this.options.getTerm());
        rb.setGroupId(this.options.getGroupId());
        rb.setServerId(this.options.getServerId().toString());
        rb.setPeerId(this.options.getPeerId().toString());
        rb.setPrevLogIndex(effectivePrevLogIndex);
        rb.setPrevLogTerm(prevLogTerm);
        rb.setCommittedIndex(this.options.getBallotBox().getLastCommittedIndex());
        return true;
    }

    /**
     * Registers a waiter with the log manager for entries after
     * {@code nextWaitIndex - 1}, unless one is already registered
     * ({@code waitId >= 0}), and then always unlocks {@code id}.
     *
     * <p>When a new waiter is registered, the running state recorded in
     * {@code statInfo} becomes {@link RunningState#IDLE}. The waiter callback
     * forwards to {@code Replicator.continueSending}.
     *
     * @param nextWaitIndex the next index this replicator wants to send
     */
    private void waitMoreEntries(final long nextWaitIndex) {
        try {
            LOG.debug("Node {} waits more entries", (Object) this.options.getNode().getNodeId());
            final boolean waiterAlreadyRegistered = this.waitId >= 0L;
            if (!waiterAlreadyRegistered) {
                this.waitId = this.options.getLogManager().wait(
                    nextWaitIndex - 1L,
                    (arg, errorCode) -> Replicator.continueSending((ThreadId) arg, errorCode),
                    this.id);
                this.statInfo.runningState = RunningState.IDLE;
            }
        } finally {
            this.id.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Keeps sending batches for as long as {@code getNextSendIndex()} advances.
     *
     * <p>The loop ends when the next index to send is not greater than the one
     * just sent, or when {@link #sendEntries(long)} returns {@code false}. In
     * the first case {@code id} is unlocked here. In the second it is left
     * alone; NOTE(review): presumably the callee has already released or handed
     * off the lock - confirm.
     */
    void sendEntries() {
        boolean unlockOnExit = true;
        try {
            long lastSentIndex = -1L;
            while (true) {
                final long candidateIndex = this.getNextSendIndex();
                if (candidateIndex <= lastSentIndex) {
                    // No progress is possible right now.
                    break;
                }
                if (!this.sendEntries(candidateIndex)) {
                    unlockOnExit = false;
                    break;
                }
                lastSentIndex = candidateIndex;
            }
        } finally {
            if (unlockOnExit) {
                this.id.unlock();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Builds and sends one AppendEntries request starting at
     * {@code nextSendingIndex}.
     *
     * <p>Steps:
     * <ol>
     *   <li>Fill the header via {@code fillCommonFields}; if that fails, call
     *       {@code installSnapshot()} and return {@code false}.</li>
     *   <li>Collect up to {@code raftOptions.getMaxEntriesSize()} entries via
     *       {@code prepareEntry}, stopping at the first one it refuses.</li>
     *   <li>If nothing was collected, call {@code installSnapshot()} when the
     *       index is below the first log index, otherwise
     *       {@code waitMoreEntries(nextSendingIndex)}, and return {@code false}.</li>
     *   <li>Otherwise copy the collected buffers into one data buffer, send the
     *       request and record it with {@code addInflight}.</li>
     * </ol>
     * The temporary buffer list is recycled on every path. The data buffer is
     * recycled by the RPC callback.
     *
     * @param nextSendingIndex index of the first entry to send
     * @return {@code true} if a request was issued, {@code false} otherwise
     */
    private boolean sendEntries(final long nextSendingIndex) {
        final RpcRequests.AppendEntriesRequest.Builder rb = RpcRequests.AppendEntriesRequest.newBuilder();
        if (!this.fillCommonFields(rb, nextSendingIndex - 1L, false)) {
            this.installSnapshot();
            return false;
        }

        ByteBufferCollector dataBuf = null;
        final int maxEntriesSize = this.raftOptions.getMaxEntriesSize();
        final RecyclableByteBufferList byteBufList = RecyclableByteBufferList.newInstance();
        try {
            for (int i = 0; i < maxEntriesSize; i++) {
                final RaftOutter.EntryMeta.Builder emb = RaftOutter.EntryMeta.newBuilder();
                if (!this.prepareEntry(nextSendingIndex, i, emb, byteBufList)) {
                    break;
                }
                rb.addEntries(emb.build());
            }

            if (rb.getEntriesCount() == 0) {
                // Nothing to send: fall back to a snapshot or wait for new entries.
                if (nextSendingIndex < this.options.getLogManager().getFirstLogIndex()) {
                    this.installSnapshot();
                } else {
                    this.waitMoreEntries(nextSendingIndex);
                }
                return false;
            }

            if (byteBufList.getCapacity() > 0) {
                dataBuf = ByteBufferCollector.allocateByRecyclers(byteBufList.getCapacity());
                for (final ByteBuffer chunk : byteBufList) {
                    dataBuf.put(chunk);
                }
                final ByteBuffer payload = dataBuf.getBuffer();
                payload.flip();
                rb.setData(ZeroByteStringHelper.wrap(payload));
            }
        } finally {
            RecycleUtil.recycle(byteBufList);
        }

        final RpcRequests.AppendEntriesRequest request = rb.build();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Node {} send AppendEntriesRequest to {} term {} lastCommittedIndex {} prevLogIndex {} prevLogTerm {} logIndex {} count {}", new Object[]{this.options.getNode().getNodeId(), this.options.getPeerId(), this.options.getTerm(), request.getCommittedIndex(), request.getPrevLogIndex(), request.getPrevLogTerm(), nextSendingIndex, request.getEntriesCount()});
        }

        this.statInfo.runningState = RunningState.APPENDING_ENTRIES;
        this.statInfo.firstLogIndex = rb.getPrevLogIndex() + 1L;
        this.statInfo.lastLogIndex = rb.getPrevLogIndex() + (long) rb.getEntriesCount();

        // Values captured for the callback: version and sequence at send time.
        final ByteBufferCollector recyclable = dataBuf;
        final int v = this.version;
        final long monotonicSendTimeMs = Utils.monotonicMs();
        final int seq = this.getAndIncrementReqSeq();
        final Future<Message> rpcFuture = this.rpcService.appendEntries(this.options.getPeerId().getEndpoint(), request, -1, (RpcResponseClosure<RpcRequests.AppendEntriesResponse>) new RpcResponseClosureAdapter<RpcRequests.AppendEntriesResponse>() {

            @Override
            public void run(final Status status) {
                RecycleUtil.recycle(recyclable);
                Replicator.onRpcReturned(Replicator.this.id, RequestType.AppendEntries, status, (Message) request, getResponse(), seq, v, monotonicSendTimeMs);
            }
        });
        this.addInflight(RequestType.AppendEntries, nextSendingIndex, request.getEntriesCount(), request.getData().size(), seq, rpcFuture);
        return true;
    }

    /**
     * Locks {@code id} and sends a heartbeat whose result goes to {@code closure}.
     *
     * <p>If locking yields no replicator, {@code closure} is run through
     * {@code Utils.runClosureInThread} with an {@code EHOSTDOWN} status instead.
     *
     * @param id      lock handle of the target replicator
     * @param closure callback for the heartbeat response
     */
    public static void sendHeartbeat(final ThreadId id, final RpcResponseClosure<RpcRequests.AppendEntriesResponse> closure) {
        final Replicator r = (Replicator) id.lock();
        if (r != null) {
            r.sendEmptyEntries(true, closure);
            return;
        }
        Utils.runClosureInThread(closure, new Status(RaftError.EHOSTDOWN, "Peer %s is not connected", id));
    }

    /**
     * Locks {@code id} and, if a replicator is returned, sends a heartbeat
     * through {@code sendEmptyEntries(true)}. Does nothing otherwise.
     *
     * @param id lock handle of the target replicator
     */
    private static void sendHeartbeat(final ThreadId id) {
        final Replicator r = (Replicator) id.lock();
        if (r != null) {
            r.sendEmptyEntries(true);
        }
    }

    /**
     * Convenience overload of
     * {@link #sendTimeoutNow(boolean, boolean, int)} that passes {@code -1} as
     * the timeout argument.
     *
     * @param unlockId        whether to unlock {@code id} after sending
     * @param stopAfterFinish forwarded unchanged to the three-argument overload
     */
    private void sendTimeoutNow(final boolean unlockId, final boolean stopAfterFinish) {
        final int timeoutMs = -1;
        this.sendTimeoutNow(unlockId, stopAfterFinish, timeoutMs);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Builds a TimeoutNow request for the peer and sends it.
     *
     * <p>When {@code stopAfterFinish} is {@code false}, the returned future is
     * stored in {@code timeoutNowInFly} and {@code timeoutNowIndex} is reset to
     * {@code 0}. When it is {@code true}, neither field is touched.
     *
     * @param unlockId        whether {@code id} is unlocked before returning, even if sending throws
     * @param stopAfterFinish forwarded to the response handler
     * @param timeoutMs       timeout handed to the RPC service
     */
    private void sendTimeoutNow(final boolean unlockId, final boolean stopAfterFinish, final int timeoutMs) {
        final RpcRequests.TimeoutNowRequest.Builder rb = RpcRequests.TimeoutNowRequest.newBuilder();
        rb.setTerm(this.options.getTerm());
        rb.setGroupId(this.options.getGroupId());
        rb.setServerId(this.options.getServerId().toString());
        rb.setPeerId(this.options.getPeerId().toString());
        try {
            final Future<Message> pending = this.timeoutNow(rb, stopAfterFinish, timeoutMs);
            if (!stopAfterFinish) {
                this.timeoutNowInFly = pending;
                this.timeoutNowIndex = 0L;
            }
        } finally {
            if (unlockId) {
                this.id.unlock();
            }
        }
    }

    /**
     * Builds the request from {@code rb} and submits it to the RPC service.
     *
     * <p>The response callback forwards to
     * {@code onTimeoutNowReturned}, but only if this replicator's {@code id}
     * is non-null when the callback runs.
     *
     * @param rb              fully populated request builder
     * @param stopAfterFinish forwarded to the response handler
     * @param timeoutMs       timeout handed to the RPC service
     * @return the future returned by the RPC service
     */
    private Future<Message> timeoutNow(final RpcRequests.TimeoutNowRequest.Builder rb, final boolean stopAfterFinish, final int timeoutMs) {
        final RpcRequests.TimeoutNowRequest request = rb.build();
        return this.rpcService.timeoutNow(this.options.getPeerId().getEndpoint(), request, timeoutMs, (RpcResponseClosure<RpcRequests.TimeoutNowResponse>) new RpcResponseClosureAdapter<RpcRequests.TimeoutNowResponse>() {

            @Override
            public void run(final Status status) {
                if (Replicator.this.id == null) {
                    return;
                }
                Replicator.onTimeoutNowReturned(Replicator.this.id, status, request, (RpcRequests.TimeoutNowResponse) getResponse(), stopAfterFinish);
            }
        });
    }

    /**
     * Handles the outcome of a TimeoutNow RPC.
     *
     * <p>After locking {@code id} (returning immediately if no replicator is
     * obtained):
     * <ul>
     *   <li>If the RPC succeeded and the peer reports a higher term, catch-up
     *       waiters are notified with {@code EPERM}, the replicator is
     *       destroyed and the node is asked to raise its term.</li>
     *   <li>Otherwise, when {@code stopAfterFinish} is set, waiters are notified
     *       with {@code ESTOP} and the replicator is destroyed; when it is not
     *       set, {@code id} is simply unlocked.</li>
     * </ul>
     *
     * @param id              lock handle of the replicator
     * @param status          transport-level result of the RPC
     * @param request         the request that was sent (not read here)
     * @param response        the peer's reply; read only when {@code status} is OK
     * @param stopAfterFinish whether the replicator stops once the RPC completes
     */
    static void onTimeoutNowReturned(final ThreadId id, final Status status, final RpcRequests.TimeoutNowRequest request, final RpcRequests.TimeoutNowResponse response, final boolean stopAfterFinish) {
        final Replicator r = (Replicator) id.lock();
        if (r == null) {
            return;
        }

        final boolean debugEnabled = LOG.isDebugEnabled();
        StringBuilder trace = null;
        if (debugEnabled) {
            trace = new StringBuilder("Node ")
                .append(r.options.getGroupId())
                .append(":")
                .append(r.options.getServerId())
                .append(" received TimeoutNowResponse from ")
                .append(r.options.getPeerId());
        }

        final boolean rpcOk = status.isOk();
        if (debugEnabled) {
            if (rpcOk) {
                trace.append(response.getSuccess() ? " success" : " fail");
            } else {
                trace.append(" fail:").append(status);
            }
            LOG.debug(trace.toString());
        }

        // The response is only inspected when the RPC itself succeeded.
        if (rpcOk && response.getTerm() > r.options.getTerm()) {
            final NodeImpl node = r.options.getNode();
            r.notifyOnCaughtUp(RaftError.EPERM.getNumber(), true);
            r.destroy();
            node.increaseTermTo(response.getTerm(), new Status(RaftError.EHIGHERTERMRESPONSE, "Leader receives higher term timeout_now_response from peer:%s", r.options.getPeerId()));
            return;
        }

        // Shared tail for both the RPC-failure and the normal-completion paths.
        if (stopAfterFinish) {
            r.notifyOnCaughtUp(RaftError.ESTOP.getNumber(), true);
            r.destroy();
        } else {
            id.unlock();
        }
    }

    /**
     * Sets the {@code ESTOP} error code on {@code id}.
     *
     * <p>NOTE(review): presumably this triggers the replicator's
     * {@code ThreadId.OnError} handling - confirm against {@code ThreadId}.
     *
     * @param id lock handle of the replicator to stop
     * @return always {@code true}
     */
    public static boolean stop(final ThreadId id) {
        final int stopCode = RaftError.ESTOP.getNumber();
        id.setError(stopCode);
        return true;
    }

    /**
     * Delegates to {@code id.join()}.
     *
     * @param id lock handle of the replicator
     * @return always {@code true}
     */
    public static boolean join(final ThreadId id) {
        id.join();
        return true;
    }

    /**
     * Reads the replicator's {@code lastRpcSendTimestamp} through
     * {@code id.getData()}, i.e. without calling {@code id.lock()}.
     *
     * @param id handle of the replicator
     * @return the recorded timestamp, or {@code 0} if {@code id} carries no replicator
     */
    public static long getLastRpcSendTimestamp(final ThreadId id) {
        final Replicator r = (Replicator) id.getData();
        return r == null ? 0L : r.lastRpcSendTimestamp;
    }

    /**
     * Locks {@code id} and delegates to the instance-level
     * {@code transferLeadership(long)}.
     *
     * @param id       lock handle of the target replicator
     * @param logIndex log index forwarded to the instance method
     * @return {@code false} if locking yields no replicator, otherwise the
     *         result of the instance method
     */
    public static boolean transferLeadership(final ThreadId id, final long logIndex) {
        final Replicator r = (Replicator) id.lock();
        return r != null && r.transferLeadership(logIndex);
    }

    /**
     * Sends TimeoutNow immediately if this replicator has already succeeded
     * once and its {@code nextIndex} is beyond {@code logIndex}. Otherwise
     * records {@code logIndex} in {@code timeoutNowIndex} and unlocks
     * {@code id}.
     *
     * <p>In the immediate case, the unlock is delegated to
     * {@code sendTimeoutNow(true, false)}.
     *
     * @param logIndex log index the peer must have passed
     * @return always {@code true}
     */
    private boolean transferLeadership(final long logIndex) {
        final boolean peerIsPastIndex = this.hasSucceeded && this.nextIndex > logIndex;
        if (peerIsPastIndex) {
            this.sendTimeoutNow(true, false);
        } else {
            this.timeoutNowIndex = logIndex;
            this.id.unlock();
        }
        return true;
    }

    /**
     * Resets the replicator's {@code timeoutNowIndex} to {@code 0} under the
     * {@code id} lock.
     *
     * @param id lock handle of the target replicator
     * @return {@code false} if locking yields no replicator, {@code true} otherwise
     */
    public static boolean stopTransferLeadership(final ThreadId id) {
        final Replicator r = (Replicator) id.lock();
        final boolean locked = r != null;
        if (locked) {
            r.timeoutNowIndex = 0L;
            id.unlock();
        }
        return locked;
    }

    /**
     * Locks {@code id} and sends TimeoutNow with {@code stopAfterFinish} set,
     * letting {@code sendTimeoutNow} unlock {@code id} afterwards.
     *
     * @param id        lock handle of the target replicator
     * @param timeoutMs timeout handed to the RPC
     * @return {@code false} if locking yields no replicator, {@code true} otherwise
     */
    public static boolean sendTimeoutNowAndStop(final ThreadId id, final int timeoutMs) {
        final Replicator r = (Replicator) id.lock();
        final boolean locked = r != null;
        if (locked) {
            r.sendTimeoutNow(true, true, timeoutMs);
        }
        return locked;
    }

    /**
     * Reads the replicator's {@code nextIndex} under the {@code id} lock.
     *
     * @param id lock handle of the target replicator
     * @return {@code nextIndex} if the replicator has succeeded at least once,
     *         otherwise {@code 0}; also {@code 0} if locking yields no replicator
     */
    public static long getNextIndex(final ThreadId id) {
        final Replicator r = (Replicator) id.lock();
        if (r == null) {
            return 0L;
        }
        final long nextIdx = r.hasSucceeded ? r.nextIndex : 0L;
        id.unlock();
        return nextIdx;
    }

    /**
     * Holder for one received RPC reply together with the request it answers.
     * All fields are final. Natural ordering is by ascending {@link #seq}.
     */
    static class RpcResponse implements Comparable<RpcResponse> {
        /** Result status of the RPC. */
        final Status status;
        /** The request message that was sent. */
        final Message request;
        /** The response message that came back. */
        final Message response;
        /** Time the request was sent. */
        final long rpcSendTime;
        /** Sequence number of the request. */
        final int seq;
        /** Kind of request this reply belongs to. */
        final RequestType requestType;

        public RpcResponse(final RequestType reqType, final int seq, final Status status, final Message request, final Message response, final long rpcSendTime) {
            this.status = status;
            this.request = request;
            this.response = response;
            this.rpcSendTime = rpcSendTime;
            this.seq = seq;
            this.requestType = reqType;
        }

        @Override
        public String toString() {
            return new StringBuilder("RpcResponse [status=").append(this.status)
                .append(", request=").append(this.request)
                .append(", response=").append(this.response)
                .append(", rpcSendTime=").append(this.rpcSendTime)
                .append(", seq=").append(this.seq)
                .append(", requestType=").append(this.requestType)
                .append("]")
                .toString();
        }

        /** Orders responses by sequence number. */
        @Override
        public int compareTo(final RpcResponse o) {
            return Integer.compare(this.seq, o.seq);
        }
    }

    /**
     * Bookkeeping record for one request that has been sent and not yet
     * answered. All fields are final.
     */
    static class Inflight {
        /** Number of entries carried by the request. */
        final int count;
        /** Index of the first entry carried by the request. */
        final long startIndex;
        /** Size value supplied by the sender. */
        final int size;
        /** Future returned by the RPC service for this request. */
        final Future<Message> rpcFuture;
        /** Kind of request. */
        final RequestType requestType;
        /** Sequence number of the request. */
        final int seq;

        public Inflight(final RequestType requestType, final long startIndex, final int count, final int size, final int seq, final Future<Message> rpcFuture) {
            this.requestType = requestType;
            this.startIndex = startIndex;
            this.count = count;
            this.size = size;
            this.seq = seq;
            this.rpcFuture = rpcFuture;
        }

        @Override
        public String toString() {
            return new StringBuilder("Inflight [count=").append(this.count)
                .append(", startIndex=").append(this.startIndex)
                .append(", size=").append(this.size)
                .append(", rpcFuture=").append(this.rpcFuture)
                .append(", requestType=").append(this.requestType)
                .append(", seq=").append(this.seq)
                .append("]")
                .toString();
        }

        /** Returns whether this is an AppendEntries request that carries at least one entry. */
        boolean isSendingLogEntries() {
            if (this.requestType != RequestType.AppendEntries) {
                return false;
            }
            return this.count > 0;
        }
    }

    /** Kinds of request a replicator tracks as in-flight. */
    enum RequestType {
        /** An install-snapshot request. */
        Snapshot,
        /** An append-entries request. */
        AppendEntries
    }

    /**
     * Mutable snapshot of what the replicator is currently doing. The fields
     * are written directly by the enclosing class.
     */
    static class Stat {
        /** Current activity of the replicator. */
        RunningState runningState;
        /** First log index of the batch being sent. */
        long firstLogIndex;
        long lastLogIncluded;
        /** Last log index of the batch being sent. */
        long lastLogIndex;
        long lastTermIncluded;

        Stat() {
        }

        @Override
        public String toString() {
            return new StringBuilder("<running=").append(this.runningState)
                .append(", firstLogIndex=").append(this.firstLogIndex)
                .append(", lastLogIncluded=").append(this.lastLogIncluded)
                .append(", lastLogIndex=").append(this.lastLogIndex)
                .append(", lastTermIncluded=").append(this.lastTermIncluded)
                .append(">")
                .toString();
        }
    }

    /** Activity values recorded in {@code Stat.runningState}. */
    enum RunningState {
        IDLE,
        BLOCKING,
        APPENDING_ENTRIES,
        INSTALLING_SNAPSHOT
    }

    private static final class ReplicatorMetricSet
    implements MetricSet {
        private final ReplicatorOptions opts;
        private final Replicator r;

        private ReplicatorMetricSet(ReplicatorOptions opts, Replicator r) {
            this.opts = opts;
            this.r = r;
        }

        public Map<String, Metric> getMetrics() {
            HashMap<String, Metric> gauges = new HashMap<String, Metric>();
            gauges.put("log-lags", (Metric)((Gauge)() -> this.opts.getLogManager().getLastLogIndex() - (this.r.nextIndex - 1L)));
            gauges.put("next-index", (Metric)((Gauge)() -> this.r.nextIndex));
            gauges.put("heartbeat-times", (Metric)((Gauge)() -> this.r.heartbeatCounter));
            gauges.put("install-snapshot-times", (Metric)((Gauge)() -> this.r.installSnapshotCounter));
            gauges.put("append-entries-times", (Metric)((Gauge)() -> this.r.appendEntriesCounter));
            return gauges;
        }
    }

    /** States assigned to a replicator's {@code state} field. */
    public enum State {
        Probe,
        Snapshot,
        Replicate,
        Destroyed
    }
}

