package io.activej.fs.tcp;

import io.activej.async.service.ReactiveService;
import io.activej.async.util.LogUtils;
import io.activej.bytebuf.ByteBuf;
import io.activej.common.ApplicationSettings;
import io.activej.common.Checks;
import io.activej.common.Utils;
import io.activej.common.builder.AbstractBuilder;
import io.activej.common.exception.UnexpectedDataException;
import io.activej.common.function.ConsumerEx;
import io.activej.common.function.FunctionEx;
import io.activej.common.ref.RefLong;
import io.activej.csp.binary.codec.ByteBufsCodec;
import io.activej.csp.binary.codec.ByteBufsCodecs;
import io.activej.csp.consumer.ChannelConsumer;
import io.activej.csp.net.IMessaging;
import io.activej.csp.net.Messaging;
import io.activej.csp.process.transformer.ChannelConsumerTransformer;
import io.activej.csp.supplier.ChannelSupplier;
import io.activej.fs.FileMetadata;
import io.activej.fs.IFileSystem;
import io.activej.fs.exception.FileSystemException;
import io.activej.fs.tcp.messaging.FileSystemRequest;
import io.activej.fs.tcp.messaging.FileSystemResponse;
import io.activej.fs.util.RemoteFileSystemUtils;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.net.socket.tcp.TcpSocket;
import io.activej.promise.Promise;
import io.activej.promise.jmx.PromiseStats;
import io.activej.reactor.AbstractNioReactive;
import io.activej.reactor.Reactive;
import io.activej.reactor.jmx.ReactiveJmxBeanWithStats;
import io.activej.reactor.net.SocketSettings;
import io.activej.reactor.nio.NioReactor;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/fs/tcp/RemoteFileSystem.class */
public final class RemoteFileSystem extends AbstractNioReactive implements IFileSystem, ReactiveService, ReactiveJmxBeanWithStats {
    private static final Logger logger = LoggerFactory.getLogger(RemoteFileSystem.class);
    public static final Duration DEFAULT_CONNECTION_TIMEOUT = ApplicationSettings.getDuration(RemoteFileSystem.class, "connectTimeout", Duration.ZERO);
    private static final ByteBufsCodec<FileSystemResponse, FileSystemRequest> SERIALIZER = ByteBufsCodecs.ofStreamCodecs(RemoteFileSystemUtils.FS_RESPONSE_CODEC, RemoteFileSystemUtils.FS_REQUEST_CODEC);
    private final InetSocketAddress address;
    private SocketSettings socketSettings;
    private SocketSettings socketSettingsStreaming;
    private int connectionTimeout;
    private final PromiseStats connectPromise;
    private final PromiseStats handshakePromise;
    private final PromiseStats uploadStartPromise;
    private final PromiseStats uploadFinishPromise;
    private final PromiseStats appendStartPromise;
    private final PromiseStats appendFinishPromise;
    private final PromiseStats downloadStartPromise;
    private final PromiseStats downloadFinishPromise;
    private final PromiseStats copyPromise;
    private final PromiseStats copyAllPromise;
    private final PromiseStats movePromise;
    private final PromiseStats moveAllPromise;
    private final PromiseStats listPromise;
    private final PromiseStats infoPromise;
    private final PromiseStats infoAllPromise;
    private final PromiseStats pingPromise;
    private final PromiseStats deletePromise;
    private final PromiseStats deleteAllPromise;

    /* loaded from: input_file:io/activej/fs/tcp/RemoteFileSystem$Builder.class */
    public final class Builder extends AbstractBuilder<Builder, RemoteFileSystem> {
        private Builder() {
        }

        public Builder withSocketSettings(SocketSettings socketSettings) {
            checkNotBuilt(this);
            RemoteFileSystem.this.socketSettings = socketSettings;
            RemoteFileSystem.this.socketSettingsStreaming = RemoteFileSystem.createSocketSettingsForStreaming(socketSettings);
            return this;
        }

        public Builder withConnectionTimeout(Duration duration) {
            checkNotBuilt(this);
            RemoteFileSystem.this.connectionTimeout = (int) duration.toMillis();
            return this;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: doBuild, reason: merged with bridge method [inline-methods] */
        public RemoteFileSystem m27doBuild() {
            return RemoteFileSystem.this;
        }
    }

    private RemoteFileSystem(NioReactor nioReactor, InetSocketAddress inetSocketAddress) {
        super(nioReactor);
        this.socketSettings = SocketSettings.defaultInstance();
        this.socketSettingsStreaming = createSocketSettingsForStreaming(this.socketSettings);
        this.connectionTimeout = (int) DEFAULT_CONNECTION_TIMEOUT.toMillis();
        this.connectPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.handshakePromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.uploadStartPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.uploadFinishPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.appendStartPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.appendFinishPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.downloadStartPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.downloadFinishPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.copyPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.copyAllPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.movePromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.moveAllPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.listPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.infoPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.infoAllPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.pingPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.deletePromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.deleteAllPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.address = inetSocketAddress;
    }

    public static RemoteFileSystem create(NioReactor nioReactor, InetSocketAddress inetSocketAddress) {
        return (RemoteFileSystem) builder(nioReactor, inetSocketAddress).build();
    }

    public static Builder builder(NioReactor nioReactor, InetSocketAddress inetSocketAddress) {
        return new Builder();
    }

    @Override // io.activej.fs.IFileSystem
    public Promise<ChannelConsumer<ByteBuf>> upload(String str) {
        Reactive.checkInReactorThread(this);
        return connectForStreaming(this.address).then((v1) -> {
            return performHandshake(v1);
        }).then(iMessaging -> {
            return doUpload(iMessaging, str, null);
        }).whenComplete(this.uploadStartPromise.recordStats()).whenComplete(LogUtils.toLogger(logger, "upload", new Object[]{str, this}));
    }

    @Override // io.activej.fs.IFileSystem
    public Promise<ChannelConsumer<ByteBuf>> upload(String str, long j) {
        Reactive.checkInReactorThread(this);
        return connect(this.address).then((v1) -> {
            return performHandshake(v1);
        }).then(iMessaging -> {
            return doUpload(iMessaging, str, Long.valueOf(j));
        }).whenComplete(this.uploadStartPromise.recordStats()).whenComplete(LogUtils.toLogger(logger, "upload", new Object[]{str, Long.valueOf(j), this}));
    }

    private Promise<ChannelConsumer<ByteBuf>> doUpload(IMessaging<FileSystemResponse, FileSystemRequest> iMessaging, String str, @Nullable Long l) {
        Promise send = iMessaging.send(new FileSystemRequest.Upload(str, l == null ? -1L : l.longValue()));
        Objects.requireNonNull(iMessaging);
        return send.then(iMessaging::receive).whenResult(validateFn(FileSystemResponse.UploadAck.class)).then(() -> {
            return Promise.of(((ChannelConsumer) iMessaging.sendBinaryStream().transformWith(l == null ? ChannelConsumerTransformer.identity() : RemoteFileSystemUtils.ofFixedSize(l.longValue()))).withAcknowledgement(promise -> {
                Objects.requireNonNull(iMessaging);
                Promise then = promise.then(iMessaging::receive);
                Objects.requireNonNull(iMessaging);
                return then.whenResult(iMessaging::close).whenResult(validateFn(FileSystemResponse.UploadFinished.class)).toVoid().whenException(exc -> {
                    iMessaging.closeEx(exc);
                    logger.warn("Cancelled while trying to upload file {}: {}", new Object[]{str, this, exc});
                }).whenComplete(this.uploadFinishPromise.recordStats()).whenComplete(LogUtils.toLogger(logger, LogUtils.Level.TRACE, "onUploadComplete", new Object[]{iMessaging, str, l, this}));
            }));
        }).whenException(exc -> {
            iMessaging.closeEx(exc);
            logger.warn("Error while trying to upload file {}: {}", new Object[]{str, this, exc});
        });
    }

    @Override // io.activej.fs.IFileSystem
    public Promise<ChannelConsumer<ByteBuf>> append(String str, long j) {
        Reactive.checkInReactorThread(this);
        return connect(this.address).then((v1) -> {
            return performHandshake(v1);
        }).then(iMessaging -> {
            Promise send = iMessaging.send(new FileSystemRequest.Append(str, j));
            Objects.requireNonNull(iMessaging);
            Promise then = send.then(iMessaging::receive).whenResult(validateFn(FileSystemResponse.AppendAck.class)).then(() -> {
                return Promise.of(iMessaging.sendBinaryStream().withAcknowledgement(promise -> {
                    Objects.requireNonNull(iMessaging);
                    Promise then2 = promise.then(iMessaging::receive);
                    Objects.requireNonNull(iMessaging);
                    Promise promise = then2.whenResult(iMessaging::close).whenResult(validateFn(FileSystemResponse.AppendFinished.class)).toVoid();
                    Objects.requireNonNull(iMessaging);
                    return promise.whenException(iMessaging::closeEx).whenComplete(this.appendFinishPromise.recordStats()).whenComplete(LogUtils.toLogger(logger, LogUtils.Level.TRACE, "onAppendComplete", new Object[]{str, Long.valueOf(j), this}));
                }));
            });
            Objects.requireNonNull(iMessaging);
            return then.whenException(iMessaging::closeEx);
        }).whenComplete(this.appendStartPromise.recordStats()).whenComplete(LogUtils.toLogger(logger, LogUtils.Level.TRACE, "append", new Object[]{str, Long.valueOf(j), this}));
    }

    @Override // io.activej.fs.IFileSystem
    public Promise<ChannelSupplier<ByteBuf>> download(String str, long j, long j2) {
        Reactive.checkInReactorThread(this);
        Checks.checkArgument(j >= 0, "Data offset must be greater than or equal to zero");
        Checks.checkArgument(j2 >= 0, "Data limit must be greater than or equal to zero");
        return connect(this.address).then((v1) -> {
            return performHandshake(v1);
        }).then(iMessaging -> {
            Promise send = iMessaging.send(new FileSystemRequest.Download(str, j, j2));
            Objects.requireNonNull(iMessaging);
            return send.then(iMessaging::receive).map(castFn(FileSystemResponse.DownloadSize.class)).then(downloadSize -> {
                long size = downloadSize.size();
                if (size > j2) {
                    throw new UnexpectedDataException();
                }
                logger.trace("download size for file {} is {}: {}", new Object[]{str, Long.valueOf(size), this});
                RefLong refLong = new RefLong(0L);
                return Promise.of(iMessaging.receiveBinaryStream().peek(byteBuf -> {
                    refLong.inc(byteBuf.readRemaining());
                }).withEndOfStream(promise -> {
                    Objects.requireNonNull(iMessaging);
                    Promise whenComplete = promise.then(iMessaging::sendEndOfStream).whenResult(() -> {
                        if (refLong.get() == size) {
                            return;
                        }
                        logger.error("invalid stream size for file {} (offset {}, limit {}), expected: {} actual: {}", new Object[]{str, Long.valueOf(j), Long.valueOf(j2), Long.valueOf(size), Long.valueOf(refLong.get())});
                        if (refLong.get() >= size) {
                            throw new UnexpectedDataException();
                        }
                    }).whenComplete(this.downloadFinishPromise.recordStats()).whenComplete(LogUtils.toLogger(logger, "onDownloadComplete", new Object[]{str, Long.valueOf(j), Long.valueOf(j2), this}));
                    Objects.requireNonNull(iMessaging);
                    return whenComplete.whenResult(iMessaging::close);
                }));
            }).whenException(exc -> {
                iMessaging.closeEx(exc);
                logger.warn("error trying to download file {} (offset={}, limit={}) : {}", new Object[]{str, Long.valueOf(j), Long.valueOf(j2), this, exc});
            });
        }).whenComplete(LogUtils.toLogger(logger, "download", new Object[]{str, Long.valueOf(j), Long.valueOf(j2), this})).whenComplete(this.downloadStartPromise.recordStats());
    }

    @Override // io.activej.fs.IFileSystem
    public Promise<Void> copy(String str, String str2) {
        Reactive.checkInReactorThread(this);
        return simpleCommand(new FileSystemRequest.Copy(str, str2), FileSystemResponse.CopyFinished.class).whenComplete(LogUtils.toLogger(logger, "copy", new Object[]{str, str2, this})).whenComplete(this.copyPromise.recordStats());
    }

    @Override // io.activej.fs.IFileSystem
    public Promise<Void> copyAll(Map<String, String> map) {
        Reactive.checkInReactorThread(this);
        Checks.checkArgument(Utils.isBijection(map), "Targets must be unique");
        return map.isEmpty() ? Promise.complete() : simpleCommand(new FileSystemRequest.CopyAll(map), FileSystemResponse.CopyAllFinished.class).whenComplete(LogUtils.toLogger(logger, "copyAll", new Object[]{map, this})).whenComplete(this.copyAllPromise.recordStats());
    }

    @Override // io.activej.fs.IFileSystem
    public Promise<Void> move(String str, String str2) {
        Reactive.checkInReactorThread(this);
        return simpleCommand(new FileSystemRequest.Move(str, str2), FileSystemResponse.MoveFinished.class).whenComplete(LogUtils.toLogger(logger, "move", new Object[]{str, str2, this})).whenComplete(this.movePromise.recordStats());
    }

    @Override // io.activej.fs.IFileSystem
    public Promise<Void> moveAll(Map<String, String> map) {
        Reactive.checkInReactorThread(this);
        Checks.checkArgument(Utils.isBijection(map), "Targets must be unique");
        return map.isEmpty() ? Promise.complete() : simpleCommand(new FileSystemRequest.MoveAll(map), FileSystemResponse.MoveAllFinished.class).whenComplete(LogUtils.toLogger(logger, "moveAll", new Object[]{map, this})).whenComplete(this.moveAllPromise.recordStats());
    }

    @Override // io.activej.fs.IFileSystem
    public Promise<Void> delete(String str) {
        Reactive.checkInReactorThread(this);
        return simpleCommand(new FileSystemRequest.Delete(str), FileSystemResponse.DeleteFinished.class).whenComplete(LogUtils.toLogger(logger, "delete", new Object[]{str, this})).whenComplete(this.deletePromise.recordStats());
    }

    @Override // io.activej.fs.IFileSystem
    public Promise<Void> deleteAll(Set<String> set) {
        Reactive.checkInReactorThread(this);
        return set.isEmpty() ? Promise.complete() : simpleCommand(new FileSystemRequest.DeleteAll(set), FileSystemResponse.DeleteAllFinished.class).whenComplete(LogUtils.toLogger(logger, "deleteAll", new Object[]{set, this})).whenComplete(this.deleteAllPromise.recordStats());
    }

    @Override // io.activej.fs.IFileSystem
    public Promise<Map<String, FileMetadata>> list(String str) {
        Reactive.checkInReactorThread(this);
        return simpleCommand(new FileSystemRequest.List(str), FileSystemResponse.ListFinished.class, (v0) -> {
            return v0.files();
        }).whenComplete(LogUtils.toLogger(logger, "list", new Object[]{str, this})).whenComplete(this.listPromise.recordStats());
    }

    @Override // io.activej.fs.IFileSystem
    public Promise<FileMetadata> info(String str) {
        Reactive.checkInReactorThread(this);
        return simpleCommand(new FileSystemRequest.Info(str), FileSystemResponse.InfoFinished.class, (v0) -> {
            return v0.fileMetadata();
        }).whenComplete(LogUtils.toLogger(logger, "info", new Object[]{str, this})).whenComplete(this.infoPromise.recordStats());
    }

    @Override // io.activej.fs.IFileSystem
    public Promise<Map<String, FileMetadata>> infoAll(Set<String> set) {
        Reactive.checkInReactorThread(this);
        return set.isEmpty() ? Promise.of(Map.of()) : simpleCommand(new FileSystemRequest.InfoAll(set), FileSystemResponse.InfoAllFinished.class, (v0) -> {
            return v0.files();
        }).whenComplete(LogUtils.toLogger(logger, "infoAll", new Object[]{set, this})).whenComplete(this.infoAllPromise.recordStats());
    }

    @Override // io.activej.fs.IFileSystem
    public Promise<Void> ping() {
        Reactive.checkInReactorThread(this);
        return simpleCommand(new FileSystemRequest.Ping(), FileSystemResponse.Pong.class).whenComplete(LogUtils.toLogger(logger, "ping", new Object[]{this})).whenComplete(this.pingPromise.recordStats());
    }

    private Promise<Messaging<FileSystemResponse, FileSystemRequest>> connect(InetSocketAddress inetSocketAddress) {
        return doConnect(inetSocketAddress, this.socketSettings);
    }

    private Promise<Messaging<FileSystemResponse, FileSystemRequest>> connectForStreaming(InetSocketAddress inetSocketAddress) {
        return doConnect(inetSocketAddress, this.socketSettingsStreaming);
    }

    private Promise<Messaging<FileSystemResponse, FileSystemRequest>> doConnect(InetSocketAddress inetSocketAddress, SocketSettings socketSettings) {
        return TcpSocket.connect(this.reactor, inetSocketAddress, this.connectionTimeout, socketSettings).map(tcpSocket -> {
            return Messaging.create(tcpSocket, SERIALIZER);
        }).whenResult(() -> {
            logger.trace("connected to [{}]: {}", inetSocketAddress, this);
        }).whenException(exc -> {
            logger.warn("failed connecting to [{}] : {}", new Object[]{inetSocketAddress, this, exc});
        }).whenComplete(this.connectPromise.recordStats());
    }

    private Promise<IMessaging<FileSystemResponse, FileSystemRequest>> performHandshake(IMessaging<FileSystemResponse, FileSystemRequest> iMessaging) {
        Promise send = iMessaging.send(new FileSystemRequest.Handshake(FileSystemServer.VERSION));
        Objects.requireNonNull(iMessaging);
        return send.then(iMessaging::receive).map(castFn(FileSystemResponse.Handshake.class)).map(handshake -> {
            FileSystemResponse.HandshakeFailure handshakeFailure = handshake.handshakeFailure();
            if (handshakeFailure != null) {
                throw new FileSystemException(String.format("Handshake failed: %s. Minimal allowed version: %s", handshakeFailure.message(), handshakeFailure.minimalVersion()));
            }
            return iMessaging;
        }).whenComplete(LogUtils.toLogger(logger, "handshake", new Object[]{this})).whenComplete(this.handshakePromise.recordStats());
    }

    private static <T extends FileSystemResponse> ConsumerEx<FileSystemResponse> validateFn(Class<T> cls) {
        return fileSystemResponse -> {
            castFn(cls).apply(fileSystemResponse);
        };
    }

    private static <T extends FileSystemResponse> FunctionEx<FileSystemResponse, T> castFn(Class<T> cls) {
        return fileSystemResponse -> {
            if (fileSystemResponse instanceof FileSystemResponse.ServerError) {
                throw ((FileSystemResponse.ServerError) fileSystemResponse).exception();
            }
            if (fileSystemResponse.getClass() != cls) {
                throw new FileSystemException("Received request " + fileSystemResponse.getClass().getName() + " instead of " + cls);
            }
            return fileSystemResponse;
        };
    }

    private <T extends FileSystemResponse> Promise<Void> simpleCommand(FileSystemRequest fileSystemRequest, Class<T> cls) {
        return simpleCommand(fileSystemRequest, cls, fileSystemResponse -> {
            return null;
        });
    }

    private <T extends FileSystemResponse, U> Promise<U> simpleCommand(FileSystemRequest fileSystemRequest, Class<T> cls, FunctionEx<T, U> functionEx) {
        return connect(this.address).then((v1) -> {
            return performHandshake(v1);
        }).then(iMessaging -> {
            Promise send = iMessaging.send(fileSystemRequest);
            Objects.requireNonNull(iMessaging);
            Promise then = send.then(iMessaging::receive);
            Objects.requireNonNull(iMessaging);
            return then.whenResult(iMessaging::close).map(castFn(cls)).map(functionEx).whenException(exc -> {
                iMessaging.closeEx(exc);
                logger.warn("Error while processing command {} : {}", new Object[]{fileSystemRequest, this, exc});
            });
        });
    }

    public Promise<?> start() {
        Reactive.checkInReactorThread(this);
        return ping();
    }

    public Promise<?> stop() {
        Reactive.checkInReactorThread(this);
        return Promise.complete();
    }

    private static SocketSettings createSocketSettingsForStreaming(SocketSettings socketSettings) {
        return (SocketSettings) SocketSettings.builder().withLingerTimeout(Duration.ZERO).withKeepAlive(socketSettings.getKeepAlive()).withReuseAddress(socketSettings.getReuseAddress()).withTcpNoDelay(socketSettings.getTcpNoDelay()).withSendBufferSize(socketSettings.getSendBufferSize()).withReceiveBufferSize(socketSettings.getReceiveBufferSize()).withImplReadTimeout(socketSettings.getImplReadTimeout()).withImplWriteTimeout(socketSettings.getImplWriteTimeout()).withImplReadBufferSize(socketSettings.getImplReadBufferSize()).build();
    }

    public String toString() {
        return "RemoteFileSystem{address=" + this.address + "}";
    }

    @JmxAttribute
    public PromiseStats getConnectPromise() {
        return this.connectPromise;
    }

    @JmxAttribute
    public PromiseStats getUploadStartPromise() {
        return this.uploadStartPromise;
    }

    @JmxAttribute
    public PromiseStats getUploadFinishPromise() {
        return this.uploadFinishPromise;
    }

    @JmxAttribute
    public PromiseStats getAppendStartPromise() {
        return this.appendStartPromise;
    }

    @JmxAttribute
    public PromiseStats getAppendFinishPromise() {
        return this.appendFinishPromise;
    }

    @JmxAttribute
    public PromiseStats getDownloadStartPromise() {
        return this.downloadStartPromise;
    }

    @JmxAttribute
    public PromiseStats getDownloadFinishPromise() {
        return this.downloadFinishPromise;
    }

    @JmxAttribute
    public PromiseStats getCopyPromise() {
        return this.copyPromise;
    }

    @JmxAttribute
    public PromiseStats getCopyAllPromise() {
        return this.copyAllPromise;
    }

    @JmxAttribute
    public PromiseStats getMovePromise() {
        return this.movePromise;
    }

    @JmxAttribute
    public PromiseStats getMoveAllPromise() {
        return this.moveAllPromise;
    }

    @JmxAttribute
    public PromiseStats getListPromise() {
        return this.listPromise;
    }

    @JmxAttribute
    public PromiseStats getInfoPromise() {
        return this.infoPromise;
    }

    @JmxAttribute
    public PromiseStats getInfoAllPromise() {
        return this.infoAllPromise;
    }

    @JmxAttribute
    public PromiseStats getPingPromise() {
        return this.pingPromise;
    }

    @JmxAttribute
    public PromiseStats getDeletePromise() {
        return this.deletePromise;
    }

    @JmxAttribute
    public PromiseStats getDeleteAllPromise() {
        return this.deleteAllPromise;
    }

    @JmxAttribute
    public PromiseStats getHandshakePromise() {
        return this.handshakePromise;
    }
}
