package org.janusgraph.graphdb.log;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.janusgraph.core.JanusGraphException;
import org.janusgraph.core.JanusGraphTransaction;
import org.janusgraph.core.log.Change;
import org.janusgraph.core.log.ChangeProcessor;
import org.janusgraph.core.log.LogProcessorBuilder;
import org.janusgraph.core.log.LogProcessorFramework;
import org.janusgraph.core.schema.JanusGraphSchemaElement;
import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.ReadBuffer;
import org.janusgraph.diskstorage.log.Log;
import org.janusgraph.diskstorage.log.Message;
import org.janusgraph.diskstorage.log.MessageReader;
import org.janusgraph.diskstorage.log.ReadMarker;
import org.janusgraph.diskstorage.util.time.TimestampProvider;
import org.janusgraph.graphdb.database.StandardJanusGraph;
import org.janusgraph.graphdb.database.log.LogTxMeta;
import org.janusgraph.graphdb.database.log.TransactionLogHeader;
import org.janusgraph.graphdb.database.serialize.Serializer;
import org.janusgraph.graphdb.internal.ElementLifeCycle;
import org.janusgraph.graphdb.internal.InternalRelation;
import org.janusgraph.graphdb.transaction.StandardJanusGraphTx;
import org.janusgraph.graphdb.types.system.BaseKey;
import org.janusgraph.graphdb.vertices.StandardVertex;
import org.janusgraph.util.datastructures.ExceptionWrapper;
import org.janusgraph.util.system.ExecuteUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/janusgraph/graphdb/log/StandardLogProcessorFramework.class */
public class StandardLogProcessorFramework implements LogProcessorFramework {
    /** Logger used to report failures while preparing or running change processors. */
    private static final Logger logger = LoggerFactory.getLogger(StandardLogProcessorFramework.class);
    /** Graph whose backend supplies the user logs and on which processor transactions are opened. */
    private final StandardJanusGraph graph;
    /** Data serializer obtained from the graph; used when parsing log messages. */
    private final Serializer serializer;
    /** Timestamp provider taken from the graph configuration; passed to the transaction log header parser. */
    private final TimestampProvider times;
    /** Registered user logs keyed by user log name; read and written only while holding this instance's monitor. */
    private final Map<String, Log> processorLogs;
    /** Starts out {@code true}; set to {@code false} by {@code shutdown()} and checked by {@code checkOpen()}. */
    private boolean isOpen = true;

    /**
     * Collects the configuration for the change processors of a single user log and, on
     * {@link #build()}, registers them as readers of that log.
     *
     * <p>Instances are created through
     * {@link StandardLogProcessorFramework#addLogProcessor(String)}.
     */
    public class Builder implements LogProcessorBuilder {
        /** Name of the user log the processors will read from. */
        private final String userLogName;
        /** Processors added so far, in insertion order. */
        private final List<ChangeProcessor> processors;
        /** Optional read-marker identifier; {@code null} means no identifier was configured. */
        private String readMarkerName;
        /** Optional start time; {@code null} means "start reading from now". */
        private Instant startTime;
        /** Number of attempts made per message; defaults to 1. */
        private int retryAttempts;

        /**
         * @param userLogName name of the user log; must not be blank
         * @throws IllegalArgumentException if {@code userLogName} is blank
         */
        private Builder(String userLogName) {
            Preconditions.checkArgument(StringUtils.isNotBlank(userLogName));
            this.userLogName = userLogName;
            this.processors = new ArrayList<>();
            this.readMarkerName = null;
            this.startTime = null;
            this.retryAttempts = 1;
        }

        /** Returns the name of the user log this builder was created for. */
        @Override
        public String getLogIdentifier() {
            return userLogName;
        }

        /**
         * Sets the identifier used to build the read marker in {@link #build()}.
         *
         * @param name read-marker identifier; must not be blank
         * @return this builder
         * @throws IllegalArgumentException if {@code name} is blank
         */
        @Override
        public LogProcessorBuilder setProcessorIdentifier(String name) {
            Preconditions.checkArgument(StringUtils.isNotBlank(name));
            this.readMarkerName = name;
            return this;
        }

        /**
         * Sets the time from which the log should be read.
         *
         * @param startTime start time; {@code null} is treated like {@link #setStartTimeNow()}
         * @return this builder
         */
        @Override
        public LogProcessorBuilder setStartTime(Instant startTime) {
            this.startTime = startTime;
            return this;
        }

        /**
         * Clears any configured start time so that {@link #build()} uses a "now" read marker.
         *
         * @return this builder
         */
        @Override
        public LogProcessorBuilder setStartTimeNow() {
            this.startTime = null;
            return this;
        }

        /**
         * Adds a processor to be registered on the log.
         *
         * @param processor processor to add; must not be {@code null}
         * @return this builder
         * @throws IllegalArgumentException if {@code processor} is {@code null}
         */
        @Override
        public LogProcessorBuilder addProcessor(ChangeProcessor processor) {
            Preconditions.checkArgument(processor != null);
            this.processors.add(processor);
            return this;
        }

        /**
         * Sets how many times each message is attempted before it is given up on.
         *
         * @param attempts number of attempts; must be positive
         * @return this builder
         * @throws IllegalArgumentException if {@code attempts} is not positive
         */
        @Override
        public LogProcessorBuilder setRetryAttempts(int attempts) {
            Preconditions.checkArgument(attempts > 0, "Invalid number: %s", attempts);
            this.retryAttempts = attempts;
            return this;
        }

        /**
         * Opens the user log, registers one reader per added processor and records the log in the
         * enclosing framework.
         *
         * @throws IllegalArgumentException if no processor was added, or processors were already
         *         registered for this user log
         * @throws JanusGraphException if the backend fails to open the log or register the readers
         */
        @Override
        public void build() {
            Preconditions.checkArgument(!processors.isEmpty(), "Must add at least one processor");

            // Pick the read marker from whichever of (identifier, start time) was configured.
            final ReadMarker readMarker;
            if (startTime == null && readMarkerName == null) {
                readMarker = ReadMarker.fromNow();
            } else if (readMarkerName == null) {
                readMarker = ReadMarker.fromTime(startTime);
            } else if (startTime == null) {
                readMarker = ReadMarker.fromIdentifierOrNow(readMarkerName);
            } else {
                readMarker = ReadMarker.fromIdentifierOrTime(readMarkerName, startTime);
            }

            // The framework's monitor guards processorLogs, so the duplicate check and the put
            // happen atomically with respect to removeLogProcessor() and shutdown().
            synchronized (StandardLogProcessorFramework.this) {
                Preconditions.checkArgument(!processorLogs.containsKey(userLogName), "Processors have already been registered for user log: %s", userLogName);
                try {
                    Log userLog = graph.getBackend().getUserLog(userLogName);
                    userLog.registerReaders(readMarker, Iterables.transform(processors, new Function<ChangeProcessor, MessageReader>() {
                        @Override
                        @Nullable
                        public MessageReader apply(@Nullable ChangeProcessor changeProcessor) {
                            return new MsgReaderConverter(userLogName, changeProcessor, retryAttempts);
                        }
                    }));
                    processorLogs.put(userLogName, userLog);
                } catch (BackendException e) {
                    throw new JanusGraphException("Could not open user transaction log for name: " + userLogName, e);
                }
            }
        }
    }

    /**
     * Adapts a {@link ChangeProcessor} to the {@link MessageReader} interface: each log message is
     * parsed into a transaction id and a change state, and handed to the processor inside a
     * fresh graph transaction.
     */
    public class MsgReaderConverter implements MessageReader {
        /** Name of the user log; used only in log output. */
        private final String userlogName;
        /** Processor invoked for every message. */
        private final ChangeProcessor processor;
        /** Maximum number of attempts per message. */
        private final int retryAttempts;

        private MsgReaderConverter(String userlogName, ChangeProcessor processor, int retryAttempts) {
            this.userlogName = userlogName;
            this.processor = processor;
            this.retryAttempts = retryAttempts;
        }

        /**
         * Deserializes every modification of the log entry and records it in {@code changes}.
         *
         * @param entry   parsed transaction log entry
         * @param tx      transaction against which relations are parsed
         * @param changes change state that receives the vertices and relations
         */
        private void readRelations(TransactionLogHeader.Entry entry, StandardJanusGraphTx tx, StandardChangeState changes) {
            for (TransactionLogHeader.Modification modification : entry.getContentAsModifications(serializer)) {
                InternalRelation relation = ModificationDeserializer.parseRelation(modification, tx);
                Change state = modification.state;
                // A VertexExists relation on a non-schema vertex is reported as a vertex change
                // rather than as a relation change.
                if (relation.getType().equals(BaseKey.VertexExists) && !(relation.getVertex(0) instanceof JanusGraphSchemaElement)) {
                    if (state == Change.REMOVED) {
                        // Flag the vertex itself as removed.
                        ((StandardVertex) relation.getVertex(0)).updateLifeCycle(ElementLifeCycle.Event.REMOVED);
                    }
                    changes.addVertex(relation.getVertex(0), state);
                } else if (!relation.isInvisible()) {
                    changes.addRelation(relation, state);
                }
            }
        }

        /**
         * Processes one log message, making up to {@code retryAttempts} attempts.
         *
         * <p>Each attempt opens a new transaction. If parsing the message or running the processor
         * throws, the transaction is rolled back, the failure is logged and the next attempt
         * starts. On success the transaction is committed and the method returns. A rolled-back
         * transaction is never committed. If every attempt fails the method returns normally
         * after logging. An exception thrown by the commit itself is not retried and propagates
         * to the caller.
         *
         * @param message the log message to process
         */
        @Override
        public void read(Message message) {
            for (int attempt = 1; attempt <= retryAttempts; attempt++) {
                StandardJanusGraphTx tx = (StandardJanusGraphTx) graph.newTransaction();
                StandardChangeState changes = new StandardChangeState();
                StandardTransactionId transactionId = null;

                // Stage 1: decode the message into a transaction id and a change state.
                try {
                    ReadBuffer content = message.getContent().asReadBuffer();
                    String senderId = message.getSenderId();
                    TransactionLogHeader.Entry entry = TransactionLogHeader.parse(content, serializer, times);
                    if (entry.getMetadata().containsKey(LogTxMeta.SOURCE_TRANSACTION)) {
                        transactionId = (StandardTransactionId) entry.getMetadata().get(LogTxMeta.SOURCE_TRANSACTION);
                    } else {
                        transactionId = new StandardTransactionId(senderId, entry.getHeader().getId(), entry.getHeader().getTimestamp());
                    }
                    readRelations(entry, tx, changes);
                } catch (Throwable e) {
                    tx.rollback();
                    logger.error("Encountered exception [{}] when preparing processor [{}] for user log [{}] on attempt {} of {}", e.getMessage(), processor, userlogName, attempt, retryAttempts);
                    logger.error("Full exception: ", e);
                    continue;
                }
                assert transactionId != null;

                // Stage 2: run the processor; on failure roll back and retry without committing.
                try {
                    processor.process(tx, transactionId, changes);
                } catch (Throwable e) {
                    tx.rollback();
                    logger.error("Encountered exception [{}] when running processor [{}] for user log [{}] on attempt {} of {}", e.getMessage(), processor, userlogName, attempt, retryAttempts);
                    logger.error("Full exception: ", e);
                    continue;
                }

                // Stage 3: only a successfully processed transaction is committed.
                tx.commit();
                return;
            }
        }

        /** Intentionally a no-op: this reader keeps no state to update. */
        @Override
        public void updateState() {
        }
    }

    /**
     * Creates a log processor framework bound to the given graph. The serializer and timestamp
     * provider are taken from the graph, and the set of registered user logs starts out empty.
     *
     * @param standardJanusGraph the graph to read user logs from; must be non-null and open
     * @throws IllegalArgumentException if the graph is {@code null} or not open
     */
    public StandardLogProcessorFramework(StandardJanusGraph standardJanusGraph) {
        Preconditions.checkArgument(standardJanusGraph != null && standardJanusGraph.isOpen(), "Graph must be non-null and open");
        this.graph = standardJanusGraph;
        this.serializer = standardJanusGraph.getDataSerializer();
        this.times = standardJanusGraph.getConfiguration().getTimestampProvider();
        this.processorLogs = new HashMap<>();
    }

    /**
     * Verifies that this framework has not been shut down yet.
     *
     * @throws IllegalStateException if the framework is no longer open
     */
    private void checkOpen() {
        if (!isOpen) {
            throw new IllegalStateException("Transaction log framework has already been closed");
        }
    }

    /**
     * Closes and unregisters the user log registered under the given name.
     *
     * <p>If closing fails the log stays registered.
     *
     * @param str name of the user log whose processors should be removed
     * @return {@code true} if a log was registered under that name and has been closed,
     *         {@code false} if nothing was registered
     * @throws IllegalStateException if this framework has already been shut down
     * @throws JanusGraphException if closing the log fails
     */
    @Override
    public synchronized boolean removeLogProcessor(String str) {
        checkOpen();
        if (processorLogs.containsKey(str)) {
            final Log registered = processorLogs.get(str);
            try {
                registered.close();
            } catch (BackendException cause) {
                throw new JanusGraphException("Could not close transaction log: " + str, cause);
            }
            // Only forget the log once it has been closed successfully.
            processorLogs.remove(str);
            return true;
        }
        return false;
    }

    /**
     * Closes every registered user log and marks this framework as closed. Calling it again
     * after the first call is a no-op.
     *
     * <p>The registry is cleared only if no exception was reported for any of the logs.
     *
     * @throws JanusGraphException wrapping the backend failure reported while closing the logs
     */
    @Override
    public synchronized void shutdown() throws JanusGraphException {
        if (!isOpen) {
            return;
        }
        isOpen = false;
        try {
            final ExceptionWrapper failures = new ExceptionWrapper();
            for (Log registered : processorLogs.values()) {
                // NOTE(review): executeWithCatching presumably stores a thrown exception in the
                // wrapper so the remaining logs are still closed - confirm against ExecuteUtil.
                ExecuteUtil.executeWithCatching(registered::close, failures);
            }
            ExecuteUtil.throwIfException(failures);
            processorLogs.clear();
        } catch (BackendException cause) {
            throw new JanusGraphException(cause);
        }
    }

    /**
     * Creates a builder for registering change processors on the user log with the given name.
     * Nothing is opened or registered until {@code build()} is called on the returned builder.
     *
     * @param str name of the user log; the builder's constructor rejects blank names
     * @return a new builder for that user log
     * @throws IllegalArgumentException if {@code str} is blank
     */
    @Override // org.janusgraph.core.log.LogProcessorFramework
    public LogProcessorBuilder addLogProcessor(String str) {
        return new Builder(str);
    }
}
