/*
 * Decompiled with CFR 0.152.
 */
package org.jberet.wildfly.cluster.jms;

import java.io.Serializable;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSConsumer;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.StreamMessage;
import javax.jms.Topic;
import org.jberet.job.model.Step;
import org.jberet.runtime.JobExecutionImpl;
import org.jberet.runtime.JobStopNotificationListener;
import org.jberet.runtime.PartitionExecutionImpl;
import org.jberet.runtime.context.StepContextImpl;
import org.jberet.spi.PartitionHandler;
import org.jberet.spi.PartitionInfo;
import org.jberet.wildfly.cluster.jms.JmsPartitionResource;
import org.jberet.wildfly.cluster.jms._private.ClusterJmsLogger;
import org.jberet.wildfly.cluster.jms._private.ClusterJmsMessages;

/**
 * {@link PartitionHandler} that hands partition work to a JMS queue and
 * receives partition results and collector data back from that same queue.
 *
 * <p>On construction it opens two {@link JMSContext}s: one whose consumer
 * listens for result messages selected by step execution id, and one used to
 * send partition tasks. As a {@link JobStopNotificationListener} it also
 * publishes stop requests to the stop-request topic.
 *
 * <p>Call {@link #close(StepContextImpl)} to release the JMS resources.
 */
public class JmsPartitionHandler
implements PartitionHandler,
JobStopNotificationListener {
    // Both queues are assigned through setters after the message listener has
    // been registered in the constructor, and are read from the listener
    // callback; volatile makes the assignments visible to that callback.
    private volatile BlockingQueue<Boolean> completedPartitionThreads;
    private volatile BlockingQueue<Serializable> collectorDataQueue;

    private final JmsPartitionResource jmsPartitionResource = new JmsPartitionResource();
    private final JMSContext partitionQueueConsumerContext;
    private final JMSContext partitionQueueProducerContext;
    private final Queue partitionQueue = this.jmsPartitionResource.getPartitionQueue();
    private final ConnectionFactory connectionFactory = this.jmsPartitionResource.getConnectionFactory();

    /**
     * Opens the consumer and producer contexts and registers a listener on the
     * partition queue for result messages belonging to the given step execution.
     *
     * <p>If any part of the setup fails, everything opened so far is released
     * before the failure is rethrown, so a failed construction does not leak
     * JMS resources.
     *
     * @param stepContext context of the step whose partition results are consumed
     */
    public JmsPartitionHandler(StepContextImpl stepContext) {
        JMSContext consumerContext = null;
        JMSContext producerContext = null;
        try {
            consumerContext = this.connectionFactory.createContext();
            producerContext = this.connectionFactory.createContext();

            String resultSelector =
                    JmsPartitionResource.getMessageSelector("R", stepContext.getStepExecutionId());
            JMSConsumer consumer = consumerContext.createConsumer(this.partitionQueue, resultSelector);
            consumer.setMessageListener(message -> this.onPartitionResult(stepContext, message));
        } catch (RuntimeException | Error setupFailure) {
            try {
                this.releaseResources(consumerContext, producerContext);
            } catch (RuntimeException cleanupFailure) {
                // Keep the original failure as the primary exception.
                setupFailure.addSuppressed(cleanupFailure);
            }
            throw setupFailure;
        }
        this.partitionQueueConsumerContext = consumerContext;
        this.partitionQueueProducerContext = producerContext;
    }

    /**
     * Sets the queue that is offered {@code Boolean.TRUE} each time a
     * partition execution result is received. May be left unset.
     *
     * @param completedPartitionThreads queue signalled on each partition result
     */
    public void setResourceTracker(BlockingQueue<Boolean> completedPartitionThreads) {
        this.completedPartitionThreads = completedPartitionThreads;
    }

    /**
     * Sets the queue onto which every received message body is put.
     *
     * <p>NOTE(review): the listener dereferences this queue without a null
     * check, so it appears to be required before any result message arrives;
     * confirm against the caller.
     *
     * @param collectorDataQueue queue receiving collector data and partition results
     */
    public void setCollectorDataQueue(BlockingQueue<Serializable> collectorDataQueue) {
        this.collectorDataQueue = collectorDataQueue;
    }

    /**
     * Wraps the partition's execution, step and job execution in a
     * {@link PartitionInfo} and sends it to the partition queue as an object
     * message whose {@code type} property is {@code "P"}.
     *
     * @param partitionStepContext step context of the partition to submit
     * @param currentIndex not used by this implementation
     * @param numOfPartitions not used by this implementation
     * @throws Exception if the message cannot be built or sent
     */
    public void submitPartitionTask(StepContextImpl partitionStepContext, int currentIndex, int numOfPartitions) throws Exception {
        PartitionExecutionImpl partitionExecution = (PartitionExecutionImpl) partitionStepContext.getStepExecution();
        Step step = partitionStepContext.getStep();
        JobExecutionImpl jobExecution = partitionStepContext.getJobContext().getJobExecution();

        PartitionInfo partitionInfo = new PartitionInfo(partitionExecution, step, jobExecution);
        ObjectMessage message = this.partitionQueueProducerContext.createObjectMessage(partitionInfo);
        message.setStringProperty("type", "P");
        this.partitionQueueProducerContext.createProducer().send(this.partitionQueue, message);
    }

    /**
     * Publishes a stop request for the given job execution to the stop-request
     * topic, using a short-lived JMS context that is closed afterwards.
     *
     * @param jobExecutionId id of the job execution to stop; sent as the
     *        {@code jobExecutionId} message property
     */
    public void stopRequested(long jobExecutionId) {
        try (JMSContext stopRequestTopicContext = this.connectionFactory.createContext()) {
            Topic stopRequestTopic = this.jmsPartitionResource.getStopRequestTopic();
            StreamMessage message = stopRequestTopicContext.createStreamMessage();
            message.setLongProperty("jobExecutionId", jobExecutionId);
            message.writeByte((byte) 1);
            stopRequestTopicContext.createProducer().send(stopRequestTopic, message);
        } catch (JMSException e) {
            throw ClusterJmsMessages.MESSAGES.failedInJms(e);
        }
    }

    /**
     * Closes the consumer context, the producer context and the underlying
     * {@link JmsPartitionResource}. Each is attempted even if an earlier one
     * fails.
     *
     * @param stepContext not used by this implementation
     */
    public void close(StepContextImpl stepContext) {
        this.releaseResources(this.partitionQueueConsumerContext, this.partitionQueueProducerContext);
    }

    /**
     * Handles one message from the partition queue: records a partition
     * execution result if the body is one, then forwards the body to the
     * collector data queue.
     */
    private void onPartitionResult(StepContextImpl stepContext, Message message) {
        final Serializable partitionCollectorData;
        try {
            partitionCollectorData = message.getBody(Serializable.class);
        } catch (JMSException e) {
            throw ClusterJmsMessages.MESSAGES.failedInJms(e);
        }

        if (partitionCollectorData instanceof PartitionExecutionImpl) {
            BlockingQueue<Boolean> tracker = this.completedPartitionThreads;
            if (tracker != null) {
                tracker.offer(Boolean.TRUE);
            }

            PartitionExecutionImpl partitionExecution = (PartitionExecutionImpl) partitionCollectorData;
            ClusterJmsLogger.LOGGER.receivedPartitionResult(
                    stepContext.getJobContext().getExecutionId(),
                    stepContext.getStepExecutionId(),
                    partitionExecution.getPartitionId(),
                    partitionExecution.getBatchStatus());
            replacePartitionExecution(stepContext.getStepExecution().getPartitionExecutions(), partitionExecution);
        }

        try {
            this.collectorDataQueue.put(partitionCollectorData);
        } catch (InterruptedException e) {
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Replaces the first entry with the same partition id as {@code updated}:
     * the old entry is removed and {@code updated} is appended to the end.
     * Does nothing if no entry matches.
     */
    private static void replacePartitionExecution(List<PartitionExecutionImpl> partitionExecutions,
                                                  PartitionExecutionImpl updated) {
        int partitionId = updated.getPartitionId();
        for (int i = 0; i < partitionExecutions.size(); i++) {
            if (partitionExecutions.get(i).getPartitionId() == partitionId) {
                partitionExecutions.remove(i);
                partitionExecutions.add(updated);
                // Stop here: continuing would reach the entry just appended
                // and needlessly remove and re-add it.
                break;
            }
        }
    }

    /**
     * Closes the given contexts (skipping any that is {@code null}) and the
     * JMS partition resource, chaining the steps with {@code finally} so one
     * failure does not prevent the remaining ones from running.
     */
    private void releaseResources(JMSContext consumerContext, JMSContext producerContext) {
        try {
            if (consumerContext != null) {
                JmsPartitionResource.closeJmsContext(consumerContext);
            }
        } finally {
            try {
                if (producerContext != null) {
                    JmsPartitionResource.closeJmsContext(producerContext);
                }
            } finally {
                this.jmsPartitionResource.close();
            }
        }
    }
}

