package org.apache.pulsar.functions.windowing;

import com.google.gson.Gson;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import net.jodah.typetools.TypeResolver;
import org.apache.pulsar.common.functions.WindowConfig;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.WindowContext;
import org.apache.pulsar.functions.api.WindowFunction;
import org.apache.pulsar.functions.windowing.evictors.CountEvictionPolicy;
import org.apache.pulsar.functions.windowing.evictors.TimeEvictionPolicy;
import org.apache.pulsar.functions.windowing.evictors.WatermarkCountEvictionPolicy;
import org.apache.pulsar.functions.windowing.evictors.WatermarkTimeEvictionPolicy;
import org.apache.pulsar.functions.windowing.triggers.CountTriggerPolicy;
import org.apache.pulsar.functions.windowing.triggers.TimeTriggerPolicy;
import org.apache.pulsar.functions.windowing.triggers.WatermarkCountTriggerPolicy;
import org.apache.pulsar.functions.windowing.triggers.WatermarkTimeTriggerPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-2.10.0.0-rc-1.jar:org/apache/pulsar/functions/windowing/WindowFunctionExecutor.class */
/**
 * Adapts a user-supplied window function to the per-record {@link Function} interface.
 *
 * <p>Each incoming record is handed to a {@link WindowManager}. When the configured trigger
 * policy activates a window, the user function is invoked with the window contents and a
 * non-null result is published to the function's output topic.
 *
 * <p>The user function may be either a {@code java.util.function.Function} whose input type
 * is {@link Collection}, or a {@link WindowFunction}.
 *
 * @param <I> type of the values carried by the incoming records
 * @param <O> type of the value produced for each activated window
 */
public class WindowFunctionExecutor<I, O> implements Function<I, O> {
    private static final Logger log = LoggerFactory.getLogger(WindowFunctionExecutor.class);

    /** Set once {@link #initialize(Context)} has built the window manager. */
    private boolean initialized;
    protected WindowConfig windowConfig;
    private WindowManager<Record<I>> windowManager;
    /** Non-null only when a timestamp extractor class is configured (event-time mode). */
    private TimestampExtractor<I> timestampExtractor;
    /** Created only in event-time mode; stays {@code null} otherwise. */
    protected transient WaterMarkEventGenerator<Record<I>> waterMarkEventGenerator;
    /** User function when it is a plain {@code java.util.function.Function}; otherwise {@code null}. */
    protected java.util.function.Function<Collection<I>, O> bareWindowFunction;
    /** User function when it implements {@link WindowFunction}; otherwise {@code null}. */
    protected WindowFunction<I, O> windowFunction;

    /**
     * Reads the window configuration, instantiates the user function, builds the window
     * manager with its eviction and trigger policies, and starts them.
     *
     * @param context function context providing the user config, input topics and publisher
     * @throws IllegalArgumentException if the window configuration is missing or invalid
     */
    public void initialize(Context context) {
        this.windowConfig = getWindowConfigs(context);
        initializeUserFunction(this.windowConfig);
        log.info("Window Config: {}", this.windowConfig);
        this.windowManager = getWindowManager(this.windowConfig, context);
        this.initialized = true;
        start();
    }

    /**
     * Instantiates the configured user class and stores it in the field matching its kind.
     *
     * @throws IllegalArgumentException if the class is a {@code java.util.function.Function}
     *         whose input is not a {@link Collection}, or implements neither supported interface
     */
    @SuppressWarnings("unchecked")
    private void initializeUserFunction(WindowConfig windowConfig) {
        Object userFunction = Reflections.createInstance(
                windowConfig.getActualWindowFunctionClassName(),
                Thread.currentThread().getContextClassLoader());

        if (userFunction instanceof java.util.function.Function) {
            // The explicit Type cast pins the (Type, Class) overload of resolveRawArguments.
            Class<?>[] typeArgs = TypeResolver.resolveRawArguments(
                    (Type) java.util.function.Function.class, userFunction.getClass());
            if (!typeArgs[0].equals(Collection.class)) {
                throw new IllegalArgumentException("Window function must take a collection as input");
            }
            // Only the raw input type was verified above; element types are trusted.
            this.bareWindowFunction = (java.util.function.Function<Collection<I>, O>) userFunction;
            return;
        }

        if (!(userFunction instanceof WindowFunction)) {
            throw new IllegalArgumentException("Window function does not implement the correct interface");
        }
        this.windowFunction = (WindowFunction<I, O>) userFunction;
    }

    /**
     * Extracts the {@link WindowConfig} stored under {@link WindowConfig#WINDOW_CONFIG_KEY}
     * in the user config by round-tripping the stored value through JSON.
     *
     * @throws IllegalArgumentException if no window configuration is present
     */
    private WindowConfig getWindowConfigs(Context context) {
        Object rawConfig = context.getUserConfigValue(WindowConfig.WINDOW_CONFIG_KEY)
                .orElseThrow(() -> new IllegalArgumentException("Window Configs cannot be found"));
        Gson gson = new Gson();
        return gson.fromJson(gson.toJson(rawConfig), WindowConfig.class);
    }

    /**
     * Builds the window manager and wires its eviction and trigger policies.
     *
     * <p>The timestamp extractor must be resolved before the policies are created because
     * {@link #isEventTime()} decides which policy implementations are used.
     *
     * @throws IllegalArgumentException if a late data topic is configured without a
     *         timestamp extractor class
     */
    private WindowManager<Record<I>> getWindowManager(WindowConfig windowConfig, Context context) {
        WindowLifecycleListener<Event<Record<I>>> lifecycleListener = newWindowLifecycleListener(context);
        WindowManager<Record<I>> manager =
                new WindowManager<>(lifecycleListener, new ConcurrentLinkedQueue<>());

        if (windowConfig.getTimestampExtractorClassName() != null) {
            this.timestampExtractor = getTimeStampExtractor(windowConfig);
            this.waterMarkEventGenerator = new WaterMarkEventGenerator<>(
                    manager,
                    windowConfig.getWatermarkEmitIntervalMs().longValue(),
                    windowConfig.getMaxLagMs().longValue(),
                    new HashSet<>(context.getInputTopics()),
                    context);
        } else if (windowConfig.getLateDataTopic() != null) {
            throw new IllegalArgumentException(
                    "Late data topic can be defined only when specifying a timestamp extractor class");
        }

        EvictionPolicy<Record<I>, ?> evictionPolicy = getEvictionPolicy(windowConfig);
        TriggerPolicy<Record<I>, ?> triggerPolicy =
                getTriggerPolicy(windowConfig, manager, evictionPolicy, context);
        manager.setEvictionPolicy(evictionPolicy);
        manager.setTriggerPolicy(triggerPolicy);
        return manager;
    }

    /**
     * Reflectively instantiates the configured timestamp extractor through its no-arg
     * constructor and checks that its type argument matches this function's input type.
     *
     * @throws RuntimeException if the class cannot be loaded or instantiated, or if the
     *         resolved type arguments differ
     */
    @SuppressWarnings("unchecked")
    private TimestampExtractor<I> getTimeStampExtractor(WindowConfig windowConfig) {
        String extractorClassName = windowConfig.getTimestampExtractorClassName();
        try {
            Class<?> extractorClass = Class.forName(
                    extractorClassName, true, Thread.currentThread().getContextClassLoader());
            Constructor<?> constructor = extractorClass.getDeclaredConstructor();
            constructor.setAccessible(true);
            Object extractor = constructor.newInstance();

            // The explicit Type cast pins the (Type, Class) overload of resolveRawArguments.
            Class<?>[] extractorTypeArgs = TypeResolver.resolveRawArguments(
                    (Type) TimestampExtractor.class, extractor.getClass());
            Class<?>[] functionTypeArgs = TypeResolver.resolveRawArguments(Function.class, getClass());
            if (!functionTypeArgs[0].equals(extractorTypeArgs[0])) {
                throw new RuntimeException("Inconsistent types found between function input type and timestamp extractor type:  function type = " + functionTypeArgs[0] + ", timestamp extractor type = " + extractorTypeArgs[0]);
            }
            return (TimestampExtractor<I>) extractor;
        } catch (IllegalAccessException e) {
            throw new RuntimeException("User class must have a no-arg constructor", e);
        } catch (InstantiationException e) {
            throw new RuntimeException("User class must be concrete", e);
        } catch (NoSuchMethodException e) {
            throw new RuntimeException("User class doesn't have such method", e);
        } catch (InvocationTargetException e) {
            throw new RuntimeException("User class constructor throws exception", e);
        } catch (ClassNotFoundException | NoClassDefFoundError e) {
            throw new RuntimeException(
                    String.format("Timestamp extractor class %s must be in class path", extractorClassName), e);
        }
    }

    /**
     * Chooses the trigger policy: count-based when a sliding interval count is configured,
     * otherwise duration-based; the watermark variants are used in event-time mode.
     */
    private TriggerPolicy<Record<I>, ?> getTriggerPolicy(WindowConfig windowConfig,
                                                         WindowManager<Record<I>> windowManager,
                                                         EvictionPolicy<Record<I>, ?> evictionPolicy,
                                                         Context context) {
        if (windowConfig.getSlidingIntervalCount() != null) {
            int slidingCount = windowConfig.getSlidingIntervalCount().intValue();
            if (isEventTime()) {
                return new WatermarkCountTriggerPolicy<>(slidingCount, windowManager, evictionPolicy, windowManager);
            }
            return new CountTriggerPolicy<>(slidingCount, windowManager, evictionPolicy);
        }

        long slidingDurationMs = windowConfig.getSlidingIntervalDurationMs().longValue();
        if (isEventTime()) {
            return new WatermarkTimeTriggerPolicy<>(slidingDurationMs, windowManager, evictionPolicy, windowManager);
        }
        return new TimeTriggerPolicy<>(slidingDurationMs, windowManager, evictionPolicy, context);
    }

    /**
     * Chooses the eviction policy: count-based when a window length count is configured,
     * otherwise duration-based; the watermark variants are used in event-time mode.
     */
    private EvictionPolicy<Record<I>, ?> getEvictionPolicy(WindowConfig windowConfig) {
        if (windowConfig.getWindowLengthCount() != null) {
            int lengthCount = windowConfig.getWindowLengthCount().intValue();
            if (isEventTime()) {
                return new WatermarkCountEvictionPolicy<>(lengthCount);
            }
            return new CountEvictionPolicy<>(lengthCount);
        }

        long lengthDurationMs = windowConfig.getWindowLengthDurationMs().longValue();
        if (isEventTime()) {
            return new WatermarkTimeEvictionPolicy<>(lengthDurationMs, windowConfig.getMaxLagMs().longValue());
        }
        return new TimeEvictionPolicy<>(lengthDurationMs);
    }

    /**
     * Creates the listener that connects window lifecycle events to this executor.
     *
     * <p>On expiry every expired event's record is acknowledged. On activation the payloads
     * of the three event lists are extracted and passed to
     * {@link #processWindow(Context, List, List, List, Long)}.
     *
     * @param context function context forwarded to {@code processWindow} on activation
     * @return a new listener bound to this executor and {@code context}
     */
    protected WindowLifecycleListener<Event<Record<I>>> newWindowLifecycleListener(final Context context) {
        return new WindowLifecycleListener<Event<Record<I>>>() {
            @Override
            public void onExpiry(List<Event<Record<I>>> expiredEvents) {
                for (Event<Record<I>> expired : expiredEvents) {
                    expired.getRecord().ack();
                }
            }

            @Override
            public void onActivation(List<Event<Record<I>>> events,
                                     List<Event<Record<I>>> newEvents,
                                     List<Event<Record<I>>> expiredEvents,
                                     Long referenceTime) {
                processWindow(
                        context,
                        unwrapEvents(events),
                        unwrapEvents(newEvents),
                        unwrapEvents(expiredEvents),
                        referenceTime);
            }
        };
    }

    /** Extracts the payload of each event into a new list, preserving order. */
    private List<Record<I>> unwrapEvents(List<Event<Record<I>>> events) {
        return events.stream().map(Event::get).collect(Collectors.toList());
    }

    /**
     * Runs the user function over one activated window and publishes a non-null result to
     * the context's output topic.
     *
     * @param context function context used to publish the result
     * @param list first record list passed to the {@code WindowImpl} constructor
     * @param list2 second record list passed to the {@code WindowImpl} constructor
     * @param list3 third record list passed to the {@code WindowImpl} constructor
     * @param l reference timestamp of the activation, used as the window end; may be {@code null}
     * @throws RuntimeException wrapping any exception raised by the user function or publish call
     */
    public void processWindow(Context context, List<Record<I>> list, List<Record<I>> list2, List<Record<I>> list3, Long l) {
        try {
            Window<Record<I>> window = new WindowImpl<>(list, list2, list3, getWindowStartTs(l), l);
            O output = process(window, new WindowContextImpl(context));
            if (output != null) {
                context.publish(context.getOutputTopic(), output, context.getOutputSchemaType());
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Derives the window start as {@code end - windowLengthDurationMs}.
     *
     * @return the start timestamp, or {@code null} when the end timestamp or the configured
     *         window length duration is absent
     */
    private Long getWindowStartTs(Long endTs) {
        if (endTs == null || this.windowConfig.getWindowLengthDurationMs() == null) {
            return null;
        }
        return endTs - this.windowConfig.getWindowLengthDurationMs();
    }

    /** Starts the watermark generator (when present) and then the trigger policy. */
    private void start() {
        if (this.waterMarkEventGenerator != null) {
            log.debug("Starting waterMarkEventGenerator");
            this.waterMarkEventGenerator.start();
        }
        log.debug("Starting trigger policy");
        this.windowManager.triggerPolicy.start();
    }

    /** Shuts down the watermark generator and the window manager if they were created. */
    public void shutdown() {
        if (this.waterMarkEventGenerator != null) {
            this.waterMarkEventGenerator.shutdown();
        }
        if (this.windowManager != null) {
            this.windowManager.shutdown();
        }
    }

    /** Event-time mode is active exactly when a timestamp extractor has been configured. */
    private boolean isEventTime() {
        return this.timestampExtractor != null;
    }

    /**
     * Adds the current record to the window manager, initializing this executor on the
     * first call.
     *
     * <p>Without a timestamp extractor the record is stamped with the current wall-clock
     * time. With one, the extracted timestamp is offered to the watermark generator; a
     * record it rejects is treated as late: it is forwarded to the late data topic when one
     * is configured (otherwise logged) and then acknowledged.
     *
     * @param i value of the current record
     * @param context function context supplying the current record
     * @return always {@code null}; window results are published from
     *         {@link #processWindow(Context, List, List, List, Long)}
     * @throws Exception if initialization or timestamp extraction fails
     */
    @Override
    public O process(I i, Context context) throws Exception {
        if (!this.initialized) {
            initialize(context);
        }

        // The context exposes the record with a wildcard type; the window manager is typed on I.
        @SuppressWarnings("unchecked")
        Record<I> currentRecord = (Record<I>) context.getCurrentRecord();

        if (!isEventTime()) {
            this.windowManager.add(currentRecord, System.currentTimeMillis(), currentRecord);
            return null;
        }

        long eventTs = this.timestampExtractor.extractTimestamp(currentRecord.getValue());
        // Optional.get(): a record without a topic name fails here with NoSuchElementException.
        if (this.waterMarkEventGenerator.track(currentRecord.getTopicName().get(), eventTs)) {
            this.windowManager.add(currentRecord, eventTs, currentRecord);
            return null;
        }

        if (this.windowConfig.getLateDataTopic() != null) {
            // The result of sendAsync() is not observed here.
            context.newOutputMessage(this.windowConfig.getLateDataTopic(), null).value(i).sendAsync();
        } else {
            log.info("Received a late tuple {} with ts {}. This will not be processed.", i, eventTs);
        }
        currentRecord.ack();
        return null;
    }

    /**
     * Invokes the user function on an activated window.
     *
     * <p>A {@link WindowFunction} receives the records themselves; a bare
     * {@code java.util.function.Function} receives only the record values.
     *
     * @param window the activated window
     * @param windowContext context handed to a {@link WindowFunction}
     * @return the user function's result, possibly {@code null}
     * @throws Exception if the user function throws
     */
    public O process(Window<Record<I>> window, WindowContext windowContext) throws Exception {
        if (this.bareWindowFunction == null) {
            return this.windowFunction.process(window.get(), windowContext);
        }
        Collection<I> values = window.get().stream()
                .map(Record::getValue)
                .collect(Collectors.toList());
        return this.bareWindowFunction.apply(values);
    }
}
