/*
 * Decompiled with CFR 0.152.
 */
package net.e6tech.elements.cassandra.etl;

import java.io.IOException;
import java.lang.reflect.Type;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import net.e6tech.elements.cassandra.Session;
import net.e6tech.elements.cassandra.Sibyl;
import net.e6tech.elements.cassandra.async.AsyncPrepared;
import net.e6tech.elements.cassandra.driver.cql.BaseResultSet;
import net.e6tech.elements.cassandra.driver.cql.Prepared;
import net.e6tech.elements.cassandra.driver.cql.ResultSet;
import net.e6tech.elements.cassandra.driver.cql.Row;
import net.e6tech.elements.cassandra.etl.BatchStrategy;
import net.e6tech.elements.cassandra.etl.ETLContext;
import net.e6tech.elements.cassandra.etl.LastUpdate;
import net.e6tech.elements.cassandra.etl.Partition;
import net.e6tech.elements.cassandra.etl.PartitionContext;
import net.e6tech.elements.common.reflection.ObjectConverter;
import net.e6tech.elements.common.resources.Resources;
import net.e6tech.elements.common.util.SystemException;
import net.e6tech.elements.common.util.TextBuilder;
import net.e6tech.elements.common.util.datastructure.Pair;

public class PartitionStrategy<S extends Partition, C extends PartitionContext>
implements BatchStrategy<S, C> {
    public static final String QUERY_PARTITION = "select ${pk}, count(*) from ${table} where ${pk} > ${start} and ${pk} < ${end} group by ${pk} allow filtering";
    public static final String ASYNC_QUERY_PARTITION = "select ${pk}, count(*) from ${table} where ${pk} = :pk";
    public static final String QUERY_RANGE = "select distinct ${pk} from ${table} where ${pk} > ${start} and ${pk} < ${end} allow filtering";
    public static final String ASYNC_QUERY_RANGE = "select ${pk} from ${table} where ${pk} = :pk";
    private ObjectConverter converter = new ObjectConverter();
    private String partitionTiming;

    @Override
    public int load(C context, List<S> source) {
        if (((PartitionContext)context).getLoadDelegate() != null) {
            return ((PartitionContext)context).getLoadDelegate().applyAsInt(source);
        }
        return 0;
    }

    public Map<Comparable<Object>, Long> queryPartitions(PartitionQuery<C> p) {
        long start = System.currentTimeMillis();
        String query = TextBuilder.using((String)QUERY_PARTITION).build("pk", (Object)p.partitionKey, "table", (Object)p.table, "start", (Object)p.lastUpdate.getLastUpdate(), "end", p.end);
        Map<Comparable<Object>, Long> map = Collections.synchronizedMap(new TreeMap());
        List<Comparable<Object>> partitions = Collections.synchronizedList(new LinkedList());
        ((ETLContext)p.context).open().accept(Resources.class, res -> {
            try {
                ResultSet rs = ((Session)res.getInstance(Session.class)).execute(query);
                List<Row> rows = rs.all();
                for (Row row : rows) {
                    Comparable pk = (Comparable)row.get(0, ((ETLContext)p.context).getPartitionKeyType());
                    map.put(pk, row.get(1, Long.class));
                    partitions.add(pk);
                }
            }
            catch (Exception ex) {
                logger.warn("queryPartitions failed: " + query, (Throwable)ex);
                throw ex;
            }
        });
        Map<Comparable<Object>, Long> ret = this.sortPartitions(map, partitions);
        this.partitionTiming = "queryPartition for " + p.table + " took " + (System.currentTimeMillis() - start) + "ms";
        return ret;
    }

    public Map<Comparable<Object>, Long> queryPartitions2(PartitionQuery<C> p) {
        try {
            new BigDecimal(p.lastUpdate.getLastUpdate());
        }
        catch (Exception ex) {
            logger.warn("Cannot parse latUpdate " + p.lastUpdate.getLastUpdate() + " for " + p.lastUpdate.getExtractor());
            return this.queryPartitions(p);
        }
        if (((ETLContext)p.context).getTimeUnit() != null && p.asyncStep != null && p.asyncStep > 0) {
            long start = System.currentTimeMillis();
            String query = this.buildQuery(p, QUERY_PARTITION, ASYNC_QUERY_PARTITION);
            Map<Comparable<Object>, Long> map = Collections.synchronizedMap(new TreeMap());
            List<Comparable<Object>> partitions = Collections.synchronizedList(new LinkedList());
            this.asyncQuery(p, query, row -> {
                Comparable pk = (Comparable)row.get(0, ((ETLContext)p.context).getPartitionKeyType());
                map.put(pk, row.get(1, Long.class));
                partitions.add(pk);
            });
            Map<Comparable<Object>, Long> ret = this.sortPartitions(map, partitions);
            this.partitionTiming = "queryPartition2 for " + p.table + " took " + (System.currentTimeMillis() - start) + "ms";
            return ret;
        }
        return this.queryPartitions(p);
    }

    private String buildQuery(PartitionQuery<C> p, String query, String asyncQuery) {
        if (((ETLContext)p.context).isAsyncUseFutures()) {
            return TextBuilder.using((String)asyncQuery).build("pk", (Object)p.partitionKey, "table", (Object)p.table);
        }
        return TextBuilder.using((String)query).build("pk", (Object)p.partitionKey, "table", (Object)p.table, "start", (Object)":start", "end", (Object)":end");
    }

    private Map<Comparable<Object>, Long> sortPartitions(Map<Comparable<Object>, Long> map, List<Comparable<Object>> partitions) {
        partitions.sort(null);
        LinkedHashMap<Comparable<Object>, Long> result = new LinkedHashMap<Comparable<Object>, Long>(partitions.size() + 1, 1.0f);
        for (Comparable<Object> partition : partitions) {
            result.put(partition, map.get(partition));
        }
        return result;
    }

    private Pair<BigDecimal, BigDecimal> fromAndTo(PartitionQuery<C> p) {
        BigDecimal from = new BigDecimal(p.lastUpdate.getLastUpdate());
        BigDecimal to = new BigDecimal(((ETLContext)p.context).getCutoff().toString());
        if (from.compareTo(BigDecimal.ZERO) <= 0) {
            from = new BigDecimal(((ETLContext)p.context).getCutoff(System.currentTimeMillis(), ((ETLContext)p.context).getMaxPast()).toString());
        }
        return new Pair((Object)from, (Object)to);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void asyncQuery(PartitionQuery<C> p, String query, Consumer<Row> rowConsumer) {
        Object context = p.context;
        Pair<BigDecimal, BigDecimal> fromTo = this.fromAndTo(p);
        BigDecimal from = (BigDecimal)fromTo.key();
        BigDecimal to = (BigDecimal)fromTo.value();
        BigDecimal start = from;
        List<Range> ranges = this.getAsyncRanges(query, start, to, p.asyncStep, p.asyncMaxChunkSize);
        while (!ranges.isEmpty()) {
            List<Range> working = ranges;
            int retries = p.retries;
            while (retries >= 0) {
                try {
                    this.execAsyncQuery(p.context, query, working, rowConsumer);
                    break;
                }
                catch (Exception ex) {
                    String info = "extractor=" + ((ETLContext)context).extractor() + " sourceClass=" + ((ETLContext)context).getSourceClass() + " tableName=" + ((ETLContext)context).tableName();
                    if (retries == 0) {
                        logger.warn("Cannot transmutate " + query, (Throwable)ex);
                        throw ex;
                    }
                    logger.warn("Cannot transmutate " + query + ", " + retries + " retry attempts left, " + info, (Throwable)ex);
                    try {
                        Thread.sleep(((ETLContext)context).getRetrySleep());
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                finally {
                    --retries;
                }
            }
            Range lastRange = ranges.get(ranges.size() - 1);
            start = lastRange.end.subtract(BigDecimal.ONE);
            ranges = this.getAsyncRanges(query, start, to, p.asyncStep, p.asyncMaxChunkSize);
        }
    }

    protected void execAsyncQuery(C context, String query, List<Range> working, Consumer<Row> rowConsumer) {
        if (((ETLContext)context).isAsyncUseFutures()) {
            this.execAsyncQuery2(context, query, working, rowConsumer);
            return;
        }
        ((ETLContext)context).open().accept(Sibyl.class, sibyl -> sibyl.createAsync(query).execute(working, (range, bound) -> {
            try {
                bound.set("start", this.converter.convert((Object)range.start.toString(), (Type)context.getPartitionKeyType(), null), context.getPartitionKeyType());
                bound.set("end", this.converter.convert((Object)range.end.toPlainString(), (Type)context.getPartitionKeyType(), null), context.getPartitionKeyType());
            }
            catch (IOException ex) {
                throw new SystemException((Throwable)ex);
            }
        }).inExecutionOrderRows(rowConsumer));
    }

    protected void execAsyncQuery2(C context, String query, List<Range> working, Consumer<Row> rowConsumer) {
        for (Range r : working) {
            LinkedList<BigDecimal> list = new LinkedList<BigDecimal>();
            BigDecimal i = r.start.add(BigDecimal.ONE);
            while (i.compareTo(r.end) < 0) {
                list.add(i);
                i = i.add(BigDecimal.ONE);
            }
            ((ETLContext)context).open().accept(Sibyl.class, sibyl -> sibyl.createAsync(query).execute(list, (pk, bound) -> {
                try {
                    bound.set("pk", this.converter.convert((Object)pk.toPlainString(), (Type)context.getPartitionKeyType(), null), context.getPartitionKeyType());
                }
                catch (IOException ex) {
                    throw new SystemException((Throwable)ex);
                }
            }).inExecutionOrderRows(row -> {
                if (!row.isNull(0)) {
                    rowConsumer.accept((Row)row);
                }
            }));
        }
    }

    protected List<Range> getAsyncRanges(String query, BigDecimal from, BigDecimal to, int asyncStepSize, int asyncMaxChunkSize) {
        BigDecimal start = from;
        BigDecimal end = from.add(new BigDecimal(asyncStepSize));
        if (end.compareTo(to) > 0) {
            end = to;
        }
        if (end.subtract(start).compareTo(BigDecimal.ONE) <= 0) {
            return new ArrayList<Range>();
        }
        LinkedList<Range> ranges = new LinkedList<Range>();
        boolean loopEntered = false;
        while (true) {
            if (!loopEntered) {
                loopEntered = true;
            } else if (end.compareTo(to) >= 0 || ranges.size() >= asyncMaxChunkSize) break;
            ranges.add(new Range(start, end));
            start = end.subtract(BigDecimal.ONE);
            if ((end = start.add(new BigDecimal(asyncStepSize))).compareTo(to) <= 0) continue;
            end = to;
        }
        Range first = (Range)ranges.get(0);
        if (from.compareTo(first.start) != 0) {
            logger.warn("async range error for {}, first {} ", (Object)query, (Object)first);
        }
        return new ArrayList<Range>(ranges);
    }

    @Override
    public List<S> extract(C context) {
        return (List)((ETLContext)context).open().apply(Sibyl.class, sibyl -> {
            String query = TextBuilder.using((String)"select * from ${tbl} where ${pk} = :partitionKey").build("tbl", (Object)context.tableName(), "pk", (Object)context.getInspector().getPartitionKeyColumn(0));
            Prepared pstmt = context.getPreparedStatements().computeIfAbsent("extract", key -> sibyl.getSession().prepare(query));
            AsyncPrepared async = sibyl.createAsync(pstmt);
            for (Comparable<?> hour : context.getPartitions()) {
                async.execute(bound -> bound.set("partitionKey", hour, hour.getClass()));
            }
            ArrayList list = new ArrayList();
            async.inExecutionOrder(rs -> list.addAll(sibyl.mapAll(context.getSourceClass(), (BaseResultSet)rs)));
            return list;
        });
    }

    @Override
    public int run(C context) {
        LastUpdate lastUpdate;
        long start = System.currentTimeMillis();
        int importedCount = 0;
        ((ETLContext)context).initialize();
        PartitionQuery<C> p = new PartitionQuery<C>(context);
        Map<Comparable<Object>, Long> partitions = this.queryPartitions2(p);
        ((PartitionContext)context).reset();
        LinkedList concurrent = new LinkedList();
        boolean empty = partitions.isEmpty();
        while (partitions.size() > 0) {
            lastUpdate = ((ETLContext)context).getLastUpdate();
            concurrent.clear();
            boolean first = true;
            long count = 0L;
            for (Map.Entry<Comparable<Object>, Long> entry : partitions.entrySet()) {
                if (first) {
                    concurrent.add(entry.getKey());
                    first = false;
                    count = entry.getValue();
                    continue;
                }
                if (count + entry.getValue() > (long)((ETLContext)context).getBatchSize()) break;
                count += entry.getValue().longValue();
                concurrent.add(entry.getKey());
            }
            int processedCount = this.run(context, concurrent);
            importedCount += processedCount;
            if (logger.isInfoEnabled()) {
                logger.info("Batch loaded {} instances of {}", (Object)processedCount, (Object)((ETLContext)context).extractor());
            }
            for (Comparable comparable : concurrent) {
                partitions.remove(comparable);
                lastUpdate.update(comparable);
            }
            ((ETLContext)context).saveLastUpdate(lastUpdate);
        }
        if (((ETLContext)context).getTimeUnit() != null && empty && ((ETLContext)context).isAsyncUseFutures()) {
            lastUpdate = ((ETLContext)context).getLastUpdate();
            BigDecimal from = new BigDecimal(lastUpdate.getLastUpdate());
            BigDecimal to = new BigDecimal(((ETLContext)p.context).getCutoff().toString());
            if (to.compareTo(from) > 0) {
                BigDecimal currentTimeInTimeUnit = this.timeExpressedInCorrectTimeUnit(((ETLContext)context).getTimeUnit(), System.currentTimeMillis());
                if (to.compareTo(currentTimeInTimeUnit) > 0) {
                    lastUpdate.update(currentTimeInTimeUnit.subtract(BigDecimal.ONE));
                } else {
                    lastUpdate.update(to.subtract(BigDecimal.ONE));
                }
                ((ETLContext)context).saveLastUpdate(lastUpdate);
            }
        }
        if (logger.isInfoEnabled()) {
            logger.info("{}.run for {} loaded {} instances of {} took {}ms, {}", new Object[]{this.getClass().getSimpleName(), ((ETLContext)context).extractor(), importedCount, ((ETLContext)context).getSourceClass().getSimpleName(), System.currentTimeMillis() - start, this.partitionTiming});
        }
        ((PartitionContext)context).reset();
        return importedCount;
    }

    public BigDecimal timeExpressedInCorrectTimeUnit(TimeUnit timeUnit, long time) {
        BigDecimal timeToConvert = new BigDecimal(time);
        if (timeUnit == TimeUnit.DAYS) {
            return timeToConvert.divide(new BigDecimal(86400000L), 0, RoundingMode.DOWN);
        }
        if (timeUnit == TimeUnit.HOURS) {
            return timeToConvert.divide(new BigDecimal(3600000L), 0, RoundingMode.DOWN);
        }
        if (timeUnit == TimeUnit.MINUTES) {
            return timeToConvert.divide(new BigDecimal(60000L), 0, RoundingMode.DOWN);
        }
        if (timeUnit == TimeUnit.SECONDS) {
            return timeToConvert.divide(new BigDecimal(1000L), 0, RoundingMode.DOWN);
        }
        return timeToConvert;
    }

    public int run(C context, List<Comparable<?>> partitions) {
        ((PartitionContext)context).setPartitions(partitions);
        List<S> batchResults = this.extract(context);
        return this.load(context, batchResults);
    }

    public List<Comparable<Object>> queryRange(PartitionQuery<C> p) {
        long start = System.currentTimeMillis();
        Object context = p.context;
        LastUpdate lastUpdate = ((ETLContext)context).getLastUpdate();
        Comparable end = ((ETLContext)context).getCutoff();
        String partitionKey = ((ETLContext)context).getInspector().getPartitionKeyColumn(0);
        String table = ((ETLContext)context).tableName();
        List<Comparable<Object>> list = Collections.synchronizedList(new LinkedList());
        String query = TextBuilder.using((String)QUERY_RANGE).build("pk", (Object)partitionKey, "table", (Object)table, "start", (Object)lastUpdate.getLastUpdate(), "end", (Object)end);
        ((ETLContext)context).open().accept(Resources.class, res -> {
            try {
                ResultSet rs = ((Session)res.getInstance(Session.class)).execute(query);
                for (Row row : rs.all()) {
                    list.add((Comparable)row.get(0, context.getPartitionKeyType()));
                }
            }
            catch (Exception ex) {
                logger.warn("PartitionStrategy.queryRange failed: " + query, (Throwable)ex);
                throw ex;
            }
        });
        list.sort(null);
        this.partitionTiming = "queryRange for " + p.table + " took " + (System.currentTimeMillis() - start) + "ms";
        return list;
    }

    public List<Comparable<Object>> queryRange2(PartitionQuery<C> p) {
        try {
            new BigDecimal(p.lastUpdate.getLastUpdate());
        }
        catch (Exception ex) {
            logger.warn("Cannot parse latUpdate " + p.lastUpdate.getLastUpdate() + " for " + p.lastUpdate.getExtractor());
            return this.queryRange(p);
        }
        if (((ETLContext)p.context).getTimeUnit() != null && p.asyncStep != null && p.asyncStep > 0) {
            long start = System.currentTimeMillis();
            List<Comparable<Object>> list = Collections.synchronizedList(new LinkedList());
            String query = this.buildQuery(p, QUERY_RANGE, ASYNC_QUERY_RANGE);
            this.asyncQuery(p, query, row -> list.add((Comparable)row.get(0, ((ETLContext)p.context).getPartitionKeyType())));
            list.sort(null);
            this.partitionTiming = "queryRange2 for " + p.table + " took " + (System.currentTimeMillis() - start) + "ms";
            return list;
        }
        return this.queryRange(p);
    }

    public int runPartitions(C context) {
        long start = System.currentTimeMillis();
        ((ETLContext)context).initialize();
        PartitionQuery<C> p = new PartitionQuery<C>(context);
        List<Comparable<Object>> list = this.queryRange2(p);
        ArrayList batch = new ArrayList(((ETLContext)context).getBatchSize());
        int processCount = 0;
        for (Comparable<Object> c : list) {
            batch.add(c);
            if (batch.size() != ((ETLContext)context).getBatchSize()) continue;
            LastUpdate lastUpdate = ((ETLContext)context).getLastUpdate();
            processCount += this.runPartitions(batch, context);
            lastUpdate.update((Comparable)batch.get(batch.size() - 1));
            ((ETLContext)context).saveLastUpdate(lastUpdate);
            batch.clear();
        }
        if (!batch.isEmpty()) {
            LastUpdate lastUpdate = ((ETLContext)context).getLastUpdate();
            processCount += this.runPartitions(batch, context);
            lastUpdate.update((Comparable)batch.get(batch.size() - 1));
            ((ETLContext)context).saveLastUpdate(lastUpdate);
            batch.clear();
        }
        if (logger.isInfoEnabled()) {
            logger.info("{}.runPartitions for {} loaded {} instances of {} took {}ms, {}", new Object[]{this.getClass().getSimpleName(), ((ETLContext)context).extractor(), processCount, ((ETLContext)context).getSourceClass().getSimpleName(), System.currentTimeMillis() - start, this.partitionTiming});
        }
        return list.size();
    }

    public int runPartitions(List<Comparable<?>> list, C context) {
        ((PartitionContext)context).setPartitions(list);
        return ((PartitionContext)context).getLoadDelegate().applyAsInt(list);
    }

    public static class PartitionQuery<C extends PartitionContext> {
        C context;
        LastUpdate lastUpdate;
        Comparable<?> end;
        String partitionKey;
        String table;
        int retries;
        Integer asyncStep;
        Integer asyncMaxChunkSize;

        public PartitionQuery(C context) {
            this.context = context;
            this.lastUpdate = ((ETLContext)context).getLastUpdate();
            this.end = ((ETLContext)context).getCutoff();
            this.partitionKey = ((ETLContext)context).getInspector().getPartitionKeyColumn(0);
            this.table = ((ETLContext)context).tableName();
            this.asyncStep = ((ETLContext)context).getAsyncTimeUnitStepSize();
            this.asyncMaxChunkSize = ((ETLContext)context).getAsyncMaxNumOfChunks() == null ? Integer.valueOf(100) : ((ETLContext)context).getAsyncMaxNumOfChunks();
            this.retries = ((ETLContext)context).getRetries();
            if (this.retries < 0) {
                this.retries = 0;
            }
        }
    }

    public static class Range {
        BigDecimal start;
        BigDecimal end;

        public Range(BigDecimal start, BigDecimal end) {
            this.start = start;
            this.end = end;
        }

        public String toString() {
            return "range start= " + this.start + " end=" + this.end;
        }
    }
}

