package io.apicurio.registry.storage.impl.kafkasql.upgrade;

import io.apicurio.common.apps.config.Info;
import io.apicurio.registry.exception.RuntimeAssertionFailedException;
import io.apicurio.registry.exception.UnreachableCodeException;
import io.apicurio.registry.storage.impl.kafkasql.KafkaSqlSubmitter;
import io.apicurio.registry.storage.impl.kafkasql.keys.MessageKey;
import io.apicurio.registry.storage.impl.kafkasql.keys.UpgraderKey;
import io.apicurio.registry.storage.impl.kafkasql.sql.KafkaSqlStore;
import io.apicurio.registry.storage.impl.kafkasql.values.ActionType;
import io.apicurio.registry.storage.impl.kafkasql.values.MessageValue;
import io.apicurio.registry.storage.impl.kafkasql.values.UpgraderValue;
import io.apicurio.registry.utils.impexp.ContentEntity;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.context.ManagedExecutor;
import org.eclipse.microprofile.context.ThreadContext;
import org.slf4j.Logger;

@ApplicationScoped
/* loaded from: input_file:io/apicurio/registry/storage/impl/kafkasql/upgrade/KafkaSqlUpgraderManager.class */
public class KafkaSqlUpgraderManager {
    // Storage topic version assumed until a committed upgrade with a higher version is read.
    public static final int BASE_KAFKASQL_TOPIC_VERSION = 1;
    // Storage topic version this manager tries to upgrade to (replaced by 99 in test mode, see init()).
    public static final int TARGET_KAFKASQL_TOPIC_VERSION = 2;

    // Time after the latest lock-related message at which a lock record is considered timed out.
    // Several other intervals in this class are derived from it via scale(...).
    @ConfigProperty(name = "registry.kafkasql.upgrade-lock-timeout", defaultValue = "10s")
    @Info(category = "store", description = "How long should KafkaSQL upgrader manager hold the lock before it's assumed to have failed. There is a tradeoff between giving the upgrade process enough time and recovering from a failed upgrade. You may need to increase this value if your Kafka cluster is very busy.", availableSince = "2.5.9.Final")
    Duration lockTimeout;

    // When true, init() logs a warning, sets the target version to 99 and sleeps for testModeInitDelay;
    // the upgrade task additionally logs content hashes before and after running the upgraders.
    @ConfigProperty(name = "registry.kafkasql.upgrade-test-mode", defaultValue = "false")
    boolean testMode;

    // Delay applied at the start of init(); only used when testMode is enabled.
    @ConfigProperty(name = "registry.kafkasql.upgrade-test-init-delay", defaultValue = "0ms")
    Duration testModeInitDelay;

    // Used to publish all upgrader coordination messages (bootstrap, try-lock, heartbeats, unlock).
    @Inject
    KafkaSqlSubmitter submitter;

    // Only read by the test-mode diagnostics in the upgrade task.
    @Inject
    KafkaSqlStore sqlStore;

    // All available upgraders; every message is forwarded to them until the lock is acquired.
    @Inject
    Instance<KafkaSqlUpgrader> upgraders;

    @Inject
    Logger log;

    // Runs the actual upgrade asynchronously once this instance reaches the LOCKED state.
    @Inject
    ManagedExecutor executor;

    // Wraps the runnable of the wait-heartbeat thread (see WaitHeartbeatEmitter).
    @Inject
    ThreadContext threadContext;
    // Random identifier of this manager instance, generated in init().
    private String localUpgraderUUID;
    // Current state of the state machine; null until init() has been called.
    private State state;
    // Set by switchState(...) so that read(...) re-evaluates the new state immediately.
    private boolean retry;
    // Number of read(...) calls processed so far; used to order competing try-lock attempts.
    private long sequence;
    // True while this instance has sent a try-lock that has not been unlocked or timed out locally.
    private volatile boolean localTryLocked;
    // Local time at which the most recent try-lock message was sent.
    private Instant localTryLockedTimestamp;
    // True while the upgrade task submitted to the executor is running.
    private volatile boolean upgrading;
    // Local time recorded in init(), just before the bootstrap message is sent.
    private Instant initTimestamp;
    // Local time recorded when the CLOSING state has been processed.
    private Instant closeTimestamp;
    // Exception thrown by the upgrade task, if any; reported in the FAILED state.
    private Exception upgradeError;
    private WaitHeartbeatEmitter waitHeartbeatEmitter;
    // Highest committed storage version seen so far (raised by commit-and-unlock messages).
    private int currentVersion = BASE_KAFKASQL_TOPIC_VERSION;
    // Version this instance wants to upgrade to (overridden in test mode).
    private int targetVersion = TARGET_KAFKASQL_TOPIC_VERSION;
    // Lock bookkeeping per upgrader UUID; set to null once the manager is closing.
    private Map<String, LockRecord> lockMap = new HashMap<>();
    // Number of try-lock messages this instance has sent.
    private int localTryLockCount = 0;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.apicurio.registry.storage.impl.kafkasql.upgrade.KafkaSqlUpgraderManager$1, reason: invalid class name */
    /* loaded from: input_file:io/apicurio/registry/storage/impl/kafkasql/upgrade/KafkaSqlUpgraderManager$1.class */
    /**
     * Decompiler-recovered holder of the lookup tables used by {@code switch} statements over
     * {@link State} and {@link ActionType} elsewhere in this class.
     *
     * Each table is indexed by the enum constant's {@code ordinal()} and yields the case number
     * used in the corresponding {@code switch}; constants that are not listed keep the array
     * default of 0 and therefore hit the {@code default} branch.
     *
     * State table:      WAIT_FOR_BOOTSTRAP=1, WAIT=2, TRY_LOCK=3, LOCKED=4, FAILED=5, CLOSING=6, CLOSED=7.
     * ActionType table: UPGRADE_BOOTSTRAP=1, UPGRADE_TRY_LOCK=2, UPGRADE_ABORT_AND_UNLOCK=3,
     *                   UPGRADE_COMMIT_AND_UNLOCK=4, UPGRADE_LOCK_HEARTBEAT=5, UPGRADE_WAIT_HEARTBEAT=6.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$apicurio$registry$storage$impl$kafkasql$values$ActionType;
        static final /* synthetic */ int[] $SwitchMap$io$apicurio$registry$storage$impl$kafkasql$upgrade$KafkaSqlUpgraderManager$State = new int[State.values().length];

        static {
            // Every assignment is wrapped separately so that a constant missing at runtime
            // (NoSuchFieldError) only leaves its own slot at 0 instead of failing class initialization.
            try {
                $SwitchMap$io$apicurio$registry$storage$impl$kafkasql$upgrade$KafkaSqlUpgraderManager$State[State.WAIT_FOR_BOOTSTRAP.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$apicurio$registry$storage$impl$kafkasql$upgrade$KafkaSqlUpgraderManager$State[State.WAIT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$apicurio$registry$storage$impl$kafkasql$upgrade$KafkaSqlUpgraderManager$State[State.TRY_LOCK.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$io$apicurio$registry$storage$impl$kafkasql$upgrade$KafkaSqlUpgraderManager$State[State.LOCKED.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$io$apicurio$registry$storage$impl$kafkasql$upgrade$KafkaSqlUpgraderManager$State[State.FAILED.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$io$apicurio$registry$storage$impl$kafkasql$upgrade$KafkaSqlUpgraderManager$State[State.CLOSING.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$io$apicurio$registry$storage$impl$kafkasql$upgrade$KafkaSqlUpgraderManager$State[State.CLOSED.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            // Second table: ActionType ordinal -> case number.
            $SwitchMap$io$apicurio$registry$storage$impl$kafkasql$values$ActionType = new int[ActionType.values().length];
            try {
                $SwitchMap$io$apicurio$registry$storage$impl$kafkasql$values$ActionType[ActionType.UPGRADE_BOOTSTRAP.ordinal()] = 1;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$io$apicurio$registry$storage$impl$kafkasql$values$ActionType[ActionType.UPGRADE_TRY_LOCK.ordinal()] = 2;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$io$apicurio$registry$storage$impl$kafkasql$values$ActionType[ActionType.UPGRADE_ABORT_AND_UNLOCK.ordinal()] = 3;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$io$apicurio$registry$storage$impl$kafkasql$values$ActionType[ActionType.UPGRADE_COMMIT_AND_UNLOCK.ordinal()] = 4;
            } catch (NoSuchFieldError e11) {
            }
            try {
                $SwitchMap$io$apicurio$registry$storage$impl$kafkasql$values$ActionType[ActionType.UPGRADE_LOCK_HEARTBEAT.ordinal()] = 5;
            } catch (NoSuchFieldError e12) {
            }
            try {
                $SwitchMap$io$apicurio$registry$storage$impl$kafkasql$values$ActionType[ActionType.UPGRADE_WAIT_HEARTBEAT.ordinal()] = 6;
            } catch (NoSuchFieldError e13) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/apicurio/registry/storage/impl/kafkasql/upgrade/KafkaSqlUpgraderManager$LockRecord.class */
    public static class LockRecord {
        String upgraderUUID;
        Integer targetVersion;
        long tryLockSequence = -1;
        Instant latestLockTimestamp;
        boolean tryLocked;

        private LockRecord() {
        }

        private boolean isTimedOut(Instant instant, Duration duration) {
            return this.latestLockTimestamp != null && instant.isAfter(this.latestLockTimestamp.plus((TemporalAmount) duration));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/apicurio/registry/storage/impl/kafkasql/upgrade/KafkaSqlUpgraderManager$State.class */
    /**
     * States of the upgrader manager state machine driven by {@code read(...)}.
     */
    public enum State {
        // Entered by init(); left for WAIT once our own bootstrap message has been read back.
        WAIT_FOR_BOOTSTRAP,
        // Evaluates the currently active lock and decides whether to wait, try to lock, start upgrading or close.
        WAIT,
        // Sends a try-lock message and returns to WAIT, or moves to FAILED after too many attempts.
        TRY_LOCK,
        // This instance holds the lock; the upgrade task is submitted to the executor (once).
        LOCKED,
        // Logs the failure, sends an abort-and-unlock message and moves to CLOSING.
        FAILED,
        // Closes the upgraders and the wait-heartbeat emitter, then moves to CLOSED.
        CLOSING,
        // Terminal state; incoming messages are no longer processed.
        CLOSED
    }

    /* loaded from: input_file:io/apicurio/registry/storage/impl/kafkasql/upgrade/KafkaSqlUpgraderManager$UpgraderManagerHandle.class */
    /**
     * Handle passed to each upgrader while it runs, allowing it to keep the lock alive
     * by sending lock heartbeats.
     */
    public static class UpgraderManagerHandle {
        /** Local time of the last heartbeat that was actually sent; null before the first one. */
        private Instant lastHeartbeat;
        private final KafkaSqlSubmitter submitter;
        private final String localUpgraderUUID;
        private final Logger log;
        private final Duration lockTimeout;
        private final int targetVersion;

        private UpgraderManagerHandle(KafkaSqlSubmitter kafkaSqlSubmitter, String str, Logger logger, Duration duration, int i) {
            this.submitter = kafkaSqlSubmitter;
            this.localUpgraderUUID = str;
            this.log = logger;
            this.lockTimeout = duration;
            this.targetVersion = i;
        }

        /**
         * Sends a lock heartbeat message, unless one was already sent within the last
         * 35% of the lock timeout. The very first call always sends.
         */
        public synchronized void heartbeat() {
            Instant now = Instant.now();
            if (this.lastHeartbeat != null) {
                Instant nextAllowed = this.lastHeartbeat.plus(KafkaSqlUpgraderManager.scale(this.lockTimeout, 0.35f));
                if (!now.isAfter(nextAllowed)) {
                    // Throttled: the previous heartbeat is still recent enough.
                    return;
                }
            }
            this.log.debug("Sending lock heartbeat.");
            this.submitter.send(UpgraderKey.create(true), UpgraderValue.create(ActionType.UPGRADE_LOCK_HEARTBEAT, this.localUpgraderUUID, null));
            this.lastHeartbeat = now;
        }

        /**
         * Tells whether more than 85% of the lock timeout has passed since the last sent heartbeat.
         * Requires that {@link #heartbeat()} has been called at least once.
         */
        private synchronized boolean isTimedOut() {
            Instant deadline = this.lastHeartbeat.plus(KafkaSqlUpgraderManager.scale(this.lockTimeout, 0.85f));
            return Instant.now().isAfter(deadline);
        }

        /** @return the lock timeout this handle was created with */
        public Duration getLockTimeout() {
            return this.lockTimeout;
        }

        /** @return the storage version the upgrade is targeting */
        public int getTargetVersion() {
            return this.targetVersion;
        }
    }

    /* loaded from: input_file:io/apicurio/registry/storage/impl/kafkasql/upgrade/KafkaSqlUpgraderManager$WaitHeartbeatEmitter.class */
    /**
     * Sends a single wait-heartbeat message once a configured delay has passed since the most
     * recent call to {@link #runDelayedWaitHeartbeatOnce()}. The work happens on a dedicated
     * thread that is started lazily and stopped by {@link #close()}.
     */
    private static class WaitHeartbeatEmitter implements AutoCloseable {
        private volatile Thread thread;
        private final Duration delay;
        /** Time at which the next heartbeat is due; null when nothing is scheduled. */
        private volatile Instant next;
        private final KafkaSqlSubmitter submitter;
        private final Logger log;
        private final ThreadContext threadContext;
        private volatile boolean stop = false;

        public WaitHeartbeatEmitter(Duration duration, KafkaSqlSubmitter kafkaSqlSubmitter, Logger logger, ThreadContext threadContext) {
            this.delay = duration;
            this.submitter = kafkaSqlSubmitter;
            this.log = logger;
            this.threadContext = threadContext;
        }

        /**
         * Schedules (or re-schedules) one heartbeat to be sent after the configured delay,
         * starting the worker thread on first use.
         */
        public void runDelayedWaitHeartbeatOnce() {
            if (this.thread == null) {
                this.thread = new Thread(this.threadContext.contextualRunnable(this::emitLoop));
                this.thread.start();
            }
            this.next = Instant.now().plus(this.delay);
        }

        /**
         * Body of the worker thread: polls once a second while idle, otherwise sleeps until
         * the scheduled time and then sends the heartbeat.
         */
        private void emitLoop() {
            this.log.debug("WaitHeartbeatEmitter thread has started.");
            while (!this.stop) {
                try {
                    if (this.next == null) {
                        Thread.sleep(1000L);
                        continue;
                    }
                    Instant now = Instant.now();
                    if (!now.isAfter(this.next)) {
                        Thread.sleep(Duration.between(now, this.next).abs().toMillis());
                        continue;
                    }
                    this.log.debug("Sending wait heartbeat.");
                    this.submitter.send(UpgraderKey.create(true), UpgraderValue.create(ActionType.UPGRADE_WAIT_HEARTBEAT, null, null));
                    this.next = null;
                } catch (InterruptedException e) {
                    // An interrupt only wakes the thread up; the loop condition decides whether to stop.
                }
            }
        }

        /**
         * Asks the worker thread (if it was ever started) to stop and waits up to one second for it.
         */
        @Override // java.lang.AutoCloseable
        public void close() {
            if (this.thread == null) {
                return;
            }
            this.stop = true;
            this.thread.interrupt();
            try {
                this.thread.join(1000L);
                if (!this.thread.isAlive()) {
                    this.log.debug("WaitHeartbeatEmitter thread has stopped successfully.");
                } else {
                    this.log.warn("WaitHeartbeatEmitter thread has failed to stop within 1000ms.");
                }
            } catch (InterruptedException e) {
                this.log.warn("WaitHeartbeatEmitter::close() interrupted: {}", e.getMessage());
            }
        }
    }

    /**
     * Initializes the manager: generates the local upgrader UUID, creates the wait-heartbeat
     * emitter, sends the bootstrap message and enters {@code WAIT_FOR_BOOTSTRAP}.
     * In test mode the target version is set to 99 and the configured init delay is applied first.
     *
     * @throws IllegalStateException if called more than once
     */
    public synchronized void init() {
        if (state != null) {
            throw new IllegalStateException("The init method MUST be called only once");
        }

        if (testMode) {
            log.warn("RUNNING IN TEST MODE");
            targetVersion = 99;
            try {
                Thread.sleep(testModeInitDelay.toMillis());
            } catch (InterruptedException e) {
                // Interrupted delay is ignored; initialization continues.
            }
        }

        initTimestamp = Instant.now();
        localUpgraderUUID = UUID.randomUUID().toString();

        // The wait heartbeat fires slightly after a lock would have timed out.
        Duration waitHeartbeatDelay = scale(lockTimeout, 1.1f);
        waitHeartbeatEmitter = new WaitHeartbeatEmitter(waitHeartbeatDelay, submitter, log, threadContext);

        submitter.send(UpgraderKey.create(true), UpgraderValue.create(ActionType.UPGRADE_BOOTSTRAP, localUpgraderUUID, null));
        switchState(State.WAIT_FOR_BOOTSTRAP);
    }

    /**
     * Processes one message read from the storage topic and advances the upgrade state machine.
     *
     * Until the lock is acquired (or the manager fails/closes) every message is forwarded to all
     * upgraders; upgrader coordination messages additionally update the lock bookkeeping. The
     * current state is then evaluated repeatedly for as long as a handler switches state.
     *
     * @param instant      timestamp of the message as reported by the caller
     * @param messageKey   key of the message
     * @param messageValue value of the message; cast to {@link UpgraderValue} when the key is an {@link UpgraderKey}
     * @throws IllegalStateException if {@code init()} has not been called yet
     */
    public synchronized void read(Instant instant, MessageKey messageKey, MessageValue messageValue) {
        if (this.state == null) {
            throw new IllegalStateException("The init method MUST be called first");
        }
        if (this.state != State.CLOSED) {
            if (this.state != State.LOCKED && this.state != State.FAILED && this.state != State.CLOSING) {
                this.upgraders.forEach(upgrader -> upgrader.read(messageKey, messageValue));
            }
            if (messageKey instanceof UpgraderKey) {
                updateLockMap(instant, (UpgraderValue) messageValue);
            }
        }
        // switchState(...) sets retry, which makes the loop evaluate the new state right away.
        this.retry = true;
        while (this.retry) {
            this.retry = false;
            switch (this.state) {
                case WAIT_FOR_BOOTSTRAP:
                    handleWaitForBootstrap(instant, messageKey, messageValue);
                    // Must not fall through: WAIT is only evaluated after an explicit state switch.
                    break;
                case WAIT:
                    handleWait();
                    break;
                case TRY_LOCK:
                    handleTryLock();
                    break;
                case LOCKED:
                    handleLocked();
                    break;
                case FAILED:
                    handleFailed();
                    break;
                case CLOSING:
                    handleClosing();
                    break;
                case CLOSED:
                    break;
                default:
                    throw new UnreachableCodeException();
            }
        }
        this.sequence++;
    }

    /** WAIT_FOR_BOOTSTRAP: waits until our own bootstrap message has been read back, then switches to WAIT. */
    private void handleWaitForBootstrap(Instant instant, MessageKey messageKey, MessageValue messageValue) {
        if (!(messageKey instanceof UpgraderKey)) {
            return;
        }
        UpgraderValue upgraderValue = (UpgraderValue) messageValue;
        switch (upgraderValue.getAction()) {
            case UPGRADE_BOOTSTRAP:
                if (this.localUpgraderUUID.equals(upgraderValue.getUpgraderUUID())) {
                    this.log.debug("Bootstrapped, waiting to upgrade.");
                    long millis = Duration.between(this.initTimestamp, instant).abs().toMillis();
                    if (millis > scale(this.lockTimeout, 0.25f).toMillis()) {
                        // The placeholder is labelled "ms", so log the timeout in milliseconds.
                        this.log.warn("We detected a significant time difference ({} ms) between a moment when a Kafka message is produced (local time), and it's creation timestamp reported by Kafka at the moment it is consumed. If this causes issues during KafkaSQL storage upgrade, consider increasing 'registry.kafkasql.upgrade-lock-timeout' config value (currently {} ms).", millis, this.lockTimeout.toMillis());
                    }
                    switchState(State.WAIT);
                }
                break;
            case UPGRADE_TRY_LOCK:
            case UPGRADE_ABORT_AND_UNLOCK:
            case UPGRADE_COMMIT_AND_UNLOCK:
            case UPGRADE_LOCK_HEARTBEAT:
            case UPGRADE_WAIT_HEARTBEAT:
                // Already recorded in the lock map; nothing else to do before bootstrap completes.
                break;
            default:
                throw new RuntimeAssertionFailedException("Read an upgrader manager message with unsupported action type: " + upgraderValue.getAction());
        }
    }

    /** WAIT: decides, based on the active lock, whether to wait, try to lock, start upgrading or close. */
    private void handleWait() {
        LockRecord activeLock = computeActiveLock();
        if (activeLock != null) {
            if (!this.localUpgraderUUID.equals(activeLock.upgraderUUID)) {
                this.log.debug("Another upgrader manager holds the lock (UUID = {}, latestTimestamp = {}). Waiting.", activeLock.upgraderUUID, activeLock.latestLockTimestamp);
                this.waitHeartbeatEmitter.runDelayedWaitHeartbeatOnce();
            } else if (!this.localTryLocked) {
                this.log.debug("Waiting for our unlock to propagate.");
            } else {
                Instant now = Instant.now();
                Instant latestLock = this.lockMap.get(this.localUpgraderUUID).latestLockTimestamp;
                if (!now.isAfter(latestLock.plus(scale(this.lockTimeout, 0.5f)))) {
                    switchState(State.LOCKED);
                } else {
                    // More than half of the lock timeout is already gone: give the lock up instead of
                    // starting an upgrade that is likely to time out. Report the time that is left.
                    Duration remaining = this.lockTimeout.minus(Duration.between(latestLock, now));
                    this.log.warn("We've got the lock but we don't have enough time ({} ms remaining). Unlocking.", remaining.toMillis());
                    this.submitter.send(UpgraderKey.create(true), UpgraderValue.create(ActionType.UPGRADE_ABORT_AND_UNLOCK, this.localUpgraderUUID, Integer.valueOf(this.targetVersion)));
                    this.localTryLocked = false;
                }
            }
            return;
        }

        this.log.debug("No active upgrader manager lock found.");
        if (this.currentVersion >= this.targetVersion) {
            if (this.currentVersion != this.targetVersion) {
                this.log.warn("Current storage version {} is later than the target version {}. Running an older version of Registry after a storage upgrade might cause problems.", this.currentVersion, this.targetVersion);
            }
            this.log.info("KafkaSQL storage topic is up-to-date (version {}).", this.targetVersion);
            switchState(State.CLOSING);
            return;
        }
        if (!this.localTryLocked) {
            switchState(State.TRY_LOCK);
            return;
        }
        // We have an outstanding try-lock that is not (or no longer) active.
        Instant now = Instant.now();
        Instant latestLock = this.lockMap.get(this.localUpgraderUUID).latestLockTimestamp;
        boolean recordTimedOut = latestLock != null && now.isAfter(latestLock.plus(this.lockTimeout));
        boolean attemptTimedOut = this.localTryLockedTimestamp != null
                && now.isAfter(this.localTryLockedTimestamp.plus(scale(this.lockTimeout, 1.5f)));
        if (recordTimedOut && attemptTimedOut) {
            this.localTryLocked = false;
            switchState(State.TRY_LOCK);
            this.log.warn("Our own lock attempt timed out.");
        } else {
            this.log.debug("Waiting for us or somebody else to acquire the lock.");
            this.waitHeartbeatEmitter.runDelayedWaitHeartbeatOnce();
        }
    }

    /** TRY_LOCK: sends a try-lock message and returns to WAIT, or gives up after too many attempts. */
    private void handleTryLock() {
        // NOTE(review): the counter starts at 0 and the check is "<= 3", so up to four try-lock
        // messages are sent although the warning below mentions 3 - confirm which one is intended.
        if (this.localTryLockCount <= 3) {
            this.log.debug("Attempting to lock (UUID = {}).", this.localUpgraderUUID);
            this.submitter.send(UpgraderKey.create(true), UpgraderValue.create(ActionType.UPGRADE_TRY_LOCK, this.localUpgraderUUID, Integer.valueOf(this.targetVersion)));
            this.localTryLocked = true;
            this.localTryLockedTimestamp = Instant.now();
            this.localTryLockCount++;
            switchState(State.WAIT);
        } else {
            this.log.warn("Maximum number of locked attempts (3) reached. Stopping to allow Registry to start, probably without an upgrade.");
            switchState(State.FAILED);
        }
    }

    /** LOCKED: submits the upgrade task to the executor, unless it is already running. */
    private void handleLocked() {
        if (this.upgrading) {
            return;
        }
        this.log.info("Lock acquired (UUID = {}) to perform upgrade from version {} to version {}.", this.localUpgraderUUID, this.currentVersion, this.targetVersion);
        this.upgrading = true;
        this.executor.execute(this::runUpgrade);
    }

    /**
     * Upgrade task executed on the managed executor: runs all active upgraders in order while
     * keeping the lock alive, then commits. Aborts and returns to WAIT if an upgrader exceeds the
     * heartbeat deadline; any exception moves the manager to FAILED.
     */
    private void runUpgrade() {
        try {
            List<KafkaSqlUpgrader> activeUpgraders = computeActiveUpgraders();
            if (activeUpgraders.isEmpty()) {
                this.log.info("No upgraders found for this version.");
            } else {
                this.log.info("Performing an upgrade with (in order): {} .", activeUpgraders.stream()
                        .map(upgrader -> upgrader.getClass().getSimpleName())
                        .collect(Collectors.joining(", ")));
                UpgraderManagerHandle handle = new UpgraderManagerHandle(this.submitter, this.localUpgraderUUID, this.log, this.lockTimeout, this.targetVersion);
                handle.heartbeat();
                logTestModeContentHashes("before");
                for (KafkaSqlUpgrader upgrader : activeUpgraders) {
                    upgrader.upgrade(handle);
                    if (handle.isTimedOut()) {
                        this.submitter.send(UpgraderKey.create(true), UpgraderValue.create(ActionType.UPGRADE_ABORT_AND_UNLOCK, this.localUpgraderUUID, Integer.valueOf(this.targetVersion)));
                        this.localTryLocked = false;
                        switchState(State.WAIT);
                        this.log.warn("Upgrader {} ran out of time (took {} ms). Make sure it sends heartbeat often enough.", upgrader.getClass().getSimpleName(), Duration.between(handle.lastHeartbeat, Instant.now()).toMillis());
                        return;
                    }
                    handle.heartbeat();
                }
                logTestModeContentHashes("after");
            }
            this.submitter.send(UpgraderKey.create(false), UpgraderValue.create(ActionType.UPGRADE_COMMIT_AND_UNLOCK, this.localUpgraderUUID, Integer.valueOf(this.targetVersion)));
            this.log.info("Upgrade finished successfully!");
            switchState(State.CLOSING);
        } catch (Exception e) {
            this.upgradeError = e;
            switchState(State.FAILED);
        } finally {
            // Always cleared last, after any state switch performed above.
            this.upgrading = false;
        }
    }

    /** Test mode only: logs the hashes of the content with ID 2; failures are deliberately ignored. */
    private void logTestModeContentHashes(String phase) {
        if (!this.testMode) {
            return;
        }
        try {
            ContentEntity entity = this.sqlStore.getContentEntityByContentId(2L);
            this.log.debug("Content hash " + phase + ": {}", entity.contentHash);
            this.log.debug("Canonical content hash " + phase + ": {}", entity.canonicalHash);
        } catch (Exception ignored) {
            // Best-effort diagnostics; must never influence the upgrade itself.
        }
    }

    /** FAILED: reports the failure, releases any lock we might hold and starts closing. */
    private void handleFailed() {
        if (this.upgradeError != null) {
            this.log.error("Upgrade failed with an error. " + "If you are starting multiple nodes, check if another one succeeded, otherwise we suggest a restart.", this.upgradeError);
        } else {
            this.log.warn("Upgrade failed. {}", "If you are starting multiple nodes, check if another one succeeded, otherwise we suggest a restart.");
        }
        this.submitter.send(UpgraderKey.create(true), UpgraderValue.create(ActionType.UPGRADE_ABORT_AND_UNLOCK, this.localUpgraderUUID, Integer.valueOf(this.targetVersion)));
        switchState(State.CLOSING);
    }

    /** CLOSING: closes all upgraders and the heartbeat emitter, drops the lock map and switches to CLOSED. */
    private void handleClosing() {
        this.upgraders.stream().forEach(upgrader -> {
            try {
                upgrader.close();
            } catch (Exception e) {
                this.log.warn("Failed to close upgrader {} because of {}:{}", upgrader.getClass().getSimpleName(), e.getClass().getSimpleName(), e.getMessage());
            }
        });
        this.waitHeartbeatEmitter.close();
        this.lockMap = null;
        this.closeTimestamp = Instant.now();
        this.log.debug("Closed.");
        switchState(State.CLOSED);
    }

    /**
     * @return true once the manager has reached the terminal {@code CLOSED} state
     */
    public synchronized boolean isClosed() {
        return State.CLOSED == this.state;
    }

    /**
     * Computes the time between {@code init()} and the moment the manager finished closing.
     *
     * @return duration between the init timestamp and the close timestamp
     * @throws NullPointerException if the manager has not been initialized or has not closed yet
     */
    public synchronized Duration getBootstrapAndUpgradeDuration() {
        Instant startedAt = Objects.requireNonNull(this.initTimestamp);
        Instant closedAt = Objects.requireNonNull(this.closeTimestamp);
        return Duration.between(startedAt, closedAt);
    }

    /**
     * Moves the state machine to {@code state} and requests another evaluation pass
     * in {@code read(...)} by setting the retry flag.
     */
    private synchronized void switchState(State state) {
        State previous = this.state;
        this.log.debug("State change: {} -> {}", previous, state);
        this.retry = true;
        this.state = state;
    }

    /**
     * Collects the upgraders that have to run to get from the current to the target version.
     *
     * Versions are visited in ascending order from the current version (inclusive) to the target
     * version (exclusive); an upgrader is added at the first version it supports and never more
     * than once per upgrader class.
     *
     * @return the upgraders to run, in execution order; empty if none applies
     */
    private synchronized List<KafkaSqlUpgrader> computeActiveUpgraders() {
        HashSet<Class<?>> selectedClasses = new HashSet<>();
        List<KafkaSqlUpgrader> selected = new ArrayList<>();
        for (int version = this.currentVersion; version < this.targetVersion; version++) {
            for (KafkaSqlUpgrader upgrader : this.upgraders) {
                // supportsVersion is only consulted for classes that were not selected yet.
                if (!selectedClasses.contains(upgrader.getClass()) && upgrader.supportsVersion(version)) {
                    selectedClasses.add(upgrader.getClass());
                    selected.add(upgrader);
                }
            }
        }
        return selected;
    }

    /**
     * Determines which upgrader currently holds the lock for the target version.
     *
     * A record counts as active when it targets our target version, is try-locked and has not
     * timed out. Among active records the one with the lowest try-lock sequence wins.
     *
     * @return the winning lock record, or null when there is no active lock or no upgrade is needed
     */
    private LockRecord computeActiveLock() {
        if (this.currentVersion > this.targetVersion) {
            this.log.warn("Current storage version {} is later than the target version {}. Running an older version of Registry after a storage upgrade might cause problems.", Integer.valueOf(this.currentVersion), Integer.valueOf(this.targetVersion));
            return null;
        }
        if (this.currentVersion == this.targetVersion) {
            return null;
        }

        LockRecord winner = null;
        for (LockRecord candidate : this.lockMap.values()) {
            boolean active = candidate.targetVersion != null
                    && candidate.targetVersion.intValue() == this.targetVersion
                    && candidate.tryLocked
                    && !candidate.isTimedOut(Instant.now(), this.lockTimeout);
            if (!active) {
                continue;
            }
            // Strict comparison keeps the first encountered record when sequences are equal.
            if (winner == null || candidate.tryLockSequence < winner.tryLockSequence) {
                winner = candidate;
            }
        }
        return winner;
    }

    /**
     * Applies one upgrader coordination message to the lock bookkeeping.
     *
     * Messages without an upgrader UUID are ignored. A record is created for every UUID seen,
     * even for message types that do not change it.
     *
     * @param instant       timestamp of the message
     * @param upgraderValue the coordination message
     * @throws RuntimeAssertionFailedException if the action type is not an upgrader action
     */
    private void updateLockMap(Instant instant, UpgraderValue upgraderValue) {
        if (upgraderValue.getUpgraderUUID() == null) {
            return;
        }
        LockRecord lock = this.lockMap.computeIfAbsent(upgraderValue.getUpgraderUUID(), uuid -> {
            LockRecord created = new LockRecord();
            created.upgraderUUID = uuid;
            return created;
        });
        switch (upgraderValue.getAction()) {
            case UPGRADE_BOOTSTRAP:
            case UPGRADE_WAIT_HEARTBEAT:
                // No effect on the lock state.
                break;
            case UPGRADE_TRY_LOCK:
                lock.targetVersion = upgraderValue.getVersion();
                lock.tryLockSequence = this.sequence;
                lock.tryLocked = true;
                lock.latestLockTimestamp = instant;
                break;
            case UPGRADE_ABORT_AND_UNLOCK:
                lock.tryLocked = false;
                lock.latestLockTimestamp = instant;
                break;
            case UPGRADE_COMMIT_AND_UNLOCK:
                lock.tryLocked = false;
                lock.latestLockTimestamp = instant;
                // A commit can only raise the known storage version, never lower it.
                if (upgraderValue.getVersion().intValue() > this.currentVersion) {
                    this.currentVersion = upgraderValue.getVersion().intValue();
                }
                break;
            case UPGRADE_LOCK_HEARTBEAT:
                lock.latestLockTimestamp = instant;
                break;
            default:
                throw new RuntimeAssertionFailedException("Read an upgrader message with unsupported action type: " + upgraderValue.getAction());
        }
    }

    /**
     * Multiplies a duration by a factor, with millisecond resolution.
     *
     * The product is computed in {@code float} arithmetic and then truncated towards zero,
     * so sub-millisecond parts of the input and of the result are dropped.
     *
     * @param duration the duration to scale; must not be null
     * @param f        the scaling factor
     * @return a duration of {@code duration.toMillis() * f} whole milliseconds
     */
    public static Duration scale(Duration duration, float f) {
        // Duration.ofMillis takes a long, so the float product needs an explicit narrowing cast.
        return Duration.ofMillis((long) (duration.toMillis() * f));
    }

    /**
     * @return the configured upgrade lock timeout
     */
    public Duration getLockTimeout() {
        return lockTimeout;
    }
}
