/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mariadb;

import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.binlog.jdbc.BinlogConnectorConnection;
import io.debezium.connector.mariadb.MariaDbConnectorConfig;
import io.debezium.connector.mariadb.MariaDbDatabaseSchema;
import io.debezium.connector.mariadb.MariaDbOffsetContext;
import io.debezium.connector.mariadb.MariaDbPartition;
import io.debezium.connector.mariadb.MariaDbReadOnlyIncrementalSnapshotChangeEventSource;
import io.debezium.connector.mariadb.MariaDbSnapshotChangeEventSource;
import io.debezium.connector.mariadb.MariaDbStreamingChangeEventSource;
import io.debezium.connector.mariadb.MariaDbTaskContext;
import io.debezium.connector.mariadb.metrics.MariaDbSnapshotChangeEventSourceMetrics;
import io.debezium.connector.mariadb.metrics.MariaDbStreamingChangeEventSourceMetrics;
import io.debezium.function.BlockingConsumer;
import io.debezium.jdbc.MainConnectionProvidingConnectionFactory;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSourceFactory;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.schema.DatabaseSchema;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.Strings;
import java.util.Optional;
import java.util.function.Function;
import org.apache.kafka.connect.source.SourceRecord;

/**
 * Factory that creates the snapshot, streaming and incremental-snapshot change event
 * sources for the MariaDB connector.
 *
 * <p>All sources are wired with the collaborators handed to the constructor. The factory
 * also toggles buffering on the shared {@link ChangeEventQueue}: buffering is enabled by
 * the pre-snapshot callback and disabled again once the last buffered record has been
 * flushed, or when the streaming source is requested.
 */
public class MariaDbChangeEventSourceFactory
        implements ChangeEventSourceFactory<MariaDbPartition, MariaDbOffsetContext> {

    private final MariaDbConnectorConfig configuration;
    private final MainConnectionProvidingConnectionFactory<BinlogConnectorConnection> connectionFactory;
    private final ErrorHandler errorHandler;
    private final EventDispatcher<MariaDbPartition, TableId> dispatcher;
    private final Clock clock;
    private final MariaDbTaskContext taskContext;
    private final MariaDbStreamingChangeEventSourceMetrics streamingMetrics;
    private final MariaDbDatabaseSchema schema;
    private final ChangeEventQueue<DataChangeEvent> queue;
    private final SnapshotterService snapshotterService;

    /**
     * Stores the collaborators that are later passed to the created change event sources.
     *
     * @param configuration the connector configuration
     * @param connectionFactory provider of the main database connection
     * @param errorHandler error handler passed to the streaming source
     * @param dispatcher event dispatcher shared by all created sources
     * @param clock clock shared by all created sources
     * @param schema database schema passed to the incremental snapshot sources
     * @param taskContext task context; its schema is passed to the snapshot source
     * @param streamingMetrics metrics passed to the streaming source
     * @param queue change event queue whose buffering this factory toggles
     * @param snapshotterService snapshotter service passed to the snapshot and streaming sources
     */
    public MariaDbChangeEventSourceFactory(
            MariaDbConnectorConfig configuration,
            MainConnectionProvidingConnectionFactory<BinlogConnectorConnection> connectionFactory,
            ErrorHandler errorHandler,
            EventDispatcher<MariaDbPartition, TableId> dispatcher,
            Clock clock,
            MariaDbDatabaseSchema schema,
            MariaDbTaskContext taskContext,
            MariaDbStreamingChangeEventSourceMetrics streamingMetrics,
            ChangeEventQueue<DataChangeEvent> queue,
            SnapshotterService snapshotterService) {
        this.configuration = configuration;
        this.connectionFactory = connectionFactory;
        this.errorHandler = errorHandler;
        this.dispatcher = dispatcher;
        this.clock = clock;
        this.taskContext = taskContext;
        this.streamingMetrics = streamingMetrics;
        this.queue = queue;
        this.schema = schema;
        this.snapshotterService = snapshotterService;
    }

    /**
     * Creates the snapshot change event source.
     *
     * <p>The source receives two callbacks into this factory: one that enables queue
     * buffering ({@link #preSnapshot()}) and one that flushes the buffered record and
     * disables buffering again ({@link #modifyAndFlushLastRecord(Function)}). When these
     * callbacks are invoked is decided by {@link MariaDbSnapshotChangeEventSource}.
     *
     * @param snapshotProgressListener progress listener; it is downcast to
     *        {@link MariaDbSnapshotChangeEventSourceMetrics}
     * @param notificationService notification service passed to the source
     * @return a new snapshot change event source
     * @throws ClassCastException if {@code snapshotProgressListener} is not a
     *         {@link MariaDbSnapshotChangeEventSourceMetrics}
     */
    @Override
    public SnapshotChangeEventSource<MariaDbPartition, MariaDbOffsetContext> getSnapshotChangeEventSource(
            SnapshotProgressListener<MariaDbPartition> snapshotProgressListener,
            NotificationService<MariaDbPartition, MariaDbOffsetContext> notificationService) {
        // A typed local gives the method reference its target type without going through a raw cast.
        final BlockingConsumer<Function<SourceRecord, SourceRecord>> lastRecordFlusher = this::modifyAndFlushLastRecord;
        return new MariaDbSnapshotChangeEventSource(
                configuration,
                connectionFactory,
                (MariaDbDatabaseSchema) taskContext.getSchema(),
                dispatcher,
                clock,
                (MariaDbSnapshotChangeEventSourceMetrics) snapshotProgressListener,
                lastRecordFlusher,
                this::preSnapshot,
                notificationService,
                snapshotterService);
    }

    /**
     * Creates the streaming change event source, disabling queue buffering first.
     *
     * @return a new streaming change event source bound to the main connection
     */
    @Override
    public StreamingChangeEventSource<MariaDbPartition, MariaDbOffsetContext> getStreamingChangeEventSource() {
        queue.disableBuffering();
        return new MariaDbStreamingChangeEventSource(
                configuration,
                (BinlogConnectorConnection) connectionFactory.mainConnection(),
                dispatcher,
                errorHandler,
                clock,
                taskContext,
                streamingMetrics,
                snapshotterService);
    }

    /**
     * Selects the incremental snapshot implementation that matches the configuration.
     *
     * <ul>
     *   <li>Read-only connection: returns a
     *       {@link MariaDbReadOnlyIncrementalSnapshotChangeEventSource}, which is only
     *       possible when the main connection reports GTID mode as enabled.</li>
     *   <li>Otherwise, with no signaling data collection configured (null or blank):
     *       returns an empty {@link Optional}.</li>
     *   <li>Otherwise: returns a {@link SignalBasedIncrementalSnapshotChangeEventSource}.</li>
     * </ul>
     *
     * @param offsetContext current offset context; not used by this implementation
     * @param snapshotProgressListener progress listener passed to the created source
     * @param dataChangeEventListener data change listener passed to the created source
     * @param notificationService notification service passed to the created source
     * @return the incremental snapshot source, or empty when none is applicable
     * @throws UnsupportedOperationException if the connection is read-only but GTID mode is not enabled
     */
    @Override
    public Optional<IncrementalSnapshotChangeEventSource<MariaDbPartition, ? extends DataCollectionId>> getIncrementalSnapshotChangeEventSource(
            MariaDbOffsetContext offsetContext,
            SnapshotProgressListener<MariaDbPartition> snapshotProgressListener,
            DataChangeEventListener<MariaDbPartition> dataChangeEventListener,
            NotificationService<MariaDbPartition, MariaDbOffsetContext> notificationService) {
        if (configuration.isReadOnlyConnection()) {
            final boolean gtidModeEnabled =
                    ((BinlogConnectorConnection) connectionFactory.mainConnection()).isGtidModeEnabled();
            if (!gtidModeEnabled) {
                throw new UnsupportedOperationException("Read only connection requires GTID_MODE to be ON");
            }
            return Optional.of(new MariaDbReadOnlyIncrementalSnapshotChangeEventSource(
                    configuration,
                    connectionFactory.mainConnection(),
                    dispatcher,
                    (DatabaseSchema<?>) schema,
                    clock,
                    snapshotProgressListener,
                    dataChangeEventListener,
                    notificationService));
        }

        // No signal-based source is offered when no signaling data collection is configured.
        if (Strings.isNullOrBlank((String) configuration.getSignalingDataCollectionId())) {
            return Optional.empty();
        }

        // Diamond instead of a raw type so the result is checked against the declared return type.
        return Optional.of(new SignalBasedIncrementalSnapshotChangeEventSource<>(
                (RelationalDatabaseConnectorConfig) configuration,
                connectionFactory.mainConnection(),
                dispatcher,
                (DatabaseSchema<?>) schema,
                clock,
                snapshotProgressListener,
                dataChangeEventListener,
                notificationService));
    }

    /** Snapshot callback: turns on buffering in the change event queue. */
    private void preSnapshot() {
        queue.enableBuffering();
    }

    /**
     * Snapshot callback: flushes the queue buffer, replacing the buffered event with a new
     * {@link DataChangeEvent} that wraps the record returned by {@code modify}, and then
     * turns buffering off.
     *
     * <p>Buffering is only disabled when the flush returns normally; if the flush throws,
     * the buffering state is left unchanged.
     *
     * @param modify transformation applied to the buffered source record
     * @throws InterruptedException if flushing the buffer is interrupted
     */
    private void modifyAndFlushLastRecord(Function<SourceRecord, SourceRecord> modify) throws InterruptedException {
        queue.flushBuffer(buffered -> new DataChangeEvent(modify.apply(buffered.getRecord())));
        queue.disableBuffering();
    }
}

