/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.examples.complete.game;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.examples.complete.game.LeaderBoard;
import org.apache.beam.examples.complete.game.UserScore;
import org.apache.beam.examples.complete.game.utils.GameConstants;
import org.apache.beam.examples.complete.game.utils.WriteToBigQuery;
import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Mean;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GameStats
extends LeaderBoard {
    protected static @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized WriteToBigQuery.FieldInfo<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Integer>>> configureWindowedWrite() {
        HashMap<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure = new HashMap<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>();
        tableConfigure.put("team", new WriteToBigQuery.FieldInfo("STRING", (c, w) -> ((KV)c.element()).getKey()));
        tableConfigure.put("total_score", new WriteToBigQuery.FieldInfo("INTEGER", (c, w) -> ((KV)c.element()).getValue()));
        tableConfigure.put("window_start", new WriteToBigQuery.FieldInfo("STRING", (c, w) -> {
            IntervalWindow window = (IntervalWindow)w;
            return GameConstants.DATE_TIME_FORMATTER.print((ReadableInstant)window.start());
        }));
        tableConfigure.put("processing_time", new WriteToBigQuery.FieldInfo("STRING", (c, w) -> GameConstants.DATE_TIME_FORMATTER.print((ReadableInstant)Instant.now())));
        return tableConfigure;
    }

    protected static @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized WriteToBigQuery.FieldInfo<@UnknownKeyFor @NonNull @Initialized Double>> configureSessionWindowWrite() {
        HashMap<String, WriteToBigQuery.FieldInfo<Double>> tableConfigure = new HashMap<String, WriteToBigQuery.FieldInfo<Double>>();
        tableConfigure.put("window_start", new WriteToBigQuery.FieldInfo("STRING", (c, w) -> {
            IntervalWindow window = (IntervalWindow)w;
            return GameConstants.DATE_TIME_FORMATTER.print((ReadableInstant)window.start());
        }));
        tableConfigure.put("mean_duration", new WriteToBigQuery.FieldInfo("FLOAT", (c, w) -> c.element()));
        return tableConfigure;
    }

    public static void main(@UnknownKeyFor @NonNull @Initialized String @UnknownKeyFor @NonNull @Initialized [] args) throws @UnknownKeyFor @NonNull @Initialized Exception {
        Options options = (Options)PipelineOptionsFactory.fromArgs((String[])args).withValidation().as(Options.class);
        options.setStreaming(true);
        ExampleUtils exampleUtils = new ExampleUtils(options);
        Pipeline pipeline = Pipeline.create((PipelineOptions)options);
        PCollection rawEvents = (PCollection)((PCollection)pipeline.apply((PTransform)PubsubIO.readStrings().withTimestampAttribute("timestamp_ms").fromTopic(options.getTopic()))).apply("ParseGameEvent", (PTransform)ParDo.of((DoFn)new UserScore.ParseEventFn()));
        PCollection userEvents = (PCollection)rawEvents.apply("ExtractUserScore", (PTransform)MapElements.into((TypeDescriptor)TypeDescriptors.kvs((TypeDescriptor)TypeDescriptors.strings(), (TypeDescriptor)TypeDescriptors.integers())).via((SerializableFunction & Serializable)gInfo -> KV.of((Object)gInfo.getUser(), (Object)gInfo.getScore())));
        final PCollectionView spammersView = (PCollectionView)((PCollection)((PCollection)userEvents.apply("FixedWindowsUser", (PTransform)Window.into((WindowFn)FixedWindows.of((Duration)Duration.standardMinutes((long)options.getFixedWindowDuration().intValue()))))).apply("CalculateSpammyUsers", (PTransform)new CalculateSpammyUsers())).apply("CreateSpammersView", (PTransform)View.asMap());
        ((PCollection)((PCollection)((PCollection)rawEvents.apply("WindowIntoFixedWindows", (PTransform)Window.into((WindowFn)FixedWindows.of((Duration)Duration.standardMinutes((long)options.getFixedWindowDuration().intValue()))))).apply("FilterOutSpammers", (PTransform)ParDo.of((DoFn)new DoFn<UserScore.GameActionInfo, UserScore.GameActionInfo>(){

            @DoFn.ProcessElement
            public void processElement(/*
             * Issues handling annotations - annotations may be inaccurate
             */
            // Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext c) {
                if (((Map)c.sideInput(spammersView)).get(((UserScore.GameActionInfo)c.element()).getUser().trim()) == null) {
                    c.output((Object)((UserScore.GameActionInfo)c.element()));
                }
            }
        }).withSideInputs(new PCollectionView[]{spammersView}))).apply("ExtractTeamScore", (PTransform)new UserScore.ExtractAndSumScore("team"))).apply("WriteTeamSums", new WriteWindowedToBigQuery(((GcpOptions)options.as(GcpOptions.class)).getProject(), options.getDataset(), options.getGameStatsTablePrefix() + "_team", GameStats.configureWindowedWrite()));
        ((PCollection)((PCollection)((PCollection)((PCollection)((PCollection)userEvents.apply("WindowIntoSessions", (PTransform)Window.into((WindowFn)Sessions.withGapDuration((Duration)Duration.standardMinutes((long)options.getSessionGap().intValue()))).withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))).apply((PTransform)Combine.perKey((SerializableFunction & Serializable)x -> 0))).apply("UserSessionActivity", (PTransform)ParDo.of((DoFn)new UserSessionInfoFn()))).apply("WindowToExtractSessionMean", (PTransform)Window.into((WindowFn)FixedWindows.of((Duration)Duration.standardMinutes((long)options.getUserActivityWindowDuration().intValue()))))).apply((PTransform)Mean.globally().withoutDefaults())).apply("WriteAvgSessionLength", new WriteWindowedToBigQuery(((GcpOptions)options.as(GcpOptions.class)).getProject(), options.getDataset(), options.getGameStatsTablePrefix() + "_sessions", GameStats.configureSessionWindowWrite()));
        PipelineResult result = pipeline.run();
        exampleUtils.waitToFinish(result);
    }

    public static interface Options
    extends LeaderBoard.Options {
        @Description(value="Numeric value of fixed window duration for user analysis, in minutes")
        @Default.Integer(value=60)
        public @UnknownKeyFor @NonNull @Initialized Integer getFixedWindowDuration();

        public void setFixedWindowDuration(@UnknownKeyFor @NonNull @Initialized Integer var1);

        @Description(value="Numeric value of gap between user sessions, in minutes")
        @Default.Integer(value=5)
        public @UnknownKeyFor @NonNull @Initialized Integer getSessionGap();

        public void setSessionGap(@UnknownKeyFor @NonNull @Initialized Integer var1);

        @Description(value="Numeric value of fixed window for finding mean of user session duration, in minutes")
        @Default.Integer(value=30)
        public @UnknownKeyFor @NonNull @Initialized Integer getUserActivityWindowDuration();

        public void setUserActivityWindowDuration(@UnknownKeyFor @NonNull @Initialized Integer var1);

        @Description(value="Prefix used for the BigQuery table names")
        @Default.String(value="game_stats")
        public @UnknownKeyFor @NonNull @Initialized String getGameStatsTablePrefix();

        public void setGameStatsTablePrefix(@UnknownKeyFor @NonNull @Initialized String var1);
    }

    private static class UserSessionInfoFn
    extends DoFn<KV<String, Integer>, Integer> {
        private UserSessionInfoFn() {
        }

        @DoFn.ProcessElement
        public void processElement(/*
         * Issues handling annotations - annotations may be inaccurate
         */
        // Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext c, @UnknownKeyFor @NonNull @Initialized BoundedWindow window) {
            IntervalWindow w = (IntervalWindow)window;
            int duration = new Duration((ReadableInstant)w.start(), (ReadableInstant)w.end()).toPeriod().toStandardMinutes().getMinutes();
            c.output((Object)duration);
        }
    }

    public static class CalculateSpammyUsers
    extends PTransform<PCollection<KV<String, Integer>>, PCollection<KV<String, Integer>>> {
        private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(CalculateSpammyUsers.class);
        private static final @UnknownKeyFor @NonNull @Initialized double SCORE_WEIGHT = 2.5;

        public @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Integer>> expand(@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Integer>> userScores) {
            PCollection sumScores = (PCollection)userScores.apply("UserSum", (PTransform)Sum.integersPerKey());
            final PCollectionView globalMeanScore = (PCollectionView)((PCollection)sumScores.apply((PTransform)Values.create())).apply((PTransform)Mean.globally().asSingletonView());
            PCollection filtered = (PCollection)sumScores.apply("ProcessAndFilter", (PTransform)ParDo.of((DoFn)new DoFn<KV<String, Integer>, KV<String, Integer>>(){
                private final @UnknownKeyFor @NonNull @Initialized Counter numSpammerUsers = Metrics.counter((String)"main", (String)"SpammerUsers");

                @DoFn.ProcessElement
                public void processElement(/*
                 * Issues handling annotations - annotations may be inaccurate
                 */
                // Could not load outer class - annotation placement on inner may be incorrect
                @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext c) {
                    Integer score = (Integer)((KV)c.element()).getValue();
                    Double gmc = (Double)c.sideInput(globalMeanScore);
                    if ((double)score.intValue() > gmc * 2.5) {
                        LOG.info("user " + (String)((KV)c.element()).getKey() + " spammer score " + score + " with mean " + gmc);
                        this.numSpammerUsers.inc();
                        c.output((Object)((KV)c.element()));
                    }
                }
            }).withSideInputs(new PCollectionView[]{globalMeanScore}));
            return filtered;
        }
    }
}

