package io.activej.fs.cluster;

import io.activej.async.function.AsyncSupplier;
import io.activej.async.service.EventloopService;
import io.activej.bytebuf.ByteBuf;
import io.activej.common.Checks;
import io.activej.common.api.WithInitializer;
import io.activej.common.collection.CollectionUtils;
import io.activej.common.collection.Try;
import io.activej.common.exception.parse.ParseException;
import io.activej.common.ref.RefBoolean;
import io.activej.csp.ChannelConsumer;
import io.activej.csp.ChannelSupplier;
import io.activej.csp.dsl.ChannelConsumerTransformer;
import io.activej.eventloop.Eventloop;
import io.activej.eventloop.jmx.EventloopJmxBeanEx;
import io.activej.fs.ActiveFs;
import io.activej.fs.FileMetadata;
import io.activej.fs.exception.FsIOException;
import io.activej.fs.util.RemoteFsUtils;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.api.attribute.JmxOperation;
import io.activej.promise.Promise;
import io.activej.promise.Promises;
import io.activej.promise.jmx.PromiseStats;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/fs/cluster/ClusterActiveFs.class */
public final class ClusterActiveFs implements ActiveFs, WithInitializer<ClusterActiveFs>, EventloopService, EventloopJmxBeanEx {
    private static final Logger logger = LoggerFactory.getLogger(ClusterActiveFs.class);
    // Partition registry that every operation of this file system is delegated to.
    private final FsPartitions partitions;
    // Operations fail once the number of dead partitions exceeds this value (see checkStillNotDead).
    private int deadPartitionsThreshold = 0;
    // Passed to ChannelByteSplitter.create(...) on upload; presumably the minimum number of
    // targets that must accept the data - TODO confirm against ChannelByteSplitter.
    private int uploadTargetsMin = 1;
    // Number of upload targets that collect(...) tries to connect to for a single upload.
    private int uploadTargetsMax = 1;
    // Per-operation statistics exposed through the JMX getters of this class.
    // NOTE(review): the 5-minute Duration is presumably the stats smoothing window - confirm against PromiseStats.create.
    private final PromiseStats uploadStartPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats uploadFinishPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats appendStartPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats appendFinishPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats downloadStartPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats downloadFinishPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats listPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats infoPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats infoAllPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats copyPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats copyAllPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats movePromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats moveAllPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats deletePromise = PromiseStats.create(Duration.ofMinutes(5));
    private final PromiseStats deleteAllPromise = PromiseStats.create(Duration.ofMinutes(5));

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/activej/fs/cluster/ClusterActiveFs$Container.class */
    /**
     * Immutable pair of a partition id and a value obtained from that partition.
     *
     * @param <T> type of the carried value
     */
    public static class Container<T> {
        /** Id of the partition the value belongs to. */
        final Object id;
        /** Value obtained from the partition identified by {@link #id}. */
        final T value;

        Container(Object obj, T t) {
            id = obj;
            value = t;
        }
    }

    /**
     * Creates a cluster file system on top of the given partitions.
     * Instances are obtained through {@link #create(FsPartitions)}.
     */
    private ClusterActiveFs(FsPartitions fsPartitions) {
        partitions = fsPartitions;
    }

    /**
     * Static factory for a cluster file system with default persistence options.
     *
     * @param fsPartitions partitions the new instance operates on
     * @return a new {@code ClusterActiveFs}
     */
    public static ClusterActiveFs create(FsPartitions fsPartitions) {
        ClusterActiveFs clusterFs = new ClusterActiveFs(fsPartitions);
        return clusterFs;
    }

    /**
     * Sets the replication count: both upload target bounds become {@code i}
     * and the dead partitions threshold becomes {@code i - 1}.
     *
     * @param i replication count, between one and the number of partitions (inclusive)
     * @return this instance
     */
    public ClusterActiveFs withReplicationCount(int i) {
        boolean withinBounds = i >= 1 && i <= partitions.getPartitions().size();
        Checks.checkArgument(withinBounds, "Replication count cannot be less than one or greater than number of partitions");
        uploadTargetsMax = i;
        uploadTargetsMin = i;
        deadPartitionsThreshold = i - 1;
        return this;
    }

    /**
     * Sets the persistence options individually.
     *
     * @param i  dead partitions threshold; must be non-negative and less than the number of partitions
     * @param i2 minimum number of upload targets; must be non-negative
     * @param i3 maximum number of upload targets; must be positive, not less than {@code i2}
     *           and not greater than the number of partitions
     * @return this instance
     */
    public ClusterActiveFs withPersistenceOptions(int i, int i2, int i3) {
        Checks.checkArgument(0 <= i && i < this.partitions.getPartitions().size(), "Dead partitions threshold cannot be less than zero or greater than number of partitions");
        Checks.checkArgument(0 <= i2, "Minimum number of upload targets should not be less than zero");
        // Message previously read "andshould" because of a missing space in the literal.
        Checks.checkArgument(0 < i3 && i2 <= i3 && i3 <= this.partitions.getPartitions().size(), "Maximum number of upload targets should be greater than zero, should not be less than minimum number of upload targets and should not exceed total number of partitions");
        this.deadPartitionsThreshold = i;
        this.uploadTargetsMin = i2;
        this.uploadTargetsMax = i3;
        return this;
    }

    /** Returns the eventloop of the underlying partitions. */
    @NotNull
    public Eventloop getEventloop() {
        Eventloop eventloop = partitions.getEventloop();
        return eventloop;
    }

    /**
     * Uploads a file of unknown size to the cluster.
     *
     * @param str name of the file to upload
     * @return promise of a consumer that accepts the file data
     */
    @Override
    public Promise<ChannelConsumer<ByteBuf>> upload(@NotNull String str) {
        Function<ActiveFs, Promise<ChannelConsumer<ByteBuf>>> startUpload = fs -> fs.upload(str);
        return doUpload(str, startUpload, ChannelConsumerTransformer.identity(), uploadStartPromise, uploadFinishPromise);
    }

    /**
     * Uploads a file of a known size to the cluster; the data stream is passed
     * through {@code RemoteFsUtils.ofFixedSize(j)}.
     *
     * @param str name of the file to upload
     * @param j   expected size of the file
     * @return promise of a consumer that accepts the file data
     */
    @Override
    public Promise<ChannelConsumer<ByteBuf>> upload(@NotNull String str, long j) {
        Function<ActiveFs, Promise<ChannelConsumer<ByteBuf>>> startUpload = fs -> fs.upload(str, j);
        return doUpload(str, startUpload, RemoteFsUtils.ofFixedSize(j), uploadStartPromise, uploadFinishPromise);
    }

    /**
     * Appends data to a file in the cluster.
     *
     * @param str name of the file to append to
     * @param j   offset passed on to each partition's {@code append}
     * @return promise of a consumer that accepts the appended data
     */
    @Override
    public Promise<ChannelConsumer<ByteBuf>> append(@NotNull String str, long j) {
        Function<ActiveFs, Promise<ChannelConsumer<ByteBuf>>> startAppend = fs -> fs.append(str, j);
        return doUpload(str, startAppend, ChannelConsumerTransformer.identity(), appendStartPromise, appendFinishPromise);
    }

    /**
     * Downloads a file by requesting it from every alive partition and combining
     * the successfully opened streams through a {@code ChannelByteCombiner}.
     *
     * <p>{@code downloadStartPromise} records the time until the combined supplier is
     * available. {@code downloadFinishPromise} is attached to the end of stream of the
     * combined supplier; previously it was exposed over JMX but never recorded.
     *
     * @param str name of the file to download
     * @param j   offset passed on to each partition's {@code download}
     * @param j2  limit passed on to each partition's {@code download}
     * @return promise of a supplier of the file data; fails if no partition could serve the file
     */
    @Override
    public Promise<ChannelSupplier<ByteBuf>> download(@NotNull String str, long j, long j2) {
        return broadcast(
                (id, fs) -> {
                    logger.trace("downloading file {} from {}", str, id);
                    return fs.download(str, j, j2)
                            .whenException(e -> logger.warn("Failed to connect to a server with key {} to download file {}", id, str, e))
                            // a failure at the end of an individual stream goes through the partition death handling
                            .map(supplier -> supplier.withEndOfStream(eos -> eos.thenEx(partitions.wrapDeath(id))));
                },
                // suppliers that end up unused are closed by broadcast
                ChannelSupplier::close)
                .then(filterErrors(() -> ofFailure("Could not download file '" + str + "' from any server")))
                .then(suppliers -> {
                    ChannelByteCombiner combiner = ChannelByteCombiner.create();
                    for (ChannelSupplier<ByteBuf> supplier : suppliers) {
                        combiner.addInput().set(supplier);
                    }
                    return Promise.of(combiner.getOutput().getSupplier()
                            .withEndOfStream(eos -> eos.whenComplete(downloadFinishPromise.recordStats())));
                })
                .whenComplete(downloadStartPromise.recordStats());
    }

    /**
     * Copies a file using the default implementation inherited from {@link ActiveFs}
     * and records the outcome in {@code copyPromise}.
     *
     * @param str  name of the source file
     * @param str2 name of the target file
     * @return promise that completes when the copy is done
     */
    @Override
    public Promise<Void> copy(@NotNull String str, @NotNull String str2) {
        // An interface default method must be invoked as Interface.super.method(...);
        // a plain super.copy(...) would look the method up on Object.
        return ActiveFs.super.copy(str, str2).whenComplete(this.copyPromise.recordStats());
    }

    /**
     * Copies several files using the default implementation inherited from {@link ActiveFs}
     * and records the outcome in {@code copyAllPromise}.
     *
     * @param map mapping passed on unchanged to the inherited {@code copyAll}
     * @return promise that completes when all copies are done
     */
    @Override
    public Promise<Void> copyAll(Map<String, String> map) {
        // An interface default method must be invoked as Interface.super.method(...);
        // a plain super.copyAll(...) would look the method up on Object.
        return ActiveFs.super.copyAll(map).whenComplete(this.copyAllPromise.recordStats());
    }

    /**
     * Moves a file using the default implementation inherited from {@link ActiveFs}
     * and records the outcome in {@code movePromise}.
     *
     * @param str  name of the source file
     * @param str2 name of the target file
     * @return promise that completes when the move is done
     */
    @Override
    public Promise<Void> move(@NotNull String str, @NotNull String str2) {
        // An interface default method must be invoked as Interface.super.method(...);
        // a plain super.move(...) would look the method up on Object.
        return ActiveFs.super.move(str, str2).whenComplete(this.movePromise.recordStats());
    }

    /**
     * Moves several files using the default implementation inherited from {@link ActiveFs}
     * and records the outcome in {@code moveAllPromise}.
     *
     * @param map mapping passed on unchanged to the inherited {@code moveAll}
     * @return promise that completes when all moves are done
     */
    @Override
    public Promise<Void> moveAll(Map<String, String> map) {
        // An interface default method must be invoked as Interface.super.method(...);
        // a plain super.moveAll(...) would look the method up on Object.
        return ActiveFs.super.moveAll(map).whenComplete(this.moveAllPromise.recordStats());
    }

    /**
     * Deletes a file on every alive partition; the per-partition results are discarded.
     *
     * @param str name of the file to delete
     * @return promise that completes when the broadcast is done
     */
    @Override
    public Promise<Void> delete(@NotNull String str) {
        return broadcast(fs -> fs.delete(str))
                .whenComplete(deletePromise.recordStats())
                .toVoid();
    }

    /**
     * Deletes a set of files on every alive partition; the per-partition results are discarded.
     *
     * @param set names of the files to delete
     * @return promise that completes when the broadcast is done
     */
    @Override
    public Promise<Void> deleteAll(Set<String> set) {
        return broadcast(fs -> fs.deleteAll(set))
                .whenComplete(deleteAllPromise.recordStats())
                .toVoid();
    }

    /**
     * Lists files on every alive partition and merges the successful answers
     * with {@code FileMetadata.flatten}.
     *
     * @param str argument passed on to each partition's {@code list}
     * @return promise of the merged file name to metadata map
     */
    @Override
    public Promise<Map<String, FileMetadata>> list(@NotNull String str) {
        return broadcast(fs -> fs.list(str))
                .then(filterErrors())
                .map(perPartition -> FileMetadata.flatten(perPartition.stream()))
                .whenComplete(listPromise.recordStats());
    }

    /**
     * Requests the metadata of a file from every alive partition and returns the
     * greatest one according to {@code FileMetadata.COMPARATOR}.
     *
     * @param str name of the file
     * @return promise of the chosen metadata, or of {@code null} when there is no successful answer
     */
    @Override
    public Promise<FileMetadata> info(@NotNull String str) {
        return broadcast(fs -> fs.info(str))
                .then(filterErrors())
                .map(candidates -> (FileMetadata) candidates.stream().max(FileMetadata.COMPARATOR).orElse(null))
                .whenComplete(infoPromise.recordStats());
    }

    /**
     * Requests the metadata of several files from every alive partition and merges
     * the successful answers with {@code FileMetadata.flatten}.
     *
     * @param set names of the files; an empty set yields an empty map without contacting any partition
     * @return promise of the merged file name to metadata map
     */
    @Override
    public Promise<Map<String, FileMetadata>> infoAll(@NotNull Set<String> set) {
        if (set.isEmpty()) {
            return Promise.of(Collections.emptyMap());
        }
        return broadcast(fs -> fs.infoAll(set))
                .then(filterErrors())
                .map(perPartition -> FileMetadata.flatten(perPartition.stream()))
                .whenComplete(infoAllPromise.recordStats());
    }

    /**
     * Checks all partitions and then verifies that the number of dead partitions
     * does not exceed the configured threshold.
     *
     * @return promise that fails when too many partitions are dead
     */
    @Override
    public Promise<Void> ping() {
        return partitions.checkAllPartitions()
                .then(() -> checkNotDead());
    }

    /** Starts the service by performing a {@link #ping()}. */
    @NotNull
    public Promise<Void> start() {
        Promise<Void> pinged = ping();
        return pinged;
    }

    /** Stops the service; there is nothing to release, so an already completed promise is returned. */
    @NotNull
    public Promise<Void> stop() {
        Promise<Void> done = Promise.complete();
        return done;
    }

    /** Returns a short description that includes the underlying partitions. */
    @Override
    public String toString() {
        return "ClusterActiveFs{partitions=" + this.partitions + '}';
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates a promise that is failed with an {@code FsIOException} attributed to this class.
     *
     * @param str exception message
     */
    public static <T> Promise<T> ofFailure(String str) {
        FsIOException failure = new FsIOException(ClusterActiveFs.class, str);
        return Promise.ofException(failure);
    }

    /**
     * Passes the given value through unless more partitions are dead than the threshold allows.
     *
     * @param t value to wrap on success
     * @return promise of {@code t}, or a failed promise when the threshold is exceeded
     */
    private <T> Promise<T> checkStillNotDead(T t) {
        Map<Object, ActiveFs> dead = partitions.getDeadPartitions();
        if (dead.size() <= deadPartitionsThreshold) {
            return Promise.of(t);
        }
        return ofFailure("There are more dead partitions than allowed(" + dead.size() + " dead, threshold is " + deadPartitionsThreshold + "), aborting");
    }

    /** Value-less variant of {@code checkStillNotDead}: fails when too many partitions are dead. */
    private Promise<Void> checkNotDead() {
        return this.<Void>checkStillNotDead(null);
    }

    /**
     * Shared implementation of upload and append: connects to the target partitions
     * and fans the incoming data out to them through a {@code ChannelByteSplitter}.
     *
     * <p>The finish statistics are attached to the acknowledgement of the returned
     * consumer. Previously they were recorded on an already completed promise, so the
     * "finish" stats were taken at the moment the upload started.
     *
     * @param str                        name of the file being written
     * @param function                   opens a consumer on a single partition
     * @param channelConsumerTransformer transformation applied to the splitter's input consumer
     * @param promiseStats               statistics for obtaining the consumer
     * @param promiseStats2              statistics for the acknowledgement of the consumer
     * @return promise of the consumer that accepts the file data
     */
    private Promise<ChannelConsumer<ByteBuf>> doUpload(String str, Function<ActiveFs, Promise<ChannelConsumer<ByteBuf>>> function, ChannelConsumerTransformer<ByteBuf, ChannelConsumer<ByteBuf>> channelConsumerTransformer, PromiseStats promiseStats, PromiseStats promiseStats2) {
        return checkNotDead()
                .then(() -> collect(str, function))
                .then(containers -> {
                    ChannelByteSplitter splitter = ChannelByteSplitter.create(uploadTargetsMin);
                    for (Container<ChannelConsumer<ByteBuf>> container : containers) {
                        splitter.addOutput().set(container.value);
                    }
                    if (logger.isTraceEnabled()) {
                        String targetIds = containers.stream()
                                .map(container -> container.id.toString())
                                .collect(Collectors.joining(", ", "[", "]"));
                        logger.trace("uploading file {} to {}, {}", str, targetIds, this);
                    }
                    ChannelConsumer<ByteBuf> consumer = splitter.getInput().getConsumer()
                            .transformWith(channelConsumerTransformer)
                            .withAcknowledgement(ack -> ack.whenComplete(promiseStats2.recordStats()));
                    return Promise.of(consumer);
                })
                .whenComplete(promiseStats.recordStats());
    }

    /**
     * Opens upload consumers on up to {@code uploadTargetsMax} partitions taken from
     * {@code partitions.select(str)}.
     *
     * @param str      name of the file being uploaded; also used in the failure message
     * @param function opens a consumer on a single partition
     * @return promise of the opened consumers paired with their partition ids; on any failure
     *         the consumers opened so far are closed and the promise fails
     */
    private Promise<List<Container<ChannelConsumer<ByteBuf>>>> collect(String str, Function<ActiveFs, Promise<ChannelConsumer<ByteBuf>>> function) {
        // One iterator is shared by all generated promises, so each partition id is tried at most once.
        Iterator<Object> it = this.partitions.select(str).iterator();
        // Consumers opened so far; they are closed if the overall collection fails.
        HashSet hashSet = new HashSet();
        // Set once the collection has failed, so that consumers arriving later are closed right away.
        RefBoolean refBoolean = new RefBoolean(false);
        return Promises.toList(Stream.generate(() -> {
            // NOTE(review): presumably Promises.first walks the iterator until one call succeeds - confirm.
            return Promises.first(CollectionUtils.transformIterator(it, obj -> {
                return call(obj, function).whenResult(channelConsumer -> {
                    if (refBoolean.get()) {
                        channelConsumer.close();
                    } else {
                        hashSet.add(channelConsumer);
                    }
                }).map(channelConsumer2 -> {
                    // A failed acknowledgement goes through the partition death handling.
                    return new Container(obj, channelConsumer2.withAcknowledgement(promise -> {
                        return promise.thenEx(this.partitions.wrapDeath(obj));
                    }));
                });
            }));
        }).limit(this.uploadTargetsMax)).thenEx((list, th) -> {
            if (th == null) {
                return Promise.of(list);
            }
            // Failure: release what was opened and mark the attempt as failed.
            hashSet.forEach((v0) -> {
                v0.close();
            });
            refBoolean.set(true);
            return ofFailure("Didn't connect to enough partitions to upload '" + str + '\'');
        });
    }

    /**
     * Calls an action on a single partition when the action does not need the partition id.
     *
     * @param obj      id of the partition
     * @param function action to run on the partition's file system
     */
    private <T> Promise<T> call(Object obj, Function<ActiveFs, Promise<T>> function) {
        BiFunction<Object, ActiveFs, Promise<T>> ignoringId = (id, fs) -> function.apply(fs);
        return call(obj, ignoringId);
    }

    /**
     * Calls an action on a single partition and routes its outcome through
     * {@code partitions.wrapDeath}.
     *
     * @param obj        id of the partition
     * @param biFunction action receiving the partition id and its file system
     * @return the action's promise, or a failed promise when {@code partitions.get} has no entry for the id
     */
    private <T> Promise<T> call(Object obj, BiFunction<Object, ActiveFs, Promise<T>> biFunction) {
        ActiveFs fs = partitions.get(obj);
        if (fs == null) {
            return Promise.ofException(new FsIOException(ClusterActiveFs.class, "Partition '" + obj + "' is not alive"));
        }
        return biFunction.apply(obj, fs).thenEx(partitions.wrapDeath(obj));
    }

    /**
     * Runs an action on every alive partition and collects each outcome as a {@code Try}.
     *
     * @param biFunction action receiving the partition id and its file system
     * @param consumer   cleanup applied to results that will not be delivered to the caller
     * @return promise of the per-partition outcomes; fails when too many partitions are dead,
     *         either before the broadcast or after an individual partition has answered
     */
    private <T> Promise<List<Try<T>>> broadcast(BiFunction<Object, ActiveFs, Promise<T>> biFunction, Consumer<T> consumer) {
        return checkNotDead().then(() -> {
            return Promise.ofCallback(settablePromise -> {
                Promises.toList(this.partitions.getAlivePartitions().entrySet().stream().map(entry -> {
                    return ((Promise) biFunction.apply(entry.getKey(), (ActiveFs) entry.getValue())).thenEx(this.partitions.wrapDeath(entry.getKey())).whenResult(obj -> {
                        // The overall promise is already complete, so this late result is handed to the cleanup.
                        if (settablePromise.isComplete()) {
                            consumer.accept(obj);
                        }
                    }).toTry().then(r6 -> {
                        // Re-check the dead partition count after each answer; if the check fails,
                        // a successful result is handed to the cleanup.
                        return checkStillNotDead(r6).whenException(th -> {
                            r6.ifSuccess(consumer);
                        });
                    });
                })).whenComplete(settablePromise);
            });
        });
    }

    /** Variant of {@code filterErrors} whose fallback is an empty list. */
    private <T> Function<List<Try<T>>, Promise<List<T>>> filterErrors() {
        AsyncSupplier<List<T>> emptyFallback = () -> Promise.of(Collections.emptyList());
        return filterErrors(emptyFallback);
    }

    /**
     * Builds a function that reduces per-partition outcomes to the successful values.
     *
     * <p>The returned function behaves as follows:
     * <ul>
     *   <li>if at least one outcome is successful, the successful values are returned;</li>
     *   <li>otherwise, if every failure is the very same exception instance, that exception is returned;</li>
     *   <li>otherwise the fallback is used.</li>
     * </ul>
     *
     * @param asyncSupplier fallback used when there is neither a success nor a single shared exception
     */
    private <T> Function<List<Try<T>>, Promise<List<T>>> filterErrors(AsyncSupplier<List<T>> asyncSupplier) {
        // Distinct names are required here: the lambda parameter and the locals used to share
        // the name "list", which does not compile.
        return tries -> {
            List<T> successes = tries.stream()
                    .filter(Try::isSuccess)
                    .map(Try::get)
                    .collect(Collectors.toList());
            if (!successes.isEmpty()) {
                return Promise.of(successes);
            }
            List<Throwable> failures = tries.stream()
                    .filter(Try::isException)
                    .map(Try::getException)
                    .collect(Collectors.toList());
            if (!failures.isEmpty()) {
                Throwable first = failures.get(0);
                // Identity comparison on purpose: only one shared exception instance is propagated as-is.
                if (failures.stream().skip(1L).allMatch(other -> other == first)) {
                    return Promise.ofException(first);
                }
            }
            return asyncSupplier.get();
        };
    }

    /**
     * Runs an action on every alive partition when the action does not need the
     * partition id and its results need no cleanup.
     *
     * @param function action to run on each partition's file system
     */
    private <T> Promise<List<Try<T>>> broadcast(Function<ActiveFs, Promise<T>> function) {
        BiFunction<Object, ActiveFs, Promise<T>> ignoringId = (id, fs) -> function.apply(fs);
        Consumer<T> noCleanup = ignored -> {
        };
        return broadcast(ignoringId, noCleanup);
    }

    /** Returns the configured dead partitions threshold. */
    @JmxAttribute
    public int getDeadPartitionsThreshold() {
        return deadPartitionsThreshold;
    }

    /** Returns the configured minimum number of upload targets. */
    @JmxAttribute
    public int getUploadTargetsMin() {
        return uploadTargetsMin;
    }

    /** Returns the configured maximum number of upload targets. */
    @JmxAttribute
    public int getUploadTargetsMax() {
        return uploadTargetsMax;
    }

    /** JMX operation that delegates to {@link #withReplicationCount(int)}. */
    @JmxOperation
    public void setReplicationCount(int i) {
        withReplicationCount(i);
    }

    /** Changes only the dead partitions threshold; the upload target bounds keep their current values. */
    @JmxAttribute
    public void setDeadPartitionsThreshold(int i) {
        withPersistenceOptions(i, uploadTargetsMin, uploadTargetsMax);
    }

    /** Changes only the minimum number of upload targets. */
    @JmxAttribute
    public void setUploadTargetsMin(int i) {
        withPersistenceOptions(deadPartitionsThreshold, i, uploadTargetsMax);
    }

    /** Changes only the maximum number of upload targets. */
    @JmxAttribute
    public void setUploadTargetsMax(int i) {
        withPersistenceOptions(deadPartitionsThreshold, uploadTargetsMin, i);
    }

    /** Returns the number of partitions currently reported as alive. */
    @JmxAttribute
    public int getAlivePartitionCount() {
        return partitions.getAlivePartitions().size();
    }

    /** Returns the number of partitions currently reported as dead. */
    @JmxAttribute
    public int getDeadPartitionCount() {
        return partitions.getDeadPartitions().size();
    }

    /** Returns the string forms of the ids of the alive partitions. */
    @JmxAttribute
    public String[] getAlivePartitions() {
        return partitions.getAlivePartitions().keySet().stream()
                .map(Object::toString)
                .toArray(String[]::new);
    }

    /** Returns the string forms of the ids of the dead partitions. */
    @JmxAttribute
    public String[] getDeadPartitions() {
        return partitions.getDeadPartitions().keySet().stream()
                .map(Object::toString)
                .toArray(String[]::new);
    }

    /** Returns the {@code uploadStartPromise} statistics. */
    @JmxAttribute
    public PromiseStats getUploadStartPromise() {
        return uploadStartPromise;
    }

    /** Returns the {@code uploadFinishPromise} statistics. */
    @JmxAttribute
    public PromiseStats getUploadFinishPromise() {
        return uploadFinishPromise;
    }

    /** Returns the {@code appendStartPromise} statistics. */
    @JmxAttribute
    public PromiseStats getAppendStartPromise() {
        return appendStartPromise;
    }

    /** Returns the {@code appendFinishPromise} statistics. */
    @JmxAttribute
    public PromiseStats getAppendFinishPromise() {
        return appendFinishPromise;
    }

    /** Returns the {@code downloadStartPromise} statistics. */
    @JmxAttribute
    public PromiseStats getDownloadStartPromise() {
        return downloadStartPromise;
    }

    /** Returns the {@code downloadFinishPromise} statistics. */
    @JmxAttribute
    public PromiseStats getDownloadFinishPromise() {
        return downloadFinishPromise;
    }

    /** Returns the {@code listPromise} statistics. */
    @JmxAttribute
    public PromiseStats getListPromise() {
        return listPromise;
    }

    /** Returns the {@code infoPromise} statistics. */
    @JmxAttribute
    public PromiseStats getInfoPromise() {
        return infoPromise;
    }

    /** Returns the {@code infoAllPromise} statistics. */
    @JmxAttribute
    public PromiseStats getInfoAllPromise() {
        return infoAllPromise;
    }

    /** Returns the {@code deletePromise} statistics. */
    @JmxAttribute
    public PromiseStats getDeletePromise() {
        return deletePromise;
    }

    /** Returns the {@code deleteAllPromise} statistics. */
    @JmxAttribute
    public PromiseStats getDeleteAllPromise() {
        return deleteAllPromise;
    }

    /** Returns the {@code copyPromise} statistics. */
    @JmxAttribute
    public PromiseStats getCopyPromise() {
        return copyPromise;
    }

    /** Returns the {@code copyAllPromise} statistics. */
    @JmxAttribute
    public PromiseStats getCopyAllPromise() {
        return copyAllPromise;
    }

    /** Returns the {@code movePromise} statistics. */
    @JmxAttribute
    public PromiseStats getMovePromise() {
        return movePromise;
    }

    /** Returns the {@code moveAllPromise} statistics. */
    @JmxAttribute
    public PromiseStats getMoveAllPromise() {
        return moveAllPromise;
    }

    /** Returns the underlying partitions; exposed over JMX under an empty attribute name. */
    @JmxAttribute(name = "")
    public FsPartitions getPartitions() {
        return partitions;
    }

    /**
     * JMX operation that replaces the partitions from a semicolon-separated string.
     * Each entry is trimmed and empty entries are dropped before delegation.
     *
     * @param str semicolon-separated partition entries
     * @throws ParseException propagated from {@code FsPartitions.setPartitions}
     */
    @JmxOperation
    public void setPartitions(String str) throws ParseException {
        List<String> entries = Arrays.stream(str.split(";"))
                .map(String::trim)
                .filter(entry -> !entry.isEmpty())
                .collect(Collectors.toList());
        partitions.setPartitions(entries);
    }
}
