package io.activej.fs.tcp;

import io.activej.async.service.EventloopService;
import io.activej.async.util.LogUtils;
import io.activej.bytebuf.ByteBuf;
import io.activej.common.ApplicationSettings;
import io.activej.common.Checks;
import io.activej.common.exception.UnexpectedDataException;
import io.activej.common.function.FunctionEx;
import io.activej.common.initializer.WithInitializer;
import io.activej.common.ref.RefLong;
import io.activej.csp.ChannelConsumer;
import io.activej.csp.ChannelSupplier;
import io.activej.csp.binary.ByteBufsCodec;
import io.activej.csp.binary.Utils;
import io.activej.csp.dsl.ChannelConsumerTransformer;
import io.activej.csp.net.MessagingWithBinaryStreaming;
import io.activej.eventloop.Eventloop;
import io.activej.eventloop.jmx.EventloopJmxBeanWithStats;
import io.activej.eventloop.net.SocketSettings;
import io.activej.fs.ActiveFs;
import io.activej.fs.FileMetadata;
import io.activej.fs.exception.FsIOException;
import io.activej.fs.tcp.RemoteFsCommands;
import io.activej.fs.tcp.RemoteFsResponses;
import io.activej.fs.util.RemoteFsUtils;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.net.socket.tcp.AsyncTcpSocketNio;
import io.activej.promise.Promise;
import io.activej.promise.jmx.PromiseStats;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/fs/tcp/RemoteActiveFs.class */
public final class RemoteActiveFs implements ActiveFs, EventloopService, EventloopJmxBeanWithStats, WithInitializer<RemoteActiveFs> {
    private static final Logger logger = LoggerFactory.getLogger(RemoteActiveFs.class);
    public static final Duration DEFAULT_CONNECTION_TIMEOUT = ApplicationSettings.getDuration(RemoteActiveFs.class, "connectTimeout", Duration.ZERO);
    private static final ByteBufsCodec<RemoteFsResponses.FsResponse, RemoteFsCommands.FsCommand> SERIALIZER = Utils.nullTerminated().andThen(byteBuf -> {
        try {
            return (RemoteFsResponses.FsResponse) RemoteFsUtils.fromJson(RemoteFsResponses.FsResponse.class, byteBuf);
        } finally {
            byteBuf.recycle();
        }
    }, fsCommand -> {
        return RemoteFsUtils.toJson((Class<? super RemoteFsCommands.FsCommand>) RemoteFsCommands.FsCommand.class, fsCommand);
    });
    private final Eventloop eventloop;
    private final InetSocketAddress address;
    private SocketSettings socketSettings = SocketSettings.createDefault();
    private int connectionTimeout = (int) DEFAULT_CONNECTION_TIMEOUT.toMillis();
    private final PromiseStats connectPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats uploadStartPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats uploadFinishPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats appendStartPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats appendFinishPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats downloadStartPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats downloadFinishPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats copyPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats copyAllPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats movePromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats moveAllPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats listPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats infoPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats infoAllPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats pingPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats deletePromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats deleteAllPromise = PromiseStats.create(Duration.ofMinutes(5));

    private RemoteActiveFs(Eventloop eventloop, InetSocketAddress inetSocketAddress) {
        this.eventloop = eventloop;
        this.address = inetSocketAddress;
    }

    public static RemoteActiveFs create(Eventloop eventloop, InetSocketAddress inetSocketAddress) {
        return new RemoteActiveFs(eventloop, inetSocketAddress);
    }

    public RemoteActiveFs withSocketSettings(SocketSettings socketSettings) {
        this.socketSettings = socketSettings;
        return this;
    }

    public RemoteActiveFs withConnectionTimeout(Duration duration) {
        this.connectionTimeout = (int) duration.toMillis();
        return this;
    }

    @NotNull
    public Eventloop getEventloop() {
        return this.eventloop;
    }

    @Override // io.activej.fs.ActiveFs
    public Promise<ChannelConsumer<ByteBuf>> upload(@NotNull String str) {
        return connectForStreaming(this.address).then(messagingWithBinaryStreaming -> {
            return doUpload(messagingWithBinaryStreaming, str, null);
        }).whenComplete(this.uploadStartPromise.recordStats()).whenComplete(LogUtils.toLogger(logger, "upload", new Object[]{str, this}));
    }

    @Override // io.activej.fs.ActiveFs
    public Promise<ChannelConsumer<ByteBuf>> upload(@NotNull String str, long j) {
        return connect(this.address).then(messagingWithBinaryStreaming -> {
            return doUpload(messagingWithBinaryStreaming, str, Long.valueOf(j));
        }).whenComplete(this.uploadStartPromise.recordStats()).whenComplete(LogUtils.toLogger(logger, "upload", new Object[]{str, Long.valueOf(j), this}));
    }

    @NotNull
    private Promise<ChannelConsumer<ByteBuf>> doUpload(MessagingWithBinaryStreaming<RemoteFsResponses.FsResponse, RemoteFsCommands.FsCommand> messagingWithBinaryStreaming, @NotNull String str, @Nullable Long l) {
        Promise send = messagingWithBinaryStreaming.send(new RemoteFsCommands.Upload(str, l));
        Objects.requireNonNull(messagingWithBinaryStreaming);
        return send.then(messagingWithBinaryStreaming::receive).map(castFn(RemoteFsResponses.UploadAck.class)).then(() -> {
            return Promise.of(((ChannelConsumer) messagingWithBinaryStreaming.sendBinaryStream().transformWith(l == null ? ChannelConsumerTransformer.identity() : RemoteFsUtils.ofFixedSize(l.longValue()))).withAcknowledgement(promise -> {
                Objects.requireNonNull(messagingWithBinaryStreaming);
                Promise then = promise.then(messagingWithBinaryStreaming::receive);
                Objects.requireNonNull(messagingWithBinaryStreaming);
                return then.whenResult(messagingWithBinaryStreaming::close).map(castFn(RemoteFsResponses.UploadFinished.class)).toVoid().whenException(exc -> {
                    messagingWithBinaryStreaming.closeEx(exc);
                    logger.warn("Cancelled while trying to upload file {}: {}", new Object[]{str, this, exc});
                }).whenComplete(this.uploadFinishPromise.recordStats()).whenComplete(LogUtils.toLogger(logger, LogUtils.Level.TRACE, "onUploadComplete", new Object[]{messagingWithBinaryStreaming, str, l, this}));
            }));
        }).whenException(exc -> {
            messagingWithBinaryStreaming.closeEx(exc);
            logger.warn("Error while trying to upload file {}: {}", new Object[]{str, this, exc});
        });
    }

    @Override // io.activej.fs.ActiveFs
    public Promise<ChannelConsumer<ByteBuf>> append(@NotNull String str, long j) {
        return connect(this.address).then(messagingWithBinaryStreaming -> {
            Promise send = messagingWithBinaryStreaming.send(new RemoteFsCommands.Append(str, j));
            Objects.requireNonNull(messagingWithBinaryStreaming);
            Promise then = send.then(messagingWithBinaryStreaming::receive).map(castFn(RemoteFsResponses.AppendAck.class)).then(() -> {
                return Promise.of(messagingWithBinaryStreaming.sendBinaryStream().withAcknowledgement(promise -> {
                    Objects.requireNonNull(messagingWithBinaryStreaming);
                    Promise then2 = promise.then(messagingWithBinaryStreaming::receive);
                    Objects.requireNonNull(messagingWithBinaryStreaming);
                    Promise promise = then2.whenResult(messagingWithBinaryStreaming::close).map(castFn(RemoteFsResponses.AppendFinished.class)).toVoid();
                    Objects.requireNonNull(messagingWithBinaryStreaming);
                    return promise.whenException(messagingWithBinaryStreaming::closeEx).whenComplete(this.appendFinishPromise.recordStats()).whenComplete(LogUtils.toLogger(logger, LogUtils.Level.TRACE, "onAppendComplete", new Object[]{str, Long.valueOf(j), this}));
                }));
            });
            Objects.requireNonNull(messagingWithBinaryStreaming);
            return then.whenException(messagingWithBinaryStreaming::closeEx);
        }).whenComplete(this.appendStartPromise.recordStats()).whenComplete(LogUtils.toLogger(logger, LogUtils.Level.TRACE, "append", new Object[]{str, Long.valueOf(j), this}));
    }

    @Override // io.activej.fs.ActiveFs
    public Promise<ChannelSupplier<ByteBuf>> download(@NotNull String str, long j, long j2) {
        Checks.checkArgument(j >= 0, "Data offset must be greater than or equal to zero");
        Checks.checkArgument(j2 >= 0, "Data limit must be greater than or equal to zero");
        return connect(this.address).then(messagingWithBinaryStreaming -> {
            Promise send = messagingWithBinaryStreaming.send(new RemoteFsCommands.Download(str, j, j2));
            Objects.requireNonNull(messagingWithBinaryStreaming);
            return send.then(messagingWithBinaryStreaming::receive).map(castFn(RemoteFsResponses.DownloadSize.class)).then(downloadSize -> {
                long size = downloadSize.getSize();
                if (size > j2) {
                    throw new UnexpectedDataException();
                }
                logger.trace("download size for file {} is {}: {}", new Object[]{str, Long.valueOf(size), this});
                RefLong refLong = new RefLong(0L);
                return Promise.of(messagingWithBinaryStreaming.receiveBinaryStream().peek(byteBuf -> {
                    refLong.inc(byteBuf.readRemaining());
                }).withEndOfStream(promise -> {
                    Objects.requireNonNull(messagingWithBinaryStreaming);
                    Promise whenComplete = promise.then(messagingWithBinaryStreaming::sendEndOfStream).whenResult(() -> {
                        if (refLong.get() == size) {
                            return;
                        }
                        logger.error("invalid stream size for file {} (offset {}, limit {}), expected: {} actual: {}", new Object[]{str, Long.valueOf(j), Long.valueOf(j2), Long.valueOf(size), Long.valueOf(refLong.get())});
                        if (refLong.get() >= size) {
                            throw new UnexpectedDataException();
                        }
                    }).whenComplete(this.downloadFinishPromise.recordStats()).whenComplete(LogUtils.toLogger(logger, "onDownloadComplete", new Object[]{str, Long.valueOf(j), Long.valueOf(j2), this}));
                    Objects.requireNonNull(messagingWithBinaryStreaming);
                    return whenComplete.whenResult(messagingWithBinaryStreaming::close);
                }));
            }).whenException(exc -> {
                messagingWithBinaryStreaming.closeEx(exc);
                logger.warn("error trying to download file {} (offset={}, limit={}) : {}", new Object[]{str, Long.valueOf(j), Long.valueOf(j2), this, exc});
            });
        }).whenComplete(LogUtils.toLogger(logger, "download", new Object[]{str, Long.valueOf(j), Long.valueOf(j2), this})).whenComplete(this.downloadStartPromise.recordStats());
    }

    @Override // io.activej.fs.ActiveFs
    public Promise<Void> copy(@NotNull String str, @NotNull String str2) {
        return simpleCommand(new RemoteFsCommands.Copy(str, str2), RemoteFsResponses.CopyFinished.class, copyFinished -> {
            return (Void) null;
        }).whenComplete(LogUtils.toLogger(logger, "copy", new Object[]{str, str2, this})).whenComplete(this.copyPromise.recordStats());
    }

    @Override // io.activej.fs.ActiveFs
    public Promise<Void> copyAll(Map<String, String> map) {
        Checks.checkArgument(io.activej.common.Utils.isBijection(map), "Targets must be unique");
        return map.isEmpty() ? Promise.complete() : simpleCommand(new RemoteFsCommands.CopyAll(map), RemoteFsResponses.CopyAllFinished.class, copyAllFinished -> {
            return (Void) null;
        }).whenComplete(LogUtils.toLogger(logger, "copyAll", new Object[]{map, this})).whenComplete(this.copyAllPromise.recordStats());
    }

    @Override // io.activej.fs.ActiveFs
    public Promise<Void> move(@NotNull String str, @NotNull String str2) {
        return simpleCommand(new RemoteFsCommands.Move(str, str2), RemoteFsResponses.MoveFinished.class, moveFinished -> {
            return (Void) null;
        }).whenComplete(LogUtils.toLogger(logger, "move", new Object[]{str, str2, this})).whenComplete(this.movePromise.recordStats());
    }

    @Override // io.activej.fs.ActiveFs
    public Promise<Void> moveAll(Map<String, String> map) {
        Checks.checkArgument(io.activej.common.Utils.isBijection(map), "Targets must be unique");
        return map.isEmpty() ? Promise.complete() : simpleCommand(new RemoteFsCommands.MoveAll(map), RemoteFsResponses.MoveAllFinished.class, moveAllFinished -> {
            return (Void) null;
        }).whenComplete(LogUtils.toLogger(logger, "moveAll", new Object[]{map, this})).whenComplete(this.moveAllPromise.recordStats());
    }

    @Override // io.activej.fs.ActiveFs
    public Promise<Void> delete(@NotNull String str) {
        return simpleCommand(new RemoteFsCommands.Delete(str), RemoteFsResponses.DeleteFinished.class, deleteFinished -> {
            return (Void) null;
        }).whenComplete(LogUtils.toLogger(logger, "delete", new Object[]{str, this})).whenComplete(this.deletePromise.recordStats());
    }

    @Override // io.activej.fs.ActiveFs
    public Promise<Void> deleteAll(Set<String> set) {
        return set.isEmpty() ? Promise.complete() : simpleCommand(new RemoteFsCommands.DeleteAll(set), RemoteFsResponses.DeleteAllFinished.class, deleteAllFinished -> {
            return (Void) null;
        }).whenComplete(LogUtils.toLogger(logger, "deleteAll", new Object[]{set, this})).whenComplete(this.deleteAllPromise.recordStats());
    }

    @Override // io.activej.fs.ActiveFs
    public Promise<Map<String, FileMetadata>> list(@NotNull String str) {
        return simpleCommand(new RemoteFsCommands.List(str), RemoteFsResponses.ListFinished.class, (v0) -> {
            return v0.getFiles();
        }).whenComplete(LogUtils.toLogger(logger, "list", new Object[]{str, this})).whenComplete(this.listPromise.recordStats());
    }

    @Override // io.activej.fs.ActiveFs
    public Promise<FileMetadata> info(@NotNull String str) {
        return simpleCommand(new RemoteFsCommands.Info(str), RemoteFsResponses.InfoFinished.class, (v0) -> {
            return v0.getMetadata();
        }).whenComplete(LogUtils.toLogger(logger, "info", new Object[]{str, this})).whenComplete(this.infoPromise.recordStats());
    }

    @Override // io.activej.fs.ActiveFs
    public Promise<Map<String, FileMetadata>> infoAll(@NotNull Set<String> set) {
        return set.isEmpty() ? Promise.of(Collections.emptyMap()) : simpleCommand(new RemoteFsCommands.InfoAll(set), RemoteFsResponses.InfoAllFinished.class, (v0) -> {
            return v0.getMetadataMap();
        }).whenComplete(LogUtils.toLogger(logger, "infoAll", new Object[]{set, this})).whenComplete(this.infoAllPromise.recordStats());
    }

    @Override // io.activej.fs.ActiveFs
    public Promise<Void> ping() {
        return simpleCommand(new RemoteFsCommands.Ping(), RemoteFsResponses.PingFinished.class, pingFinished -> {
            return (Void) null;
        }).whenComplete(LogUtils.toLogger(logger, "ping", new Object[]{this})).whenComplete(this.pingPromise.recordStats());
    }

    private Promise<MessagingWithBinaryStreaming<RemoteFsResponses.FsResponse, RemoteFsCommands.FsCommand>> connect(InetSocketAddress inetSocketAddress) {
        return doConnect(inetSocketAddress, this.socketSettings);
    }

    private Promise<MessagingWithBinaryStreaming<RemoteFsResponses.FsResponse, RemoteFsCommands.FsCommand>> connectForStreaming(InetSocketAddress inetSocketAddress) {
        return doConnect(inetSocketAddress, this.socketSettings.withLingerTimeout(Duration.ZERO));
    }

    private Promise<MessagingWithBinaryStreaming<RemoteFsResponses.FsResponse, RemoteFsCommands.FsCommand>> doConnect(InetSocketAddress inetSocketAddress, SocketSettings socketSettings) {
        return AsyncTcpSocketNio.connect(inetSocketAddress, this.connectionTimeout, socketSettings).map(asyncTcpSocketNio -> {
            return MessagingWithBinaryStreaming.create(asyncTcpSocketNio, SERIALIZER);
        }).whenResult(() -> {
            logger.trace("connected to [{}]: {}", inetSocketAddress, this);
        }).whenException(exc -> {
            logger.warn("failed connecting to [{}] : {}", new Object[]{inetSocketAddress, this, exc});
        }).whenComplete(this.connectPromise.recordStats());
    }

    private static <T extends RemoteFsResponses.FsResponse> FunctionEx<RemoteFsResponses.FsResponse, T> castFn(Class<T> cls) {
        return fsResponse -> {
            if (cls == fsResponse.getClass()) {
                return (RemoteFsResponses.FsResponse) cls.cast(fsResponse);
            }
            if (fsResponse instanceof RemoteFsResponses.ServerError) {
                throw ((RemoteFsResponses.ServerError) fsResponse).getError();
            }
            throw new FsIOException("Invalid or unexpected message received");
        };
    }

    private <T, R extends RemoteFsResponses.FsResponse> Promise<T> simpleCommand(RemoteFsCommands.FsCommand fsCommand, Class<R> cls, FunctionEx<R, T> functionEx) {
        return connect(this.address).then(messagingWithBinaryStreaming -> {
            Promise send = messagingWithBinaryStreaming.send(fsCommand);
            Objects.requireNonNull(messagingWithBinaryStreaming);
            Promise then = send.then(messagingWithBinaryStreaming::receive);
            Objects.requireNonNull(messagingWithBinaryStreaming);
            return then.whenResult(messagingWithBinaryStreaming::close).map(castFn(cls)).map(functionEx).whenException(exc -> {
                messagingWithBinaryStreaming.closeEx(exc);
                logger.warn("Error while processing command {} : {}", new Object[]{fsCommand, this, exc});
            });
        });
    }

    @NotNull
    public Promise<Void> start() {
        return ping();
    }

    @NotNull
    public Promise<Void> stop() {
        return Promise.complete();
    }

    public String toString() {
        return "RemoteActiveFs{address=" + this.address + '}';
    }

    @JmxAttribute
    public PromiseStats getConnectPromise() {
        return this.connectPromise;
    }

    @JmxAttribute
    public PromiseStats getUploadStartPromise() {
        return this.uploadStartPromise;
    }

    @JmxAttribute
    public PromiseStats getUploadFinishPromise() {
        return this.uploadFinishPromise;
    }

    @JmxAttribute
    public PromiseStats getAppendStartPromise() {
        return this.appendStartPromise;
    }

    @JmxAttribute
    public PromiseStats getAppendFinishPromise() {
        return this.appendFinishPromise;
    }

    @JmxAttribute
    public PromiseStats getDownloadStartPromise() {
        return this.downloadStartPromise;
    }

    @JmxAttribute
    public PromiseStats getDownloadFinishPromise() {
        return this.downloadFinishPromise;
    }

    @JmxAttribute
    public PromiseStats getCopyPromise() {
        return this.copyPromise;
    }

    @JmxAttribute
    public PromiseStats getCopyAllPromise() {
        return this.copyAllPromise;
    }

    @JmxAttribute
    public PromiseStats getMovePromise() {
        return this.movePromise;
    }

    @JmxAttribute
    public PromiseStats getMoveAllPromise() {
        return this.moveAllPromise;
    }

    @JmxAttribute
    public PromiseStats getListPromise() {
        return this.listPromise;
    }

    @JmxAttribute
    public PromiseStats getInfoPromise() {
        return this.infoPromise;
    }

    @JmxAttribute
    public PromiseStats getInfoAllPromise() {
        return this.infoAllPromise;
    }

    @JmxAttribute
    public PromiseStats getPingPromise() {
        return this.pingPromise;
    }

    @JmxAttribute
    public PromiseStats getDeletePromise() {
        return this.deletePromise;
    }

    @JmxAttribute
    public PromiseStats getDeleteAllPromise() {
        return this.deleteAllPromise;
    }
}
