package io.kgraph.kgiraffe;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.google.protobuf.Message;
import graphql.GraphQL;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.SchemaProvider;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
import io.confluent.kafka.schemaregistry.json.JsonSchemaUtils;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaUtils;
import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer;
import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import io.hdocdb.HDocument;
import io.hdocdb.HValue;
import io.hdocdb.store.HDocumentCollection;
import io.hdocdb.store.HDocumentDB;
import io.hdocdb.store.InMemoryHDocumentDB;
import io.kcache.CacheLoader;
import io.kcache.CacheUpdateHandler;
import io.kcache.KafkaCache;
import io.kcache.KafkaCacheConfig;
import io.kcache.caffeine.CaffeineCache;
import io.kgraph.kgiraffe.KGiraffeConfig;
import io.kgraph.kgiraffe.notifier.Notifier;
import io.kgraph.kgiraffe.schema.GraphQLExecutor;
import io.kgraph.kgiraffe.schema.GraphQLSchemaBuilder;
import io.kgraph.kgiraffe.schema.Status;
import io.kgraph.kgiraffe.schema.converters.GraphQLProtobufConverter;
import io.kgraph.kgiraffe.util.Jackson;
import io.kgraph.kgiraffe.util.proto.ProtoFileElem;
import io.vavr.Tuple2;
import io.vavr.control.Either;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.commons.lang.SerializationException;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.DoubleDeserializer;
import org.apache.kafka.common.serialization.DoubleSerializer;
import org.apache.kafka.common.serialization.FloatDeserializer;
import org.apache.kafka.common.serialization.FloatSerializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.ShortDeserializer;
import org.apache.kafka.common.serialization.ShortSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.ojai.Document;
import org.ojai.Value;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kgraph/kgiraffe/KGiraffeEngine.class */
public class KGiraffeEngine implements Configurable, Closeable {
    public static final String REGISTERED_SCHEMAS_COLLECTION_NAME = "_registered_schemas";
    public static final String STAGED_SCHEMAS_COLLECTION_NAME = "_staged_schemas";
    private KGiraffeConfig config;
    private Notifier notifier;
    private SchemaRegistryClient schemaRegistry;
    private Map<String, SchemaProvider> schemaProviders;
    private GraphQLExecutor executor;
    private Map<String, KGiraffeConfig.Serde> keySerdes;
    private Map<String, KGiraffeConfig.Serde> valueSerdes;
    private final Map<String, KafkaCache<Bytes, Bytes>> caches;
    private final HDocumentDB docdb;
    private final AtomicBoolean initialized;
    private static KGiraffeEngine INSTANCE;
    private static final Logger LOG = LoggerFactory.getLogger(KGiraffeEngine.class);
    private static final ObjectMapper MAPPER = Jackson.newObjectMapper();
    private final Map<String, Either<Value.Type, ParsedSchema>> keySchemas = new HashMap();
    private final Map<String, Either<Value.Type, ParsedSchema>> valueSchemas = new HashMap();
    private final Map<Tuple2<String, ProtobufSchema>, ProtobufSchema> protSchemaCache = new HashMap();
    private int idCounter = 0;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.kgraph.kgiraffe.KGiraffeEngine$8, reason: invalid class name */
    /* loaded from: input_file:io/kgraph/kgiraffe/KGiraffeEngine$8.class */
    public static /* synthetic */ class AnonymousClass8 {
        static final /* synthetic */ int[] $SwitchMap$org$ojai$Value$Type = new int[Value.Type.values().length];

        static {
            try {
                $SwitchMap$org$ojai$Value$Type[Value.Type.STRING.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$ojai$Value$Type[Value.Type.SHORT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$ojai$Value$Type[Value.Type.INT.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$ojai$Value$Type[Value.Type.LONG.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$ojai$Value$Type[Value.Type.FLOAT.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$ojai$Value$Type[Value.Type.DOUBLE.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$ojai$Value$Type[Value.Type.BINARY.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            $SwitchMap$io$kgraph$kgiraffe$KGiraffeConfig$SerdeType = new int[KGiraffeConfig.SerdeType.values().length];
            try {
                $SwitchMap$io$kgraph$kgiraffe$KGiraffeConfig$SerdeType[KGiraffeConfig.SerdeType.SHORT.ordinal()] = 1;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$io$kgraph$kgiraffe$KGiraffeConfig$SerdeType[KGiraffeConfig.SerdeType.INT.ordinal()] = 2;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$io$kgraph$kgiraffe$KGiraffeConfig$SerdeType[KGiraffeConfig.SerdeType.LONG.ordinal()] = 3;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$io$kgraph$kgiraffe$KGiraffeConfig$SerdeType[KGiraffeConfig.SerdeType.FLOAT.ordinal()] = 4;
            } catch (NoSuchFieldError e11) {
            }
            try {
                $SwitchMap$io$kgraph$kgiraffe$KGiraffeConfig$SerdeType[KGiraffeConfig.SerdeType.DOUBLE.ordinal()] = 5;
            } catch (NoSuchFieldError e12) {
            }
            try {
                $SwitchMap$io$kgraph$kgiraffe$KGiraffeConfig$SerdeType[KGiraffeConfig.SerdeType.STRING.ordinal()] = 6;
            } catch (NoSuchFieldError e13) {
            }
            try {
                $SwitchMap$io$kgraph$kgiraffe$KGiraffeConfig$SerdeType[KGiraffeConfig.SerdeType.BINARY.ordinal()] = 7;
            } catch (NoSuchFieldError e14) {
            }
            try {
                $SwitchMap$io$kgraph$kgiraffe$KGiraffeConfig$SerdeType[KGiraffeConfig.SerdeType.AVRO.ordinal()] = 8;
            } catch (NoSuchFieldError e15) {
            }
            try {
                $SwitchMap$io$kgraph$kgiraffe$KGiraffeConfig$SerdeType[KGiraffeConfig.SerdeType.JSON.ordinal()] = 9;
            } catch (NoSuchFieldError e16) {
            }
            try {
                $SwitchMap$io$kgraph$kgiraffe$KGiraffeConfig$SerdeType[KGiraffeConfig.SerdeType.PROTO.ordinal()] = 10;
            } catch (NoSuchFieldError e17) {
            }
            try {
                $SwitchMap$io$kgraph$kgiraffe$KGiraffeConfig$SerdeType[KGiraffeConfig.SerdeType.LATEST.ordinal()] = 11;
            } catch (NoSuchFieldError e18) {
            }
            try {
                $SwitchMap$io$kgraph$kgiraffe$KGiraffeConfig$SerdeType[KGiraffeConfig.SerdeType.ID.ordinal()] = 12;
            } catch (NoSuchFieldError e19) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/kgraph/kgiraffe/KGiraffeEngine$UpdateHandler.class */
    public class UpdateHandler implements CacheUpdateHandler<Bytes, Bytes> {
        private static final int MAGIC_BYTE = 0;

        UpdateHandler() {
        }

        public void handleUpdate(Headers headers, Bytes bytes, Bytes bytes2, Bytes bytes3, TopicPartition topicPartition, long j, long j2, TimestampType timestampType, Optional<Integer> optional) {
            String str = topicPartition.topic();
            int partition = topicPartition.partition();
            String str2 = str + "-" + partition + "-" + j;
            HDocumentCollection collection = KGiraffeEngine.this.docdb.getCollection(str);
            HDocument hDocument = new HDocument();
            hDocument.setId(str2);
            Map<String, Object> convertHeaders = convertHeaders(headers);
            if (convertHeaders != null) {
                hDocument.set(GraphQLSchemaBuilder.HEADERS_ATTR_NAME, convertHeaders);
            }
            if (bytes != null && bytes.get() != Bytes.EMPTY) {
                try {
                    if (KGiraffeEngine.this.getKeySchema(str).isRight()) {
                        hDocument.set(GraphQLSchemaBuilder.KEY_SCHEMA_ID_ATTR_NAME, schemaIdFor(bytes.get()));
                    }
                    hDocument.set(GraphQLSchemaBuilder.KEY_ATTR_NAME, KGiraffeEngine.this.deserializeKey(str, bytes.get()));
                } catch (IOException e) {
                    hDocument.set(GraphQLSchemaBuilder.KEY_ERROR_ATTR_NAME, trace(e));
                }
            }
            try {
                if (KGiraffeEngine.this.getValueSchema(str).isRight()) {
                    hDocument.set(GraphQLSchemaBuilder.VALUE_SCHEMA_ID_ATTR_NAME, schemaIdFor(bytes2.get()));
                }
                hDocument.set(GraphQLSchemaBuilder.VALUE_ATTR_NAME, KGiraffeEngine.this.deserializeValue(str, bytes2.get()));
            } catch (IOException e2) {
                hDocument.set(GraphQLSchemaBuilder.VALUE_ERROR_ATTR_NAME, trace(e2));
            }
            hDocument.set(GraphQLSchemaBuilder.TOPIC_ATTR_NAME, str);
            hDocument.set(GraphQLSchemaBuilder.PARTITION_ATTR_NAME, partition);
            hDocument.set("offset", j);
            hDocument.set(GraphQLSchemaBuilder.TIMESTAMP_ATTR_NAME, j2);
            hDocument.set(GraphQLSchemaBuilder.TIMESTAMP_TYPE_ATTR_NAME, timestampType.toString());
            if (optional.isPresent()) {
                hDocument.set(GraphQLSchemaBuilder.LEADER_EPOCH_ATTR_NAME, optional.get().intValue());
            }
            collection.insertOrReplace(hDocument);
            collection.flush();
            KGiraffeEngine.this.notifier.publish(str, collection.findById(str2));
        }

        public void handleUpdate(Bytes bytes, Bytes bytes2, Bytes bytes3, TopicPartition topicPartition, long j, long j2) {
            throw new UnsupportedOperationException();
        }

        private Map<String, Object> convertHeaders(Headers headers) {
            if (headers == null) {
                return null;
            }
            HashMap hashMap = new HashMap();
            Iterator it = headers.iterator();
            while (it.hasNext()) {
                Header header = (Header) it.next();
                hashMap.merge(header.key(), new String(header.value(), StandardCharsets.UTF_8), (obj, obj2) -> {
                    if (obj instanceof List) {
                        ((List) obj).add((String) obj2);
                        return obj;
                    }
                    ArrayList arrayList = new ArrayList();
                    arrayList.add((String) obj);
                    arrayList.add((String) obj2);
                    return arrayList;
                });
            }
            return hashMap;
        }

        private int schemaIdFor(byte[] bArr) {
            ByteBuffer wrap = ByteBuffer.wrap(bArr);
            if (wrap.get() != 0) {
                throw new SerializationException("Unknown magic byte!");
            }
            return wrap.getInt();
        }

        private String trace(Throwable th) {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            th.printStackTrace(new PrintStream((OutputStream) byteArrayOutputStream, false, StandardCharsets.UTF_8));
            return byteArrayOutputStream.toString(StandardCharsets.UTF_8);
        }

        public /* bridge */ /* synthetic */ void handleUpdate(Headers headers, Object obj, Object obj2, Object obj3, TopicPartition topicPartition, long j, long j2, TimestampType timestampType, Optional optional) {
            handleUpdate(headers, (Bytes) obj, (Bytes) obj2, (Bytes) obj3, topicPartition, j, j2, timestampType, (Optional<Integer>) optional);
        }
    }

    public static synchronized KGiraffeEngine getInstance() {
        if (INSTANCE == null) {
            INSTANCE = new KGiraffeEngine();
        }
        return INSTANCE;
    }

    public static synchronized void closeInstance() {
        if (INSTANCE != null) {
            try {
                INSTANCE.close();
            } catch (IOException e) {
                LOG.warn("Could not close engine", e);
            }
            INSTANCE = null;
        }
    }

    private KGiraffeEngine() {
        try {
            this.caches = new HashMap();
            this.docdb = new InMemoryHDocumentDB();
            this.initialized = new AtomicBoolean();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void configure(Map<String, ?> map) {
        configure(new KGiraffeConfig(map));
    }

    public void configure(KGiraffeConfig kGiraffeConfig) {
        this.config = kGiraffeConfig;
    }

    public void init(Notifier notifier) {
        this.notifier = notifier;
        List asList = Arrays.asList(new AvroSchemaProvider(), new JsonSchemaProvider(), new ProtobufSchemaProvider());
        this.schemaRegistry = createSchemaRegistry(this.config.getSchemaRegistryUrls(), asList, this.config.originals());
        this.schemaProviders = (Map) asList.stream().collect(Collectors.toMap((v0) -> {
            return v0.schemaType();
        }, schemaProvider -> {
            return schemaProvider;
        }));
        Iterator<KGiraffeConfig.Serde> it = this.config.getStagedSchemas().iterator();
        while (it.hasNext()) {
            stageSchemas(it.next());
        }
        this.keySerdes = this.config.getKeySerdes();
        this.valueSerdes = this.config.getValueSerdes();
        this.executor = new GraphQLExecutor(this.config, new GraphQLSchemaBuilder(this, this.config.getTopics()));
        initCaches();
        if (!this.initialized.compareAndSet(false, true)) {
            throw new IllegalStateException("Illegal state while initializing engine. Engine was already initialized");
        }
    }

    public static SchemaRegistryClient createSchemaRegistry(List<String> list, List<SchemaProvider> list2, Map<String, Object> map) {
        if (list == null || list.isEmpty()) {
            return null;
        }
        String validateAndMaybeGetMockScope = MockSchemaRegistry.validateAndMaybeGetMockScope(list);
        return validateAndMaybeGetMockScope != null ? MockSchemaRegistry.getClientForScope(validateAndMaybeGetMockScope, list2) : new CachedSchemaRegistryClient(list, 1000, list2, map);
    }

    public static void resetSchemaRegistry(List<String> list, SchemaRegistryClient schemaRegistryClient) {
        if (list == null || list.isEmpty()) {
            return;
        }
        String validateAndMaybeGetMockScope = MockSchemaRegistry.validateAndMaybeGetMockScope(list);
        if (validateAndMaybeGetMockScope != null) {
            MockSchemaRegistry.dropScope(validateAndMaybeGetMockScope);
        } else {
            schemaRegistryClient.reset();
        }
    }

    public Notifier getNotifier() {
        return this.notifier;
    }

    public SchemaRegistryClient getSchemaRegistry() {
        if (this.schemaRegistry == null) {
            throw new ConfigException("Missing schema registry URL");
        }
        return this.schemaRegistry;
    }

    public SchemaProvider getSchemaProvider(String str) {
        return this.schemaProviders.get(str);
    }

    public int nextId() {
        int i = this.idCounter - 1;
        this.idCounter = i;
        return i;
    }

    public Either<Value.Type, ParsedSchema> getKeySchema(String str) {
        return this.keySchemas.computeIfAbsent(str, str2 -> {
            return getSchema(str + "-key", this.keySerdes.getOrDefault(str, KGiraffeConfig.Serde.KEY_DEFAULT));
        });
    }

    public Either<Value.Type, ParsedSchema> getValueSchema(String str) {
        return this.valueSchemas.computeIfAbsent(str, str2 -> {
            return getSchema(str + "-value", this.valueSerdes.getOrDefault(str, KGiraffeConfig.Serde.VALUE_DEFAULT));
        });
    }

    private Either<Value.Type, ParsedSchema> getSchema(String str, KGiraffeConfig.Serde serde) {
        switch (serde.getSerdeType()) {
            case SHORT:
                return Either.left(Value.Type.SHORT);
            case INT:
                return Either.left(Value.Type.INT);
            case LONG:
                return Either.left(Value.Type.LONG);
            case FLOAT:
                return Either.left(Value.Type.FLOAT);
            case DOUBLE:
                return Either.left(Value.Type.DOUBLE);
            case STRING:
                return Either.left(Value.Type.STRING);
            case BINARY:
                return Either.left(Value.Type.BINARY);
            case AVRO:
            case JSON:
            case PROTO:
                return (Either) stageSchemas(serde).map((v0) -> {
                    return Either.right(v0);
                }).orElseGet(() -> {
                    return Either.left(Value.Type.BINARY);
                });
            case LATEST:
                return (Either) ((Optional) getLatestSchema(str)._2).map((v0) -> {
                    return Either.right(v0);
                }).orElseGet(() -> {
                    return Either.left(Value.Type.BINARY);
                });
            case ID:
                return (Either) ((Optional) getSchemaById(serde.getId())._2).map((v0) -> {
                    return Either.right(v0);
                }).orElseGet(() -> {
                    return Either.left(Value.Type.BINARY);
                });
            default:
                throw new IllegalArgumentException("Illegal serde type: " + serde.getSerdeType());
        }
    }

    private Optional<ParsedSchema> stageSchemas(KGiraffeConfig.Serde serde) {
        return (Optional) stageSchemas(serde.getSchemaType(), serde.getSchema(), serde.getSchemaReferences())._2;
    }

    public Tuple2<Document, Optional<ParsedSchema>> stageSchemas(String str, String str2, List<SchemaReference> list) {
        try {
            ParsedSchema parseSchemaOrElseThrow = getSchemaProvider(str).parseSchemaOrElseThrow(new Schema((String) null, (Integer) null, (Integer) null, str, list, str2), false, false);
            parseSchemaOrElseThrow.validate();
            return new Tuple2<>(cacheSchema(nextId(), null, 0, Status.STAGED, parseSchemaOrElseThrow), Optional.of(parseSchemaOrElseThrow));
        } catch (Exception e) {
            LOG.error("Could not parse schema " + str2, e);
            return new Tuple2<>(cacheErroredSchema(nextId(), str, str2, list, e), Optional.empty());
        }
    }

    public Tuple2<Document, Optional<ParsedSchema>> unstageSchema(int i) {
        Tuple2<Document, Optional<ParsedSchema>> cachedSchemaById = getCachedSchemaById(i, STAGED_SCHEMAS_COLLECTION_NAME);
        if (!((Document) cachedSchemaById._1).isEmpty()) {
            uncacheSchema(i, STAGED_SCHEMAS_COLLECTION_NAME);
        }
        return cachedSchemaById;
    }

    public Tuple2<Document, Optional<ParsedSchema>> getSchemaByVersion(String str, int i) {
        if (str == null) {
            return new Tuple2<>(new HDocument(), Optional.empty());
        }
        if (i == -1) {
            return getLatestSchema(str);
        }
        try {
            SchemaMetadata schemaMetadata = getSchemaRegistry().getSchemaMetadata(str, i);
            Optional parseSchema = getSchemaRegistry().parseSchema(new Schema((String) null, schemaMetadata));
            return parseSchema.isPresent() ? new Tuple2<>(cacheSchema(schemaMetadata.getId(), str, schemaMetadata.getVersion(), Status.REGISTERED, (ParsedSchema) parseSchema.get()), parseSchema) : new Tuple2<>(new HDocument(), Optional.empty());
        } catch (Exception e) {
            LOG.error("Could not find schema for subject " + str + ", version " + i, e);
            return new Tuple2<>(new HDocument(), Optional.empty());
        }
    }

    public Tuple2<Document, Optional<ParsedSchema>> getLatestSchema(String str) {
        if (str == null) {
            return new Tuple2<>(new HDocument(), Optional.empty());
        }
        try {
            SchemaMetadata latestSchemaMetadata = getSchemaRegistry().getLatestSchemaMetadata(str);
            Optional parseSchema = getSchemaRegistry().parseSchema(new Schema((String) null, latestSchemaMetadata));
            if (parseSchema.isPresent()) {
                cacheSchema(latestSchemaMetadata.getId(), str, latestSchemaMetadata.getVersion(), Status.REGISTERED, (ParsedSchema) parseSchema.get());
            }
            return parseSchema.isPresent() ? new Tuple2<>(cacheSchema(latestSchemaMetadata.getId(), str, latestSchemaMetadata.getVersion(), Status.REGISTERED, (ParsedSchema) parseSchema.get()), parseSchema) : new Tuple2<>(new HDocument(), Optional.empty());
        } catch (Exception e) {
            LOG.error("Could not find latest schema for subject " + str, e);
            return new Tuple2<>(new HDocument(), Optional.empty());
        }
    }

    public Tuple2<Document, Optional<ParsedSchema>> getSchemaById(int i) {
        try {
            Tuple2<Document, Optional<ParsedSchema>> cachedSchemaById = getCachedSchemaById(i);
            if (!((Document) cachedSchemaById._1).isEmpty()) {
                return cachedSchemaById;
            }
            ParsedSchema schemaById = getSchemaRegistry().getSchemaById(i);
            return new Tuple2<>(cacheSchema(i, null, 0, Status.REGISTERED, schemaById), Optional.of(schemaById));
        } catch (Exception e) {
            LOG.error("Could not find schema with id " + i, e);
            return new Tuple2<>(new HDocument(), Optional.empty());
        }
    }

    public Tuple2<Document, Optional<ParsedSchema>> registerSchema(String str, int i, boolean z) {
        try {
            Tuple2<Document, Optional<ParsedSchema>> schemaById = getSchemaById(i);
            if (((Document) schemaById._1).isEmpty()) {
                return new Tuple2<>(new HDocument(), Optional.empty());
            }
            ParsedSchema parsedSchema = (ParsedSchema) ((Optional) schemaById._2).get();
            Document cacheSchema = cacheSchema(getSchemaRegistry().register(str, parsedSchema, z), str, getSchemaRegistry().getVersion(str, parsedSchema, z), Status.REGISTERED, parsedSchema);
            uncacheSchema(i, STAGED_SCHEMAS_COLLECTION_NAME);
            return new Tuple2<>(cacheSchema, (Optional) schemaById._2);
        } catch (Exception e) {
            LOG.error("Could not register schema with id " + i, e);
            return new Tuple2<>(new HDocument(), Optional.empty());
        }
    }

    private Tuple2<Document, Optional<ParsedSchema>> getCachedSchemaById(int i) {
        Tuple2<Document, Optional<ParsedSchema>> cachedSchemaById = getCachedSchemaById(i, STAGED_SCHEMAS_COLLECTION_NAME);
        return !((Document) cachedSchemaById._1).isEmpty() ? cachedSchemaById : getCachedSchemaById(i, REGISTERED_SCHEMAS_COLLECTION_NAME);
    }

    private Tuple2<Document, Optional<ParsedSchema>> getCachedSchemaById(int i, String str) {
        try {
            Document findById = this.docdb.getCollection(str).findById(String.valueOf(i));
            if (findById == null) {
                return new Tuple2<>(new HDocument(), Optional.empty());
            }
            List emptyList = findById.getList(GraphQLSchemaBuilder.REFERENCES_ATTR_NAME) != null ? (List) MAPPER.convertValue(findById.getList(GraphQLSchemaBuilder.REFERENCES_ATTR_NAME), new TypeReference<List<SchemaReference>>() { // from class: io.kgraph.kgiraffe.KGiraffeEngine.1
            }) : Collections.emptyList();
            ParsedSchema parsedSchema = null;
            if (!Status.ERRORED.symbol().equals(findById.getString(GraphQLSchemaBuilder.STATUS_ATTR_NAME))) {
                String string = findById.getString(GraphQLSchemaBuilder.SCHEMA_TYPE_ATTR_NAME);
                parsedSchema = getSchemaProvider(string).parseSchemaOrElseThrow(new Schema((String) null, (Integer) null, (Integer) null, string, emptyList, findById.getString(GraphQLSchemaBuilder.SCHEMA_RAW_ATTR_NAME)), false, false);
            }
            return new Tuple2<>(findById, Optional.ofNullable(parsedSchema));
        } catch (Exception e) {
            return new Tuple2<>(new HDocument(), Optional.empty());
        }
    }

    private Document cacheSchema(int i, String str, int i2, Status status, ParsedSchema parsedSchema) throws JsonProcessingException {
        HDocumentCollection collection = this.docdb.getCollection(status == Status.REGISTERED ? REGISTERED_SCHEMAS_COLLECTION_NAME : STAGED_SCHEMAS_COLLECTION_NAME);
        HDocument hDocument = new HDocument();
        hDocument.setId(String.valueOf(i));
        hDocument.set(GraphQLSchemaBuilder.ID_ATTR_NAME, i);
        if (str != null) {
            hDocument.set(GraphQLSchemaBuilder.SUBJECT_ATTR_NAME, str);
        }
        if (i2 > 0) {
            hDocument.set(GraphQLSchemaBuilder.VERSION_ATTR_NAME, i2);
        }
        hDocument.set(GraphQLSchemaBuilder.STATUS_ATTR_NAME, status.symbol());
        hDocument.set(GraphQLSchemaBuilder.SCHEMA_TYPE_ATTR_NAME, parsedSchema.schemaType());
        hDocument.set(GraphQLSchemaBuilder.SCHEMA_ATTR_NAME, schemaToMap(parsedSchema.schemaType(), parsedSchema));
        hDocument.set(GraphQLSchemaBuilder.SCHEMA_RAW_ATTR_NAME, parsedSchema.canonicalString());
        hDocument.set(GraphQLSchemaBuilder.REFERENCES_ATTR_NAME, (List) MAPPER.convertValue(parsedSchema.references(), new TypeReference<List<Map<String, Object>>>() { // from class: io.kgraph.kgiraffe.KGiraffeEngine.2
        }));
        collection.insertOrReplace(hDocument);
        collection.flush();
        return hDocument;
    }

    private Document cacheErroredSchema(int i, String str, String str2, List<SchemaReference> list, Exception exc) {
        HDocumentCollection collection = this.docdb.getCollection(STAGED_SCHEMAS_COLLECTION_NAME);
        HDocument hDocument = new HDocument();
        hDocument.setId(String.valueOf(i));
        hDocument.set(GraphQLSchemaBuilder.ID_ATTR_NAME, i);
        hDocument.set(GraphQLSchemaBuilder.STATUS_ATTR_NAME, Status.ERRORED.symbol());
        hDocument.set(GraphQLSchemaBuilder.SCHEMA_TYPE_ATTR_NAME, str);
        hDocument.set(GraphQLSchemaBuilder.SCHEMA_RAW_ATTR_NAME, str2);
        hDocument.set(GraphQLSchemaBuilder.REFERENCES_ATTR_NAME, (List) MAPPER.convertValue(list, new TypeReference<List<Map<String, Object>>>() { // from class: io.kgraph.kgiraffe.KGiraffeEngine.3
        }));
        hDocument.set(GraphQLSchemaBuilder.SCHEMA_ERROR_ATTR_NAME, exc.getLocalizedMessage());
        collection.insertOrReplace(hDocument);
        collection.flush();
        return hDocument;
    }

    private void uncacheSchema(int i, String str) {
        this.docdb.getCollection(str).remove(String.valueOf(i));
    }

    private Map<String, Object> schemaToMap(String str, ParsedSchema parsedSchema) throws JsonProcessingException {
        boolean z = -1;
        switch (str.hashCode()) {
            case -206537845:
                if (str.equals("PROTOBUF")) {
                    z = 2;
                    break;
                }
                break;
            case 2021682:
                if (str.equals("AVRO")) {
                    z = false;
                    break;
                }
                break;
            case 2286824:
                if (str.equals("JSON")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                JsonNode readTree = MAPPER.readTree(parsedSchema.canonicalString());
                if (!readTree.isObject()) {
                    JsonNode objectNode = JsonNodeFactory.instance.objectNode();
                    objectNode.set("type", readTree);
                    readTree = objectNode;
                }
                return (Map) MAPPER.convertValue(readTree, new TypeReference<Map<String, Object>>() { // from class: io.kgraph.kgiraffe.KGiraffeEngine.4
                });
            case true:
                return (Map) MAPPER.convertValue(((JsonSchema) parsedSchema).toJsonNode(), new TypeReference<Map<String, Object>>() { // from class: io.kgraph.kgiraffe.KGiraffeEngine.5
                });
            case true:
                return (Map) MAPPER.convertValue(new ProtoFileElem(((ProtobufSchema) parsedSchema).rawSchema()), new TypeReference<Map<String, Object>>() { // from class: io.kgraph.kgiraffe.KGiraffeEngine.6
                });
            default:
                throw new IllegalArgumentException("Illegal type " + str);
        }
    }

    public Value deserializeKey(String str, byte[] bArr) throws IOException {
        return deserialize(getKeySchema(str), str, bArr);
    }

    public Value deserializeValue(String str, byte[] bArr) throws IOException {
        return deserialize(getValueSchema(str), str, bArr);
    }

    private Value deserialize(Either<Value.Type, ParsedSchema> either, String str, byte[] bArr) throws IOException {
        byte[] json;
        Object deserialize = getDeserializer(either).deserialize(str, bArr);
        if (!either.isRight()) {
            if (either.getLeft() == Value.Type.BINARY) {
                deserialize = Base64.getEncoder().encodeToString(((Bytes) deserialize).get());
            }
            return HValue.initFromObject(deserialize);
        }
        ProtobufSchema protobufSchema = (ParsedSchema) either.get();
        String str2 = null;
        String schemaType = protobufSchema.schemaType();
        boolean z = -1;
        switch (schemaType.hashCode()) {
            case -206537845:
                if (schemaType.equals("PROTOBUF")) {
                    z = 2;
                    break;
                }
                break;
            case 2021682:
                if (schemaType.equals("AVRO")) {
                    z = false;
                    break;
                }
                break;
            case 2286824:
                if (schemaType.equals("JSON")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                json = AvroSchemaUtils.toJson(deserialize);
                break;
            case true:
                json = JsonSchemaUtils.toJson(deserialize);
                break;
            case true:
                ProtobufSchema protobufSchema2 = protobufSchema;
                Message message = (Message) deserialize;
                json = ProtobufSchemaUtils.toJson(message);
                if (GraphQLProtobufConverter.hasMultipleMessageTypes(protobufSchema2)) {
                    str2 = message.getDescriptorForType().getName();
                    break;
                }
                break;
            default:
                throw new IllegalArgumentException("Illegal type " + protobufSchema.schemaType());
        }
        Document initFromMap = HValue.initFromMap((Map) MAPPER.readValue(new ByteArrayInputStream(json), new TypeReference<Map<String, Object>>() { // from class: io.kgraph.kgiraffe.KGiraffeEngine.7
        }));
        if (str2 != null) {
            Document hDocument = new HDocument();
            hDocument.set(str2, initFromMap);
            initFromMap = hDocument;
        }
        return HValue.initFromDocument(initFromMap);
    }

    public byte[] serializeKey(String str, Object obj) throws IOException {
        return serialize(getKeySchema(str), str, obj);
    }

    public byte[] serializeValue(String str, Object obj) throws IOException {
        return serialize(getValueSchema(str), str, obj);
    }

    private byte[] serialize(Either<Value.Type, ParsedSchema> either, String str, Object obj) throws IOException {
        Serializer<?> serializer = getSerializer(either);
        if (either.isRight()) {
            AvroSchema avroSchema = (ParsedSchema) either.get();
            JsonNode jsonNode = (JsonNode) MAPPER.convertValue(obj, JsonNode.class);
            String schemaType = avroSchema.schemaType();
            boolean z = -1;
            switch (schemaType.hashCode()) {
                case -206537845:
                    if (schemaType.equals("PROTOBUF")) {
                        z = 2;
                        break;
                    }
                    break;
                case 2021682:
                    if (schemaType.equals("AVRO")) {
                        z = false;
                        break;
                    }
                    break;
                case 2286824:
                    if (schemaType.equals("JSON")) {
                        z = true;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    obj = AvroSchemaUtils.toObject(jsonNode, avroSchema);
                    break;
                case true:
                    obj = JsonSchemaUtils.toObject(jsonNode, (JsonSchema) avroSchema);
                    break;
                case true:
                    ProtobufSchema protobufSchema = (ProtobufSchema) avroSchema;
                    if (GraphQLProtobufConverter.hasMultipleMessageTypes(protobufSchema)) {
                        String str2 = (String) jsonNode.fieldNames().next();
                        jsonNode = jsonNode.get(str2);
                        protobufSchema = schemaWithName(protobufSchema, str2);
                    }
                    obj = ProtobufSchemaUtils.toObject(jsonNode, protobufSchema);
                    break;
                default:
                    throw new IllegalArgumentException("Illegal type " + avroSchema.schemaType());
            }
        } else if (either.getLeft() == Value.Type.BINARY) {
            obj = Base64.getDecoder().decode((String) obj);
        }
        return serializer.serialize(str, obj);
    }

    private ProtobufSchema schemaWithName(ProtobufSchema protobufSchema, String str) {
        return this.protSchemaCache.computeIfAbsent(new Tuple2<>(str, protobufSchema), tuple2 -> {
            return protobufSchema.copy(str);
        });
    }

    public Serializer<?> getSerializer(Either<Value.Type, ParsedSchema> either) {
        if (!either.isRight()) {
            switch (AnonymousClass8.$SwitchMap$org$ojai$Value$Type[((Value.Type) either.getLeft()).ordinal()]) {
                case 1:
                    return new StringSerializer();
                case 2:
                    return new ShortSerializer();
                case 3:
                    return new IntegerSerializer();
                case 4:
                    return new LongSerializer();
                case 5:
                    return new FloatSerializer();
                case 6:
                    return new DoubleSerializer();
                case 7:
                    return new BytesSerializer();
                default:
                    throw new IllegalArgumentException("Illegal type " + either.getLeft());
            }
        }
        ParsedSchema parsedSchema = (ParsedSchema) either.get();
        String schemaType = parsedSchema.schemaType();
        boolean z = -1;
        switch (schemaType.hashCode()) {
            case -206537845:
                if (schemaType.equals("PROTOBUF")) {
                    z = 2;
                    break;
                }
                break;
            case 2021682:
                if (schemaType.equals("AVRO")) {
                    z = false;
                    break;
                }
                break;
            case 2286824:
                if (schemaType.equals("JSON")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                return new KafkaAvroSerializer(getSchemaRegistry(), this.config.originals());
            case true:
                return new KafkaJsonSchemaSerializer(getSchemaRegistry(), this.config.originals());
            case true:
                return new KafkaProtobufSerializer(getSchemaRegistry(), this.config.originals());
            default:
                throw new IllegalArgumentException("Illegal type " + parsedSchema.schemaType());
        }
    }

    public Deserializer<?> getDeserializer(Either<Value.Type, ParsedSchema> either) {
        if (!either.isRight()) {
            switch (AnonymousClass8.$SwitchMap$org$ojai$Value$Type[((Value.Type) either.getLeft()).ordinal()]) {
                case 1:
                    return new StringDeserializer();
                case 2:
                    return new ShortDeserializer();
                case 3:
                    return new IntegerDeserializer();
                case 4:
                    return new LongDeserializer();
                case 5:
                    return new FloatDeserializer();
                case 6:
                    return new DoubleDeserializer();
                case 7:
                    return new BytesDeserializer();
                default:
                    throw new IllegalArgumentException("Illegal type " + either.getLeft());
            }
        }
        ParsedSchema parsedSchema = (ParsedSchema) either.get();
        String schemaType = parsedSchema.schemaType();
        boolean z = -1;
        switch (schemaType.hashCode()) {
            case -206537845:
                if (schemaType.equals("PROTOBUF")) {
                    z = 2;
                    break;
                }
                break;
            case 2021682:
                if (schemaType.equals("AVRO")) {
                    z = false;
                    break;
                }
                break;
            case 2286824:
                if (schemaType.equals("JSON")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                return new KafkaAvroDeserializer(getSchemaRegistry(), this.config.originals());
            case true:
                return new KafkaJsonSchemaDeserializer(getSchemaRegistry(), this.config.originals());
            case true:
                return new KafkaProtobufDeserializer(getSchemaRegistry(), this.config.originals());
            default:
                throw new IllegalArgumentException("Illegal type " + parsedSchema.schemaType());
        }
    }

    private void initCaches() {
        Iterator<String> it = this.config.getTopics().iterator();
        while (it.hasNext()) {
            initCache(it.next());
        }
    }

    private void initCache(String str) {
        Map originals = this.config.originals();
        HashMap hashMap = new HashMap(originals);
        for (Map.Entry entry : originals.entrySet()) {
            if (!((String) entry.getKey()).startsWith("kafkacache.")) {
                hashMap.put("kafkacache." + ((String) entry.getKey()), entry.getValue());
            }
        }
        String str2 = (String) hashMap.getOrDefault("kafkacache.group.id", "kgiraffe-1");
        hashMap.put("kafkacache.topic", str);
        hashMap.put("kafkacache.group.id", str2);
        hashMap.put("kafkacache.client.id", str2 + "-" + str);
        hashMap.put("kafkacache.topic.skip.validation", true);
        KafkaCache<Bytes, Bytes> kafkaCache = new KafkaCache<>(new KafkaCacheConfig(hashMap), Serdes.Bytes(), Serdes.Bytes(), new UpdateHandler(), new CaffeineCache(100, Duration.ofMillis(10000L), (CacheLoader) null));
        kafkaCache.init();
        this.caches.put(str, kafkaCache);
        this.docdb.createCollection(str);
    }

    public boolean isInitialized() {
        return this.initialized.get();
    }

    public void sync() {
        this.caches.forEach((str, kafkaCache) -> {
            try {
                kafkaCache.sync();
            } catch (Exception e) {
                LOG.warn("Could not sync cache for " + str);
            }
        });
    }

    public HDocumentDB getDocDB() {
        return this.docdb;
    }

    public KafkaCache<Bytes, Bytes> getCache(String str) {
        return this.caches.get(str);
    }

    public GraphQL getGraphQL() {
        return this.executor.getGraphQL();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.caches.forEach((str, kafkaCache) -> {
            try {
                kafkaCache.close();
            } catch (IOException e) {
                LOG.warn("Could not close cache for " + str);
            }
        });
        resetSchemaRegistry(this.config.getSchemaRegistryUrls(), this.schemaRegistry);
    }

    public static <T> T getConfiguredInstance(String str, Map<String, ?> map) {
        try {
            Class<?> cls = Class.forName(str);
            Object newInstance = Utils.newInstance(cls);
            if (newInstance instanceof Configurable) {
                ((Configurable) newInstance).configure(map);
            }
            return (T) cls.cast(newInstance);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
}
