/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.igfs;

import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.FileSystemConfiguration;
import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
import org.apache.ignite.igfs.IgfsIpcEndpointType;
import org.apache.ignite.internal.processors.igfs.IgfsManager;
import org.apache.ignite.internal.processors.igfs.IgfsServer;
import org.apache.ignite.internal.util.ipc.IpcEndpointBindException;
import org.apache.ignite.internal.util.ipc.IpcServerEndpoint;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.thread.IgniteThread;

public class IgfsServerManager
extends IgfsManager {
    private static final long REBIND_INTERVAL = 3000L;
    private Collection<IgfsServer> srvrs;
    private BindWorker bindWorker;
    private CountDownLatch kernalStartLatch = new CountDownLatch(1);

    @Override
    protected void start0() throws IgniteCheckedException {
        FileSystemConfiguration igfsCfg = this.igfsCtx.configuration();
        if (igfsCfg.isIpcEndpointEnabled()) {
            IgfsIpcEndpointConfiguration ipcCfg = igfsCfg.getIpcEndpointConfiguration();
            if (ipcCfg == null) {
                ipcCfg = new IgfsIpcEndpointConfiguration();
            }
            this.bind(ipcCfg, false);
        }
        if (igfsCfg.getManagementPort() >= 0) {
            IgfsIpcEndpointConfiguration mgmtIpcCfg = new IgfsIpcEndpointConfiguration();
            mgmtIpcCfg.setType(IgfsIpcEndpointType.TCP);
            mgmtIpcCfg.setPort(igfsCfg.getManagementPort());
            this.bind(mgmtIpcCfg, true);
        }
        if (this.bindWorker != null) {
            new IgniteThread(this.bindWorker).start();
        }
    }

    private void bind(IgfsIpcEndpointConfiguration endpointCfg, boolean mgmt) throws IgniteCheckedException {
        if (this.srvrs == null) {
            this.srvrs = new ConcurrentLinkedQueue<IgfsServer>();
        }
        IgfsServer ipcSrv = new IgfsServer(this.igfsCtx, endpointCfg, mgmt);
        try {
            ipcSrv.start();
            this.srvrs.add(ipcSrv);
        }
        catch (IpcEndpointBindException ignored) {
            int port = ipcSrv.getIpcServerEndpoint().getPort();
            String portMsg = port != -1 ? " Failed to bind to port (is port already in use?): " + port : "";
            U.warn(this.log, "Failed to start IGFS " + (mgmt ? "management " : "") + "endpoint (will retry every " + 3L + "s)." + portMsg);
            if (this.bindWorker == null) {
                this.bindWorker = new BindWorker();
            }
            this.bindWorker.addConfiguration(endpointCfg, mgmt);
        }
    }

    public Collection<IpcServerEndpoint> endpoints() {
        return F.viewReadOnly(this.srvrs, new C1<IgfsServer, IpcServerEndpoint>(){

            @Override
            public IpcServerEndpoint apply(IgfsServer e) {
                return e.getIpcServerEndpoint();
            }
        }, new IgnitePredicate[0]);
    }

    @Override
    protected void onKernalStart0() throws IgniteCheckedException {
        if (!F.isEmpty(this.srvrs)) {
            for (IgfsServer srv : this.srvrs) {
                srv.onKernalStart();
            }
        }
        this.kernalStartLatch.countDown();
    }

    @Override
    protected void stop0(boolean cancel) {
        this.kernalStartLatch.countDown();
        if (this.bindWorker != null) {
            this.bindWorker.cancel();
            U.join(this.bindWorker, this.log);
        }
        if (!F.isEmpty(this.srvrs)) {
            for (IgfsServer srv : this.srvrs) {
                srv.stop(cancel);
            }
        }
    }

    private class BindWorker
    extends GridWorker {
        private Collection<IgniteBiTuple<IgfsIpcEndpointConfiguration, Boolean>> bindCfgs;

        private BindWorker() {
            super(IgfsServerManager.this.igfsCtx.kernalContext().igniteInstanceName(), "bind-worker", IgfsServerManager.this.igfsCtx.kernalContext().log(IgfsServerManager.class));
            this.bindCfgs = new LinkedList<IgniteBiTuple<IgfsIpcEndpointConfiguration, Boolean>>();
        }

        public void addConfiguration(IgfsIpcEndpointConfiguration cfg, boolean mgmt) {
            this.bindCfgs.add(F.t(cfg, mgmt));
        }

        @Override
        protected void body() throws InterruptedException {
            IgfsServerManager.this.kernalStartLatch.await();
            while (!this.isCancelled()) {
                Thread.sleep(3000L);
                Iterator<IgniteBiTuple<IgfsIpcEndpointConfiguration, Boolean>> it = this.bindCfgs.iterator();
                while (it.hasNext()) {
                    IgniteBiTuple<IgfsIpcEndpointConfiguration, Boolean> cfg = it.next();
                    IgfsServer ipcSrv = new IgfsServer(IgfsServerManager.this.igfsCtx, cfg.get1(), cfg.get2());
                    try {
                        ipcSrv.start();
                        ipcSrv.onKernalStart();
                        IgfsServerManager.this.srvrs.add(ipcSrv);
                        it.remove();
                    }
                    catch (IgniteCheckedException e) {
                        if (!this.log.isDebugEnabled()) continue;
                        this.log.debug("Failed to bind IGFS endpoint [cfg=" + cfg + ", err=" + e.getMessage() + ']');
                    }
                }
                if (!this.bindCfgs.isEmpty()) continue;
                break;
            }
        }
    }
}

