package org.apache.iotdb.db.sync.transport.client;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Iterator;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.SyncConnectionException;
import org.apache.iotdb.db.sync.conf.SyncConstant;
import org.apache.iotdb.db.sync.pipedata.PipeData;
import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
import org.apache.iotdb.db.sync.sender.pipe.Pipe;
import org.apache.iotdb.db.sync.sender.service.SenderService;
import org.apache.iotdb.db.sync.transport.conf.TransportConfig;
import org.apache.iotdb.db.sync.transport.conf.TransportConstant;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.TConfigurationConst;
import org.apache.iotdb.service.transport.thrift.MetaInfo;
import org.apache.iotdb.service.transport.thrift.RequestType;
import org.apache.iotdb.service.transport.thrift.ResponseType;
import org.apache.iotdb.service.transport.thrift.SyncRequest;
import org.apache.iotdb.service.transport.thrift.SyncResponse;
import org.apache.iotdb.service.transport.thrift.TransportService;
import org.apache.iotdb.service.transport.thrift.Type;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/sync/transport/client/TransportClient.class */
public class TransportClient implements ITransportClient {
    // Class-wide logger.
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) TransportClient.class);
    // Node configuration obtained from the IoTDBDescriptor singleton; read for retry limits,
    // the Thrift max frame size and the RPC compression flag.
    private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
    // 1 MiB (2^20 bytes). Not referenced by the visible decompiled code.
    // NOTE(review): presumably used by transportSingleFilePieceByPiece, whose body was not decompiled - confirm.
    private static final int TRANSFER_BUFFER_SIZE_IN_BYTES = 1048576;
    // Client used by handshake(), checkFileDigest() and transportPipeData().
    private final ClientWrapper serviceClient;
    // Separate client used only by requestHeartbeat().
    private final ClientWrapper heartbeatClient;
    // Receiver address; used when opening heartbeat sockets and in log/error messages.
    private final String ipAddress;
    // Receiver port; used together with ipAddress.
    private final int port;
    // Local IP placed into the START SyncRequest built in run().
    private final String localIP;
    // Pipe from which run() takes PipeData and on which it commits after a successful transfer.
    private final Pipe pipe;
    // Monitor that run() waits on after a SyncConnectionException; exposed through getWaitLock().
    private final Object waitLock;

    public TransportClient(Pipe pipe, String str, int i, String str2) {
        RpcTransportFactory.setThriftMaxFrameSize(config.getThriftMaxFrameSize());
        this.pipe = pipe;
        this.ipAddress = str;
        this.port = i;
        this.waitLock = new Object();
        this.localIP = str2;
        this.serviceClient = new ClientWrapper(pipe, str, i, str2);
        this.heartbeatClient = new ClientWrapper(pipe, str, i, str2);
    }

    /**
     * Exposes the monitor object this client waits on after a connection failure.
     *
     * @return the wait lock created in the constructor
     */
    public Object getWaitLock() {
        return waitLock;
    }

    /**
     * Performs the version handshake with the receiver through the service client.
     *
     * <p>Up to {@code getMaxNumberOfSyncFileRetry()} attempts are made; the result of the first
     * attempt that does not throw is returned as-is. If every one of those attempts throws, one
     * final attempt is made whose exception, if any, propagates to the caller.
     *
     * @return {@code true} if the handshake succeeded, {@code false} if the receiver rejected it
     * @throws SyncConnectionException if the final attempt still cannot reach the receiver
     */
    public synchronized boolean handshake() throws SyncConnectionException {
        final int maxRetry = config.getMaxNumberOfSyncFileRetry();
        for (int attempt = 0; attempt < maxRetry; attempt++) {
            try {
                return serviceClient.handshakeWithVersion();
            } catch (SyncConnectionException e) {
                // Keep the cause in the log; it was silently dropped before.
                logger.warn(String.format("Handshake error, retry %d/%d.", attempt, maxRetry), e);
            }
        }
        // Last chance: a connection failure here is reported to the caller instead of being retried.
        if (serviceClient.handshakeWithVersion()) {
            return true;
        }
        logger.info(String.format("Handshake failed %s times!", maxRetry));
        return false;
    }

    /**
     * Transfers one {@link PipeData} to the receiver.
     *
     * <p>For a {@link TsFilePipeData} every file returned by {@code getTsFiles(true)} is sent
     * first; afterwards the serialized pipe data itself is sent. When sending the pipe data fails
     * with a connection error, the client re-handshakes and tries again until the configured
     * retry limit is exceeded.
     *
     * @param pipeData the pipe data to transfer
     * @return {@code true} when the pipe data was transferred; {@code false} when the files could
     *     not be listed, the digest algorithm is unavailable, or the receiver rejected the
     *     re-handshake
     * @throws SyncConnectionException if a file transfer fails, the retry limit is exceeded, or
     *     reconnecting to the receiver fails
     */
    public boolean senderTransport(PipeData pipeData) throws SyncConnectionException {
        if (pipeData instanceof TsFilePipeData) {
            try {
                for (File tsFile : ((TsFilePipeData) pipeData).getTsFiles(true)) {
                    transportSingleFile(tsFile);
                }
            } catch (IOException e) {
                logger.error(String.format("Get tsfiles error, because %s.", e), e);
                return false;
            } catch (NoSuchAlgorithmException e) {
                logger.error(String.format("Wrong message digest, because %s.", e), e);
                return false;
            }
        }

        int attempt = 0;
        while (true) {
            attempt++;
            if (attempt > config.getMaxNumberOfSyncFileRetry()) {
                logger.error(String.format("After %s tries, stop the transport of current pipeData!", attempt));
                throw new SyncConnectionException(String.format("Can not connect to receiver when transferring pipedata %s.", pipeData));
            }
            try {
                transportPipeData(pipeData);
                logger.info(String.format("Finish pipeData %s transport.", pipeData));
                return true;
            } catch (NoSuchAlgorithmException e) {
                logger.error("Transport failed. ", e);
                return false;
            } catch (SyncConnectionException transferError) {
                // Re-handshake before the next attempt. The handshake must run inside this try so
                // that a reconnect failure is logged and rewrapped instead of escaping unhandled.
                try {
                    if (!handshake()) {
                        logger.error(String.format("Handshake to receiver %s:%d error when transfer pipe data %s.", ipAddress, port, pipeData));
                        return false;
                    }
                } catch (SyncConnectionException reconnectError) {
                    logger.error(String.format("Reconnect to receiver %s:%d error when transfer pipe data %s.", ipAddress, port, pipeData), reconnectError);
                    throw new SyncConnectionException(String.format("Reconnect to receiver error when transferring pipedata %s.", pipeData));
                }
            }
        }
    }

    /**
     * Sends one file to the receiver, retrying up to the configured limit.
     *
     * <p>Each attempt sends the file piece by piece. When that fails with a connection error the
     * client re-handshakes and starts the next attempt. After a successful send, the whole-file
     * digest is verified only if {@code TransportConfig.isCheckFileDegistAgain} is set; a digest
     * mismatch triggers another attempt, while a local read error skips the verification.
     *
     * @param file the file to transfer
     * @throws SyncConnectionException if the retry limit is exceeded, the re-handshake fails or is
     *     rejected, or the digest check cannot reach the receiver
     * @throws NoSuchAlgorithmException if SHA-256 is not available
     */
    private void transportSingleFile(File file) throws SyncConnectionException, NoSuchAlgorithmException {
        MessageDigest messageDigest = MessageDigest.getInstance("SHA-256");
        int attempt = 0;
        while (true) {
            attempt++;
            if (attempt > config.getMaxNumberOfSyncFileRetry()) {
                throw new SyncConnectionException(String.format("Connect to receiver error when transferring file %s.", file.getName()));
            }

            try {
                transportSingleFilePieceByPiece(file, messageDigest);
            } catch (SyncConnectionException transferError) {
                logger.warn(String.format("Transfer of file %s failed, reconnecting before retry.", file.getName()), transferError);
                boolean reconnected;
                try {
                    reconnected = handshake();
                } catch (SyncConnectionException handshakeError) {
                    reconnected = false;
                }
                if (!reconnected) {
                    // Both a rejected and a failed handshake surface with this message, as before.
                    throw new SyncConnectionException(String.format("Connect to receiver error when transferring file %s.", file.getName()));
                }
                // The file was NOT sent in this attempt: retry instead of falling through to the
                // digest check / success path.
                continue;
            }

            if (!TransportConfig.isCheckFileDegistAgain) {
                break;
            }
            try {
                if (checkFileDigest(file, messageDigest)) {
                    break;
                }
            } catch (IOException e) {
                logger.warn(String.format("Read from disk to make digest error, skip check file %s, because %s.", file.getName(), e));
                break;
            }
        }
        logger.info("Receiver has received {} successfully.", file.getAbsoluteFile());
    }

    /* JADX WARN: Code restructure failed: missing block: B:23:0x0113, code lost:
    
        if (r0.code != (-2)) goto L42;
     */
    /* JADX WARN: Code restructure failed: missing block: B:25:0x014c, code lost:
    
        if (r0.code != (-3)) goto L53;
     */
    /* JADX WARN: Code restructure failed: missing block: B:27:0x018e, code lost:
    
        if (r0.code == 1) goto L57;
     */
    /* JADX WARN: Code restructure failed: missing block: B:28:0x01b1, code lost:
    
        r12 = r12 + r0;
     */
    /* JADX WARN: Code restructure failed: missing block: B:47:0x0191, code lost:
    
        org.apache.iotdb.db.sync.transport.client.TransportClient.logger.info("Receiver failed to receive data from {} because {}, abort.", r10.getAbsoluteFile(), r0.msg);
     */
    /* JADX WARN: Code restructure failed: missing block: B:48:0x01b0, code lost:
    
        throw new org.apache.iotdb.db.exception.SyncConnectionException(r0.msg);
     */
    /* JADX WARN: Code restructure failed: missing block: B:51:0x014f, code lost:
    
        org.apache.iotdb.db.sync.transport.client.TransportClient.logger.info("Receiver failed to receive data from {} because {}, retry.", r10.getAbsoluteFile(), r0.msg);
     */
    /* JADX WARN: Code restructure failed: missing block: B:52:0x0164, code lost:
    
        if (r0 == null) goto L52;
     */
    /* JADX WARN: Code restructure failed: missing block: B:54:0x0169, code lost:
    
        if (0 == 0) goto L51;
     */
    /* JADX WARN: Code restructure failed: missing block: B:55:0x0180, code lost:
    
        r0.close();
     */
    /* JADX WARN: Code restructure failed: missing block: B:57:0x016c, code lost:
    
        r0.close();
     */
    /* JADX WARN: Code restructure failed: missing block: B:59:0x0174, code lost:
    
        r24 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:60:0x0176, code lost:
    
        r0.addSuppressed(r24);
     */
    /* JADX WARN: Code restructure failed: missing block: B:64:0x0116, code lost:
    
        r12 = java.lang.Long.parseLong(r0.msg);
     */
    /* JADX WARN: Code restructure failed: missing block: B:65:0x0121, code lost:
    
        if (r0 == null) goto L41;
     */
    /* JADX WARN: Code restructure failed: missing block: B:67:0x0126, code lost:
    
        if (0 == 0) goto L40;
     */
    /* JADX WARN: Code restructure failed: missing block: B:68:0x013d, code lost:
    
        r0.close();
     */
    /* JADX WARN: Code restructure failed: missing block: B:70:0x0129, code lost:
    
        r0.close();
     */
    /* JADX WARN: Code restructure failed: missing block: B:72:0x0131, code lost:
    
        r24 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:73:0x0133, code lost:
    
        r0.addSuppressed(r24);
     */
    /* JADX WARN: Finally extract failed */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Decompiler placeholder for the piece-by-piece file transfer.
     *
     * <p>JADX could not restructure the original bytecode, so this body unconditionally throws
     * {@link UnsupportedOperationException}; as written, no file can actually be transferred.
     *
     * <p>NOTE(review): the lost fragments listed above suggest the original compared a response
     * {@code code} against -2, -3 and 1, parsed a {@code long} from the response message in one
     * branch, and advanced a running position otherwise. Treat this only as a hint and restore
     * the real implementation from the upstream source.
     *
     * @param r10 the file to transfer
     * @param r11 the message digest handed in by the caller
     * @throws org.apache.iotdb.db.exception.SyncConnectionException declared by the original signature
     */
    private void transportSingleFilePieceByPiece(java.io.File r10, java.security.MessageDigest r11) throws org.apache.iotdb.db.exception.SyncConnectionException {
        /*
            Method dump skipped, instructions count: 577
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.iotdb.db.sync.transport.client.TransportClient.transportSingleFilePieceByPiece(java.io.File, java.security.MessageDigest):void");
    }

    /**
     * Determines the size limit to use for {@code file}.
     *
     * <p>If a sibling file named {@code file.getPath() + SyncConstant.MODS_OFFSET_FILE_SUFFIX}
     * exists, its first line is parsed as a {@code long} and returned. If that file is missing,
     * cannot be read, is empty or does not contain a valid number, the length of {@code file}
     * itself is returned.
     *
     * @param file the file whose limit is requested
     * @return the recorded offset, or {@code file.length()} as a fallback
     */
    private long getFileSizeLimit(File file) {
        File offsetFile = new File(file.getPath() + SyncConstant.MODS_OFFSET_FILE_SUFFIX);
        if (offsetFile.exists()) {
            try (BufferedReader reader = new BufferedReader(new FileReader(offsetFile))) {
                // readLine() yields null for an empty file; parseLong then throws
                // NumberFormatException, which is handled like any other unreadable offset.
                return Long.parseLong(reader.readLine());
            } catch (IOException | NumberFormatException e) {
                logger.error(String.format("Deserialize offset of file %s error, because %s.", file.getPath(), e));
            }
        }
        return file.length();
    }

    /**
     * Computes the digest of the whole file locally and asks the receiver to verify it.
     *
     * <p>The remote check is retried on {@link TException} until the configured retry limit is
     * exceeded.
     *
     * @param file the file to verify
     * @param messageDigest digest instance to use; it is reset before hashing and left reset
     *     afterwards
     * @return {@code true} if the receiver answered with code 1, {@code false} otherwise
     * @throws SyncConnectionException if the receiver could not be reached within the retry limit
     * @throws IOException if the file cannot be read
     */
    private boolean checkFileDigest(File file, MessageDigest messageDigest) throws SyncConnectionException, IOException {
        messageDigest.reset();
        try (FileInputStream input = new FileInputStream(file)) {
            byte[] chunk = new byte[TransportConstant.DATA_CHUNK_SIZE];
            int read;
            while ((read = input.read(chunk)) > 0) {
                messageDigest.update(chunk, 0, read);
            }
        }
        // digest() resets the MessageDigest, so it must be taken exactly once; calling it again
        // on a retry would send the digest of empty input.
        final byte[] fileDigest = messageDigest.digest();

        MetaInfo metaInfo = new MetaInfo(Type.FILE, file.getName(), 0L);
        int attempt = 0;
        while (true) {
            attempt++;
            if (attempt > config.getMaxNumberOfSyncFileRetry()) {
                throw new SyncConnectionException(String.format("Can not sync file %s after %s tries.", file.getAbsoluteFile(), config.getMaxNumberOfSyncFileRetry()));
            }
            try {
                // A fresh ByteBuffer per attempt, all wrapping the same digest bytes.
                if (serviceClient.getClient().checkFileDigest(metaInfo, ByteBuffer.wrap(fileDigest)).code == 1) {
                    return true;
                }
                logger.error("Digest check of tsfile {} failed, retry", file.getAbsoluteFile());
                return false;
            } catch (TException e) {
                logger.error("TException happens! ", e);
            }
        }
    }

    /**
     * Serializes {@code pipeData} and sends it, together with its SHA-256 digest, to the receiver.
     *
     * <p>An attempt that fails with {@link IOException} or {@link TException}, or that the
     * receiver does not answer with code 1, is retried until the configured retry limit is
     * exceeded.
     *
     * @param pipeData the pipe data to send
     * @throws SyncConnectionException if no attempt succeeded within the retry limit
     * @throws NoSuchAlgorithmException if SHA-256 is not available
     */
    private void transportPipeData(PipeData pipeData) throws SyncConnectionException, NoSuchAlgorithmException {
        MessageDigest messageDigest = MessageDigest.getInstance("SHA-256");
        int attempt = 0;
        while (true) {
            attempt++;
            if (attempt > config.getMaxNumberOfSyncFileRetry()) {
                throw new SyncConnectionException(String.format("Can not sync pipe data after %s tries.", config.getMaxNumberOfSyncFileRetry()));
            }
            try {
                byte[] payload = pipeData.serialize();
                messageDigest.reset();
                messageDigest.update(payload);
                MetaInfo metaInfo = new MetaInfo(Type.findByValue(pipeData.getType().ordinal()), "fileName", 0L);
                // The RPC stays inside the try: its TException must lead to a retry, and the
                // payload is only defined when serialization succeeded.
                if (serviceClient.getClient().transportData(metaInfo, ByteBuffer.wrap(payload), ByteBuffer.wrap(messageDigest.digest())).code == 1) {
                    return;
                }
                logger.error("Digest check of pipeData failed, retry");
            } catch (IOException | TException e) {
                logger.error("Exception happened!", e);
            }
        }
    }

    /**
     * Main loop of the transport thread.
     *
     * <p>Until the thread is interrupted it handshakes with the receiver, sends a START request,
     * and then repeatedly takes pipe data from the pipe and transfers it, committing on success
     * and reporting a warning to {@link SenderService} on failure. On a
     * {@link SyncConnectionException} the sender is flagged as connecting and the thread waits on
     * the wait lock. Both clients are always closed when the loop ends.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    if (!handshake()) {
                        SenderService.getInstance().receiveMsg(new SyncResponse(ResponseType.ERROR, String.format("Can not handshake with %s:%d.", ipAddress, port)));
                    }
                    SenderService.getInstance().receiveMsg(heartbeat(new SyncRequest(RequestType.START, pipe.getName(), localIP, pipe.getCreateTime())));
                    while (!Thread.currentThread().isInterrupted()) {
                        PipeData pipeData = pipe.take();
                        if (senderTransport(pipeData)) {
                            pipe.commit();
                        } else {
                            logger.error(String.format("Can not transfer pipedata %s, skip it.", pipeData));
                            SenderService.getInstance().receiveMsg(new SyncResponse(ResponseType.WARN, String.format("Transfer piepdata %s error, skip it.", pipeData.getSerialNumber())));
                        }
                    }
                } catch (SyncConnectionException e) {
                    logger.error(String.format("Connect to receiver %s:%d error, because %s.", ipAddress, port, e));
                    // Park until someone notifies the wait lock.
                    synchronized (waitLock) {
                        SenderService.getInstance().setConnecting(true);
                        waitLock.wait();
                    }
                }
            }
        } catch (InterruptedException e) {
            logger.info("Interrupted by pipe, exit transport.");
            // Restore the interrupt status so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
        } finally {
            close();
        }
    }

    /**
     * Sends a sync request to the receiver and returns its response.
     *
     * <p>Requests of type {@code HEARTBEAT} are delegated to {@code requestHeartbeat}. Every
     * other request opens a dedicated transport for each attempt, sends the request over it and
     * closes the transport again; a {@link TException} leads to another attempt until the
     * configured retry limit is exceeded.
     *
     * @param syncRequest the request to send
     * @return the receiver's response
     * @throws SyncConnectionException if no attempt succeeded within the retry limit
     */
    @Override // org.apache.iotdb.db.sync.transport.client.ITransportClient
    public SyncResponse heartbeat(SyncRequest syncRequest) throws SyncConnectionException {
        if (syncRequest.getType().equals(RequestType.HEARTBEAT)) {
            return requestHeartbeat(syncRequest);
        }
        int attempt = 0;
        while (true) {
            attempt++;
            if (attempt > config.getMaxNumberOfSyncFileRetry()) {
                throw new SyncConnectionException(String.format("%s request connects to receiver %s:%d error.", syncRequest.type.name(), ipAddress, port));
            }
            // NOTE(review): 100000 and 1000 are presumably the socket and connect timeouts in
            // milliseconds - confirm against the TSocket constructor in use.
            try (TTransport transport = RpcTransportFactory.INSTANCE.getTransport(new TSocket(TConfigurationConst.defaultTConfiguration, ipAddress, port, 100000, 1000))) {
                TransportService.Client client = new TransportService.Client(config.isRpcThriftCompressionEnable() ? new TCompactProtocol(transport) : new TBinaryProtocol(transport));
                if (!transport.isOpen()) {
                    transport.open();
                }
                // try-with-resources closes the transport on both the success and failure paths.
                return client.heartbeat(syncRequest);
            } catch (TException e) {
                logger.info(String.format("Heartbeat connect to receiver %s:%d error, retry %d/%d.", ipAddress, port, attempt, config.getMaxNumberOfSyncFileRetry()));
            }
        }
    }

    /**
     * Sends a heartbeat request through the dedicated heartbeat client.
     *
     * <p>If the heartbeat client has no underlying client yet, a handshake is performed while
     * holding the heartbeat client's monitor (the null check is repeated inside the lock). Each
     * failed attempt re-handshakes before the next one.
     *
     * @param syncRequest the heartbeat request to send
     * @return the receiver's response
     * @throws SyncConnectionException if a handshake is rejected or fails, or the retry limit is
     *     exceeded
     */
    private SyncResponse requestHeartbeat(SyncRequest syncRequest) throws SyncConnectionException {
        if (heartbeatClient.getClient() == null) {
            synchronized (heartbeatClient) {
                boolean stillUnconnected = heartbeatClient.getClient() == null;
                if (stillUnconnected && !heartbeatClient.handshakeWithVersion()) {
                    throw new SyncConnectionException("Handshake with receiver error when heartbeat.");
                }
            }
        }

        for (int attempt = 1; attempt <= config.getMaxNumberOfSyncFileRetry(); attempt++) {
            try {
                return heartbeatClient.getClient().heartbeat(syncRequest);
            } catch (TException e) {
                boolean reconnected = heartbeatClient.handshakeWithVersion();
                if (!reconnected) {
                    throw new SyncConnectionException("Handshake with receiver error when heartbeat.");
                }
                logger.info(String.format("Heartbeat connect to receiver %s:%d error, retry %d/%d.", ipAddress, port, attempt, config.getMaxNumberOfSyncFileRetry()));
            }
        }
        throw new SyncConnectionException(String.format("%s request connects to receiver %s:%d error.", syncRequest.type.name(), ipAddress, port));
    }

    /** Closes the data-transfer client first and the heartbeat client second. */
    public void close() {
        serviceClient.close();
        heartbeatClient.close();
    }
}
