package io.activej.fs.tcp;

import io.activej.async.util.LogUtils;
import io.activej.csp.ChannelConsumer;
import io.activej.csp.binary.ByteBufsCodec;
import io.activej.csp.net.Messaging;
import io.activej.csp.net.MessagingWithBinaryStreaming;
import io.activej.eventloop.Eventloop;
import io.activej.fs.ActiveFs;
import io.activej.fs.exception.FsIOException;
import io.activej.fs.exception.scalar.FileNotFoundException;
import io.activej.fs.tcp.RemoteFsCommands;
import io.activej.fs.tcp.RemoteFsResponses;
import io.activej.fs.util.RemoteFsUtils;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.net.AbstractServer;
import io.activej.net.socket.tcp.AsyncTcpSocket;
import io.activej.promise.Promise;
import io.activej.promise.jmx.PromiseStats;
import java.net.InetAddress;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

/* loaded from: input_file:io/activej/fs/tcp/ActiveFsServer.class */
public final class ActiveFsServer extends AbstractServer<ActiveFsServer> {
    private static final ByteBufsCodec<RemoteFsCommands.FsCommand, RemoteFsResponses.FsResponse> SERIALIZER = RemoteFsUtils.nullTerminatedJson(RemoteFsCommands.CODEC, RemoteFsResponses.CODEC);
    public static final FsIOException NO_HANDLER_FOR_MESSAGE = new FsIOException(ActiveFsServer.class, "No handler for received message type");
    private final Map<Class<?>, MessagingHandler<RemoteFsCommands.FsCommand>> handlers;
    private final ActiveFs fs;
    private final PromiseStats handleRequestPromise;
    private final PromiseStats uploadPromise;
    private final PromiseStats downloadPromise;
    private final PromiseStats copyPromise;
    private final PromiseStats copyAllPromise;
    private final PromiseStats movePromise;
    private final PromiseStats moveAllPromise;
    private final PromiseStats listPromise;
    private final PromiseStats infoPromise;
    private final PromiseStats infoAllPromise;
    private final PromiseStats pingPromise;
    private final PromiseStats deletePromise;
    private final PromiseStats deleteAllPromise;

    /* JADX INFO: Access modifiers changed from: private */
    @FunctionalInterface
    /* loaded from: input_file:io/activej/fs/tcp/ActiveFsServer$MessagingHandler.class */
    public interface MessagingHandler<T extends RemoteFsCommands.FsCommand> {
        Promise<Void> onMessage(Messaging<RemoteFsCommands.FsCommand, RemoteFsResponses.FsResponse> messaging, T t);
    }

    private ActiveFsServer(Eventloop eventloop, ActiveFs activeFs) {
        super(eventloop);
        this.handlers = new HashMap();
        this.handleRequestPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.uploadPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.downloadPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.copyPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.copyAllPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.movePromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.moveAllPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.listPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.infoPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.infoAllPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.pingPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.deletePromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.deleteAllPromise = PromiseStats.create(Duration.ofMinutes(5L));
        this.fs = activeFs;
        addHandlers();
    }

    public static ActiveFsServer create(Eventloop eventloop, ActiveFs activeFs) {
        return new ActiveFsServer(eventloop, activeFs);
    }

    public ActiveFs getFs() {
        return this.fs;
    }

    protected void serve(AsyncTcpSocket asyncTcpSocket, InetAddress inetAddress) {
        MessagingWithBinaryStreaming create = MessagingWithBinaryStreaming.create(asyncTcpSocket, SERIALIZER);
        create.receive().then(fsCommand -> {
            MessagingHandler<RemoteFsCommands.FsCommand> messagingHandler = this.handlers.get(fsCommand.getClass());
            if (messagingHandler != null) {
                return messagingHandler.onMessage(create, fsCommand);
            }
            this.logger.warn("received a message with no associated handler, type: {}", fsCommand.getClass());
            return Promise.ofException(NO_HANDLER_FOR_MESSAGE);
        }).whenComplete(this.handleRequestPromise.recordStats()).whenException(th -> {
            this.logger.warn("got an error while handling message : {}", this, th);
            Promise send = create.send(new RemoteFsResponses.ServerError(RemoteFsUtils.castError(th)));
            Objects.requireNonNull(create);
            Promise then = send.then(create::sendEndOfStream);
            Objects.requireNonNull(create);
            then.whenResult(create::close);
        });
    }

    private void addHandlers() {
        onMessage(RemoteFsCommands.Upload.class, (messaging, upload) -> {
            String name = upload.getName();
            Long size = upload.getSize();
            Promise then = (size == null ? this.fs.upload(name) : this.fs.upload(name, size.longValue())).map(channelConsumer -> {
                return size == null ? channelConsumer : (ChannelConsumer) channelConsumer.transformWith(RemoteFsUtils.ofFixedSize(size.longValue()));
            }).then(channelConsumer2 -> {
                return messaging.send(new RemoteFsResponses.UploadAck()).then(() -> {
                    return messaging.receiveBinaryStream().streamTo(channelConsumer2);
                });
            }).then(() -> {
                return messaging.send(new RemoteFsResponses.UploadFinished());
            });
            Objects.requireNonNull(messaging);
            Promise then2 = then.then(messaging::sendEndOfStream);
            Objects.requireNonNull(messaging);
            return then2.whenResult(messaging::close).whenComplete(this.uploadPromise.recordStats()).whenComplete(LogUtils.toLogger(this.logger, LogUtils.Level.TRACE, "receiving data", new Object[]{upload, this}));
        });
        onMessage(RemoteFsCommands.Append.class, (messaging2, append) -> {
            Promise then = this.fs.append(append.getName(), append.getOffset()).then(channelConsumer -> {
                return messaging2.send(new RemoteFsResponses.AppendAck()).then(() -> {
                    return messaging2.receiveBinaryStream().streamTo(channelConsumer);
                });
            }).then(() -> {
                return messaging2.send(new RemoteFsResponses.AppendFinished());
            });
            Objects.requireNonNull(messaging2);
            Promise then2 = then.then(messaging2::sendEndOfStream);
            Objects.requireNonNull(messaging2);
            return then2.whenResult(messaging2::close);
        });
        onMessage(RemoteFsCommands.Download.class, (messaging3, download) -> {
            String name = download.getName();
            long offset = download.getOffset();
            long limit = download.getLimit();
            return this.fs.info(name).then(fileMetadata -> {
                if (fileMetadata == null) {
                    return Promise.ofException(new FileNotFoundException(ActiveFsServer.class));
                }
                long max = Math.max(0L, Math.min(fileMetadata.getSize() - offset, limit));
                return this.fs.download(name, offset, max).then(channelSupplier -> {
                    Promise send = messaging3.send(new RemoteFsResponses.DownloadSize(max));
                    Objects.requireNonNull(channelSupplier);
                    return send.whenException(channelSupplier::closeEx).then(() -> {
                        return channelSupplier.streamTo(messaging3.sendBinaryStream());
                    });
                }).whenComplete(LogUtils.toLogger(this.logger, "sending data", new Object[]{fileMetadata, Long.valueOf(offset), Long.valueOf(max), this}));
            }).whenComplete(this.downloadPromise.recordStats());
        });
        onMessage(RemoteFsCommands.Copy.class, simpleHandler(copy -> {
            return this.fs.copy(copy.getName(), copy.getTarget());
        }, r3 -> {
            return new RemoteFsResponses.CopyFinished();
        }, this.copyPromise));
        onMessage(RemoteFsCommands.CopyAll.class, simpleHandler(copyAll -> {
            return this.fs.copyAll(copyAll.getSourceToTarget());
        }, r32 -> {
            return new RemoteFsResponses.CopyAllFinished();
        }, this.copyAllPromise));
        onMessage(RemoteFsCommands.Move.class, simpleHandler(move -> {
            return this.fs.move(move.getName(), move.getTarget());
        }, r33 -> {
            return new RemoteFsResponses.MoveFinished();
        }, this.movePromise));
        onMessage(RemoteFsCommands.MoveAll.class, simpleHandler(moveAll -> {
            return this.fs.moveAll(moveAll.getSourceToTarget());
        }, r34 -> {
            return new RemoteFsResponses.MoveAllFinished();
        }, this.moveAllPromise));
        onMessage(RemoteFsCommands.Delete.class, simpleHandler(delete -> {
            return this.fs.delete(delete.getName());
        }, r35 -> {
            return new RemoteFsResponses.DeleteFinished();
        }, this.deletePromise));
        onMessage(RemoteFsCommands.DeleteAll.class, simpleHandler(deleteAll -> {
            return this.fs.deleteAll(deleteAll.getFilesToDelete());
        }, r36 -> {
            return new RemoteFsResponses.DeleteAllFinished();
        }, this.deleteAllPromise));
        onMessage(RemoteFsCommands.List.class, simpleHandler(list -> {
            return this.fs.list(list.getGlob());
        }, RemoteFsResponses.ListFinished::new, this.listPromise));
        onMessage(RemoteFsCommands.Info.class, simpleHandler(info -> {
            return this.fs.info(info.getName());
        }, RemoteFsResponses.InfoFinished::new, this.infoPromise));
        onMessage(RemoteFsCommands.InfoAll.class, simpleHandler(infoAll -> {
            return this.fs.infoAll(infoAll.getNames());
        }, RemoteFsResponses.InfoAllFinished::new, this.infoAllPromise));
        onMessage(RemoteFsCommands.Ping.class, simpleHandler(ping -> {
            return this.fs.ping();
        }, r37 -> {
            return new RemoteFsResponses.PingFinished();
        }, this.pingPromise));
    }

    private <T extends RemoteFsCommands.FsCommand, R> MessagingHandler<T> simpleHandler(Function<T, Promise<R>> function, Function<R, RemoteFsResponses.FsResponse> function2, PromiseStats promiseStats) {
        return (messaging, fsCommand) -> {
            Promise then = ((Promise) function.apply(fsCommand)).then(obj -> {
                return messaging.send((RemoteFsResponses.FsResponse) function2.apply(obj));
            });
            Objects.requireNonNull(messaging);
            return then.then(messaging::sendEndOfStream).whenComplete(promiseStats.recordStats());
        };
    }

    private <T extends RemoteFsCommands.FsCommand> void onMessage(Class<T> cls, MessagingHandler<T> messagingHandler) {
        this.handlers.put(cls, messagingHandler);
    }

    public String toString() {
        return "ActiveFsServer(" + this.fs + ')';
    }

    @JmxAttribute
    public PromiseStats getUploadPromise() {
        return this.uploadPromise;
    }

    @JmxAttribute
    public PromiseStats getDownloadPromise() {
        return this.downloadPromise;
    }

    @JmxAttribute
    public PromiseStats getListPromise() {
        return this.listPromise;
    }

    @JmxAttribute
    public PromiseStats getInfoPromise() {
        return this.infoPromise;
    }

    @JmxAttribute
    public PromiseStats getInfoAllPromise() {
        return this.infoAllPromise;
    }

    @JmxAttribute
    public PromiseStats getPingPromise() {
        return this.pingPromise;
    }

    @JmxAttribute
    public PromiseStats getCopyPromise() {
        return this.copyPromise;
    }

    @JmxAttribute
    public PromiseStats getCopyAllPromise() {
        return this.copyAllPromise;
    }

    @JmxAttribute
    public PromiseStats getMovePromise() {
        return this.movePromise;
    }

    @JmxAttribute
    public PromiseStats getMoveAllPromise() {
        return this.moveAllPromise;
    }

    @JmxAttribute
    public PromiseStats getDeletePromise() {
        return this.deletePromise;
    }

    @JmxAttribute
    public PromiseStats getDeleteAllPromise() {
        return this.deleteAllPromise;
    }

    @JmxAttribute
    public PromiseStats getHandleRequestPromise() {
        return this.handleRequestPromise;
    }
}
