package io.delta.flink.source.internal.enumerator;

import io.delta.flink.source.internal.DeltaSourceConfiguration;
import io.delta.flink.source.internal.DeltaSourceOptions;
import io.delta.flink.source.internal.enumerator.monitor.TableMonitor;
import io.delta.flink.source.internal.enumerator.processor.ActionProcessor;
import io.delta.flink.source.internal.enumerator.processor.ChangesProcessor;
import io.delta.flink.source.internal.enumerator.processor.ContinuousTableProcessor;
import io.delta.flink.source.internal.enumerator.processor.SnapshotAndChangesTableProcessor;
import io.delta.flink.source.internal.enumerator.processor.SnapshotProcessor;
import io.delta.flink.source.internal.file.AddFileEnumerator;
import io.delta.flink.source.internal.state.DeltaEnumeratorStateCheckpoint;
import io.delta.flink.source.internal.state.DeltaSourceSplit;
import io.delta.flink.source.internal.utils.SourceUtils;
import io.delta.standalone.DeltaLog;
import io.delta.standalone.Snapshot;
import java.util.Collections;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
import org.apache.flink.core.fs.Path;
import org.apache.hadoop.conf.Configuration;

/* loaded from: input_file:io/delta/flink/source/internal/enumerator/ContinuousSplitEnumeratorProvider.class */
/**
 * {@link SplitEnumeratorProvider} that builds {@link ContinuousDeltaSourceSplitEnumerator}
 * instances, either from scratch or from a {@link DeltaEnumeratorStateCheckpoint}.
 *
 * <p>{@link #getBoundedness()} always reports {@link Boundedness#CONTINUOUS_UNBOUNDED}.
 *
 * <p>Both factory methods narrow the return type to {@link ContinuousDeltaSourceSplitEnumerator}.
 * The compiler emits the erased bridge methods returning {@link SplitEnumerator} on its own, so
 * they are intentionally not declared here: spelling them out by hand produces a "same erasure"
 * name clash with the methods below.
 */
public class ContinuousSplitEnumeratorProvider implements SplitEnumeratorProvider {
    /** Creates the split assigner handed to every enumerator built by this provider. */
    private final FileSplitAssigner.Provider splitAssignerProvider;

    /** Creates a fresh file enumerator for each table processor built by this provider. */
    private final AddFileEnumerator.Provider<DeltaSourceSplit> fileEnumeratorProvider;

    /**
     * @param provider factory for the {@link FileSplitAssigner} used by created enumerators
     * @param provider2 factory for the {@link AddFileEnumerator} used by created table processors
     */
    public ContinuousSplitEnumeratorProvider(FileSplitAssigner.Provider provider, AddFileEnumerator.Provider<DeltaSourceSplit> provider2) {
        this.splitAssignerProvider = provider;
        this.fileEnumeratorProvider = provider2;
    }

    /**
     * Creates an enumerator with no pre-existing splits.
     *
     * <p>The Delta log is opened for {@code path} and the snapshot is loaded for the version
     * stored under {@link DeltaSourceOptions#LOADED_SCHEMA_SNAPSHOT_VERSION}.
     *
     * @param path location of the Delta table
     * @param configuration Hadoop configuration used to open the Delta log
     * @param splitEnumeratorContext Flink enumerator context passed to the created objects
     * @param deltaSourceConfiguration source options
     * @return a new enumerator whose split assigner starts with an empty split list
     */
    @Override // io.delta.flink.source.internal.enumerator.SplitEnumeratorProvider
    public ContinuousDeltaSourceSplitEnumerator createInitialStateEnumerator(Path path, Configuration configuration, SplitEnumeratorContext<DeltaSourceSplit> splitEnumeratorContext, DeltaSourceConfiguration deltaSourceConfiguration) {
        DeltaLog deltaLog = DeltaLog.forTable(configuration, SourceUtils.pathToString(path));

        long schemaSnapshotVersion =
            (Long) deltaSourceConfiguration.getValue(DeltaSourceOptions.LOADED_SCHEMA_SNAPSHOT_VERSION);
        Snapshot initialSnapshot = deltaLog.getSnapshotForVersionAsOf(schemaSnapshotVersion);

        ContinuousTableProcessor tableProcessor =
            createTableProcessor(path, splitEnumeratorContext, deltaSourceConfiguration, deltaLog, initialSnapshot);
        FileSplitAssigner splitAssigner = this.splitAssignerProvider.create(Collections.emptyList());

        return new ContinuousDeltaSourceSplitEnumerator(path, tableProcessor, splitAssigner, splitEnumeratorContext);
    }

    /**
     * Creates an enumerator from a checkpoint.
     *
     * <p>The table path and the splits handed to the split assigner are both read from the
     * checkpoint.
     *
     * @param deltaEnumeratorStateCheckpoint checkpoint to restore from
     * @param configuration Hadoop configuration used to open the Delta log
     * @param splitEnumeratorContext Flink enumerator context passed to the created objects
     * @param deltaSourceConfiguration source options
     * @return a new enumerator seeded with the checkpointed splits
     */
    @Override // io.delta.flink.source.internal.enumerator.SplitEnumeratorProvider
    public ContinuousDeltaSourceSplitEnumerator createEnumeratorForCheckpoint(DeltaEnumeratorStateCheckpoint<DeltaSourceSplit> deltaEnumeratorStateCheckpoint, Configuration configuration, SplitEnumeratorContext<DeltaSourceSplit> splitEnumeratorContext, DeltaSourceConfiguration deltaSourceConfiguration) {
        Path tablePath = deltaEnumeratorStateCheckpoint.getDeltaTablePath();

        ContinuousTableProcessor tableProcessor = createTableProcessorFromCheckpoint(
            deltaEnumeratorStateCheckpoint, configuration, splitEnumeratorContext, deltaSourceConfiguration);
        FileSplitAssigner splitAssigner =
            this.splitAssignerProvider.create(deltaEnumeratorStateCheckpoint.getSplits());

        return new ContinuousDeltaSourceSplitEnumerator(tablePath, tableProcessor, splitAssigner, splitEnumeratorContext);
    }

    /**
     * Picks the table processor for a checkpoint.
     *
     * <p>When the checkpoint reports {@code isMonitoringForChanges()}, a changes-only processor
     * is created for the checkpointed snapshot version. Otherwise the snapshot for that version
     * is loaded and a snapshot-and-changes processor is created for it.
     */
    private ContinuousTableProcessor createTableProcessorFromCheckpoint(DeltaEnumeratorStateCheckpoint<DeltaSourceSplit> checkpoint, Configuration configuration, SplitEnumeratorContext<DeltaSourceSplit> enumContext, DeltaSourceConfiguration sourceConfiguration) {
        long checkpointedVersion = checkpoint.getSnapshotVersion();
        Path tablePath = checkpoint.getDeltaTablePath();
        DeltaLog deltaLog = DeltaLog.forTable(configuration, SourceUtils.pathToString(tablePath));

        if (checkpoint.isMonitoringForChanges()) {
            return createChangesProcessor(tablePath, enumContext, sourceConfiguration, deltaLog, checkpointedVersion);
        }
        Snapshot checkpointedSnapshot = deltaLog.getSnapshotForVersionAsOf(checkpointedVersion);
        return createSnapshotAndChangesProcessor(tablePath, enumContext, sourceConfiguration, deltaLog, checkpointedSnapshot);
    }

    /**
     * Picks the table processor for a fresh start.
     *
     * <p>A changes-only processor for the snapshot's version is created when
     * {@link #isChangeStreamOnly} holds; otherwise a snapshot-and-changes processor is created.
     */
    private ContinuousTableProcessor createTableProcessor(Path tablePath, SplitEnumeratorContext<DeltaSourceSplit> enumContext, DeltaSourceConfiguration sourceConfiguration, DeltaLog deltaLog, Snapshot snapshot) {
        if (isChangeStreamOnly(sourceConfiguration)) {
            return createChangesProcessor(tablePath, enumContext, sourceConfiguration, deltaLog, snapshot.getVersion());
        }
        return createSnapshotAndChangesProcessor(tablePath, enumContext, sourceConfiguration, deltaLog, snapshot);
    }

    /**
     * Builds a {@link ChangesProcessor} backed by a {@link TableMonitor}.
     *
     * <p>The monitor is given {@code monitorVersion} and the value of
     * {@link DeltaSourceOptions#UPDATE_CHECK_INTERVAL}. Its {@link ActionProcessor} is configured
     * from {@link DeltaSourceOptions#IGNORE_CHANGES} and {@link DeltaSourceOptions#IGNORE_DELETES}.
     */
    private ChangesProcessor createChangesProcessor(Path tablePath, SplitEnumeratorContext<DeltaSourceSplit> enumContext, DeltaSourceConfiguration sourceConfiguration, DeltaLog deltaLog, long monitorVersion) {
        long updateCheckInterval = (Long) sourceConfiguration.getValue(DeltaSourceOptions.UPDATE_CHECK_INTERVAL);
        boolean ignoreChanges = (Boolean) sourceConfiguration.getValue(DeltaSourceOptions.IGNORE_CHANGES);
        boolean ignoreDeletes = (Boolean) sourceConfiguration.getValue(DeltaSourceOptions.IGNORE_DELETES);

        ActionProcessor actionProcessor = new ActionProcessor(ignoreChanges, ignoreDeletes);
        TableMonitor tableMonitor = new TableMonitor(deltaLog, monitorVersion, updateCheckInterval, actionProcessor);

        return new ChangesProcessor(tablePath, tableMonitor, enumContext, this.fileEnumeratorProvider.create(), sourceConfiguration);
    }

    /**
     * Builds a processor that combines a {@link SnapshotProcessor} for {@code snapshot} with a
     * {@link ChangesProcessor} created for the version right after it
     * ({@code snapshot.getVersion() + 1}).
     */
    private ContinuousTableProcessor createSnapshotAndChangesProcessor(Path tablePath, SplitEnumeratorContext<DeltaSourceSplit> enumContext, DeltaSourceConfiguration sourceConfiguration, DeltaLog deltaLog, Snapshot snapshot) {
        // The snapshot processor is built first to keep the original construction order.
        SnapshotProcessor snapshotProcessor =
            new SnapshotProcessor(tablePath, snapshot, this.fileEnumeratorProvider.create(), Collections.emptySet());
        ChangesProcessor changesProcessor =
            createChangesProcessor(tablePath, enumContext, sourceConfiguration, deltaLog, snapshot.getVersion() + 1);

        return new SnapshotAndChangesTableProcessor(snapshotProcessor, changesProcessor);
    }

    /**
     * @return always {@link Boundedness#CONTINUOUS_UNBOUNDED}
     */
    @Override // io.delta.flink.source.internal.enumerator.SplitEnumeratorProvider
    public Boundedness getBoundedness() {
        return Boundedness.CONTINUOUS_UNBOUNDED;
    }

    /**
     * @return {@code true} when the configuration has either
     *     {@link DeltaSourceOptions#STARTING_VERSION} or
     *     {@link DeltaSourceOptions#STARTING_TIMESTAMP}
     */
    private boolean isChangeStreamOnly(DeltaSourceConfiguration sourceConfiguration) {
        return sourceConfiguration.hasOption(DeltaSourceOptions.STARTING_VERSION)
            || sourceConfiguration.hasOption(DeltaSourceOptions.STARTING_TIMESTAMP);
    }
}
