/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.twister2.translators.functions;

import edu.iu.dsc.tws.api.tset.Collector;
import edu.iu.dsc.tws.api.tset.TSetContext;
import edu.iu.dsc.tws.api.tset.fn.FlatMapFunc;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.InMemoryStateInternals;
import org.apache.beam.runners.core.InMemoryTimerInternals;
import org.apache.beam.runners.core.OutputWindowedValue;
import org.apache.beam.runners.core.ReduceFnRunner;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.UnsupportedSideInputReader;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.construction.TriggerTranslation;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Instant;

/**
 * Twister2 flat-map function that takes all values collected for one key and runs them through a
 * Beam {@link ReduceFnRunner}, emitting every windowed group the runner produces.
 *
 * <p>Timers and state are kept purely in memory ({@link InMemoryTimerInternals} and
 * {@link InMemoryStateInternals}) and are created afresh for each call to {@link #flatMap}.
 *
 * @param <K> key type
 * @param <V> value type
 * @param <W> window type of the windowing strategy
 */
public class GroupByWindowFunction<K, V, W extends BoundedWindow>
implements FlatMapFunc<WindowedValue<KV<K, Iterable<V>>>, KV<K, Iterable<WindowedValue<V>>>> {
    private static final Logger LOG = Logger.getLogger(GroupByWindowFunction.class.getName());
    private final WindowingStrategy<?, W> windowingStrategy;
    private final SystemReduceFn<K, V, Iterable<V>, Iterable<V>, W> reduceFn;
    private final SerializablePipelineOptions options;

    /**
     * Creates the function.
     *
     * @param windowingStrategy strategy whose trigger drives the reduce runner
     * @param reduceFn reduce function handed to the {@link ReduceFnRunner}
     * @param options serializable wrapper around the pipeline options passed to the runner
     */
    public GroupByWindowFunction(WindowingStrategy<?, W> windowingStrategy, SystemReduceFn<K, V, Iterable<V>, Iterable<V>, W> reduceFn, SerializablePipelineOptions options) {
        this.windowingStrategy = windowingStrategy;
        this.reduceFn = reduceFn;
        this.options = options;
    }

    /**
     * Groups the windowed values of a single key and forwards each resulting group to
     * {@code collector}.
     *
     * <p>Any exception raised while processing is logged at {@code SEVERE} together with its stack
     * trace and is not rethrown; output that had not yet been handed to the collector is dropped
     * for this key.
     *
     * @param kIteratorKV the key together with all of its windowed values
     * @param collector sink receiving the grouped, windowed results
     */
    @Override
    public void flatMap(KV<K, Iterable<WindowedValue<V>>> kIteratorKV, Collector<WindowedValue<KV<K, Iterable<V>>>> collector) {
        try {
            K key = kIteratorKV.getKey();
            Iterable<WindowedValue<V>> values = kIteratorKV.getValue();

            InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
            timerInternals.advanceProcessingTime(Instant.now());
            timerInternals.advanceSynchronizedProcessingTime(Instant.now());
            StateInternals stateInternals = InMemoryStateInternals.forKey(key);
            GABWOutputWindowedValue<K, V> outputter = new GABWOutputWindowedValue<>();

            ReduceFnRunner<K, V, Iterable<V>, W> reduceFnRunner = new ReduceFnRunner<>(
                key,
                this.windowingStrategy,
                ExecutableTriggerStateMachine.create(
                    TriggerStateMachines.stateMachineForTrigger(
                        TriggerTranslation.toProto(this.windowingStrategy.getTrigger()))),
                stateInternals,
                timerInternals,
                outputter,
                new UnsupportedSideInputReader("GroupAlsoByWindow"),
                this.reduceFn,
                this.options.get());

            reduceFnRunner.processElements(values);

            // Move the input watermark and both processing-time clocks to the maximum timestamp
            // so that every timer still pending becomes eligible before draining them below.
            timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
            timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
            timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
            this.fireEligibleTimers(timerInternals, reduceFnRunner);

            reduceFnRunner.persist();

            for (WindowedValue<KV<K, Iterable<V>>> output : outputter.getOutputs()) {
                collector.collect(output);
            }
        }
        catch (Exception e) {
            // Pass the throwable itself so the stack trace and cause chain reach the log;
            // e.getMessage() alone is frequently null and hides where the failure happened.
            LOG.log(Level.SEVERE, "Failed to group values by window; remaining output for this key is dropped", e);
        }
    }

    /**
     * Drains all event-time, processing-time and synchronized-processing-time timers from
     * {@code timerInternals} and delivers them to the runner in one batch, repeating until a pass
     * finds no timers at all (presumably because handling a batch can schedule further timers).
     *
     * @param timerInternals in-memory timer store to drain
     * @param reduceFnRunner runner that receives the drained timers
     * @throws Exception if the runner fails while handling timers
     */
    private void fireEligibleTimers(InMemoryTimerInternals timerInternals, ReduceFnRunner<K, V, Iterable<V>, W> reduceFnRunner) throws Exception {
        List<TimerInternals.TimerData> timers = new ArrayList<>();
        while (true) {
            TimerInternals.TimerData timer;
            while ((timer = timerInternals.removeNextEventTimer()) != null) {
                timers.add(timer);
            }
            while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
                timers.add(timer);
            }
            while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) {
                timers.add(timer);
            }
            if (timers.isEmpty()) {
                break;
            }
            reduceFnRunner.onTimers(timers);
            timers.clear();
        }
    }

    /**
     * No per-task setup is required.
     *
     * @param context Twister2 TSet context (unused)
     */
    @Override
    public void prepare(TSetContext context) {
    }

    /**
     * Output sink for the {@link ReduceFnRunner} that buffers every emitted value in a list.
     *
     * @param <K> key type
     * @param <V> value type
     */
    private static class GABWOutputWindowedValue<K, V>
    implements OutputWindowedValue<KV<K, Iterable<V>>> {
        private final List<WindowedValue<KV<K, Iterable<V>>>> outputs = new ArrayList<>();

        /** Wraps the output as a {@link WindowedValue} and appends it to the buffer. */
        @Override
        public void outputWindowedValue(KV<K, Iterable<V>> output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
            this.outputs.add(WindowedValue.of(output, timestamp, windows, pane));
        }

        /**
         * Tagged outputs are not supported by this sink.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public <AT> void outputWindowedValue(TupleTag<AT> tag, AT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
            throw new UnsupportedOperationException("GroupAlsoByWindow should not use tagged outputs.");
        }

        /** Returns the buffered outputs in the order they were emitted. */
        Iterable<WindowedValue<KV<K, Iterable<V>>>> getOutputs() {
            return this.outputs;
        }
    }
}

