/*
 * Decompiled with CFR 0.152.
 */
package net.e6tech.elements.cassandra.etl;

import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import net.e6tech.elements.cassandra.async.Async;
import net.e6tech.elements.cassandra.etl.BatchStrategy;
import net.e6tech.elements.cassandra.etl.ETLContext;
import net.e6tech.elements.cassandra.etl.LastUpdate;
import net.e6tech.elements.cassandra.etl.Partition;
import net.e6tech.elements.cassandra.etl.PartitionContext;
import net.e6tech.elements.common.resources.Resources;
import net.e6tech.elements.common.util.TextBuilder;

/**
 * Batch strategy that processes a table one partition-key value at a time.
 *
 * <p>A run first asks the table which partition keys lie strictly between the
 * context's last-update marker and its cutoff (together with the row count of
 * each), then walks those partitions in ascending key order, grouping
 * consecutive partitions into batches whose combined row count stays within
 * {@code context.getBatchSize()}. Each batch is extracted, handed to the
 * context's load delegate, and the last-update marker is saved.
 *
 * <p>NOTE(review): {@code logger} is not declared in this class; presumably it
 * is inherited from {@link BatchStrategy} — confirm.
 *
 * @param <S> source row type
 * @param <C> context type carrying session, table metadata and batch state
 */
public class PartitionStrategy<S extends Partition, C extends PartitionContext>
implements BatchStrategy<S, C> {

    /**
     * Hands the extracted rows to the context's load delegate.
     *
     * @param context the ETL context; its load delegate may be {@code null}
     * @param source  rows produced by {@link #extract}
     * @return whatever the load delegate returns, or {@code 0} when the context
     *         has no load delegate
     */
    @Override
    public int load(C context, List<S> source) {
        if (context.getLoadDelegate() == null) {
            return 0;
        }
        return context.getLoadDelegate().apply(source);
    }

    /**
     * Queries the partition keys that still have to be processed.
     *
     * <p>Selects every partition key strictly greater than the context's
     * last-update value and strictly less than its cutoff, with the row count
     * per key.
     *
     * <p>NOTE(review): the bounds are substituted into the query text rather
     * than bound as parameters, so this presumably only works for key types
     * whose string form is a valid CQL literal (e.g. numbers) — confirm.
     *
     * @param context the ETL context supplying table name, key column, bounds
     *                and the session
     * @return partition key to row count, iterating in ascending key order
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Map<Comparable, Long> queryPartitions(C context) {
        LastUpdate lastUpdate = context.getLastUpdate();
        Comparable end = context.getCutoff();
        String partitionKey = context.getInspector().getPartitionKeyColumn(0);
        String table = context.tableName();

        Map<Comparable, Long> countsByKey = new HashMap<>();
        List<Comparable> keys = new ArrayList<>();
        context.open().accept(Resources.class, res -> {
            String query = TextBuilder.using("select ${pk}, count(*) from ${table} where ${pk} > ${start} and ${pk} < ${end} group by ${pk} allow filtering").build("pk", (Object)partitionKey, "table", (Object)table, "start", (Object)lastUpdate.getLastUpdate(), "end", (Object)end);
            ResultSet rs = ((Session)res.getInstance(Session.class)).execute(query);
            for (Row row : rs.all()) {
                Comparable pk = (Comparable)row.get(0, context.getPartitionKeyType());
                countsByKey.put(pk, row.getLong(1));
                keys.add(pk);
            }
        });

        // The rows are not relied upon to arrive in key order; sort by the
        // keys' natural ordering and rebuild the map so iteration is ascending.
        Collections.sort(keys);
        Map<Comparable, Long> result = new LinkedHashMap<>();
        for (Comparable key : keys) {
            result.put(key, countsByKey.get(key));
        }
        return result;
    }

    /**
     * Reads all rows of the partitions currently set on the context.
     *
     * <p>One execution of a prepared {@code select *} is queued per partition
     * key in {@code context.getPartitions()}; the results are then consumed via
     * {@code inExecutionOrder} and mapped to the context's source class.
     *
     * @param context the ETL context; {@code getPartitions()} must already hold
     *                the keys to read
     * @return the mapped rows of all requested partitions
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public List<S> extract(C context) {
        String query = TextBuilder.using("select * from ${table} where ${pk} = :partitionKey").build("table", (Object)context.tableName(), "pk", (Object)context.getInspector().getPartitionKeyColumn(0));
        // The statement is cached on the context under the fixed key "extract",
        // so it is prepared at most once per prepared-statement map.
        PreparedStatement pstmt = context.getPreparedStatements().computeIfAbsent("extract", key -> context.getSession().prepare(query));
        Async async = context.createAsync(pstmt);
        for (Comparable hour : context.getPartitions()) {
            async.execute(bound -> bound.set("partitionKey", (Object)hour, hour.getClass()));
        }
        // Deliberately raw: the mapper is obtained from context.getSourceClass()
        // at runtime, so its element type is not visible to the compiler here.
        List rows = new ArrayList();
        async.inExecutionOrder(rs -> rows.addAll(context.getMapper(context.getSourceClass()).map(rs).all()));
        return rows;
    }

    /**
     * Runs the complete ETL: discovers pending partitions, then extracts and
     * loads them batch by batch, saving the last-update marker after each batch.
     *
     * @param context the ETL context
     * @return the sum of the counts returned by {@link #run(List, PartitionContext)}
     *         for every batch
     */
    @Override
    public int run(C context) {
        int importedCount = 0;
        context.initialize();
        Map<Comparable, Long> partitions = this.queryPartitions(context);
        logger.info("Extracting Class {} to {}", (Object)context.getSourceClass(), this.getClass());
        context.reset();
        while (!partitions.isEmpty()) {
            LastUpdate lastUpdate = context.getLastUpdate();
            // A fresh list per batch: the list is handed to setPartitions(), so
            // it must not be cleared or refilled later in case a reference to
            // it is retained.
            List<Comparable> batch = this.pickPartitionBatch(partitions, context.getBatchSize());
            for (Comparable partition : batch) {
                partitions.remove(partition);
                lastUpdate.update(partition);
            }
            importedCount += this.run(batch, context);
            // Saved only after the batch has been processed without throwing.
            context.saveLastUpdate(lastUpdate);
        }
        logger.info("Done loading {} instances of {}", (Object)importedCount, (Object)context.getSourceClass());
        context.reset();
        return importedCount;
    }

    /**
     * Extracts and loads one batch of partitions.
     *
     * @param partitions the partition keys to process; set on the context
     *                   before extraction
     * @param context    the ETL context
     * @return the count returned by {@link #load}
     */
    public int run(List<Comparable> partitions, C context) {
        context.setPartitions(partitions);
        List<S> batchResults = this.extract(context);
        int processedCount = this.load(context, batchResults);
        if (logger.isInfoEnabled()) {
            logger.info("Processed {} instance of {}", (Object)processedCount, (Object)context.extractor());
        }
        return processedCount;
    }

    /**
     * Picks the leading partitions, in the map's iteration order, whose row
     * counts sum to at most {@code batchSize}. The map itself is not modified.
     *
     * @param partitions pending partition keys mapped to their row counts
     * @param batchSize  maximum combined row count of a batch
     * @return the keys of the next batch; non-empty whenever {@code partitions}
     *         is non-empty
     */
    private List<Comparable> pickPartitionBatch(Map<Comparable, Long> partitions, long batchSize) {
        List<Comparable> batch = new ArrayList<>();
        long count = 0L;
        for (Map.Entry<Comparable, Long> entry : partitions.entrySet()) {
            long rows = entry.getValue();
            // The first partition is always taken, even when it alone exceeds
            // the batch size; otherwise the caller's loop could never advance.
            if (!batch.isEmpty() && count + rows > batchSize) {
                break;
            }
            count += rows;
            batch.add(entry.getKey());
        }
        return batch;
    }
}

