/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.spanner;

import com.google.cloud.Timestamp;
import com.google.common.annotations.VisibleForTesting;
import io.debezium.connector.spanner.CommittingRecordsStreamingChangeEventSource;
import io.debezium.connector.spanner.FinishPartitionStrategy;
import io.debezium.connector.spanner.FinishPartitionWatchDog;
import io.debezium.connector.spanner.FinishingPartitionManager;
import io.debezium.connector.spanner.PartitionManager;
import io.debezium.connector.spanner.SpannerPartition;
import io.debezium.connector.spanner.StreamEventQueue;
import io.debezium.connector.spanner.StuckPartitionStrategy;
import io.debezium.connector.spanner.context.offset.SpannerOffsetContext;
import io.debezium.connector.spanner.context.offset.SpannerOffsetContextFactory;
import io.debezium.connector.spanner.db.metadata.SchemaRegistry;
import io.debezium.connector.spanner.db.metadata.TableId;
import io.debezium.connector.spanner.db.model.Mod;
import io.debezium.connector.spanner.db.model.Partition;
import io.debezium.connector.spanner.db.model.event.ChangeStreamEvent;
import io.debezium.connector.spanner.db.model.event.ChildPartitionsEvent;
import io.debezium.connector.spanner.db.model.event.DataChangeEvent;
import io.debezium.connector.spanner.db.model.event.FinishPartitionEvent;
import io.debezium.connector.spanner.db.model.event.HeartbeatEvent;
import io.debezium.connector.spanner.db.stream.ChangeStream;
import io.debezium.connector.spanner.db.stream.PartitionEventListener;
import io.debezium.connector.spanner.exception.FinishingPartitionTimeout;
import io.debezium.connector.spanner.metrics.MetricsEventPublisher;
import io.debezium.connector.spanner.metrics.event.ChildPartitionsMetricEvent;
import io.debezium.connector.spanner.processor.SourceRecordUtils;
import io.debezium.connector.spanner.processor.SpannerChangeRecordEmitter;
import io.debezium.connector.spanner.processor.SpannerEventDispatcher;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.util.Clock;
import java.time.Duration;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Streaming change event source of the Spanner connector.
 *
 * <p>{@link #execute} starts a dedicated consumer thread and then runs the {@link ChangeStream},
 * which puts events into the {@link StreamEventQueue}. The consumer thread takes events from
 * that queue and handles them by type: data changes and heartbeats are handed to the
 * {@link SpannerEventDispatcher}, child partitions are reported to the {@link PartitionManager},
 * and finish-partition events are handed to the {@link FinishingPartitionManager}.
 *
 * <p>Failures are reported through {@link ErrorHandler#setProducerThrowable}.
 */
public class SpannerStreamingChangeEventSource
        implements CommittingRecordsStreamingChangeEventSource<SpannerPartition, SpannerOffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(SpannerStreamingChangeEventSource.class);

    /** Strategy consulted by the stuck-partition callback registered in {@link #execute}. */
    private static final StuckPartitionStrategy STUCK_PARTITION_STRATEGY = StuckPartitionStrategy.ESCALATE;

    /** Timeout handed to the {@link FinishPartitionWatchDog}. */
    private static final Duration FINISHING_PARTITION_TIMEOUT = Duration.ofSeconds(60L);

    private final FinishPartitionStrategy finishPartitionStrategy;
    private final ErrorHandler errorHandler;
    private final StreamEventQueue eventQueue;
    private final MetricsEventPublisher metricsEventPublisher;
    private final ChangeStream stream;
    private final PartitionManager partitionManager;
    private final SchemaRegistry schemaRegistry;
    private final SpannerEventDispatcher spannerEventDispatcher;
    private final FinishingPartitionManager finishingPartitionManager;
    private final FinishPartitionWatchDog finishPartitionWatchDog;
    private final SpannerOffsetContextFactory offsetContextFactory;

    /** Consumer thread created by {@link #startProcessing}; interrupted when {@link #execute} ends. */
    private volatile Thread thread;

    /**
     * Creates the event source and wires the finishing-partition manager and its watchdog.
     *
     * @param errorHandler receives every failure raised by streaming, event processing or the watchdog
     * @param stream change stream that is run by {@link #execute}
     * @param eventQueue queue the stream puts events into and the consumer thread takes them from
     * @param metricsEventPublisher publisher for the child-partitions metric event
     * @param partitionManager receives partition state updates and new child partitions
     * @param schemaRegistry used to check the schema of every data change event
     * @param spannerEventDispatcher dispatches data change and heartbeat events
     * @param finishingAfterCommit {@code true} selects {@link FinishPartitionStrategy#AFTER_COMMIT},
     *        {@code false} selects {@link FinishPartitionStrategy#AFTER_STREAMING_FINISH}
     * @param offsetContextFactory builds the offset context for data change and heartbeat events
     */
    public SpannerStreamingChangeEventSource(ErrorHandler errorHandler, ChangeStream stream, StreamEventQueue eventQueue, MetricsEventPublisher metricsEventPublisher, PartitionManager partitionManager, SchemaRegistry schemaRegistry, SpannerEventDispatcher spannerEventDispatcher, boolean finishingAfterCommit, SpannerOffsetContextFactory offsetContextFactory) {
        this.offsetContextFactory = offsetContextFactory;
        this.errorHandler = errorHandler;
        this.eventQueue = eventQueue;
        this.metricsEventPublisher = metricsEventPublisher;
        this.stream = stream;
        this.partitionManager = partitionManager;
        this.schemaRegistry = schemaRegistry;
        this.spannerEventDispatcher = spannerEventDispatcher;
        this.finishingPartitionManager = new FinishingPartitionManager(partitionManager::updateToFinished);
        // The watchdog callback turns the reported partition tokens into a FinishingPartitionTimeout failure.
        this.finishPartitionWatchDog = new FinishPartitionWatchDog(this.finishingPartitionManager, FINISHING_PARTITION_TIMEOUT, tokens -> this.processFailure(new FinishingPartitionTimeout((List<String>)tokens)));
        this.finishPartitionStrategy = finishingAfterCommit ? FinishPartitionStrategy.AFTER_COMMIT : FinishPartitionStrategy.AFTER_STREAMING_FINISH;
    }

    /**
     * Starts the consumer thread and runs the change stream until it returns or fails.
     *
     * <p>On exit, whatever the outcome, the finish-partition watchdog is stopped and the consumer
     * thread is interrupted.
     *
     * @param context source context; its {@code isRunning()} flag drives both the stream and the consumer loop
     * @param partition not used by this implementation
     * @param offsetContext not used by this implementation
     * @throws InterruptedException declared by the signature; an {@code InterruptedException} raised
     *         while streaming is caught and logged here
     */
    public void execute(ChangeEventSource.ChangeEventSourceContext context, SpannerPartition partition, SpannerOffsetContext offsetContext) throws InterruptedException {
        LOGGER.info("Starting streaming...");
        try {
            this.startProcessing(context);
            this.stream.run(() -> context.isRunning(), this.eventQueue::put, new PartitionEventListener() {

                @Override
                public void onRun(Partition partition) {
                    SpannerStreamingChangeEventSource.this.finishingPartitionManager.registerPartition(partition.getToken());
                    SpannerStreamingChangeEventSource.this.partitionManager.updateToRunning(partition.getToken());
                }

                @Override
                public void onFinish(Partition partition) {
                    LOGGER.info("Partition onFinish: {}", partition.getToken());
                }

                @Override
                public void onException(Partition partition, Exception exception) {
                    // The exception is also passed as the trailing argument so that SLF4J logs the
                    // full stack trace instead of only the message.
                    LOGGER.error("Try to stream again from partition {} after exception {}", partition.getToken(), exception.getMessage(), exception);
                    SpannerStreamingChangeEventSource.this.partitionManager.updateToReadyForStreaming(partition.getToken());
                }

                @Override
                public boolean onStuckPartition(String token) {
                    if (STUCK_PARTITION_STRATEGY.equals(StuckPartitionStrategy.REPEAT_STREAMING)) {
                        LOGGER.warn("Try to requery partition {}", token);
                        SpannerStreamingChangeEventSource.this.partitionManager.updateToReadyForStreaming(token);
                        return false;
                    }
                    // true is returned only for ESCALATE; presumably the stream then escalates the
                    // stuck partition as a failure - confirm against ChangeStream.
                    return STUCK_PARTITION_STRATEGY.equals(StuckPartitionStrategy.ESCALATE);
                }
            });
        }
        catch (InterruptedException ex) {
            // NOTE(review): the interrupt is neither rethrown nor re-asserted; shutdown simply
            // continues in the finally block. Confirm callers do not rely on the interrupt flag.
            LOGGER.info("Continue to stop streaming...");
        }
        catch (Exception ex) {
            this.processFailure(ex);
        }
        finally {
            LOGGER.info("Stopping streaming...");
            this.finishPartitionWatchDog.stop();
            Thread consumer = this.thread;
            if (consumer != null) {
                // Unblocks the consumer thread if it is waiting in eventQueue.take().
                consumer.interrupt();
            }
        }
    }

    /**
     * Creates and starts the consumer thread, which takes events from the queue while the context
     * is running. An interrupt ends the loop quietly; any other exception is reported as a failure.
     */
    private void startProcessing(ChangeEventSource.ChangeEventSourceContext context) {
        Thread consumer = new Thread(() -> {
            try {
                while (context.isRunning()) {
                    this.dispatchStreamEvent(this.eventQueue.take());
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            catch (Exception ex) {
                this.processFailure(ex);
            }
        }, "SpannerConnector-SpannerStreamingChangeEventSource");
        this.thread = consumer;
        consumer.start();
    }

    /** Routes one queued event to the handler for its concrete type; other event types are ignored. */
    private void dispatchStreamEvent(ChangeStreamEvent event) throws InterruptedException {
        if (event instanceof DataChangeEvent) {
            this.processDataChangeEvent((DataChangeEvent)event);
        }
        else if (event instanceof HeartbeatEvent) {
            this.processHeartBeatEvent((HeartbeatEvent)event);
        }
        else if (event instanceof ChildPartitionsEvent) {
            this.processChildPartitionsEvent((ChildPartitionsEvent)event);
        }
        else if (event instanceof FinishPartitionEvent) {
            this.handleFinishPartitionEvent(event);
        }
    }

    /** Notifies the finishing-partition manager according to the configured finish strategy. */
    private void handleFinishPartitionEvent(ChangeStreamEvent event) {
        if (this.finishPartitionStrategy.equals(FinishPartitionStrategy.AFTER_COMMIT)) {
            this.finishingPartitionManager.onPartitionFinishEvent(event.getMetadata().getPartitionToken());
        }
        else if (this.finishPartitionStrategy.equals(FinishPartitionStrategy.AFTER_STREAMING_FINISH)) {
            this.finishingPartitionManager.forceFinish(event.getMetadata().getPartitionToken());
        }
    }

    /**
     * Checks the table schema for the event and dispatches one change record per modification.
     *
     * <p>Each modification gets a fresh random record uid, which is registered with the
     * finishing-partition manager before the record is dispatched.
     *
     * @param event data change event taken from the queue
     * @throws InterruptedException propagated from the dispatcher
     */
    @VisibleForTesting
    void processDataChangeEvent(DataChangeEvent event) throws InterruptedException {
        TableId tableId = TableId.getTableId(event.getTableName());
        this.schemaRegistry.checkSchema(tableId, event.getCommitTimestamp(), event.getRowType());
        SpannerPartition partition = new SpannerPartition(event.getPartitionToken());
        for (Mod mod : event.getMods()) {
            String recordUid = UUID.randomUUID().toString();
            SpannerOffsetContext offsetContext = this.offsetContextFactory.getOffsetContextFromDataChangeEvent(mod.getModNumber(), event);
            this.finishingPartitionManager.newRecord(partition.getValue(), recordUid);
            boolean dispatched = this.spannerEventDispatcher.dispatchDataChangeEvent(partition, tableId, (ChangeRecordEmitter)new SpannerChangeRecordEmitter(recordUid, event.getModType(), mod, partition, offsetContext, Clock.SYSTEM));
            if (dispatched) {
                LOGGER.debug("DataChangeEvent has been dispatched form table {} with modification: {}, offset{}, event: {}", tableId.getTableName(), mod, offsetContext.getOffset(), event);
            }
            else {
                // NOTE(review): the record uid registered above is left as is when the record is
                // not dispatched - confirm this cannot delay finishing the partition.
                LOGGER.info("DataChangeEvent has not been dispatched form table {} with modification: {}", tableId.getTableName(), mod);
            }
        }
    }

    /** Dispatches a heartbeat for the event's partition with the offset derived from the event. */
    private void processHeartBeatEvent(HeartbeatEvent event) throws InterruptedException {
        SpannerOffsetContext offsetContext = this.offsetContextFactory.getOffsetContextFromHeartbeatEvent(event);
        SpannerPartition partition = new SpannerPartition(event.getMetadata().getPartitionToken());
        this.spannerEventDispatcher.alwaysDispatchHeartbeatEvent(partition, offsetContext);
        LOGGER.debug("Dispatching heartbeat for event {} with partition {} and offset {}", event, partition, offsetContext.getOffset());
    }

    /**
     * Builds a {@link Partition} for every child partition of the event, hands the list to the
     * partition manager and publishes a metric event carrying the number of child partitions.
     */
    private void processChildPartitionsEvent(ChildPartitionsEvent event) {
        LOGGER.info("Received ChildPartitionsEvent: {}", event);
        List<Partition> partitions = event.getChildPartitions().stream().map(childPartition -> {
            // Every child starts at the event's start timestamp and ends at the end timestamp
            // recorded in the event metadata.
            Timestamp startTimeStamp = event.getStartTimestamp();
            return Partition.builder()
                    .token(childPartition.getToken())
                    .parentTokens(childPartition.getParentTokens())
                    .startTimestamp(startTimeStamp)
                    .endTimestamp(event.getMetadata().getPartitionEndTimestamp())
                    .build();
        }).collect(Collectors.toList());
        this.partitionManager.newChildPartitions(partitions);
        this.metricsEventPublisher.publishMetricEvent(new ChildPartitionsMetricEvent(event.getChildPartitions().size()));
    }

    /** Reports the failure to the connector's error handler. */
    private void processFailure(Exception ex) {
        this.errorHandler.setProducerThrowable(ex);
    }

    /**
     * Marks the given records as committed in the finishing-partition manager.
     *
     * <p>Does nothing unless the finish strategy is {@link FinishPartitionStrategy#AFTER_COMMIT}.
     * Records from which no partition token or record uid can be extracted are skipped.
     *
     * @param records source records that have been committed
     */
    @Override
    public void commitRecords(List<SourceRecord> records) {
        if (!this.finishPartitionStrategy.equals(FinishPartitionStrategy.AFTER_COMMIT)) {
            return;
        }
        for (SourceRecord sourceRecord : records) {
            String token = SourceRecordUtils.extractToken(sourceRecord);
            String recordUid = SourceRecordUtils.extractRecordUid(sourceRecord);
            if (token != null && recordUid != null) {
                this.finishingPartitionManager.commitRecord(token, recordUid);
            }
        }
    }
}

