/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.causalclustering.messaging;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import org.neo4j.logging.Log;

/**
 * A wrapper around a Netty {@link Channel} that connects to a single destination
 * without blocking the caller and re-attempts the connection after a fixed back-off
 * whenever a connect attempt fails or an established connection is closed.
 *
 * <p>Messages sent while the channel is not active are deferred until the pending
 * connect attempt completes; see {@link #send(Object)}.
 */
class NonBlockingChannel {
    /** Delay, in milliseconds, before another connect attempt is scheduled. */
    private static final int CONNECT_BACKOFF_MS = 250;
    private final Log log;
    private final Bootstrap bootstrap;
    private final EventLoop eventLoop;
    private final org.neo4j.helpers.SocketAddress destination;
    /** Channel of the most recent connect attempt; {@code null} until {@link #start()} has run. */
    private volatile Channel channel;
    /** Future of the most recent connect attempt; {@code null} until {@link #start()} has run. */
    private volatile ChannelFuture fChannel;
    /** Set once by {@link #dispose()}; prevents any further connect attempts and sends. */
    private volatile boolean disposed;

    /**
     * @param bootstrap   used to open connections to {@code destination}
     * @param eventLoop   used to create the promises returned for deferred sends
     * @param destination the remote address to connect to
     * @param log         receives connection established / lost messages
     */
    NonBlockingChannel(Bootstrap bootstrap, EventLoop eventLoop, org.neo4j.helpers.SocketAddress destination, Log log) {
        this.bootstrap = bootstrap;
        this.eventLoop = eventLoop;
        this.destination = destination;
        this.log = log;
    }

    /** Initiates the first connect attempt. */
    void start() {
        this.tryConnect();
    }

    /**
     * Starts a new connect attempt unless this channel is disposed or an attempt is
     * still in flight. A failed attempt, or the later closing of a successfully
     * connected channel, schedules another call to this method after
     * {@link #CONNECT_BACKOFF_MS} milliseconds.
     */
    private synchronized void tryConnect() {
        if (this.disposed) {
            return;
        }
        if (this.fChannel != null && !this.fChannel.isDone()) {
            // A connect attempt is already pending; do not start a second one.
            return;
        }
        this.fChannel = this.bootstrap.connect((SocketAddress)this.destination.socketAddress());
        this.channel = this.fChannel.channel();
        // The explicit ChannelFutureListener target type is required so that
        // the lambda parameter exposes channel().
        this.fChannel.addListener((ChannelFutureListener)f -> {
            Channel attempted = f.channel();
            if (!f.isSuccess()) {
                attempted.eventLoop().schedule(this::tryConnect, CONNECT_BACKOFF_MS, TimeUnit.MILLISECONDS);
                return;
            }
            this.log.info("Connected: " + attempted);
            attempted.closeFuture().addListener((ChannelFutureListener)closed -> {
                // Report the channel that actually closed rather than the mutable field,
                // which may already refer to a different connect attempt.
                this.log.warn(String.format("Lost connection to: %s (%s)", this.destination, attempted.remoteAddress()));
                attempted.eventLoop().schedule(this::tryConnect, CONNECT_BACKOFF_MS, TimeUnit.MILLISECONDS);
            });
        });
    }

    /**
     * Marks this channel as disposed, which stops further connect attempts, and
     * closes the current channel if one exists. Safe to call before {@link #start()}.
     */
    public synchronized void dispose() {
        this.disposed = true;
        Channel current = this.channel;
        if (current != null) {
            current.close();
        }
    }

    /**
     * Writes and flushes {@code msg}. If the current channel is active the write is
     * issued immediately; otherwise it is deferred until the pending connect attempt
     * completes, with one further connect attempt if that one fails.
     *
     * @param msg the message to write
     * @return a future that completes when the write has completed
     * @throws IllegalStateException if this channel has been disposed or has not been started
     */
    public Future<Void> send(Object msg) {
        if (this.disposed) {
            throw new IllegalStateException("sending on disposed channel");
        }
        Channel current = this.channel;
        ChannelFuture pendingConnect = this.fChannel;
        if (current == null || pendingConnect == null) {
            throw new IllegalStateException("sending on channel that has not been started");
        }
        if (current.isActive()) {
            return current.writeAndFlush(msg);
        }
        Promise<Void> promise = this.eventLoop.newPromise();
        this.deferredWrite(msg, pendingConnect, promise, true);
        return promise;
    }

    /**
     * Writes {@code msg} once {@code channelFuture} completes and reflects the outcome
     * in {@code promise}.
     *
     * @param msg           the message to write
     * @param channelFuture the connect attempt to wait for
     * @param promise       completed with the outcome of the write, or failed with the
     *                      cause of the connect failure when no retry is left
     * @param firstAttempt  when {@code true}, a failed connect triggers one more connect
     *                      attempt before giving up
     */
    private void deferredWrite(Object msg, ChannelFuture channelFuture, Promise<Void> promise, boolean firstAttempt) {
        channelFuture.addListener((ChannelFutureListener)f -> {
            if (f.isSuccess()) {
                f.channel().writeAndFlush(msg).addListener((ChannelFutureListener)written -> {
                    // Propagate the real outcome of the write; a failed write must not
                    // be reported to the caller as a success.
                    if (written.isSuccess()) {
                        promise.setSuccess(null);
                    } else {
                        promise.setFailure(written.cause());
                    }
                });
            } else if (firstAttempt) {
                this.tryConnect();
                this.deferredWrite(msg, this.fChannel, promise, false);
            } else {
                promise.setFailure(f.cause());
            }
        });
    }
}

