package com.hazelcast.jet.impl.connector;

import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.RestartableException;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.BroadcastKey;
import com.hazelcast.jet.core.EventTimeMapper;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.jet.pipeline.JmsSourceBuilder;
import com.hazelcast.security.permission.ActionConstants;
import com.hazelcast.security.permission.ConnectorPermission;
import java.security.Permission;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;

/* loaded from: input_file:BOOT-INF/lib/hazelcast-5.1.3.jar:com/hazelcast/jet/impl/connector/StreamJmsP.class */
public class StreamJmsP<T> extends AbstractProcessor {
    // Single broadcast key under which both ID sets (seen and restored) are written to the snapshot
    // and which restoreFromSnapshot() expects back.
    private static final BroadcastKey<String> SEEN_IDS_KEY = BroadcastKey.broadcastKey("seen");
    // One minute, in nanoseconds: how long restored IDs are retained, counted from the first
    // message received in exactly-once mode.
    private static final long RESTORED_IDS_TTL = TimeUnit.MINUTES.toNanos(1);
    // Connection handed in by the Supplier; this processor only creates a session on it.
    private final Connection connection;
    // Builds the MessageConsumer from the session created in init().
    private final FunctionEx<? super Session, ? extends MessageConsumer> consumerFn;
    // Maps a received message to the emitted item; a null result means "emit nothing for this message".
    private final FunctionEx<? super Message, ? extends T> projectionFn;
    // Extracts the ID used for deduplication in exactly-once mode; must not return null there.
    private final FunctionEx<? super Message, ?> messageIdFn;
    // Wraps emitted items with event-time handling; configured with a single partition (index 0).
    private final EventTimeMapper<? super T> eventTimeMapper;
    // Effective processing guarantee of this source.
    private final ProcessingGuarantee guarantee;
    // IDs of messages received since the last successful session commit. An immutable empty set
    // unless the guarantee is EXACTLY_ONCE.
    private final Set<Object> seenIds;
    // IDs loaded from the snapshot; a received message whose ID is in here is a redelivery.
    // Replaced by an immutable empty set once RESTORED_IDS_TTL has elapsed.
    private Set<Object> restoredIds;
    // Created in init(), closed in close().
    private Session session;
    private MessageConsumer consumer;
    // True between snapshotCommitPrepare() and snapshotCommitFinish() when a guarantee is in effect;
    // complete() receives nothing while it is set.
    private boolean snapshotInProgress;
    // Lazily created in snapshotCommitPrepare(); reset to null by its own onFirstNull callback.
    private Traverser<Map.Entry<BroadcastKey<String>, Set<Object>>> snapshotTraverser;
    // System.nanoTime()-based deadline for discarding restoredIds; Long.MAX_VALUE means "not armed yet".
    private long restoredIdsExpiration = Long.MAX_VALUE;
    // Output produced for the most recent message (or idle tick) that has not been fully emitted yet.
    private Traverser<Object> pendingTraverser = Traversers.empty();

    /* loaded from: input_file:BOOT-INF/lib/hazelcast-5.1.3.jar:com/hazelcast/jet/impl/connector/StreamJmsP$Supplier.class */
    /**
     * Supplier of {@link StreamJmsP} processors.
     *
     * <p>Opens and starts one JMS {@link Connection} in {@link #init}, shares it with every
     * processor returned from {@link #get}, and closes it in {@link #close}.
     *
     * @param <T> type of the items emitted by the created processors
     */
    public static final class Supplier<T> implements ProcessorSupplier {
        static final long serialVersionUID = 1L;
        private final String destination;
        private final SupplierEx<? extends Connection> newConnectionFn;
        private final FunctionEx<? super Session, ? extends MessageConsumer> consumerFn;
        private final FunctionEx<? super Message, ?> messageIdFn;
        private final FunctionEx<? super Message, ? extends T> projectionFn;
        private final EventTimePolicy<? super T> eventTimePolicy;
        // Starts as the guarantee requested for the source; narrowed in init() against the job's guarantee.
        private ProcessingGuarantee sourceGuarantee;
        // Not serialized: created in init() on the member where the supplier runs.
        private transient Connection connection;

        /**
         * Stores the configuration after verifying that all functions are serializable.
         *
         * @param str                 destination name, used for the permission check
         * @param processingGuarantee guarantee requested for this source
         * @param eventTimePolicy     event-time policy handed to each processor
         * @param supplierEx          creates the JMS connection ("newConnectionFn")
         * @param functionEx          creates the consumer from a session ("consumerFn")
         * @param functionEx2         extracts the message ID ("messageIdFn")
         * @param functionEx3         projects a message to the emitted item ("projectionFn")
         */
        public Supplier(String str, ProcessingGuarantee processingGuarantee, EventTimePolicy<? super T> eventTimePolicy, SupplierEx<? extends Connection> supplierEx, FunctionEx<? super Session, ? extends MessageConsumer> functionEx, FunctionEx<? super Message, ?> functionEx2, FunctionEx<? super Message, ? extends T> functionEx3) {
            Util.checkSerializable(supplierEx, "newConnectionFn");
            Util.checkSerializable(functionEx, "consumerFn");
            Util.checkSerializable(functionEx2, "messageIdFn");
            Util.checkSerializable(functionEx3, "projectionFn");
            this.destination = str;
            this.newConnectionFn = supplierEx;
            this.consumerFn = functionEx;
            this.messageIdFn = functionEx2;
            this.projectionFn = functionEx3;
            this.eventTimePolicy = eventTimePolicy;
            this.sourceGuarantee = processingGuarantee;
        }

        /**
         * Creates and starts the shared connection, then replaces the source guarantee with
         * {@code Util.min} of the configured guarantee and the job's guarantee.
         */
        @Override // com.hazelcast.jet.core.ProcessorSupplier
        public void init(@Nonnull ProcessorSupplier.Context context) throws Exception {
            this.connection = this.newConnectionFn.get();
            this.connection.start();
            this.sourceGuarantee = Util.min(this.sourceGuarantee, context.processingGuarantee());
        }

        /**
         * Closes the shared connection if {@link #init} got far enough to create it.
         */
        @Override // com.hazelcast.jet.core.ProcessorSupplier
        public void close(@Nullable Throwable th) throws Exception {
            if (this.connection != null) {
                this.connection.close();
            }
        }

        /**
         * Creates {@code i} processors that all use the connection opened in {@link #init}.
         *
         * @param i number of processors to create
         * @return the new processors
         */
        @Override // com.hazelcast.jet.core.ProcessorSupplier
        @Nonnull
        public Collection<? extends Processor> get(int i) {
            // Parameterized construction instead of a raw type and an unchecked cast.
            return IntStream.range(0, i)
                    .mapToObj(index -> new StreamJmsP<T>(this.connection, this.consumerFn, this.messageIdFn,
                            this.projectionFn, this.eventTimePolicy, this.sourceGuarantee))
                    .collect(Collectors.toList());
        }

        /**
         * @return the single permission required: JMS read access on the configured destination
         */
        @Override // com.hazelcast.security.impl.function.SecuredFunction
        public List<Permission> permissions() {
            return Collections.singletonList(ConnectorPermission.jms(this.destination, ActionConstants.ACTION_READ));
        }
    }

    /**
     * Wires the processor to an already opened connection and the user-supplied functions.
     *
     * <p>Mutable ID sets are allocated only for the exactly-once guarantee; otherwise both
     * {@code seenIds} and {@code restoredIds} are the shared immutable empty set.
     *
     * @param connection          JMS connection to create the session on
     * @param functionEx          creates the consumer from the session
     * @param functionEx2         extracts the message ID
     * @param functionEx3         projects a message to the emitted item
     * @param eventTimePolicy     policy used to build the event-time mapper
     * @param processingGuarantee effective guarantee of this source
     */
    StreamJmsP(Connection connection, FunctionEx<? super Session, ? extends MessageConsumer> functionEx, FunctionEx<? super Message, ?> functionEx2, FunctionEx<? super Message, ? extends T> functionEx3, EventTimePolicy<? super T> eventTimePolicy, ProcessingGuarantee processingGuarantee) {
        this.connection = connection;
        this.consumerFn = functionEx;
        this.messageIdFn = functionEx2;
        this.projectionFn = functionEx3;
        this.guarantee = processingGuarantee;

        // The source exposes exactly one partition to the mapper (index 0 in complete()).
        this.eventTimeMapper = new EventTimeMapper<>(eventTimePolicy);
        this.eventTimeMapper.addPartitions(1);

        if (ProcessingGuarantee.EXACTLY_ONCE == processingGuarantee) {
            this.seenIds = new HashSet<>();
            this.restoredIds = new HashSet<>();
        } else {
            this.seenIds = Collections.emptySet();
            this.restoredIds = Collections.emptySet();
        }
    }

    /**
     * Declares the processor cooperative only when it runs without any processing guarantee.
     *
     * @return {@code true} iff the guarantee is {@code NONE}
     */
    @Override // com.hazelcast.jet.core.Processor
    public boolean isCooperative() {
        // NOTE(review): presumably the transacted-session calls used with a guarantee may block — confirm.
        final boolean withoutGuarantee = ProcessingGuarantee.NONE == this.guarantee;
        return withoutGuarantee;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates this processor's session on the shared connection and builds the consumer from it.
     *
     * <p>The session is transacted whenever a processing guarantee is in effect, so that received
     * messages are acknowledged by the {@code commit()} in {@link #snapshotCommitFinish}. Without a
     * guarantee the session is non-transacted and uses {@link Session#DUPS_OK_ACKNOWLEDGE}.
     *
     * @param context processor context (not used here)
     * @throws JMSException if the session cannot be created
     */
    @Override // com.hazelcast.jet.core.AbstractProcessor
    public void init(@Nonnull Processor.Context context) throws JMSException {
        boolean transacted = this.guarantee != ProcessingGuarantee.NONE;
        // Named constant instead of the literal 3; per the JMS API the acknowledge mode is
        // ignored when the session is transacted.
        this.session = this.connection.createSession(transacted, Session.DUPS_OK_ACKNOWLEDGE);
        this.consumer = this.consumerFn.apply(this.session);
    }

    /**
     * Returns the message's JMS timestamp, or {@link EventTimeMapper#NO_NATIVE_TIME} when the
     * timestamp is {@code 0}.
     *
     * <p>The timestamp is read once so that the zero check and the returned value cannot disagree.
     *
     * @param message the received message
     * @return the JMS timestamp, or {@code NO_NATIVE_TIME} if it is 0
     */
    private static long handleJmsTimestamp(Message message) {
        try {
            long timestamp = message.getJMSTimestamp();
            // NOTE(review): 0 is presumably what providers report when timestamps are disabled — confirm.
            return timestamp == 0 ? EventTimeMapper.NO_NATIVE_TIME : timestamp;
        } catch (JMSException e) {
            throw ExceptionUtil.sneakyThrow(e);
        }
    }

    /**
     * Polls the consumer without blocking and emits the projected messages.
     *
     * <p>Nothing is received while a snapshot is in progress. In exactly-once mode every message ID
     * is recorded in {@code seenIds}; a message whose ID is found in {@code restoredIds} is treated
     * as a redelivery and skipped.
     *
     * @return always {@code false}; this source never completes on its own
     * @throws JetException if exactly-once is active and {@code messageIdFn} returns {@code null}
     */
    @Override // com.hazelcast.jet.core.Processor
    public boolean complete() {
        if (this.snapshotInProgress) {
            return false;
        }
        // Receive the next message only after the previous output has been fully emitted.
        while (emitFromTraverser(this.pendingTraverser)) {
            try {
                Message message = this.consumer.receiveNoWait();
                if (message == null) {
                    // Nothing available right now: let the mapper emit idle output, if any.
                    this.pendingTraverser = this.eventTimeMapper.flatMapIdle();
                    return false;
                }
                if (this.guarantee == ProcessingGuarantee.EXACTLY_ONCE) {
                    // The TTL is armed by the first received message; once it elapses the
                    // restored IDs are discarded so they are not retained indefinitely.
                    if (this.restoredIdsExpiration == Long.MAX_VALUE) {
                        this.restoredIdsExpiration = System.nanoTime() + RESTORED_IDS_TTL;
                    } else if (!this.restoredIds.isEmpty() && this.restoredIdsExpiration <= System.nanoTime()) {
                        this.restoredIds = Collections.emptySet();
                    }
                    Object messageId = this.messageIdFn.apply(message);
                    if (messageId == null) {
                        throw new JetException("Received a message without an ID. All messages must have an ID, you can specify an extracting function using " + JmsSourceBuilder.class.getSimpleName() + ".messageIdFn()");
                    }
                    // Recorded even for duplicates so the ID is saved with the next snapshot.
                    this.seenIds.add(messageId);
                    if (this.restoredIds.remove(messageId)) {
                        LoggingUtil.logFine(getLogger(), "Redelivered message dropped: %s", message);
                        // Skip projection and emission; without this the duplicate was still emitted.
                        continue;
                    }
                }
                T item = this.projectionFn.apply(message);
                // A null projection filters the message out; the mapper still gets an idle tick.
                this.pendingTraverser = item != null
                        ? this.eventTimeMapper.flatMapEvent(item, 0, handleJmsTimestamp(message))
                        : this.eventTimeMapper.flatMapIdle();
            } catch (JMSException e) {
                throw ExceptionUtil.sneakyThrow(e);
            }
        }
        return false;
    }

    /**
     * First snapshot phase: drains pending output, pauses receiving and, in exactly-once mode,
     * writes the non-empty ID sets to the snapshot under {@code SEEN_IDS_KEY}.
     *
     * @return {@code false} while pending output or snapshot entries are still being emitted,
     *         {@code true} once this phase is done
     */
    @Override // com.hazelcast.jet.core.Processor
    public boolean snapshotCommitPrepare() {
        boolean pendingDrained = emitFromTraverser(this.pendingTraverser);
        if (!pendingDrained) {
            return false;
        }
        // From here on complete() receives nothing until snapshotCommitFinish() resets the flag.
        this.snapshotInProgress = ProcessingGuarantee.NONE != this.guarantee;
        if (ProcessingGuarantee.EXACTLY_ONCE == this.guarantee) {
            if (this.snapshotTraverser == null) {
                // Built once per snapshot; clears itself when exhausted so the next snapshot rebuilds it.
                this.snapshotTraverser = Traversers.traverseItems(this.seenIds, this.restoredIds)
                        .filter(ids -> !ids.isEmpty())
                        .map(ids -> com.hazelcast.jet.Util.entry(SEEN_IDS_KEY, ids))
                        .onFirstNull(() -> this.snapshotTraverser = null);
                LoggingUtil.logFine(getLogger(), "Saved %d seenIds and %d restoredIds to snapshot", Integer.valueOf(this.seenIds.size()), Integer.valueOf(this.restoredIds.size()));
            }
            return emitFromTraverserToSnapshot(this.snapshotTraverser);
        }
        return true;
    }

    /**
     * Second snapshot phase: commits the session after a successful snapshot and resumes receiving.
     *
     * @param z whether the snapshot succeeded
     * @return always {@code true}
     * @throws RestartableException if the snapshot failed and the guarantee is exactly-once
     */
    @Override // com.hazelcast.jet.core.Processor
    public boolean snapshotCommitFinish(boolean z) {
        if (ProcessingGuarantee.NONE == this.guarantee) {
            return true;
        }
        if (!z) {
            if (ProcessingGuarantee.EXACTLY_ONCE == this.guarantee) {
                throw new RestartableException("the snapshot failed");
            }
        } else {
            try {
                this.session.commit();
                getLogger().fine("Session committed");
            } catch (JMSException commitFailure) {
                throw ExceptionUtil.sneakyThrow(commitFailure);
            }
            // The committed messages are acknowledged, so their IDs need not be tracked any longer.
            this.seenIds.clear();
        }
        this.snapshotInProgress = false;
        return true;
    }

    /**
     * Adds the IDs of one snapshot entry to {@code restoredIds}.
     *
     * <p>Entries are ignored unless the guarantee is exactly-once.
     *
     * @param obj  snapshot key; must equal {@code SEEN_IDS_KEY}
     * @param obj2 snapshot value; a set of message IDs
     * @throws RuntimeException if the key is not {@code SEEN_IDS_KEY}
     */
    @Override // com.hazelcast.jet.core.AbstractProcessor
    protected void restoreFromSnapshot(@Nonnull Object obj, @Nonnull Object obj2) {
        if (!SEEN_IDS_KEY.equals(obj)) {
            throw new RuntimeException("Unexpected key received from snapshot: " + obj);
        }
        if (this.guarantee == ProcessingGuarantee.EXACTLY_ONCE) {
            // Wildcard cast instead of a raw Set: adding to a Set<Object> needs no unchecked operation.
            Set<?> ids = (Set<?>) obj2;
            this.restoredIds.addAll(ids);
            LoggingUtil.logFine(getLogger(), "Restored %d seen IDs from snapshot", Integer.valueOf(ids.size()));
        }
    }

    /**
     * Closes the consumer and then the session, whichever of them were created.
     *
     * <p>The session is closed even if closing the consumer fails. The first failure is the one
     * thrown; a later failure is attached to it as a suppressed exception.
     *
     * @throws Exception the first failure raised while closing
     */
    @Override // com.hazelcast.jet.core.Processor
    public void close() throws Exception {
        Exception failure = null;
        if (this.consumer != null) {
            try {
                this.consumer.close();
            } catch (Exception e) {
                // Remember it, but still attempt to close the session below.
                failure = e;
            }
        }
        if (this.session != null) {
            try {
                this.session.close();
            } catch (Exception e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }
}
