package io.activej.ot.uplink;

import io.activej.async.function.AsyncPredicate;
import io.activej.async.util.LogUtils;
import io.activej.common.Utils;
import io.activej.common.function.FunctionEx;
import io.activej.common.ref.Ref;
import io.activej.ot.OTAlgorithms;
import io.activej.ot.OTCommit;
import io.activej.ot.OTCommitFactory;
import io.activej.ot.PollSanitizer;
import io.activej.ot.reducers.DiffsReducer;
import io.activej.ot.repository.OTRepository;
import io.activej.ot.system.OTSystem;
import io.activej.ot.uplink.OTUplink;
import io.activej.promise.Promise;
import io.activej.promise.Promises;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/ot/uplink/OTUplinkImpl.class */
public final class OTUplinkImpl<K, D, PC> implements OTUplink<K, D, PC> {
    private static final Logger logger = LoggerFactory.getLogger(OTUplinkImpl.class);
    private final OTSystem<D> otSystem;
    private final OTRepository<K, D> repository;
    private final FunctionEx<OTCommit<K, D>, PC> protoCommitEncoder;
    private final FunctionEx<PC, OTCommit<K, D>> protoCommitDecoder;

    private OTUplinkImpl(OTRepository<K, D> oTRepository, OTSystem<D> oTSystem, FunctionEx<OTCommit<K, D>, PC> functionEx, FunctionEx<PC, OTCommit<K, D>> functionEx2) {
        this.otSystem = oTSystem;
        this.repository = oTRepository;
        this.protoCommitEncoder = functionEx;
        this.protoCommitDecoder = functionEx2;
    }

    public static <K, D, C> OTUplinkImpl<K, D, C> create(OTRepository<K, D> oTRepository, OTSystem<D> oTSystem, FunctionEx<OTCommit<K, D>, C> functionEx, FunctionEx<C, OTCommit<K, D>> functionEx2) {
        return new OTUplinkImpl<>(oTRepository, oTSystem, functionEx, functionEx2);
    }

    public static <K, D> OTUplinkImpl<K, D, OTCommit<K, D>> create(OTRepository<K, D> oTRepository, OTSystem<D> oTSystem) {
        return new OTUplinkImpl<>(oTRepository, oTSystem, oTCommit -> {
            return oTCommit;
        }, oTCommit2 -> {
            return oTCommit2;
        });
    }

    public OTRepository<K, D> getRepository() {
        return this.repository;
    }

    @Override // io.activej.ot.uplink.OTUplink
    public Promise<PC> createProtoCommit(K k, List<D> list, long j) {
        return this.repository.createCommit(k, new OTCommitFactory.DiffsWithLevel<>(j, list)).map(this.protoCommitEncoder).whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), new Object[]{k, list, Long.valueOf(j)}));
    }

    @Override // io.activej.ot.uplink.OTUplink
    public Promise<OTUplink.FetchData<K, D>> push(PC pc) {
        try {
            OTCommit<K, D> oTCommit = (OTCommit) this.protoCommitDecoder.apply(pc);
            Promise<Void> push = this.repository.push(oTCommit);
            OTRepository<K, D> oTRepository = this.repository;
            Objects.requireNonNull(oTRepository);
            return push.then(oTRepository::getHeads).then(set -> {
                return OTAlgorithms.excludeParents(this.repository, this.otSystem, Utils.union(set, Collections.singleton(oTCommit.getId()))).then(set -> {
                    return OTAlgorithms.mergeAndPush(this.repository, this.otSystem, set);
                }).then(obj -> {
                    Set<K> singleton = Collections.singleton(obj);
                    return this.repository.updateHeads(singleton, Utils.difference(set, singleton)).then(() -> {
                        return doFetch(singleton, oTCommit.getId());
                    });
                });
            }).whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), new Object[]{pc}));
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e2) {
            return Promise.ofException(e2);
        }
    }

    @Override // io.activej.ot.uplink.OTUplink
    public Promise<OTUplink.FetchData<K, D>> checkout() {
        Ref ref = new Ref();
        return this.repository.getHeads().then(set -> {
            return OTAlgorithms.findParent(this.repository, this.otSystem, set, DiffsReducer.toList(), oTCommit -> {
                return this.repository.loadSnapshot(oTCommit.getId()).map(optional -> {
                    Object orElse = optional.orElse(null);
                    ref.value = orElse;
                    return Boolean.valueOf(orElse != null);
                });
            });
        }).then(findResult -> {
            return Promise.of(new OTUplink.FetchData(findResult.getChild(), findResult.getChildLevel().longValue(), Utils.concat((List) ref.value, (List) findResult.getAccumulatedDiffs())));
        }).then(fetchData -> {
            return fetch(fetchData.getCommitId()).map(fetchData -> {
                return new OTUplink.FetchData(fetchData.getCommitId(), fetchData.getLevel(), this.otSystem.squash(Utils.concat(fetchData.getDiffs(), fetchData.getDiffs())));
            });
        }).whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), new Object[0]));
    }

    @Override // io.activej.ot.uplink.OTUplink
    public Promise<OTUplink.FetchData<K, D>> fetch(K k) {
        return this.repository.getHeads().then(set -> {
            return doFetch(set, k);
        }).whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), new Object[]{k}));
    }

    @Override // io.activej.ot.uplink.OTUplink
    public Promise<OTUplink.FetchData<K, D>> poll(K k) {
        return Promises.retry(Promises.isResultOrError(set -> {
            return !set.contains(k);
        }), PollSanitizer.create(this.repository.pollHeads())).then(set2 -> {
            return doFetch(set2, k);
        });
    }

    private Promise<OTUplink.FetchData<K, D>> doFetch(Set<K> set, K k) {
        return OTAlgorithms.findParent(this.repository, this.otSystem, set, DiffsReducer.toSquashedList(this.otSystem), AsyncPredicate.of(oTCommit -> {
            return oTCommit.getId().equals(k);
        })).map(findResult -> {
            return new OTUplink.FetchData(findResult.getChild(), findResult.getChildLevel().longValue(), this.otSystem.squash((List) findResult.getAccumulatedDiffs()));
        }).whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), new Object[]{k}));
    }
}
