/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.connector.iceberg.sink.writer;

import io.mantisrx.connector.iceberg.sink.codecs.IcebergCodecs;
import io.mantisrx.connector.iceberg.sink.writer.DefaultIcebergWriter;
import io.mantisrx.connector.iceberg.sink.writer.IcebergWriter;
import io.mantisrx.connector.iceberg.sink.writer.config.WriterConfig;
import io.mantisrx.connector.iceberg.sink.writer.config.WriterProperties;
import io.mantisrx.connector.iceberg.sink.writer.metrics.WriterMetrics;
import io.mantisrx.connector.iceberg.sink.writer.partitioner.Partitioner;
import io.mantisrx.connector.iceberg.sink.writer.partitioner.PartitionerFactory;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.ScalarToScalar;
import io.mantisrx.runtime.WorkerInfo;
import io.mantisrx.runtime.computation.ScalarComputation;
import io.mantisrx.runtime.parameter.ParameterDefinition;
import io.mantisrx.runtime.parameter.type.IntParameter;
import io.mantisrx.runtime.parameter.type.StringParameter;
import io.mantisrx.runtime.parameter.validator.Validators;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.exceptions.Exceptions;
import rx.schedulers.Schedulers;

/**
 * Mantis processing stage that consumes {@link Record}s, writes them through an
 * {@link IcebergWriter}, and emits a {@link DataFile} each time the open file is closed.
 *
 * <p>The actual stream logic lives in {@link Transformer}; this class wires the transformer
 * together from services and parameters found on the job {@link Context}.
 */
public class IcebergWriterStage implements ScalarComputation<Record, DataFile> {

    private static final Logger logger = LoggerFactory.getLogger(IcebergWriterStage.class);

    /** Built in {@link #init(Context)} and applied to the record stream in {@link #call}. */
    private Transformer transformer;

    /**
     * Returns the stage configuration: an empty description, the {@link DataFile} codec and the
     * parameter definitions from {@link #parameters()}.
     *
     * @return a configuration for a {@code Record -> DataFile} scalar stage
     */
    public static ScalarToScalar.Config<Record, DataFile> config() {
        // Explicit type arguments instead of a raw Config, so no unchecked conversion is needed.
        return new ScalarToScalar.Config<Record, DataFile>()
                .description("")
                .codec(IcebergCodecs.dataFile())
                .withParameters(parameters());
    }

    /**
     * Returns the job parameter definitions declared by this stage.
     *
     * @return definitions for {@code writerRowGroupSize}, {@code writerFlushFrequencyBytes},
     *         {@code writerFlushFrequencyMsec} and {@code writerFileFormat}
     */
    public static List<ParameterDefinition<?>> parameters() {
        return Arrays.asList(
                new IntParameter()
                        .name("writerRowGroupSize")
                        .description(WriterProperties.WRITER_ROW_GROUP_SIZE_DESCRIPTION)
                        .validator(Validators.alwaysPass())
                        .defaultValue(1000)
                        .build(),
                new StringParameter()
                        .name("writerFlushFrequencyBytes")
                        .description(WriterProperties.WRITER_FLUSH_FREQUENCY_BYTES_DESCRIPTION)
                        .validator(Validators.alwaysPass())
                        .defaultValue("134217728")
                        .build(),
                new StringParameter()
                        .name("writerFlushFrequencyMsec")
                        .description(WriterProperties.WRITER_FLUSH_FREQUENCY_MSEC_DESCRIPTION)
                        .validator(Validators.alwaysPass())
                        .defaultValue("60000")
                        .build(),
                new StringParameter()
                        .name("writerFileFormat")
                        .description(WriterProperties.WRITER_FILE_FORMAT_DESCRIPTION)
                        .validator(Validators.alwaysPass())
                        .defaultValue(WriterProperties.WRITER_FILE_FORMAT_DEFAULT)
                        .build());
    }

    /**
     * Builds the {@link Transformer} used by {@link #call}.
     *
     * <p>Looks up the Hadoop {@link Configuration}, the Iceberg {@link Catalog} and the
     * {@link PartitionerFactory} from the context's service locator, loads the target table
     * named by the writer configuration, and creates a {@link DefaultIcebergWriter} for it.
     *
     * @param context job context supplying parameters, services and worker info
     */
    @Override
    public void init(Context context) {
        Configuration hadoopConfig = context.getServiceLocator().service(Configuration.class);
        WriterConfig config = new WriterConfig(context.getParameters(), hadoopConfig);

        Catalog catalog = context.getServiceLocator().service(Catalog.class);
        TableIdentifier id = TableIdentifier.of(config.getCatalog(), config.getDatabase(), config.getTable());
        Table table = catalog.loadTable(id);

        WorkerInfo workerInfo = context.getWorkerInfo();
        IcebergWriter writer = new DefaultIcebergWriter(config, workerInfo, table);
        WriterMetrics metrics = new WriterMetrics();
        PartitionerFactory partitionerFactory = context.getServiceLocator().service(PartitionerFactory.class);
        Partitioner partitioner = partitionerFactory.getPartitioner(table);

        // Timer ticks are produced on the computation scheduler; all writer work runs on io.
        this.transformer = new Transformer(
                config, metrics, writer, partitioner, Schedulers.computation(), Schedulers.io());
    }

    /**
     * Applies the transformer created in {@link #init(Context)} to the incoming records.
     *
     * @param context          job context (not used here)
     * @param recordObservable stream of records to write
     * @return stream of data files produced by closing written files
     */
    @Override
    public Observable<DataFile> call(Context context, Observable<Record> recordObservable) {
        return recordObservable.compose(this.transformer);
    }

    /**
     * Writes records through an {@link IcebergWriter} and emits a {@link DataFile} when a file
     * is closed.
     *
     * <p>A file is closed when any of the following holds (see {@link #shouldFlush(Trigger)}):
     * <ul>
     *   <li>a record for a different partition arrives (the closed file is staged and emitted),</li>
     *   <li>the record count reaches the configured row group size and the writer's length
     *       reaches the configured flush size in bytes, or</li>
     *   <li>a periodic timer tick arrives while a file is open.</li>
     * </ul>
     *
     * <p>Timer ticks are merged into the record stream as a sentinel record so that all state
     * changes happen sequentially inside the {@code scan} accumulator.
     */
    public static class Transformer implements Observable.Transformer<Record, DataFile> {

        /** Sentinel emitted when closing a file fails; filtered out before reaching subscribers. */
        private static final DataFile ERROR_DATA_FILE = new DataFiles.Builder()
                .withPath("/error.parquet")
                .withFileSizeInBytes(0L)
                .withRecordCount(0L)
                .build();

        /** Schema that identifies the sentinel timer record within the merged stream. */
        private static final Schema TIMER_SCHEMA = new Schema(
                Types.NestedField.required(1, "ts_utc_msec", Types.LongType.get()));

        /** Sentinel record emitted on every timer tick. */
        private static final Record TIMER_RECORD = GenericRecord.create(TIMER_SCHEMA);

        private final WriterConfig config;
        private final WriterMetrics metrics;
        private final Partitioner partitioner;
        private final IcebergWriter writer;
        private final Scheduler timerScheduler;
        private final Scheduler transformerScheduler;

        /**
         * @param config               source of row group size and flush thresholds
         * @param metrics              sink for success/failure counters and batch gauges
         * @param writer               writer used to open, write and close files
         * @param partitioner          computes the partition of each record
         * @param timerScheduler       scheduler on which the flush timer ticks
         * @param transformerScheduler scheduler on which records are written and files closed
         */
        public Transformer(
                WriterConfig config,
                WriterMetrics metrics,
                IcebergWriter writer,
                Partitioner partitioner,
                Scheduler timerScheduler,
                Scheduler transformerScheduler) {
            this.config = config;
            this.metrics = metrics;
            this.writer = writer;
            this.partitioner = partitioner;
            this.timerScheduler = timerScheduler;
            this.transformerScheduler = transformerScheduler;
        }

        /**
         * Turns a stream of records into a stream of closed data files.
         *
         * @param source records to write
         * @return data files, one per successfully closed file
         */
        @Override
        public Observable<DataFile> call(Observable<Record> source) {
            Observable<Record> timer = Observable
                    .interval(config.getWriterFlushFrequencyMsec(), TimeUnit.MILLISECONDS, timerScheduler)
                    .map(tick -> TIMER_RECORD);

            return source.mergeWith(timer)
                    .observeOn(transformerScheduler)
                    .scan(new Trigger(config.getWriterRowGroupSize()), this::accumulate)
                    .filter(this::shouldFlush)
                    // Nothing to flush when no file is open.
                    .filter(trigger -> !writer.isClosed())
                    .map(this::flush)
                    .filter(dataFile -> !isErrorDataFile(dataFile))
                    .doOnNext(this::reportBatch)
                    .doOnTerminate(this::closeOnTerminate);
        }

        /**
         * Scan accumulator: applies one incoming record (or timer tick) to the trigger state.
         *
         * <p>For a data record this opens a file if none is open, rolls the file over when the
         * record's partition differs from the current one (staging the closed file on the
         * trigger), and then writes the record.
         *
         * @param trigger mutable flush state carried across records
         * @param record  data record, or the timer sentinel
         * @return the same {@code trigger} instance
         * @throws RuntimeException wrapping the {@link IOException} if a file cannot be opened
         */
        private Trigger accumulate(Trigger trigger, Record record) {
            if (record.struct().fields().equals(TIMER_SCHEMA.columns())) {
                // Arm the timeout only while a file is open. A tick with no open file is
                // dropped downstream without resetting the trigger; if it still set the flag,
                // the stale timeout would flush the first record after an idle period into a
                // one-record file.
                if (!writer.isClosed()) {
                    trigger.timeout();
                }
                return trigger;
            }

            StructLike partition = partitioner.partition(record);

            if (writer.isClosed()) {
                try {
                    logger.info("opening file for partition {}", partition);
                    writer.open(partition);
                    trigger.setPartition(partition);
                    metrics.increment("openSuccessCount");
                } catch (IOException e) {
                    metrics.increment("openFailureCount");
                    throw Exceptions.propagate(e);
                }
            }

            if (trigger.isNewPartition(partition)) {
                trigger.setPartition(partition);
                try {
                    // Close the previous partition's file and stage it for emission.
                    DataFile dataFile = writer.close();
                    trigger.stage(dataFile);
                    trigger.reset();
                } catch (IOException | RuntimeException e) {
                    // Best effort: the failed batch is counted and logged, then a new file is opened.
                    metrics.increment("batchFailureCount");
                    logger.error("error writing DataFile", e);
                }
                try {
                    logger.info("opening file for new partition {}", partition);
                    writer.open(partition);
                    metrics.increment("openSuccessCount");
                } catch (IOException e) {
                    metrics.increment("openFailureCount");
                    throw Exceptions.propagate(e);
                }
            }

            try {
                writer.write(record);
                trigger.increment();
                metrics.increment("writeSuccessCount");
            } catch (RuntimeException e) {
                // Best effort: a record that fails to write is counted and skipped. The
                // exception is passed as the last argument so the cause is not lost.
                metrics.increment("writeFailureCount");
                logger.debug("error writing record {}", record, e);
            }
            return trigger;
        }

        /**
         * Produces the data file for a trigger that passed {@link #shouldFlush(Trigger)}.
         *
         * <p>If a file was staged by a partition change, a copy of it is returned and the
         * staging slot is cleared; the currently open file is left untouched. Otherwise the
         * open file is closed and the trigger is reset.
         *
         * @param trigger current flush state
         * @return the data file to emit, or {@link #ERROR_DATA_FILE} if closing failed
         */
        private DataFile flush(Trigger trigger) {
            if (trigger.hasStagedDataFile()) {
                DataFile staged = trigger.getStagedDataFile().copy();
                trigger.clearStagedDataFile();
                return staged;
            }
            try {
                DataFile dataFile = writer.close();
                trigger.reset();
                return dataFile;
            } catch (IOException | RuntimeException e) {
                metrics.increment("batchFailureCount");
                logger.error("error writing DataFile", e);
                return ERROR_DATA_FILE;
            }
        }

        /** Records success metrics and logs a data file that is about to be emitted. */
        private void reportBatch(DataFile dataFile) {
            metrics.increment("batchSuccessCount");
            logger.info("writing DataFile: {}", dataFile);
            metrics.setGauge("batchSize", dataFile.recordCount());
            metrics.setGauge("batchSizeBytes", dataFile.fileSizeInBytes());
        }

        /**
         * Closes the writer when the stream completes or errors.
         *
         * @throws RuntimeException wrapping the {@link IOException} if closing fails
         */
        private void closeOnTerminate() {
            try {
                logger.info("closing writer on rx terminate signal");
                writer.close();
            } catch (IOException e) {
                throw Exceptions.propagate(e);
            }
        }

        /**
         * Tells whether {@code dataFile} is the {@link #ERROR_DATA_FILE} sentinel by comparing
         * path, file size and record count.
         */
        private boolean isErrorDataFile(DataFile dataFile) {
            return Comparators.charSequences().compare(ERROR_DATA_FILE.path(), dataFile.path()) == 0
                    && ERROR_DATA_FILE.fileSizeInBytes() == dataFile.fileSizeInBytes()
                    && ERROR_DATA_FILE.recordCount() == dataFile.recordCount();
        }

        /**
         * Tells whether the current state calls for emitting a data file: a file is staged, or
         * both the count and byte thresholds are reached, or the timer has fired.
         */
        private boolean shouldFlush(Trigger trigger) {
            return trigger.hasStagedDataFile()
                    || (trigger.isOverCountThreshold()
                            && writer.length() >= config.getWriterFlushFrequencyBytes())
                    || trigger.isTimedOut();
        }

        /**
         * Mutable flush state threaded through the {@code scan} operator: record counter,
         * timeout flag, current partition, and a data file staged by a partition change.
         */
        private static class Trigger {

            private final int countThreshold;
            private int counter;
            private boolean timedOut;
            private StructLike partition;
            private DataFile stagedDataFile;

            Trigger(int countThreshold) {
                this.countThreshold = countThreshold;
            }

            /** Counts one successfully written record. */
            void increment() {
                ++this.counter;
            }

            /** Marks that the flush timer has fired. */
            void timeout() {
                this.timedOut = true;
            }

            void setPartition(StructLike newPartition) {
                this.partition = newPartition;
            }

            /** Clears the counter and timeout flag; partition and staged file are kept. */
            void reset() {
                this.counter = 0;
                this.timedOut = false;
            }

            /** Holds a closed data file until the next flush emits it. */
            void stage(DataFile dataFile) {
                this.stagedDataFile = dataFile;
            }

            boolean isOverCountThreshold() {
                return this.counter >= this.countThreshold;
            }

            boolean isTimedOut() {
                return this.timedOut;
            }

            /** Returns {@code true} if a partition has been set and differs from {@code newPartition}. */
            boolean isNewPartition(StructLike newPartition) {
                return this.partition != null && !this.partition.equals(newPartition);
            }

            boolean hasStagedDataFile() {
                return this.stagedDataFile != null;
            }

            DataFile getStagedDataFile() {
                return this.stagedDataFile;
            }

            void clearStagedDataFile() {
                this.stagedDataFile = null;
            }

            @Override
            public String toString() {
                return "Trigger{ countThreshold=" + this.countThreshold
                        + ", counter=" + this.counter
                        + ", timedOut=" + this.timedOut
                        + ", partition=" + this.partition
                        + ", stagedDataFile=" + this.stagedDataFile
                        + '}';
            }
        }
    }
}

