/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mysql;

import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.connector.mysql.GtidSet;
import io.debezium.connector.mysql.Module;
import io.debezium.connector.mysql.MySqlBinaryProtocolFieldReader;
import io.debezium.connector.mysql.MySqlChangeEventSourceFactory;
import io.debezium.connector.mysql.MySqlChangeEventSourceMetricsFactory;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlDatabaseSchema;
import io.debezium.connector.mysql.MySqlErrorHandler;
import io.debezium.connector.mysql.MySqlEventMetadataProvider;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.connector.mysql.MySqlPartition;
import io.debezium.connector.mysql.MySqlStreamingChangeEventSourceMetrics;
import io.debezium.connector.mysql.MySqlTaskContext;
import io.debezium.connector.mysql.MySqlTextProtocolFieldReader;
import io.debezium.connector.mysql.MySqlValueConverters;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.metrics.spi.ChangeEventSourceMetricsFactory;
import io.debezium.pipeline.source.spi.ChangeEventSourceFactory;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionFilters;
import io.debezium.schema.DatabaseSchema;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.Clock;
import java.sql.SQLException;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Source task of the MySQL connector.
 *
 * <p>On {@link #start(Configuration)} the task validates the server's binlog settings, builds the
 * database schema, recovers (or initializes) the schema history, decides whether a snapshot has to
 * be forced, and finally wires the event queue, dispatcher and change event source coordinator
 * together. {@link #doPoll()} drains the queue, and {@link #doStop()} releases the JDBC connection
 * and the schema.
 */
public class MySqlConnectorTask
extends BaseSourceTask<MySqlPartition, MySqlOffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MySqlConnectorTask.class);
    private static final String CONTEXT_NAME = "mysql-connector-task";

    /** SQLSTATE values that make a failed heartbeat action query fatal for the task. */
    private static final String HEARTBEAT_FATAL_SQL_STATE_42000 = "42000";
    private static final String HEARTBEAT_FATAL_SQL_STATE_3D000 = "3D000";

    // Assigned in start() and read in doPoll()/doStop().
    private volatile MySqlTaskContext taskContext;
    private volatile ChangeEventQueue<DataChangeEvent> queue;
    private volatile MySqlConnection connection;
    private volatile ErrorHandler errorHandler;
    private volatile MySqlDatabaseSchema schema;

    /**
     * Returns the connector version as reported by {@link Module#version()}.
     */
    public String version() {
        return Module.version();
    }

    /**
     * Starts the task: validates the server configuration, loads the schema history and creates
     * and starts the change event source coordinator.
     *
     * @param config the task configuration
     * @return the started coordinator
     * @throws DebeziumException if the binlog configuration is unsuitable, the JDBC connection
     *         cannot be closed or re-configured, or the stored offsets / schema history cannot be
     *         used with the configured snapshot mode
     */
    public ChangeEventSourceCoordinator<MySqlPartition, MySqlOffsetContext> start(Configuration config) {
        Clock clock = Clock.system();
        // The connector config is built from the configuration as passed in, i.e. before the
        // JDBC defaults below are applied.
        MySqlConnectorConfig connectorConfig = new MySqlConnectorConfig(config);
        TopicNamingStrategy topicNamingStrategy = connectorConfig.getTopicNamingStrategy(MySqlConnectorConfig.TOPIC_NAMING_STRATEGY);
        SchemaNameAdjuster schemaNameAdjuster = connectorConfig.schemaNameAdjuster();
        MySqlValueConverters valueConverters = this.getValueConverters(connectorConfig);

        // Apply JDBC-level defaults; explicitly configured values are not overridden (withDefault).
        config = ((Configuration.Builder)((Configuration.Builder)config.edit().withDefault("database.responseBuffering", "adaptive").withDefault("database.fetchSize", 10000)).withDefault("database.useCursorFetch", connectorConfig.useCursorFetch())).build();

        this.connection = createConnection(config, connectorConfig);
        this.validateBinlogConfiguration(connectorConfig);

        Offsets<MySqlPartition, MySqlOffsetContext> previousOffsets = this.getPreviousOffsets(new MySqlPartition.Provider(connectorConfig, config), new MySqlOffsetContext.Loader(connectorConfig));

        // NOTE(review): the local is named "...CaseInsensitive" but is fed from
        // isTableIdCaseSensitive(); confirm against MySqlConnection / MySqlDatabaseSchema which
        // meaning is intended. Behavior is kept exactly as before.
        boolean tableIdCaseInsensitive = this.connection.isTableIdCaseSensitive();
        this.schema = new MySqlDatabaseSchema(connectorConfig, valueConverters, (TopicNamingStrategy<TableId>)topicNamingStrategy, schemaNameAdjuster, tableIdCaseInsensitive);

        LOGGER.info("Closing connection before starting schema recovery");
        try {
            this.connection.close();
        }
        catch (SQLException e) {
            throw new DebeziumException(e);
        }

        MySqlPartition partition = previousOffsets.getTheOnlyPartition();
        MySqlOffsetContext previousOffset = previousOffsets.getTheOnlyOffset();
        // The boolean result is intentionally not used here; the call is made for its validation
        // and schema loading side effects.
        this.validateAndLoadSchemaHistory(connectorConfig, partition, previousOffset, this.schema);

        LOGGER.info("Reconnecting after finishing schema recovery");
        try {
            // presumably this re-opens the previously closed connection lazily - TODO confirm
            this.connection.setAutoCommit(false);
        }
        catch (SQLException e) {
            throw new DebeziumException(e);
        }

        // When the stored position can no longer be served, drop the offset so a snapshot is taken.
        if (this.validateSnapshotFeasibility(connectorConfig, previousOffset)) {
            previousOffsets.resetOffset(partition);
        }

        this.taskContext = new MySqlTaskContext(connectorConfig, this.schema);
        this.queue = new ChangeEventQueue.Builder()
                .pollInterval(connectorConfig.getPollInterval())
                .maxBatchSize(connectorConfig.getMaxBatchSize())
                .maxQueueSize(connectorConfig.getMaxQueueSize())
                .maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes())
                .loggingContextSupplier(() -> this.taskContext.configureLoggingContext(CONTEXT_NAME))
                .buffering()
                .build();
        this.errorHandler = new MySqlErrorHandler(connectorConfig, this.queue);
        MySqlEventMetadataProvider metadataProvider = new MySqlEventMetadataProvider();

        // 'config' is reassigned above, so an effectively final copy is needed for the lambda.
        Configuration heartbeatConfig = config;
        // Raw constructor calls and explicit upcasts below are kept as they were so that overload
        // resolution is unchanged.
        EventDispatcher dispatcher = new EventDispatcher((CommonConnectorConfig)connectorConfig, topicNamingStrategy, (DatabaseSchema)this.schema, this.queue, (DataCollectionFilters.DataCollectionFilter)connectorConfig.getTableFilters().dataCollectionFilter(), DataChangeEvent::new, null, (EventMetadataProvider)metadataProvider, connectorConfig.createHeartbeat(topicNamingStrategy, schemaNameAdjuster, () -> createConnection(heartbeatConfig, connectorConfig), exception -> {
            // getSQLState() may be null; compare constant-first so a missing SQLSTATE is simply
            // treated as non-fatal instead of raising a NullPointerException.
            String sqlErrorId = exception.getSQLState();
            if (HEARTBEAT_FATAL_SQL_STATE_42000.equals(sqlErrorId) || HEARTBEAT_FATAL_SQL_STATE_3D000.equals(sqlErrorId)) {
                throw new DebeziumException("Could not execute heartbeat action query (Error: " + sqlErrorId + ")", exception);
            }
        }), schemaNameAdjuster);

        MySqlStreamingChangeEventSourceMetrics streamingMetrics = new MySqlStreamingChangeEventSourceMetrics(this.taskContext, (ChangeEventQueueMetrics)this.queue, metadataProvider);
        ChangeEventSourceCoordinator coordinator = new ChangeEventSourceCoordinator(previousOffsets, this.errorHandler, MySqlConnector.class, (CommonConnectorConfig)connectorConfig, (ChangeEventSourceFactory)new MySqlChangeEventSourceFactory(connectorConfig, this.connection, this.errorHandler, (EventDispatcher<MySqlPartition, TableId>)dispatcher, clock, this.schema, this.taskContext, streamingMetrics, this.queue), (ChangeEventSourceMetricsFactory)new MySqlChangeEventSourceMetricsFactory(streamingMetrics), dispatcher, (DatabaseSchema)this.schema);
        coordinator.start((CdcSourceTaskContext)this.taskContext, this.queue, (EventMetadataProvider)metadataProvider);
        return coordinator;
    }

    /**
     * Creates a new MySQL connection, choosing the binary protocol field reader when cursor fetch
     * is enabled and the text protocol field reader otherwise.
     */
    private static MySqlConnection createConnection(Configuration jdbcConfig, MySqlConnectorConfig connectorConfig) {
        return new MySqlConnection(new MySqlConnection.MySqlConnectionConfiguration(jdbcConfig), connectorConfig.useCursorFetch() ? new MySqlBinaryProtocolFieldReader(connectorConfig) : new MySqlTextProtocolFieldReader(connectorConfig));
    }

    /**
     * Builds the value converters from the temporal precision, decimal, unsigned BIGINT and binary
     * handling modes of the given configuration.
     *
     * @param configuration the connector configuration
     * @return the configured value converters
     */
    private MySqlValueConverters getValueConverters(MySqlConnectorConfig configuration) {
        TemporalPrecisionMode timePrecisionMode = configuration.getTemporalPrecisionMode();
        JdbcValueConverters.DecimalMode decimalMode = configuration.getDecimalMode();
        String bigIntUnsignedHandlingModeStr = configuration.getConfig().getString(MySqlConnectorConfig.BIGINT_UNSIGNED_HANDLING_MODE);
        MySqlConnectorConfig.BigIntUnsignedHandlingMode bigIntUnsignedHandlingMode = MySqlConnectorConfig.BigIntUnsignedHandlingMode.parse(bigIntUnsignedHandlingModeStr);
        JdbcValueConverters.BigIntUnsignedMode bigIntUnsignedMode = bigIntUnsignedHandlingMode.asBigIntUnsignedMode();
        boolean timeAdjusterEnabled = configuration.getConfig().getBoolean(MySqlConnectorConfig.ENABLE_TIME_ADJUSTER);
        // With the time adjuster disabled, temporal values pass through unchanged (identity).
        return new MySqlValueConverters(decimalMode, timePrecisionMode, bigIntUnsignedMode, configuration.binaryHandlingMode(), timeAdjusterEnabled ? MySqlValueConverters::adjustTemporal : x -> x, MySqlValueConverters::defaultParsingErrorHandler);
    }

    /**
     * Polls the change event queue and unwraps each event into its {@link SourceRecord}.
     *
     * @return the records of the polled batch, in queue order
     * @throws InterruptedException if polling the queue is interrupted
     */
    public List<SourceRecord> doPoll() throws InterruptedException {
        List<DataChangeEvent> records = this.queue.poll();
        return records.stream()
                .map(DataChangeEvent::getRecord)
                .collect(Collectors.toList());
    }

    /**
     * Closes the JDBC connection and the schema, if they were created. A {@link SQLException}
     * from closing the connection is logged and not propagated; the schema is closed even when
     * closing the connection fails.
     */
    protected void doStop() {
        try {
            if (this.connection != null) {
                this.connection.close();
            }
        }
        catch (SQLException e) {
            LOGGER.error("Exception while closing JDBC connection", e);
        }
        finally {
            if (this.schema != null) {
                this.schema.close();
            }
        }
    }

    /**
     * Returns all configuration fields of the MySQL connector.
     */
    protected Iterable<Field> getAllConfigurationFields() {
        return MySqlConnectorConfig.ALL_FIELDS;
    }

    /**
     * Verifies, when the snapshot mode includes streaming, that the server uses
     * {@code binlog_format=ROW} and {@code binlog_row_image=FULL}.
     *
     * @throws DebeziumException if either setting is not as required
     */
    private void validateBinlogConfiguration(MySqlConnectorConfig config) {
        if (!config.getSnapshotMode().shouldStream()) {
            return;
        }
        if (!this.connection.isBinlogFormatRow()) {
            throw new DebeziumException("The MySQL server is not configured to use a ROW binlog_format, which is required for this connector to work properly. Change the MySQL configuration to use a binlog_format=ROW and restart the connector.");
        }
        if (!this.connection.isBinlogRowImageFull()) {
            throw new DebeziumException("The MySQL server is not configured to use a FULL binlog_row_image, which is required for this connector to work properly. Change the MySQL configuration to use a binlog_row_image=FULL and restart the connector.");
        }
    }

    /**
     * Determines whether the server can still serve the binlog position stored in the offset.
     *
     * <p>If the offset carries a GTID set, the decision is based on GTIDs only: the (filtered)
     * connector GTID set must be contained in the server's known GTID set and none of the GTIDs
     * still to be replicated may have been purged. Otherwise the stored binlog file name must be
     * among the server's available binlog files. An empty GTID set or a missing/empty binlog file
     * name counts as available.
     *
     * @param config the connector configuration, used for the GTID source filter
     * @param offset the previously stored offset; must not be {@code null}
     * @return {@code true} if streaming can resume from the stored position
     */
    protected boolean isBinlogAvailable(MySqlConnectorConfig config, MySqlOffsetContext offset) {
        String gtidStr = offset.gtidSet();
        if (gtidStr != null) {
            if (gtidStr.trim().isEmpty()) {
                // An empty GTID set means streaming starts from the beginning.
                return true;
            }
            String availableGtidStr = this.connection.knownGtidSet();
            if (availableGtidStr == null || availableGtidStr.trim().isEmpty()) {
                LOGGER.info("Connector used GTIDs previously, but MySQL does not know of any GTIDs or they are not enabled");
                return false;
            }

            GtidSet gtidSet = new GtidSet(gtidStr).retainAll(config.gtidSourceFilter());
            GtidSet availableGtidSet = new GtidSet(availableGtidStr);
            if (!gtidSet.isContainedWithin(availableGtidSet)) {
                LOGGER.info("Connector last known GTIDs are {}, but MySQL has {}", gtidSet, availableGtidSet);
                return false;
            }
            LOGGER.info("MySQL current GTID set {} does contain the GTID set required by the connector {}", availableGtidSet, gtidSet);

            // GTIDs the server knows about (after filtering) that the connector has not seen yet.
            GtidSet knownServerSet = availableGtidSet.retainAll(config.gtidSourceFilter());
            GtidSet gtidSetToReplicate = this.connection.subtractGtidSet(knownServerSet, gtidSet);
            GtidSet purgedGtidSet = this.connection.purgedGtidSet();
            LOGGER.info("Server has already purged {} GTIDs", purgedGtidSet);
            GtidSet nonPurgedGtidSetToReplicate = this.connection.subtractGtidSet(gtidSetToReplicate, purgedGtidSet);
            LOGGER.info("GTIDs known by the server but not processed yet {}, for replication are available only {}", gtidSetToReplicate, nonPurgedGtidSetToReplicate);

            // Any difference means part of what is still needed has been purged.
            if (!gtidSetToReplicate.equals(nonPurgedGtidSetToReplicate)) {
                LOGGER.info("Some of the GTIDs needed to replicate have been already purged");
                return false;
            }
            return true;
        }

        String binlogFilename = offset.getSource().binlogFilename();
        if (binlogFilename == null || binlogFilename.isEmpty()) {
            return true;
        }
        List<String> logNames = this.connection.availableBinlogFiles();
        boolean found = logNames.stream().anyMatch(binlogFilename::equals);
        if (found) {
            LOGGER.info("MySQL has the binlog file '{}' required by the connector", binlogFilename);
        }
        else if (LOGGER.isInfoEnabled()) {
            // Guarded so the file list is only joined when it will actually be logged.
            LOGGER.info("Connector requires binlog file '{}', but MySQL only has {}", binlogFilename, String.join(", ", logNames));
        }
        return found;
    }

    /**
     * Validates the stored offset against the schema history and loads or initializes the history.
     *
     * <ul>
     * <li>No offset: the history storage is initialized; fails if the snapshot mode is one that
     * snapshots on schema error.</li>
     * <li>Offset present but history missing: allowed only when the snapshot mode snapshots on
     * schema error and the binlog position is still available; the storage is then initialized.</li>
     * <li>Otherwise the schema is recovered from the history.</li>
     * </ul>
     *
     * @return {@code true} only when the history was missing and has been re-initialized for a
     *         schema recovery; {@code false} otherwise
     * @throws DebeziumException if the combination of offset, history and snapshot mode is invalid
     */
    private boolean validateAndLoadSchemaHistory(MySqlConnectorConfig config, MySqlPartition partition, MySqlOffsetContext offset, MySqlDatabaseSchema schema) {
        if (offset == null) {
            if (config.getSnapshotMode().shouldSnapshotOnSchemaError()) {
                throw new DebeziumException("Could not find existing binlog information while attempting schema only recovery snapshot");
            }
            LOGGER.info("Connector started for the first time, database schema history recovery will not be executed");
            schema.initializeStorage();
            return false;
        }

        if (schema.historyExists()) {
            schema.recover(partition, (OffsetContext)offset);
            return false;
        }

        LOGGER.warn("Database schema history was not found but was expected");
        if (!config.getSnapshotMode().shouldSnapshotOnSchemaError()) {
            throw new DebeziumException("The db history topic is missing. You may attempt to recover it by reconfiguring the connector to " + MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY_RECOVERY);
        }
        if (!this.isBinlogAvailable(config, offset)) {
            throw new DebeziumException("The connector is trying to read binlog starting at " + offset.getSource() + ", but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.");
        }
        LOGGER.info("The db-history topic is missing but we are in {} snapshot mode. Attempting to snapshot the current schema and then begin reading the binlog from the last recorded offset.", MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY_RECOVERY);
        schema.initializeStorage();
        return true;
    }

    /**
     * Checks whether the task can continue from the stored offset or whether a snapshot must be
     * forced.
     *
     * @param config the connector configuration
     * @param offset the previously stored offset, or {@code null} on first start
     * @return {@code true} if the stored binlog position is no longer available and the snapshot
     *         mode permits snapshotting on data error, so the offset should be reset;
     *         {@code false} otherwise
     * @throws DebeziumException if a snapshot would be required but the snapshot mode forbids it
     */
    private boolean validateSnapshotFeasibility(MySqlConnectorConfig config, MySqlOffsetContext offset) {
        if (offset == null) {
            // First start without a snapshot: only warn about possibly incomplete binlogs.
            if (!config.getSnapshotMode().shouldSnapshot()) {
                String earliestBinlogFilename = this.connection.earliestBinlogFilename();
                if (earliestBinlogFilename == null) {
                    LOGGER.warn("No binlog appears to be available. Ensure that the MySQL row-level binlog is enabled.");
                }
                else if (!earliestBinlogFilename.endsWith("00001")) {
                    LOGGER.warn("It is possible the server has purged some binlogs. If this is the case, then using snapshot mode may be required.");
                }
            }
            return false;
        }

        if (offset.isSnapshotRunning()) {
            // The previous run stopped mid-snapshot, so a snapshot has to be allowed now.
            if (!config.getSnapshotMode().shouldSnapshot()) {
                throw new DebeziumException("The connector previously stopped while taking a snapshot, but now the connector is configured to never allow snapshots. Reconfigure the connector to use snapshots initially or when needed.");
            }
            return false;
        }

        if (this.isBinlogAvailable(config, offset)) {
            return false;
        }
        if (!config.getSnapshotMode().shouldSnapshotOnDataError()) {
            throw new DebeziumException("The connector is trying to read binlog starting at " + offset.getSource() + ", but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.");
        }
        LOGGER.warn("The connector is trying to read binlog starting at '{}', but this is no longer available on the server. Forcing the snapshot execution as it is allowed by the configuration.", offset.getSource());
        return true;
    }
}

