package org.apache.flink.cdc.connectors.postgres.source.fetch;

import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresEventDispatcher;
import io.debezium.connector.postgresql.PostgresOffsetContext;
import io.debezium.connector.postgresql.PostgresPartition;
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.PostgresStreamingChangeEventSource;
import io.debezium.connector.postgresql.PostgresTaskContext;
import io.debezium.connector.postgresql.connection.Lsn;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.cdc.connectors.base.WatermarkDispatcher;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffset;
import org.apache.flink.util.FlinkRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresStreamFetchTask.class */
/**
 * A {@link FetchTask} that reads a {@link StreamSplit} from PostgreSQL by delegating to a
 * {@link StreamSplitReadTask}.
 *
 * <p>{@link #execute(FetchTask.Context)} builds the read task from the supplied
 * {@code PostgresSourceFetchTaskContext} and runs it on the calling thread; {@link #close()}
 * asks the running read task to stop; {@link #commitCurrentOffset(Offset)} forwards a commit
 * LSN to the read task when it is newer than the last one committed by this instance.
 */
public class PostgresStreamFetchTask implements FetchTask<SourceSplitBase> {
    private static final Logger LOG = LoggerFactory.getLogger(PostgresStreamFetchTask.class);

    /** Key under which the commit LSN is read from, and written to, the offset map. */
    private static final String LAST_COMMIT_LSN_KEY = "lsn_commit";

    private final StreamSplit split;

    /** Set to {@code true} when {@link #execute} starts and back to {@code false} by {@link #close}. */
    private volatile boolean taskRunning = false;

    /** Set by {@link #close}; once {@code true}, {@link #execute} returns without doing anything. */
    private volatile boolean stopped = false;

    /** Created by {@link #execute}; {@code null} until then. */
    private StreamSplitReadTask streamSplitReadTask;

    /** The LSN most recently passed to {@code commitOffset}; {@code null} before the first commit. */
    private Long lastCommitLsn;

    /**
     * Streaming change event source bound to a single {@link StreamSplit}.
     *
     * <p>It remembers the context and offset context handed to
     * {@link #execute(ChangeEventSource.ChangeEventSourceContext, PostgresPartition, PostgresOffsetContext)}
     * in public fields so that the enclosing task can stop the source and read its offset.
     */
    public static class StreamSplitReadTask extends PostgresStreamingChangeEventSource {
        private static final Logger LOG = LoggerFactory.getLogger(StreamSplitReadTask.class);
        private final StreamSplit streamSplit;
        private final WatermarkDispatcher watermarkDispatcher;
        private final ErrorHandler errorHandler;

        /** The context passed to the latest {@code execute} call; {@code null} before that. */
        public ChangeEventSource.ChangeEventSourceContext context;

        /** The offset context passed to the latest {@code execute} call; {@code null} before that. */
        public PostgresOffsetContext offsetContext;

        /**
         * Creates the read task. All arguments except {@code watermarkDispatcher} and
         * {@code streamSplit} are forwarded to the {@link PostgresStreamingChangeEventSource}
         * constructor; {@code errorHandler} is additionally kept for reporting watermark
         * dispatch failures.
         */
        public StreamSplitReadTask(PostgresConnectorConfig postgresConnectorConfig, Snapshotter snapshotter, PostgresConnection postgresConnection, PostgresEventDispatcher<TableId> postgresEventDispatcher, WatermarkDispatcher watermarkDispatcher, ErrorHandler errorHandler, Clock clock, PostgresSchema postgresSchema, PostgresTaskContext postgresTaskContext, ReplicationConnection replicationConnection, StreamSplit streamSplit) {
            super(postgresConnectorConfig, snapshotter, postgresConnection, postgresEventDispatcher, errorHandler, clock, postgresSchema, postgresTaskContext, replicationConnection);
            this.streamSplit = streamSplit;
            this.watermarkDispatcher = watermarkDispatcher;
            this.errorHandler = errorHandler;
        }

        /**
         * Runs the streaming read for the split.
         *
         * <p>The split's ending LSN is installed as the streaming stopping LSN before delegating
         * to the superclass. When the superclass returns and the split is bounded, an
         * {@link WatermarkKind#END} watermark carrying the current offset is dispatched and the
         * change event source context is stopped.
         *
         * @throws InterruptedException if the superclass streaming loop is interrupted
         */
        @Override // io.debezium.connector.postgresql.PostgresStreamingChangeEventSource
        public void execute(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, PostgresPartition postgresPartition, PostgresOffsetContext postgresOffsetContext) throws InterruptedException {
            this.context = changeEventSourceContext;
            this.offsetContext = postgresOffsetContext;
            LOG.info("Execute StreamSplitReadTask for split: {}", this.streamSplit);
            postgresOffsetContext.setStreamingStoppingLsn(((PostgresOffset) this.streamSplit.getEndingOffset()).getLsn());
            super.execute(changeEventSourceContext, postgresPartition, postgresOffsetContext);
            if (!isBoundedRead()) {
                return;
            }
            LOG.debug("StreamSplit is bounded read: {}", this.streamSplit);
            final PostgresOffset currentOffset = PostgresOffset.of((Map<String, ?>) postgresOffsetContext.getOffset());
            try {
                this.watermarkDispatcher.dispatchWatermarkEvent(postgresPartition.getSourcePartition(), this.streamSplit, currentOffset, WatermarkKind.END);
                LOG.info("StreamSplitReadTask finished for {} at {}", this.streamSplit, currentOffset);
            } catch (InterruptedException e) {
                LOG.error("Send signal event error.", e);
                this.errorHandler.setProducerThrowable(new FlinkRuntimeException("Error processing WAL signal event", e));
                // The interruption is reported through the error handler instead of being
                // rethrown, so restore the flag to keep it visible to code further up the stack.
                Thread.currentThread().interrupt();
            }
            ((StoppableChangeEventSourceContext) changeEventSourceContext).stopChangeEventSource();
        }

        /** Returns {@code true} when the split's ending LSN differs from the no-stopping-offset LSN. */
        private boolean isBoundedRead() {
            final Lsn endingLsn = ((PostgresOffset) this.streamSplit.getEndingOffset()).getLsn();
            return !PostgresOffset.NO_STOPPING_OFFSET.getLsn().equals(endingLsn);
        }
    }

    /**
     * Creates a fetch task for the given stream split.
     *
     * @param streamSplit the split this task reads
     */
    public PostgresStreamFetchTask(StreamSplit streamSplit) {
        this.split = streamSplit;
    }

    /**
     * Builds a {@link StreamSplitReadTask} from the given context and runs it on the calling
     * thread. Does nothing if {@link #close()} has already been called.
     *
     * @param context must be a {@code PostgresSourceFetchTaskContext}
     * @throws Exception whatever the read task throws
     */
    public void execute(FetchTask.Context context) throws Exception {
        if (this.stopped) {
            LOG.debug("StreamFetchTask for split: {} is already stopped and can not be executed", this.split);
            return;
        }
        LOG.debug("execute StreamFetchTask for split: {}", this.split);
        PostgresSourceFetchTaskContext postgresSourceFetchTaskContext = (PostgresSourceFetchTaskContext) context;
        this.taskRunning = true;
        this.streamSplitReadTask = new StreamSplitReadTask(postgresSourceFetchTaskContext.m29getDbzConnectorConfig(), postgresSourceFetchTaskContext.getSnapShotter(), postgresSourceFetchTaskContext.getConnection(), postgresSourceFetchTaskContext.m27getEventDispatcher(), postgresSourceFetchTaskContext.getWaterMarkDispatcher(), postgresSourceFetchTaskContext.getErrorHandler(), postgresSourceFetchTaskContext.getTaskContext().getClock(), postgresSourceFetchTaskContext.m28getDatabaseSchema(), postgresSourceFetchTaskContext.getTaskContext(), postgresSourceFetchTaskContext.getReplicationConnection(), this.split);
        this.streamSplitReadTask.execute(new StoppableChangeEventSourceContext(), postgresSourceFetchTaskContext.m25getPartition(), postgresSourceFetchTaskContext.m26getOffsetContext());
    }

    /**
     * Asks the read task, if one has started, to stop, and marks this task as stopped and no
     * longer running.
     */
    public void close() {
        LOG.debug("stopping StreamFetchTask for split: {}", this.split);
        final StreamSplitReadTask readTask = this.streamSplitReadTask;
        if (readTask != null) {
            // The context is only assigned once the read task's execute() has begun, so it can
            // still be null here; skipping the stop call avoids a NullPointerException that
            // would also prevent the flags below from being updated.
            final ChangeEventSource.ChangeEventSourceContext readContext = readTask.context;
            if (readContext != null) {
                ((StoppableChangeEventSourceContext) readContext).stopChangeEventSource();
            }
        }
        this.stopped = true;
        this.taskRunning = false;
    }

    /** Returns whether {@link #execute} has been entered and {@link #close} has not been called since. */
    public boolean isRunning() {
        return this.taskRunning;
    }

    /* renamed from: getSplit, reason: merged with bridge method [inline-methods] */
    /** Returns the split this task was created for. */
    public SourceSplitBase m31getSplit() {
        return this.split;
    }

    /**
     * Commits an LSN through the read task, but only if it is greater than the LSN committed by
     * the previous call.
     *
     * <p>The LSN is taken from {@code offset} when it is non-null; otherwise from the
     * {@code "lsn_commit"} entry of the read task's current offset. Nothing happens when the
     * read task has not started or no LSN is available.
     *
     * @param offset the offset to commit (must be a {@link PostgresOffset}), or {@code null} to
     *     fall back to the read task's own offset
     */
    public void commitCurrentOffset(@Nullable Offset offset) {
        final StreamSplitReadTask readTask = this.streamSplitReadTask;
        if (readTask == null) {
            return;
        }
        final PostgresOffsetContext currentOffsetContext = readTask.offsetContext;
        if (currentOffsetContext == null) {
            return;
        }

        Long commitLsn = (Long) currentOffsetContext.getOffset().get(LAST_COMMIT_LSN_KEY);
        if (offset != null) {
            commitLsn = Long.valueOf(((PostgresOffset) offset).getLsn().asLong());
        }
        if (commitLsn == null) {
            return;
        }

        // Only move forward: skip LSNs that are not beyond the last one committed.
        final boolean advances = this.lastCommitLsn == null
                || Lsn.valueOf(commitLsn).compareTo(Lsn.valueOf(this.lastCommitLsn)) > 0;
        if (!advances) {
            return;
        }

        this.lastCommitLsn = commitLsn;
        final Map<String, Object> offsetToCommit = new HashMap<>();
        offsetToCommit.put(LAST_COMMIT_LSN_KEY, this.lastCommitLsn);
        LOG.debug("Committing offset {} for {}", Lsn.valueOf(this.lastCommitLsn), readTask.streamSplit);
        readTask.commitOffset(offsetToCommit);
    }
}
