/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.client;

import com.netflix.spectator.api.BasicTag;
import com.netflix.spectator.api.Tag;
import io.mantisrx.client.JobSinkLocator;
import io.mantisrx.client.SinkClient;
import io.mantisrx.client.SinkConnection;
import io.mantisrx.client.SinkConnectionFunc;
import io.mantisrx.client.SinkConnectionsStatus;
import io.mantisrx.common.metrics.Gauge;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.spectator.MetricGroupId;
import io.mantisrx.server.master.client.MasterClientWrapper;
import io.reactivex.mantis.remote.observable.EndpointChange;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.functions.Action1;
import rx.functions.Func1;

public class SinkClientImpl<T>
implements SinkClient<T> {
    private static final Logger logger = LoggerFactory.getLogger(SinkClientImpl.class);
    final String jobId;
    final SinkConnectionFunc<T> sinkConnectionFunc;
    final JobSinkLocator jobSinkLocator;
    private final AtomicBoolean nowClosed = new AtomicBoolean(false);
    private final SinkConnections<T> sinkConnections = new SinkConnections();
    private final String sinkGuageName = "SinkConnections";
    private final String expectedSinksGaugeName = "ExpectedSinkConnections";
    private final String sinkReceivingDataGaugeName = "sinkRecvngData";
    private final String clientNotConnectedToAllSourcesGaugeName = "clientNotConnectedToAllSources";
    private final Gauge sinkGauge;
    private final Gauge expectedSinksGauge;
    private final Gauge sinkReceivingDataGauge;
    private final Gauge clientNotConnectedToAllSourcesGauge;
    private final AtomicInteger numSinkWorkers = new AtomicInteger();
    private final AtomicInteger numSinkRunningWorkers = new AtomicInteger();
    private final Observer<SinkConnectionsStatus> sinkConnectionsStatusObserver;
    private final long dataRecvTimeoutSecs;
    private final Metrics metrics;
    private final boolean disablePingFiltering;

    SinkClientImpl(String jobId, SinkConnectionFunc<T> sinkConnectionFunc, JobSinkLocator jobSinkLocator, Observable<MasterClientWrapper.JobSinkNumWorkers> numSinkWorkersObservable, Observer<SinkConnectionsStatus> sinkConnectionsStatusObserver, long dataRecvTimeoutSecs) {
        this(jobId, sinkConnectionFunc, jobSinkLocator, numSinkWorkersObservable, sinkConnectionsStatusObserver, dataRecvTimeoutSecs, false);
    }

    SinkClientImpl(String jobId, SinkConnectionFunc<T> sinkConnectionFunc, JobSinkLocator jobSinkLocator, Observable<MasterClientWrapper.JobSinkNumWorkers> numSinkWorkersObservable, Observer<SinkConnectionsStatus> sinkConnectionsStatusObserver, long dataRecvTimeoutSecs, boolean disablePingFiltering) {
        this.jobId = jobId;
        this.sinkConnectionFunc = sinkConnectionFunc;
        this.jobSinkLocator = jobSinkLocator;
        BasicTag jobIdTag = new BasicTag("jobId", Optional.ofNullable(jobId).orElse("NullJobId"));
        MetricGroupId metricGroupId = new MetricGroupId(SinkClientImpl.class.getCanonicalName(), new Tag[]{jobIdTag});
        this.metrics = new Metrics.Builder().id(metricGroupId).addGauge("SinkConnections").addGauge("ExpectedSinkConnections").addGauge("sinkRecvngData").addGauge("clientNotConnectedToAllSources").build();
        this.sinkGauge = this.metrics.getGauge("SinkConnections");
        this.expectedSinksGauge = this.metrics.getGauge("ExpectedSinkConnections");
        this.sinkReceivingDataGauge = this.metrics.getGauge("sinkRecvngData");
        this.clientNotConnectedToAllSourcesGauge = this.metrics.getGauge("clientNotConnectedToAllSources");
        numSinkWorkersObservable.doOnNext(jobSinkNumWorkers -> {
            this.numSinkWorkers.set(jobSinkNumWorkers.getNumSinkWorkers());
            this.numSinkRunningWorkers.set(jobSinkNumWorkers.getNumSinkRunningWorkers());
        }).takeWhile(integer -> !this.nowClosed.get()).subscribe();
        this.sinkConnectionsStatusObserver = sinkConnectionsStatusObserver;
        this.dataRecvTimeoutSecs = dataRecvTimeoutSecs;
        this.disablePingFiltering = disablePingFiltering;
    }

    private String toSinkName(String host, int port) {
        return host + "-" + port;
    }

    @Override
    public boolean hasError() {
        return false;
    }

    @Override
    public String getError() {
        return null;
    }

    @Override
    public Observable<Observable<T>> getResults() {
        return this.getPartitionedResults(-1, 0);
    }

    @Override
    public Observable<Observable<T>> getPartitionedResults(int forIndex, int totalPartitions) {
        return this.internalGetResults(forIndex, totalPartitions);
    }

    private <T> Observable<Observable<T>> internalGetResults(int forIndex, int totalPartitions) {
        return this.jobSinkLocator.locatePartitionedSinkForJob(this.jobId, forIndex, totalPartitions).map(new Func1<EndpointChange, Observable<T>>(){

            public Observable<T> call(EndpointChange endpointChange) {
                if (SinkClientImpl.this.nowClosed.get()) {
                    return Observable.empty();
                }
                if (endpointChange.getType() == EndpointChange.Type.complete) {
                    return SinkClientImpl.this.handleEndpointClose(endpointChange);
                }
                return SinkClientImpl.this.handleEndpointConnect(endpointChange);
            }
        }).doOnUnsubscribe(() -> {
            try {
                logger.warn("Closing connections to sink of job {}", (Object)this.jobId);
                this.closeAllConnections();
            }
            catch (Exception e) {
                logger.warn("Error closing all connections to sink of job {}", (Object)this.jobId, (Object)e);
            }
        }).share();
    }

    private <T> Observable<T> handleEndpointConnect(EndpointChange endpoint) {
        logger.info("Opening connection to sink at " + endpoint.toString());
        String unwrappedHost = MasterClientWrapper.getUnwrappedHost((String)endpoint.getEndpoint().getHost());
        SinkConnection<T> sinkConnection = this.sinkConnectionFunc.call(unwrappedHost, endpoint.getEndpoint().getPort(), new Action1<Boolean>(){

            public void call(Boolean flag) {
                SinkClientImpl.this.updateSinkConx(flag);
            }
        }, new Action1<Boolean>(){

            public void call(Boolean flag) {
                SinkClientImpl.this.updateSinkDataReceivingStatus(flag);
            }
        }, this.dataRecvTimeoutSecs, this.disablePingFiltering);
        if (this.nowClosed.get()) {
            try {
                sinkConnection.close();
            }
            catch (Exception e2) {
                logger.warn("Error closing sink connection " + sinkConnection.getName() + " - " + e2.getMessage(), (Throwable)e2);
            }
            return Observable.empty();
        }
        ((SinkConnections)this.sinkConnections).put(this.toSinkName(unwrappedHost, endpoint.getEndpoint().getPort()), sinkConnection);
        if (this.nowClosed.get()) {
            try {
                sinkConnection.close();
                ((SinkConnections)this.sinkConnections).remove(this.toSinkName(unwrappedHost, endpoint.getEndpoint().getPort()));
                return Observable.empty();
            }
            catch (Exception e3) {
                logger.warn("Error closing sink connection - " + e3.getMessage());
            }
        }
        return ((Observable)sinkConnection.call()).takeWhile(e -> !this.nowClosed.get());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateSinkDataReceivingStatus(Boolean flag) {
        if (flag.booleanValue()) {
            this.sinkReceivingDataGauge.increment();
        } else {
            this.sinkReceivingDataGauge.decrement();
        }
        this.expectedSinksGauge.set((long)this.numSinkWorkers.get());
        if (this.expectedSinksGauge.value() != this.sinkReceivingDataGauge.value()) {
            this.clientNotConnectedToAllSourcesGauge.set(1L);
        } else {
            this.clientNotConnectedToAllSourcesGauge.set(0L);
        }
        if (this.sinkConnectionsStatusObserver != null) {
            Observer<SinkConnectionsStatus> observer = this.sinkConnectionsStatusObserver;
            synchronized (observer) {
                this.sinkConnectionsStatusObserver.onNext((Object)new SinkConnectionsStatus(this.sinkReceivingDataGauge.value(), this.sinkGauge.value(), this.numSinkWorkers.get(), this.numSinkRunningWorkers.get()));
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateSinkConx(Boolean flag) {
        if (flag.booleanValue()) {
            this.sinkGauge.increment();
        } else {
            this.sinkGauge.decrement();
        }
        this.expectedSinksGauge.set((long)this.numSinkWorkers.get());
        if (this.expectedSinksGauge.value() != this.sinkReceivingDataGauge.value()) {
            this.clientNotConnectedToAllSourcesGauge.set(1L);
        } else {
            this.clientNotConnectedToAllSourcesGauge.set(0L);
        }
        if (this.sinkConnectionsStatusObserver != null) {
            Observer<SinkConnectionsStatus> observer = this.sinkConnectionsStatusObserver;
            synchronized (observer) {
                this.sinkConnectionsStatusObserver.onNext((Object)new SinkConnectionsStatus(this.sinkReceivingDataGauge.value(), this.sinkGauge.value(), this.numSinkWorkers.get(), this.numSinkRunningWorkers.get()));
            }
        }
    }

    private <T> Observable<T> handleEndpointClose(EndpointChange endpoint) {
        logger.info("Closed connection to sink at " + endpoint.toString());
        String unwrappedHost = MasterClientWrapper.getUnwrappedHost((String)endpoint.getEndpoint().getHost());
        SinkConnection removed = ((SinkConnections)this.sinkConnections).remove(this.toSinkName(unwrappedHost, endpoint.getEndpoint().getPort()));
        if (removed != null) {
            try {
                removed.close();
            }
            catch (Exception e) {
                logger.error("Unexpected exception on closing sinkConnection: " + e.getMessage(), (Throwable)e);
            }
        } else {
            logger.error("SinkConnections does not contain endpoint to be removed. host: {}, sinkConnections: {}", (Object)unwrappedHost, this.sinkConnections);
        }
        return Observable.empty();
    }

    private void closeAllConnections() throws Exception {
        this.nowClosed.set(true);
        ((SinkConnections)this.sinkConnections).closeOut(tSinkConnection -> {
            try {
                tSinkConnection.close();
            }
            catch (Exception e) {
                SinkClientImpl.logger.warn("Error closing sink connection " + tSinkConnection.getName() + " - " + e.getMessage(), (Throwable)e);
            }
        });
    }

    class SinkConnections<T> {
        private final Map<String, SinkConnection<T>> sinkConnections = new HashMap<String, SinkConnection<T>>();
        private boolean isClosed = false;

        SinkConnections() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void put(String key, SinkConnection<T> val) {
            Map<String, SinkConnection<T>> map = this.sinkConnections;
            synchronized (map) {
                if (this.isClosed) {
                    return;
                }
                this.sinkConnections.put(key, val);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private SinkConnection<T> remove(String key) {
            Map<String, SinkConnection<T>> map = this.sinkConnections;
            synchronized (map) {
                return this.sinkConnections.remove(key);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void closeOut(Action1<SinkConnection<T>> onClose) {
            Map<String, SinkConnection<T>> map = this.sinkConnections;
            synchronized (map) {
                this.isClosed = true;
            }
            for (SinkConnection sinkConnection : this.sinkConnections.values()) {
                logger.info("Closing " + sinkConnection.getName());
                onClose.call((Object)sinkConnection);
            }
        }

        public String toString() {
            return "SinkClientImpl.SinkConnections(sinkConnections=" + this.sinkConnections + ", isClosed=" + this.isClosed + ")";
        }
    }
}

