/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.connector.iceberg.sink.writer.pool;

import io.mantisrx.connector.iceberg.sink.writer.IcebergWriter;
import io.mantisrx.connector.iceberg.sink.writer.config.WriterConfig;
import io.mantisrx.connector.iceberg.sink.writer.factory.IcebergWriterFactory;
import io.mantisrx.connector.iceberg.sink.writer.pool.IcebergWriterPool;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.Record;

/**
 * An {@link IcebergWriterPool} that holds at most a fixed number of open writers,
 * one per partition.
 *
 * <p>Writers are created on demand through the supplied {@link IcebergWriterFactory}
 * and are removed from the pool as soon as they are closed. The pool is backed by a
 * plain {@link HashMap} and performs no synchronization of its own.
 */
public class FixedIcebergWriterPool implements IcebergWriterPool {
    private final IcebergWriterFactory factory;
    private final Map<StructLike, IcebergWriter> pool;
    private final long flushFrequencyBytes;
    private final int maximumPoolSize;

    /**
     * Creates a pool whose flush threshold and maximum size are read from {@code writerConfig}.
     *
     * @param factory      factory used to create a writer for each newly opened partition
     * @param writerConfig source of the flush frequency (bytes) and the maximum pool size
     */
    public FixedIcebergWriterPool(IcebergWriterFactory factory, WriterConfig writerConfig) {
        this(factory, writerConfig.getWriterFlushFrequencyBytes(), writerConfig.getWriterMaximumPoolSize());
    }

    /**
     * Creates a pool with explicit limits.
     *
     * @param factory             factory used to create a writer for each newly opened partition
     * @param flushFrequencyBytes writers whose reported length is at least this value are
     *                            returned by {@link #getFlushableWriters()}
     * @param maximumPoolSize     maximum number of writers that may be open at the same time
     */
    public FixedIcebergWriterPool(IcebergWriterFactory factory, long flushFrequencyBytes, int maximumPoolSize) {
        this.factory = factory;
        this.flushFrequencyBytes = flushFrequencyBytes;
        this.maximumPoolSize = maximumPoolSize;
        this.pool = new HashMap<>(this.maximumPoolSize);
    }

    /**
     * Opens a writer for {@code partition} and registers it in the pool.
     *
     * <p>If a writer for the partition is already open this is a no-op, even when the
     * pool is full, because no additional writer is required.
     *
     * @param partition partition to open a writer for
     * @throws IOException if a new writer is needed but the pool already holds
     *                     {@code maximumPoolSize} writers, or if opening the writer fails
     */
    @Override
    public void open(StructLike partition) throws IOException {
        // Check for an existing writer first so that re-opening an already-open
        // partition never fails merely because the pool is at capacity.
        if (!this.isClosed(partition)) {
            return;
        }
        if (this.pool.size() >= this.maximumPoolSize) {
            throw new IOException("problem opening writer; maximum writer pool size (" + this.maximumPoolSize + ") exceeded");
        }
        IcebergWriter writer = this.factory.newIcebergWriter();
        writer.open(partition);
        // Register only after a successful open so a failed writer never enters the pool.
        this.pool.put(partition, writer);
    }

    /**
     * Writes {@code record} using the writer that is open for {@code partition}.
     *
     * @param partition partition whose writer should receive the record
     * @param record    record to write
     * @throws RuntimeException if no writer is open for the partition
     */
    @Override
    public void write(StructLike partition, Record record) {
        IcebergWriter writer = this.pool.get(partition);
        if (writer == null) {
            throw new RuntimeException("writer does not exist in writer pool");
        }
        writer.write(record);
    }

    /**
     * Closes the writer for {@code partition} and removes it from the pool.
     *
     * <p>The writer is removed from the pool even if closing it throws.
     *
     * @param partition partition whose writer should be closed
     * @return whatever the writer's {@code close()} returns
     * @throws RuntimeException if no writer is open for the partition
     * @throws IOException if closing the writer fails
     * @throws UncheckedIOException if closing the writer fails
     */
    @Override
    public DataFile close(StructLike partition) throws IOException, UncheckedIOException {
        IcebergWriter writer = this.pool.get(partition);
        if (writer == null) {
            throw new RuntimeException("writer does not exist in writer pool");
        }
        try {
            return writer.close();
        } finally {
            this.pool.remove(partition);
        }
    }

    /**
     * Closes every writer currently in the pool.
     *
     * <p>If closing a writer throws, the exception propagates immediately and any
     * writers not yet visited remain open in the pool.
     *
     * @return the non-null results of closing each writer
     * @throws IOException if closing any writer fails
     * @throws UncheckedIOException if closing any writer fails
     */
    @Override
    public List<DataFile> closeAll() throws IOException, UncheckedIOException {
        // close() removes entries from the pool, so iterate over a snapshot of the keys;
        // iterating the live key set would throw ConcurrentModificationException.
        List<StructLike> openPartitions = new ArrayList<>(this.pool.keySet());
        List<DataFile> dataFiles = new ArrayList<>(openPartitions.size());
        for (StructLike partition : openPartitions) {
            DataFile dataFile = this.close(partition);
            if (dataFile != null) {
                dataFiles.add(dataFile);
            }
        }
        return dataFiles;
    }

    /**
     * Returns the partitions that currently have an open writer.
     *
     * <p>The returned set is a live view of the pool's keys, not a copy: do not call
     * {@link #open(StructLike)} or {@link #close(StructLike)} while iterating over it.
     *
     * @return the key-set view of the open writers
     */
    @Override
    public Set<StructLike> getWriters() {
        return this.pool.keySet();
    }

    /**
     * Returns the partitions whose writer reports a length of at least the configured
     * flush frequency in bytes.
     *
     * @return a newly collected set, independent of the pool, of partitions to flush
     */
    @Override
    public Set<StructLike> getFlushableWriters() {
        return this.pool.entrySet().stream()
                .filter(entry -> entry.getValue().length() >= this.flushFrequencyBytes)
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());
    }

    /**
     * Reports whether {@code partition} has no open writer in the pool.
     *
     * @param partition partition to check
     * @return {@code true} if the pool holds no writer for the partition
     */
    @Override
    public boolean isClosed(StructLike partition) {
        return !this.pool.containsKey(partition);
    }
}

