package org.ikasan.component.endpoint.bigqueue.consumer;

import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.transaction.RollbackException;
import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.ikasan.bigqueue.IBigQueue;
import org.ikasan.component.endpoint.bigqueue.consumer.configuration.BigQueueConsumerConfiguration;
import org.ikasan.component.endpoint.bigqueue.serialiser.BigQueueMessageJsonSerialiser;
import org.ikasan.spec.component.endpoint.Consumer;
import org.ikasan.spec.component.endpoint.EndpointListener;
import org.ikasan.spec.configuration.ConfiguredResource;
import org.ikasan.spec.event.EventFactory;
import org.ikasan.spec.event.EventListener;
import org.ikasan.spec.event.ForceTransactionRollbackException;
import org.ikasan.spec.event.ManagedRelatedEventIdentifierService;
import org.ikasan.spec.event.MessageListener;
import org.ikasan.spec.event.Resubmission;
import org.ikasan.spec.flow.FlowEvent;
import org.ikasan.spec.management.ManagedIdentifierService;
import org.ikasan.spec.resubmission.ResubmissionEventFactory;
import org.ikasan.spec.resubmission.ResubmissionService;
import org.ikasan.spec.serialiser.Serialiser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.backoff.ExponentialBackOff;

/* loaded from: input_file:BOOT-INF/lib/ikasan-big-queue-3.3.2.jar:org/ikasan/component/endpoint/bigqueue/consumer/BigQueueConsumer.class */
/**
 * Consumer that listens on an {@link IBigQueue} and hands each delivered message to the
 * registered {@link EventListener} as a flow event.
 *
 * <p>The consumer enlists itself as an {@link XAResource} in the current transaction when a
 * message arrives. The message is only removed from the queue in {@link #commit(Xid, boolean)};
 * on {@link #rollback(Xid)} it is either left where it is or moved to the back of the queue,
 * depending on {@link BigQueueConsumerConfiguration#isPutErrorsToBackOfQueue()}. After either
 * outcome a new asynchronous peek is registered so the next message can be delivered.
 *
 * @param <T> type of the message payload delivered to {@link #onMessage(Object)}
 */
public class BigQueueConsumer<T> implements Consumer<EventListener<?>, EventFactory>, ManagedIdentifierService<ManagedRelatedEventIdentifierService>, EndpointListener<T, Throwable>, MessageListener<T>, ConfiguredResource<BigQueueConsumerConfiguration>, ResubmissionService<T>, XAResource {
    private static final Logger logger = LoggerFactory.getLogger(BigQueueConsumer.class);

    /** Set by {@link #start()} and cleared by {@link #stop()}. */
    private boolean isRunning;
    protected EventFactory<FlowEvent<?, ?>> flowEventFactory;
    protected ResubmissionEventFactory<Resubmission<?>> resubmissionEventFactory;
    protected EventListener eventListener;
    protected IBigQueue inboundQueue;
    /** Executor on which the queue listener runs; {@code null} while the consumer is stopped. */
    protected ExecutorService bigQueueListenerExecutor;
    /** Pending asynchronous peek on the inbound queue; {@code null} while the consumer is stopped. */
    protected ListenableFuture<byte[]> listenableFuture;
    /** Optional; when {@code null}, event identifiers fall back to the payload's hash code. */
    protected ManagedRelatedEventIdentifierService managedRelatedEventIdentifierService;
    protected Serialiser<T, byte[]> serialiser = new BigQueueMessageJsonSerialiser();
    private final InboundQueueMessageRunner inboundQueueMessageRunner;
    private final TransactionManager transactionManager;
    private BigQueueConsumerConfiguration bigQueueConsumerConfiguration;
    private String configurationId;

    /**
     * Creates a consumer bound to the given queue.
     *
     * @param iBigQueue queue to consume from
     * @param inboundQueueMessageRunner runnable registered as the listener on each asynchronous peek
     * @param transactionManager used to obtain the transaction this consumer enlists in
     * @throws IllegalArgumentException if any argument is {@code null}
     */
    public BigQueueConsumer(IBigQueue iBigQueue, InboundQueueMessageRunner inboundQueueMessageRunner, TransactionManager transactionManager) {
        if (iBigQueue == null) {
            throw new IllegalArgumentException("inboundQueue cannot bee null!");
        }
        if (inboundQueueMessageRunner == null) {
            throw new IllegalArgumentException("inboundQueueMessageRunner cannot bee null!");
        }
        if (transactionManager == null) {
            throw new IllegalArgumentException("transactionManager cannot bee null!");
        }
        this.inboundQueue = iBigQueue;
        this.inboundQueueMessageRunner = inboundQueueMessageRunner;
        this.transactionManager = transactionManager;
    }

    /**
     * Replaces the default {@link BigQueueMessageJsonSerialiser}.
     *
     * @param serialiser serialiser to store on this consumer
     */
    public void setSerialiser(Serialiser<T, byte[]> serialiser) {
        this.serialiser = serialiser;
    }

    /**
     * Passes a flow event to the registered listener.
     *
     * @param flowEvent event to deliver
     * @throws RuntimeException if no listener has been registered
     */
    protected void invoke(FlowEvent flowEvent) {
        if (this.eventListener == null) {
            throw new RuntimeException("No active eventListeners registered for flowEvent!");
        }
        // The event is handed over as-is: casting it to the listener type would fail at
        // runtime with a ClassCastException because a flow event is not an EventListener.
        this.eventListener.invoke(flowEvent);
    }

    /**
     * Wraps the resubmitted event in a flow event, wraps that in a new resubmission event and
     * passes it to the registered listener.
     *
     * @param resubmission resubmission whose event is to be redelivered
     * @throws RuntimeException if no listener has been registered
     */
    protected void invoke(Resubmission resubmission) {
        if (this.eventListener == null) {
            throw new RuntimeException("No active eventListeners registered for resubmission event!");
        }
        FlowEvent<?, ?> flowEvent;
        if (this.managedRelatedEventIdentifierService != null) {
            // NOTE(review): this branch uses the Resubmission itself as the payload, while the
            // fallback branch uses resubmission.getEvent() - confirm the asymmetry is intended.
            flowEvent = this.flowEventFactory.newEvent(
                    this.managedRelatedEventIdentifierService.getEventIdentifier(resubmission.getEvent()),
                    this.managedRelatedEventIdentifierService.getRelatedEventIdentifier(resubmission.getEvent()),
                    resubmission);
        } else {
            flowEvent = this.flowEventFactory.newEvent(
                    String.valueOf(resubmission.getEvent().hashCode()),
                    String.valueOf(resubmission.getEvent().hashCode()),
                    resubmission.getEvent());
        }
        this.eventListener.invoke((Resubmission) this.resubmissionEventFactory.newResubmissionEvent(flowEvent));
    }

    /**
     * Registers the listener that receives events, resubmissions and exceptions.
     *
     * @param eventListener listener to notify
     */
    @Override
    public void setListener(EventListener<?> eventListener) {
        this.eventListener = eventListener;
    }

    /**
     * Sets the factory used to create flow events.
     *
     * @param eventFactory factory to use
     */
    @Override
    public void setEventFactory(EventFactory eventFactory) {
        this.flowEventFactory = eventFactory;
    }

    /**
     * Returns the factory used to create flow events.
     *
     * @return the flow event factory, or {@code null} if none has been set
     */
    /* JADX WARN: Can't rename method to resolve collision */
    @Override
    public EventFactory getEventFactory() {
        return this.flowEventFactory;
    }

    /**
     * Creates a single-threaded executor, registers the queue listener on it and marks the
     * consumer as running.
     */
    @Override
    public void start() {
        logger.info("Starting BigQueueConsumer - {}", this.configurationId);
        this.bigQueueListenerExecutor = Executors.newSingleThreadExecutor();
        addInboundListener();
        this.isRunning = true;
        logger.info("Started BigQueueConsumer - {}", this.configurationId);
    }

    /**
     * Requests an asynchronous peek on the inbound queue and registers the message runner as
     * its listener. Does nothing when there is no executor, i.e. while the consumer is stopped.
     */
    private void addInboundListener() {
        if (this.bigQueueListenerExecutor == null) {
            return;
        }
        this.listenableFuture = this.inboundQueue.peekAsync();
        this.listenableFuture.addListener(this.inboundQueueMessageRunner, this.bigQueueListenerExecutor);
    }

    /**
     * Reports whether the consumer is running.
     *
     * @return {@code true} between a call to {@link #start()} and the next {@link #stop()}
     */
    @Override
    public boolean isRunning() {
        return this.isRunning;
    }

    /**
     * Marks the consumer as stopped, cancels any pending peek and shuts the listener executor
     * down. Failures while shutting the executor down are logged and not propagated.
     */
    @Override
    public void stop() {
        logger.info("Stopping BigQueueConsumer - {}", this.configurationId);
        this.isRunning = false;
        if (this.listenableFuture != null) {
            this.listenableFuture.cancel(true);
            logger.debug("future is cancelled{}", this.listenableFuture.isCancelled());
            logger.debug("future is done{}", this.listenableFuture.isDone());
            this.listenableFuture = null;
        }
        if (this.bigQueueListenerExecutor != null) {
            try {
                shutdownExecutor(this.bigQueueListenerExecutor);
            } catch (Exception e) {
                logger.warn("Unable to shut down big queue executor!", e);
            }
            // Clearing the reference also stops commit/rollback from re-registering a listener.
            this.bigQueueListenerExecutor = null;
        }
        logger.info("Stopped BigQueueConsumer - {}", this.configurationId);
    }

    /**
     * Enlists this consumer in the current transaction, wraps the message in a flow event and
     * passes it to the registered listener. A {@link RollbackException} or
     * {@link SystemException} is routed to {@link #onException(Throwable)}.
     *
     * @param t message taken from the queue
     */
    @Override
    public void onMessage(T t) {
        logger.debug("Received message {}", t);
        try {
            this.transactionManager.getTransaction().enlistResource(this);
            FlowEvent<?, ?> flowEvent;
            if (this.managedRelatedEventIdentifierService != null) {
                flowEvent = this.flowEventFactory.newEvent(
                        this.managedRelatedEventIdentifierService.getEventIdentifier(t),
                        this.managedRelatedEventIdentifierService.getRelatedEventIdentifier(t),
                        t);
            } else {
                // Without an identifier service the payload's hash code serves as both identifiers.
                flowEvent = this.flowEventFactory.newEvent(String.valueOf(t.hashCode()), String.valueOf(t.hashCode()), t);
            }
            invoke(flowEvent);
        } catch (RollbackException | SystemException e) {
            onException(e);
        }
    }

    /**
     * Reports an exception to the registered listener, or logs it when there is none.
     * A {@link ForceTransactionRollbackException} is only logged at info level.
     *
     * @param th exception to report
     */
    @Override
    public void onException(Throwable th) {
        if (th instanceof ForceTransactionRollbackException) {
            logger.info("Ignoring rethrown ForceTransactionRollbackException");
            return;
        }
        if (this.eventListener == null) {
            logger.error(th.getMessage(), th);
            return;
        }
        this.eventListener.invoke(th);
    }

    /**
     * Always reports this endpoint listener as not active.
     *
     * @return {@code false}
     */
    @Override
    public boolean isActive() {
        return false;
    }

    /**
     * Sets the service used to derive event and related-event identifiers from a message.
     *
     * @param managedRelatedEventIdentifierService identifier service, may be {@code null}
     */
    @Override
    public void setManagedIdentifierService(ManagedRelatedEventIdentifierService managedRelatedEventIdentifierService) {
        this.managedRelatedEventIdentifierService = managedRelatedEventIdentifierService;
    }

    /**
     * Wraps the given event in a resubmission event and redelivers it to the listener.
     *
     * @param t event being resubmitted
     */
    @Override
    public void onResubmission(T t) {
        logger.info("Resubmission message {}", t);
        invoke(this.resubmissionEventFactory.newResubmissionEvent(t));
    }

    /**
     * Sets the factory used to create resubmission events.
     *
     * @param resubmissionEventFactory factory to use
     */
    @Override
    public void setResubmissionEventFactory(ResubmissionEventFactory resubmissionEventFactory) {
        this.resubmissionEventFactory = resubmissionEventFactory;
    }

    /**
     * Returns the current configuration.
     *
     * @return the configuration, or {@code null} if none has been set
     */
    @Override
    public BigQueueConsumerConfiguration getConfiguration() {
        return this.bigQueueConsumerConfiguration;
    }

    /**
     * Sets the configuration consulted on rollback.
     *
     * @param bigQueueConsumerConfiguration configuration to use
     */
    @Override
    public void setConfiguration(BigQueueConsumerConfiguration bigQueueConsumerConfiguration) {
        this.bigQueueConsumerConfiguration = bigQueueConsumerConfiguration;
    }

    /**
     * Returns the configured resource id, which is also used in lifecycle log messages.
     *
     * @return the configured resource id, or {@code null} if none has been set
     */
    @Override
    public String getConfiguredResourceId() {
        return this.configurationId;
    }

    /**
     * Sets the configured resource id.
     *
     * @param str configured resource id
     */
    @Override
    public void setConfiguredResourceId(String str) {
        this.configurationId = str;
    }

    /**
     * Removes the head message from the queue, triggers queue garbage collection and registers
     * the listener for the next message.
     *
     * @param xid transaction branch identifier (only logged)
     * @param z one-phase flag (not used)
     * @throws XAException if the queue operation fails with an {@link IOException}; the
     *         {@code IOException} is attached as the cause
     */
    @Override
    public void commit(Xid xid, boolean z) throws XAException {
        logger.debug("commit {}", xid);
        try {
            this.inboundQueue.dequeue();
            this.inboundQueue.gc();
            addInboundListener();
        } catch (IOException e) {
            throw asXaException(e);
        }
    }

    /**
     * Logs the end of the transaction branch; no other work is done.
     *
     * @param xid transaction branch identifier (only logged)
     * @param i flags (not used)
     */
    @Override
    public void end(Xid xid, int i) throws XAException {
        logger.debug("end {}", xid);
    }

    /**
     * Logs the request; no other work is done.
     *
     * @param xid transaction branch identifier (only logged)
     */
    @Override
    public void forget(Xid xid) throws XAException {
        logger.debug("forget {}", xid);
    }

    /**
     * Returns a fixed transaction timeout.
     *
     * @return {@code 0}
     */
    @Override
    public int getTransactionTimeout() throws XAException {
        return 0;
    }

    /**
     * Never treats another resource as the same resource manager.
     *
     * @param xAResource resource to compare with (not used)
     * @return {@code false}
     */
    @Override
    public boolean isSameRM(XAResource xAResource) throws XAException {
        return false;
    }

    /**
     * Votes to commit without doing any preparatory work.
     *
     * @param xid transaction branch identifier (only logged)
     * @return {@link XAResource#XA_OK}
     */
    @Override
    public int prepare(Xid xid) throws XAException {
        logger.debug("prepare {}", xid);
        return XA_OK;
    }

    /**
     * Reports no in-doubt transaction branches.
     *
     * @param i flags (not used)
     * @return an empty array
     */
    @Override
    public Xid[] recover(int i) throws XAException {
        return new Xid[0];
    }

    /**
     * Optionally moves the head message to the back of the queue, then registers the listener
     * for the next delivery. The move only happens when a configuration is present and
     * {@link BigQueueConsumerConfiguration#isPutErrorsToBackOfQueue()} is {@code true};
     * otherwise the queue is left untouched.
     *
     * @param xid transaction branch identifier (only logged)
     * @throws XAException if the queue operation fails with an {@link IOException}; the
     *         {@code IOException} is attached as the cause
     */
    @Override
    public void rollback(Xid xid) throws XAException {
        logger.debug("rollback {}", xid);
        try {
            // A consumer without configuration must still re-register its listener, so a
            // missing configuration is treated as "do not move the message".
            BigQueueConsumerConfiguration configuration = this.bigQueueConsumerConfiguration;
            if (configuration != null && configuration.isPutErrorsToBackOfQueue()) {
                this.inboundQueue.enqueue(this.inboundQueue.dequeue());
            }
            addInboundListener();
        } catch (IOException e) {
            throw asXaException(e);
        }
    }

    /**
     * Does not support changing the transaction timeout.
     *
     * @param i requested timeout (not used)
     * @return {@code false}
     */
    @Override
    public boolean setTransactionTimeout(int i) throws XAException {
        return false;
    }

    /**
     * Logs the start of the transaction branch; no other work is done.
     *
     * @param xid transaction branch identifier (only logged)
     * @param i flags (not used)
     */
    @Override
    public void start(Xid xid, int i) throws XAException {
        logger.debug("start {}", xid);
    }

    /**
     * Converts a queue I/O failure into an {@link XAException} carrying the same message and
     * the original exception as its cause, so the stack trace is not lost.
     */
    private static XAException asXaException(IOException cause) {
        XAException xaException = new XAException(cause.getMessage());
        xaException.initCause(cause);
        return xaException;
    }

    /**
     * Shuts the executor down, waiting for running tasks before forcing termination.
     * If the calling thread is interrupted while waiting, termination is forced and the
     * thread's interrupt status is restored.
     */
    private void shutdownExecutor(ExecutorService executorService) {
        executorService.shutdown();
        try {
            // NOTE(review): the timeout reuses Spring's back-off default interval; presumably a
            // decompiler-substituted literal - confirm the intended value.
            if (!executorService.awaitTermination(ExponentialBackOff.DEFAULT_INITIAL_INTERVAL, TimeUnit.MILLISECONDS)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            // Preserve the interrupt so callers further up the stack can react to it.
            Thread.currentThread().interrupt();
        }
    }
}
