/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.cassandra;

import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.TableMetadata;
import com.datastax.driver.core.querybuilder.BuiltStatement;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.cassandra.AbstractProcessor;
import io.debezium.connector.cassandra.CassandraClient;
import io.debezium.connector.cassandra.CassandraConnectorConfig;
import io.debezium.connector.cassandra.CassandraConnectorContext;
import io.debezium.connector.cassandra.CellData;
import io.debezium.connector.cassandra.Event;
import io.debezium.connector.cassandra.Filters;
import io.debezium.connector.cassandra.KeyspaceTable;
import io.debezium.connector.cassandra.OffsetPosition;
import io.debezium.connector.cassandra.OffsetWriter;
import io.debezium.connector.cassandra.Record;
import io.debezium.connector.cassandra.RecordMaker;
import io.debezium.connector.cassandra.RowData;
import io.debezium.connector.cassandra.SchemaHolder;
import io.debezium.connector.cassandra.SnapshotProcessorMetrics;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorTaskException;
import io.debezium.connector.cassandra.transforms.CassandraTypeDeserializer;
import io.debezium.function.BlockingConsumer;
import io.debezium.time.Conversions;
import io.debezium.util.Collect;
import java.io.IOException;
import java.time.Duration;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.kafka.connect.data.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SnapshotProcessor
extends AbstractProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(SnapshotProcessor.class);
    private static final String NAME = "Snapshot Processor";
    private static final String CASSANDRA_NOW_UNIXTIMESTAMP = "UNIXTIMESTAMPOF(NOW())";
    private static final String EXECUTION_TIME_ALIAS = "execution_time";
    private static final Set<DataType.Name> collectionTypes = Collect.unmodifiableSet((Object[])new DataType.Name[]{DataType.Name.LIST, DataType.Name.SET, DataType.Name.MAP});
    private final CassandraClient cassandraClient;
    private final ChangeEventQueue<Event> queue;
    private final OffsetWriter offsetWriter;
    private final SchemaHolder schemaHolder;
    private final RecordMaker recordMaker;
    private final CassandraConnectorConfig.SnapshotMode snapshotMode;
    private final ConsistencyLevel consistencyLevel;
    private final Set<String> startedTableNames = new HashSet<String>();
    private final SnapshotProcessorMetrics metrics = new SnapshotProcessorMetrics();
    private boolean initial = true;

    public SnapshotProcessor(CassandraConnectorContext context) {
        super(NAME, context.getCassandraConnectorConfig().snapshotPollIntervalMs().toMillis());
        this.cassandraClient = context.getCassandraClient();
        this.queue = context.getQueue();
        this.offsetWriter = context.getOffsetWriter();
        this.schemaHolder = context.getSchemaHolder();
        this.recordMaker = new RecordMaker(context.getCassandraConnectorConfig().tombstonesOnDelete(), new Filters(context.getCassandraConnectorConfig().fieldBlacklist()), context.getCassandraConnectorConfig());
        this.snapshotMode = context.getCassandraConnectorConfig().snapshotMode();
        this.consistencyLevel = context.getCassandraConnectorConfig().snapshotConsistencyLevel();
    }

    @Override
    public void initialize() {
        this.metrics.registerMetrics();
    }

    @Override
    public void destroy() {
        this.metrics.unregisterMetrics();
    }

    @Override
    public void process() {
        if (this.snapshotMode == CassandraConnectorConfig.SnapshotMode.ALWAYS) {
            this.snapshot();
        } else if (this.snapshotMode == CassandraConnectorConfig.SnapshotMode.INITIAL && this.initial) {
            this.snapshot();
            this.initial = false;
        } else {
            LOGGER.debug("Skipping snapshot [mode: {}]", (Object)this.snapshotMode);
        }
    }

    synchronized void snapshot() {
        try {
            Set<TableMetadata> tables = this.getTablesToSnapshot();
            if (!tables.isEmpty()) {
                String[] tableArr = (String[])tables.stream().map(SnapshotProcessor::tableName).toArray(String[]::new);
                LOGGER.info("Found {} tables to snapshot: {}", (Object)tables.size(), (Object)tableArr);
                long startTime = System.currentTimeMillis();
                this.metrics.setTableCount(tables.size());
                this.metrics.startSnapshot();
                for (TableMetadata table : tables) {
                    if (!this.isRunning()) continue;
                    String tableName = SnapshotProcessor.tableName(table);
                    LOGGER.info("Snapshotting table {}", (Object)tableName);
                    this.startedTableNames.add(tableName);
                    this.takeTableSnapshot(table);
                    this.metrics.completeTable();
                }
                this.metrics.stopSnapshot();
                long endTime = System.currentTimeMillis();
                long durationInSeconds = Duration.ofMillis(endTime - startTime).getSeconds();
                LOGGER.info("Snapshot completely queued in {} seconds for tables: {}", (Object)durationInSeconds, (Object)tableArr);
            } else {
                LOGGER.info("No tables to snapshot");
            }
        }
        catch (IOException e) {
            throw new CassandraConnectorTaskException(e);
        }
    }

    private Set<TableMetadata> getTablesToSnapshot() {
        return this.schemaHolder.getCdcEnabledTableMetadataSet().stream().filter(tm -> !this.offsetWriter.isOffsetProcessed(SnapshotProcessor.tableName(tm), OffsetPosition.defaultOffsetPosition().serialize(), true)).filter(tm -> !this.startedTableNames.contains(SnapshotProcessor.tableName(tm))).collect(Collectors.toSet());
    }

    private void takeTableSnapshot(TableMetadata tableMetadata) throws IOException {
        BuiltStatement statement = SnapshotProcessor.generateSnapshotStatement(tableMetadata);
        statement.setConsistencyLevel(this.consistencyLevel);
        LOGGER.info("Executing snapshot query '{}' with consistency level {}", (Object)statement.getQueryString(), (Object)statement.getConsistencyLevel());
        ResultSet resultSet = this.cassandraClient.execute((Statement)statement);
        this.processResultSet(tableMetadata, resultSet);
        LOGGER.debug("The snapshot of table '{}' has been taken", (Object)SnapshotProcessor.tableName(tableMetadata));
    }

    private static BuiltStatement generateSnapshotStatement(TableMetadata tableMetadata) {
        List allCols = tableMetadata.getColumns().stream().map(ColumnMetadata::getName).collect(Collectors.toList());
        Set primaryCols = tableMetadata.getPrimaryKey().stream().map(ColumnMetadata::getName).collect(Collectors.toSet());
        List collectionCols = tableMetadata.getColumns().stream().filter(cm -> collectionTypes.contains(cm.getType().getName())).map(ColumnMetadata::getName).collect(Collectors.toList());
        Select.Selection selection = QueryBuilder.select().raw(CASSANDRA_NOW_UNIXTIMESTAMP).as(EXECUTION_TIME_ALIAS);
        for (String col : allCols) {
            selection.column(SnapshotProcessor.withQuotes(col));
            if (primaryCols.contains(col) || collectionCols.contains(col)) continue;
            selection.ttl(SnapshotProcessor.withQuotes(col)).as(SnapshotProcessor.ttlAlias(col));
        }
        return selection.from(tableMetadata.getKeyspace().getName(), tableMetadata.getName());
    }

    private void processResultSet(TableMetadata tableMetadata, ResultSet resultSet) throws IOException {
        String tableName = SnapshotProcessor.tableName(tableMetadata);
        KeyspaceTable keyspaceTable = new KeyspaceTable(tableMetadata);
        SchemaHolder.KeyValueSchema keyValueSchema = this.schemaHolder.getOrUpdateKeyValueSchema(keyspaceTable);
        Schema keySchema = keyValueSchema.keySchema();
        Schema valueSchema = keyValueSchema.valueSchema();
        Set<String> partitionKeyNames = tableMetadata.getPartitionKey().stream().map(ColumnMetadata::getName).collect(Collectors.toSet());
        Set<String> clusteringKeyNames = tableMetadata.getClusteringColumns().stream().map(ColumnMetadata::getName).collect(Collectors.toSet());
        Iterator rowIter = resultSet.iterator();
        long rowNum = 0L;
        if (!rowIter.hasNext()) {
            this.offsetWriter.markOffset(tableName, OffsetPosition.defaultOffsetPosition().serialize(), true);
            this.offsetWriter.flush();
        }
        while (rowIter.hasNext()) {
            if (this.isRunning()) {
                Row row = (Row)rowIter.next();
                Object executionTime = SnapshotProcessor.readExecutionTime(row);
                RowData after = SnapshotProcessor.extractRowData(row, tableMetadata.getColumns(), partitionKeyNames, clusteringKeyNames, executionTime);
                boolean markOffset = !rowIter.hasNext();
                this.recordMaker.insert(DatabaseDescriptor.getClusterName(), OffsetPosition.defaultOffsetPosition(), keyspaceTable, true, Conversions.toInstantFromMicros((long)TimeUnit.MICROSECONDS.convert((Long)executionTime, TimeUnit.MILLISECONDS)), after, keySchema, valueSchema, markOffset, (BlockingConsumer<Record>)((BlockingConsumer)arg_0 -> this.queue.enqueue(arg_0)));
                if (++rowNum % 10000L != 0L) continue;
                LOGGER.info("Queued {} snapshot records from table {}", (Object)rowNum, (Object)tableName);
                this.metrics.setRowsScanned(tableName, rowNum);
                continue;
            }
            LOGGER.warn("Terminated snapshot processing while table {} is in progress", (Object)tableName);
            this.metrics.setRowsScanned(tableName, rowNum);
            return;
        }
        this.metrics.setRowsScanned(tableName, rowNum);
    }

    private static RowData extractRowData(Row row, List<ColumnMetadata> columns, Set<String> partitionKeyNames, Set<String> clusteringKeyNames, Object executionTime) {
        RowData rowData = new RowData();
        for (ColumnMetadata columnMetadata : columns) {
            Object ttl;
            String name = columnMetadata.getName();
            Object value = SnapshotProcessor.readCol(row, name, columnMetadata);
            Long deletionTs = null;
            CellData.ColumnType type = SnapshotProcessor.getType(name, partitionKeyNames, clusteringKeyNames);
            if (type == CellData.ColumnType.REGULAR && value != null && !collectionTypes.contains(columnMetadata.getType().getName()) && (ttl = SnapshotProcessor.readColTtl(row, name)) != null && executionTime != null) {
                deletionTs = SnapshotProcessor.calculateDeletionTs(executionTime, ttl);
            }
            CellData cellData = new CellData(name, value, deletionTs, type);
            rowData.addCell(cellData);
        }
        return rowData;
    }

    private static CellData.ColumnType getType(String name, Set<String> partitionKeyNames, Set<String> clusteringKeyNames) {
        if (partitionKeyNames.contains(name)) {
            return CellData.ColumnType.PARTITION;
        }
        if (clusteringKeyNames.contains(name)) {
            return CellData.ColumnType.CLUSTERING;
        }
        return CellData.ColumnType.REGULAR;
    }

    private static Object readExecutionTime(Row row) {
        return CassandraTypeDeserializer.deserialize(DataType.bigint(), row.getBytesUnsafe(EXECUTION_TIME_ALIAS));
    }

    private static Object readCol(Row row, String col, ColumnMetadata cm) {
        return CassandraTypeDeserializer.deserialize(cm.getType(), row.getBytesUnsafe(col));
    }

    private static Object readColTtl(Row row, String col) {
        return CassandraTypeDeserializer.deserialize(DataType.cint(), row.getBytesUnsafe(SnapshotProcessor.ttlAlias(col)));
    }

    private static long calculateDeletionTs(Object executionTime, Object ttl) {
        return TimeUnit.MICROSECONDS.convert((Long)executionTime, TimeUnit.MILLISECONDS) + TimeUnit.MICROSECONDS.convert(((Integer)ttl).intValue(), TimeUnit.SECONDS);
    }

    private static String ttlAlias(String colName) {
        return colName + "_ttl";
    }

    private static String withQuotes(String s) {
        return "\"" + s + "\"";
    }

    private static String tableName(TableMetadata tm) {
        return tm.getKeyspace().getName() + "." + tm.getName();
    }
}

