/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.connector.iceberg.sink.writer;

import io.mantisrx.connector.iceberg.sink.codecs.IcebergCodecs;
import io.mantisrx.connector.iceberg.sink.writer.config.WriterConfig;
import io.mantisrx.connector.iceberg.sink.writer.config.WriterProperties;
import io.mantisrx.connector.iceberg.sink.writer.factory.DefaultIcebergWriterFactory;
import io.mantisrx.connector.iceberg.sink.writer.metrics.WriterMetrics;
import io.mantisrx.connector.iceberg.sink.writer.partitioner.Partitioner;
import io.mantisrx.connector.iceberg.sink.writer.partitioner.PartitionerFactory;
import io.mantisrx.connector.iceberg.sink.writer.pool.FixedIcebergWriterPool;
import io.mantisrx.connector.iceberg.sink.writer.pool.IcebergWriterPool;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.ScalarToScalar;
import io.mantisrx.runtime.WorkerInfo;
import io.mantisrx.runtime.computation.ScalarComputation;
import io.mantisrx.runtime.parameter.ParameterDefinition;
import io.mantisrx.runtime.parameter.type.IntParameter;
import io.mantisrx.runtime.parameter.type.StringParameter;
import io.mantisrx.runtime.parameter.validator.Validators;
import io.mantisrx.runtime.scheduler.MantisRxSingleThreadScheduler;
import io.mantisrx.shaded.com.google.common.annotations.VisibleForTesting;
import io.mantisrx.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.exceptions.Exceptions;
import rx.schedulers.Schedulers;

/**
 * Mantis scalar stage that writes incoming Iceberg {@link Record}s into data files and emits a
 * {@link DataFile} for every file it successfully closes.
 *
 * <p>Each record is routed to a per-partition writer held in an {@link IcebergWriterPool}. Writers
 * are closed ("flushed") in two situations:
 * <ul>
 *   <li>after at least {@code writerRowGroupSize} successful writes since the last flush, the
 *       writers the pool reports as flushable are closed;</li>
 *   <li>on every tick of a timer firing every {@code writerFlushFrequencyMsec} milliseconds, every
 *       writer in the pool is closed.</li>
 * </ul>
 */
public class IcebergWriterStage implements ScalarComputation<Record, DataFile> {

    private static final Logger logger = LoggerFactory.getLogger(IcebergWriterStage.class);

    private Transformer transformer;

    /**
     * Returns the stage configuration: a {@link DataFile} output codec, serial input, and the
     * writer parameters from {@link #parameters()}.
     */
    public static ScalarToScalar.Config<Record, DataFile> config() {
        return new ScalarToScalar.Config<Record, DataFile>()
                .description("")
                .codec(IcebergCodecs.dataFile())
                .serialInput()
                .withParameters(parameters());
    }

    /** Returns the job parameters understood by this stage, together with their default values. */
    public static List<ParameterDefinition<?>> parameters() {
        return Arrays.asList(
                new IntParameter()
                        .name("writerRowGroupSize")
                        .description(WriterProperties.WRITER_ROW_GROUP_SIZE_DESCRIPTION)
                        .validator(Validators.alwaysPass())
                        .defaultValue(100)
                        .build(),
                new StringParameter()
                        .name("writerFlushFrequencyBytes")
                        .description(WriterProperties.WRITER_FLUSH_FREQUENCY_BYTES_DESCRIPTION)
                        .validator(Validators.alwaysPass())
                        .defaultValue("134217728")
                        .build(),
                new StringParameter()
                        .name("writerFlushFrequencyMsec")
                        .description(WriterProperties.WRITER_FLUSH_FREQUENCY_MSEC_DESCRIPTION)
                        .validator(Validators.alwaysPass())
                        .defaultValue("60000")
                        .build(),
                new StringParameter()
                        .name("writerFileFormat")
                        .description(WriterProperties.WRITER_FILE_FORMAT_DESCRIPTION)
                        .validator(Validators.alwaysPass())
                        .defaultValue(WriterProperties.WRITER_FILE_FORMAT_DEFAULT)
                        .build(),
                new IntParameter()
                        .name("writerMaximumPoolSize")
                        .description(WriterProperties.WRITER_MAXIMUM_POOL_SIZE_DESCRIPTION)
                        .validator(Validators.alwaysPass())
                        .defaultValue(5)
                        .build());
    }

    /**
     * Builds a {@link Transformer} from the services registered in the job context.
     *
     * <p>Looks up the Hadoop {@link Configuration}, {@link Catalog}, {@link LocationProvider} and
     * {@link PartitionerFactory} from the service locator, loads the target table identified by the
     * configured catalog/database/table, and wires a {@link FixedIcebergWriterPool} over it.
     *
     * @param context Mantis job context
     * @return a transformer bound to the configured table
     */
    public static Transformer newTransformer(Context context) {
        Configuration hadoopConfig = context.getServiceLocator().service(Configuration.class);
        WriterConfig config = new WriterConfig(context.getParameters(), hadoopConfig);
        Catalog catalog = context.getServiceLocator().service(Catalog.class);
        TableIdentifier id = TableIdentifier.of(config.getCatalog(), config.getDatabase(), config.getTable());
        Table table = catalog.loadTable(id);
        WorkerInfo workerInfo = context.getWorkerInfo();
        LocationProvider locationProvider = context.getServiceLocator().service(LocationProvider.class);
        DefaultIcebergWriterFactory factory =
                new DefaultIcebergWriterFactory(config, workerInfo, table, locationProvider);
        IcebergWriterPool writerPool = new FixedIcebergWriterPool(factory, config);
        WriterMetrics metrics = new WriterMetrics();
        PartitionerFactory partitionerFactory = context.getServiceLocator().service(PartitionerFactory.class);
        Partitioner partitioner = partitionerFactory.getPartitioner(table);
        return newTransformer(config, metrics, writerPool, partitioner, workerInfo, context.getClassLoader());
    }

    /**
     * Builds a {@link Transformer} from explicit collaborators.
     *
     * <p>Writes are performed on a dedicated single-thread scheduler whose threads are named
     * {@code "IcebergWriter (<workerIndex + 1>)-<n>"}. The flush timer runs on
     * {@link Schedulers#computation()}.
     *
     * @param loader if non-null, installed as the context class loader of the writer thread(s)
     */
    @VisibleForTesting
    static Transformer newTransformer(
            WriterConfig writerConfig,
            WriterMetrics writerMetrics,
            IcebergWriterPool writerPool,
            Partitioner partitioner,
            WorkerInfo workerInfo,
            @Nullable ClassLoader loader) {
        int workerIdx = workerInfo.getWorkerIndex();
        String nameFormat = "IcebergWriter (" + (workerIdx + 1) + ")-%d";
        ThreadFactoryBuilder tfBuilder = new ThreadFactoryBuilder().setNameFormat(nameFormat);
        if (loader != null) {
            // The builder still applies the name format; the backing factory only adds the loader.
            ThreadFactory backingTf = r -> {
                Thread thread = new Thread(r);
                thread.setContextClassLoader(loader);
                return thread;
            };
            tfBuilder.setThreadFactory(backingTf);
        }
        Scheduler executingService = new MantisRxSingleThreadScheduler(tfBuilder.build());
        return new Transformer(
                writerConfig, writerMetrics, writerPool, partitioner, Schedulers.computation(), executingService);
    }

    @Override
    public void init(Context context) {
        this.transformer = newTransformer(context);
    }

    @Override
    public Observable<DataFile> call(Context context, Observable<Record> recordObservable) {
        return recordObservable.compose(transformer);
    }

    /**
     * Rx transformer that writes records to per-partition writers and emits the {@link DataFile}s
     * produced when those writers are closed.
     */
    public static class Transformer implements Observable.Transformer<Record, DataFile> {

        private static final Schema TIMEOUT_SCHEMA =
                new Schema(Types.NestedField.required(1, "ts_utc_msec", Types.LongType.get()));

        /**
         * Sentinel emitted by the flush timer. Recognised by identity, so a data record whose
         * schema happens to equal {@link #TIMEOUT_SCHEMA} is still written, not treated as a tick.
         */
        private static final Record TIMEOUT_RECORD = GenericRecord.create(TIMEOUT_SCHEMA);

        private final WriterConfig config;
        private final WriterMetrics metrics;
        private final Partitioner partitioner;
        private final IcebergWriterPool writerPool;
        private final Scheduler timerScheduler;
        private final Scheduler transformerScheduler;

        /**
         * @param timerScheduler scheduler driving the periodic flush timer
         * @param transformerScheduler scheduler on which records are written and writers closed
         */
        public Transformer(
                WriterConfig config,
                WriterMetrics metrics,
                IcebergWriterPool writerPool,
                Partitioner partitioner,
                Scheduler timerScheduler,
                Scheduler transformerScheduler) {
            this.config = config;
            this.metrics = metrics;
            this.writerPool = writerPool;
            this.partitioner = partitioner;
            this.timerScheduler = timerScheduler;
            this.transformerScheduler = transformerScheduler;
        }

        /**
         * Merges {@code source} with a periodic flush timer, writes each record, and emits the data
         * files closed by each flush. All writers are closed when the stream completes or errors.
         * The result is shared between concurrent subscribers.
         */
        @Override
        public Observable<DataFile> call(Observable<Record> source) {
            Observable<Record> timer = Observable
                    .interval(config.getWriterFlushFrequencyMsec(), TimeUnit.MILLISECONDS, timerScheduler)
                    .map(tick -> TIMEOUT_RECORD);

            // scan() with a plain seed would hand the same mutable Trigger to every subscription.
            // share() resubscribes upstream after its ref count drops to zero, so defer() is used
            // to give each subscription a fresh Trigger.
            Observable<Trigger> triggers = Observable.defer(() -> source
                    .mergeWith(timer)
                    .observeOn(transformerScheduler)
                    .scan(new Trigger(config.getWriterRowGroupSize()), this::onRecord));

            return triggers
                    .filter(Trigger::shouldFlush)
                    .map(this::flush)
                    .filter(dataFiles -> !dataFiles.isEmpty())
                    .flatMapIterable(dataFiles -> dataFiles)
                    .doOnNext(this::onDataFile)
                    // NOTE(review): doOnTerminate runs on onCompleted/onError only, not on unsubscribe.
                    .doOnTerminate(this::closeAllWriters)
                    .share();
        }

        /**
         * Scan accumulator: handles one element of the merged record/timer stream.
         *
         * <p>A timer tick tracks every writer in the pool. A data record is written to its
         * partition's writer, opening the writer first if it is closed. A failure to open
         * propagates as a stream error. A failure to write is counted and the record is dropped.
         *
         * @return the same {@code trigger}, mutated
         */
        private Trigger onRecord(Trigger trigger, Record record) {
            if (record == TIMEOUT_RECORD) {
                trigger.trackAll(writerPool.getWriters());
                return trigger;
            }

            StructLike partition = partitioner.partition(record);
            if (writerPool.isClosed(partition)) {
                try {
                    logger.info("opening file for partition {}", partition);
                    writerPool.open(partition);
                    metrics.increment("openSuccessCount");
                } catch (IOException e) {
                    metrics.increment("openFailureCount");
                    throw Exceptions.propagate(e);
                }
            }

            try {
                writerPool.write(partition, record);
                trigger.increment();
                // Check the whole pool, not just this partition, so writers whose partitions stop
                // receiving records still get flushed. The counter resets only on a flush, so this
                // check repeats for every record until something is actually flushed.
                if (trigger.isOverCountThreshold()) {
                    trigger.trackAll(writerPool.getFlushableWriters());
                }
                metrics.increment("writeSuccessCount");
            } catch (RuntimeException e) {
                // Best effort: count and log the failure (with its cause) and keep the stream alive.
                metrics.increment("writeFailureCount");
                logger.debug("error writing record {}", record, e);
            }
            return trigger;
        }

        /**
         * Closes every writer tracked by {@code trigger}, then resets the trigger.
         *
         * @return the data files produced; a partition whose close fails is counted and omitted
         */
        private List<DataFile> flush(Trigger trigger) {
            List<DataFile> dataFiles = new ArrayList<>();
            for (StructLike partition : trigger.getTrackedWriters()) {
                try {
                    dataFiles.add(writerPool.close(partition));
                } catch (IOException | RuntimeException e) {
                    metrics.increment("batchFailureCount");
                    logger.error("error writing DataFile", e);
                }
            }
            trigger.reset();
            return dataFiles;
        }

        /** Records metrics and a log line for one emitted data file. */
        private void onDataFile(DataFile dataFile) {
            metrics.increment("batchSuccessCount");
            logger.info("writing DataFile: {}", dataFile);
            metrics.setGauge("batchSize", dataFile.recordCount());
            metrics.setGauge("batchSizeBytes", dataFile.fileSizeInBytes());
        }

        /** Closes all pooled writers, rethrowing any {@link IOException} unchecked. */
        private void closeAllWriters() {
            try {
                logger.info("closing writer on rx terminate signal");
                writerPool.closeAll();
            } catch (IOException e) {
                throw Exceptions.propagate(e);
            }
        }

        /**
         * Mutable scan state. It counts successful writes since the last flush and collects the
         * partitions whose writers should be closed at the next flush.
         */
        private static class Trigger {
            private final int countThreshold;
            private final Set<StructLike> writers;
            private int counter;

            Trigger(int countThreshold) {
                this.countThreshold = countThreshold;
                this.writers = new HashSet<>();
            }

            /** Counts one successful write. */
            void increment() {
                ++counter;
            }

            /** Clears the write counter and all tracked partitions (called after a flush). */
            void reset() {
                counter = 0;
                writers.clear();
            }

            /** Adds {@code partitions} to the set to be flushed. */
            void trackAll(Set<StructLike> partitions) {
                writers.addAll(partitions);
            }

            /** Returns the live (mutable) set of tracked partitions. */
            Set<StructLike> getTrackedWriters() {
                return writers;
            }

            /** Returns true once at least {@code countThreshold} writes occurred since the last reset. */
            boolean isOverCountThreshold() {
                return counter >= countThreshold;
            }

            /** Returns true when at least one partition is tracked for flushing. */
            boolean shouldFlush() {
                return !writers.isEmpty();
            }

            @Override
            public String toString() {
                return "Trigger{ countThreshold=" + countThreshold + ", writers=" + writers
                        + ", counter=" + counter + '}';
            }
        }
    }
}

