/*
 * Decompiled with CFR 0.152.
 */
package org.tools4j.elara.plugin.replication;

import java.nio.ByteBuffer;
import java.util.Objects;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.tools4j.elara.flyweight.FlyweightEvent;
import org.tools4j.elara.log.MessageLog;
import org.tools4j.elara.logging.ElaraLogger;
import org.tools4j.elara.logging.Logger;
import org.tools4j.elara.plugin.base.BaseState;
import org.tools4j.elara.plugin.replication.Configuration;
import org.tools4j.elara.plugin.replication.Connection;
import org.tools4j.elara.plugin.replication.ReplicationMessages;
import org.tools4j.elara.plugin.replication.ReplicationState;

final class ConnectionHandler
implements Connection.Handler {
    public static final long RESPONSE_DELAY_NANOS = 60L;
    public static final long RESEND_DELAY_NANOS = 10000L;
    private final ElaraLogger logger;
    private final int serverId;
    private final BaseState baseState;
    private final ReplicationState.Volatile state;
    private final MessageLog.Appender eventLogAppender;
    private final Connection.Publisher responseSender;
    private final FlyweightEvent flyweightEvent = new FlyweightEvent();
    private final UnsafeBuffer bufferView = new UnsafeBuffer(0L, 0);
    private final MutableDirectBuffer sendBuffer = new UnsafeBuffer(ByteBuffer.allocateDirect(32));

    ConnectionHandler(Logger.Factory loggerFactory, Configuration configuration, BaseState baseState, ReplicationState.Volatile state, MessageLog.Appender eventLogAppender, Connection.Publisher responseSender) {
        this.logger = ElaraLogger.create(loggerFactory, this.getClass());
        this.serverId = configuration.serverId();
        this.baseState = Objects.requireNonNull(baseState);
        this.state = Objects.requireNonNull(state);
        this.eventLogAppender = Objects.requireNonNull(eventLogAppender);
        this.responseSender = Objects.requireNonNull(responseSender);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onMessage(int senderServerId, DirectBuffer buffer, int offset, int length) {
        this.bufferView.wrap(buffer, offset, length);
        try {
            byte type = ReplicationMessages.type((DirectBuffer)this.bufferView);
            byte version = ReplicationMessages.version((DirectBuffer)this.bufferView);
            if (version != 1) {
                this.logger.warn("Server {}: Ignoring message of type {} from {}: version {} found but expected {}").replace(this.serverId).replace(type).replace(senderServerId).replace(version).replace(1L).format();
                return;
            }
            if (this.isLeader()) {
                if (type != -96) {
                    this.logger.warn("Server {}: Ignoring message of type {} from sender {} in leader mode").replace(this.serverId).replace(type).replace(senderServerId).format();
                    return;
                }
                if (senderServerId == this.state.leaderId()) {
                    this.logger.warn("Server {}: Ignoring message of type {} in leader mode: response from sender {} is from myself?!").replace(this.serverId).replace(type).replace(senderServerId).format();
                    return;
                }
                this.handleAppendResponse(senderServerId, (DirectBuffer)this.bufferView);
            } else {
                if (type != -95) {
                    this.logger.warn("Server {}: Ignoring message of type {} from sender {} in follower mode").replace(this.serverId).replace(type).replace(senderServerId).format();
                    return;
                }
                this.handleAppendRequest(senderServerId, (DirectBuffer)this.bufferView);
            }
        }
        finally {
            this.bufferView.wrap(0L, 0);
        }
    }

    private void handleAppendRequest(int senderServerId, DirectBuffer buffer) {
        long nextSendingTime;
        boolean success;
        long nextEventLogIndex;
        int currentTerm;
        int senderTerm = ReplicationMessages.term(buffer);
        if (senderTerm < (currentTerm = this.state.currentTerm())) {
            this.logger.warn("Server {}: Ignoring append-request message in follower mode: term {} of sender {} is lower than current term {}").replace(this.serverId).replace(senderTerm).replace(senderServerId).replace(currentTerm).format();
            return;
        }
        int leaderId = this.state.leaderId();
        if (senderTerm == currentTerm && senderServerId != this.state.leaderId()) {
            this.logger.warn("Server {}: Ignoring append-request message in follower mode: leader is {} in term {} but message received from sender {}").replace(this.serverId).replace(leaderId).replace(currentTerm).replace(senderServerId).format();
            return;
        }
        long logIndex = ReplicationMessages.logIndex(buffer);
        if (logIndex == (nextEventLogIndex = this.state.eventLogSize())) {
            int payloadSize = ReplicationMessages.payloadSize(buffer);
            if (payloadSize < 32) {
                this.logger.warn("Server {}: Ignoring append-request message in follower mode: payload size {} is smaller than frame header length {}").replace(this.serverId).replace(payloadSize).replace(32L).format();
                return;
            }
            this.flyweightEvent.init(buffer, 32);
            if (this.baseState.eventApplied(this.flyweightEvent.id())) {
                this.logger.warn("Server {}: Ignoring append-request message in follower mode: event {}:{}.{} has already been applied").replace(this.serverId).replace(this.flyweightEvent.source()).replace(this.flyweightEvent.sequence()).replace(this.flyweightEvent.index()).format();
                return;
            }
            this.eventLogAppender.append(buffer, 32, payloadSize);
            if (this.logger.isEnabled(Logger.Level.DEBUG)) {
                this.logger.debug("Server {}: Processed append-request message {} in follower mode").replace(this.serverId).replace(nextEventLogIndex).format();
            }
            ++nextEventLogIndex;
        }
        boolean bl = success = logIndex <= nextEventLogIndex;
        if (!success && this.logger.isEnabled(Logger.Level.DEBUG)) {
            this.logger.debug("Server {}: Ignoring append-request message in follower mode: expected event log index {} but received {}").replace(this.serverId).replace(nextEventLogIndex).replace(logIndex).format();
        }
        long l = nextSendingTime = success ? 0L : this.state.nextNotBefore(senderServerId);
        if (nextSendingTime == 0L || System.nanoTime() - nextSendingTime >= 0L) {
            boolean sent = this.sendAppendResponse(senderServerId, nextEventLogIndex, success);
            if (success) {
                this.state.nextNotBefore(senderServerId, 0L);
            } else if (sent) {
                this.state.nextNotBefore(senderServerId, System.nanoTime() + 60L);
            }
        }
    }

    private boolean sendAppendResponse(int targetServerId, long nextEventLogIndex, boolean success) {
        int length = ReplicationMessages.appendResponse(this.sendBuffer, 0, this.state.currentTerm(), this.state.leaderId(), nextEventLogIndex, success);
        if (!this.responseSender.publish(targetServerId, (DirectBuffer)this.sendBuffer, 0, length)) {
            this.logger.warn("Server {}: Sending append response to {} for next event log index {} failed").replace(this.serverId).replace(targetServerId).replace(nextEventLogIndex).format();
            return false;
        }
        return true;
    }

    private void handleAppendResponse(int senderServerId, DirectBuffer buffer) {
        boolean appendSuccessful = ReplicationMessages.isAppendSuccess(buffer);
        long nextEventLogIndex = ReplicationMessages.logIndex(buffer);
        if (appendSuccessful && nextEventLogIndex > 0L) {
            this.state.confirmedEventLogIndex(senderServerId, nextEventLogIndex - 1L);
        }
        if (!(appendSuccessful && nextEventLogIndex <= this.state.nextEventLogIndex(senderServerId) || this.state.nextEventLogIndex(senderServerId) == nextEventLogIndex)) {
            this.state.nextEventLogIndex(senderServerId, nextEventLogIndex);
            if (this.logger.isEnabled(Logger.Level.DEBUG)) {
                this.logger.debug("Server {}: Reset next event log index to to {} for server {}").replace(this.serverId).replace(nextEventLogIndex).replace(senderServerId).format();
            }
        }
    }

    private boolean isLeader() {
        return this.serverId == this.state.leaderId();
    }
}

