/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.causalclustering.core.consensus.shipping;

import java.io.IOException;
import java.time.Clock;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import org.neo4j.causalclustering.core.consensus.LeaderContext;
import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.core.consensus.log.ReadableRaftLog;
import org.neo4j.causalclustering.core.consensus.log.cache.InFlightCache;
import org.neo4j.causalclustering.core.consensus.membership.RaftMembership;
import org.neo4j.causalclustering.core.consensus.outcome.ShipCommand;
import org.neo4j.causalclustering.core.consensus.schedule.TimerService;
import org.neo4j.causalclustering.core.consensus.shipping.RaftLogShipper;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.messaging.Outbound;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.LogProvider;

/**
 * Owns one {@link RaftLogShipper} per replication member (excluding this member itself) and
 * keeps that set in step with the current {@link RaftMembership}.
 *
 * <p>The manager registers itself as a membership listener on construction. Shippers are only
 * created while the manager is running, i.e. between {@link #resume(LeaderContext)} and
 * {@link #pause()}; once {@link #stop()} has been called, {@code resume} becomes a no-op.
 *
 * <p>All state-touching methods synchronize on this instance, so the shipper map and the
 * running/stopped flags are always read and written under the same monitor.
 */
public class RaftLogShippingManager extends LifecycleAdapter implements RaftMembership.Listener {
    private final Outbound<MemberId, RaftMessages.RaftMessage> outbound;
    private final LogProvider logProvider;
    private final ReadableRaftLog raftLog;
    private final Clock clock;
    private final MemberId myself;
    private final RaftMembership membership;
    private final long retryTimeMillis;
    private final int catchupBatchSize;
    private final int maxAllowedShippingLag;
    private final InFlightCache inFlightCache;
    private final TimerService timerService;

    /** Active shippers keyed by the member they ship to; guarded by {@code this}. */
    private final Map<MemberId, RaftLogShipper> logShippers = new HashMap<>();

    /** Most recent leader context seen by {@code resume} or {@code handleCommands}; null until then. */
    private LeaderContext lastLeaderContext;
    private boolean running;
    private boolean stopped;

    /**
     * Stores the collaborators and tuning values that are later handed to every
     * {@link RaftLogShipper} this manager creates, and registers this manager as a listener on
     * {@code membership}.
     *
     * @param outbound passed through to each shipper
     * @param logProvider passed through to each shipper
     * @param raftLog passed through to each shipper
     * @param timerService passed through to each shipper
     * @param clock passed through to each shipper
     * @param myself identity of this member; no shipper is ever created for it
     * @param membership source of the replication member set; this manager listens to its changes
     * @param retryTimeMillis passed through to each shipper
     * @param catchupBatchSize passed through to each shipper
     * @param maxAllowedShippingLag passed through to each shipper
     * @param inFlightCache passed through to each shipper
     */
    public RaftLogShippingManager(Outbound<MemberId, RaftMessages.RaftMessage> outbound, LogProvider logProvider, ReadableRaftLog raftLog, TimerService timerService, Clock clock, MemberId myself, RaftMembership membership, long retryTimeMillis, int catchupBatchSize, int maxAllowedShippingLag, InFlightCache inFlightCache) {
        this.outbound = outbound;
        this.logProvider = logProvider;
        this.raftLog = raftLog;
        this.timerService = timerService;
        this.clock = clock;
        this.myself = myself;
        this.membership = membership;
        this.retryTimeMillis = retryTimeMillis;
        this.catchupBatchSize = catchupBatchSize;
        this.maxAllowedShippingLag = maxAllowedShippingLag;
        this.inFlightCache = inFlightCache;
        // Kept as the last statement so every field is assigned before the listener can be invoked.
        membership.registerListener(this);
    }

    /**
     * Marks the manager as not running, stops every active shipper and forgets them all.
     * NOTE(review): presumably invoked when this member stops being leader - confirm with callers.
     */
    public synchronized void pause() {
        this.running = false;
        this.logShippers.values().forEach(RaftLogShipper::stop);
        this.logShippers.clear();
    }

    /**
     * Marks the manager as running and makes sure a shipper exists for every current replication
     * member other than this one. Does nothing if {@link #stop()} has already been called.
     *
     * @param initialLeaderContext supplies the term and commit index for newly created shippers
     *                             and is remembered for later membership changes
     */
    public synchronized void resume(LeaderContext initialLeaderContext) {
        if (this.stopped) {
            return;
        }
        this.running = true;
        for (MemberId member : this.membership.replicationMembers()) {
            this.ensureLogShipperRunning(member, initialLeaderContext);
        }
        this.lastLeaderContext = initialLeaderContext;
    }

    /**
     * Pauses the manager and marks it as stopped, after which {@link #resume(LeaderContext)}
     * no longer has any effect.
     */
    @Override
    public synchronized void stop() {
        this.pause();
        this.stopped = true;
    }

    /**
     * Creates, registers and starts a shipper for {@code member} unless one already exists or
     * {@code member} is this member itself. Callers must hold the monitor of this instance.
     */
    private void ensureLogShipperRunning(MemberId member, LeaderContext leaderContext) {
        if (member.equals(this.myself) || this.logShippers.get(member) != null) {
            return;
        }
        RaftLogShipper logShipper = new RaftLogShipper(this.outbound, this.logProvider, this.raftLog, this.clock, this.timerService, this.myself, member, leaderContext.term, leaderContext.commitIndex, this.retryTimeMillis, this.catchupBatchSize, this.maxAllowedShippingLag, this.inFlightCache);
        this.logShippers.put(member, logShipper);
        logShipper.start();
    }

    /**
     * Applies every command, in iteration order, to every active shipper and then remembers
     * {@code leaderContext} as the latest leader context.
     *
     * @param shipCommands commands to apply to each shipper
     * @param leaderContext context passed along with each command
     * @throws IOException declared by this method; propagated from applying a command
     */
    public synchronized void handleCommands(Iterable<ShipCommand> shipCommands, LeaderContext leaderContext) throws IOException {
        for (ShipCommand shipCommand : shipCommands) {
            for (RaftLogShipper logShipper : this.logShippers.values()) {
                shipCommand.applyTo(logShipper, leaderContext);
            }
        }
        this.lastLeaderContext = leaderContext;
    }

    /**
     * Reconciles the shipper set with the current replication members: shippers for members that
     * are no longer listed are stopped and removed, and missing shippers are created using the
     * last recorded leader context. Ignored while not running or before any leader context has
     * been recorded.
     */
    @Override
    public synchronized void onMembershipChanged() {
        if (this.lastLeaderContext == null || !this.running) {
            return;
        }
        // Copy the key set so entries can be removed from the map while walking the stale members.
        HashSet<MemberId> toBeRemoved = new HashSet<>(this.logShippers.keySet());
        toBeRemoved.removeAll(this.membership.replicationMembers());
        for (MemberId member : toBeRemoved) {
            RaftLogShipper logShipper = this.logShippers.remove(member);
            if (logShipper != null) {
                logShipper.stop();
            }
        }
        for (MemberId replicationMember : this.membership.replicationMembers()) {
            this.ensureLogShipperRunning(replicationMember, this.lastLeaderContext);
        }
    }

    /**
     * Renders the active shippers and this member's identity.
     *
     * <p>Synchronized because formatting iterates {@code logShippers}, a plain {@link HashMap}
     * that the other synchronized methods mutate; reading it without the monitor could fail with
     * a {@code ConcurrentModificationException} or show a partially updated map.
     */
    @Override
    public synchronized String toString() {
        return String.format("RaftLogShippingManager{logShippers=%s, myself=%s}", this.logShippers, this.myself);
    }
}

