package io.apicurio.registry.streams;

import com.google.common.collect.ImmutableMap;
import io.apicurio.registry.content.ContentHandle;
import io.apicurio.registry.content.extract.ContentExtractor;
import io.apicurio.registry.rest.beans.EditableMetaData;
import io.apicurio.registry.storage.ArtifactStateExt;
import io.apicurio.registry.storage.InvalidArtifactStateException;
import io.apicurio.registry.storage.MetaDataKeys;
import io.apicurio.registry.storage.proto.Str;
import io.apicurio.registry.types.ArtifactState;
import io.apicurio.registry.types.ArtifactType;
import io.apicurio.registry.types.RuleType;
import io.apicurio.registry.types.provider.ArtifactTypeUtilProviderFactory;
import io.apicurio.registry.utils.StringUtil;
import io.apicurio.registry.utils.kafka.ProtoSerde;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/apicurio/registry/streams/StreamsTopologyProvider.class */
public class StreamsTopologyProvider implements Supplier<Topology> {
    private final StreamsProperties properties;
    private final ForeachAction<? super String, ? super Str.Data> dataDispatcher;
    private final ArtifactTypeUtilProviderFactory factory;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/apicurio/registry/streams/StreamsTopologyProvider$GlobalIdProcessor.class */
    public static class GlobalIdProcessor extends AbstractProcessor<Long, Str.TupleValue> {
        private final String storeName;
        private KeyValueStore<Long, Str.TupleValue> store;

        public GlobalIdProcessor(String str) {
            this.storeName = str;
        }

        public void init(ProcessorContext processorContext) {
            super.init(processorContext);
            this.store = processorContext.getStateStore(this.storeName);
        }

        public void process(Long l, Str.TupleValue tupleValue) {
            if (tupleValue == null) {
                this.store.delete(l);
            } else {
                this.store.put(l, tupleValue);
            }
        }
    }

    /* loaded from: input_file:io/apicurio/registry/streams/StreamsTopologyProvider$StorageTransformer.class */
    private static class StorageTransformer implements Transformer<String, Str.StorageValue, KeyValue<Long, Str.TupleValue>> {
        private static final Logger log = LoggerFactory.getLogger(StorageTransformer.class);
        private final StreamsProperties properties;
        private final ForeachAction<? super String, ? super Str.Data> dispatcher;
        private final ArtifactTypeUtilProviderFactory factory;
        private ProcessorContext context;
        private KeyValueStore<String, Str.Data> store;

        /**
         * Creates a transformer bound to the given configuration and collaborators.
         *
         * @param streamsProperties              supplies the storage store name and the global-id computation
         * @param foreachAction                  callback invoked with every updated per-artifact state
         * @param artifactTypeUtilProviderFactory used to look up the content extractor for an artifact type
         */
        public StorageTransformer(StreamsProperties streamsProperties, ForeachAction<? super String, ? super Str.Data> foreachAction, ArtifactTypeUtilProviderFactory artifactTypeUtilProviderFactory) {
            this.factory = artifactTypeUtilProviderFactory;
            this.dispatcher = foreachAction;
            this.properties = streamsProperties;
        }

        /**
         * Captures the processor context and resolves the storage state store by the name
         * configured in the streams properties.
         *
         * @param processorContext context supplied by Kafka Streams
         */
        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext processorContext) {
            this.context = processorContext;
            // Explicit cast: works whether getStateStore is declared to return StateStore
            // or a generic subtype; the store is registered with matching serdes by the topology.
            this.store = (KeyValueStore<String, Str.Data>) processorContext.getStateStore(this.properties.getStorageStoreName());
        }

        /**
         * Applies one storage request to the stored state of an artifact and emits the
         * corresponding global-id record.
         *
         * <p>Steps:
         * <ol>
         *   <li>load the current state for the key (or the default instance if none is stored);</li>
         *   <li>derive a global id from the record's offset and partition;</li>
         *   <li>compute the new state; a non-null result is stored and passed to the dispatcher,
         *       a {@code null} result removes the key from the store;</li>
         *   <li>emit {@code globalId -> (artifactId, artifact count)} for CREATE/UPDATE, a
         *       {@code globalId -> null} tombstone for DELETE, and nothing for other action types.</li>
         * </ol>
         *
         * @param str          record key (artifact id)
         * @param storageValue the storage request to apply
         * @return the record to forward downstream, or {@code null} to forward nothing
         */
        @Override
        public KeyValue<Long, Str.TupleValue> transform(String str, Str.StorageValue storageValue) {
            Str.Data current = this.store.get(str);
            if (current == null) {
                current = Str.Data.getDefaultInstance();
            }
            long offset = this.context.offset();
            long globalId = this.properties.toGlobalId(offset, this.context.partition());
            Str.Data updated = apply(str, storageValue, current, globalId, offset);
            if (updated != null) {
                this.store.put(str, updated);
                this.dispatcher.apply(str, updated);
            } else {
                this.store.delete(str);
            }
            switch (storageValue.getType()) {
                case CREATE:
                case UPDATE:
                    // 'updated' is only null for a whole-artifact DELETE, so it is non-null here.
                    Str.TupleValue tuple = Str.TupleValue.newBuilder()
                            .setArtifactId(str)
                            .setVersion(updated.getArtifactsCount())
                            .build();
                    return new KeyValue<>(Long.valueOf(globalId), tuple);
                case DELETE:
                    // Untyped null so the diamond infers KeyValue<Long, Str.TupleValue>.
                    return new KeyValue<>(Long.valueOf(globalId), null);
                default:
                    return null;
            }
        }

        /** Intentionally empty: this transformer performs no cleanup of its own on close. */
        public void close() {
        }

        /**
         * Routes a storage request to the handler matching its value type.
         *
         * @param artifactId record key (artifact id)
         * @param request    the storage request
         * @param data       current stored state for the artifact
         * @param globalId   global id derived for this record
         * @param offset     offset of this record
         * @return the new state, or {@code null} if the artifact entry should be removed
         * @throws IllegalArgumentException if the value type is not one of the handled kinds
         */
        private Str.Data apply(String artifactId, Str.StorageValue request, Str.Data data, long globalId, long offset) {
            final Str.ActionType action = request.getType();
            final long version = request.getVersion();
            final Str.ValueType valueType = request.getVt();
            final Str.Data result;
            switch (valueType) {
                case ARTIFACT:
                    result = consumeArtifact(data, request, action, artifactId, version, globalId, offset);
                    break;
                case METADATA:
                    result = consumeMetaData(data, request, action, artifactId, version, offset);
                    break;
                case RULE:
                    result = consumeRule(data, request, action, offset);
                    break;
                case STATE:
                    result = consumeState(data, request, artifactId, version, offset);
                    break;
                default:
                    throw new IllegalArgumentException("Cannot handle value type: " + valueType);
            }
            return result;
        }

        /**
         * Applies a rule request (create / update / delete) to the artifact's rule list.
         *
         * <ul>
         *   <li>CREATE adds the rule unless one of the same type is already present (then only warns).</li>
         *   <li>UPDATE replaces the configuration of the existing rule of that type.</li>
         *   <li>DELETE removes the first rule of that type, or clears all rules when the request
         *       carries no concrete rule type.</li>
         * </ul>
         *
         * @param data    current stored state
         * @param request the storage request carrying the rule
         * @param action  requested action
         * @param offset  offset of this record, stored as the last processed offset
         * @return the updated state
         */
        private Str.Data consumeRule(Str.Data data, Str.StorageValue request, Str.ActionType action, long offset) {
            Str.Data.Builder builder = Str.Data.newBuilder(data).setLastProcessedOffset(offset);
            Str.RuleValue incoming = request.getRule();
            Str.RuleType protoType = incoming.getType();

            // Map the wire-level rule type to the registry enum; null means "no specific rule type".
            RuleType ruleType = null;
            if (protoType != null && protoType != Str.RuleType.__NONE) {
                ruleType = RuleType.valueOf(protoType.name());
            }

            if (action == Str.ActionType.CREATE) {
                boolean alreadyPresent = false;
                for (Str.RuleValue existing : builder.getRulesList()) {
                    if (existing.getType() == protoType) {
                        alreadyPresent = true;
                        break;
                    }
                }
                if (alreadyPresent) {
                    log.warn("Rule already exists: {}", ruleType);
                } else {
                    builder.addRules(Str.RuleValue.newBuilder().setType(protoType).setConfiguration(incoming.getConfiguration()).build());
                }
            } else if (action == Str.ActionType.UPDATE) {
                updateRule(incoming, protoType, ruleType, builder);
            } else if (action == Str.ActionType.DELETE) {
                if (ruleType == null) {
                    builder.clearRules();
                } else {
                    // Remove only the first rule whose type matches.
                    for (int idx = 0; idx < builder.getRulesCount(); idx++) {
                        if (builder.getRules(idx).getType() == protoType) {
                            builder.removeRules(idx);
                            break;
                        }
                    }
                }
            }
            return builder.build();
        }

        /**
         * Replaces the first rule of the given type with a rule carrying the new configuration.
         * Logs a warning and leaves the builder untouched when no rule of that type exists.
         *
         * @param incoming    rule value supplying the new configuration
         * @param protoType   wire-level rule type to look for
         * @param displayType registry rule type, used only in the warning message
         * @param builder     state builder whose rule list is modified
         */
        private void updateRule(Str.RuleValue incoming, Str.RuleType protoType, RuleType displayType, Str.Data.Builder builder) {
            for (int idx = 0; idx < builder.getRulesCount(); idx++) {
                if (builder.getRules(idx).getType() == protoType) {
                    Str.RuleValue replacement = Str.RuleValue.newBuilder()
                            .setType(protoType)
                            .setConfiguration(incoming.getConfiguration())
                            .build();
                    builder.setRules(idx, replacement);
                    return;
                }
            }
            log.warn("Rule not found: {}", displayType);
        }

        /**
         * Applies a state-change request to one version of the artifact.
         *
         * <p>A non-negative {@code version} addresses that 1-based version; a negative value
         * addresses the last stored version. If the addressed version does not exist (beyond the
         * stored count, version 0, or no versions stored at all) a warning is logged and only the
         * last processed offset is updated. If the transition from the current state to the
         * requested one is not permitted, an error is logged and the state is left unchanged.
         *
         * @param data       current stored state
         * @param request    the storage request carrying the target state
         * @param artifactId artifact id, used for logging
         * @param version    1-based version, or negative for "latest"
         * @param offset     offset of this record, stored as the last processed offset
         * @return the updated state
         */
        private Str.Data consumeState(Str.Data data, Str.StorageValue request, String artifactId, long version, long offset) {
            Str.Data.Builder builder = Str.Data.newBuilder(data).setLastProcessedOffset(offset);
            int count = builder.getArtifactsCount();
            if (version > count) {
                log.warn("Version not found: {} [{}]", Long.valueOf(version), artifactId);
                return builder.build();
            }
            int index = (int) (version >= 0 ? version : count) - 1;
            if (index < 0) {
                // version == 0, or "latest" requested while nothing is stored: previously this
                // fell through to getArtifacts(-1) and threw IndexOutOfBoundsException.
                log.warn("Version not found: {} [{}]", Long.valueOf(version), artifactId);
                return builder.build();
            }
            Str.ArtifactValue artifact = data.getArtifacts(index);
            ArtifactState currentState = ArtifactStateExt.getState(artifact.getMetadataMap());
            ArtifactState requestedState = ArtifactState.valueOf(request.getState().name());
            Str.ArtifactValue.Builder artifactBuilder = Str.ArtifactValue.newBuilder(artifact);
            if (ArtifactStateExt.canTransition(currentState, requestedState)) {
                artifactBuilder.putMetadata(MetaDataKeys.STATE, requestedState.name());
            } else {
                log.error(InvalidArtifactStateException.errorMsg(currentState, requestedState));
            }
            builder.setArtifacts(index, artifactBuilder.build());
            return builder.build();
        }

        /**
         * Applies a metadata request (update / delete of name, description and labels) to one
         * version of the artifact.
         *
         * <p>A positive {@code version} addresses that 1-based version, which must be in an
         * active state; otherwise the most recent version in an active state is used. When no
         * suitable version is found nothing but the last processed offset changes.
         *
         * @param data       current stored state
         * @param request    the storage request carrying the metadata
         * @param action     requested action (UPDATE or DELETE have an effect)
         * @param artifactId artifact id, used for logging
         * @param version    1-based version, or non-positive for "latest active"
         * @param offset     offset of this record, stored as the last processed offset
         * @return the updated state
         */
        private Str.Data consumeMetaData(Str.Data data, Str.StorageValue request, Str.ActionType action, String artifactId, long version, long offset) {
            Str.Data.Builder builder = Str.Data.newBuilder(data).setLastProcessedOffset(offset);
            Str.MetaDataValue metadata = request.getMetadata();
            int count = builder.getArtifactsCount();
            if (version > count) {
                log.warn("Version not found: {} [{}]", Long.valueOf(version), artifactId);
                return builder.build();
            }

            int index;
            Str.ArtifactValue target = null;
            if (version > 0) {
                index = (int) (version - 1);
                Str.ArtifactValue candidate = builder.getArtifacts(index);
                if (ArtifactStateExt.ACTIVE_STATES.contains(ArtifactStateExt.getState(candidate.getMetadataMap()))) {
                    target = candidate;
                } else {
                    log.warn(String.format("Not an active artifact, cannot modify metadata: %s [%s]", artifactId, Long.valueOf(version)));
                }
            } else {
                // Walk backwards from the newest version to the first one in an active state.
                for (index = count - 1; index >= 0; index--) {
                    Str.ArtifactValue candidate = builder.getArtifacts(index);
                    if (ArtifactStateExt.ACTIVE_STATES.contains(ArtifactStateExt.getState(candidate.getMetadataMap()))) {
                        target = candidate;
                        break;
                    }
                }
            }

            if (target != null) {
                Str.ArtifactValue.Builder artifactBuilder = Str.ArtifactValue.newBuilder(target);
                if (action == Str.ActionType.UPDATE) {
                    artifactBuilder.putMetadata(MetaDataKeys.NAME, metadata.getName());
                    artifactBuilder.putMetadata(MetaDataKeys.DESCRIPTION, metadata.getDescription());
                    artifactBuilder.putMetadata(MetaDataKeys.LABELS, metadata.getLabels());
                } else if (action == Str.ActionType.DELETE) {
                    artifactBuilder.removeMetadata(MetaDataKeys.NAME);
                    artifactBuilder.removeMetadata(MetaDataKeys.DESCRIPTION);
                    artifactBuilder.removeMetadata(MetaDataKeys.LABELS);
                }
                builder.setArtifacts(index, artifactBuilder.build());
            }
            return builder.build();
        }

        /**
         * Applies an artifact request to the stored state.
         *
         * <ul>
         *   <li>CREATE / UPDATE append a new version (see {@code createOrUpdateArtifact}).</li>
         *   <li>DELETE with a negative version returns {@code null}, which makes the caller drop
         *       the whole entry from the store.</li>
         *   <li>DELETE with a valid 1-based version replaces that version with the default
         *       (empty) artifact value, keeping the positions of the other versions stable.</li>
         *   <li>DELETE with a version that does not exist (0 or beyond the stored count) only
         *       logs a warning.</li>
         * </ul>
         *
         * @param data       current stored state
         * @param request    the storage request carrying the artifact
         * @param action     requested action
         * @param artifactId artifact id
         * @param version    1-based version for DELETE, negative for "all versions"
         * @param globalId   global id assigned to a newly created version
         * @param offset     offset of this record, stored as the last processed offset
         * @return the updated state, or {@code null} if the whole artifact is to be removed
         */
        private Str.Data consumeArtifact(Str.Data data, Str.StorageValue request, Str.ActionType action, String artifactId, long version, long globalId, long offset) {
            Str.Data.Builder builder = Str.Data.newBuilder(data).setLastProcessedOffset(offset);
            Str.ArtifactValue artifact = request.getArtifact();
            if (action == Str.ActionType.CREATE || action == Str.ActionType.UPDATE) {
                createOrUpdateArtifact(builder, artifactId, globalId, artifact, action == Str.ActionType.CREATE);
            } else if (action == Str.ActionType.DELETE) {
                if (version < 0) {
                    return null;
                }
                // version == 0 has no slot (versions are 1-based); previously it reached
                // setArtifacts(-1, ...) and threw IndexOutOfBoundsException.
                if (version == 0 || version > builder.getArtifactsCount()) {
                    log.warn("Version not found: {} [{}]", Long.valueOf(version), artifactId);
                } else {
                    builder.setArtifacts((int) (version - 1), Str.ArtifactValue.getDefaultInstance());
                }
            }
            return builder.build();
        }

        /**
         * Appends a new artifact version to the state builder.
         *
         * <p>The artifact id is always set on the builder. A create request is ignored (with a
         * warning) when versions already exist, and an update request is ignored (with a warning)
         * when none exist. Otherwise a new version is appended whose metadata contains the
         * artifact id, global id, version number, type, timestamps and the ENABLED state. For an
         * update, the creation timestamp, name and description of the previous version are
         * carried over when present. Name and description extracted from the content, when
         * non-empty, take precedence over carried-over values.
         *
         * @param builder       state builder to append to
         * @param artifactId    artifact id
         * @param globalId      global id assigned to the new version
         * @param artifactValue artifact payload from the request
         * @param create        {@code true} for a create request, {@code false} for an update
         */
        private void createOrUpdateArtifact(Str.Data.Builder builder, String artifactId, long globalId, Str.ArtifactValue artifactValue, boolean create) {
            builder.setArtifactId(artifactId);
            int existingCount = builder.getArtifactsCount();
            if (create && existingCount > 0) {
                log.warn("Artifact already exists: {}", artifactId);
                return;
            }
            if (!create && existingCount == 0) {
                log.warn("Artifact not found: {}", artifactId);
                return;
            }

            Str.ArtifactValue.Builder versionBuilder = Str.ArtifactValue.newBuilder(artifactValue);
            versionBuilder.setId(globalId);
            int newVersion = existingCount + 1;
            // The request carries the artifact type as an index into ArtifactType.values().
            ArtifactType artifactType = ArtifactType.values()[artifactValue.getArtifactType()];

            Map<String, String> metadata = new HashMap<>();
            metadata.put(MetaDataKeys.ARTIFACT_ID, artifactId);
            metadata.put(MetaDataKeys.GLOBAL_ID, String.valueOf(globalId));
            metadata.put(MetaDataKeys.VERSION, String.valueOf(newVersion));
            metadata.put(MetaDataKeys.TYPE, artifactType.value());
            String now = String.valueOf(System.currentTimeMillis());
            metadata.put(MetaDataKeys.CREATED_ON, now);
            metadata.put(MetaDataKeys.MODIFIED_ON, now);
            metadata.put(MetaDataKeys.STATE, ArtifactState.ENABLED.name());

            if (!create) {
                // Carry selected metadata over from the most recent existing version.
                Map<String, String> previous = builder.getArtifacts(existingCount - 1).getMetadataMap();
                if (previous != null) {
                    checkNull(artifactId, newVersion, metadata, MetaDataKeys.CREATED_ON, previous.get(MetaDataKeys.CREATED_ON));
                    if (previous.containsKey(MetaDataKeys.NAME)) {
                        checkNull(artifactId, newVersion, metadata, MetaDataKeys.NAME, previous.get(MetaDataKeys.NAME));
                    }
                    if (previous.containsKey(MetaDataKeys.DESCRIPTION)) {
                        checkNull(artifactId, newVersion, metadata, MetaDataKeys.DESCRIPTION, previous.get(MetaDataKeys.DESCRIPTION));
                    }
                }
            }

            // Metadata found in the content itself overrides anything carried over above.
            ContentExtractor extractor = this.factory.getArtifactTypeProvider(artifactType).getContentExtractor();
            EditableMetaData extracted = extractor.extract(ContentHandle.create(artifactValue.getContent().toByteArray()));
            if (extractor.isExtracted(extracted)) {
                if (!StringUtil.isEmpty(extracted.getName())) {
                    checkNull(artifactId, newVersion, metadata, MetaDataKeys.NAME, extracted.getName());
                }
                if (!StringUtil.isEmpty(extracted.getDescription())) {
                    checkNull(artifactId, newVersion, metadata, MetaDataKeys.DESCRIPTION, extracted.getDescription());
                }
            }

            versionBuilder.putAllMetadata(metadata);
            builder.addArtifacts(versionBuilder);
        }

        /**
         * Puts {@code key -> value} into {@code target} when both are non-null; otherwise logs a
         * warning identifying the artifact and version and leaves the map unchanged.
         *
         * @param artifactId artifact id, used for logging
         * @param version    version number, used for logging
         * @param target     metadata map to populate
         * @param key        metadata key
         * @param value      metadata value
         */
        private static void checkNull(String artifactId, int version, Map<String, String> target, String key, String value) {
            if (key != null && value != null) {
                target.put(key, value);
                return;
            }
            log.warn("Metadata - null key {} or value {} - [{} ({})]", key, value, artifactId, version);
        }
    }

    /**
     * Creates a topology provider.
     *
     * @param streamsProperties               supplies topic names, store names and the streams configuration
     * @param foreachAction                   callback handed to the storage transformer for every updated state
     * @param artifactTypeUtilProviderFactory factory handed to the storage transformer
     */
    public StreamsTopologyProvider(StreamsProperties streamsProperties, ForeachAction<? super String, ? super Str.Data> foreachAction, ArtifactTypeUtilProviderFactory artifactTypeUtilProviderFactory) {
        this.factory = artifactTypeUtilProviderFactory;
        this.dataDispatcher = foreachAction;
        this.properties = streamsProperties;
    }

    /**
     * Builds the Kafka Streams topology.
     *
     * <p>The topology reads storage requests from the storage topic, runs them through a
     * {@code StorageTransformer} backed by the storage state store, writes the resulting
     * global-id records through the global-id topic, and finally feeds them to a
     * {@code GlobalIdProcessor} backed by the global-id state store. Both stores are in-memory,
     * cached, and have changelog logging enabled with a compacted-topic configuration.
     *
     * @return the assembled topology
     */
    @Override // java.util.function.Supplier
    public Topology get() {
        StreamsBuilder builder = new StreamsBuilder();

        // Changelog topic settings shared by both state stores (segment size: 64 MiB).
        Map<String, String> changelogConfig = ImmutableMap.of(
                "cleanup.policy", "compact",
                "min.compaction.lag.ms", "0",
                "segment.bytes", String.valueOf(64 * 1024 * 1024));

        KStream<String, Str.StorageValue> storageRequests = builder.stream(
                this.properties.getStorageTopic(),
                Consumed.with(Serdes.String(), ProtoSerde.parsedWith(Str.StorageValue.parser())));

        String storageStoreName = this.properties.getStorageStoreName();
        builder.addStateStore(
                Stores.keyValueStoreBuilder(
                        Stores.inMemoryKeyValueStore(storageStoreName),
                        Serdes.String(),
                        ProtoSerde.parsedWith(Str.Data.parser()))
                        .withCachingEnabled()
                        .withLoggingEnabled(changelogConfig));

        KStream<Long, Str.TupleValue> globalIds = storageRequests
                .transform(
                        () -> new StorageTransformer(this.properties, this.dataDispatcher, this.factory),
                        storageStoreName)
                .through(
                        this.properties.getGlobalIdTopic(),
                        Produced.with(Serdes.Long(), ProtoSerde.parsedWith(Str.TupleValue.parser())));

        String globalIdStoreName = this.properties.getGlobalIdStoreName();
        builder.addStateStore(
                Stores.keyValueStoreBuilder(
                        Stores.inMemoryKeyValueStore(globalIdStoreName),
                        Serdes.Long(),
                        ProtoSerde.parsedWith(Str.TupleValue.parser()))
                        .withCachingEnabled()
                        .withLoggingEnabled(changelogConfig));

        globalIds.process(() -> new GlobalIdProcessor(globalIdStoreName), globalIdStoreName);

        return builder.build(this.properties.getProperties());
    }
}
