/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.connector.iceberg.sink.committer;

import io.mantisrx.connector.iceberg.sink.committer.IcebergCommitter;
import io.mantisrx.connector.iceberg.sink.committer.config.CommitterConfig;
import io.mantisrx.connector.iceberg.sink.committer.config.CommitterProperties;
import io.mantisrx.connector.iceberg.sink.committer.metrics.CommitterMetrics;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.ScalarToScalar;
import io.mantisrx.runtime.codec.JacksonCodecs;
import io.mantisrx.runtime.computation.ScalarComputation;
import io.mantisrx.runtime.parameter.ParameterDefinition;
import io.mantisrx.runtime.parameter.type.StringParameter;
import io.mantisrx.runtime.parameter.validator.Validators;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;

/**
 * Mantis processing stage that periodically commits batches of Iceberg {@link DataFile}s to a table.
 *
 * <p>Incoming data files are buffered over a fixed time window (the {@code commitFrequencyMs}
 * parameter), each non-empty batch is handed to an {@link IcebergCommitter}, and the summary map
 * returned by the committer is emitted downstream.
 */
public class IcebergCommitterStage implements ScalarComputation<DataFile, Map<String, Object>> {
    private static final Logger logger = LoggerFactory.getLogger(IcebergCommitterStage.class);

    /** Pipeline applied to the incoming stream; assigned in {@link #init(Context)}. */
    private Transformer transformer;

    /**
     * Builds the stage configuration: an empty description, a {@code Map<String, Object>} codec for
     * the stage output, and the parameters declared by {@link #parameters()}.
     *
     * @return the configuration for this stage
     */
    public static ScalarToScalar.Config<DataFile, Map<String, Object>> config() {
        // Explicit type arguments keep the builder chain type-checked instead of raw.
        return new ScalarToScalar.Config<DataFile, Map<String, Object>>()
                .description("")
                .codec(JacksonCodecs.mapStringObject())
                .withParameters(IcebergCommitterStage.parameters());
    }

    /**
     * Declares the job parameters read by this stage.
     *
     * <p>{@code sinkCatalog}, {@code sinkDatabase} and {@code sinkTable} are required and must be
     * non-empty; {@code commitFrequencyMs} is optional and defaults to {@code "300000"}.
     *
     * @return the parameter definitions for this stage
     */
    public static List<ParameterDefinition<?>> parameters() {
        return Arrays.asList(
                new StringParameter()
                        .name("sinkCatalog")
                        .description("Name of Iceberg Catalog")
                        .validator(Validators.notNullOrEmpty())
                        .required()
                        .build(),
                new StringParameter()
                        .name("sinkDatabase")
                        .description("Name of database within Iceberg Catalog")
                        .validator(Validators.notNullOrEmpty())
                        .required()
                        .build(),
                new StringParameter()
                        .name("sinkTable")
                        .description("Name of table within database")
                        .validator(Validators.notNullOrEmpty())
                        .required()
                        .build(),
                new StringParameter()
                        .name("commitFrequencyMs")
                        .description(CommitterProperties.COMMIT_FREQUENCY_DESCRIPTION)
                        .validator(Validators.alwaysPass())
                        .defaultValue("300000")
                        .build());
    }

    /**
     * Loads the target table from the {@link Catalog} obtained through the context's service
     * locator and prepares the {@link Transformer} used by {@link #call(Context, Observable)}.
     *
     * @param context stage context supplying the job parameters and the service locator
     */
    @Override
    public void init(Context context) {
        CommitterConfig config = new CommitterConfig(context.getParameters());
        CommitterMetrics metrics = new CommitterMetrics();
        Catalog catalog = context.getServiceLocator().service(Catalog.class);
        // The identifier is built from three parts: catalog, database, table.
        TableIdentifier id =
                TableIdentifier.of(config.getCatalog(), config.getDatabase(), config.getTable());
        Table table = catalog.loadTable(id);
        IcebergCommitter committer = new IcebergCommitter(table);
        this.transformer = new Transformer(config, metrics, committer, Schedulers.computation());
    }

    /**
     * Applies the transformer created in {@link #init(Context)} to the incoming data files.
     *
     * @param context stage context (not used here)
     * @param dataFileObservable stream of data files to commit
     * @return stream of commit summaries, one per successful commit
     */
    @Override
    public Observable<Map<String, Object>> call(Context context, Observable<DataFile> dataFileObservable) {
        return dataFileObservable.compose(this.transformer);
    }

    /**
     * Buffers data files over a time window and commits each non-empty batch.
     *
     * <p>A commit that throws a {@link RuntimeException} is counted and logged but does not
     * terminate the stream; nothing is emitted downstream for that batch.
     */
    public static class Transformer implements Observable.Transformer<DataFile, Map<String, Object>> {
        private final CommitterConfig config;
        private final CommitterMetrics metrics;
        private final IcebergCommitter committer;
        private final Scheduler scheduler;

        /**
         * @param config supplies the commit frequency in milliseconds
         * @param metrics sink for the counters and gauges updated by the pipeline
         * @param committer performs the commit of each batch
         * @param scheduler drives the buffering window and is the clock for commit latency
         */
        public Transformer(CommitterConfig config, CommitterMetrics metrics, IcebergCommitter committer, Scheduler scheduler) {
            this.config = config;
            this.metrics = metrics;
            this.committer = committer;
            this.scheduler = scheduler;
        }

        /**
         * Builds the commit pipeline on top of {@code source}.
         *
         * @param source stream of data files to commit
         * @return stream of non-empty commit summaries
         */
        @Override
        public Observable<Map<String, Object>> call(Observable<DataFile> source) {
            return source
                    .buffer(this.config.getCommitFrequencyMs(), TimeUnit.MILLISECONDS, this.scheduler)
                    // Counted before the emptiness filter, so empty windows are included.
                    .doOnNext(batch -> this.metrics.increment("invocationCount"))
                    .filter(batch -> !batch.isEmpty())
                    .map(this::commitBatch)
                    // An empty summary marks a failed commit (see commitBatch); an empty summary
                    // returned by the committer itself is dropped here as well.
                    .filter(summary -> !summary.isEmpty())
                    .doOnNext(summary -> {
                        this.metrics.increment("commitSuccessCount");
                        logger.info("committed {}", summary);
                    });
        }

        /**
         * Commits one batch, recording latency and batch size gauges on success.
         *
         * @param dataFiles non-empty batch of data files to commit
         * @return the committer's summary, or an empty map if the commit threw a
         *     {@link RuntimeException}
         */
        private Map<String, Object> commitBatch(List<DataFile> dataFiles) {
            try {
                long start = this.scheduler.now();
                Map<String, Object> summary = this.committer.commit(dataFiles);
                long now = this.scheduler.now();
                this.metrics.setGauge("commitLatencyMsec", now - start);
                this.metrics.setGauge("commitBatchSize", dataFiles.size());
                return summary;
            } catch (RuntimeException e) {
                // Best effort: record the failure and keep the stream alive.
                this.metrics.increment("commitFailureCount");
                logger.error("error committing to Iceberg", e);
                return new HashMap<String, Object>();
            }
        }
    }
}

