/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.cassandra;

import com.datastax.driver.core.AggregateMetadata;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.FunctionMetadata;
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.MaterializedViewMetadata;
import com.datastax.driver.core.SchemaChangeListener;
import com.datastax.driver.core.TableMetadata;
import com.datastax.driver.core.UserType;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.connector.cassandra.CassandraClient;
import io.debezium.connector.cassandra.KeyValueSchema;
import io.debezium.connector.cassandra.KeyspaceTable;
import io.debezium.connector.cassandra.SourceInfo;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.schema.KeyspaceParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Caches the {@link KeyValueSchema} of CDC-enabled Cassandra tables, keyed by {@link KeyspaceTable}.
 *
 * <p>The cache is filled at construction time from
 * {@link CassandraClient#getCdcEnabledTableMetadataList()} and is afterwards maintained by a
 * {@link SchemaChangeListener} that this class registers on the client's {@link Cluster}. The same
 * listener also mirrors keyspace and table changes into Cassandra's {@code Schema.instance}.
 *
 * <p>Call {@link #close()} to unregister the listener again.
 */
public class SchemaHolder {
    private static final Logger LOGGER = LoggerFactory.getLogger(SchemaHolder.class);
    private final ConcurrentMap<KeyspaceTable, KeyValueSchema> tableToKVSchemaMap;
    private final String kafkaTopicPrefix;
    private final SourceInfoStructMaker<SourceInfo> sourceInfoStructMaker;
    private final CassandraClient cassandraClient;
    private final SchemaChangeListener schemaChangeListener;

    /**
     * Creates the holder, loads the schemas of all currently CDC-enabled tables and registers the
     * schema change listener on the cluster.
     *
     * @param cassandraClient       client used to list CDC-enabled tables and to reach the cluster
     * @param kafkaTopicPrefix      prefix handed to every {@link KeyValueSchema} that is created
     * @param sourceInfoStructMaker struct maker handed to every {@link KeyValueSchema} that is created
     */
    public SchemaHolder(CassandraClient cassandraClient, String kafkaTopicPrefix, SourceInfoStructMaker<SourceInfo> sourceInfoStructMaker) {
        this.cassandraClient = cassandraClient;
        this.kafkaTopicPrefix = kafkaTopicPrefix;
        this.sourceInfoStructMaker = sourceInfoStructMaker;
        this.tableToKVSchemaMap = new ConcurrentHashMap<>();
        this.schemaChangeListener = new CassandraSchemaChangeListener();
        // Must run last: initialize() reads every field assigned above.
        initialize();
    }

    /**
     * Seeds the cache with one entry per CDC-enabled table and then registers the listener.
     */
    private void initialize() {
        LOGGER.info("Initializing SchemaHolder ...");
        List<TableMetadata> cdcEnabledTableMetadataList = cassandraClient.getCdcEnabledTableMetadataList();
        for (TableMetadata tm : cdcEnabledTableMetadataList) {
            addOrUpdateTableSchema(new KeyspaceTable(tm), new KeyValueSchema(kafkaTopicPrefix, tm, sourceInfoStructMaker));
        }
        cassandraClient.getCluster().register(schemaChangeListener);
        LOGGER.info("Initialized SchemaHolder.");
    }

    /**
     * Unregisters the schema change listener from the cluster. The cached schemas are left in place.
     */
    public void close() {
        cassandraClient.getCluster().unregister(schemaChangeListener);
        LOGGER.info("Closed SchemaHolder.");
    }

    /**
     * Looks up the cached schema of a table.
     *
     * @param kst keyspace/table identifier
     * @return the cached {@link KeyValueSchema}, or {@code null} when no schema is cached for {@code kst}
     */
    public KeyValueSchema getKeyValueSchema(KeyspaceTable kst) {
        return tableToKVSchemaMap.get(kst);
    }

    /**
     * Collects the {@link TableMetadata} of every schema currently held in the cache.
     *
     * @return a newly built set with the table metadata of all cached schemas
     */
    public Set<TableMetadata> getCdcEnabledTableMetadataSet() {
        return tableToKVSchemaMap.values().stream().map(KeyValueSchema::tableMetadata).collect(Collectors.toSet());
    }

    /**
     * Drops the cached schema of {@code kst}; a missing entry is not treated as an error.
     */
    private void removeTableSchema(KeyspaceTable kst) {
        tableToKVSchemaMap.remove(kst);
        LOGGER.info("Removed the schema for {}.{} from table schema cache.", kst.keyspace, kst.table);
    }

    /**
     * Stores {@code kvs} as the schema of {@code kst} and logs whether an entry was replaced or created.
     */
    private void addOrUpdateTableSchema(KeyspaceTable kst, KeyValueSchema kvs) {
        // Decide "update vs. add" from the value put() hands back instead of a separate
        // containsKey() call, so the check and the write are a single map operation.
        KeyValueSchema previous = tableToKVSchemaMap.put(kst, kvs);
        if (previous != null) {
            LOGGER.info("Updated the schema for {}.{} in table schema cache.", kst.keyspace, kst.table);
        } else {
            LOGGER.info("Added the schema for {}.{} to table schema cache.", kst.keyspace, kst.table);
        }
    }

    /**
     * Listener that keeps both the enclosing holder's schema cache and Cassandra's
     * {@code Schema.instance} in step with schema events reported by the driver.
     *
     * <p>Every callback that touches {@code Schema.instance} catches {@link Exception} and only logs
     * a warning, so a failed update never propagates to the caller of the listener.
     */
    class CassandraSchemaChangeListener
    implements SchemaChangeListener {
        CassandraSchemaChangeListener() {
        }

        /**
         * Registers the new keyspace in {@code Schema.instance} and opens it without SSTables.
         */
        @Override
        public void onKeyspaceAdded(KeyspaceMetadata keyspaceMetadata) {
            try {
                Schema.instance.setKeyspaceMetadata(org.apache.cassandra.schema.KeyspaceMetadata.create(
                        keyspaceMetadata.getName(),
                        KeyspaceParams.create(keyspaceMetadata.isDurableWrites(), keyspaceMetadata.getReplication())));
                Keyspace.openWithoutSSTables(keyspaceMetadata.getName());
                LOGGER.info("Added keyspace [{}] to schema instance.", keyspaceMetadata.asCQLQuery());
            }
            catch (Exception e) {
                LOGGER.warn("Error happened while adding the keyspace {} to schema instance.", keyspaceMetadata.getName(), e);
            }
        }

        /**
         * Applies the current durable-writes flag and replication settings to the keyspace in
         * {@code Schema.instance}. The previous metadata is not consulted.
         */
        @Override
        public void onKeyspaceChanged(KeyspaceMetadata current, KeyspaceMetadata previous) {
            try {
                Schema.instance.updateKeyspace(current.getName(),
                        KeyspaceParams.create(current.isDurableWrites(), current.getReplication()));
                LOGGER.info("Updated keyspace [{}] in schema instance.", current.asCQLQuery());
            }
            catch (Exception e) {
                LOGGER.warn("Error happened while updating the keyspace {} in schema instance.", current.getName(), e);
            }
        }

        /**
         * Clears the keyspace's metadata from {@code Schema.instance}.
         */
        @Override
        public void onKeyspaceRemoved(KeyspaceMetadata keyspaceMetadata) {
            try {
                Schema.instance.clearKeyspaceMetadata(org.apache.cassandra.schema.KeyspaceMetadata.create(
                        keyspaceMetadata.getName(),
                        KeyspaceParams.create(keyspaceMetadata.isDurableWrites(), keyspaceMetadata.getReplication())));
                LOGGER.info("Removed keyspace [{}] from schema instance.", keyspaceMetadata.asCQLQuery());
            }
            catch (Exception e) {
                LOGGER.warn("Error happened while removing the keyspace {} from schema instance.", keyspaceMetadata.getName(), e);
            }
        }

        /**
         * Caches the schema of a newly added table when CDC is enabled on it, then loads the table
         * into {@code Schema.instance} unless its keyspace is unknown there or the table is already
         * present.
         */
        @Override
        public void onTableAdded(TableMetadata tableMetadata) {
            if (tableMetadata.getOptions().isCDC()) {
                addOrUpdateTableSchema(new KeyspaceTable(tableMetadata),
                        new KeyValueSchema(kafkaTopicPrefix, tableMetadata, sourceInfoStructMaker));
            }
            try {
                LOGGER.debug("Table {}.{} detected to be added!", tableMetadata.getKeyspace().getName(), tableMetadata.getName());
                CFMetaData rawCFMetaData = CFMetaData.compile(tableMetadata.asCQLQuery(), tableMetadata.getKeyspace().getName());
                // Re-create the compiled metadata under the table id reported by the driver.
                CFMetaData newCFMetaData = rawCFMetaData.copy(tableMetadata.getId());
                Keyspace.open(newCFMetaData.ksName).initCf(newCFMetaData, false);
                org.apache.cassandra.schema.KeyspaceMetadata current = Schema.instance.getKSMetaData(newCFMetaData.ksName);
                if (current == null) {
                    LOGGER.warn("Keyspace {} doesn't exist", newCFMetaData.ksName);
                    return;
                }
                if (current.tables.get(tableMetadata.getName()).isPresent()) {
                    // Log the keyspace name (not the metadata object), like the other messages here.
                    LOGGER.debug("Table {}.{} is already added!", tableMetadata.getKeyspace().getName(), tableMetadata.getName());
                    return;
                }
                org.apache.cassandra.schema.KeyspaceMetadata transformed = current.withSwapped(current.tables.with(newCFMetaData));
                Schema.instance.setKeyspaceMetadata(transformed);
                Schema.instance.load(newCFMetaData);
                LOGGER.info("Added table [{}] to schema instance.", tableMetadata.asCQLQuery());
            }
            catch (Exception e) {
                LOGGER.warn("Error happened while adding table {}.{} to schema instance.",
                        tableMetadata.getKeyspace().getName(), tableMetadata.getName(), e);
            }
        }

        /**
         * Evicts the schema of a removed table from the cache when CDC was enabled on it, then
         * removes the table from {@code Schema.instance}.
         */
        @Override
        public void onTableRemoved(TableMetadata table) {
            if (table.getOptions().isCDC()) {
                removeTableSchema(new KeyspaceTable(table));
            }
            try {
                String ksName = table.getKeyspace().getName();
                String tableName = table.getName();
                LOGGER.debug("Table {}.{} detected to be removed!", ksName, tableName);
                org.apache.cassandra.schema.KeyspaceMetadata oldKsm = Schema.instance.getKSMetaData(ksName);
                if (oldKsm == null) {
                    LOGGER.warn("KeyspaceMetadata for keyspace {} is not found!", ksName);
                    return;
                }
                ColumnFamilyStore cfs = Keyspace.openWithoutSSTables(ksName).getColumnFamilyStore(tableName);
                if (cfs == null) {
                    LOGGER.warn("ColumnFamilyStore for {}.{} is not found!", ksName, tableName);
                    return;
                }
                cfs.indexManager.markAllIndexesRemoved();
                Optional<CFMetaData> cfm = oldKsm.tables.get(tableName);
                // unregisterMBean is looked up as a declared method and forced accessible, i.e. it is
                // invoked reflectively here (presumably because it is not publicly callable).
                Method unregisterMBeanMethod = ColumnFamilyStore.class.getDeclaredMethod("unregisterMBean");
                unregisterMBeanMethod.setAccessible(true);
                unregisterMBeanMethod.invoke(cfs);
                if (cfm.isPresent()) {
                    org.apache.cassandra.schema.KeyspaceMetadata newKsm = oldKsm.withSwapped(oldKsm.tables.without(tableName));
                    Schema.instance.unload(cfm.get());
                    Schema.instance.setKeyspaceMetadata(newKsm);
                    LOGGER.info("Removed table [{}] from schema instance.", table.asCQLQuery());
                } else {
                    LOGGER.warn("Table {}.{} is not present in old keyspace meta data!", ksName, tableName);
                }
            }
            catch (Exception e) {
                LOGGER.warn("Error happened while removing table {}.{} from schema instance.",
                        table.getKeyspace().getName(), table.getName(), e);
            }
        }

        /**
         * Refreshes the cache for an altered table (adds/updates the schema when CDC is now enabled,
         * evicts it when CDC was switched off) and applies the new definition to the table's
         * metadata in {@code Schema.instance}.
         */
        @Override
        public void onTableChanged(TableMetadata newTableMetadata, TableMetadata oldTableMetaData) {
            if (newTableMetadata.getOptions().isCDC()) {
                addOrUpdateTableSchema(new KeyspaceTable(newTableMetadata),
                        new KeyValueSchema(kafkaTopicPrefix, newTableMetadata, sourceInfoStructMaker));
            } else if (oldTableMetaData.getOptions().isCDC()) {
                removeTableSchema(new KeyspaceTable(newTableMetadata));
            }
            try {
                LOGGER.debug("Detected alternation in schema of {}.{} (previous cdc = {}, current cdc = {})",
                        newTableMetadata.getKeyspace().getName(), newTableMetadata.getName(),
                        oldTableMetaData.getOptions().isCDC(), newTableMetadata.getOptions().isCDC());
                CFMetaData rawNewMetadata = CFMetaData.compile(newTableMetadata.asCQLQuery(), newTableMetadata.getKeyspace().getName());
                CFMetaData oldMetadata = Schema.instance.getCFMetaData(oldTableMetaData.getKeyspace().getName(), oldTableMetaData.getName());
                if (oldMetadata == null) {
                    // Without existing metadata there is nothing to apply the change to; report it
                    // explicitly instead of relying on a NullPointerException in the catch below.
                    LOGGER.warn("Table {}.{} is not present in schema instance; skipping the update.",
                            oldTableMetaData.getKeyspace().getName(), oldTableMetaData.getName());
                    return;
                }
                // Keep the id of the metadata already held by the schema instance.
                CFMetaData newMetadata = rawNewMetadata.copy(oldMetadata.cfId);
                oldMetadata.apply(newMetadata);
                LOGGER.info("Updated table [{}] in schema instance.", newTableMetadata.asCQLQuery());
            }
            catch (Exception e) {
                LOGGER.warn("Error happened while reacting on changed table {}.{} in schema instance.",
                        newTableMetadata.getKeyspace().getName(), newTableMetadata.getName(), e);
            }
        }

        /** No-op: user type events are ignored. */
        @Override
        public void onUserTypeAdded(UserType type) {
        }

        /** No-op: user type events are ignored. */
        @Override
        public void onUserTypeRemoved(UserType type) {
        }

        /** No-op: user type events are ignored. */
        @Override
        public void onUserTypeChanged(UserType current, UserType previous) {
        }

        /** No-op: function events are ignored. */
        @Override
        public void onFunctionAdded(FunctionMetadata function) {
        }

        /** No-op: function events are ignored. */
        @Override
        public void onFunctionRemoved(FunctionMetadata function) {
        }

        /** No-op: function events are ignored. */
        @Override
        public void onFunctionChanged(FunctionMetadata current, FunctionMetadata previous) {
        }

        /** No-op: aggregate events are ignored. */
        @Override
        public void onAggregateAdded(AggregateMetadata aggregate) {
        }

        /** No-op: aggregate events are ignored. */
        @Override
        public void onAggregateRemoved(AggregateMetadata aggregate) {
        }

        /** No-op: aggregate events are ignored. */
        @Override
        public void onAggregateChanged(AggregateMetadata current, AggregateMetadata previous) {
        }

        /** No-op: materialized view events are ignored. */
        @Override
        public void onMaterializedViewAdded(MaterializedViewMetadata view) {
        }

        /** No-op: materialized view events are ignored. */
        @Override
        public void onMaterializedViewRemoved(MaterializedViewMetadata view) {
        }

        /** No-op: materialized view events are ignored. */
        @Override
        public void onMaterializedViewChanged(MaterializedViewMetadata current, MaterializedViewMetadata previous) {
        }

        /** No-op: nothing to do when the listener is registered. */
        @Override
        public void onRegister(Cluster cluster) {
        }

        /** No-op: nothing to do when the listener is unregistered. */
        @Override
        public void onUnregister(Cluster cluster) {
        }
    }
}

