package io.activej.crdt;

import io.activej.async.service.EventloopService;
import io.activej.common.ApplicationSettings;
import io.activej.crdt.CrdtMessaging;
import io.activej.crdt.storage.CrdtStorage;
import io.activej.crdt.util.CrdtDataSerializer;
import io.activej.csp.ChannelConsumer;
import io.activej.csp.ChannelSupplier;
import io.activej.csp.binary.ByteBufsCodec;
import io.activej.csp.binary.Utils;
import io.activej.csp.net.MessagingWithBinaryStreaming;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.StreamSupplier;
import io.activej.datastream.csp.ChannelDeserializer;
import io.activej.datastream.csp.ChannelSerializer;
import io.activej.datastream.stats.StreamStats;
import io.activej.datastream.stats.StreamStatsBasic;
import io.activej.datastream.stats.StreamStatsDetailed;
import io.activej.eventloop.Eventloop;
import io.activej.eventloop.jmx.EventloopJmxBeanEx;
import io.activej.eventloop.net.SocketSettings;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.api.attribute.JmxOperation;
import io.activej.net.socket.tcp.AsyncTcpSocketNio;
import io.activej.promise.Promise;
import io.activej.serializer.BinarySerializer;
import java.lang.Comparable;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Objects;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:io/activej/crdt/CrdtStorageClient.class */
/**
 * Client-side {@link CrdtStorage} that talks to a remote CRDT server over TCP.
 *
 * <p>Every operation opens a new connection to {@link #address}. Control messages are exchanged
 * as null-terminated JSON (see {@link #SERIALIZER}); item payloads travel over the same
 * connection as a binary stream produced/consumed by the configured serializers.
 *
 * <p>The class also exposes basic and detailed stream statistics for upload, download and
 * remove operations through JMX; which of the two is fed is chosen per operation by the
 * {@code detailedStats} flag.
 *
 * @param <K> key type of the CRDT items
 * @param <S> state type of the CRDT items
 */
public final class CrdtStorageClient<K extends Comparable<K>, S> implements CrdtStorage<K, S>, EventloopService, EventloopJmxBeanEx {
    /** Socket settings used unless overridden with {@link #withSocketSettings(SocketSettings)}. */
    public static final SocketSettings DEFAULT_SOCKET_SETTINGS = SocketSettings.createDefault();

    /**
     * Connect timeout used unless overridden with {@link #withConnectTimeout(Duration)}.
     * Read from the application setting {@code connectTimeout}, falling back to {@link Duration#ZERO}.
     */
    public static final Duration DEFAULT_CONNECT_TIMEOUT = ApplicationSettings.getDuration(CrdtStorageClient.class, "connectTimeout", Duration.ZERO);

    /**
     * Codec for control messages: null-terminated frames, decoded from JSON into
     * {@link CrdtMessaging.CrdtResponse} and encoded to JSON from {@link CrdtMessaging.CrdtMessage}.
     */
    private static final ByteBufsCodec<CrdtMessaging.CrdtResponse, CrdtMessaging.CrdtMessage> SERIALIZER = Utils.nullTerminated().andThen(byteBuf -> {
        try {
            return (CrdtMessaging.CrdtResponse) io.activej.crdt.util.Utils.fromJson(CrdtMessaging.CrdtResponse.class, byteBuf);
        } finally {
            // The frame buffer is recycled whether or not decoding succeeded.
            byteBuf.recycle();
        }
    }, crdtMessage -> io.activej.crdt.util.Utils.toJson(CrdtMessaging.CrdtMessage.class, crdtMessage));

    private final Eventloop eventloop;
    private final InetSocketAddress address;
    private final CrdtDataSerializer<K, S> serializer;
    private final BinarySerializer<K> keySerializer;
    private boolean detailedStats;
    private long connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT.toMillis();
    private SocketSettings socketSettings = DEFAULT_SOCKET_SETTINGS;
    private final StreamStatsBasic<CrdtData<K, S>> uploadStats = StreamStats.basic();
    private final StreamStatsDetailed<CrdtData<K, S>> uploadStatsDetailed = StreamStats.detailed();
    private final StreamStatsBasic<CrdtData<K, S>> downloadStats = StreamStats.basic();
    private final StreamStatsDetailed<CrdtData<K, S>> downloadStatsDetailed = StreamStats.detailed();
    private final StreamStatsBasic<K> removeStats = StreamStats.basic();
    private final StreamStatsDetailed<K> removeStatsDetailed = StreamStats.detailed();

    private CrdtStorageClient(Eventloop eventloop, InetSocketAddress inetSocketAddress, CrdtDataSerializer<K, S> crdtDataSerializer) {
        this.eventloop = eventloop;
        this.address = inetSocketAddress;
        this.serializer = crdtDataSerializer;
        this.keySerializer = crdtDataSerializer.getKeySerializer();
    }

    /**
     * Creates a client for the server at the given address.
     *
     * @param eventloop          eventloop returned by {@link #getEventloop()}
     * @param inetSocketAddress  address of the remote CRDT server
     * @param crdtDataSerializer serializer for whole items; its key serializer is used for removals
     * @return a new client with default connect timeout and socket settings
     */
    public static <K extends Comparable<K>, S> CrdtStorageClient<K, S> create(Eventloop eventloop, InetSocketAddress inetSocketAddress, CrdtDataSerializer<K, S> crdtDataSerializer) {
        return new CrdtStorageClient<>(eventloop, inetSocketAddress, crdtDataSerializer);
    }

    /**
     * Creates a client for the server at the given address, combining separate key and state
     * serializers into a {@link CrdtDataSerializer}.
     *
     * @param eventloop         eventloop returned by {@link #getEventloop()}
     * @param inetSocketAddress address of the remote CRDT server
     * @param binarySerializer  serializer for keys
     * @param binarySerializer2 serializer for states
     * @return a new client with default connect timeout and socket settings
     */
    public static <K extends Comparable<K>, S> CrdtStorageClient<K, S> create(Eventloop eventloop, InetSocketAddress inetSocketAddress, BinarySerializer<K> binarySerializer, BinarySerializer<S> binarySerializer2) {
        return new CrdtStorageClient<>(eventloop, inetSocketAddress, new CrdtDataSerializer<>(binarySerializer, binarySerializer2));
    }

    /**
     * Sets the timeout passed to the socket connect call for every subsequent operation.
     *
     * @param duration connect timeout; stored with millisecond precision
     * @return this client, for chaining
     */
    public CrdtStorageClient<K, S> withConnectTimeout(Duration duration) {
        this.connectTimeoutMillis = duration.toMillis();
        return this;
    }

    /**
     * Sets the socket settings used for every subsequent connection.
     *
     * @param socketSettings settings to apply when connecting
     * @return this client, for chaining
     */
    public CrdtStorageClient<K, S> withSocketSettings(SocketSettings socketSettings) {
        this.socketSettings = socketSettings;
        return this;
    }

    /** Returns the eventloop this client was created with. */
    @NotNull
    public Eventloop getEventloop() {
        return this.eventloop;
    }

    /**
     * Opens a connection, sends the {@code UPLOAD} message and returns a consumer that streams
     * serialized items to the server.
     *
     * <p>The returned consumer is acknowledged only after the binary stream has been sent and the
     * server has answered with {@code UPLOAD_FINISHED}; a server error or an unexpected response
     * fails the acknowledgement with a {@link CrdtException}.
     *
     * @return promise of the upload consumer, failed if connecting or sending the request fails
     */
    @Override // io.activej.crdt.storage.CrdtStorage
    public Promise<StreamConsumer<CrdtData<K, S>>> upload() {
        return connect().then(messaging -> messaging.send(CrdtMessaging.CrdtMessages.UPLOAD)
                .thenEx(io.activej.crdt.util.Utils.wrapException(() -> "Failed to send 'Upload' message"))
                .map(ignored -> {
                    // Created eagerly, right after the request is sent. Kept as a raw type because
                    // the element type of the binary stream is not imported in this file.
                    ChannelConsumer binaryConsumer = messaging.sendBinaryStream()
                            .withAcknowledgement(ack -> ack
                                    .then(messaging::receive)
                                    .then(simpleHandler(CrdtMessaging.CrdtResponses.UPLOAD_FINISHED)));
                    // Explicit type witness: without it the element type cannot be inferred
                    // because ofSupplier(...) is the receiver of a further call.
                    return StreamConsumer.<CrdtData<K, S>>ofSupplier(supplier -> supplier
                                    .transformWith(this.detailedStats ? this.uploadStatsDetailed : this.uploadStats)
                                    .transformWith(ChannelSerializer.create(this.serializer))
                                    .streamTo(binaryConsumer))
                            .withAcknowledgement(ack -> ack
                                    .thenEx(io.activej.crdt.util.Utils.wrapException(() -> "Upload failed")));
                }));
    }

    /**
     * Opens a connection, sends a {@code Download} request carrying {@code j} and, once the server
     * answers with {@code DOWNLOAD_STARTED}, returns a supplier of the deserialized items.
     *
     * <p>If the handshake fails (send/receive error, server error, or unexpected response) the
     * connection is closed with that error. When the returned supplier reaches end of stream the
     * client sends its own end-of-stream marker and closes the connection, propagating any failure.
     *
     * @param j value forwarded verbatim in the {@code Download} request
     *          (presumably a timestamp/revision token - confirm against the server protocol)
     * @return promise of the download supplier
     */
    @Override // io.activej.crdt.storage.CrdtStorage
    public Promise<StreamSupplier<CrdtData<K, S>>> download(long j) {
        return connect().then(messaging -> messaging.send(new CrdtMessaging.Download(j))
                .thenEx(io.activej.crdt.util.Utils.wrapException(() -> "Failed to send 'Download' message"))
                .then(() -> messaging.receive()
                        .thenEx(io.activej.crdt.util.Utils.wrapException(() -> "Failed to receive response")))
                // Shared handler: null-safe and uses the same instanceof check as the other operations.
                .then(simpleHandler(CrdtMessaging.CrdtResponses.DOWNLOAD_STARTED))
                // A failed handshake would otherwise leave the connection open.
                .whenException(messaging::closeEx)
                .map(ignored -> messaging.receiveBinaryStream()
                        .transformWith(ChannelDeserializer.create(this.serializer))
                        .transformWith(this.detailedStats ? this.downloadStatsDetailed : this.downloadStats)
                        .withEndOfStream(endOfStream -> endOfStream
                                .then(messaging::sendEndOfStream)
                                .thenEx(io.activej.crdt.util.Utils.wrapException(() -> "Download failed"))
                                .whenComplete((result, e) -> {
                                    if (e == null) {
                                        messaging.close();
                                    } else {
                                        messaging.closeEx(e);
                                    }
                                }))));
    }

    /**
     * Opens a connection, sends the {@code REMOVE} message and returns a consumer that streams
     * serialized keys to the server.
     *
     * <p>The returned consumer is acknowledged only after the binary stream has been sent and the
     * server has answered with {@code REMOVE_FINISHED}; a server error or an unexpected response
     * fails the acknowledgement with a {@link CrdtException}.
     *
     * @return promise of the removal consumer, failed if connecting or sending the request fails
     */
    @Override // io.activej.crdt.storage.CrdtStorage
    public Promise<StreamConsumer<K>> remove() {
        return connect().then(messaging -> messaging.send(CrdtMessaging.CrdtMessages.REMOVE)
                .thenEx(io.activej.crdt.util.Utils.wrapException(() -> "Failed to send 'Remove' message"))
                .map(ignored -> {
                    // Created eagerly, right after the request is sent. Kept as a raw type because
                    // the element type of the binary stream is not imported in this file.
                    ChannelConsumer binaryConsumer = messaging.sendBinaryStream()
                            .withAcknowledgement(ack -> ack
                                    .then(messaging::receive)
                                    .then(simpleHandler(CrdtMessaging.CrdtResponses.REMOVE_FINISHED)));
                    // Explicit type witness, for the same reason as in upload().
                    return StreamConsumer.<K>ofSupplier(supplier -> supplier
                                    .transformWith(this.detailedStats ? this.removeStatsDetailed : this.removeStats)
                                    .transformWith(ChannelSerializer.create(this.keySerializer))
                                    .streamTo(binaryConsumer))
                            .withAcknowledgement(ack -> ack
                                    .thenEx(io.activej.crdt.util.Utils.wrapException(() -> "Remove operation failed")));
                }));
    }

    /**
     * Opens a connection, sends {@code PING} and expects {@code PONG} in return.
     * The connection is closed afterwards, with the error if the exchange failed.
     *
     * @return promise completed when {@code PONG} was received, failed otherwise
     */
    @Override // io.activej.crdt.storage.CrdtStorage
    public Promise<Void> ping() {
        return connect().then(messaging -> messaging.send(CrdtMessaging.CrdtMessages.PING)
                .thenEx(io.activej.crdt.util.Utils.wrapException(() -> "Failed to send 'Ping'"))
                .then(() -> messaging.receive()
                        .thenEx(io.activej.crdt.util.Utils.wrapException(() -> "Failed to receive 'Pong'")))
                .then(simpleHandler(CrdtMessaging.CrdtResponses.PONG))
                .whenResult(messaging::close)
                .whenException(messaging::closeEx));
    }

    /** Starts the service by issuing a {@link #ping()}; the start fails if the ping fails. */
    @NotNull
    public Promise<Void> start() {
        return ping();
    }

    /** Stops the service; completes immediately since no connection is kept between operations. */
    @NotNull
    public Promise<Void> stop() {
        return Promise.complete();
    }

    /**
     * Builds a handler that checks a received response against the expected one.
     *
     * @param crdtResponse the response that signals success (compared by identity)
     * @return function yielding a completed promise on the expected response, a failed promise
     *         carrying the server's message for a {@link CrdtMessaging.ServerError}, and a failed
     *         promise describing the mismatch for anything else (including {@code null})
     */
    private Function<CrdtMessaging.CrdtResponse, Promise<Void>> simpleHandler(CrdtMessaging.CrdtResponse crdtResponse) {
        return actual -> {
            if (actual == crdtResponse) {
                return Promise.complete();
            }
            if (actual instanceof CrdtMessaging.ServerError) {
                return Promise.ofException(new CrdtException(((CrdtMessaging.ServerError) actual).getMsg()));
            }
            return Promise.ofException(new CrdtException("Received message " + actual + " instead of " + crdtResponse));
        };
    }

    /**
     * Connects to {@link #address} using the configured timeout and socket settings and wraps the
     * socket into a messaging channel that uses {@link #SERIALIZER} for control messages.
     *
     * @return promise of the messaging channel, failed with a wrapped error if connecting fails
     */
    private Promise<MessagingWithBinaryStreaming<CrdtMessaging.CrdtResponse, CrdtMessaging.CrdtMessage>> connect() {
        return AsyncTcpSocketNio.connect(this.address, this.connectTimeoutMillis, this.socketSettings)
                .map(socket -> MessagingWithBinaryStreaming.create(socket, SERIALIZER))
                .thenEx(io.activej.crdt.util.Utils.wrapException(() -> "Failed to connect to " + this.address));
    }

    /** Makes subsequent operations feed the detailed statistics instead of the basic ones. */
    @JmxOperation
    public void startDetailedMonitoring() {
        this.detailedStats = true;
    }

    /** Makes subsequent operations feed the basic statistics instead of the detailed ones. */
    @JmxOperation
    public void stopDetailedMonitoring() {
        this.detailedStats = false;
    }

    /** Returns the basic statistics of uploaded items. */
    @JmxAttribute
    public StreamStatsBasic getUploadStats() {
        return this.uploadStats;
    }

    /** Returns the detailed statistics of uploaded items. */
    @JmxAttribute
    public StreamStatsDetailed getUploadStatsDetailed() {
        return this.uploadStatsDetailed;
    }

    /** Returns the basic statistics of downloaded items. */
    @JmxAttribute
    public StreamStatsBasic getDownloadStats() {
        return this.downloadStats;
    }

    /** Returns the detailed statistics of downloaded items. */
    @JmxAttribute
    public StreamStatsDetailed getDownloadStatsDetailed() {
        return this.downloadStatsDetailed;
    }

    /** Returns the basic statistics of removed keys. */
    @JmxAttribute
    public StreamStatsBasic getRemoveStats() {
        return this.removeStats;
    }

    /** Returns the detailed statistics of removed keys. */
    @JmxAttribute
    public StreamStatsDetailed getRemoveStatsDetailed() {
        return this.removeStatsDetailed;
    }
}
