package org.reveno.atp.clustering.core.components;

import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.reveno.atp.clustering.api.ClusterView;
import org.reveno.atp.clustering.api.SyncMode;
import org.reveno.atp.clustering.core.RevenoClusterConfiguration;
import org.reveno.atp.clustering.core.api.StorageTransferServer;
import org.reveno.atp.clustering.util.Utils;
import org.reveno.atp.commons.NamedThreadFactory;
import org.reveno.atp.core.api.storage.JournalsStorage;
import org.reveno.atp.core.api.storage.SnapshotStorage;
import org.reveno.atp.core.storage.FileSystemStorage;
import org.reveno.atp.utils.Exceptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/reveno/atp/clustering/core/components/FileStorageTransferServer.class */
public class FileStorageTransferServer implements StorageTransferServer {
    protected RevenoClusterConfiguration config;
    protected FileSystemStorage storage;
    protected ExecutorService executor;
    protected ServerSocketChannel listener;
    protected static final Logger LOG = LoggerFactory.getLogger(FileStorageTransferServer.class);
    protected Long2ObjectMap<JournalsStorage.JournalStore[]> journals = new Long2ObjectOpenHashMap();
    protected Long2ObjectMap<SnapshotStorage.SnapshotStore> snapshots = new Long2ObjectOpenHashMap();
    protected ExecutorService mainListener = Executors.newSingleThreadExecutor(new NamedThreadFactory("stf-main"));

    @Override // org.reveno.atp.clustering.core.api.StorageTransferServer
    public void startup() {
        this.mainListener.execute(() -> {
            try {
                InetSocketAddress inetSocketAddress = new InetSocketAddress(this.config.revenoDataSync().port());
                try {
                    this.listener = listen(inetSocketAddress);
                    LOG.debug("FSTF: transfer Server is started on {}", inetSocketAddress);
                    while (!Thread.interrupted()) {
                        SocketChannel accept = accept(this.listener);
                        this.executor.execute(() -> {
                            sendStoragesToNode(accept);
                        });
                    }
                    LOG.debug("FSTF: transfer Server stopped on {}", inetSocketAddress);
                } catch (IOException e) {
                    LOG.error("FSTF: failed to open server socket.", e);
                }
            } catch (Throwable th) {
                LOG.error("FSTF: file server executor error.", th);
            }
        });
    }

    @Override // org.reveno.atp.clustering.core.api.StorageTransferServer
    public void shutdown() {
        try {
            if (this.listener != null) {
                this.listener.close();
            }
        } catch (IOException e) {
            LOG.error("FSTF: can't close file storage transfer server.", e);
        }
        this.mainListener.shutdown();
        this.executor.shutdown();
    }

    @Override // org.reveno.atp.clustering.core.api.StorageTransferServer
    public void fixJournals(ClusterView clusterView) {
        this.journals.put(clusterView.viewId(), this.storage.getAllStores());
        this.snapshots.put(clusterView.viewId(), this.storage.getLastSnapshotStore());
    }

    protected void sendStoragesToNode(SocketChannel socketChannel) {
        try {
            try {
                ByteBuffer allocate = ByteBuffer.allocate(17);
                if (waitForData(socketChannel, allocate)) {
                    allocate.rewind();
                    long j = allocate.getLong();
                    if (this.config.revenoDataSync().mode() == SyncMode.SNAPSHOT) {
                        transfer(socketChannel, ((SnapshotStorage.SnapshotStore) this.snapshots.get(j)).getSnapshotPath());
                    } else {
                        byte b = allocate.get();
                        if (b != 1 && b != 2) {
                            throw new IllegalArgumentException(String.format("Unknown transfer type %s", Byte.valueOf(b)));
                        }
                        select(allocate.getLong(), j).stream().map(journalStore -> {
                            return b == 1 ? journalStore.getTransactionCommitsAddress() : journalStore.getEventsCommitsAddress();
                        }).forEach(str -> {
                            transfer(socketChannel, str);
                        });
                    }
                } else {
                    LOG.error("Can't receive data from {}", socketChannel.getRemoteAddress());
                }
                LOG.info("FSTF: Closing transfer server connection.");
                close(socketChannel);
            } catch (IOException e) {
                LOG.error("FSTF: Failed to accept incoming connection", e);
                LOG.info("FSTF: Closing transfer server connection.");
                close(socketChannel);
            }
        } catch (Throwable th) {
            LOG.info("FSTF: Closing transfer server connection.");
            close(socketChannel);
            throw th;
        }
    }

    protected void transfer(SocketChannel socketChannel, String str) {
        try {
            File file = new File(this.storage.getBaseDir(), str);
            LOG.debug("FSTF: transfering {} to {}", file, socketChannel.getRemoteAddress());
            FileChannel channel = new FileInputStream(file).getChannel();
            channel.transferTo(0L, channel.size(), socketChannel);
            channel.close();
        } catch (Exception e) {
            throw Exceptions.runtime(e);
        }
    }

    protected boolean waitForData(SocketChannel socketChannel, ByteBuffer byteBuffer) throws IOException {
        short[] sArr = {0};
        return Utils.waitFor(() -> {
            int readSilent = readSilent(socketChannel, byteBuffer);
            if (readSilent != -1) {
                sArr[0] = (short) (sArr[0] + readSilent);
            }
            LOG.debug("FSTF: received next {} bytes from {}", Integer.valueOf(readSilent), socketChannel);
            return Boolean.valueOf(sArr[0] == 17);
        }, this.config.revenoElectionTimeouts().syncTimeoutNanos());
    }

    protected int readSilent(SocketChannel socketChannel, ByteBuffer byteBuffer) {
        try {
            return socketChannel.read(byteBuffer);
        } catch (IOException e) {
            throw Exceptions.runtime(e);
        }
    }

    protected SocketChannel accept(ServerSocketChannel serverSocketChannel) {
        try {
            SocketChannel accept = serverSocketChannel.accept();
            accept.configureBlocking(true);
            LOG.info("FSTF: Accepted new connection: {}", accept);
            return accept;
        } catch (Exception e) {
            throw Exceptions.runtime(e);
        }
    }

    protected void close(SocketChannel socketChannel) {
        try {
            socketChannel.close();
        } catch (Exception e) {
            throw Exceptions.runtime(e);
        }
    }

    protected ServerSocketChannel listen(InetSocketAddress inetSocketAddress) throws IOException {
        ServerSocketChannel open = ServerSocketChannel.open();
        ServerSocket socket = open.socket();
        socket.setReuseAddress(true);
        socket.bind(inetSocketAddress);
        return open;
    }

    protected Set<JournalsStorage.JournalStore> select(long j, long j2) {
        JournalsStorage.JournalStore[] journalStoreArr = (JournalsStorage.JournalStore[]) this.journals.get(j2);
        LinkedHashSet linkedHashSet = new LinkedHashSet();
        for (int i = 0; i < journalStoreArr.length; i++) {
            if (journalStoreArr[i].getLastTransactionId() > j) {
                if (i - 1 > 0) {
                    linkedHashSet.add(journalStoreArr[i - 1]);
                    linkedHashSet.add(journalStoreArr[i]);
                } else {
                    linkedHashSet.add(journalStoreArr[i]);
                }
            } else if (journalStoreArr[i].getLastTransactionId() == j) {
                linkedHashSet.add(journalStoreArr[i]);
            }
        }
        return linkedHashSet;
    }

    public FileStorageTransferServer(RevenoClusterConfiguration revenoClusterConfiguration, FileSystemStorage fileSystemStorage) {
        this.config = revenoClusterConfiguration;
        this.storage = fileSystemStorage;
        this.executor = Executors.newFixedThreadPool(revenoClusterConfiguration.revenoDataSync().threadPoolSize(), new NamedThreadFactory("stf"));
    }
}
