/*
 * Decompiled with CFR 0.152.
 */
package cascading.stats;

import cascading.flow.FlowException;
import cascading.stats.CascadingStats;
import cascading.util.Util;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class CounterCache<Config, JobStatus, Counters> {
    public static final String COUNTER_TIMEOUT_PROPERTY = "cascading.counter.timeout.seconds";
    public static final String COUNTER_FETCH_RETRIES_PROPERTY = "cascading.counter.fetch.retries";
    public static final String COUNTER_MAX_AGE_PROPERTY = "cascading.counter.age.max.seconds";
    public static final String NODE_COUNTER_MAX_AGE_PROPERTY = "cascading.node.counter.age.max.seconds";
    public static final int DEFAULT_TIMEOUT_TIMEOUT_SEC = 0;
    public static final int DEFAULT_FETCH_RETRIES = 3;
    public static final int DEFAULT_CACHED_AGE_MAX = 0;
    public static final int DEFAULT_NODE_CACHED_AGE_MAX = 30;
    private static final Logger LOG = LoggerFactory.getLogger(CounterCache.class);
    private static ExecutorService futuresPool = Executors.newSingleThreadExecutor(new ThreadFactory(){

        @Override
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable, "stats-counter-future");
            thread.setDaemon(true);
            return thread;
        }
    });
    private CascadingStats stats;
    private boolean hasCapturedFinalCounters;
    private boolean hasAvailableCounters = true;
    private Counters cachedCounters = null;
    private long lastFetch = -1L;
    private boolean warnedStale = false;
    protected int maxFetchAttempts;
    protected int fetchAttempts;
    protected int timeout;
    protected int maxAge;
    protected final Config configuration;

    protected CounterCache(CascadingStats stats, Config configuration) {
        this.stats = stats;
        this.configuration = configuration;
        this.timeout = this.getIntProperty(COUNTER_TIMEOUT_PROPERTY, 0);
        this.maxFetchAttempts = this.getIntProperty(COUNTER_FETCH_RETRIES_PROPERTY, 3);
        this.maxAge = stats.getType() == CascadingStats.Type.NODE ? this.getIntProperty(NODE_COUNTER_MAX_AGE_PROPERTY, 30) : this.getIntProperty(COUNTER_MAX_AGE_PROPERTY, 0);
    }

    protected abstract int getIntProperty(String var1, int var2);

    public long getLastSuccessfulFetch() {
        return this.lastFetch;
    }

    protected abstract JobStatus getJobStatusClient();

    protected abstract boolean areCountersAvailable(JobStatus var1);

    protected abstract Counters getCounters(JobStatus var1) throws IOException;

    protected abstract Collection<String> getGroupNames(Counters var1);

    protected abstract Set<String> getCountersFor(Counters var1, String var2);

    protected abstract long getCounterValue(Counters var1, Enum var2);

    protected abstract long getCounterValue(Counters var1, String var2, String var3);

    public Collection<String> getCounterGroups() {
        Counters counters = this.cachedCounters();
        if (counters == null) {
            return Collections.emptySet();
        }
        return Collections.unmodifiableCollection(this.getGroupNames(counters));
    }

    public Collection<String> getCounterGroupsMatching(String regex) {
        Counters counters = this.cachedCounters();
        if (counters == null) {
            return Collections.emptySet();
        }
        HashSet<String> results = new HashSet<String>();
        for (String counter : this.getGroupNames(counters)) {
            if (!counter.matches(regex)) continue;
            results.add(counter);
        }
        return Collections.unmodifiableCollection(results);
    }

    public Collection<String> getCountersFor(String group) {
        Counters counters = this.cachedCounters();
        if (counters == null) {
            return Collections.emptySet();
        }
        Set<String> results = this.getCountersFor(counters, group);
        return Collections.unmodifiableCollection(results);
    }

    public long getCounterValue(Enum counter) {
        Counters counters = this.cachedCounters();
        if (counters == null) {
            return 0L;
        }
        return this.getCounterValue(counters, counter);
    }

    public long getCounterValue(String group, String counter) {
        Counters counters = this.cachedCounters();
        if (counters == null) {
            return 0L;
        }
        return this.getCounterValue(counters, group, counter);
    }

    public Counters cachedCounters() {
        return this.cachedCounters(false);
    }

    public synchronized Counters cachedCounters(boolean force) {
        int currentAge;
        boolean isStale;
        if (!this.hasAvailableCounters) {
            return this.cachedCounters;
        }
        if (this.fetchAttempts >= this.maxFetchAttempts) {
            if (!this.hasCapturedFinalCounters && !this.warnedStale) {
                if (this.cachedCounters == null) {
                    LOG.warn("no counters fetched, max num consecutive retries reached: {}, type: {}, status: {}", new Object[]{this.maxFetchAttempts, this.stats.getType(), this.stats.getStatus()});
                } else {
                    LOG.warn("stale counters being returned, max num consecutive retries reached, age: {}, type: {}, status: {}", new Object[]{Util.formatDurationFromMillis(System.currentTimeMillis() - this.lastFetch), this.stats.getType(), this.stats.getStatus()});
                }
                this.warnedStale = true;
            }
            return this.cachedCounters;
        }
        boolean isProcessFinished = this.stats.isFinished();
        if (isProcessFinished && this.hasCapturedFinalCounters) {
            return this.cachedCounters;
        }
        if (!force && isProcessFinished) {
            force = true;
        }
        boolean bl = isStale = (currentAge = (int)((this.lastFetch - System.currentTimeMillis()) / 1000L)) >= this.maxAge;
        if (this.cachedCounters != null && !force && !isStale) {
            return this.cachedCounters;
        }
        JobStatus runningJob = this.getJobStatusClient();
        if (runningJob == null) {
            return this.cachedCounters;
        }
        if (!this.areCountersAvailable(runningJob)) {
            this.hasAvailableCounters = false;
            return this.cachedCounters;
        }
        boolean success = false;
        try {
            Counters fetched = this.fetchCounters(runningJob);
            boolean bl2 = success = fetched != null;
            if (success) {
                this.cachedCounters = fetched;
                this.lastFetch = System.currentTimeMillis();
                this.fetchAttempts = 0;
            }
        }
        catch (InterruptedException exception) {
            LOG.warn("fetching counters was interrupted");
        }
        catch (ExecutionException exception) {
            ++this.fetchAttempts;
            if (this.fetchAttempts >= this.maxFetchAttempts) {
                LOG.error("fetching counters failed, was final consecutive attempt: {}, type: {}, status: {}", new Object[]{this.fetchAttempts, this.stats.getType(), this.stats.getStatus(), exception.getCause()});
            } else {
                LOG.warn("fetching counters failed, consecutive attempts: {}, type: {}, status: {}, message: {}", new Object[]{this.fetchAttempts, this.stats.getType(), this.stats.getStatus(), exception.getCause().getMessage()});
            }
            if (this.cachedCounters != null) {
                LOG.error("returning cached values");
                return this.cachedCounters;
            }
            LOG.error("unable to get remote counters, no cached values, rethrowing exception", exception.getCause());
            if (exception.getCause() instanceof FlowException) {
                throw (FlowException)exception.getCause();
            }
            throw new FlowException(exception.getCause());
        }
        catch (TimeoutException exception) {
            ++this.fetchAttempts;
            if (this.fetchAttempts >= this.maxFetchAttempts) {
                LOG.warn("fetching counters timed out after: {} seconds, was final consecutive attempt: {}, type: {}, status: {}", new Object[]{this.timeout, this.fetchAttempts, this.stats.getType(), this.stats.getStatus()});
            }
            LOG.warn("fetching counters timed out after: {} seconds, consecutive attempts: {}, type: {}, status: {}", new Object[]{this.timeout, this.fetchAttempts, this.stats.getType(), this.stats.getStatus()});
        }
        this.hasCapturedFinalCounters = isProcessFinished && success;
        return this.cachedCounters;
    }

    private Counters fetchCounters(JobStatus runningJob) throws InterruptedException, ExecutionException, TimeoutException {
        if (this.timeout > 0) {
            return this.runFuture(runningJob).get(this.timeout, TimeUnit.SECONDS);
        }
        try {
            return this.getCounters(runningJob);
        }
        catch (IOException exception) {
            throw new FlowException("unable to get remote counter values", exception);
        }
    }

    private Future<Counters> runFuture(final JobStatus jobStatus) {
        Callable task = new Callable<Counters>(){

            @Override
            public Counters call() throws Exception {
                try {
                    return CounterCache.this.getCounters(jobStatus);
                }
                catch (IOException exception) {
                    throw new FlowException("unable to get remote counter values", exception);
                }
            }
        };
        return futuresPool.submit(task);
    }
}

