package io.atomix.protocols.raft.impl;

import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
import io.atomix.protocols.raft.RaftException;
import io.atomix.protocols.raft.RaftServer;
import io.atomix.protocols.raft.ReadConsistency;
import io.atomix.protocols.raft.cluster.MemberId;
import io.atomix.protocols.raft.service.RaftService;
import io.atomix.protocols.raft.service.ServiceId;
import io.atomix.protocols.raft.service.ServiceType;
import io.atomix.protocols.raft.service.impl.DefaultServiceContext;
import io.atomix.protocols.raft.session.RaftSessionMetadata;
import io.atomix.protocols.raft.session.SessionId;
import io.atomix.protocols.raft.session.impl.RaftSessionContext;
import io.atomix.protocols.raft.session.impl.RaftSessionManager;
import io.atomix.protocols.raft.storage.log.RaftLog;
import io.atomix.protocols.raft.storage.log.RaftLogReader;
import io.atomix.protocols.raft.storage.log.entry.CloseSessionEntry;
import io.atomix.protocols.raft.storage.log.entry.CommandEntry;
import io.atomix.protocols.raft.storage.log.entry.ConfigurationEntry;
import io.atomix.protocols.raft.storage.log.entry.InitializeEntry;
import io.atomix.protocols.raft.storage.log.entry.KeepAliveEntry;
import io.atomix.protocols.raft.storage.log.entry.MetadataEntry;
import io.atomix.protocols.raft.storage.log.entry.OpenSessionEntry;
import io.atomix.protocols.raft.storage.log.entry.QueryEntry;
import io.atomix.protocols.raft.storage.log.entry.RaftLogEntry;
import io.atomix.protocols.raft.storage.snapshot.Snapshot;
import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
import io.atomix.storage.journal.Indexed;
import io.atomix.utils.concurrent.Futures;
import io.atomix.utils.concurrent.ThreadContext;
import io.atomix.utils.logging.ContextualLoggerFactory;
import io.atomix.utils.logging.LoggerContext;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;

/* loaded from: input_file:io/atomix/protocols/raft/impl/RaftServiceManager.class */
public class RaftServiceManager implements AutoCloseable {
    /** Delay used both between snapshot checks and between a finished snapshot round and the log compaction that follows it. */
    private static final Duration COMPACT_INTERVAL = Duration.ofSeconds(10);

    /** Logger created in the constructor with the Raft server name as context. */
    private final Logger logger;

    /** Raft context: source of the last-applied index, the log, the snapshot store and the service registry. */
    private final RaftContext raft;

    /** Executor handed to every service context and session context created by this manager. */
    private final ScheduledExecutorService threadPool;

    /** Context on which snapshot checks and log compaction are scheduled. */
    private final ThreadContext threadContext;

    /** The Raft log obtained from the context at construction time; compacted by {@code compactLogs}. */
    private final RaftLog log;

    /** Reader over {@link #log}, opened at index 1 in {@code COMMITS} mode. */
    private final RaftLogReader reader;

    /** Registry of all sessions opened through, or restored by, this manager. */
    private final RaftSessionManager sessionManager = new RaftSessionManager();

    /** Service contexts keyed by service name. */
    private final Map<String, DefaultServiceContext> services = new HashMap<>();

    /** Highest index for which {@code prepareIndex} finished restoring sessions from a snapshot. */
    private long lastPrepared;

    /** Last-applied index recorded when the most recent snapshot round was started. */
    private long lastCompacted;

    public RaftServiceManager(RaftContext raftContext, ScheduledExecutorService scheduledExecutorService, ThreadContext threadContext) {
        this.raft = (RaftContext) Preconditions.checkNotNull(raftContext, "state cannot be null");
        this.log = raftContext.getLog();
        this.reader = this.log.openReader(1L, RaftLogReader.Mode.COMMITS);
        this.threadPool = scheduledExecutorService;
        this.threadContext = threadContext;
        this.logger = ContextualLoggerFactory.getLogger(getClass(), LoggerContext.builder(RaftServer.class).addValue(raftContext.getName()).build());
        scheduleSnapshots();
    }

    /**
     * Returns the session manager that tracks every session registered by this service manager.
     *
     * @return the session manager owned by this instance
     */
    public RaftSessionManager getSessions() {
        return sessionManager;
    }

    /**
     * Asynchronously applies all committed entries up to and including the given index.
     *
     * <p>Does nothing when the index is not beyond the context's last-applied index;
     * otherwise {@link #apply(long)} is submitted to the Raft context's thread context
     * and its result is discarded.
     *
     * @param j the index up to which entries should be applied
     */
    public void applyAll(long j) {
        if (j <= this.raft.getLastApplied()) {
            return;
        }
        this.raft.getThreadContext().execute(() -> {
            apply(j);
        });
    }

    /**
     * Applies committed entries from the log reader up to the given index and returns
     * the result of applying the entry at that index.
     *
     * <p>Entries before {@code j} are applied on a best-effort basis: a failure is logged
     * and the entry is still marked as applied. For the entry at {@code j} itself the
     * returned future carries the outcome. If applying it throws, the returned future is
     * completed exceptionally with that cause, rather than the loop moving on to later
     * entries. In every case the last-applied index is advanced exactly once per entry read.
     *
     * @param j   the index of the entry whose result is requested
     * @param <T> the result type of the entry at {@code j}
     * @return the future produced by applying the entry at {@code j}; a future completed with
     *         {@code null} if the reader is already past {@code j}; or a failed future if the
     *         next entry is non-sequential, a duplicate, failed to apply, or {@code j} is not
     *         available from the reader
     */
    public <T> CompletableFuture<T> apply(long j) {
        while (this.reader.hasNext()) {
            final long nextIndex = this.reader.getNextIndex();
            final long lastApplied = this.raft.getLastApplied();

            // A gap is only tolerated when the reader sits on the first entry of the log.
            if (nextIndex > lastApplied + 1 && nextIndex != this.reader.getFirstIndex()) {
                this.logger.error("Cannot apply non-sequential index {} unless it's the first entry in the log: {}", nextIndex, this.reader.getFirstIndex());
                return Futures.exceptionalFuture(new IndexOutOfBoundsException("Cannot apply non-sequential index unless it's the first entry in the log"));
            }
            if (nextIndex < lastApplied) {
                this.logger.error("Cannot apply duplicate entry at index {}", nextIndex);
                return Futures.exceptionalFuture(new IndexOutOfBoundsException("Cannot apply duplicate entry at index " + nextIndex));
            }

            // The reader has already passed the requested index: record the position and return an empty result.
            if (nextIndex > j) {
                this.raft.setLastApplied(nextIndex);
                return Futures.completedFuture(null);
            }

            final Indexed<? extends RaftLogEntry> entry = this.reader.next();

            if (nextIndex < j) {
                // Intermediate entry: apply it, but never let a failure stop progress towards j.
                try {
                    apply(entry);
                } catch (Exception e) {
                    this.logger.error("Failed to apply {}: {}", entry, e);
                } finally {
                    this.raft.setLastApplied(nextIndex);
                }
                continue;
            }

            // nextIndex == j: this is the entry whose result the caller is waiting for.
            try {
                if (entry.index() != j) {
                    throw new IllegalStateException("inconsistent index applying entry " + j + ": " + entry);
                }
                return apply(entry);
            } catch (Exception e) {
                this.logger.error("Failed to apply {}: {}", entry, e);
                // Surface the real cause instead of falling through to later entries.
                return Futures.exceptionalFuture(e);
            } finally {
                this.raft.setLastApplied(nextIndex);
            }
        }
        return Futures.exceptionalFuture(new IndexOutOfBoundsException("Cannot commit index " + j));
    }

    /**
     * Applies a single log entry by dispatching on its entry type.
     *
     * <p>Before dispatching, {@code prepareIndex} is called with the entry's index.
     * Entry types without a handler produce a future failed with
     * {@link RaftException.ProtocolException}.
     *
     * @param indexed the indexed log entry to apply
     * @param <T>     the result type expected by the caller
     * @return the future returned by the handler for the entry's type
     */
    @SuppressWarnings("unchecked") // each handler's result type is fixed by the entry type; callers pick the matching T
    public <T> CompletableFuture<T> apply(Indexed<? extends RaftLogEntry> indexed) {
        this.logger.trace("Applying {}", indexed);
        prepareIndex(indexed.index());

        final Class<?> entryType = indexed.type();
        if (entryType == QueryEntry.class) {
            return (CompletableFuture<T>) applyQuery(indexed.cast());
        }
        if (entryType == CommandEntry.class) {
            return (CompletableFuture<T>) applyCommand(indexed.cast());
        }
        if (entryType == OpenSessionEntry.class) {
            return (CompletableFuture<T>) applyOpenSession(indexed.cast());
        }
        if (entryType == KeepAliveEntry.class) {
            return (CompletableFuture<T>) applyKeepAlive(indexed.cast());
        }
        if (entryType == CloseSessionEntry.class) {
            return (CompletableFuture<T>) applyCloseSession(indexed.cast());
        }
        if (entryType == MetadataEntry.class) {
            return (CompletableFuture<T>) applyMetadata(indexed.cast());
        }
        if (entryType == InitializeEntry.class) {
            return (CompletableFuture<T>) applyInitialize(indexed.cast());
        }
        if (entryType == ConfigurationEntry.class) {
            return (CompletableFuture<T>) applyConfiguration(indexed.cast());
        }
        return Futures.exceptionalFuture(new RaftException.ProtocolException("Unknown entry type", new Object[0]));
    }

    /**
     * Restores session state from the snapshot stored at the given index, if there is one
     * and it has not been restored yet.
     *
     * <p>The snapshot is read in this order: service id, service type, service name, the
     * number of sessions, and then one record per session. Each restored session is registered
     * with the session manager. If no factory is registered for the snapshot's service type,
     * the method returns without restoring anything and without advancing
     * {@code lastPrepared}.
     *
     * <p>The reader is managed with try-with-resources, so it is closed on every path and a
     * failure while closing does not mask an exception thrown while reading.
     *
     * @param j the index about to be applied
     */
    private void prepareIndex(long j) {
        if (j <= this.lastPrepared) {
            return;
        }
        final Snapshot snapshot = this.raft.getSnapshotStore().getSnapshotByIndex(j);
        if (snapshot == null) {
            return;
        }

        try (SnapshotReader snapshotReader = snapshot.openReader()) {
            final ServiceId serviceId = ServiceId.from(snapshotReader.readLong());
            final ServiceType serviceType = ServiceType.from(snapshotReader.readString());
            final String serviceName = snapshotReader.readString();

            final DefaultServiceContext service = getOrInitializeService(serviceId, serviceType, serviceName);
            if (service == null) {
                // Unknown service type: nothing can be restored for this snapshot.
                return;
            }

            this.logger.debug("Restoring sessions for {}", serviceName);
            final int sessionCount = snapshotReader.readInt();
            for (int i = 0; i < sessionCount; i++) {
                // The read order below mirrors the snapshot layout and must not be changed.
                final SessionId sessionId = SessionId.from(snapshotReader.readLong());
                final MemberId memberId = MemberId.from(snapshotReader.readString());
                final ReadConsistency readConsistency = ReadConsistency.valueOf(snapshotReader.readString());
                final long timeout = snapshotReader.readLong();
                final long timestamp = snapshotReader.readLong();

                final RaftSessionContext session = new RaftSessionContext(sessionId, memberId, serviceName, serviceType, readConsistency, timeout, service, this.raft, this.threadPool);
                session.setTimestamp(timestamp);
                session.setRequestSequence(snapshotReader.readLong());
                session.setCommandSequence(snapshotReader.readLong());
                session.setEventIndex(snapshotReader.readLong());
                session.setLastCompleted(snapshotReader.readLong());
                session.setLastApplied(snapshot.index());
                this.sessionManager.registerSession(session);
            }
            this.lastPrepared = j;
        }
    }

    /**
     * Applies an initialize entry by asking every known service to keep its sessions alive
     * at the entry's index and timestamp.
     *
     * @param indexed the initialize entry
     * @return an already-completed future with a {@code null} result
     */
    private CompletableFuture<Void> applyInitialize(Indexed<InitializeEntry> indexed) {
        for (DefaultServiceContext service : this.services.values()) {
            service.keepAliveSessions(indexed.index(), indexed.entry().timestamp());
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Applies a configuration entry by asking every known service to keep its sessions alive
     * at the entry's index and timestamp.
     *
     * @param indexed the configuration entry
     * @return an already-completed future with a {@code null} result
     */
    private CompletableFuture<Void> applyConfiguration(Indexed<ConfigurationEntry> indexed) {
        for (DefaultServiceContext service : this.services.values()) {
            service.keepAliveSessions(indexed.index(), indexed.entry().timestamp());
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Applies a keep-alive entry covering a batch of sessions.
     *
     * <p>For every session id in the entry that is known to the session manager, the owning
     * service is asked to keep the session alive with the matching command sequence number and
     * event index. After all requests have been issued, every service is told that the
     * keep-alive round is complete.
     *
     * @param indexed the keep-alive entry; its three arrays are read in parallel by position
     * @return a future that completes, once all keep-alive requests have completed, with the
     *         ids of the sessions whose keep-alive succeeded
     */
    private CompletableFuture<long[]> applyKeepAlive(Indexed<KeepAliveEntry> indexed) {
        final KeepAliveEntry entry = indexed.entry();
        final long[] sessionIds = entry.sessionIds();
        final long[] commandSequences = entry.commandSequenceNumbers();
        final long[] eventIndexes = entry.eventIndexes();

        final List<Long> aliveSessionIds = new ArrayList<>(sessionIds.length);
        final List<CompletableFuture<Void>> keepAlives = new ArrayList<>(sessionIds.length);
        for (int i = 0; i < sessionIds.length; i++) {
            final long sessionId = sessionIds[i];
            final long commandSequence = commandSequences[i];
            final long eventIndex = eventIndexes[i];

            final RaftSessionContext session = this.sessionManager.getSession(sessionId);
            if (session == null) {
                continue;
            }
            keepAlives.add(session.getService()
                .keepAlive(indexed.index(), entry.timestamp(), session, commandSequence, eventIndex)
                .thenAccept(succeeded -> {
                    if (succeeded) {
                        // The callbacks are guarded because they share this list with the final read below.
                        synchronized (aliveSessionIds) {
                            aliveSessionIds.add(sessionId);
                        }
                    }
                }));
        }

        for (DefaultServiceContext service : this.services.values()) {
            service.completeKeepAlive(indexed.index(), entry.timestamp());
        }

        return CompletableFuture.allOf(keepAlives.toArray(new CompletableFuture[0])).thenApply(ignored -> {
            synchronized (aliveSessionIds) {
                return Longs.toArray(aliveSessionIds);
            }
        });
    }

    /**
     * Looks up the service context registered under the given name, creating and registering
     * it on first use.
     *
     * @param serviceId   id to give a newly created service context
     * @param serviceType type used to look up the service factory in the service registry
     * @param str         the service name, used as the key in the services map
     * @return the existing or newly created context, or {@code null} if the service is not yet
     *         registered and the registry has no factory for {@code serviceType}
     */
    private DefaultServiceContext getOrInitializeService(ServiceId serviceId, ServiceType serviceType, String str) {
        final DefaultServiceContext existing = this.services.get(str);
        if (existing != null) {
            return existing;
        }

        final Supplier<RaftService> factory = this.raft.getServiceRegistry().getFactory((String) serviceType.id());
        if (factory == null) {
            return null;
        }

        final DefaultServiceContext created = new DefaultServiceContext(serviceId, str, serviceType, factory.get(), this.raft, this.sessionManager, this.threadPool);
        this.services.put(str, created);
        return created;
    }

    /**
     * Applies an open-session entry: resolves (or creates) the target service, registers a
     * new session whose id is the entry's index, and opens the session on the service.
     *
     * @param indexed the open-session entry
     * @return the future returned by the service's {@code openSession}, or a future failed with
     *         {@link RaftException.UnknownService} if the service type has no registered factory
     */
    private CompletableFuture<Long> applyOpenSession(Indexed<OpenSessionEntry> indexed) {
        final OpenSessionEntry entry = indexed.entry();

        // The entry's index doubles as the id of a service created by this entry.
        final DefaultServiceContext service = getOrInitializeService(
            ServiceId.from(indexed.index()),
            ServiceType.from(entry.serviceType()),
            entry.serviceName());
        if (service == null) {
            return Futures.exceptionalFuture(new RaftException.UnknownService("Unknown service type " + entry.serviceType(), new Object[0]));
        }

        final RaftSessionContext session = new RaftSessionContext(
            SessionId.from(indexed.index()),
            MemberId.from(entry.memberId()),
            entry.serviceName(),
            ServiceType.from(entry.serviceType()),
            entry.readConsistency(),
            entry.timeout(),
            service,
            this.raft,
            this.threadPool);
        this.sessionManager.registerSession(session);
        return service.openSession(indexed.index(), entry.timestamp(), session);
    }

    /**
     * Applies a close-session entry by closing the referenced session on its service.
     *
     * @param indexed the close-session entry
     * @return the future returned by the service's {@code closeSession}, or a future failed with
     *         {@link RaftException.UnknownSession} if the session is not registered
     */
    private CompletableFuture<Void> applyCloseSession(Indexed<CloseSessionEntry> indexed) {
        final CloseSessionEntry entry = indexed.entry();
        final RaftSessionContext session = this.sessionManager.getSession(entry.session());
        if (session == null) {
            this.logger.warn("Unknown session: " + entry.session());
            return Futures.exceptionalFuture(new RaftException.UnknownSession("Unknown session: " + entry.session(), new Object[0]));
        }
        return session.getService().closeSession(indexed.index(), entry.timestamp(), session);
    }

    /**
     * Applies a metadata entry by collecting metadata about registered sessions.
     *
     * <p>When the entry's session id is not positive, metadata for every session is returned.
     * Otherwise only sessions whose service name equals that of the referenced session are
     * included.
     *
     * @param indexed the metadata entry
     * @return a completed future holding the collected session metadata, or a future failed with
     *         {@link RaftException.UnknownSession} if a positive session id is not registered
     */
    private CompletableFuture<MetadataResult> applyMetadata(Indexed<MetadataEntry> indexed) {
        final MetadataEntry entry = indexed.entry();

        // A null requester means "no filter": every session is reported.
        RaftSessionContext requester = null;
        if (entry.session() > 0) {
            requester = this.sessionManager.getSession(entry.session());
            if (requester == null) {
                this.logger.warn("Unknown session: " + entry.session());
                return Futures.exceptionalFuture(new RaftException.UnknownSession("Unknown session: " + entry.session(), new Object[0]));
            }
        }

        final HashSet<RaftSessionMetadata> metadata = new HashSet<>();
        for (RaftSessionContext candidate : this.sessionManager.getSessions()) {
            if (requester == null || candidate.serviceName().equals(requester.serviceName())) {
                metadata.add(new RaftSessionMetadata(candidate.sessionId().id(), candidate.serviceName(), candidate.serviceType().id()));
            }
        }
        return CompletableFuture.completedFuture(new MetadataResult(metadata));
    }

    /**
     * Applies a command entry by executing its operation on the service that owns the session.
     *
     * @param indexed the command entry
     * @return the future returned by the service's {@code executeCommand}, or a future failed with
     *         {@link RaftException.UnknownSession} if the session is not registered
     */
    private CompletableFuture<OperationResult> applyCommand(Indexed<CommandEntry> indexed) {
        final CommandEntry entry = indexed.entry();
        final RaftSessionContext session = this.sessionManager.getSession(entry.session());
        if (session == null) {
            this.logger.warn("Unknown session: " + entry.session());
            return Futures.exceptionalFuture(new RaftException.UnknownSession("unknown session: " + entry.session(), new Object[0]));
        }
        return session.getService().executeCommand(indexed.index(), entry.sequenceNumber(), entry.timestamp(), session, entry.operation());
    }

    /**
     * Applies a query entry by executing its operation on the service that owns the session.
     *
     * @param indexed the query entry
     * @return the future returned by the service's {@code executeQuery}, or a future failed with
     *         {@link RaftException.UnknownSession} if the session is not registered
     */
    private CompletableFuture<OperationResult> applyQuery(Indexed<QueryEntry> indexed) {
        final QueryEntry entry = indexed.entry();
        final RaftSessionContext session = this.sessionManager.getSession(entry.session());
        if (session == null) {
            this.logger.warn("Unknown session: " + entry.session());
            return Futures.exceptionalFuture(new RaftException.UnknownSession("unknown session " + entry.session(), new Object[0]));
        }
        return session.getService().executeQuery(indexed.index(), entry.sequenceNumber(), entry.timestamp(), session, entry.operation());
    }

    /**
     * Schedules the next snapshot check on the thread context, {@code COMPACT_INTERVAL} from now.
     */
    private void scheduleSnapshots() {
        this.threadContext.schedule(COMPACT_INTERVAL, () -> snapshotServices());
    }

    /**
     * Schedules compaction of the log up to the given index, {@code COMPACT_INTERVAL} from now.
     *
     * @param j the index to pass to {@code compactLogs} when the delay elapses
     */
    private void scheduleCompaction(long j) {
        this.logger.trace("Scheduling compaction in {}", COMPACT_INTERVAL);
        this.threadContext.schedule(COMPACT_INTERVAL, () -> compactLogs(j));
    }

    /**
     * Takes a snapshot of every service if the log reports that it can be compacted past the
     * point of the previous round, then schedules log compaction.
     *
     * <p>If the log is not compactable at the last-applied index, or its compactable index has
     * not moved past {@code lastCompacted}, only the next check is scheduled. Otherwise each
     * service is snapshotted in turn, blocking on {@code takeSnapshot()} before calling
     * {@code completeSnapshot}. Compaction is scheduled once all completion futures have
     * finished, whether or not they succeeded.
     *
     * <p>A runtime failure while snapshotting a service is logged and the next check is
     * scheduled, so one failing service cannot silently stop the periodic
     * snapshot/compaction cycle.
     */
    private void snapshotServices() {
        final long lastApplied = this.raft.getLastApplied();
        final RaftLog raftLog = this.raft.getLog();
        if (!raftLog.isCompactable(lastApplied) || raftLog.getCompactableIndex(lastApplied) <= this.lastCompacted) {
            scheduleSnapshots();
            return;
        }

        this.logger.debug("Snapshotting services");
        this.lastCompacted = lastApplied;

        final List<CompletableFuture<?>> completions = new ArrayList<>();
        try {
            // Iterate over a copy so services registered during the round do not disturb the iteration.
            for (DefaultServiceContext service : new ArrayList<>(this.services.values())) {
                final long snapshotIndex = service.takeSnapshot().join();
                completions.add(service.completeSnapshot(snapshotIndex));
            }
        } catch (RuntimeException e) {
            // join() reports failures as unchecked exceptions; keep the periodic cycle alive.
            this.logger.error("An exception occurred while snapshotting services", e);
            scheduleSnapshots();
            return;
        }

        CompletableFuture.allOf(completions.toArray(new CompletableFuture[0])).whenComplete((result, error) -> {
            scheduleCompaction(lastApplied);
        });
    }

    /**
     * Compacts the log up to the given index and then starts the next snapshot check.
     *
     * <p>A compaction failure is logged and otherwise ignored. {@code snapshotServices()} is
     * invoked exactly once afterwards, whether or not compaction succeeded, and a failure
     * inside it is not reported as a compaction error.
     *
     * @param j the index up to which the log should be compacted
     */
    private void compactLogs(long j) {
        this.logger.debug("Compacting logs up to index {}", j);
        try {
            this.log.compact(j);
        } catch (Exception e) {
            this.logger.error("An exception occurred during log compaction: {}", e);
        } finally {
            snapshotServices();
        }
    }

    /**
     * Closes the service manager. This implementation is a no-op: nothing is released or
     * cancelled here.
     *
     * <p>NOTE(review): the log reader opened in the constructor and any pending scheduled
     * snapshot/compaction task are left untouched. Confirm that their lifecycle is owned
     * elsewhere (for example by the log or the thread context).
     */
    @Override // java.lang.AutoCloseable
    public void close() {
    }
}
