package org.apache.camel.processor.resume;

import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.resume.Offset;
import org.apache.camel.resume.Resumable;
import org.apache.camel.resume.ResumeStrategy;
import org.apache.camel.spi.CamelLogger;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.ExchangeHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/camel-core-processor-4.0.0.jar:org/apache/camel/processor/resume/ResumableCompletion.class */
/**
 * Exchange {@link Synchronization} that forwards the {@link Resumable} found in the
 * {@link Exchange#OFFSET} header to a {@link ResumeStrategy} when the exchange completes, and
 * logs that the offset update was skipped when the exchange fails.
 */
public class ResumableCompletion implements Synchronization {
    private static final Logger LOG = LoggerFactory.getLogger(ResumableCompletion.class);

    /** Suffix appended when the failure is logged without its stack trace. */
    private static final String STACKTRACE_HINT = " (stacktrace available in DEBUG logging level)";

    private final ResumeStrategy resumeStrategy;
    private final LoggingLevel loggingLevel;
    private final boolean intermittent;

    /**
     * Creates the completion callback.
     *
     * @param resumeStrategy strategy that receives the offset of completed exchanges
     * @param loggingLevel   level used to report a skipped offset update when DEBUG logging is disabled
     * @param intermittent   when {@code true}, an exchange without a {@link Resumable} offset header is
     *                       silently ignored instead of being marked as failed
     */
    public ResumableCompletion(ResumeStrategy resumeStrategy, LoggingLevel loggingLevel, boolean intermittent) {
        this.resumeStrategy = resumeStrategy;
        this.loggingLevel = loggingLevel;
        this.intermittent = intermittent;
    }

    /**
     * Updates the last offset on the resume strategy using the {@link Resumable} carried in the
     * {@link Exchange#OFFSET} header of the result message.
     * <p>
     * Nothing is done when {@link ExchangeHelper#isFailureHandled(Exchange)} reports {@code true}. When the
     * header is not a {@link Resumable} and this completion is not intermittent, a
     * {@link NoOffsetException} is set on the exchange. Exceptions thrown by the strategy are logged and
     * not propagated.
     *
     * @param exchange the completed exchange
     */
    @Override
    public void onComplete(Exchange exchange) {
        if (ExchangeHelper.isFailureHandled(exchange)) {
            return;
        }

        Object header = ExchangeHelper.getResultMessage(exchange).getHeader(Exchange.OFFSET);
        if (!(header instanceof Resumable)) {
            if (!this.intermittent) {
                exchange.setException(new NoOffsetException(exchange));
                LOG.warn("Cannot update the last offset because it's not available");
            }
            return;
        }

        Resumable resumable = (Resumable) header;
        if (LOG.isTraceEnabled()) {
            LOG.trace("Processing the resumable: {}", resumable.getOffsetKey());
            // Null-safe lookup: enabling TRACE must never prevent the offset update below.
            LOG.trace("Processing the resumable of type: {}", lastOffsetValue(resumable));
        }

        try {
            this.resumeStrategy.updateLastOffset(resumable);
        } catch (Exception e) {
            LOG.error("Unable to update the offset: {}", e.getMessage(), e);
        }
    }

    /**
     * Logs that the offset update is skipped because the exchange failed. The resume strategy is not
     * invoked.
     *
     * @param exchange the failed exchange
     */
    @Override
    public void onFailure(Exchange exchange) {
        Exception exception = exchange.getException();
        // Guard against a missing exception so that reporting the failure cannot itself fail.
        String reason = exception == null ? null : exception.getMessage();
        Object header = exchange.getMessage().getHeader(Exchange.OFFSET);

        String message;
        if (header instanceof Resumable) {
            Resumable resumable = (Resumable) header;
            message = String.format(
                    "Skipping offset update with address '%s' and offset value '%s' due to failure in processing: %s",
                    resumable.getOffsetKey(), lastOffsetValue(resumable), reason);
        } else {
            message = String.format("Skipping offset update of '%s' due to failure in processing: %s",
                    header == null ? "type null" : "unspecified type", reason);
        }

        logSkippedUpdate(message, exception);
    }

    /**
     * Logs the message with the stack trace at DEBUG when DEBUG is enabled; otherwise logs it at the
     * configured level, without the stack trace, when that level is enabled.
     */
    private void logSkippedUpdate(String message, Exception cause) {
        if (LOG.isDebugEnabled()) {
            CamelLogger.log(LOG, LoggingLevel.DEBUG, message, cause);
        } else if (CamelLogger.shouldLog(LOG, this.loggingLevel)) {
            // Previously this case was routed to DEBUG (which is disabled here), so the message was lost.
            CamelLogger.log(LOG, this.loggingLevel, message + STACKTRACE_HINT);
        }
    }

    /** Returns the value of the resumable's last offset, or {@code null} when it has no last offset. */
    private static Object lastOffsetValue(Resumable resumable) {
        Offset<?> lastOffset = resumable.getLastOffset();
        return lastOffset == null ? null : lastOffset.getValue();
    }
}
