package io.apicurio.registry.kafka;

import io.apicurio.registry.common.proto.Cmmn;
import io.apicurio.registry.content.ContentHandle;
import io.apicurio.registry.kafka.snapshot.StorageSnapshot;
import io.apicurio.registry.metrics.MetricIDs;
import io.apicurio.registry.metrics.PersistenceExceptionLivenessApply;
import io.apicurio.registry.metrics.PersistenceTimeoutReadinessApply;
import io.apicurio.registry.storage.ArtifactAlreadyExistsException;
import io.apicurio.registry.storage.ArtifactMetaDataDto;
import io.apicurio.registry.storage.ArtifactNotFoundException;
import io.apicurio.registry.storage.EditableArtifactMetaDataDto;
import io.apicurio.registry.storage.RegistryStorageException;
import io.apicurio.registry.storage.RuleAlreadyExistsException;
import io.apicurio.registry.storage.RuleConfigurationDto;
import io.apicurio.registry.storage.RuleNotFoundException;
import io.apicurio.registry.storage.VersionNotFoundException;
import io.apicurio.registry.storage.impl.SimpleMapRegistryStorage;
import io.apicurio.registry.storage.proto.Str;
import io.apicurio.registry.types.ArtifactState;
import io.apicurio.registry.types.ArtifactType;
import io.apicurio.registry.types.RegistryException;
import io.apicurio.registry.types.RuleType;
import io.apicurio.registry.utils.ConcurrentUtil;
import io.apicurio.registry.utils.ProtoUtil;
import io.apicurio.registry.utils.kafka.ProducerActions;
import io.apicurio.registry.utils.kafka.Submitter;
import java.util.Iterator;
import java.util.Map;
import java.util.SortedSet;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.metrics.annotation.ConcurrentGauge;
import org.eclipse.microprofile.metrics.annotation.Counted;
import org.eclipse.microprofile.metrics.annotation.Timed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Registry storage whose mutations are routed through a Kafka topic.
 *
 * <p>Every mutating operation is turned into a {@code Str.StorageValue} by the
 * {@link Submitter} and sent to {@link #storageTopic} under a random request UUID. The change is
 * applied to the inherited in-memory maps only when the record comes back through
 * {@link #consumeStorageValue(ConsumerRecord)}; at that point the future registered under the
 * request UUID (if this instance submitted it) is completed with the outcome.
 *
 * <p>Snapshots of the in-memory state are written to {@link #snapshotTopic} either when a
 * snapshot record is consumed, when the consumed offset is a multiple of
 * {@link #snapshotRequests}, or when the periodic {@link #check()} finds the last snapshot to be
 * older than {@link #snapshotPeriod} minutes.
 */
@ConcurrentGauge(name = "concurrent_operation_count_KafkaRegistryStorage", description = MetricIDs.STORAGE_CONCURRENT_OPERATION_COUNT_DESC, tags = {"group=STORAGE", "metric=concurrent_operation_count"}, reusable = true)
@PersistenceExceptionLivenessApply
@PersistenceTimeoutReadinessApply
@Counted(name = "storage_operation_count_KafkaRegistryStorage", description = MetricIDs.STORAGE_OPERATION_COUNT_DESC, tags = {"group=STORAGE", "metric=storage_operation_count"}, reusable = true)
@Timed(name = "storage_operation_time_KafkaRegistryStorage", description = MetricIDs.STORAGE_OPERATION_TIME_DESC, tags = {"group=STORAGE", "metric=storage_operation_time"}, unit = "milliseconds", reusable = true)
@ApplicationScoped
public class KafkaRegistryStorage extends SimpleMapRegistryStorage implements KafkaRegistryStorageHandle {
    private static final Logger log = LoggerFactory.getLogger(KafkaRegistryStorage.class);

    /** A snapshot is taken when a locally submitted record lands on an offset divisible by this value. */
    @ConfigProperty(name = "registry.kafka.snapshot.requests", defaultValue = "1000")
    long snapshotRequests;

    /** Age of the last snapshot, in minutes, after which {@link #check()} forces a new one. */
    @ConfigProperty(name = "registry.kafka.snapshot.period.minutes", defaultValue = "1200")
    long snapshotPeriod;

    /** Name of the topic snapshots are written to. */
    @ConfigProperty(name = "registry.kafka.snapshot.topic", defaultValue = "snapshot-topic")
    String snapshotTopic;

    /** Interval, in minutes, between {@link #check()} runs; also the maximum age of an outstanding request. */
    @ConfigProperty(name = "registry.kafka.schedule.period.minutes", defaultValue = "1")
    long schedulePeriod;

    /** Name of the topic storage records are written to. */
    @ConfigProperty(name = "registry.kafka.storage.topic", defaultValue = "storage-topic")
    String storageTopic;

    @Inject
    ProducerActions<Cmmn.UUID, Str.StorageValue> storageProducer;

    @Inject
    ProducerActions<Long, StorageSnapshot> snapshotProducer;

    /** Offset of the most recently consumed storage record. */
    private volatile long offset = 0;
    private final Submitter submitter = new Submitter(this::submit);

    /** Futures of requests sent by this instance that have not been consumed back yet, keyed by request id. */
    private final Map<UUID, TimedFuture<Object>> outstandingRequests = new ConcurrentHashMap<>();
    private ScheduledExecutorService executor;

    /** Timestamp carried by the most recently consumed snapshot record; 0 until one is seen. */
    private volatile long lastSnapshotTime;

    /**
     * A future that remembers when it was created so that stale requests can be expired.
     */
    private static class TimedFuture<T> extends CompletableFuture<T> {
        private final long timestamp = System.currentTimeMillis();

        /**
         * @return creation time of this future, in epoch milliseconds
         */
        public long getTimestamp() {
            return this.timestamp;
        }
    }

    /**
     * @return the offset of the most recently consumed storage record
     */
    @Override // io.apicurio.registry.storage.impl.AbstractMapRegistryStorage
    protected long nextGlobalId() {
        return this.offset;
    }

    /** Logs the effective snapshot configuration. */
    @Override // io.apicurio.registry.storage.impl.AbstractMapRegistryStorage
    protected void afterInit() {
        log.info("Autosnapshot on every {} requests, period: {}, scheduled check: {}",
            this.snapshotRequests, this.snapshotPeriod, this.schedulePeriod);
    }

    /**
     * @return the configured storage topic name
     */
    @Override // io.apicurio.registry.kafka.KafkaRegistryStorageHandle
    public String registryTopic() {
        return this.storageTopic;
    }

    /**
     * @return the configured snapshot topic name
     */
    @Override // io.apicurio.registry.kafka.KafkaRegistryStorageHandle
    public String snapshotTopic() {
        return this.snapshotTopic;
    }

    /**
     * Merges the contents of a snapshot into the in-memory maps.
     *
     * @param storageSnapshot snapshot whose artifacts, global ids and rules are copied in
     */
    @Override // io.apicurio.registry.kafka.KafkaRegistryStorageHandle
    public void loadSnapshot(StorageSnapshot storageSnapshot) {
        this.storage.putAll(storageSnapshot.getStorage());
        this.global.putAll(storageSnapshot.getGlobal());
        this.artifactRules.putAll(storageSnapshot.getArtifactRules());
        this.globalRules.putAll(storageSnapshot.getGlobalRules());
    }

    /**
     * Starts the periodic {@link #check()} task.
     *
     * <p>The first run is delayed by a random amount below one schedule period. Both the delay
     * and the period are expressed in seconds so that the random delay, whose bound is
     * {@code schedulePeriod * 60}, is interpreted in the unit it was computed in.
     */
    @Override // io.apicurio.registry.kafka.KafkaRegistryStorageHandle
    public void start() {
        this.executor = new ScheduledThreadPoolExecutor(1);
        long periodSeconds = TimeUnit.MINUTES.toSeconds(this.schedulePeriod);
        long initialDelaySeconds = ThreadLocalRandom.current().nextLong(periodSeconds);
        this.executor.scheduleAtFixedRate(this::check, initialDelaySeconds, periodSeconds, TimeUnit.SECONDS);
    }

    /**
     * Periodic maintenance: expires stale outstanding requests and forces a snapshot when the
     * last one is too old.
     *
     * <p>Runtime failures are logged instead of propagated, because an exception escaping a
     * task scheduled with {@code scheduleAtFixedRate} suppresses all subsequent executions.
     */
    private void check() {
        try {
            long now = System.currentTimeMillis();
            expireOutstandingRequests(now);
            forceSnapshotIfDue(now);
        } catch (RuntimeException e) {
            log.warn("Periodic storage check failed", e);
        }
    }

    /**
     * Drops requests older than one schedule period and fails their futures so that waiting
     * callers are released instead of blocking indefinitely.
     */
    private void expireOutstandingRequests(long now) {
        long maxAgeMillis = TimeUnit.MINUTES.toMillis(this.schedulePeriod);
        for (Map.Entry<UUID, TimedFuture<Object>> entry : this.outstandingRequests.entrySet()) {
            TimedFuture<Object> pending = entry.getValue();
            boolean expired = now - pending.getTimestamp() > maxAgeMillis;
            // remove(key, value) only succeeds if the consumer has not claimed the request meanwhile
            if (expired && this.outstandingRequests.remove(entry.getKey(), pending)) {
                pending.completeExceptionally(new java.util.concurrent.TimeoutException(
                    "Storage request " + entry.getKey() + " was not consumed within "
                        + this.schedulePeriod + " minute(s); its outcome is unknown"));
            }
        }
    }

    /**
     * Submits a snapshot request when a snapshot has been seen before and it is older than the
     * configured snapshot period. Blocks until the request has completed.
     */
    private void forceSnapshotIfDue(long now) {
        long last = this.lastSnapshotTime;
        if (last > 0 && now - last > TimeUnit.MINUTES.toMillis(this.snapshotPeriod)) {
            Object result = ConcurrentUtil.get(this.submitter.submitSnapshot(now));
            log.info("Forced snapshot: {}", result);
        }
    }

    /** Shuts down the periodic check executor if it was started. */
    @Override // io.apicurio.registry.kafka.KafkaRegistryStorageHandle
    public void stop() {
        if (this.executor != null) {
            this.executor.shutdown();
        }
    }

    /**
     * Sends a storage record under a fresh request id and returns a future that completes once
     * the record has been consumed back and applied.
     *
     * @param storageValue the record to send
     * @return future completed by {@link #consumeStorageValue(ConsumerRecord)}, or failed if the
     *         send itself fails
     */
    @SuppressWarnings("unchecked")
    private <T> CompletableFuture<T> submit(Str.StorageValue storageValue) {
        UUID requestId = UUID.randomUUID();
        TimedFuture<Object> pending = new TimedFuture<>();
        this.outstandingRequests.put(requestId, pending);
        return (CompletableFuture<T>) send(requestId, storageValue).whenComplete((metadata, failure) -> {
            if (failure != null) {
                // the record never reached the topic, so nothing will ever complete the future
                this.outstandingRequests.remove(requestId);
            }
        }).thenCompose(metadata -> {
            return pending;
        });
    }

    /** Produces the record to the storage topic, keyed by the request id. */
    private CompletableFuture<?> send(UUID uuid, Str.StorageValue storageValue) {
        return this.storageProducer.apply(new ProducerRecord<>(this.storageTopic, ProtoUtil.convert(uuid), storageValue));
    }

    /**
     * Applies a consumed storage record to the in-memory state and completes the future of the
     * request that produced it, when that request originated from this instance.
     *
     * <p>After the record has been applied, a snapshot is written if this instance submitted the
     * record and either the record is a snapshot request or the offset is a positive multiple of
     * {@link #snapshotRequests}.
     *
     * @param consumerRecord the record read from the storage topic
     * @throws IllegalArgumentException if the record carries an unknown value type
     */
    @Override // io.apicurio.registry.kafka.KafkaRegistryStorageHandle
    public void consumeStorageValue(ConsumerRecord<Cmmn.UUID, Str.StorageValue> consumerRecord) {
        this.offset = consumerRecord.offset();

        TimedFuture<Object> registered = this.outstandingRequests.remove(ProtoUtil.convert(consumerRecord.key()));
        boolean submittedLocally = registered != null;
        // records from other instances get a throw-away future so the consume* helpers need no null checks
        TimedFuture<Object> result = submittedLocally ? registered : new TimedFuture<>();

        Str.StorageValue storageValue = consumerRecord.value();
        Str.ActionType type = storageValue.getType();
        String artifactId = (String) ProtoUtil.getNullable(storageValue.getArtifactId(), id -> {
            return !id.isEmpty();
        }, Function.identity());
        long version = storageValue.getVersion();
        Str.ValueType vt = storageValue.getVt();

        boolean snapshotRequested = false;
        long snapshotTimestamp = System.currentTimeMillis();
        try {
            switch (vt) {
                case ARTIFACT:
                    consumeArtifact(result, storageValue, type, artifactId, version);
                    break;
                case METADATA:
                    consumeMetaData(result, storageValue, type, artifactId, version);
                    break;
                case RULE:
                    consumeRule(result, storageValue, type, artifactId);
                    break;
                case SNAPSHOT:
                    this.lastSnapshotTime = storageValue.getSnapshot().getTimestamp();
                    snapshotTimestamp = this.lastSnapshotTime;
                    snapshotRequested = true;
                    break;
                case STATE:
                    consumeState(result, artifactId, version, storageValue.getState());
                    break;
                default:
                    throw new IllegalArgumentException("No such ValueType: " + vt);
            }

            // snapshotRequests > 0 guards the modulo against a zero configuration value
            boolean snapshotDue = snapshotRequested
                || (this.offset > 0 && this.snapshotRequests > 0 && this.offset % this.snapshotRequests == 0);
            if (submittedLocally && snapshotDue) {
                CompletableFuture<RecordMetadata> snapshot = makeSnapshot(snapshotTimestamp);
                if (snapshotRequested) {
                    // an explicit snapshot request is answered with the outcome of the snapshot write
                    snapshot.whenComplete((recordMetadata, failure) -> {
                        if (failure != null) {
                            result.completeExceptionally(failure);
                        } else {
                            result.complete(recordMetadata);
                        }
                    });
                }
            }
        } catch (RegistryException e) {
            result.completeExceptionally(e);
        } catch (RuntimeException e) {
            // release the waiting caller, then let the consumer see the unexpected failure as before
            result.completeExceptionally(e);
            throw e;
        }
    }

    /**
     * Writes the current in-memory state to the snapshot topic, keyed by the given timestamp,
     * and logs the outcome.
     */
    private CompletableFuture<RecordMetadata> makeSnapshot(long timestamp) {
        StorageSnapshot snapshot = new StorageSnapshot(this.storage, this.global, this.artifactRules, this.globalRules, this.offset);
        return this.snapshotProducer.apply(new ProducerRecord<>(this.snapshotTopic, Long.valueOf(timestamp), snapshot)).whenComplete((recordMetadata, failure) -> {
            if (failure != null) {
                log.warn("Exception dumping automatic snapshot: ", failure);
            } else {
                log.info("Dumped automatic snapshot to {} ({} bytes)", recordMetadata, recordMetadata.serializedValueSize());
            }
        });
    }

    /**
     * Applies a rule record. A missing artifact id targets the global rules; on delete, a
     * missing rule type removes all rules of the targeted scope.
     */
    private void consumeRule(CompletableFuture<Object> result, Str.StorageValue storageValue, Str.ActionType actionType, String artifactId) {
        Str.RuleValue rule = storageValue.getRule();
        Str.RuleType protoType = rule.getType();
        RuleType ruleType = (protoType == null || protoType == Str.RuleType.__NONE) ? null : RuleType.valueOf(protoType.name());
        RuleConfigurationDto config = new RuleConfigurationDto(rule.getConfiguration());
        boolean global = artifactId == null;

        if (actionType == Str.ActionType.CREATE) {
            if (global) {
                super.createGlobalRule(ruleType, config);
            } else {
                super.createArtifactRule(artifactId, ruleType, config);
            }
        } else if (actionType == Str.ActionType.UPDATE) {
            if (global) {
                super.updateGlobalRule(ruleType, config);
            } else {
                super.updateArtifactRule(artifactId, ruleType, config);
            }
        } else if (actionType == Str.ActionType.DELETE) {
            if (global) {
                if (ruleType != null) {
                    super.deleteGlobalRule(ruleType);
                } else {
                    super.deleteGlobalRules();
                }
            } else if (ruleType != null) {
                super.deleteArtifactRule(artifactId, ruleType);
            } else {
                super.deleteArtifactRules(artifactId);
            }
        }
        result.complete(Void.class);
    }

    /**
     * Applies a metadata record. On update, a negative version addresses the artifact-level
     * metadata, otherwise the metadata of that version.
     */
    private void consumeMetaData(CompletableFuture<Object> result, Str.StorageValue storageValue, Str.ActionType actionType, String artifactId, long version) {
        Str.MetaDataValue metadata = storageValue.getMetadata();
        if (actionType == Str.ActionType.UPDATE) {
            EditableArtifactMetaDataDto dto = new EditableArtifactMetaDataDto(metadata.getName(), metadata.getDescription());
            if (version >= 0) {
                super.updateArtifactVersionMetaData(artifactId, version, dto);
            } else {
                super.updateArtifactMetaData(artifactId, dto);
            }
        } else if (actionType == Str.ActionType.DELETE) {
            super.deleteArtifactVersionMetaData(artifactId, version);
        }
        result.complete(Void.class);
    }

    /**
     * Applies an artifact record. Create and update use the current offset as the id passed to
     * {@code createOrUpdateArtifact}; on delete, a negative version removes the whole artifact.
     */
    private void consumeArtifact(CompletableFuture<Object> result, Str.StorageValue storageValue, Str.ActionType actionType, String artifactId, long version) {
        Str.ArtifactValue artifact = storageValue.getArtifact();
        if (actionType == Str.ActionType.CREATE || actionType == Str.ActionType.UPDATE) {
            // the record carries the artifact type as an index into ArtifactType.values()
            ArtifactType artifactType = ArtifactType.values()[artifact.getArtifactType()];
            ContentHandle content = ContentHandle.create(artifact.getContent().toByteArray());
            boolean create = Str.ActionType.CREATE == actionType;
            result.complete(createOrUpdateArtifact(artifactId, artifactType, content, create, this.offset));
        } else if (actionType == Str.ActionType.DELETE) {
            if (version < 0) {
                result.complete(super.deleteArtifact(artifactId));
            } else {
                super.deleteArtifactVersion(artifactId, version);
                result.complete(Void.class);
            }
        }
    }

    /**
     * Applies a state record; a non-positive version is passed on as {@code null}.
     */
    private void consumeState(CompletableFuture<Object> result, String artifactId, long version, Str.ArtifactState artifactState) {
        Integer versionOrNull = version > 0 ? Integer.valueOf((int) version) : null;
        super.updateArtifactState(artifactId, ArtifactState.valueOf(artifactState.name()), versionOrNull);
        result.complete(Void.class);
    }

    /**
     * Submits a state change without a version (sent as {@code -1}) and waits for it to be applied.
     */
    @Override // io.apicurio.registry.storage.impl.AbstractMapRegistryStorage, io.apicurio.registry.storage.RegistryStorage
    public void updateArtifactState(String str, ArtifactState artifactState) {
        ConcurrentUtil.get(this.submitter.submitState(str, -1L, artifactState));
    }

    /**
     * Submits a state change for the given version and waits for it to be applied.
     *
     * @param num the version; {@code null} is sent as {@code -1}, the same value the overload
     *            without a version uses
     */
    @Override // io.apicurio.registry.storage.impl.AbstractMapRegistryStorage, io.apicurio.registry.storage.RegistryStorage
    public void updateArtifactState(String str, ArtifactState artifactState, Integer num) {
        long version = (num != null) ? num.longValue() : -1L;
        ConcurrentUtil.get(this.submitter.submitState(str, Long.valueOf(version), artifactState));
    }

    /**
     * Submits an artifact creation.
     *
     * @return stage completed once the record has been consumed and applied
     */
    @Override // io.apicurio.registry.storage.impl.AbstractMapRegistryStorage, io.apicurio.registry.storage.RegistryStorage
    public CompletionStage<ArtifactMetaDataDto> createArtifact(String str, ArtifactType artifactType, ContentHandle contentHandle) throws ArtifactAlreadyExistsException, RegistryStorageException {
        return this.submitter.submitArtifact(Str.ActionType.CREATE, str, 0L, artifactType, contentHandle.bytes());
    }

    /**
     * Submits deletion of a whole artifact and waits for the result.
     *
     * @return the value the consumed record completed the request with
     */
    @Override // io.apicurio.registry.storage.impl.AbstractMapRegistryStorage, io.apicurio.registry.storage.RegistryStorage
    public SortedSet<Long> deleteArtifact(String str) throws ArtifactNotFoundException, RegistryStorageException {
        @SuppressWarnings("unchecked")
        SortedSet<Long> deleted = (SortedSet<Long>) ConcurrentUtil.get(this.submitter.submitArtifact(Str.ActionType.DELETE, str, -1L, null, null));
        return deleted;
    }

    /**
     * Submits an artifact update.
     *
     * @return stage completed once the record has been consumed and applied
     */
    @Override // io.apicurio.registry.storage.impl.AbstractMapRegistryStorage, io.apicurio.registry.storage.RegistryStorage
    public CompletionStage<ArtifactMetaDataDto> updateArtifact(String str, ArtifactType artifactType, ContentHandle contentHandle) throws ArtifactNotFoundException, RegistryStorageException {
        return this.submitter.submitArtifact(Str.ActionType.UPDATE, str, 0L, artifactType, contentHandle.bytes());
    }

    /** Submits deletion of a single artifact version and waits for it to be applied. */
    @Override // io.apicurio.registry.storage.impl.AbstractMapRegistryStorage, io.apicurio.registry.storage.RegistryStorage
    public void deleteArtifactVersion(String str, long j) throws ArtifactNotFoundException, VersionNotFoundException, RegistryStorageException {
        ConcurrentUtil.get(this.submitter.submitArtifact(Str.ActionType.DELETE, str, j, null, null));
    }

    /** Submits an artifact-level metadata update (version sent as {@code -1}) and waits for it to be applied. */
    @Override // io.apicurio.registry.storage.impl.AbstractMapRegistryStorage, io.apicurio.registry.storage.RegistryStorage
    public void updateArtifactMetaData(String str, EditableArtifactMetaDataDto editableArtifactMetaDataDto) throws ArtifactNotFoundException, RegistryStorageException {
        ConcurrentUtil.get(this.submitter.submitMetadata(Str.ActionType.UPDATE, str, -1L, editableArtifactMetaDataDto.getName(), editableArtifactMetaDataDto.getDescription()));
    }

    /** Submits creation of an artifact rule and waits for it to be applied. */
    @Override // io.apicurio.registry.storage.impl.AbstractMapRegistryStorage, io.apicurio.registry.storage.RegistryStorage
    public void createArtifactRule(String str, RuleType ruleType, RuleConfigurationDto ruleConfigurationDto) throws ArtifactNotFoundException, RuleAlreadyExistsException, RegistryStorageException {
        ConcurrentUtil.get(this.submitter.submitRule(Str.ActionType.CREATE, str, ruleType, ruleConfigurationDto.getConfiguration()));
    }

    /** Submits deletion of all rules of an artifact and waits for it to be applied. */
    @Override // io.apicurio.registry.storage.impl.AbstractMapRegistryStorage, io.apicurio.registry.storage.RegistryStorage
    public void deleteArtifactRules(String str) throws ArtifactNotFoundException, RegistryStorageException {
        ConcurrentUtil.get(this.submitter.submitRule(Str.ActionType.DELETE, str, null, null));
    }

    /** Submits an update of an artifact rule and waits for it to be applied. */
    @Override // io.apicurio.registry.storage.impl.AbstractMapRegistryStorage, io.apicurio.registry.storage.RegistryStorage
    public void updateArtifactRule(String str, RuleType ruleType, RuleConfigurationDto ruleConfigurationDto) throws ArtifactNotFoundException, RuleNotFoundException, RegistryStorageException {
        ConcurrentUtil.get(this.submitter.submitRule(Str.ActionType.UPDATE, str, ruleType, ruleConfigurationDto.getConfiguration()));
    }

    /** Submits deletion of one artifact rule and waits for it to be applied. */
    @Override // io.apicurio.registry.storage.impl.AbstractMapRegistryStorage, io.apicurio.registry.storage.RegistryStorage
    public void deleteArtifactRule(String str, RuleType ruleType) throws ArtifactNotFoundException, RuleNotFoundException, RegistryStorageException {
        ConcurrentUtil.get(this.submitter.submitRule(Str.ActionType.DELETE, str, ruleType, null));
    }

    /** Submits a metadata update for one artifact version and waits for it to be applied. */
    @Override // io.apicurio.registry.storage.impl.AbstractMapRegistryStorage, io.apicurio.registry.storage.RegistryStorage
    public void updateArtifactVersionMetaData(String str, long j, EditableArtifactMetaDataDto editableArtifactMetaDataDto) throws ArtifactNotFoundException, VersionNotFoundException, RegistryStorageException {
        ConcurrentUtil.get(this.submitter.submitMetadata(Str.ActionType.UPDATE, str, j, editableArtifactMetaDataDto.getName(), editableArtifactMetaDataDto.getDescription()));
    }

    /** Submits deletion of the metadata of one artifact version and waits for it to be applied. */
    @Override // io.apicurio.registry.storage.impl.AbstractMapRegistryStorage, io.apicurio.registry.storage.RegistryStorage
    public void deleteArtifactVersionMetaData(String str, long j) throws ArtifactNotFoundException, VersionNotFoundException, RegistryStorageException {
        ConcurrentUtil.get(this.submitter.submitMetadata(Str.ActionType.DELETE, str, j, null, null));
    }

    /** Submits creation of a global rule and waits for it to be applied. */
    @Override // io.apicurio.registry.storage.impl.AbstractMapRegistryStorage, io.apicurio.registry.storage.RegistryStorage
    public void createGlobalRule(RuleType ruleType, RuleConfigurationDto ruleConfigurationDto) throws RuleAlreadyExistsException, RegistryStorageException {
        ConcurrentUtil.get(this.submitter.submitRule(Str.ActionType.CREATE, null, ruleType, ruleConfigurationDto.getConfiguration()));
    }

    /** Submits deletion of all global rules and waits for it to be applied. */
    @Override // io.apicurio.registry.storage.impl.AbstractMapRegistryStorage, io.apicurio.registry.storage.RegistryStorage
    public void deleteGlobalRules() throws RegistryStorageException {
        ConcurrentUtil.get(this.submitter.submitRule(Str.ActionType.DELETE, null, null, null));
    }

    /** Submits an update of a global rule and waits for it to be applied. */
    @Override // io.apicurio.registry.storage.impl.AbstractMapRegistryStorage, io.apicurio.registry.storage.RegistryStorage
    public void updateGlobalRule(RuleType ruleType, RuleConfigurationDto ruleConfigurationDto) throws RuleNotFoundException, RegistryStorageException {
        ConcurrentUtil.get(this.submitter.submitRule(Str.ActionType.UPDATE, null, ruleType, ruleConfigurationDto.getConfiguration()));
    }

    /** Submits deletion of one global rule and waits for it to be applied. */
    @Override // io.apicurio.registry.storage.impl.AbstractMapRegistryStorage, io.apicurio.registry.storage.RegistryStorage
    public void deleteGlobalRule(RuleType ruleType) throws RuleNotFoundException, RegistryStorageException {
        ConcurrentUtil.get(this.submitter.submitRule(Str.ActionType.DELETE, null, ruleType, null));
    }
}
