/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.query;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.managers.systemview.walker.SqlQueryHistoryViewWalker;
import org.apache.ignite.internal.managers.systemview.walker.SqlQueryViewWalker;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
import org.apache.ignite.internal.processors.metric.impl.MetricUtils;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.GridQueryFinishedInfo;
import org.apache.ignite.internal.processors.query.GridQueryStartedInfo;
import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryHistory;
import org.apache.ignite.internal.processors.query.QueryHistoryKey;
import org.apache.ignite.internal.processors.query.QueryHistoryTracker;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.messages.GridQueryKillRequest;
import org.apache.ignite.internal.processors.query.messages.GridQueryKillResponse;
import org.apache.ignite.internal.processors.security.SecurityUtils;
import org.apache.ignite.internal.processors.tracing.Span;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.CIX2;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.systemview.view.SqlQueryHistoryView;
import org.apache.ignite.spi.systemview.view.SqlQueryView;
import org.jetbrains.annotations.Nullable;

public class RunningQueryManager {
    /** Name of the metric registry that holds the user query counters. */
    public static final String SQL_USER_QUERIES_REG_NAME = "sql.queries.user";

    /** Name of the system view that exposes the running queries. */
    public static final String SQL_QRY_VIEW = MetricUtils.metricName("sql", "queries");

    /** Description of the running queries system view. */
    public static final String SQL_QRY_VIEW_DESC = "Running SQL queries.";

    /** Name of the system view that exposes the query history. */
    public static final String SQL_QRY_HIST_VIEW = MetricUtils.metricName("sql", "queries", "history");

    /** Description of the query history system view. */
    public static final String SQL_QRY_HIST_VIEW_DESC = "SQL queries history.";

    /**
     * Id that never identifies a registered query: ids handed out by {@code register} start at 1
     * and {@code unregister} ignores non-positive ids.
     */
    public static final long UNDEFINED_QUERY_ID = 0L;

    /** Closure processor used to run listener notifications. */
    private final GridClosureProcessor closure;

    /** Queries currently registered on this node, keyed by their node-local id. */
    private final ConcurrentMap<Long, GridRunningQueryInfo> runs = new ConcurrentHashMap<Long, GridRunningQueryInfo>();

    /** Generator of node-local query ids. */
    private final AtomicLong qryIdGen = new AtomicLong();

    /** Id of the local node, captured at construction time. */
    private final UUID localNodeId;

    /** Query history size taken from the SQL configuration. */
    private final int histSz;

    /** Current history tracker; replaced as a whole when the history is reset. */
    private volatile QueryHistoryTracker qryHistTracker;

    /** Counter of successfully finished SQL queries. */
    private final LongAdderMetric successQrsCnt;

    /** Counter of failed SQL queries (cancelled ones included). */
    private final AtomicLongMetric failedQrsCnt;

    /** Counter of SQL queries that failed because they were cancelled. */
    private final AtomicLongMetric canceledQrsCnt;

    /** Kernal context. */
    private final GridKernalContext ctx;

    /** Logger. */
    private final IgniteLogger log;

    /** Query registered by the current thread; only populated while performance statistics are enabled. */
    private final ThreadLocal<GridRunningQueryInfo> currQryInfo = new ThreadLocal<>();

    /**
     * Guards {@link #cancellationRuns}: single-entry additions/removals take the read lock,
     * bulk completion (node left, stop, disconnect) takes the write lock.
     */
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    /** Busy lock handed over in {@code start}; used by the asynchronous local cancel task. */
    private GridSpinBusyLock busyLock;

    /** Cancel requests awaiting a response, keyed by cancel request id. */
    private final ConcurrentMap<Long, CancelQueryFuture> cancellationRuns = new ConcurrentHashMap<Long, CancelQueryFuture>();

    /** Generator of cancel request ids. */
    private final AtomicLong qryCancelReqCntr = new AtomicLong();

    /** Set once {@code stop} has been called; new cancel requests are rejected afterwards. */
    private volatile boolean stopped;

    /** Handler that delivers a message addressed to the local node straight to {@code onMessage}. */
    private final CIX2<ClusterNode, Message> locNodeMsgHnd = new CIX2<ClusterNode, Message>(){

        @Override
        public void applyx(ClusterNode locNode, Message msg) {
            RunningQueryManager.this.onMessage(locNode.id(), msg);
        }
    };

    /** Listeners notified when a query is registered. */
    private final List<Consumer<GridQueryStartedInfo>> qryStartedListeners = new CopyOnWriteArrayList<Consumer<GridQueryStartedInfo>>();

    /** Listeners notified when a query is unregistered. */
    private final List<Consumer<GridQueryFinishedInfo>> qryFinishedListeners = new CopyOnWriteArrayList<Consumer<GridQueryFinishedInfo>>();

    /**
     * Creates the manager: captures context services, creates the history tracker, registers the
     * running-queries and query-history system views and the user query metrics.
     *
     * NOTE(review): the history view is handed the value collection of the tracker created here;
     * {@code resetQueryHistoryMetrics} later swaps the tracker, so confirm whether the view is
     * expected to follow the new tracker.
     *
     * @param ctx Kernal context.
     */
    public RunningQueryManager(GridKernalContext ctx) {
        this.ctx = ctx;

        log = ctx.log(getClass());
        localNodeId = ctx.localNodeId();
        histSz = ctx.config().getSqlConfiguration().getSqlQueryHistorySize();
        closure = ctx.closure();
        qryHistTracker = new QueryHistoryTracker(histSz);

        // System views.
        ctx.systemView().registerView(
            SQL_QRY_VIEW,
            SQL_QRY_VIEW_DESC,
            new SqlQueryViewWalker(),
            runs.values(),
            SqlQueryView::new);

        ctx.systemView().registerView(
            SQL_QRY_HIST_VIEW,
            SQL_QRY_HIST_VIEW_DESC,
            new SqlQueryHistoryViewWalker(),
            qryHistTracker.queryHistory().values(),
            SqlQueryHistoryView::new);

        // User query counters.
        MetricRegistry reg = ctx.metric().registry(SQL_USER_QUERIES_REG_NAME);

        successQrsCnt = reg.longAdderMetric(
            "success",
            "Number of successfully executed user queries that have been started on this node.");

        failedQrsCnt = reg.longMetric(
            "failed",
            "Total number of failed by any reason (cancel, etc) queries that have been started on this node.");

        canceledQrsCnt = reg.longMetric(
            "canceled",
            "Number of canceled queries that have been started on this node. This metric number included in the general 'failed' metric.");
    }

    /**
     * Stores the busy lock, subscribes to messages on {@code GridTopic.TOPIC_QUERY} and installs a
     * local event listener that completes pending cancel futures addressed to the event's node.
     *
     * @param busyLock Busy lock used later by the asynchronous local cancel task.
     */
    public void start(GridSpinBusyLock busyLock) {
        this.busyLock = busyLock;

        ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, (sndId, payload, plc) -> onMessage(sndId, payload));

        ctx.event().addLocalEventListener(new GridLocalEventListener() {
            @Override
            public void onEvent(Event evt) {
                UUID goneNodeId = ((DiscoveryEvent)evt).eventNode().id();

                List<CancelQueryFuture> orphaned = new ArrayList<>();

                // Write lock: bulk removal must not interleave with cancelQuery() adding entries.
                RunningQueryManager.this.lock.writeLock().lock();

                try {
                    Iterator<CancelQueryFuture> pending = RunningQueryManager.this.cancellationRuns.values().iterator();

                    while (pending.hasNext()) {
                        CancelQueryFuture candidate = pending.next();

                        if (candidate.nodeId().equals(goneNodeId)) {
                            orphaned.add(candidate);
                            pending.remove();
                        }
                    }
                }
                finally {
                    RunningQueryManager.this.lock.writeLock().unlock();
                }

                // Futures are completed after the lock has been released.
                for (CancelQueryFuture orphan : orphaned) {
                    orphan.onDone("Query node has left the grid: [nodeId=" + goneNodeId + "]");
                }
            }
        // NOTE(review): 12 and 11 are event type constants inlined by the decompiler - presumably
        // EventType.EVT_NODE_FAILED and EventType.EVT_NODE_LEFT; confirm against EventType.
        }, 12, 11);
    }

    /**
     * Registers a running query under a freshly generated node-local id and notifies the
     * query-started listeners, if any, through the closure processor.
     *
     * @param qry Query text.
     * @param qryType Query type.
     * @param schemaName Schema name.
     * @param loc Local query flag.
     * @param cancel Query cancel handle, may be {@code null}.
     * @param qryInitiatorId Query initiator id; when {@code null} the value of
     *      {@code SqlFieldsQuery.threadedQueryInitiatorId()} is used.
     * @param enforceJoinOrder Enforce join order flag.
     * @param lazy Lazy flag.
     * @param distributedJoins Distributed joins flag.
     * @return Id assigned to the query.
     * @throws IgniteException If submitting the listener notification fails with {@code IgniteCheckedException}.
     */
    public long register(String qry, GridCacheQueryType qryType, String schemaName, boolean loc, @Nullable GridQueryCancel cancel, String qryInitiatorId, boolean enforceJoinOrder, boolean lazy, boolean distributedJoins) {
        final long newId = qryIdGen.incrementAndGet();

        String initiatorId = qryInitiatorId != null ? qryInitiatorId : SqlFieldsQuery.threadedQueryInitiatorId();

        long startMillis = System.currentTimeMillis();

        // Nano start time is only recorded while performance statistics are enabled.
        long startNanos = ctx.performanceStatistics().enabled() ? System.nanoTime() : 0L;

        GridRunningQueryInfo registered = new GridRunningQueryInfo(
            newId,
            localNodeId,
            qry,
            qryType,
            schemaName,
            startMillis,
            startNanos,
            cancel,
            loc,
            initiatorId,
            enforceJoinOrder,
            lazy,
            distributedJoins,
            SecurityUtils.securitySubjectId(ctx));

        GridRunningQueryInfo clash = runs.putIfAbsent(newId, registered);

        if (ctx.performanceStatistics().enabled()) {
            currQryInfo.set(registered);
        }

        assert clash == null : "Running query already registered [prev_qry=" + clash + ", newQry=" + registered + ']';

        registered.span().addTag("sql.query.id", registered::globalQueryId);

        if (qryStartedListeners.isEmpty()) {
            return newId;
        }

        GridQueryStartedInfo startedInfo = new GridQueryStartedInfo(
            registered.id(),
            localNodeId,
            registered.query(),
            registered.queryType(),
            registered.schemaName(),
            registered.startTime(),
            registered.cancelable(),
            registered.local(),
            registered.enforceJoinOrder(),
            registered.lazy(),
            registered.distributedJoins(),
            registered.queryInitiatorId());

        try {
            // NOTE(review): policy byte 0 was inlined by the decompiler - presumably a GridIoPolicy constant; confirm.
            closure.runLocal(() -> {
                for (Consumer<GridQueryStartedInfo> subscriber : qryStartedListeners) {
                    try {
                        subscriber.accept(startedInfo);
                    }
                    catch (Exception lsnrErr) {
                        // A failing listener must not prevent the remaining ones from being notified.
                        log.error("Listener fails during handling query started event [qryId=" + newId + "]", lsnrErr);
                    }
                }
            }, (byte)0);
        }
        catch (IgniteCheckedException submitErr) {
            throw new IgniteException(submitErr.getMessage(), submitErr);
        }

        return newId;
    }

    /**
     * Removes a query from the running set and performs the end-of-query bookkeeping: tags the
     * span with the failure, notifies query-finished listeners, and for SQL queries completes the
     * running future, records history and updates the counters. Performance statistics are
     * reported when enabled. The query span is always ended.
     *
     * Does nothing when {@code qryId} is not positive or no such query is registered.
     *
     * @param qryId Id of the query to unregister.
     * @param failReason Failure that ended the query, or {@code null} when it finished successfully.
     * @throws IgniteException If submitting the listener notification fails with {@code IgniteCheckedException}.
     */
    public void unregister(long qryId, @Nullable Throwable failReason) {
        if (qryId <= 0L) {
            return;
        }

        GridRunningQueryInfo finished = runs.remove(qryId);

        if (finished == null) {
            return;
        }

        final boolean failed = failReason != null;

        Span span = finished.span();

        try {
            if (failed) {
                span.addTag("error", failReason::getMessage);
            }

            if (!qryFinishedListeners.isEmpty()) {
                GridQueryFinishedInfo finishedInfo = new GridQueryFinishedInfo(
                    finished.id(),
                    localNodeId,
                    finished.query(),
                    finished.queryType(),
                    finished.schemaName(),
                    finished.startTime(),
                    System.currentTimeMillis(),
                    finished.local(),
                    finished.enforceJoinOrder(),
                    finished.lazy(),
                    finished.distributedJoins(),
                    failed,
                    failReason,
                    finished.queryInitiatorId());

                try {
                    // NOTE(review): policy byte 0 was inlined by the decompiler - presumably a GridIoPolicy constant; confirm.
                    closure.runLocal(() -> {
                        for (Consumer<GridQueryFinishedInfo> subscriber : qryFinishedListeners) {
                            try {
                                subscriber.accept(finishedInfo);
                            }
                            catch (Exception lsnrErr) {
                                // A failing listener must not prevent the remaining ones from being notified.
                                log.error("Listener fails during handling query finished event [qryId=" + qryId + "]", lsnrErr);
                            }
                        }
                    }, (byte)0);
                }
                catch (IgniteCheckedException submitErr) {
                    throw new IgniteException(submitErr.getMessage(), submitErr);
                }
            }

            if (isSqlQuery(finished)) {
                // Completing the running future triggers listeners attached by synchronous kill requests.
                finished.runningFuture().onDone();

                qryHistTracker.collectHistory(finished, failed);

                if (failed) {
                    failedQrsCnt.increment();

                    // Cancellations are counted on top of the general failure counter.
                    if (QueryUtils.wasCancelled(failReason)) {
                        canceledQrsCnt.increment();
                    }
                }
                else {
                    successQrsCnt.increment();
                }
            }

            if (ctx.performanceStatistics().enabled() && finished.startTimeNanos() > 0L) {
                ctx.performanceStatistics().query(
                    finished.queryType(),
                    finished.query(),
                    finished.requestId(),
                    finished.startTime(),
                    System.nanoTime() - finished.startTimeNanos(),
                    !failed);
            }
        }
        finally {
            span.end();
        }
    }

    /**
     * Stores the request id on the query registered by the current thread. Has an effect only
     * while performance statistics are enabled and the thread has a registered query.
     *
     * @param reqId Request id.
     */
    public void trackRequestId(long reqId) {
        if (!ctx.performanceStatistics().enabled()) {
            return;
        }

        GridRunningQueryInfo current = currQryInfo.get();

        if (current != null) {
            current.requestId(reqId);
        }
    }

    /**
     * Collects the registered queries whose type is {@code SQL} or {@code SQL_FIELDS}.
     *
     * @return New list with the running SQL queries.
     */
    public List<GridRunningQueryInfo> runningSqlQueries() {
        List<GridRunningQueryInfo> sqlOnly = new ArrayList<>();

        runs.values().forEach(candidate -> {
            if (isSqlQuery(candidate)) {
                sqlOnly.add(candidate);
            }
        });

        return sqlOnly;
    }

    /**
     * Adds a listener that is notified each time a query is registered.
     *
     * @param lsnr Listener; checked with {@code A.notNull}.
     */
    public void registerQueryStartedListener(Consumer<GridQueryStartedInfo> lsnr) {
        A.notNull(lsnr, "lsnr");

        qryStartedListeners.add(lsnr);
    }

    /**
     * Removes a previously added query-started listener.
     *
     * @param lsnr Listener to remove; checked with {@code A.notNull}.
     * @return {@code true} if the listener list contained the listener.
     */
    public boolean unregisterQueryStartedListener(Object lsnr) {
        A.notNull(lsnr, "lsnr");

        boolean removed = qryStartedListeners.remove(lsnr);

        return removed;
    }

    /**
     * Adds a listener that is notified each time a query is unregistered.
     *
     * @param lsnr Listener; checked with {@code A.notNull}.
     */
    public void registerQueryFinishedListener(Consumer<GridQueryFinishedInfo> lsnr) {
        A.notNull(lsnr, "lsnr");

        qryFinishedListeners.add(lsnr);
    }

    /**
     * Removes a previously added query-finished listener.
     *
     * @param lsnr Listener to remove; checked with {@code A.notNull}.
     * @return {@code true} if the listener list contained the listener.
     */
    public boolean unregisterQueryFinishedListener(Object lsnr) {
        A.notNull(lsnr, "lsnr");

        boolean removed = qryFinishedListeners.remove(lsnr);

        return removed;
    }

    /**
     * Tells whether the query type is {@code SQL_FIELDS} or {@code SQL}.
     *
     * @param runningQryInfo Query descriptor.
     * @return {@code true} for the two SQL query types, {@code false} otherwise.
     */
    private boolean isSqlQuery(GridRunningQueryInfo runningQryInfo) {
        GridCacheQueryType type = runningQryInfo.queryType();

        return type == GridCacheQueryType.SQL_FIELDS || type == GridCacheQueryType.SQL;
    }

    /**
     * Collects the registered queries for which {@code longQuery(currentTimeMillis, duration)}
     * returns {@code true}.
     *
     * @param duration Duration threshold passed to {@code GridRunningQueryInfo.longQuery}.
     * @return New collection with the matching queries.
     */
    public Collection<GridRunningQueryInfo> longRunningQueries(long duration) {
        List<GridRunningQueryInfo> matched = new ArrayList<>();

        // One timestamp for the whole scan so every query is measured against the same instant.
        final long now = System.currentTimeMillis();

        runs.values().forEach(candidate -> {
            if (candidate.longQuery(now, duration)) {
                matched.add(candidate);
            }
        });

        return matched;
    }

    /**
     * Cancels the query registered under the given id; does nothing when there is no such query.
     *
     * @param qryId Node-local query id.
     */
    public void cancelLocalQuery(long qryId) {
        GridRunningQueryInfo target = runs.get(qryId);

        if (target == null) {
            return;
        }

        target.cancel();
    }

    /**
     * Stops the manager: marks it stopped so new cancel requests are rejected, completes all
     * pending cancel futures with a "node is stopping" error, then removes and cancels every
     * registered query.
     *
     * Cancellation is best effort: a query that fails to cancel is logged and skipped so the
     * remaining queries are still cancelled.
     */
    public void stop() {
        this.stopped = true;
        this.completeCancellationFutures("Local node is stopping: [nodeId=" + this.ctx.localNodeId() + "]");
        Iterator<GridRunningQueryInfo> iter = this.runs.values().iterator();
        while (iter.hasNext()) {
            GridRunningQueryInfo r = iter.next();
            iter.remove();
            try {
                r.cancel();
            }
            catch (Exception e) {
                // Not critical during shutdown, but leave a trace instead of swallowing silently.
                U.warn(this.log, "Failed to cancel running query on node stop: [qryId=" + r.id() + "]", e);
            }
        }
    }

    /**
     * Cancels a query on the given node and waits for the outcome.
     *
     * A kill request is delivered to the target node: directly through the local handler for a
     * synchronous local cancel, through a local task for an asynchronous local cancel, or over
     * {@code GridTopic.TOPIC_QUERY} for a remote node. The method then blocks on the cancel future
     * until a response arrives or the future is completed by a node-left/stop/disconnect event.
     *
     * @param queryId Node-local id of the query on the target node.
     * @param nodeId Target node id, or {@code null} to address the local node.
     * @param async Asynchronous flag passed to the kill request.
     * @throws IgniteSQLException If the manager is stopped, the node is not alive, the request
     *      could not be sent, or the target reported an error.
     */
    public void cancelQuery(long queryId, @Nullable UUID nodeId, boolean async) {
        CancelQueryFuture fut;

        // Read lock: lets concurrent cancels proceed but excludes bulk completion of futures.
        this.lock.readLock().lock();
        try {
            if (this.stopped) {
                throw new IgniteSQLException("Failed to cancel query due to node is stopped [nodeId=" + nodeId + ", qryId=" + queryId + "]");
            }

            final ClusterNode node = nodeId != null ? this.ctx.discovery().node(nodeId) : this.ctx.discovery().localNode();

            if (node == null) {
                throw new IgniteSQLException("Failed to cancel query, node is not alive [nodeId=" + nodeId + ", qryId=" + queryId + "]");
            }

            // Use the resolved node's id: nodeId may be null (local node), while the future requires
            // a non-null id and the node-left listener compares against it.
            fut = new CancelQueryFuture(node.id(), queryId);

            long reqId = this.qryCancelReqCntr.incrementAndGet();
            this.cancellationRuns.put(reqId, fut);

            final GridQueryKillRequest request = new GridQueryKillRequest(reqId, queryId, async);

            if (node.isLocal() && !async) {
                this.locNodeMsgHnd.apply(node, request);
            }
            else {
                try {
                    // NOTE(review): policy byte 3 was inlined by the decompiler - presumably a
                    // GridIoPolicy constant; confirm.
                    if (node.isLocal()) {
                        this.ctx.closure().runLocal(new GridPlainRunnable(){

                            @Override
                            public void run() {
                                if (!RunningQueryManager.this.busyLock.enterBusy()) {
                                    return;
                                }
                                try {
                                    RunningQueryManager.this.locNodeMsgHnd.apply(node, request);
                                }
                                finally {
                                    RunningQueryManager.this.busyLock.leaveBusy();
                                }
                            }
                        }, (byte)3);
                    } else {
                        this.ctx.io().sendGeneric(node, (Object)GridTopic.TOPIC_QUERY, GridTopic.TOPIC_QUERY.ordinal(), request, (byte)3);
                    }
                }
                catch (IgniteCheckedException e) {
                    // Nobody will ever answer this request, so drop the pending future.
                    this.cancellationRuns.remove(reqId);
                    throw new IgniteSQLException("Failed to cancel query due communication problem [nodeId=" + node.id() + ",qryId=" + queryId + ", errMsg=" + e.getMessage() + "]", e);
                }
            }
        }
        finally {
            this.lock.readLock().unlock();
        }

        // Wait outside the lock; a non-null result is the error reported for this request.
        try {
            String err = fut.get();
            if (err != null) {
                throw new IgniteSQLException("Failed to cancel query [nodeId=" + nodeId + ", qryId=" + queryId + ", err=" + err + "]");
            }
        }
        catch (IgniteCheckedException e) {
            throw new IgniteSQLException("Failed to cancel query [nodeId=" + nodeId + ", qryId=" + queryId + ", err=" + e + "]", e);
        }
    }

    /**
     * Completes every pending cancel future with a "client disconnected" error.
     */
    public void onDisconnected() {
        String reason = "Failed to cancel query because local client node has been disconnected from the cluster";

        completeCancellationFutures(reason);
    }

    /**
     * Completes and removes all pending cancel futures under the write lock.
     *
     * @param err Value every pending future is completed with.
     */
    private void completeCancellationFutures(@Nullable String err) {
        lock.writeLock().lock();

        try {
            for (Iterator<CancelQueryFuture> pending = cancellationRuns.values().iterator(); pending.hasNext(); ) {
                CancelQueryFuture waiting = pending.next();

                waiting.onDone(err);

                pending.remove();
            }
        }
        finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Dispatches a message received on the query topic (or delivered by the local handler) to the
     * kill request / kill response handlers. Messages from nodes that discovery no longer knows
     * are ignored; messages of any other type are ignored as well.
     *
     * @param nodeId Id of the sender node.
     * @param msg Message, must not be {@code null}.
     */
    private void onMessage(UUID nodeId, Object msg) {
        assert (msg != null);
        ClusterNode node = this.ctx.discovery().node(nodeId);
        if (node == null) {
            return;
        }
        boolean processed = true;
        // The branches are mutually exclusive; without the 'else' a handled kill request fell
        // through to 'processed = false' and was never reported in the debug log.
        if (msg instanceof GridQueryKillRequest) {
            this.onQueryKillRequest((GridQueryKillRequest)msg, node);
        } else if (msg instanceof GridQueryKillResponse) {
            this.onQueryKillResponse((GridQueryKillResponse)msg);
        } else {
            processed = false;
        }
        if (processed && this.log.isDebugEnabled()) {
            this.log.debug("Processed response: " + nodeId + "->" + this.ctx.localNodeId() + " " + msg);
        }
    }

    /**
     * Handles a kill request for a query on this node.
     *
     * A response is sent immediately when the request cannot be served (unknown or non-cancelable
     * query) or when the request asks for an asynchronous response. Otherwise the query is
     * cancelled and the response is sent once the query's running future completes; a failure
     * thrown by the cancel itself is logged and, for synchronous requests, reported back.
     *
     * @param msg Kill request.
     * @param node Node the request came from.
     */
    private void onQueryKillRequest(GridQueryKillRequest msg, ClusterNode node) {
        final long targetQryId = msg.nodeQryId();

        GridRunningQueryInfo target = runs.get(targetQryId);

        String rejection;

        if (target == null) {
            rejection = "Query with provided ID doesn't exist [nodeId=" + ctx.localNodeId() + ", qryId=" + targetQryId + "]";
        }
        else if (!target.cancelable()) {
            rejection = "Query doesn't support cancellation [nodeId=" + ctx.localNodeId() + ", qryId=" + targetQryId + "]";
        }
        else {
            rejection = null;
        }

        if (msg.asyncResponse() || rejection != null) {
            sendKillResponse(msg, node, rejection);
        }

        if (rejection != null) {
            return;
        }

        try {
            target.cancel();
        }
        catch (Exception cancelErr) {
            U.warn(log, "Cancellation of query failed: [qryId=" + targetQryId + "]", cancelErr);

            // Asynchronous requests were already answered above.
            if (!msg.asyncResponse()) {
                sendKillResponse(msg, node, cancelErr.getMessage());
            }

            return;
        }

        if (!msg.asyncResponse()) {
            // Synchronous requests are answered only after the query's running future completes.
            target.runningFuture().listen(done -> sendKillResponse(msg, node, (String)done.result()));
        }
    }

    /**
     * Sends the response for a kill request back to the requesting node. A local requester is
     * served through the local handler; a send failure to a remote node is only logged.
     *
     * @param request Kill request being answered.
     * @param node Node to respond to.
     * @param err Error text to report, or {@code null}.
     */
    private void sendKillResponse(GridQueryKillRequest request, ClusterNode node, @Nullable String err) {
        GridQueryKillResponse reply = new GridQueryKillResponse(request.requestId(), err);

        if (node.isLocal()) {
            locNodeMsgHnd.apply(node, reply);
        }
        else {
            try {
                // NOTE(review): policy byte 3 was inlined by the decompiler - presumably a GridIoPolicy constant; confirm.
                ctx.io().sendGeneric(node, (Object)GridTopic.TOPIC_QUERY, GridTopic.TOPIC_QUERY.ordinal(), reply, (byte)3);
            }
            catch (IgniteCheckedException sendErr) {
                U.warn(log, "Failed to send message [node=" + node + ", msg=" + reply + ", errMsg=" + sendErr.getMessage() + "]");
                U.warn(log, "Response on query cancellation wasn't send back: [qryId=" + request.nodeQryId() + "]");
            }
        }
    }

    /**
     * Handles a kill response: removes the pending cancel future for the response's request id
     * and completes it with the reported error. A response without a pending future is ignored.
     *
     * @param msg Kill response.
     */
    private void onQueryKillResponse(GridQueryKillResponse msg) {
        CancelQueryFuture waiting;

        lock.readLock().lock();

        try {
            waiting = cancellationRuns.remove(msg.requestId());
        }
        finally {
            lock.readLock().unlock();
        }

        if (waiting == null) {
            return;
        }

        // Completed outside the lock.
        waiting.onDone(msg.error());
    }

    /**
     * Returns the query history of the current tracker.
     *
     * @return Map returned by the current tracker's {@code queryHistory()}.
     */
    public Map<QueryHistoryKey, QueryHistory> queryHistoryMetrics() {
        QueryHistoryTracker tracker = qryHistTracker;

        return tracker.queryHistory();
    }

    /**
     * Looks up a registered query by its node-local id.
     *
     * @param qryId Node-local query id.
     * @return Query descriptor, or {@code null} if no query is registered under the id.
     */
    @Nullable
    public GridRunningQueryInfo runningQueryInfo(long qryId) {
        GridRunningQueryInfo found = runs.get(qryId);

        return found;
    }

    /**
     * Discards the collected query history by replacing the tracker with a new, empty one of the
     * configured size.
     *
     * NOTE(review): the history system view registered in the constructor was given the value
     * collection of the tracker that existed at that time; confirm whether the view is expected
     * to reflect the tracker installed here.
     */
    public void resetQueryHistoryMetrics() {
        QueryHistoryTracker fresh = new QueryHistoryTracker(histSz);

        qryHistTracker = fresh;
    }

    /**
     * Builds the string form of this manager via {@code S.toString}.
     *
     * @return String representation.
     */
    @Override
    public String toString() {
        return S.toString(RunningQueryManager.class, this);
    }

    /**
     * Future tracking one pending cancel request. It is completed with the error text reported
     * for the request; a {@code null} result is treated by {@code cancelQuery} as success.
     */
    private static class CancelQueryFuture extends GridFutureAdapter<String> {
        /** Id of the node the cancel request is addressed to. */
        private final UUID nodeId;

        /** Id of the query on that node. */
        private final long nodeQryId;

        /**
         * @param nodeId Target node id, must not be {@code null}.
         * @param nodeQryId Id of the query on the target node.
         */
        public CancelQueryFuture(UUID nodeId, long nodeQryId) {
            assert nodeId != null;

            this.nodeQryId = nodeQryId;
            this.nodeId = nodeId;
        }

        /**
         * @return Target node id.
         */
        public UUID nodeId() {
            return nodeId;
        }

        /**
         * @return Id of the query on the target node.
         */
        public long nodeQryId() {
            return nodeQryId;
        }
    }
}

