package io.activej.crdt.storage.local;

import io.activej.async.service.EventloopService;
import io.activej.bytebuf.ByteBuf;
import io.activej.bytebuf.ByteBufs;
import io.activej.common.initializer.WithInitializer;
import io.activej.crdt.CrdtData;
import io.activej.crdt.function.CrdtFilter;
import io.activej.crdt.function.CrdtFunction;
import io.activej.crdt.primitives.CrdtType;
import io.activej.crdt.storage.CrdtStorage;
import io.activej.crdt.util.CrdtDataSerializer;
import io.activej.crdt.util.Utils;
import io.activej.csp.ChannelConsumer;
import io.activej.csp.ChannelSupplier;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.StreamDataAcceptor;
import io.activej.datastream.StreamSupplier;
import io.activej.datastream.csp.ChannelDeserializer;
import io.activej.datastream.csp.ChannelSerializer;
import io.activej.datastream.processor.StreamFilter;
import io.activej.datastream.processor.StreamReducer;
import io.activej.datastream.processor.StreamReducers;
import io.activej.datastream.stats.StreamStats;
import io.activej.datastream.stats.StreamStatsBasic;
import io.activej.datastream.stats.StreamStatsDetailed;
import io.activej.eventloop.Eventloop;
import io.activej.eventloop.jmx.EventloopJmxBeanEx;
import io.activej.fs.ActiveFs;
import io.activej.fs.ActiveFsAdapters;
import io.activej.fs.FileMetadata;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.api.attribute.JmxOperation;
import io.activej.promise.Promise;
import io.activej.promise.Promises;
import io.activej.promise.jmx.PromiseStats;
import java.lang.Comparable;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.temporal.TemporalAmount;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/crdt/storage/local/CrdtStorageFs.class */
public final class CrdtStorageFs<K extends Comparable<K>, S> implements CrdtStorage<K, S>, WithInitializer<CrdtStorageFs<K, S>>, EventloopService, EventloopJmxBeanEx {
    private static final Logger logger = LoggerFactory.getLogger(CrdtStorageFs.class);
    private final Eventloop eventloop;
    private final ActiveFs fs;
    private final CrdtFunction<S> function;
    private final CrdtDataSerializer<K, S> serializer;
    private ActiveFs consolidationFolderFs;
    private ActiveFs tombstoneFolderFs;
    private boolean detailedStats;
    private Function<String, String> namingStrategy = str -> {
        return UUID.randomUUID() + "." + str;
    };
    private Duration consolidationMargin = Duration.ofMinutes(30);
    private CrdtFilter<S> filter = obj -> {
        return true;
    };
    private final StreamStatsBasic<CrdtData<K, S>> uploadStats = StreamStats.basic();
    private final StreamStatsDetailed<CrdtData<K, S>> uploadStatsDetailed = StreamStats.detailed();
    private final StreamStatsBasic<CrdtData<K, S>> downloadStats = StreamStats.basic();
    private final StreamStatsDetailed<CrdtData<K, S>> downloadStatsDetailed = StreamStats.detailed();
    private final StreamStatsBasic<K> removeStats = StreamStats.basic();
    private final StreamStatsDetailed<K> removeStatsDetailed = StreamStats.detailed();
    private final PromiseStats consolidationStats = PromiseStats.create(Duration.ofMinutes(5));

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/activej/crdt/storage/local/CrdtStorageFs$CrdtAccumulator.class */
    public static class CrdtAccumulator<S> {

        @Nullable
        S state;
        long maxAppendTimestamp;
        long maxRemoveTimestamp;

        CrdtAccumulator(@Nullable S s, long j, long j2) {
            this.state = s;
            this.maxAppendTimestamp = j;
            this.maxRemoveTimestamp = j2;
        }
    }

    /* loaded from: input_file:io/activej/crdt/storage/local/CrdtStorageFs$CrdtReducer.class */
    class CrdtReducer implements StreamReducers.Reducer<K, CrdtReducingData<K, S>, CrdtData<K, S>, CrdtAccumulator<S>> {
        CrdtReducer() {
        }

        public CrdtAccumulator<S> onFirstItem(StreamDataAcceptor<CrdtData<K, S>> streamDataAcceptor, K k, CrdtReducingData<K, S> crdtReducingData) {
            return crdtReducingData.state != null ? new CrdtAccumulator<>(crdtReducingData.state, crdtReducingData.timestamp, 0L) : new CrdtAccumulator<>(null, 0L, crdtReducingData.timestamp);
        }

        public CrdtAccumulator<S> onNextItem(StreamDataAcceptor<CrdtData<K, S>> streamDataAcceptor, K k, CrdtReducingData<K, S> crdtReducingData, CrdtAccumulator<S> crdtAccumulator) {
            if (crdtReducingData.state != null) {
                crdtAccumulator.state = crdtAccumulator.state != null ? (S) CrdtStorageFs.this.function.merge(crdtAccumulator.state, crdtReducingData.state) : crdtReducingData.state;
                if (crdtReducingData.timestamp > crdtAccumulator.maxAppendTimestamp) {
                    crdtAccumulator.maxAppendTimestamp = crdtReducingData.timestamp;
                }
            } else if (crdtReducingData.timestamp > crdtAccumulator.maxRemoveTimestamp) {
                crdtAccumulator.maxRemoveTimestamp = crdtReducingData.timestamp;
            }
            return crdtAccumulator;
        }

        public void onComplete(StreamDataAcceptor<CrdtData<K, S>> streamDataAcceptor, K k, CrdtAccumulator<S> crdtAccumulator) {
            if (crdtAccumulator.state == null || crdtAccumulator.maxRemoveTimestamp >= crdtAccumulator.maxAppendTimestamp || !CrdtStorageFs.this.filter.test(crdtAccumulator.state)) {
                return;
            }
            streamDataAcceptor.accept(new CrdtData(k, crdtAccumulator.state));
        }

        /* JADX WARN: Multi-variable type inference failed */
        public /* bridge */ /* synthetic */ void onComplete(StreamDataAcceptor streamDataAcceptor, Object obj, Object obj2) {
            onComplete((StreamDataAcceptor<CrdtData<StreamDataAcceptor, S>>) streamDataAcceptor, (StreamDataAcceptor) obj, (CrdtAccumulator) obj2);
        }

        /* JADX WARN: Multi-variable type inference failed */
        public /* bridge */ /* synthetic */ Object onNextItem(StreamDataAcceptor streamDataAcceptor, Object obj, Object obj2, Object obj3) {
            return onNextItem((StreamDataAcceptor<CrdtData<StreamDataAcceptor, S>>) streamDataAcceptor, (StreamDataAcceptor) obj, (CrdtReducingData<StreamDataAcceptor, S>) obj2, (CrdtAccumulator) obj3);
        }

        /* JADX WARN: Multi-variable type inference failed */
        public /* bridge */ /* synthetic */ Object onFirstItem(StreamDataAcceptor streamDataAcceptor, Object obj, Object obj2) {
            return onFirstItem((StreamDataAcceptor<CrdtData<StreamDataAcceptor, S>>) streamDataAcceptor, (StreamDataAcceptor) obj, (CrdtReducingData<StreamDataAcceptor, S>) obj2);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/activej/crdt/storage/local/CrdtStorageFs$CrdtReducingData.class */
    public static class CrdtReducingData<K extends Comparable<K>, S> {
        final K key;

        @Nullable
        final S state;
        final long timestamp;

        CrdtReducingData(K k, @Nullable S s, long j) {
            this.key = k;
            this.state = s;
            this.timestamp = j;
        }
    }

    private CrdtStorageFs(Eventloop eventloop, ActiveFs activeFs, ActiveFs activeFs2, ActiveFs activeFs3, CrdtDataSerializer<K, S> crdtDataSerializer, CrdtFunction<S> crdtFunction) {
        this.eventloop = eventloop;
        this.fs = activeFs;
        this.function = crdtFunction;
        this.serializer = crdtDataSerializer;
        this.consolidationFolderFs = activeFs2;
        this.tombstoneFolderFs = activeFs3;
    }

    public static <K extends Comparable<K>, S> CrdtStorageFs<K, S> create(Eventloop eventloop, ActiveFs activeFs, CrdtDataSerializer<K, S> crdtDataSerializer, CrdtFunction<S> crdtFunction) {
        return new CrdtStorageFs<>(eventloop, activeFs, ActiveFsAdapters.subdirectory(activeFs, ".consolidation"), ActiveFsAdapters.subdirectory(activeFs, ".tombstones"), crdtDataSerializer, crdtFunction);
    }

    public static <K extends Comparable<K>, S extends CrdtType<S>> CrdtStorageFs<K, S> create(Eventloop eventloop, ActiveFs activeFs, CrdtDataSerializer<K, S> crdtDataSerializer) {
        return new CrdtStorageFs<>(eventloop, activeFs, ActiveFsAdapters.subdirectory(activeFs, ".consolidation"), ActiveFsAdapters.subdirectory(activeFs, ".tombstones"), crdtDataSerializer, CrdtFunction.ofCrdtType());
    }

    public CrdtStorageFs<K, S> withConsolidationMargin(Duration duration) {
        this.consolidationMargin = duration;
        return this;
    }

    public CrdtStorageFs<K, S> withNamingStrategy(Function<String, String> function) {
        this.namingStrategy = function;
        return this;
    }

    public CrdtStorageFs<K, S> withConsolidationFolder(String str) {
        this.consolidationFolderFs = ActiveFsAdapters.subdirectory(this.fs, str);
        return this;
    }

    public CrdtStorageFs<K, S> withTombstoneFolder(String str) {
        this.tombstoneFolderFs = ActiveFsAdapters.subdirectory(this.fs, str);
        return this;
    }

    public CrdtStorageFs<K, S> withConsolidationFolderClient(ActiveFs activeFs) {
        this.consolidationFolderFs = activeFs;
        return this;
    }

    public CrdtStorageFs<K, S> withFilter(CrdtFilter<S> crdtFilter) {
        this.filter = crdtFilter;
        return this;
    }

    public CrdtStorageFs<K, S> withTombstoneFolderClient(ActiveFs activeFs) {
        this.tombstoneFolderFs = activeFs;
        return this;
    }

    @NotNull
    public Eventloop getEventloop() {
        return this.eventloop;
    }

    @Override // io.activej.crdt.storage.CrdtStorage
    public Promise<StreamConsumer<CrdtData<K, S>>> upload() {
        return this.fs.upload(this.namingStrategy.apply("bin")).map(channelConsumer -> {
            return StreamConsumer.ofSupplier(streamSupplier -> {
                return ((ChannelSupplier) ((StreamSupplier) streamSupplier.transformWith(this.detailedStats ? this.uploadStatsDetailed : this.uploadStats)).transformWith(ChannelSerializer.create(this.serializer))).streamTo(channelConsumer);
            }).withAcknowledgement(promise -> {
                return promise.thenEx(Utils.wrapException(() -> {
                    return "Error while uploading CRDT data to file";
                }));
            });
        }).thenEx(Utils.wrapException(() -> {
            return "Failed to upload CRDT data to file";
        }));
    }

    @Override // io.activej.crdt.storage.CrdtStorage
    public Promise<StreamSupplier<CrdtData<K, S>>> download(long j) {
        return Promises.toTuple(this.fs.list("*"), this.tombstoneFolderFs.list("*")).map(tuple2 -> {
            StreamReducer create = StreamReducer.create();
            Promises.all(Stream.concat(((Map) tuple2.getValue1()).entrySet().stream().filter(entry -> {
                return j == 0 || ((FileMetadata) entry.getValue()).getTimestamp() >= j;
            }).map(entry2 -> {
                return ((StreamSupplier) ((StreamSupplier) ((StreamSupplier) ChannelSupplier.ofPromise(this.fs.download((String) entry2.getKey())).transformWith(ChannelDeserializer.create(this.serializer))).transformWith(StreamFilter.mapper(crdtData -> {
                    Object extract = this.function.extract(crdtData.getState(), j);
                    if (extract != null) {
                        return new CrdtReducingData(crdtData.getKey(), extract, ((FileMetadata) entry2.getValue()).getTimestamp());
                    }
                    return null;
                }))).transformWith(StreamFilter.create((v0) -> {
                    return Objects.nonNull(v0);
                }))).streamTo(create.newInput(crdtReducingData -> {
                    return crdtReducingData.key;
                }, new CrdtReducer()));
            }), ((Map) tuple2.getValue2()).entrySet().stream().filter(entry3 -> {
                return j == 0 || ((FileMetadata) entry3.getValue()).getTimestamp() >= j;
            }).map(entry4 -> {
                return ((StreamSupplier) ((StreamSupplier) ChannelSupplier.ofPromise(this.tombstoneFolderFs.download((String) entry4.getKey())).transformWith(ChannelDeserializer.create(this.serializer.getKeySerializer()))).transformWith(StreamFilter.mapper(comparable -> {
                    return new CrdtReducingData(comparable, null, ((FileMetadata) entry4.getValue()).getTimestamp());
                }))).streamTo(create.newInput(crdtReducingData -> {
                    return crdtReducingData.key;
                }, new CrdtReducer()));
            })));
            return ((StreamSupplier) create.getOutput().transformWith(this.detailedStats ? this.downloadStatsDetailed : this.downloadStats)).withEndOfStream(promise -> {
                return promise.thenEx(Utils.wrapException(() -> {
                    return "Error while downloading CRDT data";
                }));
            });
        }).thenEx(Utils.wrapException(() -> {
            return "Failed to download CRDT data";
        }));
    }

    @Override // io.activej.crdt.storage.CrdtStorage
    public Promise<StreamConsumer<K>> remove() {
        return this.tombstoneFolderFs.upload(this.namingStrategy.apply("tomb")).map(channelConsumer -> {
            return StreamConsumer.ofSupplier(streamSupplier -> {
                return ((ChannelSupplier) ((StreamSupplier) streamSupplier.transformWith(this.detailedStats ? this.removeStatsDetailed : this.removeStats)).transformWith(ChannelSerializer.create(this.serializer.getKeySerializer()))).streamTo(channelConsumer);
            }).withAcknowledgement(promise -> {
                return promise.thenEx(Utils.wrapException(() -> {
                    return "Error while removing CRDT data";
                }));
            });
        }).thenEx(Utils.wrapException(() -> {
            return "Failed to remove CRDT data";
        }));
    }

    @Override // io.activej.crdt.storage.CrdtStorage
    public Promise<Void> ping() {
        return this.fs.ping().thenEx(Utils.wrapException(() -> {
            return "Failed to PING file system";
        }));
    }

    @NotNull
    public Promise<Void> start() {
        return Promise.complete();
    }

    @NotNull
    public Promise<Void> stop() {
        return Promise.complete();
    }

    public Promise<Void> consolidate() {
        long epochMilli = this.eventloop.currentInstant().minus((TemporalAmount) this.consolidationMargin).toEpochMilli();
        HashSet hashSet = new HashSet();
        return this.consolidationFolderFs.list("*").then(map -> {
            return Promises.all(map.entrySet().stream().filter(entry -> {
                return ((FileMetadata) entry.getValue()).getTimestamp() > epochMilli;
            }).map(entry2 -> {
                return ChannelSupplier.ofPromise(this.consolidationFolderFs.download((String) entry2.getKey())).toCollector(ByteBufs.collector()).whenResult(byteBuf -> {
                    hashSet.addAll(Arrays.asList(byteBuf.asString(StandardCharsets.UTF_8).split("\n")));
                }).toVoid();
            }));
        }).then(() -> {
            return this.fs.list("*");
        }).then(map2 -> {
            String apply = this.namingStrategy.apply("bin");
            List list = (List) map2.keySet().stream().filter(str -> {
                return !hashSet.contains(str);
            }).collect(Collectors.toList());
            String join = String.join("\n", list);
            logger.info("started consolidating into {} from {}", apply, list);
            String apply2 = this.namingStrategy.apply("dump");
            return this.consolidationFolderFs.upload(apply2).then(channelConsumer -> {
                return ChannelSupplier.of(ByteBuf.wrapForReading(join.getBytes(StandardCharsets.UTF_8))).streamTo(channelConsumer);
            }).then(() -> {
                return download();
            }).then(streamSupplier -> {
                return ((ChannelSupplier) streamSupplier.transformWith(ChannelSerializer.create(this.serializer))).streamTo(ChannelConsumer.ofPromise(this.fs.upload(apply)));
            }).then(() -> {
                return this.tombstoneFolderFs.list("*").map(map2 -> {
                    return this.tombstoneFolderFs.deleteAll(map2.keySet());
                });
            }).then(() -> {
                return this.consolidationFolderFs.delete(apply2);
            }).then(() -> {
                return this.fs.deleteAll(new HashSet(list));
            });
        }).thenEx(Utils.wrapException(() -> {
            return "Consolidation failed";
        })).whenComplete(this.consolidationStats.recordStats());
    }

    @JmxOperation
    public void startDetailedMonitoring() {
        this.detailedStats = true;
    }

    @JmxOperation
    public void stopDetailedMonitoring() {
        this.detailedStats = false;
    }

    @JmxAttribute
    public StreamStatsBasic getUploadStats() {
        return this.uploadStats;
    }

    @JmxAttribute
    public StreamStatsDetailed getUploadStatsDetailed() {
        return this.uploadStatsDetailed;
    }

    @JmxAttribute
    public StreamStatsBasic getDownloadStats() {
        return this.downloadStats;
    }

    @JmxAttribute
    public StreamStatsDetailed getDownloadStatsDetailed() {
        return this.downloadStatsDetailed;
    }

    @JmxAttribute
    public StreamStatsBasic getRemoveStats() {
        return this.removeStats;
    }

    @JmxAttribute
    public StreamStatsDetailed getRemoveStatsDetailed() {
        return this.removeStatsDetailed;
    }

    @JmxAttribute
    public PromiseStats getConsolidationStats() {
        return this.consolidationStats;
    }
}
