/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.causalclustering.messaging;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.Future;
import java.util.Iterator;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.neo4j.causalclustering.messaging.Message;
import org.neo4j.causalclustering.messaging.NonBlockingChannel;
import org.neo4j.causalclustering.messaging.NonBlockingChannels;
import org.neo4j.causalclustering.messaging.Outbound;
import org.neo4j.causalclustering.messaging.monitoring.MessageQueueMonitor;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.helpers.SocketAddress;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
import org.neo4j.scheduler.JobScheduler;

public class SenderService
extends LifecycleAdapter
implements Outbound<AdvertisedSocketAddress, Message> {
    private NonBlockingChannels nonBlockingChannels;
    private final ChannelInitializer<SocketChannel> channelInitializer;
    private final ReadWriteLock serviceLock = new ReentrantReadWriteLock();
    private final Log log;
    private final Monitors monitors;
    private JobScheduler.JobHandle jobHandle;
    private boolean senderServiceRunning;
    private Bootstrap bootstrap;
    private NioEventLoopGroup eventLoopGroup;

    public SenderService(ChannelInitializer<SocketChannel> channelInitializer, LogProvider logProvider, Monitors monitors) {
        this.channelInitializer = channelInitializer;
        this.log = logProvider.getLog(this.getClass());
        this.monitors = monitors;
        this.nonBlockingChannels = new NonBlockingChannels();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void send(AdvertisedSocketAddress to, Message message, boolean block) {
        Future<Void> future;
        this.serviceLock.readLock().lock();
        try {
            if (!this.senderServiceRunning) {
                return;
            }
            future = this.channel(to).send(message);
        }
        finally {
            this.serviceLock.readLock().unlock();
        }
        if (block) {
            future.awaitUninterruptibly();
        }
    }

    private NonBlockingChannel channel(AdvertisedSocketAddress to) {
        MessageQueueMonitor monitor = (MessageQueueMonitor)this.monitors.newMonitor(MessageQueueMonitor.class, NonBlockingChannel.class, new String[0]);
        NonBlockingChannel nonBlockingChannel = this.nonBlockingChannels.get(to);
        if (nonBlockingChannel == null) {
            nonBlockingChannel = new NonBlockingChannel(this.bootstrap, this.eventLoopGroup.next(), (SocketAddress)to, this.log);
            nonBlockingChannel.start();
            NonBlockingChannel existingNonBlockingChannel = this.nonBlockingChannels.putIfAbsent(to, nonBlockingChannel);
            if (existingNonBlockingChannel != null) {
                nonBlockingChannel.dispose();
                nonBlockingChannel = existingNonBlockingChannel;
            } else {
                this.log.info("Creating channel to: [%s] ", new Object[]{to});
            }
        }
        monitor.register(to);
        return nonBlockingChannel;
    }

    public synchronized void start() {
        this.serviceLock.writeLock().lock();
        try {
            this.eventLoopGroup = new NioEventLoopGroup(0, (ThreadFactory)new NamedThreadFactory("sender-service"));
            this.bootstrap = (Bootstrap)((Bootstrap)((Bootstrap)new Bootstrap().group((EventLoopGroup)this.eventLoopGroup)).channel(NioSocketChannel.class)).handler(this.channelInitializer);
            this.senderServiceRunning = true;
        }
        finally {
            this.serviceLock.writeLock().unlock();
        }
    }

    public synchronized void stop() {
        this.serviceLock.writeLock().lock();
        try {
            this.senderServiceRunning = false;
            if (this.jobHandle != null) {
                this.jobHandle.cancel(true);
                this.jobHandle = null;
            }
            Iterator<NonBlockingChannel> itr = this.nonBlockingChannels.values().iterator();
            while (itr.hasNext()) {
                NonBlockingChannel timestampedChannel = itr.next();
                timestampedChannel.dispose();
                itr.remove();
            }
            try {
                this.eventLoopGroup.shutdownGracefully(0L, 0L, TimeUnit.MICROSECONDS).sync();
            }
            catch (InterruptedException e) {
                this.log.warn("Interrupted while stopping sender service.");
            }
        }
        finally {
            this.serviceLock.writeLock().unlock();
        }
    }
}

