/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mysql;

import io.debezium.annotation.NotThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.connector.mysql.BinlogReader;
import io.debezium.connector.mysql.BlockingReader;
import io.debezium.connector.mysql.ChainedReader;
import io.debezium.connector.mysql.GtidSet;
import io.debezium.connector.mysql.Module;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlJdbcContext;
import io.debezium.connector.mysql.MySqlTaskContext;
import io.debezium.connector.mysql.SnapshotReader;
import io.debezium.connector.mysql.SourceInfo;
import io.debezium.connector.mysql.TimedBlockingReader;
import io.debezium.util.LoggingContext;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
public final class MySqlConnectorTask
extends BaseSourceTask {
    private final Logger logger = LoggerFactory.getLogger(((Object)((Object)this)).getClass());
    private volatile MySqlTaskContext taskContext;
    private volatile MySqlJdbcContext connectionContext;
    private volatile ChainedReader readers;

    public String version() {
        return Module.version();
    }

    public synchronized void start(Configuration config) {
        this.taskContext = new MySqlTaskContext(config);
        this.connectionContext = this.taskContext.getConnectionContext();
        LoggingContext.PreviousContext prevLoggingContext = this.taskContext.configureLoggingContext("task");
        try {
            this.taskContext.start();
            boolean startWithSnapshot = false;
            boolean snapshotEventsAreInserts = true;
            SourceInfo source = this.taskContext.source();
            Map offsets = this.context.offsetStorageReader().offset(this.taskContext.source().partition());
            if (offsets != null) {
                source.setOffset(offsets);
                this.logger.info("Found existing offset: {}", (Object)offsets);
                if (!this.taskContext.historyExists()) {
                    if (this.taskContext.isSchemaOnlyRecoverySnapshot()) {
                        startWithSnapshot = true;
                        if (!this.isBinlogAvailable()) {
                            String msg = "The connector is trying to read binlog starting at " + (Object)((Object)source) + ", but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.";
                            throw new ConnectException(msg);
                        }
                    } else {
                        String msg = "The db history topic is missing. You may attempt to recover it by reconfiguring the connector to " + (Object)((Object)MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY_RECOVERY);
                        throw new ConnectException(msg);
                    }
                    this.logger.info("The db-history topic is missing but we are in {} snapshot mode. Attempting to snapshot the current schema and then begin reading the binlog from the last recorded offset.", (Object)MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY_RECOVERY);
                    this.taskContext.initializeHistoryStorage();
                } else {
                    this.taskContext.loadHistory(source);
                    if (source.isSnapshotInEffect()) {
                        if (this.taskContext.isSnapshotNeverAllowed()) {
                            String msg = "The connector previously stopped while taking a snapshot, but now the connector is configured to never allow snapshots. Reconfigure the connector to use snapshots initially or when needed.";
                            throw new ConnectException(msg);
                        }
                        startWithSnapshot = true;
                        this.logger.info("Prior execution was an incomplete snapshot, so starting new snapshot");
                    } else {
                        startWithSnapshot = false;
                        if (!this.isBinlogAvailable()) {
                            if (!this.taskContext.isSnapshotAllowedWhenNeeded()) {
                                String msg = "The connector is trying to read binlog starting at " + (Object)((Object)source) + ", but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.";
                                throw new ConnectException(msg);
                            }
                            startWithSnapshot = true;
                        }
                    }
                }
            } else {
                this.taskContext.initializeHistoryStorage();
                if (this.taskContext.isSnapshotNeverAllowed()) {
                    this.logger.info("Found no existing offset and snapshots disallowed, so starting at beginning of binlog");
                    source.setBinlogStartPoint("", 0L);
                    this.taskContext.initializeHistory();
                    String earliestBinlogFilename = this.earliestBinlogFilename();
                    if (earliestBinlogFilename == null) {
                        this.logger.warn("No binlog appears to be available. Ensure that the MySQL row-level binlog is enabled.");
                    } else if (!earliestBinlogFilename.endsWith("00001")) {
                        this.logger.warn("It is possible the server has purged some binlogs. If this is the case, then using snapshot mode may be required.");
                    }
                } else {
                    startWithSnapshot = true;
                    this.logger.info("Found no existing offset, so preparing to perform a snapshot");
                }
            }
            if (!startWithSnapshot && source.gtidSet() == null && this.isGtidModeEnabled()) {
                source.setCompletedGtidSet("");
            }
            boolean rowBinlogEnabled = this.isRowBinlogEnabled();
            ChainedReader.Builder chainedReaderBuilder = new ChainedReader.Builder();
            BinlogReader binlogReader = new BinlogReader("binlog", this.taskContext);
            if (startWithSnapshot) {
                SnapshotReader snapshotReader = new SnapshotReader("snapshot", this.taskContext);
                if (snapshotEventsAreInserts) {
                    snapshotReader.generateInsertEvents();
                }
                if (!this.taskContext.snapshotDelay().isZero()) {
                    chainedReaderBuilder.addReader(new TimedBlockingReader("timed-blocker", this.taskContext.snapshotDelay()));
                }
                chainedReaderBuilder.addReader(snapshotReader);
                if (this.taskContext.isInitialSnapshotOnly()) {
                    this.logger.warn("This connector will only perform a snapshot, and will stop after that completes.");
                    chainedReaderBuilder.addReader(new BlockingReader("blocker", "Connector has completed all of its work but will continue in the running state. It can be shut down at any time."));
                    chainedReaderBuilder.completionMessage("Connector configured to only perform snapshot, and snapshot completed successfully. Connector will terminate.");
                } else {
                    if (!rowBinlogEnabled) {
                        throw new ConnectException("The MySQL server is not configured to use a row-level binlog, which is required for this connector to work properly. Change the MySQL configuration to use a row-level binlog and restart the connector.");
                    }
                    chainedReaderBuilder.addReader(binlogReader);
                }
            } else {
                if (!rowBinlogEnabled) {
                    throw new ConnectException("The MySQL server does not appear to be using a row-level binlog, which is required for this connector to work properly. Enable this mode and restart the connector.");
                }
                chainedReaderBuilder.addReader(binlogReader);
            }
            this.readers = chainedReaderBuilder.build();
            this.readers.uponCompletion(this::completeReaders);
            this.readers.initialize();
            this.readers.start();
        }
        catch (Throwable e) {
            try {
                this.stop();
            }
            catch (Throwable s) {
                this.logger.error("Failed to start the connector (see other exception), but got this error while cleaning up", s);
            }
            if (e instanceof InterruptedException) {
                Thread.interrupted();
                throw new ConnectException("Interrupted while starting the connector", e);
            }
            if (e instanceof ConnectException) {
                throw (ConnectException)e;
            }
            throw new ConnectException(e);
        }
        finally {
            prevLoggingContext.restore();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public List<SourceRecord> poll() throws InterruptedException {
        ChainedReader currentReader = this.readers;
        if (currentReader == null) {
            return null;
        }
        LoggingContext.PreviousContext prevLoggingContext = this.taskContext.configureLoggingContext("task");
        try {
            this.logger.trace("Polling for events");
            List<SourceRecord> list = currentReader.poll();
            return list;
        }
        finally {
            prevLoggingContext.restore();
        }
    }

    public synchronized void stop() {
        if (this.context != null) {
            LoggingContext.PreviousContext prevLoggingContext = this.taskContext.configureLoggingContext("task");
            try {
                this.logger.info("Stopping MySQL connector task");
                if (this.readers != null) {
                    this.readers.stop();
                    this.readers.destroy();
                }
            }
            finally {
                prevLoggingContext.restore();
            }
        }
    }

    protected Iterable<Field> getAllConfigurationFields() {
        return MySqlConnectorConfig.ALL_FIELDS;
    }

    protected void completeReaders() {
        LoggingContext.PreviousContext prevLoggingContext = this.taskContext.configureLoggingContext("task");
        try {
            if (this.taskContext != null) {
                this.taskContext.shutdown();
            }
        }
        catch (Throwable e) {
            this.logger.error("Unexpected error shutting down the database history and/or closing JDBC connections", e);
        }
        finally {
            this.context = null;
            this.logger.info("Connector task finished all work and is now shutdown");
            prevLoggingContext.restore();
        }
    }

    protected boolean isBinlogAvailable() {
        String gtidStr = this.taskContext.source().gtidSet();
        if (gtidStr != null) {
            GtidSet availableGtidSet;
            if (gtidStr.trim().isEmpty()) {
                return true;
            }
            String availableGtidStr = this.connectionContext.knownGtidSet();
            if (availableGtidStr == null || availableGtidStr.trim().isEmpty()) {
                this.logger.info("Connector used GTIDs previously, but MySQL does not know of any GTIDs or they are not enabled");
                return false;
            }
            GtidSet gtidSet = new GtidSet(gtidStr).retainAll(this.taskContext.gtidSourceFilter());
            if (gtidSet.isContainedWithin(availableGtidSet = new GtidSet(availableGtidStr))) {
                this.logger.info("MySQL current GTID set {} does contain the GTID set required by the connector {}", (Object)availableGtidSet, (Object)gtidSet);
                return true;
            }
            this.logger.info("Connector last known GTIDs are {}, but MySQL has {}", (Object)gtidSet, (Object)availableGtidSet);
            return false;
        }
        String binlogFilename = this.taskContext.source().binlogFilename();
        if (binlogFilename == null) {
            return true;
        }
        if (binlogFilename.equals("")) {
            return true;
        }
        ArrayList logNames = new ArrayList();
        try {
            this.logger.info("Step 0: Get all known binlogs from MySQL");
            this.connectionContext.jdbc().query("SHOW BINARY LOGS", rs -> {
                while (rs.next()) {
                    logNames.add(rs.getString(1));
                }
            });
        }
        catch (SQLException e) {
            throw new ConnectException("Unexpected error while connecting to MySQL and looking for binary logs: ", (Throwable)e);
        }
        boolean found = logNames.stream().anyMatch(binlogFilename::equals);
        if (!found) {
            this.logger.info("Connector requires binlog file '{}', but MySQL only has {}", (Object)binlogFilename, (Object)String.join((CharSequence)", ", logNames));
        }
        this.logger.info("MySQL has the binlog file '{}' required by the connector", (Object)binlogFilename);
        return found;
    }

    protected String earliestBinlogFilename() {
        ArrayList logNames = new ArrayList();
        try {
            this.logger.info("Checking all known binlogs from MySQL");
            this.connectionContext.jdbc().query("SHOW BINARY LOGS", rs -> {
                while (rs.next()) {
                    logNames.add(rs.getString(1));
                }
            });
        }
        catch (SQLException e) {
            throw new ConnectException("Unexpected error while connecting to MySQL and looking for binary logs: ", (Throwable)e);
        }
        if (logNames.isEmpty()) {
            return null;
        }
        return (String)logNames.get(0);
    }

    protected boolean isGtidModeEnabled() {
        AtomicReference<String> mode = new AtomicReference<String>("off");
        try {
            this.connectionContext.jdbc().query("SHOW GLOBAL VARIABLES LIKE 'GTID_MODE'", rs -> {
                if (rs.next()) {
                    mode.set(rs.getString(1));
                }
            });
        }
        catch (SQLException e) {
            throw new ConnectException("Unexpected error while connecting to MySQL and looking at GTID mode: ", (Throwable)e);
        }
        return !"OFF".equalsIgnoreCase(mode.get());
    }

    protected boolean isRowBinlogEnabled() {
        AtomicReference<String> mode = new AtomicReference<String>("");
        try {
            this.connectionContext.jdbc().query("SHOW GLOBAL VARIABLES LIKE 'binlog_format'", rs -> {
                if (rs.next()) {
                    mode.set(rs.getString(2));
                }
            });
        }
        catch (SQLException e) {
            throw new ConnectException("Unexpected error while connecting to MySQL and looking at BINLOG mode: ", (Throwable)e);
        }
        this.logger.debug("binlog_format={}", (Object)mode.get());
        return "ROW".equalsIgnoreCase(mode.get());
    }
}

