/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.timeout;

import java.io.Closeable;
import java.util.Comparator;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
import org.apache.ignite.internal.util.GridConcurrentSkipListSet;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.thread.IgniteThread;

public class GridTimeoutProcessor
extends GridProcessorAdapter {
    private final TimeoutWorker timeoutWorker;
    private final GridConcurrentSkipListSet<GridTimeoutObject> timeoutObjs = new GridConcurrentSkipListSet<GridTimeoutObject>(new Comparator<GridTimeoutObject>(){

        @Override
        public int compare(GridTimeoutObject o1, GridTimeoutObject o2) {
            int res = Long.compare(o1.endTime(), o2.endTime());
            if (res != 0) {
                return res;
            }
            return o1.timeoutId().compareTo(o2.timeoutId());
        }
    });
    private final Object mux = new Object();

    public GridTimeoutProcessor(GridKernalContext ctx) {
        super(ctx);
        this.timeoutWorker = new TimeoutWorker();
    }

    @Override
    public void start() {
        new IgniteThread(this.timeoutWorker).start();
        if (this.log.isDebugEnabled()) {
            this.log.debug("Timeout processor started.");
        }
    }

    @Override
    public void stop(boolean cancel) throws IgniteCheckedException {
        this.timeoutWorker.cancel();
        U.join(this.timeoutWorker);
        if (this.log.isDebugEnabled()) {
            this.log.debug("Timeout processor stopped.");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean addTimeoutObject(GridTimeoutObject timeoutObj) {
        if (timeoutObj.endTime() <= 0L || timeoutObj.endTime() == Long.MAX_VALUE) {
            return false;
        }
        boolean added = this.timeoutObjs.add(timeoutObj);
        assert (added) : "Duplicate timeout object found: " + timeoutObj;
        if (this.timeoutObjs.firstx() == timeoutObj) {
            Object object = this.mux;
            synchronized (object) {
                this.mux.notify();
            }
        }
        return true;
    }

    public CancelableTask schedule(Runnable task, long delay, long period) {
        assert (delay >= 0L) : delay;
        assert (period > 0L || period == -1L) : period;
        CancelableTask obj = new CancelableTask(task, U.currentTimeMillis() + delay, period);
        this.addTimeoutObject(obj);
        return obj;
    }

    public boolean removeTimeoutObject(GridTimeoutObject timeoutObj) {
        return this.timeoutObjs.remove(timeoutObj);
    }

    public void waitAsync(IgniteInternalFuture<?> fut, long timeout, final IgniteBiInClosure<IgniteCheckedException, Boolean> clo) {
        if (timeout == -1L) {
            clo.apply(null, true);
            return;
        }
        if (fut == null || fut.isDone()) {
            clo.apply(null, false);
        } else {
            WaitFutureTimeoutObject timeoutObj = null;
            if (timeout > 0L) {
                timeoutObj = new WaitFutureTimeoutObject(fut, timeout, clo);
                this.addTimeoutObject(timeoutObj);
            }
            final WaitFutureTimeoutObject finalTimeoutObj = timeoutObj;
            fut.listen(new IgniteInClosure<IgniteInternalFuture<?>>(){

                @Override
                public void apply(IgniteInternalFuture<?> fut) {
                    if (finalTimeoutObj != null && !finalTimeoutObj.finishGuard.compareAndSet(false, true)) {
                        return;
                    }
                    try {
                        fut.get();
                        clo.apply(null, false);
                    }
                    catch (IgniteCheckedException e) {
                        clo.apply(e, false);
                    }
                    finally {
                        if (finalTimeoutObj != null) {
                            GridTimeoutProcessor.this.removeTimeoutObject(finalTimeoutObj);
                        }
                    }
                }
            });
        }
    }

    @Override
    public void printMemoryStats() {
        X.println(">>>", new Object[0]);
        X.println(">>> Timeout processor memory stats [igniteInstanceName=" + this.ctx.igniteInstanceName() + ']', new Object[0]);
        X.println(">>>   timeoutObjsSize: " + this.timeoutObjs.size(), new Object[0]);
    }

    private static class WaitFutureTimeoutObject
    extends GridTimeoutObjectAdapter {
        private final IgniteInternalFuture<?> fut;
        private final AtomicBoolean finishGuard = new AtomicBoolean();
        private final IgniteBiInClosure<IgniteCheckedException, Boolean> clo;

        WaitFutureTimeoutObject(IgniteInternalFuture<?> fut, long timeout, IgniteBiInClosure<IgniteCheckedException, Boolean> clo) {
            super(timeout);
            this.fut = fut;
            this.clo = clo;
        }

        @Override
        public void onTimeout() {
            if (!this.fut.isDone() && this.finishGuard.compareAndSet(false, true)) {
                this.clo.apply(null, true);
            }
        }

        public String toString() {
            return S.toString(WaitFutureTimeoutObject.class, this);
        }
    }

    public class CancelableTask
    implements GridTimeoutObject,
    Closeable {
        private final IgniteUuid id = IgniteUuid.randomUuid();
        private long endTime;
        private final long period;
        private volatile boolean cancel;
        @GridToStringInclude
        private final Runnable task;

        CancelableTask(Runnable task, long firstTime, long period) {
            this.task = task;
            this.endTime = firstTime;
            this.period = period;
        }

        @Override
        public IgniteUuid timeoutId() {
            return this.id;
        }

        @Override
        public long endTime() {
            return this.endTime;
        }

        @Override
        public synchronized void onTimeout() {
            if (this.cancel) {
                return;
            }
            try {
                this.task.run();
            }
            finally {
                if (!this.cancel && this.period > 0L) {
                    this.endTime = U.currentTimeMillis() + this.period;
                    GridTimeoutProcessor.this.addTimeoutObject(this);
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void close() {
            this.cancel = true;
            CancelableTask cancelableTask = this;
            synchronized (cancelableTask) {
                GridTimeoutProcessor.this.removeTimeoutObject(this);
            }
        }

        public String toString() {
            return S.toString(CancelableTask.class, this);
        }
    }

    private class TimeoutWorker
    extends GridWorker {
        TimeoutWorker() {
            super(GridTimeoutProcessor.this.ctx.config().getIgniteInstanceName(), "grid-timeout-worker", GridTimeoutProcessor.this.log, GridTimeoutProcessor.this.ctx.workersRegistry());
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        @Override
        protected void body() throws InterruptedException {
            Throwable err = null;
            while (true) {
                Object object;
                try {
                    GridTimeoutObject timeoutObj;
                    if (this.isCancelled()) return;
                    this.updateHeartbeat();
                    long now = U.currentTimeMillis();
                    this.onIdle();
                    Iterator iter = GridTimeoutProcessor.this.timeoutObjs.iterator();
                    while (iter.hasNext() && (timeoutObj = (GridTimeoutObject)iter.next()).endTime() <= now) {
                        try {
                            boolean rmvd = GridTimeoutProcessor.this.timeoutObjs.remove(timeoutObj);
                            if (this.log.isDebugEnabled()) {
                                this.log.debug("Timeout has occurred [obj=" + timeoutObj + ", process=" + rmvd + ']');
                            }
                            if (!rmvd) continue;
                            timeoutObj.onTimeout();
                        }
                        catch (Throwable e) {
                            if (this.isCancelled() && !(e instanceof Error)) {
                                if (this.log.isDebugEnabled()) {
                                    this.log.debug("Error when executing timeout callback: " + timeoutObj);
                                }
                                if (err == null && !this.isCancelled.get()) {
                                    err = new IllegalStateException("Thread " + this.name() + " is terminated unexpectedly.");
                                }
                                if (err instanceof OutOfMemoryError) {
                                    GridTimeoutProcessor.this.ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, err));
                                    return;
                                }
                                if (err == null) return;
                                GridTimeoutProcessor.this.ctx.failure().process(new FailureContext(FailureType.SYSTEM_WORKER_TERMINATION, err));
                                return;
                            }
                            U.error(this.log, "Error when executing timeout callback: " + timeoutObj, e);
                            if (!(e instanceof Error)) continue;
                            throw e;
                        }
                    }
                    object = GridTimeoutProcessor.this.mux;
                }
                catch (Throwable t) {
                    if (t instanceof InterruptedException) throw t;
                    err = t;
                    throw t;
                }
                synchronized (object) {
                    while (!this.isCancelled()) {
                        GridTimeoutObject first = (GridTimeoutObject)GridTimeoutProcessor.this.timeoutObjs.firstx();
                        if (first != null) {
                            long waitTime = first.endTime() - U.currentTimeMillis();
                            if (waitTime <= 0L) break;
                            this.blockingSectionBegin();
                            try {
                                GridTimeoutProcessor.this.mux.wait(waitTime);
                                continue;
                            }
                            finally {
                                this.blockingSectionEnd();
                                continue;
                            }
                        }
                        this.blockingSectionBegin();
                        try {
                            GridTimeoutProcessor.this.mux.wait(5000L);
                        }
                        finally {
                            this.blockingSectionEnd();
                        }
                    }
                }
            }
            finally {
                if (err == null && !this.isCancelled.get()) {
                    err = new IllegalStateException("Thread " + this.name() + " is terminated unexpectedly.");
                }
                if (err instanceof OutOfMemoryError) {
                    GridTimeoutProcessor.this.ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, err));
                } else if (err != null) {
                    GridTimeoutProcessor.this.ctx.failure().process(new FailureContext(FailureType.SYSTEM_WORKER_TERMINATION, err));
                }
            }
        }
    }
}

