/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.connector.iceberg.sink.writer;

import io.mantisrx.connector.iceberg.sink.codecs.IcebergCodecs;
import io.mantisrx.connector.iceberg.sink.writer.IcebergWriter;
import io.mantisrx.connector.iceberg.sink.writer.PartitionedIcebergWriter;
import io.mantisrx.connector.iceberg.sink.writer.UnpartitionedIcebergWriter;
import io.mantisrx.connector.iceberg.sink.writer.config.WriterConfig;
import io.mantisrx.connector.iceberg.sink.writer.config.WriterProperties;
import io.mantisrx.connector.iceberg.sink.writer.metrics.WriterMetrics;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.ScalarToScalar;
import io.mantisrx.runtime.WorkerInfo;
import io.mantisrx.runtime.computation.ScalarComputation;
import io.mantisrx.runtime.parameter.ParameterDefinition;
import io.mantisrx.runtime.parameter.type.IntParameter;
import io.mantisrx.runtime.parameter.type.StringParameter;
import io.mantisrx.runtime.parameter.validator.Validators;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.exceptions.Exceptions;

public class IcebergWriterStage
implements ScalarComputation<Record, DataFile> {
    private static final Logger logger = LoggerFactory.getLogger(IcebergWriterStage.class);
    private final String[] tableIdentifierNames;
    private Transformer transformer;

    public static ScalarToScalar.Config<Record, DataFile> config() {
        return new ScalarToScalar.Config().description("").codec(IcebergCodecs.dataFile()).withParameters(IcebergWriterStage.parameters());
    }

    public static List<ParameterDefinition<?>> parameters() {
        return Arrays.asList(new IntParameter().name("writerRowGroupSize").description(WriterProperties.WRITER_ROW_GROUP_SIZE_DESCRIPTION).validator(Validators.alwaysPass()).defaultValue(1000).build(), new StringParameter().name("writerFlushFrequencyBytes").description(WriterProperties.WRITER_FLUSH_FREQUENCY_BYTES_DESCRIPTION).validator(Validators.alwaysPass()).defaultValue("134217728").build(), new StringParameter().name("writerFileFormat").description(WriterProperties.WRITER_FILE_FORMAT_DESCRIPTION).validator(Validators.alwaysPass()).defaultValue(WriterProperties.WRITER_FILE_FORMAT_DEFAULT).build());
    }

    public static IcebergWriter newIcebergWriter(WriterConfig config, WorkerInfo workerInfo, Table table2) {
        if (table2.spec().fields().isEmpty()) {
            return new UnpartitionedIcebergWriter(config, workerInfo, table2);
        }
        return new PartitionedIcebergWriter(config, workerInfo, table2);
    }

    public IcebergWriterStage(String ... tableIdentifierNames) {
        this.tableIdentifierNames = tableIdentifierNames;
    }

    @Override
    public void init(Context context) {
        Configuration hadoopConfig = context.getServiceLocator().service(Configuration.class);
        WriterConfig config = new WriterConfig(context.getParameters(), hadoopConfig);
        Catalog catalog = context.getServiceLocator().service(Catalog.class);
        TableIdentifier id = TableIdentifier.of(this.tableIdentifierNames);
        Table table2 = catalog.loadTable(id);
        WorkerInfo workerInfo = context.getWorkerInfo();
        IcebergWriter writer2 = IcebergWriterStage.newIcebergWriter(config, workerInfo, table2);
        WriterMetrics metrics = new WriterMetrics();
        this.transformer = new Transformer(config, metrics, writer2);
    }

    @Override
    public Observable<DataFile> call(Context context, Observable<Record> recordObservable) {
        return recordObservable.compose(this.transformer);
    }

    public static class Transformer
    implements Observable.Transformer<Record, DataFile> {
        private static final DataFile ERROR_DATA_FILE = new DataFiles.Builder().withPath("/error.parquet").withFileSizeInBytes(0L).withRecordCount(0L).build();
        private final WriterConfig config;
        private final WriterMetrics metrics;
        private final IcebergWriter writer;

        public Transformer(WriterConfig config, WriterMetrics metrics, IcebergWriter writer2) {
            this.config = config;
            this.metrics = metrics;
            this.writer = writer2;
        }

        @Override
        public Observable<DataFile> call(Observable<Record> source2) {
            return source2.doOnNext(record -> {
                if (this.writer.isClosed()) {
                    try {
                        this.writer.open();
                        this.metrics.increment("openSuccessCount");
                    }
                    catch (IOException e2) {
                        this.metrics.increment("openFailureCount");
                        throw Exceptions.propagate(e2);
                    }
                }
            }).scan(new Counter(this.config.getWriterRowGroupSize()), (counter, record) -> {
                try {
                    this.writer.write((Record)record);
                    counter.increment();
                    this.metrics.increment("writeSuccessCount");
                }
                catch (RuntimeException e2) {
                    this.metrics.increment("writeFailureCount");
                    logger.error("error writing record {}", record);
                }
                return counter;
            }).filter(Counter::shouldReset).filter(counter -> this.writer.length() >= this.config.getWriterFlushFrequencyBytes()).map(counter -> {
                try {
                    DataFile dataFile = this.writer.close();
                    counter.reset();
                    return dataFile;
                }
                catch (IOException e2) {
                    this.metrics.increment("batchFailureCount");
                    logger.error("error writing DataFile", e2);
                    return ERROR_DATA_FILE;
                }
            }).filter(dataFile -> !this.isErrorDataFile((DataFile)dataFile)).doOnNext(dataFile -> {
                this.metrics.increment("batchSuccessCount");
                logger.info("writing DataFile: {}", dataFile);
                this.metrics.setGauge("batchSize", dataFile.recordCount());
                this.metrics.setGauge("batchSizeBytes", dataFile.fileSizeInBytes());
            }).doOnSubscribe(() -> {
                try {
                    this.writer.open();
                    this.metrics.increment("openSuccessCount");
                }
                catch (IOException e2) {
                    this.metrics.increment("openFailureCount");
                    throw Exceptions.propagate(e2);
                }
            }).doOnTerminate(() -> {
                if (!this.writer.isClosed()) {
                    try {
                        logger.info("closing writer on rx terminate signal");
                        this.writer.close();
                    }
                    catch (IOException e2) {
                        throw Exceptions.propagate(e2);
                    }
                }
            });
        }

        private boolean isErrorDataFile(DataFile dataFile) {
            return ERROR_DATA_FILE.path() == dataFile.path() && ERROR_DATA_FILE.fileSizeInBytes() == dataFile.fileSizeInBytes() && ERROR_DATA_FILE.recordCount() == dataFile.recordCount();
        }

        private static class Counter {
            private final int threshold;
            private int counter;

            Counter(int threshold) {
                this.threshold = threshold;
                this.counter = 0;
            }

            void increment() {
                ++this.counter;
            }

            void reset() {
                this.counter = 0;
            }

            boolean shouldReset() {
                return this.counter >= this.threshold;
            }
        }
    }
}

