/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.examples.complete.game;

import java.io.Serializable;
import org.apache.beam.examples.complete.game.LeaderBoard;
import org.apache.beam.examples.complete.game.UserScore;
import org.apache.beam.examples.java.repackaged.com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class LeaderBoardTest
implements Serializable {
    private static final Duration ALLOWED_LATENESS = Duration.standardHours((long)1L);
    private static final Duration TEAM_WINDOW_DURATION = Duration.standardMinutes((long)20L);
    private Instant baseTime = new Instant(0L);
    @Rule
    public TestPipeline p = TestPipeline.create();

    @Test
    public void testTeamScoresOnTime() {
        TestStream createEvents = TestStream.create((Coder)AvroCoder.of(UserScore.GameActionInfo.class)).advanceWatermarkTo(this.baseTime).addElements(this.event(TestUser.BLUE_ONE, 3, Duration.standardSeconds((long)3L)), new TimestampedValue[]{this.event(TestUser.BLUE_ONE, 2, Duration.standardMinutes((long)1L)), this.event(TestUser.RED_TWO, 3, Duration.standardSeconds((long)22L)), this.event(TestUser.BLUE_TWO, 5, Duration.standardMinutes((long)3L))}).advanceWatermarkTo(this.baseTime.plus((ReadableDuration)Duration.standardMinutes((long)3L))).addElements(this.event(TestUser.RED_ONE, 1, Duration.standardMinutes((long)4L)), new TimestampedValue[]{this.event(TestUser.BLUE_ONE, 2, Duration.standardSeconds((long)270L))}).advanceWatermarkToInfinity();
        PCollection teamScores = (PCollection)((PCollection)this.p.apply((PTransform)createEvents)).apply((PTransform)new LeaderBoard.CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS));
        String blueTeam = TestUser.BLUE_ONE.getTeam();
        String redTeam = TestUser.RED_ONE.getTeam();
        PAssert.that((PCollection)teamScores).inOnTimePane((BoundedWindow)new IntervalWindow(this.baseTime, (ReadableDuration)TEAM_WINDOW_DURATION)).containsInAnyOrder((Object[])new KV[]{KV.of((Object)blueTeam, (Object)12), KV.of((Object)redTeam, (Object)4)});
        this.p.run().waitUntilFinish();
    }

    @Test
    public void testTeamScoresSpeculative() {
        TestStream createEvents = TestStream.create((Coder)AvroCoder.of(UserScore.GameActionInfo.class)).advanceWatermarkTo(this.baseTime).addElements(this.event(TestUser.BLUE_ONE, 3, Duration.standardSeconds((long)3L)), new TimestampedValue[]{this.event(TestUser.BLUE_ONE, 2, Duration.standardMinutes((long)1L))}).advanceProcessingTime(Duration.standardMinutes((long)10L)).addElements(this.event(TestUser.RED_TWO, 5, Duration.standardMinutes((long)3L)), new TimestampedValue[0]).advanceProcessingTime(Duration.standardMinutes((long)12L)).addElements(this.event(TestUser.BLUE_TWO, 3, Duration.standardSeconds((long)22L)), new TimestampedValue[0]).advanceProcessingTime(Duration.standardMinutes((long)10L)).addElements(this.event(TestUser.RED_ONE, 4, Duration.standardMinutes((long)4L)), new TimestampedValue[]{this.event(TestUser.BLUE_TWO, 2, Duration.standardMinutes((long)2L))}).advanceWatermarkToInfinity();
        PCollection teamScores = (PCollection)((PCollection)this.p.apply((PTransform)createEvents)).apply((PTransform)new LeaderBoard.CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS));
        String blueTeam = TestUser.BLUE_ONE.getTeam();
        String redTeam = TestUser.RED_ONE.getTeam();
        IntervalWindow window = new IntervalWindow(this.baseTime, (ReadableDuration)TEAM_WINDOW_DURATION);
        PAssert.that((PCollection)teamScores).inWindow((BoundedWindow)window).containsInAnyOrder((Object[])new KV[]{KV.of((Object)blueTeam, (Object)10), KV.of((Object)redTeam, (Object)9), KV.of((Object)blueTeam, (Object)5), KV.of((Object)blueTeam, (Object)8), KV.of((Object)redTeam, (Object)5)});
        PAssert.that((PCollection)teamScores).inOnTimePane((BoundedWindow)window).containsInAnyOrder((Object[])new KV[]{KV.of((Object)blueTeam, (Object)10), KV.of((Object)redTeam, (Object)9)});
        this.p.run().waitUntilFinish();
    }

    @Test
    public void testTeamScoresUnobservablyLate() {
        IntervalWindow window = new IntervalWindow(this.baseTime, (ReadableDuration)TEAM_WINDOW_DURATION);
        TestStream createEvents = TestStream.create((Coder)AvroCoder.of(UserScore.GameActionInfo.class)).advanceWatermarkTo(this.baseTime).addElements(this.event(TestUser.BLUE_ONE, 3, Duration.standardSeconds((long)3L)), new TimestampedValue[]{this.event(TestUser.BLUE_TWO, 5, Duration.standardMinutes((long)8L)), this.event(TestUser.RED_ONE, 4, Duration.standardMinutes((long)2L)), this.event(TestUser.BLUE_ONE, 3, Duration.standardMinutes((long)5L))}).advanceWatermarkTo(this.baseTime.plus((ReadableDuration)TEAM_WINDOW_DURATION).minus((ReadableDuration)Duration.standardMinutes((long)1L))).addElements(this.event(TestUser.RED_TWO, 2, Duration.ZERO), new TimestampedValue[]{this.event(TestUser.RED_TWO, 5, Duration.standardMinutes((long)1L)), this.event(TestUser.BLUE_TWO, 2, Duration.standardSeconds((long)90L)), this.event(TestUser.RED_TWO, 3, Duration.standardMinutes((long)3L))}).advanceWatermarkTo(this.baseTime.plus((ReadableDuration)TEAM_WINDOW_DURATION).plus((ReadableDuration)Duration.standardMinutes((long)1L))).advanceWatermarkToInfinity();
        PCollection teamScores = (PCollection)((PCollection)this.p.apply((PTransform)createEvents)).apply((PTransform)new LeaderBoard.CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS));
        String blueTeam = TestUser.BLUE_ONE.getTeam();
        String redTeam = TestUser.RED_ONE.getTeam();
        PAssert.that((PCollection)teamScores).inOnTimePane((BoundedWindow)window).containsInAnyOrder((Object[])new KV[]{KV.of((Object)redTeam, (Object)14), KV.of((Object)blueTeam, (Object)13)});
        this.p.run().waitUntilFinish();
    }

    @Test
    public void testTeamScoresObservablyLate() {
        Instant firstWindowCloses = this.baseTime.plus((ReadableDuration)ALLOWED_LATENESS).plus((ReadableDuration)TEAM_WINDOW_DURATION);
        TestStream createEvents = TestStream.create((Coder)AvroCoder.of(UserScore.GameActionInfo.class)).advanceWatermarkTo(this.baseTime).addElements(this.event(TestUser.BLUE_ONE, 3, Duration.standardSeconds((long)3L)), new TimestampedValue[]{this.event(TestUser.BLUE_TWO, 5, Duration.standardMinutes((long)8L))}).advanceProcessingTime(Duration.standardMinutes((long)10L)).advanceWatermarkTo(this.baseTime.plus((ReadableDuration)Duration.standardMinutes((long)3L))).addElements(this.event(TestUser.RED_ONE, 3, Duration.standardMinutes((long)1L)), new TimestampedValue[]{this.event(TestUser.RED_ONE, 4, Duration.standardMinutes((long)2L)), this.event(TestUser.BLUE_ONE, 3, Duration.standardMinutes((long)5L))}).advanceWatermarkTo(firstWindowCloses.minus((ReadableDuration)Duration.standardMinutes((long)1L))).addElements(this.event(TestUser.RED_TWO, 2, Duration.ZERO), new TimestampedValue[]{this.event(TestUser.RED_TWO, 5, Duration.standardMinutes((long)1L)), this.event(TestUser.RED_TWO, 3, Duration.standardMinutes((long)3L))}).advanceProcessingTime(Duration.standardMinutes((long)12L)).addElements(this.event(TestUser.RED_TWO, 9, Duration.standardMinutes((long)1L)), new TimestampedValue[]{this.event(TestUser.RED_TWO, 1, Duration.standardMinutes((long)3L))}).advanceWatermarkToInfinity();
        PCollection teamScores = (PCollection)((PCollection)this.p.apply((PTransform)createEvents)).apply((PTransform)new LeaderBoard.CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS));
        IntervalWindow window = new IntervalWindow(this.baseTime, (ReadableDuration)TEAM_WINDOW_DURATION);
        String blueTeam = TestUser.BLUE_ONE.getTeam();
        String redTeam = TestUser.RED_ONE.getTeam();
        PAssert.that((PCollection)teamScores).inWindow((BoundedWindow)window).satisfies((SerializableFunction & Serializable)input -> {
            Assert.assertThat((Object)input, (Matcher)Matchers.hasItem((Object)KV.of((Object)blueTeam, (Object)11)));
            Assert.assertThat((Object)input, (Matcher)Matchers.hasItem((Object)KV.of((Object)redTeam, (Object)27)));
            return null;
        });
        PAssert.thatMap((PCollection)teamScores).inOnTimePane((BoundedWindow)window).isEqualTo((Object)ImmutableMap.builder().put((Object)redTeam, (Object)7).put((Object)blueTeam, (Object)11).build());
        PAssert.that((PCollection)teamScores).inFinalPane((BoundedWindow)window).containsInAnyOrder((Object[])new KV[]{KV.of((Object)redTeam, (Object)27)});
        this.p.run().waitUntilFinish();
    }

    @Test
    public void testTeamScoresDroppablyLate() {
        IntervalWindow window = new IntervalWindow(this.baseTime, (ReadableDuration)TEAM_WINDOW_DURATION);
        TestStream infos = TestStream.create((Coder)AvroCoder.of(UserScore.GameActionInfo.class)).addElements(this.event(TestUser.BLUE_ONE, 12, Duration.ZERO), new TimestampedValue[]{this.event(TestUser.RED_ONE, 3, Duration.ZERO)}).advanceWatermarkTo(window.maxTimestamp()).addElements(this.event(TestUser.RED_ONE, 4, Duration.standardMinutes((long)2L)), new TimestampedValue[]{this.event(TestUser.BLUE_TWO, 3, Duration.ZERO), this.event(TestUser.BLUE_ONE, 3, Duration.standardMinutes((long)3L))}).advanceWatermarkTo(this.baseTime.plus((ReadableDuration)TEAM_WINDOW_DURATION)).advanceWatermarkTo(this.baseTime.plus((ReadableDuration)ALLOWED_LATENESS).plus((ReadableDuration)TEAM_WINDOW_DURATION).plus((ReadableDuration)Duration.standardMinutes((long)1L))).addElements(this.event(TestUser.BLUE_TWO, 3, TEAM_WINDOW_DURATION.minus((ReadableDuration)Duration.standardSeconds((long)5L))), new TimestampedValue[]{this.event(TestUser.RED_ONE, 7, Duration.standardMinutes((long)4L))}).advanceWatermarkToInfinity();
        PCollection teamScores = (PCollection)((PCollection)this.p.apply((PTransform)infos)).apply((PTransform)new LeaderBoard.CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS));
        String blueTeam = TestUser.BLUE_ONE.getTeam();
        String redTeam = TestUser.RED_ONE.getTeam();
        PAssert.that((PCollection)teamScores).inWindow((BoundedWindow)window).containsInAnyOrder((Object[])new KV[]{KV.of((Object)redTeam, (Object)7), KV.of((Object)blueTeam, (Object)18)});
        PAssert.that((PCollection)teamScores).inFinalPane((BoundedWindow)window).empty();
        this.p.run().waitUntilFinish();
    }

    @Test
    public void testUserScore() {
        TestStream infos = TestStream.create((Coder)AvroCoder.of(UserScore.GameActionInfo.class)).addElements(this.event(TestUser.BLUE_ONE, 12, Duration.ZERO), new TimestampedValue[]{this.event(TestUser.RED_ONE, 3, Duration.ZERO)}).advanceProcessingTime(Duration.standardMinutes((long)7L)).addElements(this.event(TestUser.RED_ONE, 4, Duration.standardMinutes((long)2L)), new TimestampedValue[]{this.event(TestUser.BLUE_TWO, 3, Duration.ZERO), this.event(TestUser.BLUE_ONE, 3, Duration.standardMinutes((long)3L))}).advanceProcessingTime(Duration.standardMinutes((long)5L)).advanceWatermarkTo(this.baseTime.plus((ReadableDuration)ALLOWED_LATENESS).plus((ReadableDuration)Duration.standardHours((long)12L))).addElements(this.event(TestUser.RED_ONE, 3, Duration.standardMinutes((long)7L)), new TimestampedValue[]{this.event(TestUser.RED_ONE, 2, ALLOWED_LATENESS.plus((ReadableDuration)Duration.standardHours((long)13L)))}).advanceProcessingTime(Duration.standardMinutes((long)6L)).addElements(this.event(TestUser.BLUE_TWO, 5, Duration.standardMinutes((long)12L)), new TimestampedValue[0]).advanceProcessingTime(Duration.standardMinutes((long)20L)).advanceWatermarkToInfinity();
        PCollection userScores = (PCollection)((PCollection)this.p.apply((PTransform)infos)).apply((PTransform)new LeaderBoard.CalculateUserScores(ALLOWED_LATENESS));
        PAssert.that((PCollection)userScores).inEarlyGlobalWindowPanes().containsInAnyOrder((Object[])new KV[]{KV.of((Object)TestUser.BLUE_ONE.getUser(), (Object)15), KV.of((Object)TestUser.RED_ONE.getUser(), (Object)7), KV.of((Object)TestUser.RED_ONE.getUser(), (Object)12), KV.of((Object)TestUser.BLUE_TWO.getUser(), (Object)3), KV.of((Object)TestUser.BLUE_TWO.getUser(), (Object)8)});
        this.p.run().waitUntilFinish();
    }

    @Test
    public void testLeaderBoardOptions() {
        PipelineOptionsFactory.as(LeaderBoard.Options.class);
    }

    private TimestampedValue<UserScore.GameActionInfo> event(TestUser user, int score, Duration baseTimeOffset) {
        return TimestampedValue.of((Object)new UserScore.GameActionInfo(user.getUser(), user.getTeam(), Integer.valueOf(score), Long.valueOf(this.baseTime.plus((ReadableDuration)baseTimeOffset).getMillis())), (Instant)this.baseTime.plus((ReadableDuration)baseTimeOffset));
    }

    private static enum TestUser {
        RED_ONE("scarlet", "red"),
        RED_TWO("burgundy", "red"),
        BLUE_ONE("navy", "blue"),
        BLUE_TWO("sky", "blue");

        private final String userName;
        private final String teamName;

        private TestUser(String userName, String teamName) {
            this.userName = userName;
            this.teamName = teamName;
        }

        public String getUser() {
            return this.userName;
        }

        public String getTeam() {
            return this.teamName;
        }
    }
}

