/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.examples;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.examples.WindowedWordCount;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.examples.java.repackaged.com.google.common.base.MoreObjects;
import org.apache.beam.examples.java.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.examples.java.repackaged.com.google.common.collect.Lists;
import org.apache.beam.examples.java.repackaged.com.google.common.collect.UnmodifiableIterator;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.testing.FileChecksumMatcher;
import org.apache.beam.sdk.testing.SerializableMatcher;
import org.apache.beam.sdk.testing.StreamingIT;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.util.ExplicitShardedFile;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.NumberedShardedFile;
import org.apache.beam.sdk.util.ShardedFile;
import org.apache.beam.sdk.util.Sleeper;
import org.hamcrest.Description;
import org.hamcrest.Matchers;
import org.hamcrest.TypeSafeMatcher;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=JUnit4.class)
public class WindowedWordCountIT {
    @Rule
    public TestName testName = new TestName();
    private static final String DEFAULT_INPUT = "gs://apache-beam-samples/shakespeare/sonnets.txt";
    static final int MAX_READ_RETRIES = 4;
    static final Duration DEFAULT_SLEEP_DURATION = Duration.standardSeconds((long)10L);
    static final FluentBackoff BACK_OFF_FACTORY = FluentBackoff.DEFAULT.withInitialBackoff(DEFAULT_SLEEP_DURATION).withMaxRetries(4);

    @BeforeClass
    public static void setUp() {
        PipelineOptionsFactory.register(TestPipelineOptions.class);
    }

    @Test
    public void testWindowedWordCountInBatchDynamicSharding() throws Exception {
        WindowedWordCountITOptions options = this.batchOptions();
        options.setNumShards(null);
        this.testWindowedWordCountPipeline(options);
    }

    @Test
    public void testWindowedWordCountInBatchStaticSharding() throws Exception {
        WindowedWordCountITOptions options = this.batchOptions();
        options.setNumShards(3);
        this.testWindowedWordCountPipeline(options);
    }

    @Test
    @Category(value={StreamingIT.class})
    public void testWindowedWordCountInStreamingStaticSharding() throws Exception {
        WindowedWordCountITOptions options = this.streamingOptions();
        options.setNumShards(3);
        this.testWindowedWordCountPipeline(options);
    }

    private WindowedWordCountITOptions defaultOptions() throws Exception {
        WindowedWordCountITOptions options = (WindowedWordCountITOptions)TestPipeline.testingPipelineOptions().as(WindowedWordCountITOptions.class);
        options.setInputFile(DEFAULT_INPUT);
        options.setTestTimeoutSeconds(1200L);
        options.setMinTimestampMillis(0L);
        options.setMinTimestampMillis(Duration.standardHours((long)1L).getMillis());
        options.setWindowSize(10);
        options.setOutput(FileSystems.matchNewResource((String)options.getTempRoot(), (boolean)true).resolve(String.format("WindowedWordCountIT.%s-%tFT%<tH:%<tM:%<tS.%<tL+%s", this.testName.getMethodName(), new Date(), ThreadLocalRandom.current().nextInt()), (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY).resolve("output", (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY).resolve("results", (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_FILE).toString());
        return options;
    }

    private WindowedWordCountITOptions streamingOptions() throws Exception {
        WindowedWordCountITOptions options = this.defaultOptions();
        options.setStreaming(true);
        return options;
    }

    private WindowedWordCountITOptions batchOptions() throws Exception {
        WindowedWordCountITOptions options = this.defaultOptions();
        options.setStreaming(false);
        return options;
    }

    private void testWindowedWordCountPipeline(WindowedWordCountITOptions options) throws Exception {
        String outputPrefix = options.getOutput();
        WriteOneFilePerWindow.PerWindowFiles filenamePolicy = new WriteOneFilePerWindow.PerWindowFiles(FileBasedSink.convertToFileResourceIfPossible((String)outputPrefix));
        ArrayList expectedOutputFiles = Lists.newArrayListWithCapacity((int)6);
        UnmodifiableIterator unmodifiableIterator = ImmutableList.of((Object)0, (Object)10, (Object)20, (Object)30, (Object)40, (Object)50).iterator();
        while (unmodifiableIterator.hasNext()) {
            int startMinute = (Integer)unmodifiableIterator.next();
            Instant windowStart = new Instant((Object)options.getMinTimestampMillis()).plus((ReadableDuration)Duration.standardMinutes((long)startMinute));
            expectedOutputFiles.add(new NumberedShardedFile(filenamePolicy.filenamePrefixForWindow(new IntervalWindow(windowStart, windowStart.plus((ReadableDuration)Duration.standardMinutes((long)10L)))) + "*"));
        }
        ExplicitShardedFile inputFile = new ExplicitShardedFile(Collections.singleton(options.getInputFile()));
        TreeMap<String, Long> expectedWordCounts = new TreeMap<String, Long>();
        for (String line : inputFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff())) {
            String[] words;
            for (String word : words = line.split("[^\\p{L}]+")) {
                if (word.isEmpty()) continue;
                expectedWordCounts.put(word, (Long)MoreObjects.firstNonNull(expectedWordCounts.get(word), (Object)0L) + 1L);
            }
        }
        options.setOnSuccessMatcher(new WordCountsMatcher(expectedWordCounts, expectedOutputFiles));
        WindowedWordCount.main((String[])TestPipeline.convertToArgs((PipelineOptions)options));
    }

    private static class WordCountsMatcher
    extends TypeSafeMatcher<PipelineResult>
    implements SerializableMatcher<PipelineResult> {
        private static final Logger LOG = LoggerFactory.getLogger(FileChecksumMatcher.class);
        private final SortedMap<String, Long> expectedWordCounts;
        private final List<ShardedFile> outputFiles;
        private SortedMap<String, Long> actualCounts;

        public WordCountsMatcher(SortedMap<String, Long> expectedWordCounts, List<ShardedFile> outputFiles) {
            this.expectedWordCounts = expectedWordCounts;
            this.outputFiles = outputFiles;
        }

        public boolean matchesSafely(PipelineResult pipelineResult) {
            try {
                ArrayList outputLines = new ArrayList();
                for (ShardedFile outputFile : this.outputFiles) {
                    outputLines.addAll(outputFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff()));
                }
                this.actualCounts = new TreeMap<String, Long>();
                for (String line : outputLines) {
                    String[] splits = line.split(": ");
                    String word = splits[0];
                    long count = Long.parseLong(splits[1]);
                    this.actualCounts.merge(word, count, (a, b) -> a + b);
                }
                return this.actualCounts.equals(this.expectedWordCounts);
            }
            catch (Exception e) {
                throw new RuntimeException(String.format("Failed to read from sharded output: %s due to exception", this.outputFiles), e);
            }
        }

        public void describeTo(Description description) {
            Matchers.equalTo(this.expectedWordCounts).describeTo(description);
        }

        public void describeMismatchSafely(PipelineResult pResult, Description description) {
            Matchers.equalTo(this.expectedWordCounts).describeMismatch(this.actualCounts, description);
        }
    }

    public static interface WindowedWordCountITOptions
    extends WindowedWordCount.Options,
    TestPipelineOptions,
    StreamingOptions {
    }
}

