/*
 * Decompiled with CFR 0.152.
 */
package net.e6tech.elements.cassandra.etl;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import net.e6tech.elements.cassandra.Session;
import net.e6tech.elements.cassandra.Sibyl;
import net.e6tech.elements.cassandra.async.AsyncPrepared;
import net.e6tech.elements.cassandra.driver.cql.BaseResultSet;
import net.e6tech.elements.cassandra.driver.cql.Prepared;
import net.e6tech.elements.cassandra.driver.cql.ResultSet;
import net.e6tech.elements.cassandra.driver.cql.Row;
import net.e6tech.elements.cassandra.etl.BatchStrategy;
import net.e6tech.elements.cassandra.etl.ETLContext;
import net.e6tech.elements.cassandra.etl.LastUpdate;
import net.e6tech.elements.cassandra.etl.Partition;
import net.e6tech.elements.cassandra.etl.PartitionContext;
import net.e6tech.elements.common.resources.Resources;
import net.e6tech.elements.common.util.TextBuilder;

public class PartitionStrategy<S extends Partition, C extends PartitionContext>
implements BatchStrategy<S, C> {
    @Override
    public int load(C context, List<S> source) {
        if (((PartitionContext)context).getLoadDelegate() != null) {
            return ((PartitionContext)context).getLoadDelegate().applyAsInt(source);
        }
        return 0;
    }

    public Map<Comparable, Long> queryPartitions(C context) {
        LastUpdate lastUpdate = ((ETLContext)context).getLastUpdate();
        Comparable end = ((ETLContext)context).getCutoff();
        String partitionKey = ((ETLContext)context).getInspector().getPartitionKeyColumn(0);
        String table = ((ETLContext)context).tableName();
        AtomicReference ref = new AtomicReference(new HashMap());
        ArrayList partitions = new ArrayList();
        String query = TextBuilder.using((String)"select ${pk}, count(*) from ${table} where ${pk} > ${start} and ${pk} < ${end} group by ${pk} allow filtering").build("pk", (Object)partitionKey, "table", (Object)table, "start", (Object)lastUpdate.getLastUpdate(), "end", (Object)end);
        ((ETLContext)context).open().accept(Resources.class, res -> {
            ResultSet rs = ((Session)res.getInstance(Session.class)).execute(query);
            List<Row> rows = rs.all();
            ref.set(new HashMap((int)((double)rows.size() * 1.4 + 16.0)));
            Map map = (Map)ref.get();
            for (Row row : rows) {
                Comparable pk = (Comparable)row.get(0, context.getPartitionKeyType());
                map.put(pk, row.get(1, Long.class));
                partitions.add(pk);
            }
        });
        Map map = ref.get();
        Collections.sort(partitions);
        LinkedHashMap<Comparable, Long> result = new LinkedHashMap<Comparable, Long>(partitions.size() + 1, 1.0f);
        for (Comparable partition : partitions) {
            result.put(partition, (Long)map.get(partition));
        }
        return result;
    }

    @Override
    public List<S> extract(C context) {
        return (List)((ETLContext)context).open().apply(Sibyl.class, sibyl -> {
            String query = TextBuilder.using((String)"select * from ${tbl} where ${pk} = :partitionKey").build("tbl", (Object)context.tableName(), "pk", (Object)context.getInspector().getPartitionKeyColumn(0));
            Prepared pstmt = context.getPreparedStatements().computeIfAbsent("extract", key -> sibyl.getSession().prepare(query));
            AsyncPrepared async = sibyl.createAsync(pstmt);
            for (Comparable hour : context.getPartitions()) {
                async.execute(bound -> bound.set("partitionKey", hour, hour.getClass()));
            }
            ArrayList list = new ArrayList();
            async.inExecutionOrder(rs -> list.addAll(sibyl.mapAll(context.getSourceClass(), (BaseResultSet)rs)));
            return list;
        });
    }

    @Override
    public int run(C context) {
        int importedCount = 0;
        ((ETLContext)context).initialize();
        Map<Comparable, Long> partitions = this.queryPartitions(context);
        logger.info("Extracting Class {} to {}", (Object)((ETLContext)context).getSourceClass(), this.getClass());
        ((PartitionContext)context).reset();
        LinkedList<Comparable> concurrent = new LinkedList<Comparable>();
        while (partitions.size() > 0) {
            LastUpdate lastUpdate = ((ETLContext)context).getLastUpdate();
            concurrent.clear();
            boolean first = true;
            long count = 0L;
            for (Map.Entry<Comparable, Long> entry : partitions.entrySet()) {
                if (first) {
                    concurrent.add(entry.getKey());
                    first = false;
                    count = entry.getValue();
                    continue;
                }
                if (count + entry.getValue() > (long)((ETLContext)context).getBatchSize()) break;
                count += entry.getValue().longValue();
                concurrent.add(entry.getKey());
            }
            importedCount += this.run(context, concurrent);
            for (Comparable partition : concurrent) {
                partitions.remove(partition);
                lastUpdate.update(partition);
            }
            ((ETLContext)context).saveLastUpdate(lastUpdate);
        }
        logger.info("Done loading {} instances of {}", (Object)importedCount, (Object)((ETLContext)context).getSourceClass());
        ((PartitionContext)context).reset();
        return importedCount;
    }

    public int run(C context, List<Comparable> partitions) {
        ((PartitionContext)context).setPartitions(partitions);
        List<S> batchResults = this.extract(context);
        int processedCount = this.load(context, batchResults);
        if (logger.isInfoEnabled()) {
            logger.info("Processed {} instance of {}", (Object)processedCount, (Object)((ETLContext)context).extractor());
        }
        return processedCount;
    }

    public List<Comparable> queryRange(C context) {
        LastUpdate lastUpdate = ((ETLContext)context).getLastUpdate();
        Comparable end = ((ETLContext)context).getCutoff();
        String partitionKey = ((ETLContext)context).getInspector().getPartitionKeyColumn(0);
        String table = ((ETLContext)context).tableName();
        LinkedList<Comparable> list = new LinkedList<Comparable>();
        String query = TextBuilder.using((String)"select distinct ${pk} from ${table} where ${pk} > ${start} and ${pk} < ${end} allow filtering").build("pk", (Object)partitionKey, "table", (Object)table, "start", (Object)lastUpdate.getLastUpdate(), "end", (Object)end);
        ((ETLContext)context).open().accept(Resources.class, res -> {
            ResultSet rs = ((Session)res.getInstance(Session.class)).execute(query);
            for (Row row : rs.all()) {
                list.add((Comparable)row.get(0, context.getPartitionKeyType()));
            }
        });
        list.sort(null);
        return list;
    }

    public int runPartitions(C context) {
        ((ETLContext)context).initialize();
        List<Comparable> list = this.queryRange(context);
        ArrayList<Comparable> batch = new ArrayList<Comparable>(((ETLContext)context).getBatchSize());
        for (Comparable c : list) {
            batch.add(c);
            if (batch.size() != ((ETLContext)context).getBatchSize()) continue;
            LastUpdate lastUpdate = ((ETLContext)context).getLastUpdate();
            this.runPartitions(batch, context);
            lastUpdate.update((Comparable)batch.get(batch.size() - 1));
            ((ETLContext)context).saveLastUpdate(lastUpdate);
            batch.clear();
        }
        if (!batch.isEmpty()) {
            LastUpdate lastUpdate = ((ETLContext)context).getLastUpdate();
            this.runPartitions(batch, context);
            lastUpdate.update((Comparable)batch.get(batch.size() - 1));
            ((ETLContext)context).saveLastUpdate(lastUpdate);
            batch.clear();
        }
        return list.size();
    }

    public void runPartitions(List<Comparable> list, C context) {
        ((PartitionContext)context).setPartitions(list);
        int processCount = ((PartitionContext)context).getLoadDelegate().applyAsInt(list);
        if (logger.isInfoEnabled()) {
            logger.info("Processed {} partitions of {}", (Object)processCount, (Object)((ETLContext)context).extractor());
        }
    }
}

