/*
 * Decompiled with CFR 0.152.
 */
package io.datalbry.connector.sdk.consumer;

import io.datalbry.alxndria.client.api.IndexClient;
import io.datalbry.connector.api.CrawlProcessor;
import io.datalbry.connector.api.DocumentEdge;
import io.datalbry.connector.api.Edge;
import io.datalbry.connector.api.Node;
import io.datalbry.connector.sdk.ConnectorProperties;
import io.datalbry.connector.sdk.consumer.AdditionMessageConsumerKt;
import io.datalbry.connector.sdk.extension.DocumentExtensionsKt;
import io.datalbry.connector.sdk.messaging.Channel;
import io.datalbry.connector.sdk.state.ConnectorDocumentState;
import io.datalbry.connector.sdk.state.Lock;
import io.datalbry.connector.sdk.state.NodeReference;
import io.datalbry.precise.api.schema.document.Document;
import kotlin.Metadata;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;

@Metadata(mv={1, 4, 2}, bv={1, 0, 3}, k=1, d1={"\u0000V\n\u0002\u0018\u0002\n\u0002\u0010\u0000\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u000e\n\u0000\n\u0002\u0010\u0002\n\u0002\b\u0002\n\u0002\u0010\u000b\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0002\u0018\u0000 \u001c2\u00020\u0001:\u0001\u001cBM\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0012\u0010\u0006\u001a\u000e\u0012\u0004\u0012\u00020\b\u0012\u0004\u0012\u00020\t0\u0007\u0012\f\u0010\n\u001a\b\u0012\u0004\u0012\u00020\f0\u000b\u0012\f\u0010\r\u001a\b\u0012\u0004\u0012\u00020\b0\u000b\u0012\u0006\u0010\u000e\u001a\u00020\u000f\u00a2\u0006\u0002\u0010\u0010J\u0010\u0010\u0013\u001a\u00020\u00142\u0006\u0010\u0015\u001a\u00020\bH\u0007J \u0010\u0016\u001a\u00020\u00172\u0006\u0010\u0018\u001a\u00020\f2\u0006\u0010\u0019\u001a\u00020\t2\u0006\u0010\u001a\u001a\u00020\u001bH\u0002R\u0014\u0010\r\u001a\b\u0012\u0004\u0012\u00020\b0\u000bX\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0011\u001a\u00020\u0012X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u0014\u0010\n\u001a\b\u0012\u0004\u0012\u00020\f0\u000bX\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u001a\u0010\u0006\u001a\u000e\u0012\u0004\u0012\u00020\b\u0012\u0004\u0012\u00020\t0\u0007X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u000e\u001a\u00020\u000fX\u0082\u0004\u00a2\u0006\u0002\n\u0000\u00a8\u0006\u001d"}, d2={"Lio/datalbry/connector/sdk/consumer/AdditionMessageConsumer;", "", "props", "Lio/datalbry/connector/sdk/ConnectorProperties;", "index", "Lio/datalbry/alxndria/client/api/IndexClient;", "processor", "Lio/datalbry/connector/api/CrawlProcessor;", "Lio/datalbry/connector/api/DocumentEdge;", 
"Lio/datalbry/precise/api/schema/document/Document;", "deletionChannel", "Lio/datalbry/connector/sdk/messaging/Channel;", "Lio/datalbry/connector/sdk/state/NodeReference;", "addChannel", "state", "Lio/datalbry/connector/sdk/state/ConnectorDocumentState;", "(Lio/datalbry/connector/sdk/ConnectorProperties;Lio/datalbry/alxndria/client/api/IndexClient;Lio/datalbry/connector/api/CrawlProcessor;Lio/datalbry/connector/sdk/messaging/Channel;Lio/datalbry/connector/sdk/messaging/Channel;Lio/datalbry/connector/sdk/state/ConnectorDocumentState;)V", "datasourceKey", "", "consume", "", "edge", "hasChanged", "", "node", "doc", "lock", "Lio/datalbry/connector/sdk/state/Lock;", "Companion", "sdk"})
public final class AdditionMessageConsumer {
    /*
     * JMS consumer for "document addition" messages.
     *
     * For every received DocumentEdge the consumer runs the configured CrawlProcessor and then,
     * while holding the state lock of the corresponding node:
     *   1. records every produced document in the state and pushes changed ones to the index,
     *   2. deletes documents the state reports as unseen,
     *   3. records every discovered child edge and propagates it to the addition channel,
     *   4. propagates child nodes the state reports as unseen to the deletion channel.
     */

    /** Name of the document field whose value is compared against the stored checksum. */
    @NotNull
    public static final String CHECKSUM_FIELD = "_checksum";
    /** Numeric counterpart of the default embedded in {@link #CONCURRENCY}. */
    public static final int CONCURRENCY_DEFAULT = 1;
    /** Property placeholder for the listener concurrency (falls back to {@code 1}). */
    @NotNull
    public static final String CONCURRENCY = "${io.datalbry.connector.concurrency:1}";
    /** Property placeholder for the JMS destination this consumer listens on. */
    @NotNull
    public static final String DESTINATION = "${io.datalbry.connector.alxndria.datasource}-document_addition";
    /** Kotlin companion instance; kept so existing callers of {@code Companion} keep working. */
    @NotNull
    public static final Companion Companion = new Companion(null);
    private static final Logger log = LoggerFactory.getLogger(AdditionMessageConsumer.class);

    private final String datasourceKey;
    private final IndexClient index;
    private final CrawlProcessor<DocumentEdge, Document> processor;
    private final Channel<NodeReference> deletionChannel;
    private final Channel<DocumentEdge> addChannel;
    private final ConnectorDocumentState state;

    /**
     * Creates the consumer.
     *
     * @param props           connector properties; only the Alxndria datasource key is read from it
     * @param index           client used to put and delete documents
     * @param processor       processor that turns an edge into a node (documents plus child edges)
     * @param deletionChannel channel that receives child nodes which were not seen anymore
     * @param addChannel      channel that receives newly discovered child edges
     * @param state           connector state used for locking, checksums and unseen tracking
     */
    public AdditionMessageConsumer(@NotNull ConnectorProperties props, @NotNull IndexClient index, @NotNull CrawlProcessor<DocumentEdge, Document> processor, @NotNull Channel<NodeReference> deletionChannel, @NotNull Channel<DocumentEdge> addChannel, @NotNull ConnectorDocumentState state) {
        Intrinsics.checkNotNullParameter(props, "props");
        Intrinsics.checkNotNullParameter(index, "index");
        Intrinsics.checkNotNullParameter(processor, "processor");
        Intrinsics.checkNotNullParameter(deletionChannel, "deletionChannel");
        Intrinsics.checkNotNullParameter(addChannel, "addChannel");
        Intrinsics.checkNotNullParameter(state, "state");
        this.index = index;
        this.processor = processor;
        this.deletionChannel = deletionChannel;
        this.addChannel = addChannel;
        this.state = state;
        this.datasourceKey = props.getAlxndria().getDatasource();
    }

    /**
     * Processes a single edge received from the addition destination.
     *
     * <p>Any {@link Throwable} raised while processing is logged (message at WARN, stack trace at
     * TRACE). Only {@link Error}s are rethrown; everything else is swallowed after logging. The
     * node lock is always released, whether processing succeeded or not.
     *
     * @param edge the edge to process; must not be {@code null}
     */
    @JmsListener(destination=DESTINATION, concurrency=CONCURRENCY)
    public final void consume(@NotNull DocumentEdge edge) {
        Intrinsics.checkNotNullParameter(edge, "edge");
        NodeReference node = new NodeReference(edge.getUuid());
        Lock lock = this.state.lock(node);
        try {
            log.trace("Start processing Edge[{}]", edge.getUuid());
            // The type parameters of Node are not visible from this file, so the result is kept
            // raw and its elements are cast explicitly below.
            Node item = this.processor.process(edge);

            // 1. Track every produced document; only changed ones are written to the index.
            for (Object element : item.getObjects()) {
                Document doc = (Document)element;
                log.trace("Processing Document[{}][{}]", doc.getType(), doc.getId());
                // The change check must run before state.put, which stores the new document state.
                boolean changed = this.hasChanged(node, doc, lock);
                // NOTE(review): the state is updated before the index call; if putDocument fails
                // the stored state may already reflect the new checksum - confirm whether
                // state.release makes this safe.
                this.state.put(node, DocumentExtensionsKt.toDocumentState(doc), lock);
                if (changed) {
                    this.index.putDocument(this.datasourceKey, AdditionMessageConsumerKt.access$removeChecksum(doc));
                }
                log.trace("Completed processing Document[{}][{}]", doc.getType(), doc.getId());
            }

            // 2. Remove documents the state reports as unseen for this node.
            for (String documentId : this.state.getUnseenDocuments(node, lock)) {
                log.trace("Deleting Document[{}]", documentId);
                this.index.deleteDocument(this.datasourceKey, documentId);
                this.state.remove(node, documentId, lock);
                log.trace("Completed deleting Document[{}]", documentId);
            }

            // 3. Register each discovered child edge and queue it for processing.
            for (Object element : item.getEdges()) {
                DocumentEdge child = (DocumentEdge)element;
                log.trace("Discovered Edge[{}]", child.getUuid());
                this.state.put(node, new NodeReference(child.getUuid()), lock);
                this.addChannel.propagate(child);
            }

            // 4. Queue child nodes the state reports as unseen for deletion.
            for (NodeReference unseen : this.state.getUnseenNodes(node, lock)) {
                log.trace("Unseen Node[{}]", unseen.getUuid());
                this.deletionChannel.propagate(unseen);
                this.state.remove(node, unseen, lock);
            }
        }
        catch (Throwable e) {
            log.warn("Failed to process Node[{}] due to an Exception[\"{}\"]. Check trace for further information.", node.getUuid(), e.getMessage());
            log.trace("", e);
            // Errors are never swallowed; every other Throwable ends here after being logged.
            if (e instanceof Error) {
                throw (Error)e;
            }
        }
        finally {
            this.state.release(node, lock);
            log.trace("Completed Edge[{}]", edge.getUuid());
        }
    }

    /**
     * Tells whether a document differs from what the state has stored for it.
     *
     * @param node the node the document belongs to
     * @param doc  the freshly processed document; its {@link #CHECKSUM_FIELD} value is compared
     * @param lock the lock currently held for {@code node}
     * @return {@code true} if the stored checksum is empty or differs from the document's checksum
     */
    private final boolean hasChanged(NodeReference node, Document doc, Lock lock) {
        String storedChecksum = this.state.getChecksum(node, doc.getId(), lock);
        if (storedChecksum.length() == 0) {
            return true;
        }
        return !storedChecksum.equals(doc.get(CHECKSUM_FIELD).getValue());
    }

    @Metadata(mv={1, 4, 2}, bv={1, 0, 3}, k=1, d1={"\u0000$\n\u0002\u0018\u0002\n\u0002\u0010\u0000\n\u0002\b\u0002\n\u0002\u0010\u000e\n\u0002\b\u0002\n\u0002\u0010\b\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\b\u0086\u0003\u0018\u00002\u00020\u0001B\u0007\b\u0002\u00a2\u0006\u0002\u0010\u0002R\u000e\u0010\u0003\u001a\u00020\u0004X\u0086T\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0005\u001a\u00020\u0004X\u0086T\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0006\u001a\u00020\u0007X\u0086T\u00a2\u0006\u0002\n\u0000R\u000e\u0010\b\u001a\u00020\u0004X\u0086T\u00a2\u0006\u0002\n\u0000R\u0016\u0010\t\u001a\n \u000b*\u0004\u0018\u00010\n0\nX\u0082\u0004\u00a2\u0006\u0002\n\u0000\u00a8\u0006\f"}, d2={"Lio/datalbry/connector/sdk/consumer/AdditionMessageConsumer$Companion;", "", "()V", "CHECKSUM_FIELD", "", "CONCURRENCY", "CONCURRENCY_DEFAULT", "", "DESTINATION", "log", "Lorg/slf4j/Logger;", "kotlin.jvm.PlatformType", "sdk"})
    public static final class Companion {
        // Stateless holder generated for the Kotlin companion object; not instantiable by callers.
        private Companion() {
        }

        // Synthetic constructor emitted by the Kotlin compiler; the marker argument is unused.
        public /* synthetic */ Companion(DefaultConstructorMarker $constructor_marker) {
            this();
        }
    }
}

