/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.datastreamer;

import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.DelayQueue;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerResponse;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerUpdateJob;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.stream.StreamReceiver;
import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.thread.OomExceptionHandler;
import org.jetbrains.annotations.Nullable;

public class DataStreamProcessor<K, V>
extends GridProcessorAdapter {
    /**
     * Streamers created through {@link #dataStreamer(String)}. A streamer is added on creation and
     * removed again by the listener attached to its internal future.
     */
    private Collection<DataStreamerImpl> ldrs = new GridConcurrentHashSet<DataStreamerImpl>();
    /**
     * Busy lock entered by {@code dataStreamer}, {@code processRequest} and the flusher loop, and
     * blocked in {@code onKernalStop}.
     */
    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
    /** Background flusher thread; created and started in {@code start()}, interrupted and joined in {@code onKernalStop}. */
    private Thread flusher;
    /**
     * Delay queue handed to every streamer created by this processor. The flusher thread takes
     * streamers from it, calls {@code tryFlush()} and re-offers them unless they are closed.
     */
    private final DelayQueue<DataStreamerImpl<K, V>> flushQ = new DelayQueue();
    /** Marshaller taken from the node configuration in the constructor. */
    private final Marshaller marsh;
    /**
     * Pre-marshalled generic error, built in {@code start()}. {@code sendResponse} falls back to it
     * when the actual error cannot be marshalled.
     */
    private byte[] marshErrBytes;

    /**
     * Creates the processor and, on non-client nodes, subscribes it to data streamer requests
     * arriving on {@link GridTopic#TOPIC_DATASTREAM}.
     *
     * @param ctx Kernal context.
     */
    public DataStreamProcessor(GridKernalContext ctx) {
        super(ctx);

        // Assign the marshaller before the listener below makes this instance reachable from the
        // communication layer: processRequest() reads this field when it unmarshals a request.
        this.marsh = ctx.config().getMarshaller();

        if (!ctx.clientNode()) {
            ctx.io().addMessageListener(GridTopic.TOPIC_DATASTREAM, new GridMessageListener() {
                @Override
                public void onMessage(UUID nodeId, Object msg, byte plc) {
                    assert msg instanceof DataStreamerRequest;

                    DataStreamProcessor.this.processRequest(nodeId, (DataStreamerRequest)msg);
                }
            });
        }
    }

    /**
     * Pre-marshals the fallback error used by {@code sendResponse} and starts the flusher thread.
     * <p>
     * The flusher repeatedly takes a streamer from the flush queue; unless the streamer is closed
     * it calls {@code tryFlush()} on it and puts it back into the queue. The loop ends when the
     * worker is cancelled or the busy lock can no longer be entered.
     *
     * @throws IgniteCheckedException If the fallback error cannot be marshalled.
     */
    @Override
    public void start() throws IgniteCheckedException {
        IgniteCheckedException fallbackErr =
            new IgniteCheckedException("Failed to marshal response error, see node log for details.");

        marshErrBytes = U.marshal(marsh, (Object)fallbackErr);

        GridWorker flushWorker = new GridWorker(ctx.igniteInstanceName(), "grid-data-loader-flusher", log) {
            @Override
            protected void body() throws InterruptedException {
                while (!isCancelled()) {
                    DataStreamerImpl<K, V> streamer = DataStreamProcessor.this.flushQ.take();

                    if (!DataStreamProcessor.this.busyLock.enterBusy()) {
                        return;
                    }

                    try {
                        // A closed streamer is simply dropped from the queue.
                        if (!streamer.isClosed()) {
                            streamer.tryFlush();

                            DataStreamProcessor.this.flushQ.offer(streamer);
                        }
                    }
                    finally {
                        DataStreamProcessor.this.busyLock.leaveBusy();
                    }
                }
            }
        };

        flusher = new IgniteThread(flushWorker);
        flusher.setUncaughtExceptionHandler(new OomExceptionHandler(ctx));
        flusher.start();

        if (log.isDebugEnabled()) {
            log.debug("Started data streamer processor.");
        }
    }

    /**
     * Stops the processor: unsubscribes from streamer requests (non-client nodes only), blocks the
     * busy lock, stops the flusher thread and closes every streamer that is still registered.
     *
     * @param cancel Passed to {@code closeEx} of every streamer that is still registered.
     */
    @Override
    public void onKernalStop(boolean cancel) {
        if (!ctx.clientNode()) {
            ctx.io().removeMessageListener(GridTopic.TOPIC_DATASTREAM);
        }

        busyLock.block();

        U.interrupt(flusher);
        U.join(flusher, log);

        for (DataStreamerImpl<?, ?> streamer : ldrs) {
            closeOnStop(streamer, cancel);
        }

        if (log.isDebugEnabled()) {
            log.debug("Stopped data streamer processor.");
        }
    }

    /**
     * Closes a single streamer during node stop, logging instead of propagating failures.
     *
     * @param streamer Streamer to close.
     * @param cancel Passed through to {@code closeEx}.
     */
    private void closeOnStop(DataStreamerImpl<?, ?> streamer, boolean cancel) {
        if (log.isDebugEnabled()) {
            log.debug("Closing active data streamer on grid stop [ldr=" + streamer + ", cancel=" + cancel + ']');
        }

        try {
            streamer.closeEx(cancel);
        }
        catch (IgniteInterruptedCheckedException e) {
            U.warn(log, "Interrupted while waiting for completion of the data streamer: " + streamer, e);
        }
        catch (IgniteCheckedException e) {
            U.error(log, "Failed to close data streamer: " + streamer, e);
        }
    }

    /**
     * Forwards the disconnect notification to every registered streamer.
     *
     * @param reconnectFut Reconnect future handed to each streamer.
     * @throws IgniteCheckedException Declared by the overridden method.
     */
    @Override
    public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
        for (DataStreamerImpl<?, ?> streamer : ldrs) {
            streamer.onDisconnected(reconnectFut);
        }
    }

    /**
     * Creates a new data streamer for the given cache and registers it with this processor.
     * The streamer is unregistered again when its internal future notifies the listener
     * attached here.
     *
     * @param cacheName Cache name, may be {@code null}.
     * @return Newly created streamer.
     * @throws IllegalStateException If the busy lock cannot be entered (grid is stopping).
     */
    public DataStreamerImpl<K, V> dataStreamer(@Nullable String cacheName) {
        if (!busyLock.enterBusy()) {
            throw new IllegalStateException("Failed to create data streamer (grid is stopping).");
        }

        try {
            final DataStreamerImpl<K, V> streamer = new DataStreamerImpl<K, V>(ctx, cacheName, flushQ);

            ldrs.add(streamer);

            streamer.internalFuture().listen(new CI1<IgniteInternalFuture<?>>() {
                @Override
                public void apply(IgniteInternalFuture<?> f) {
                    boolean removed = DataStreamProcessor.this.ldrs.remove(streamer);

                    assert removed : "Loader has not been added to set: " + streamer;

                    if (DataStreamProcessor.this.log.isDebugEnabled()) {
                        DataStreamProcessor.this.log.debug("Loader has been completed: " + streamer);
                    }
                }
            });

            return streamer;
        }
        finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Handles a data streamer request received from another node.
     * <p>
     * Steps, in order:
     * <ol>
     *     <li>If the request carries a higher affinity topology version than the one ready locally
     *     and the matching affinity-ready future is not done yet, processing is re-submitted once
     *     that future completes and this call returns.</li>
     *     <li>The response topic is unmarshalled; on failure the error is only logged, because no
     *     topic is available to reply on.</li>
     *     <li>The class loader is resolved, either the grid class loader (forced local deployment)
     *     or the one of the global deployment; a missing deployment is reported to the sender.</li>
     *     <li>The receiver is unmarshalled and injected with resources; failures are logged and
     *     reported to the sender.</li>
     *     <li>The update is applied via {@code localUpdate}.</li>
     * </ol>
     * The busy lock entered at the top is released exactly once, by the outer {@code finally};
     * early-return paths must not call {@code leaveBusy()} themselves.
     *
     * @param nodeId Sender node ID.
     * @param req Request to process.
     */
    private void processRequest(final UUID nodeId, final DataStreamerRequest req) {
        if (!busyLock.enterBusy()) {
            if (log.isDebugEnabled()) {
                log.debug("Ignoring data load request (node is stopping): " + req);
            }

            return;
        }

        try {
            if (log.isDebugEnabled()) {
                log.debug("Processing data load request: " + req);
            }

            AffinityTopologyVersion locAffVer = ctx.cache().context().exchange().readyAffinityVersion();
            AffinityTopologyVersion rmtAffVer = req.topologyVersion();

            if (locAffVer.compareTo(rmtAffVer) < 0) {
                if (log.isDebugEnabled()) {
                    log.debug("Received request has higher affinity topology version [request=" + req + ", locTopVer=" + locAffVer + ", rmtTopVer=" + rmtAffVer + ']');
                }

                IgniteInternalFuture<AffinityTopologyVersion> fut =
                    ctx.cache().context().exchange().affinityReadyFuture(rmtAffVer);

                if (fut != null && !fut.isDone()) {
                    // Remember the policy of the current thread so the retry runs with the same one.
                    final byte plc = DataStreamProcessor.threadIoPolicy();

                    fut.listen(new CI1<IgniteInternalFuture<?>>() {
                        @Override
                        public void apply(IgniteInternalFuture<?> t) {
                            DataStreamProcessor.this.ctx.closure().runLocalSafe((Runnable)new GridPlainRunnable() {
                                @Override
                                public void run() {
                                    DataStreamProcessor.this.processRequest(nodeId, req);
                                }
                            }, plc);
                        }
                    });

                    return;
                }
            }

            Object topic;

            try {
                topic = U.unmarshal(marsh, req.responseTopicBytes(), U.resolveClassLoader(null, ctx.config()));
            }
            catch (IgniteCheckedException e) {
                U.error(log, "Failed to unmarshal topic from request: " + req, e);

                // No leaveBusy() here: the outer finally releases the busy lock.
                return;
            }

            ClassLoader clsLdr;

            if (req.forceLocalDeployment()) {
                clsLdr = U.gridClassLoader();
            }
            else {
                GridDeployment dep = ctx.deploy().getGlobalDeployment(req.deploymentMode(), req.sampleClassName(), req.sampleClassName(), req.userVersion(), nodeId, req.classLoaderId(), req.participants(), null);

                if (dep == null) {
                    sendResponse(nodeId, topic, req.requestId(), new IgniteCheckedException("Failed to get deployment for request [sndId=" + nodeId + ", req=" + req + ']'), false);

                    return;
                }

                clsLdr = dep.classLoader();
            }

            StreamReceiver<K, V> updater;

            try {
                updater = U.unmarshal(marsh, req.updaterBytes(), U.resolveClassLoader(clsLdr, ctx.config()));

                if (updater != null) {
                    ctx.resource().injectGeneric(updater);
                }
            }
            catch (IgniteCheckedException e) {
                U.error(log, "Failed to unmarshal message [nodeId=" + nodeId + ", req=" + req + ']', e);

                sendResponse(nodeId, topic, req.requestId(), e, false);

                // No leaveBusy() here: the outer finally releases the busy lock.
                return;
            }

            localUpdate(nodeId, req, updater, topic);
        }
        finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Applies the entries of a streamer request to the local cache and replies to the sender.
     * <p>
     * When the receiver is a {@code DataStreamerImpl.IsolatedUpdater} (overwrite not allowed), the
     * cache topology read lock is held while the topology state is inspected:
     * <ul>
     *     <li>local topology version greater than the request's: a
     *     {@code ClusterTopologyCheckedException} is sent back and nothing is applied;</li>
     *     <li>topology future not done yet: this method is re-invoked when it completes;</li>
     *     <li>otherwise: a data streamer future is registered with the cache MVCC manager and
     *     completed after the update job has run.</li>
     * </ul>
     * Any {@code Throwable} raised along the way is sent to the requester as the response error;
     * {@code Error}s are additionally rethrown.
     * <p>
     * NOTE(review): this file is decompiler output (see file header); the decompiler flagged the
     * try/catch structure of this method as possibly altered.
     *
     * @param nodeId ID of the node the response is sent to.
     * @param req Request whose entries are applied.
     * @param updater Receiver handed to the update job.
     * @param topic Topic the response is sent to.
     */
    private void localUpdate(final UUID nodeId, final DataStreamerRequest req, final StreamReceiver<K, V> updater, final Object topic) {
        // The labelled block only exists so the catch below can leave the method without rethrowing.
        block22: {
            // Overwrite is allowed for every receiver except the isolated updater.
            boolean allowOverwrite = !(updater instanceof DataStreamerImpl.IsolatedUpdater);
            try {
                GridCacheAdapter cache = this.ctx.cache().internalCache(req.cacheName());
                if (cache == null) {
                    throw new IgniteCheckedException("Cache not created or already destroyed: " + req.cacheName());
                }
                GridCacheContext cctx = cache.context();
                DataStreamerUpdateJob job = null;
                GridFutureAdapter waitFut = null;
                // Topology is inspected under the read lock only when overwrite is not allowed.
                if (!allowOverwrite) {
                    cctx.topology().readLock();
                }
                GridDhtTopologyFuture topWaitFut = null;
                try {
                    ClusterTopologyCheckedException remapErr = null;
                    AffinityTopologyVersion streamerFutTopVer = null;
                    if (!allowOverwrite) {
                        AffinityTopologyVersion topVer;
                        GridDhtTopologyFuture topFut = cctx.topologyVersionFuture();
                        // Use the resulting version of a finished future, otherwise its initial version.
                        // (affinityTopologyVersion is never read in this method.)
                        AffinityTopologyVersion affinityTopologyVersion = topVer = topFut.isDone() ? topFut.topologyVersion() : topFut.initialVersion();
                        if (topVer.compareTo(req.topologyVersion()) > 0) {
                            // Local topology is ahead of the one the request was built for.
                            remapErr = new ClusterTopologyCheckedException("DataStreamer will retry data transfer at stable topology [reqTop=" + req.topologyVersion() + ", topVer=" + topFut.initialVersion() + ", node=remote]");
                        } else if (!topFut.isDone()) {
                            // Topology change still in progress: retry once it completes (see below).
                            topWaitFut = topFut;
                        } else {
                            streamerFutTopVer = topFut.topologyVersion();
                        }
                    }
                    if (remapErr != null) {
                        this.sendResponse(nodeId, topic, req.requestId(), remapErr, req.forceLocalDeployment());
                        // The finally below still releases the read lock on this return.
                        return;
                    }
                    if (topWaitFut == null) {
                        job = new DataStreamerUpdateJob(this.ctx, this.log, req.cacheName(), req.entries(), req.ignoreDeploymentOwnership(), req.skipStore(), req.keepBinary(), updater);
                        // The streamer future is registered while the read lock is still held.
                        waitFut = allowOverwrite ? null : cctx.mvcc().addDataStreamerFuture(streamerFutTopVer);
                    }
                }
                finally {
                    if (!allowOverwrite) {
                        cctx.topology().readUnlock();
                    }
                }
                if (topWaitFut != null) {
                    // The listener is attached here, after the read lock has been released above.
                    topWaitFut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>(){

                        @Override
                        public void apply(IgniteInternalFuture<AffinityTopologyVersion> e) {
                            DataStreamProcessor.this.localUpdate(nodeId, req, updater, topic);
                        }
                    });
                    return;
                }
                try {
                    job.call();
                    // A null error marks a successful response.
                    this.sendResponse(nodeId, topic, req.requestId(), null, req.forceLocalDeployment());
                }
                finally {
                    // Complete the registered streamer future whether or not the job succeeded.
                    if (waitFut != null) {
                        waitFut.onDone();
                    }
                }
            }
            catch (Throwable e) {
                // Report any failure to the requester; only Errors are propagated further.
                this.sendResponse(nodeId, topic, req.requestId(), e, req.forceLocalDeployment());
                if (!(e instanceof Error)) break block22;
                throw (Error)e;
            }
        }
    }

    /**
     * Sends a response for a streamer request to the given node.
     * <p>
     * If the error cannot be marshalled, the failure is logged and the pre-marshalled generic
     * error is sent instead. If sending fails, the failure is logged as an error when discovery
     * still reports the node alive; otherwise it is only mentioned at debug level, since the node
     * has left the grid.
     *
     * @param nodeId Destination node ID.
     * @param resTopic Topic to send the response to.
     * @param reqId ID of the request being answered.
     * @param err Error to report, or {@code null} on success.
     * @param forceLocDep Force-local-deployment flag copied into the response.
     */
    private void sendResponse(UUID nodeId, Object resTopic, long reqId, @Nullable Throwable err, boolean forceLocDep) {
        byte[] errBytes;

        try {
            errBytes = err != null ? U.marshal(marsh, (Object)err) : null;
        }
        catch (Exception e) {
            U.error(log, "Failed to marshal error [err=" + err + ", marshErr=" + e + ']', e);

            errBytes = marshErrBytes;
        }

        DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes, forceLocDep);

        try {
            ctx.io().sendToCustomTopic(nodeId, resTopic, (Message)res, DataStreamProcessor.threadIoPolicy());
        }
        catch (IgniteCheckedException e) {
            if (ctx.discovery().alive(nodeId)) {
                U.error(log, "Failed to respond to node [nodeId=" + nodeId + ", res=" + res + ']', e);
            }
            // Only claim the node has left when discovery no longer reports it alive.
            else if (log.isDebugEnabled()) {
                log.debug("Node has left the grid: " + nodeId);
            }
        }
    }

    /**
     * Returns the IO policy reported by {@code GridIoManager.currentPolicy()}, or {@code 9} when
     * none is set.
     * <p>
     * NOTE(review): 9 is presumably the data streamer pool policy constant; confirm against the
     * IO policy definitions.
     *
     * @return IO policy to use for messages sent from the current thread.
     */
    private static byte threadIoPolicy() {
        Byte curPlc = GridIoManager.currentPolicy();

        return curPlc != null ? curPlc : (byte)9;
    }

    /**
     * Resolves the IO policy for a node: the value produced by the resolver if a resolver is given
     * and it returns a non-null value, otherwise {@code 9}.
     * <p>
     * NOTE(review): 9 is presumably the data streamer pool policy constant; confirm against the
     * IO policy definitions.
     *
     * @param rslvr Optional policy resolver.
     * @param node Node to resolve the policy for; must not be {@code null}.
     * @return Resolved IO policy.
     */
    public static byte ioPolicy(@Nullable IgniteClosure<ClusterNode, Byte> rslvr, ClusterNode node) {
        assert node != null;

        Byte resolved = rslvr != null ? rslvr.apply(node) : null;

        return resolved != null ? resolved : (byte)9;
    }

    /**
     * Prints the number of currently registered streamers, prefixed by a header naming the
     * Ignite instance.
     */
    @Override
    public void printMemoryStats() {
        String instanceName = ctx.igniteInstanceName();

        X.println(">>>");
        X.println(">>> Data streamer processor memory stats [igniteInstanceName=" + instanceName + ']');
        X.println(">>>   ldrsSize: " + ldrs.size());
    }
}

