/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.task.window.policy.trigger.duration;

import edu.iu.dsc.tws.task.window.api.DefaultEvictionContext;
import edu.iu.dsc.tws.task.window.api.Event;
import edu.iu.dsc.tws.task.window.api.IEvictionPolicy;
import edu.iu.dsc.tws.task.window.manage.IManager;
import edu.iu.dsc.tws.task.window.manage.WindowManager;
import edu.iu.dsc.tws.task.window.policy.trigger.duration.DurationWindowPolicy;
import java.util.logging.Level;
import java.util.logging.Logger;

public class WatermarkDurationWindowPolicy<T>
extends DurationWindowPolicy<T> {
    private static final Logger LOG = Logger.getLogger(WatermarkDurationWindowPolicy.class.getName());
    private final long slidingInterval;
    private final IManager manager;
    private final WindowManager<T> windowManager;
    private final IEvictionPolicy<T> evictionPolicy;
    private boolean started;
    private long nextWindowEndTime = 0L;

    public WatermarkDurationWindowPolicy(long slidingInterval, IManager manager, WindowManager<T> windowManager, IEvictionPolicy<T> evictionPolicy) {
        super(slidingInterval, manager, evictionPolicy);
        this.slidingInterval = slidingInterval;
        this.manager = manager;
        this.windowManager = windowManager;
        this.evictionPolicy = evictionPolicy;
        this.started = false;
    }

    @Override
    public boolean validate() {
        return this.slidingInterval > 0L;
    }

    @Override
    public String whyInvalid() {
        return null;
    }

    @Override
    public void track(Event<T> event) {
        if (this.started && event.isWatermark()) {
            this.onWatermarkEvent(event);
        }
    }

    @Override
    public void reset() {
    }

    @Override
    public void start() {
        this.started = true;
    }

    @Override
    public void shutdown() {
    }

    private void onWatermarkEvent(Event<T> event) {
        long watermarkTimestamp = event.getTimeStamp();
        long windowEndTimestamp = this.nextWindowEndTime;
        LOG.log(Level.FINE, String.format("Window End Timestamp : %s, Watermark Timestamp : %s", String.valueOf(windowEndTimestamp), String.valueOf(watermarkTimestamp)));
        while (windowEndTimestamp <= watermarkTimestamp) {
            long currentEventCount = this.windowManager.getEventCount(windowEndTimestamp);
            this.evictionPolicy.setContext(new DefaultEvictionContext(windowEndTimestamp, currentEventCount));
            if (this.manager.onEvent()) {
                windowEndTimestamp += this.slidingInterval;
                continue;
            }
            long timestamp = this.getNextAlignedWindowTimestamp(windowEndTimestamp, watermarkTimestamp);
            LOG.log(Level.FINE, String.format("Next Aligned Window End at timestamp %s", String.valueOf(timestamp)));
            if (timestamp == Long.MAX_VALUE) {
                LOG.log(Level.FINE, String.format("No Events to process in time periods of window end timestamp %s and watermark timestamp %s", windowEndTimestamp, watermarkTimestamp));
                break;
            }
            windowEndTimestamp = timestamp;
        }
        this.nextWindowEndTime = windowEndTimestamp;
    }

    private long getNextAlignedWindowTimestamp(long start, long end) {
        long nextTimestamp = this.windowManager.getEarliestEventTimestamp(start, end);
        if (nextTimestamp == Long.MAX_VALUE || nextTimestamp % this.slidingInterval == 0L) {
            return nextTimestamp;
        }
        return nextTimestamp + (this.slidingInterval - nextTimestamp % this.slidingInterval);
    }
}

