package org.pragmaticminds.crunch.execution;

import akka.stream.Attributes;
import akka.stream.FlowShape;
import akka.stream.Inlet;
import akka.stream.Outlet;
import akka.stream.stage.AbstractInOutHandler;
import akka.stream.stage.GraphStage;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.TimerGraphStageLogic;
import java.io.Serializable;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicReference;
import org.pragmaticminds.crunch.api.records.MRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/pragmaticminds/crunch/execution/SortGraphFlow.class */
public class SortGraphFlow<T extends MRecord> extends GraphStage<FlowShape<T, T>> implements Serializable {
    private static final Logger logger = LoggerFactory.getLogger(SortGraphFlow.class);
    private final Long watermarkOffsetMs;
    private final transient Inlet<T> in = Inlet.create("SortGraphFlow.in");
    private final transient Outlet<T> out = Outlet.create("SortGraphFlow.out");
    private final FlowShape<T, T> shape = FlowShape.of(this.in, this.out);
    private final Serializable bufferMutex = new Serializable() { // from class: org.pragmaticminds.crunch.execution.SortGraphFlow.1
    };
    private final ArrayDeque<T> buffer = new ArrayDeque<>();
    private TimestampSortFunction<T> sortFunction = new TimestampSortFunction<>();

    public SortGraphFlow(Long l) {
        this.watermarkOffsetMs = l;
    }

    /* renamed from: shape, reason: merged with bridge method [inline-methods] */
    public FlowShape<T, T> m4shape() {
        return this.shape;
    }

    public GraphStageLogic createLogic(Attributes attributes) {
        return new TimerGraphStageLogic(m4shape()) { // from class: org.pragmaticminds.crunch.execution.SortGraphFlow.2
            private AtomicReference<Long> systemTimeToRecordTimeDifference = new AtomicReference<>();

            {
                setHandlers(SortGraphFlow.this.in, SortGraphFlow.this.out, new AbstractInOutHandler() { // from class: org.pragmaticminds.crunch.execution.SortGraphFlow.2.1
                    public void onPush() {
                        MRecord mRecord = (MRecord) grab(SortGraphFlow.this.in);
                        AnonymousClass2.this.systemTimeToRecordTimeDifference.set(Long.valueOf(System.currentTimeMillis() - mRecord.getTimestamp()));
                        SortGraphFlow.this.sortFunction.process(Long.valueOf(mRecord.getTimestamp()), Long.valueOf(calculateWatermark(mRecord.getTimestamp())), mRecord);
                        scheduleOnce("key", Duration.of(SortGraphFlow.this.watermarkOffsetMs.longValue(), ChronoUnit.MILLIS));
                    }

                    public void onPull() {
                        MRecord bufferPop = bufferPop();
                        if (bufferPop != null) {
                            push(SortGraphFlow.this.out, bufferPop);
                        } else {
                            if (isClosed(SortGraphFlow.this.in) || hasBeenPulled(SortGraphFlow.this.in)) {
                                return;
                            }
                            pull(SortGraphFlow.this.in);
                        }
                    }
                });
            }

            /* JADX INFO: Access modifiers changed from: private */
            public T bufferPop() {
                synchronized (SortGraphFlow.this.bufferMutex) {
                    if (SortGraphFlow.this.buffer.isEmpty()) {
                        return null;
                    }
                    return (T) SortGraphFlow.this.buffer.pop();
                }
            }

            public void onTimer(Object obj) {
                Long valueOf = Long.valueOf(System.currentTimeMillis());
                if (this.systemTimeToRecordTimeDifference.get() != null) {
                    valueOf = Long.valueOf(valueOf.longValue() - this.systemTimeToRecordTimeDifference.get().longValue());
                }
                Collection onTimer = SortGraphFlow.this.sortFunction.onTimer(Long.valueOf(calculateWatermark(valueOf.longValue())));
                if (!onTimer.isEmpty()) {
                    synchronized (SortGraphFlow.this.bufferMutex) {
                        ArrayDeque arrayDeque = SortGraphFlow.this.buffer;
                        arrayDeque.getClass();
                        onTimer.forEach((v1) -> {
                            r1.push(v1);
                        });
                    }
                }
                MRecord bufferPop = bufferPop();
                if (bufferPop == null && !isClosed(SortGraphFlow.this.in) && !hasBeenPulled(SortGraphFlow.this.in)) {
                    pull(SortGraphFlow.this.in);
                }
                while (bufferPop != null) {
                    push(SortGraphFlow.this.out, bufferPop);
                    bufferPop = bufferPop();
                }
            }

            /* JADX INFO: Access modifiers changed from: private */
            public long calculateWatermark(long j) {
                return j - SortGraphFlow.this.watermarkOffsetMs.longValue();
            }

            public void preStart() throws Exception {
                super.preStart();
                pull(SortGraphFlow.this.in);
                SortGraphFlow.logger.debug("Initializing SortGraphFlow");
            }

            public void postStop() throws Exception {
                super.postStop();
                SortGraphFlow.logger.debug("Closing SortGraphFlow");
            }
        };
    }
}
