package org.teamapps.cluster.service;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.ServerSocket;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.teamapps.cluster.crypto.AesCipher;
import org.teamapps.cluster.model.cluster.ClusterFileTransfer;
import org.teamapps.cluster.model.cluster.ClusterFileTransferResponse;
import org.teamapps.cluster.model.cluster.ClusterNodeData;
import org.teamapps.cluster.model.cluster.ClusterNodeInfo;
import org.teamapps.cluster.model.cluster.ClusterSchemaRegistry;
import org.teamapps.cluster.model.cluster.ClusterTopicInfo;
import org.teamapps.cluster.model.cluster.ClusterTopicMessage;
import org.teamapps.cluster.model.cluster.KeepAliveMessage;
import org.teamapps.cluster.model.cluster.ServiceClusterRequest;
import org.teamapps.cluster.model.cluster.ServiceClusterResponse;
import org.teamapps.cluster.network.ClusterNodeMessageHandler;
import org.teamapps.cluster.network.LocalClusterNode;
import org.teamapps.cluster.network.NodeAddress;
import org.teamapps.cluster.network.RemoteClusterNode;
import org.teamapps.cluster.utils.ExceptionUtil;
import org.teamapps.protocol.file.FileProvider;
import org.teamapps.protocol.message.Message;
import org.teamapps.protocol.message.MessageDecoder;
import org.teamapps.protocol.service.AbstractClusterService;
import org.teamapps.protocol.service.ServiceRegistry;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

/* loaded from: input_file:org/teamapps/cluster/service/TeamAppsCluster.class */
public class TeamAppsCluster extends Thread implements ClusterNodeMessageHandler, FileProvider, ServiceRegistry {
    private static final Logger LOGGER = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final String clusterSecret;
    private final LocalClusterNode localNode;
    private final AesCipher aesCipher;
    private int retryMaxAttempts;
    private Duration retryBackoffDuration;
    private Consumer<NodeAddress> onNodeConnectionListener;
    private final ExecutorService executor;
    private volatile boolean running;
    private final Map<String, RemoteClusterNode> remoteNodes;
    private final Map<String, AbstractClusterService> localServices;
    private Map<String, List<RemoteClusterNode>> clusterServices;
    private final Map<Long, CompletableFuture<ServiceClusterResponse>> serviceResponseFutureMap;
    private final Map<String, CompletableFuture<ClusterFileTransferResponse>> fileTransferFutureMap;
    private final AtomicLong requestIdGenerator;
    private final Map<String, ClusterTopic> clusterTopics;
    private final File fileTransferPath;
    private final Map<String, File> fileTransferMap;

    public TeamAppsCluster(String str, int i, NodeAddress... nodeAddressArr) {
        this(str, null, null, i, null, nodeAddressArr);
    }

    public TeamAppsCluster(String str, String str2, Consumer<NodeAddress> consumer, int i, NodeAddress... nodeAddressArr) {
        this(str, str2, consumer, i, null, nodeAddressArr);
    }

    public TeamAppsCluster(String str, String str2, Consumer<NodeAddress> consumer, int i, File file, NodeAddress... nodeAddressArr) {
        super("cluster-server-socket");
        this.retryMaxAttempts = 3;
        this.retryBackoffDuration = Duration.ofSeconds(3L);
        this.executor = Executors.newFixedThreadPool(16);
        this.running = true;
        this.remoteNodes = new ConcurrentHashMap();
        this.localServices = new ConcurrentHashMap();
        this.clusterServices = new HashMap();
        this.serviceResponseFutureMap = new ConcurrentHashMap();
        this.fileTransferFutureMap = new ConcurrentHashMap();
        this.requestIdGenerator = new AtomicLong();
        this.clusterTopics = new ConcurrentHashMap();
        this.fileTransferMap = Collections.synchronizedMap(new LinkedHashMap<String, File>() { // from class: org.teamapps.cluster.service.TeamAppsCluster.1
            @Override // java.util.LinkedHashMap
            protected boolean removeEldestEntry(Map.Entry<String, File> entry) {
                boolean z = (((System.currentTimeMillis() - entry.getValue().lastModified()) > 86400000L ? 1 : ((System.currentTimeMillis() - entry.getValue().lastModified()) == 86400000L ? 0 : -1)) > 0) || size() > 100000;
                if (z) {
                    entry.getValue().delete();
                }
                return z;
            }
        });
        this.clusterSecret = str;
        this.aesCipher = new AesCipher(str);
        this.localNode = new LocalClusterNode(str2, i);
        this.fileTransferPath = file != null ? file : Utils.createTempDir();
        this.onNodeConnectionListener = consumer;
        start();
        connectNodes(nodeAddressArr);
    }

    public ClusterTopic createTopic(String str, Consumer<ClusterTopicMessage> consumer) {
        ClusterTopic computeIfAbsent = this.clusterTopics.computeIfAbsent(str, str2 -> {
            return new ClusterTopic(str, this.aesCipher, this.localNode.getNodeId());
        });
        computeIfAbsent.setMessageConsumer(consumer);
        sendClusterInfo();
        return computeIfAbsent;
    }

    public void shutDown() {
        Iterator<RemoteClusterNode> it = this.remoteNodes.values().iterator();
        while (it.hasNext()) {
            it.next().shutDown();
        }
        this.executor.shutdownNow();
    }

    private void connectNodes(NodeAddress[] nodeAddressArr) {
        if (nodeAddressArr == null) {
            return;
        }
        for (NodeAddress nodeAddress : nodeAddressArr) {
            new RemoteClusterNode(this, nodeAddress);
        }
    }

    public void registerService(AbstractClusterService abstractClusterService) {
        this.localServices.put(abstractClusterService.getServiceName(), abstractClusterService);
        sendNodeUpdate((List) this.remoteNodes.values().stream().filter((v0) -> {
            return v0.isConnected();
        }).collect(Collectors.toList()));
    }

    public boolean isServiceAvailable(String str) {
        return !this.clusterServices.getOrDefault(str, Collections.emptyList()).isEmpty();
    }

    public RemoteClusterNode getRandomServiceProvider(String str) {
        return (RemoteClusterNode) Utils.randomListEntry((List) this.clusterServices.getOrDefault(str, Collections.emptyList()).stream().filter((v0) -> {
            return v0.isConnected();
        }).collect(Collectors.toList()));
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        try {
            ServerSocket serverSocket = new ServerSocket(this.localNode.getPort());
            while (this.running) {
                try {
                    new RemoteClusterNode(this, serverSocket.accept());
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        } catch (IOException e2) {
            e2.printStackTrace();
        }
    }

    private void sendMessage(Message message, RemoteClusterNode remoteClusterNode) throws Exception {
        LOGGER.info("Send message:" + message);
        remoteClusterNode.sendMessage(this.aesCipher.encrypt(message.toBytes()));
    }

    private synchronized void handleClusterNodeInfo(ClusterNodeInfo clusterNodeInfo, RemoteClusterNode remoteClusterNode) throws Exception {
        RemoteClusterNode remoteClusterNode2;
        remoteClusterNode.setClusterNodeData(clusterNodeInfo.getLocalNode());
        RemoteClusterNode remoteClusterNode3 = this.remoteNodes.get(remoteClusterNode.getNodeId());
        if (remoteClusterNode3 != null && remoteClusterNode3 != remoteClusterNode) {
            remoteClusterNode3.merge(remoteClusterNode);
            remoteClusterNode = remoteClusterNode3;
        }
        for (ClusterNodeData clusterNodeData : clusterNodeInfo.getKnownRemoteNodes()) {
            if (!this.remoteNodes.containsKey(clusterNodeData.getNodeId()) && !clusterNodeData.getNodeId().equals(this.localNode.getNodeId())) {
                new RemoteClusterNode(this, new NodeAddress(clusterNodeData.getHost(), clusterNodeData.getPort()));
            }
        }
        for (ClusterTopicInfo clusterTopicInfo : clusterNodeInfo.getClusterTopics()) {
            ClusterTopic clusterTopic = this.clusterTopics.get(clusterTopicInfo.getTopicName());
            if (clusterTopic == null) {
                clusterTopic = new ClusterTopic(clusterTopicInfo.getTopicName(), this.aesCipher);
            }
            for (String str : clusterTopicInfo.getNodeIdsAsList()) {
                if (!clusterTopic.isRegistered(str)) {
                    clusterTopic.addRegisteredMember(str);
                }
                if (!clusterTopic.isAvailableMember(str) && (remoteClusterNode2 = this.remoteNodes.get(str)) != null) {
                    clusterTopic.addMember(remoteClusterNode2);
                }
            }
        }
        boolean z = !clusterNodeInfo.getResponse();
        if (!this.remoteNodes.containsKey(remoteClusterNode.getNodeId())) {
            remoteClusterNode.setConnected(true);
            z = true;
            LOGGER.info("New cluster node: {}", remoteClusterNode);
            this.remoteNodes.put(remoteClusterNode.getNodeId(), remoteClusterNode);
            for (ClusterTopic clusterTopic2 : this.clusterTopics.values()) {
                if (clusterTopic2.isRegistered(remoteClusterNode.getNodeId()) && !clusterTopic2.isAvailableMember(remoteClusterNode.getNodeId())) {
                    clusterTopic2.addMember(remoteClusterNode);
                }
            }
        } else if (!remoteClusterNode.isConnected()) {
            remoteClusterNode.setConnected(true);
            LOGGER.info("Reconnected cluster node: {}", remoteClusterNode);
        }
        if (z) {
            remoteClusterNode.sendMessage(createInfoMessage(true));
            sendNodeUpdate((List) this.remoteNodes.values().stream().filter((v0) -> {
                return v0.isConnected();
            }).collect(Collectors.toList()));
        }
        recreateClusterServiceMap();
        if (this.onNodeConnectionListener != null) {
            this.onNodeConnectionListener.accept(remoteClusterNode.getNodeAddress());
        }
    }

    private void recreateClusterServiceMap() {
        HashMap hashMap = new HashMap();
        for (RemoteClusterNode remoteClusterNode : this.remoteNodes.values()) {
            for (String str : remoteClusterNode.getServices()) {
                hashMap.putIfAbsent(str, new ArrayList());
                ((List) hashMap.get(str)).add(remoteClusterNode);
            }
        }
        this.clusterServices = hashMap;
    }

    private void sendNodeUpdate(List<RemoteClusterNode> list) {
        byte[] createInfoMessage = createInfoMessage(true);
        Iterator<RemoteClusterNode> it = list.iterator();
        while (it.hasNext()) {
            it.next().sendMessage(createInfoMessage);
        }
    }

    private void sendClusterInfo() {
        byte[] createInfoMessage = createInfoMessage(true);
        Iterator<RemoteClusterNode> it = this.remoteNodes.values().iterator();
        while (it.hasNext()) {
            it.next().sendMessageAsync(createInfoMessage);
        }
    }

    @Override // org.teamapps.cluster.network.ClusterNodeMessageHandler
    public void handleMessage(RemoteClusterNode remoteClusterNode, byte[] bArr) {
        try {
            byte[] decrypt = this.aesCipher.decrypt(bArr);
            int messageFieldId = Message.getMessageFieldId(decrypt);
            LOGGER.debug("Handle message: id: {}, field: {}, size: {}, node: {}", new Object[]{Integer.valueOf(messageFieldId), ClusterSchemaRegistry.SCHEMA.getFieldById(messageFieldId), Integer.valueOf(bArr.length), remoteClusterNode});
            if (messageFieldId == 101028) {
                handleFileTransfer(new ClusterFileTransfer(decrypt, this), remoteClusterNode);
            } else if (messageFieldId == 101010) {
                ClusterTopicMessage clusterTopicMessage = new ClusterTopicMessage(decrypt, this);
                ClusterTopic clusterTopic = this.clusterTopics.get(clusterTopicMessage.getTopic());
                if (clusterTopic != null) {
                    clusterTopic.handleMessage(clusterTopicMessage);
                } else {
                    LOGGER.error("Topic message but no handler: {}", clusterTopicMessage.getTopic());
                }
            } else {
                this.executor.submit(() -> {
                    try {
                        switch (messageFieldId) {
                            case ClusterNodeInfo.ROOT_FIELD_ID /* 101013 */:
                                handleClusterNodeInfo(new ClusterNodeInfo(decrypt, this), remoteClusterNode);
                                break;
                            case ServiceClusterRequest.ROOT_FIELD_ID /* 101018 */:
                                handleServiceClusterRequest(new ServiceClusterRequest(decrypt, this), remoteClusterNode);
                                break;
                            case ServiceClusterResponse.ROOT_FIELD_ID /* 101023 */:
                                handleServiceClusterResponse(new ServiceClusterResponse(decrypt, this), remoteClusterNode);
                                break;
                            case ClusterFileTransferResponse.ROOT_FIELD_ID /* 101035 */:
                                handleFileTransferResponse(new ClusterFileTransferResponse(decrypt, this), remoteClusterNode);
                                break;
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public <REQUEST extends Message, RESPONSE extends Message> Mono<RESPONSE> createServiceTask(String str, String str2, REQUEST request, MessageDecoder<RESPONSE> messageDecoder) {
        long incrementAndGet = this.requestIdGenerator.incrementAndGet();
        return Mono.create(monoSink -> {
            RemoteClusterNode randomServiceProvider = getRandomServiceProvider(str);
            if (randomServiceProvider == null) {
                LOGGER.warn("No cluster member available for service: {}, method: {}, with request: {}", new Object[]{str, str2, request});
                monoSink.error(new Exception("Error: no cluster member available!"));
                return;
            }
            LOGGER.debug("Create cluster task for member: {}", randomServiceProvider);
            AtomicBoolean atomicBoolean = new AtomicBoolean();
            monoSink.onDispose(() -> {
                atomicBoolean.set(true);
            });
            try {
                byte[] bytes = request.toBytes(file -> {
                    return (String) ExceptionUtil.softenExceptions(() -> {
                        return sendFile(file, randomServiceProvider, atomicBoolean);
                    });
                });
                if (atomicBoolean.get()) {
                    return;
                }
                ServiceClusterRequest requestData = new ServiceClusterRequest().setRequestId(incrementAndGet).setServiceName(str).setMethod(str2).setRequestData(bytes);
                CompletableFuture<ServiceClusterResponse> completableFuture = new CompletableFuture<>();
                this.serviceResponseFutureMap.put(Long.valueOf(incrementAndGet), completableFuture);
                sendMessage(requestData, randomServiceProvider);
                monoSink.success(completableFuture);
            } catch (Exception e) {
                monoSink.error(e);
            }
        }).flatMap(Mono::fromFuture).map(serviceClusterResponse -> {
            return messageDecoder.decode(serviceClusterResponse.getResponseData(), this);
        }).subscribeOn(Schedulers.boundedElastic()).retryWhen(Retry.backoff(this.retryMaxAttempts, this.retryBackoffDuration)).timeout(Duration.ofMinutes(5L)).doAfterTerminate(() -> {
            this.serviceResponseFutureMap.remove(Long.valueOf(incrementAndGet));
        });
    }

    private void handleServiceClusterRequest(ServiceClusterRequest serviceClusterRequest, RemoteClusterNode remoteClusterNode) throws Exception {
        LOGGER.debug("Handle cluster request: {}, node: {}", serviceClusterRequest, remoteClusterNode);
        AbstractClusterService abstractClusterService = this.localServices.get(serviceClusterRequest.getServiceName());
        ServiceClusterResponse requestId = new ServiceClusterResponse().setRequestId(serviceClusterRequest.getRequestId());
        if (abstractClusterService != null) {
            requestId.setResponseData(abstractClusterService.handleMessage(serviceClusterRequest.getMethod(), serviceClusterRequest.getRequestData(), this, file -> {
                return (String) ExceptionUtil.softenExceptions(() -> {
                    return sendFile(file, remoteClusterNode, null);
                });
            }));
        } else {
            LOGGER.error("Could not find requested service {}", serviceClusterRequest.getServiceName());
            requestId.setError(true).setErrorMessage("could not find requested service: " + serviceClusterRequest.getServiceName());
        }
        sendMessage(requestId, remoteClusterNode);
    }

    private void handleServiceClusterResponse(ServiceClusterResponse serviceClusterResponse, RemoteClusterNode remoteClusterNode) {
        LOGGER.debug("Handle cluster response: {}, node: {}", serviceClusterResponse, remoteClusterNode);
        CompletableFuture<ServiceClusterResponse> completableFuture = this.serviceResponseFutureMap.get(Long.valueOf(serviceClusterResponse.getRequestId()));
        if (completableFuture != null) {
            completableFuture.complete(serviceClusterResponse);
        }
    }

    @Override // org.teamapps.cluster.network.ClusterNodeMessageHandler
    public byte[] createInitMessage() {
        return createInfoMessage(false);
    }

    @Override // org.teamapps.cluster.network.ClusterNodeMessageHandler
    public byte[] getKeepAliveMessage() {
        try {
            return this.aesCipher.encrypt(new KeepAliveMessage().toBytes());
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    private byte[] createInfoMessage(boolean z) {
        try {
            ClusterNodeInfo clusterNodeInfo = new ClusterNodeInfo();
            clusterNodeInfo.setResponse(z);
            clusterNodeInfo.setLocalNode(new ClusterNodeData().setNodeId(this.localNode.getNodeId()).setAvailableServices((String[]) this.localServices.keySet().toArray(new String[0])));
            for (RemoteClusterNode remoteClusterNode : this.remoteNodes.values()) {
                if (remoteClusterNode.isOutgoing() && remoteClusterNode.isConnected()) {
                    clusterNodeInfo.addKnownRemoteNodes(remoteClusterNode.getClusterNodeData());
                }
            }
            Iterator<ClusterTopic> it = this.clusterTopics.values().iterator();
            while (it.hasNext()) {
                clusterNodeInfo.addClusterTopics(it.next().createTopicInfo());
            }
            return this.aesCipher.encrypt(clusterNodeInfo.toBytes());
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    public File getFile(String str) {
        return this.fileTransferMap.get(str);
    }

    private File getTransferFile(String str) {
        return new File(this.fileTransferPath, str + ".tmp");
    }

    private String sendFile(File file, RemoteClusterNode remoteClusterNode, AtomicBoolean atomicBoolean) throws Exception {
        LOGGER.info("Send file: {}, node: {}", file.getName(), remoteClusterNode);
        String replace = UUID.randomUUID().toString().replace("-", ".");
        BufferedInputStream bufferedInputStream = new BufferedInputStream(new FileInputStream(file));
        byte[] bArr = new byte[10000];
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        boolean z = true;
        int i = 0;
        while (true) {
            int read = bufferedInputStream.read(bArr);
            if (read < 0) {
                ClusterFileTransfer lastMessage = new ClusterFileTransfer().setFileId(replace).setData(byteArrayOutputStream.toByteArray()).setMessageIndex(i).setLastMessage(true);
                if (atomicBoolean != null && atomicBoolean.get()) {
                    return null;
                }
                CompletableFuture<ClusterFileTransferResponse> completableFuture = new CompletableFuture<>();
                this.fileTransferFutureMap.put(replace, completableFuture);
                sendMessage(lastMessage, remoteClusterNode);
                ClusterFileTransferResponse clusterFileTransferResponse = completableFuture.get(120L, TimeUnit.SECONDS);
                if (clusterFileTransferResponse.getReceivedData() == file.length()) {
                    return replace;
                }
                long length = file.length();
                clusterFileTransferResponse.getReceivedData();
                Exception exc = new Exception("Error sending file transfer, expected: " + length + ", actual received: " + exc);
                throw exc;
            }
            byteArrayOutputStream.write(bArr, 0, read);
            if (byteArrayOutputStream.size() >= 10000000) {
                int i2 = i;
                i++;
                ClusterFileTransfer initialMessage = new ClusterFileTransfer().setFileId(replace).setData(byteArrayOutputStream.toByteArray()).setLength(r0.length).setMessageIndex(i2).setInitialMessage(z);
                z = false;
                byteArrayOutputStream.reset();
                if (atomicBoolean != null && atomicBoolean.get()) {
                    return null;
                }
                sendMessage(initialMessage, remoteClusterNode);
            }
        }
    }

    private void handleFileTransfer(ClusterFileTransfer clusterFileTransfer, RemoteClusterNode remoteClusterNode) throws Exception {
        LOGGER.info("Handle file transfer: {}, length: {}, last: {}, messageIndex: {}, node: {}", new Object[]{clusterFileTransfer.getFileId(), Long.valueOf(clusterFileTransfer.getLength()), Boolean.valueOf(clusterFileTransfer.getLastMessage()), Integer.valueOf(clusterFileTransfer.getMessageIndex()), remoteClusterNode});
        if (clusterFileTransfer.getData().length != clusterFileTransfer.getLength()) {
            LOGGER.error("Wrong length of file transfer, expected: {}, actual: {}", Long.valueOf(clusterFileTransfer.getLength()), Integer.valueOf(clusterFileTransfer.getData().length));
        }
        long appendFileTransferData = appendFileTransferData(clusterFileTransfer.getFileId(), clusterFileTransfer.getData(), clusterFileTransfer.getInitialMessage());
        if (clusterFileTransfer.getLastMessage()) {
            this.fileTransferMap.put(clusterFileTransfer.getFileId(), getTransferFile(clusterFileTransfer.getFileId()));
            sendMessage(new ClusterFileTransferResponse().setReceivedData(appendFileTransferData).setFileId(clusterFileTransfer.getFileId()), remoteClusterNode);
        }
    }

    private long appendFileTransferData(String str, byte[] bArr, boolean z) throws IOException {
        File transferFile = getTransferFile(str);
        BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(new FileOutputStream(transferFile, !z), 32000);
        bufferedOutputStream.write(bArr);
        bufferedOutputStream.close();
        return transferFile.length();
    }

    private void handleFileTransferResponse(ClusterFileTransferResponse clusterFileTransferResponse, RemoteClusterNode remoteClusterNode) {
        CompletableFuture<ClusterFileTransferResponse> remove = this.fileTransferFutureMap.remove(clusterFileTransferResponse.getFileId());
        if (remove != null) {
            remove.complete(clusterFileTransferResponse);
        }
    }
}
