package org.reveno.atp.clustering.core.components;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import org.reveno.atp.clustering.api.ClusterView;
import org.reveno.atp.clustering.api.InetAddress;
import org.reveno.atp.clustering.api.SyncMode;
import org.reveno.atp.clustering.core.RevenoClusterConfiguration;
import org.reveno.atp.clustering.core.api.ClusterExecutor;
import org.reveno.atp.clustering.core.messages.NodeState;
import org.reveno.atp.core.api.channel.Channel;
import org.reveno.atp.core.api.storage.JournalsStorage;
import org.reveno.atp.core.api.storage.SnapshotStorage;
import org.reveno.atp.utils.Exceptions;
import org.reveno.atp.utils.MeasureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/reveno/atp/clustering/core/components/StorageTransferModelSync.class */
/**
 * {@link ClusterExecutor} that pulls the latest store (journals or a snapshot) from a
 * remote node over a plain TCP connection and persists it into the local storages.
 *
 * <p>The remote endpoint is built from the host of {@code latestNode.address()} and
 * {@code latestNode.syncPort}; what is transferred is chosen by {@code latestNode.syncMode}.
 */
public class StorageTransferModelSync implements ClusterExecutor<Boolean, TransferContext> {
    protected static final Logger LOG = LoggerFactory.getLogger(StorageTransferModelSync.class);

    /** Store type byte sent in the request when asking for the transactions journal. */
    public static final byte TRANSACTIONS = 1;
    /** Store type byte sent in the request when asking for the events journal. */
    public static final byte EVENTS = 2;
    /** Store type byte sent in the request when asking for a snapshot. */
    private static final byte SNAPSHOT = 0;

    /** Request layout: view id (long) + store type (byte) + transaction id (long) = 17 bytes. */
    private static final int REQUEST_SIZE = Long.BYTES + Byte.BYTES + Long.BYTES;

    protected RevenoClusterConfiguration config;
    protected JournalsStorage storage;
    protected SnapshotStorage snapshots;

    /** Input of a transfer: the transaction id sent in the request and the node to pull from. */
    public static class TransferContext {
        public final long transactionId;
        public final NodeState latestNode;

        public TransferContext(long j, NodeState nodeState) {
            this.transactionId = j;
            this.latestNode = nodeState;
        }
    }

    public StorageTransferModelSync(RevenoClusterConfiguration revenoClusterConfiguration, JournalsStorage journalsStorage, SnapshotStorage snapshotStorage) {
        this.config = revenoClusterConfiguration;
        this.storage = journalsStorage;
        this.snapshots = snapshotStorage;
    }

    /**
     * Transfers the store advertised by {@code transferContext.latestNode} into local storage.
     *
     * @param clusterView     current cluster view; its id is sent to the remote node
     * @param transferContext transaction id to request and the node to request it from
     * @return {@code true} if the store was received and installed, {@code false} if the
     *         transfer failed (freshly created local stores are removed in that case)
     * @throws IllegalArgumentException if the node's sync mode is neither JOURNALS nor SNAPSHOT
     */
    @Override
    public Boolean execute(ClusterView clusterView, TransferContext transferContext) {
        NodeState latest = transferContext.latestNode;
        SocketAddress remote = new InetSocketAddress(((InetAddress) latest.address()).getHost(), latest.syncPort);

        if (latest.syncMode == SyncMode.JOURNALS.getType()) {
            return syncJournals(clusterView, transferContext, remote);
        }
        if (latest.syncMode == SyncMode.SNAPSHOT.getType()) {
            return syncSnapshot(clusterView, transferContext, remote);
        }
        throw new IllegalArgumentException("Unknown transfer mode.");
    }

    /** Receives transactions and events journals into a temp store and merges it into a new store. */
    private boolean syncJournals(ClusterView clusterView, TransferContext transferContext, SocketAddress remote) {
        JournalsStorage.JournalStore tempStore = this.storage.nextTempStore();
        JournalsStorage.JournalStore targetStore = this.storage.nextStore(transferContext.latestNode.transactionId);

        // Short-circuit: the events channel is only opened once the transactions transfer succeeded.
        boolean received = receiveStore(clusterView, transferContext, remote, TRANSACTIONS,
                this.storage.channel(tempStore.getTransactionCommitsAddress()))
                && receiveStore(clusterView, transferContext, remote, EVENTS,
                this.storage.channel(tempStore.getEventsCommitsAddress()));
        if (received) {
            this.storage.mergeStores(new JournalsStorage.JournalStore[]{tempStore}, targetStore);
            return true;
        }
        this.storage.deleteStore(tempStore);
        this.storage.deleteStore(targetStore);
        return false;
    }

    /** Receives a snapshot into a temp snapshot store and moves it into a new snapshot store. */
    private boolean syncSnapshot(ClusterView clusterView, TransferContext transferContext, SocketAddress remote) {
        SnapshotStorage.SnapshotStore tempStore = this.snapshots.nextTempSnapshotStore();
        SnapshotStorage.SnapshotStore targetStore = this.snapshots.nextSnapshotStore();

        if (receiveStore(clusterView, transferContext, remote, SNAPSHOT,
                this.snapshots.snapshotChannel(tempStore.getSnapshotPath()))) {
            this.snapshots.move(tempStore, targetStore);
            return true;
        }
        this.snapshots.removeSnapshotStore(tempStore);
        this.snapshots.removeSnapshotStore(targetStore);
        // A failed transfer is reported as false, same as for journals, rather than
        // falling through to the "Unknown transfer mode." exception.
        return false;
    }

    /**
     * Connects to {@code socketAddress}, sends a request (view id, store type, transaction id)
     * and copies everything the remote side sends until end-of-stream into {@code channel}.
     *
     * <p>{@code channel} is always closed before this method returns, and so is the socket.
     *
     * @param clusterView     current cluster view; its id is the first field of the request
     * @param transferContext supplies the transaction id sent in the request
     * @param socketAddress   remote endpoint to connect to
     * @param b               store type byte sent in the request
     * @param channel         local destination for the received bytes
     * @return {@code true} if the stream was read to its end, {@code false} on any failure
     */
    protected boolean receiveStore(ClusterView clusterView, TransferContext transferContext, SocketAddress socketAddress, byte b, Channel channel) {
        // try-with-resources guarantees the socket is released on every path.
        try (SocketChannel socket = SocketChannel.open()) {
            if (!socket.connect(socketAddress)) {
                LOG.error("STF SYNC: can't establish connection to {}", socketAddress);
                return false;
            }
            socket.configureBlocking(true);

            ByteBuffer request = ByteBuffer.allocate(REQUEST_SIZE);
            request.putLong(clusterView.viewId());
            request.put(b);
            request.putLong(transferContext.transactionId);
            request.flip();
            LOG.debug("STF SYNC: sent {} to StorageTransfer server {}", Integer.valueOf(socket.write(request)), socket);

            ByteBuffer chunk = ByteBuffer.allocate(MeasureUtils.kb(64));
            int read = 0;
            // Note: the last iteration (read == -1) hands an empty buffer to the channel.
            while (read != -1) {
                read = socket.read(chunk);
                chunk.flip();
                LOG.debug("STF SYNC: received next {} bytes from {}", Integer.valueOf(chunk.limit()), socketAddress);
                channel.write(buffer -> {
                    buffer.writeFromBuffer(chunk);
                }, true);
                chunk.clear();
            }
            LOG.debug("STF SYNC: received latest store from StoreServer.");
            return true;
        } catch (Throwable th) {
            // Any failure (including I/O errors while reading) is logged once and reported as false.
            LOG.error("STF SYNC: Can't sync with remote node " + socketAddress, th);
            return false;
        } finally {
            channel.close();
        }
    }
}
