/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.spanner.db.stream;

import io.debezium.connector.spanner.db.dao.ChangeStreamDao;
import io.debezium.connector.spanner.db.dao.ChangeStreamResultSet;
import io.debezium.connector.spanner.db.mapper.ChangeStreamRecordMapper;
import io.debezium.connector.spanner.db.model.Partition;
import io.debezium.connector.spanner.db.model.event.ChangeStreamEvent;
import io.debezium.connector.spanner.db.model.event.FinishPartitionEvent;
import io.debezium.connector.spanner.db.model.event.HeartbeatEvent;
import io.debezium.connector.spanner.db.stream.ChangeStreamEventConsumer;
import io.debezium.connector.spanner.db.stream.PartitionEventListener;
import io.debezium.connector.spanner.metrics.MetricsEventPublisher;
import io.debezium.connector.spanner.metrics.event.DelayChangeStreamEventsMetricEvent;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Streams the rows of a single change stream partition, maps each row to
 * {@link ChangeStreamEvent}s and hands them to a {@link ChangeStreamEventConsumer}.
 *
 * <p>For every row whose first event is not a {@link HeartbeatEvent}, the time spent
 * waiting for that row is published as a {@link DelayChangeStreamEventsMetricEvent}.
 */
public class SpannerChangeStreamService {
    private static final Logger LOGGER = LoggerFactory.getLogger(SpannerChangeStreamService.class);

    /** Heartbeats whose record timestamp is older than this many milliseconds are logged at WARN. */
    private static final long HEARTBEAT_LAG_WARN_THRESHOLD_MILLIS = 60_000L;

    private static final long NANOS_PER_MILLI = 1_000_000L;

    private final ChangeStreamDao changeStreamDao;
    private final ChangeStreamRecordMapper changeStreamRecordMapper;
    private final Duration heartbeatMillis;
    private final MetricsEventPublisher metricsEventPublisher;

    /**
     * @param changeStreamDao          DAO used to open the change stream query
     * @param changeStreamRecordMapper maps a result-set row to change stream events
     * @param heartbeatMillis          heartbeat interval; converted to milliseconds and passed to the stream query
     * @param metricsEventPublisher    receives the per-row delay metric events
     */
    public SpannerChangeStreamService(ChangeStreamDao changeStreamDao, ChangeStreamRecordMapper changeStreamRecordMapper, Duration heartbeatMillis, MetricsEventPublisher metricsEventPublisher) {
        this.changeStreamDao = changeStreamDao;
        this.changeStreamRecordMapper = changeStreamRecordMapper;
        this.heartbeatMillis = heartbeatMillis;
        this.metricsEventPublisher = metricsEventPublisher;
    }

    /**
     * Runs the stream query for {@code partition} and forwards every resulting event to
     * {@code changeStreamEventConsumer} until the result set is exhausted.
     *
     * <p>{@code partitionEventListener.onRun} is invoked before the query is opened. Once the
     * result set has been fully read and closed, {@code partitionEventListener.onFinish} is
     * invoked and a {@link FinishPartitionEvent} is sent to the consumer. If an exception
     * escapes the read loop, neither of those two completion steps runs.
     *
     * @param partition                 partition whose token and start/end timestamps bound the query
     * @param changeStreamEventConsumer receiver of the mapped events and of the final finish event
     * @param partitionEventListener    notified when streaming starts and when it completes
     * @throws InterruptedException propagated from the event consumer
     */
    public void getEvents(Partition partition, ChangeStreamEventConsumer changeStreamEventConsumer, PartitionEventListener partitionEventListener) throws InterruptedException {
        String token = partition.getToken();
        partitionEventListener.onRun(partition);
        LOGGER.info("Streaming {} from {} to {}", token, partition.getStartTimestamp(), partition.getEndTimestamp());
        try (ChangeStreamResultSet resultSet = this.changeStreamDao.streamQuery(token, partition.getStartTimestamp(), partition.getEndTimestamp(), this.heartbeatMillis.toMillis())) {
            // Elapsed time is taken from System.nanoTime() rather than the wall clock so that
            // system clock adjustments cannot yield negative or inflated delays.
            long waitStartNanos = System.nanoTime();
            while (resultSet.next()) {
                int delayMillis = elapsedMillisSince(waitStartNanos);
                List<ChangeStreamEvent> events = this.changeStreamRecordMapper.toChangeStreamEvents(partition, resultSet.getCurrentRowAsStruct(), resultSet.getMetadata());
                LOGGER.debug("Events receive from stream: {}", events);
                warnIfHeartbeatIsStale(events);
                this.processEvents(partition, events, changeStreamEventConsumer);
                // The delay metric is only reported for rows that do not start with a heartbeat.
                if (!events.isEmpty() && !(events.get(0) instanceof HeartbeatEvent)) {
                    this.metricsEventPublisher.publishMetricEvent(new DelayChangeStreamEventsMetricEvent(delayMillis));
                }
                // Restart the timer after processing so consumer time is not counted as stream delay.
                waitStartNanos = System.nanoTime();
            }
        }
        partitionEventListener.onFinish(partition);
        changeStreamEventConsumer.acceptChangeStreamEvent(new FinishPartitionEvent(partition));
    }

    /**
     * Returns the whole milliseconds elapsed since {@code startNanos} (a previous
     * {@link System#nanoTime()} reading), saturated into {@code [0, Integer.MAX_VALUE]}
     * so the narrowing to {@code int} can never wrap around.
     */
    private static int elapsedMillisSince(long startNanos) {
        long elapsedMillis = (System.nanoTime() - startNanos) / NANOS_PER_MILLI;
        return (int) Math.max(0L, Math.min(elapsedMillis, (long) Integer.MAX_VALUE));
    }

    /**
     * Logs a warning when the first event of {@code events} is a heartbeat whose record
     * timestamp lags the current wall-clock time by more than the warn threshold.
     */
    private static void warnIfHeartbeatIsStale(List<ChangeStreamEvent> events) {
        if (events.isEmpty() || !(events.get(0) instanceof HeartbeatEvent)) {
            return;
        }
        HeartbeatEvent heartbeatEvent = (HeartbeatEvent) events.get(0);
        // Wall-clock time is required here: the lag is relative to the record's own timestamp.
        long heartbeatLag = System.currentTimeMillis() - heartbeatEvent.getRecordTimestamp().toSqlTimestamp().toInstant().toEpochMilli();
        if (heartbeatLag > HEARTBEAT_LAG_WARN_THRESHOLD_MILLIS) {
            LOGGER.warn("heartbeat has very old timestamp, lag: {}, token: {}, event: {}", heartbeatLag, heartbeatEvent.getMetadata().getPartitionToken(), heartbeatEvent);
        }
    }

    /** Forwards each event, in list order, to {@code changeStreamEventConsumer}. */
    private void processEvents(Partition partition, List<ChangeStreamEvent> events, ChangeStreamEventConsumer changeStreamEventConsumer) throws InterruptedException {
        for (ChangeStreamEvent changeStreamEvent : events) {
            LOGGER.debug("Received record from partition {}: {}", partition.getToken(), changeStreamEvent);
            changeStreamEventConsumer.acceptChangeStreamEvent(changeStreamEvent);
        }
    }
}

