package org.opendaylight.mdsal.replicate.netty;

import com.google.common.util.concurrent.ListenableFuture;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.checkerframework.checker.lock.qual.Holding;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataBroker;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
import org.opendaylight.yangtools.util.concurrent.FluentFutures;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeDataOutput;
import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/opendaylight/mdsal/replicate/netty/SinkSingletonService.class */
final class SinkSingletonService extends ChannelInitializer<SocketChannel> implements ClusterSingletonService {
    private static final Logger LOG = LoggerFactory.getLogger(SinkSingletonService.class);
    private static final ServiceGroupIdentifier SGID = ServiceGroupIdentifier.create(SinkSingletonService.class.getName());
    private static final DOMDataTreeIdentifier TREE = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.of());
    private static long CHANNEL_CLOSE_TIMEOUT_S = 10;
    private static final ByteBuf TREE_REQUEST;
    private final BootstrapSupport bootstrapSupport;
    private final DOMDataBroker dataBroker;
    private final InetSocketAddress sourceAddress;
    private final Duration reconnectDelay;
    private final int maxMissedKeepalives;
    private final Duration keepaliveInterval;
    private ChannelFuture futureChannel;
    private boolean closingInstance;
    private Bootstrap bs;

    /* JADX INFO: Access modifiers changed from: package-private */
    public SinkSingletonService(BootstrapSupport bootstrapSupport, DOMDataBroker dOMDataBroker, InetSocketAddress inetSocketAddress, Duration duration, Duration duration2, int i) {
        this.bootstrapSupport = (BootstrapSupport) Objects.requireNonNull(bootstrapSupport);
        this.dataBroker = (DOMDataBroker) Objects.requireNonNull(dOMDataBroker);
        this.sourceAddress = (InetSocketAddress) Objects.requireNonNull(inetSocketAddress);
        this.reconnectDelay = (Duration) Objects.requireNonNull(duration);
        this.keepaliveInterval = (Duration) Objects.requireNonNull(duration2);
        this.maxMissedKeepalives = i;
        LOG.info("Replication sink from {} waiting for cluster-wide mastership", inetSocketAddress);
    }

    /* renamed from: getIdentifier, reason: merged with bridge method [inline-methods] */
    public ServiceGroupIdentifier m9getIdentifier() {
        return SGID;
    }

    public synchronized void instantiateServiceInstance() {
        LOG.info("Replication sink started with source {}", this.sourceAddress);
        this.bs = this.bootstrapSupport.newBootstrap();
        doConnect();
    }

    @Holding({"this"})
    private void doConnect() {
        LOG.info("Connecting to Source");
        EventLoopGroup group = this.bs.config().group();
        this.futureChannel = this.bs.option(ChannelOption.SO_KEEPALIVE, true).handler(this).connect(this.sourceAddress, (SocketAddress) null);
        this.futureChannel.addListener(channelFuture -> {
            channelResolved(channelFuture, group);
        });
    }

    public synchronized ListenableFuture<?> closeServiceInstance() {
        this.closingInstance = true;
        return this.futureChannel == null ? FluentFutures.immediateNullFluentFuture() : FluentFutures.immediateBooleanFluentFuture(disconnect());
    }

    private synchronized void reconnect() {
        disconnect();
        doConnect();
    }

    private synchronized boolean disconnect() {
        boolean z = true;
        Channel channel = this.futureChannel.channel();
        if (channel != null && channel.isActive()) {
            try {
                channel.close().await(CHANNEL_CLOSE_TIMEOUT_S, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                LOG.error("The channel didn't close properly within {} seconds", Long.valueOf(CHANNEL_CLOSE_TIMEOUT_S));
                z = false;
            }
        }
        boolean cancel = z & this.futureChannel.cancel(true);
        this.futureChannel = null;
        return cancel;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void initChannel(SocketChannel socketChannel) {
        socketChannel.pipeline().addLast("frameDecoder", new MessageFrameDecoder()).addLast("idleStateHandler", new IdleStateHandler(this.keepaliveInterval.toNanos() * this.maxMissedKeepalives, 0L, 0L, TimeUnit.NANOSECONDS)).addLast("keepaliveHandler", new SinkKeepaliveHandler()).addLast("requestHandler", new SinkRequestHandler(TREE, this.dataBroker.createMergingTransactionChain(new SinkTransactionChainListener(socketChannel)))).addLast("frameEncoder", MessageFrameEncoder.INSTANCE);
    }

    private synchronized void channelResolved(ChannelFuture channelFuture, ScheduledExecutorService scheduledExecutorService) {
        if (this.futureChannel == null || this.futureChannel.channel() != channelFuture.channel()) {
            return;
        }
        if (!channelFuture.isSuccess()) {
            LOG.info("Failed to connect to source {}, reconnecting in {}", new Object[]{this.sourceAddress, Long.valueOf(this.reconnectDelay.getSeconds()), channelFuture.cause()});
            scheduledExecutorService.schedule(() -> {
                reconnect();
            }, this.reconnectDelay.toNanos(), TimeUnit.NANOSECONDS);
        } else {
            Channel channel = channelFuture.channel();
            LOG.info("Channel {} established", channel);
            channel.closeFuture().addListener(channelFuture2 -> {
                channelClosed(channelFuture2, scheduledExecutorService);
            });
            channel.writeAndFlush(TREE_REQUEST);
        }
    }

    private synchronized void channelClosed(ChannelFuture channelFuture, ScheduledExecutorService scheduledExecutorService) {
        if (this.futureChannel == null || this.futureChannel.channel() != channelFuture.channel() || this.closingInstance) {
            return;
        }
        LOG.info("Channel {} lost connection to source {}, reconnecting in {}", new Object[]{channelFuture.channel(), this.sourceAddress, Long.valueOf(this.reconnectDelay.getSeconds())});
        scheduledExecutorService.schedule(this::reconnect, this.reconnectDelay.toNanos(), TimeUnit.NANOSECONDS);
    }

    private static ByteBuf requestTree(DOMDataTreeIdentifier dOMDataTreeIdentifier) throws IOException {
        ByteBuf buffer = Unpooled.buffer();
        ByteBufOutputStream byteBufOutputStream = new ByteBufOutputStream(buffer);
        try {
            byteBufOutputStream.writeByte(1);
            NormalizedNodeDataOutput newDataOutput = NormalizedNodeStreamVersion.current().newDataOutput(byteBufOutputStream);
            try {
                dOMDataTreeIdentifier.getDatastoreType().writeTo(newDataOutput);
                newDataOutput.writeYangInstanceIdentifier(dOMDataTreeIdentifier.getRootIdentifier());
                if (newDataOutput != null) {
                    newDataOutput.close();
                }
                byteBufOutputStream.close();
                return buffer;
            } finally {
            }
        } catch (Throwable th) {
            try {
                byteBufOutputStream.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    static {
        try {
            TREE_REQUEST = Unpooled.unreleasableBuffer(requestTree(TREE));
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
}
