/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.causalclustering.messaging;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Stream;
import org.neo4j.causalclustering.messaging.Channel;
import org.neo4j.causalclustering.messaging.Message;
import org.neo4j.causalclustering.messaging.Outbound;
import org.neo4j.causalclustering.messaging.ReconnectingChannel;
import org.neo4j.causalclustering.messaging.ReconnectingChannels;
import org.neo4j.causalclustering.protocol.handshake.ProtocolStack;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.helpers.SocketAddress;
import org.neo4j.helpers.collection.Pair;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
import org.neo4j.scheduler.JobScheduler;

public class SenderService
extends LifecycleAdapter
implements Outbound<AdvertisedSocketAddress, Message> {
    private ReconnectingChannels channels;
    private final ChannelInitializer channelInitializer;
    private final ReadWriteLock serviceLock = new ReentrantReadWriteLock();
    private final Log log;
    private JobScheduler.JobHandle jobHandle;
    private boolean senderServiceRunning;
    private Bootstrap bootstrap;
    private NioEventLoopGroup eventLoopGroup;

    public SenderService(ChannelInitializer channelInitializer, LogProvider logProvider) {
        this.channelInitializer = channelInitializer;
        this.log = logProvider.getLog(this.getClass());
        this.channels = new ReconnectingChannels();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void send(AdvertisedSocketAddress to, Message message, boolean block) {
        Future<Void> future;
        this.serviceLock.readLock().lock();
        try {
            if (!this.senderServiceRunning) {
                return;
            }
            future = this.channel(to).writeAndFlush(message);
        }
        finally {
            this.serviceLock.readLock().unlock();
        }
        if (block) {
            try {
                future.get();
            }
            catch (ExecutionException e) {
                this.log.error("Exception while sending to: " + to, (Throwable)e);
            }
            catch (InterruptedException e) {
                this.log.info("Interrupted while sending", (Throwable)e);
            }
        }
    }

    private Channel channel(AdvertisedSocketAddress destination) {
        ReconnectingChannel channel = this.channels.get(destination);
        if (channel == null) {
            channel = new ReconnectingChannel(this.bootstrap, this.eventLoopGroup.next(), (SocketAddress)destination, this.log);
            channel.start();
            ReconnectingChannel existingNonBlockingChannel = this.channels.putIfAbsent(destination, channel);
            if (existingNonBlockingChannel != null) {
                channel.dispose();
                channel = existingNonBlockingChannel;
            } else {
                this.log.info("Creating channel to: [%s] ", new Object[]{destination});
            }
        }
        return channel;
    }

    public synchronized void start() {
        this.serviceLock.writeLock().lock();
        try {
            this.eventLoopGroup = new NioEventLoopGroup(0, (ThreadFactory)new NamedThreadFactory("sender-service"));
            this.bootstrap = (Bootstrap)((Bootstrap)((Bootstrap)new Bootstrap().group((EventLoopGroup)this.eventLoopGroup)).channel(NioSocketChannel.class)).handler((ChannelHandler)this.channelInitializer);
            this.senderServiceRunning = true;
        }
        finally {
            this.serviceLock.writeLock().unlock();
        }
    }

    public synchronized void stop() {
        this.serviceLock.writeLock().lock();
        try {
            this.senderServiceRunning = false;
            if (this.jobHandle != null) {
                this.jobHandle.cancel(true);
                this.jobHandle = null;
            }
            Iterator<ReconnectingChannel> itr = this.channels.values().iterator();
            while (itr.hasNext()) {
                Channel timestampedChannel = itr.next();
                timestampedChannel.dispose();
                itr.remove();
            }
            try {
                this.eventLoopGroup.shutdownGracefully(0L, 0L, TimeUnit.MICROSECONDS).sync();
            }
            catch (InterruptedException e) {
                this.log.warn("Interrupted while stopping sender service.");
            }
        }
        finally {
            this.serviceLock.writeLock().unlock();
        }
    }

    public Stream<Pair<AdvertisedSocketAddress, ProtocolStack>> installedProtocols() {
        return this.channels.installedProtocols();
    }
}

