/*
 * Decompiled with CFR 0.152.
 */
package net.e6tech.elements.cassandra.etl;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import net.e6tech.elements.cassandra.Sibyl;
import net.e6tech.elements.cassandra.async.AsyncPrepared;
import net.e6tech.elements.cassandra.driver.cql.BaseResultSet;
import net.e6tech.elements.cassandra.driver.cql.Prepared;
import net.e6tech.elements.cassandra.etl.PartitionOrderBy;
import net.e6tech.elements.cassandra.etl.PartitionOrderByContext;
import net.e6tech.elements.cassandra.etl.PartitionStrategy;

public class PartitionOrderByStrategy<S extends PartitionOrderBy>
extends PartitionStrategy<S, PartitionOrderByContext> {
    @Override
    public List<S> extract(PartitionOrderByContext context) {
        return (List)context.open().apply(Sibyl.class, sibyl -> {
            int before;
            HashMap results = new HashMap((int)((double)context.getPartitions().size() * 1.4 + 16.0));
            Class sourceClass = context.getSourceClass();
            Prepared pstmt = context.getPreparedStatements().computeIfAbsent("extract", key -> sibyl.getSession().prepare(context.getExtractionQuery()));
            AtomicInteger total = new AtomicInteger(0);
            do {
                AsyncPrepared async = sibyl.createAsync(pstmt);
                for (Comparable partition : context.getPartitions()) {
                    Comparable startId = context.getStartId(partition);
                    Comparable endId = context.getEndId(partition);
                    if (endId.compareTo(startId) <= 0) continue;
                    startId = endId;
                    context.setStartId(partition, startId);
                    async.execute(bound -> bound.set(context.getInspector().getPartitionKeyColumn(0), partition, partition.getClass()).set(context.getInspector().getClusteringKeyColumn(0), context.getStartId(partition), partition.getClass()));
                }
                before = total.get();
                async.inExecutionOrder(rs -> {
                    List subList = sibyl.mapAll(sourceClass, (BaseResultSet)rs);
                    if (!subList.isEmpty()) {
                        PartitionOrderBy last = (PartitionOrderBy)subList.get(subList.size() - 1);
                        Comparable partition = (Comparable)context.getInspector().getPartitionKey(last, 0);
                        context.setEndId(partition, (Comparable)context.getInspector().getClusteringKey(last, 0));
                        List list = results.computeIfAbsent(partition, key -> new ArrayList());
                        list.addAll(subList);
                        total.addAndGet(subList.size());
                    }
                });
            } while (before != total.get() && total.get() < context.getBatchSize());
            ArrayList list = new ArrayList(total.get());
            for (Comparable partition : context.getPartitions()) {
                List subList = (List)results.get(partition);
                if (subList == null) continue;
                list.addAll(subList);
            }
            results.clear();
            return list;
        });
    }

    @Override
    public int run(PartitionOrderByContext context, List<Comparable> partitions) {
        List<S> batchResults = null;
        int processedCount = 0;
        context.setPartitions(partitions);
        batchResults = this.extract(context);
        while (batchResults.size() > 0) {
            processedCount += this.load(context, batchResults);
            if (logger.isInfoEnabled()) {
                logger.info("Processed {} instance of {}", (Object)processedCount, (Object)context.extractor());
            }
            batchResults = this.extract(context);
        }
        return processedCount;
    }
}

