/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.windowing;

import java.util.logging.Logger;
import org.apache.storm.windowing.DefaultEvictionContext;
import org.apache.storm.windowing.Event;
import org.apache.storm.windowing.EvictionPolicy;
import org.apache.storm.windowing.TriggerHandler;
import org.apache.storm.windowing.TriggerPolicy;
import org.apache.storm.windowing.WindowManager;

public class WatermarkTimeTriggerPolicy<T>
implements TriggerPolicy<T, Long> {
    private static final Logger LOG = Logger.getLogger(WatermarkTimeTriggerPolicy.class.getName());
    private final long slidingIntervalMs;
    private final TriggerHandler handler;
    private final EvictionPolicy<T, ?> evictionPolicy;
    private final WindowManager<T> windowManager;
    private volatile long nextWindowEndTs;
    private boolean started;

    public WatermarkTimeTriggerPolicy(long slidingIntervalMs, TriggerHandler handler, EvictionPolicy<T, ?> evictionPolicy, WindowManager<T> windowManager) {
        this.slidingIntervalMs = slidingIntervalMs;
        this.handler = handler;
        this.evictionPolicy = evictionPolicy;
        this.windowManager = windowManager;
        this.started = false;
    }

    @Override
    public void track(Event<T> event) {
        if (this.started && event.isWatermark()) {
            this.handleWaterMarkEvent(event);
        }
    }

    @Override
    public void reset() {
    }

    @Override
    public void start() {
        this.started = true;
    }

    @Override
    public void shutdown() {
    }

    private void handleWaterMarkEvent(Event<T> event) {
        long watermarkTs = event.getTimestamp();
        long windowEndTs = this.nextWindowEndTs;
        LOG.fine(String.format("Window end ts %s Watermark ts %s", windowEndTs, watermarkTs));
        while (windowEndTs <= watermarkTs) {
            long currentCount = this.windowManager.getEventCount(windowEndTs);
            this.evictionPolicy.setContext(new DefaultEvictionContext(windowEndTs, currentCount));
            if (this.handler.onTrigger()) {
                windowEndTs += this.slidingIntervalMs;
                continue;
            }
            long ts = this.getNextAlignedWindowTs(windowEndTs, watermarkTs);
            LOG.info(() -> String.format("Next aligned window end ts %s", ts));
            if (ts == Long.MAX_VALUE) {
                LOG.fine(String.format("No events to process between %s and watermark ts %s", windowEndTs, watermarkTs));
                break;
            }
            windowEndTs = ts;
        }
        this.nextWindowEndTs = windowEndTs;
    }

    private long getNextAlignedWindowTs(long startTs, long endTs) {
        long nextTs = this.windowManager.getEarliestEventTs(startTs, endTs);
        if (nextTs == Long.MAX_VALUE || nextTs % this.slidingIntervalMs == 0L) {
            return nextTs;
        }
        return nextTs + (this.slidingIntervalMs - nextTs % this.slidingIntervalMs);
    }

    @Override
    public Long getState() {
        return this.nextWindowEndTs;
    }

    @Override
    public void restoreState(Long state) {
        this.nextWindowEndTs = state;
    }

    public String toString() {
        return "WatermarkTimeTriggerPolicy{slidingIntervalMs=" + this.slidingIntervalMs + ", nextWindowEndTs=" + this.nextWindowEndTs + ", started=" + this.started + '}';
    }
}

