/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.examples.subprocess;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.examples.subprocess.SubProcessPipelineOptions;
import org.apache.beam.examples.subprocess.configuration.SubProcessConfiguration;
import org.apache.beam.examples.subprocess.kernel.SubProcessCommandLineArgs;
import org.apache.beam.examples.subprocess.kernel.SubProcessKernel;
import org.apache.beam.examples.subprocess.utils.CallingSubProcessUtils;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.extensions.gcp.util.GcsUtil;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=JUnit4.class)
public class ExampleEchoPipelineTest {
    private static final Logger LOG = LoggerFactory.getLogger(ExampleEchoPipelineTest.class);
    @Rule
    public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);

    @Test
    public void testExampleEchoPipeline() throws Exception {
        Path fileA = Files.createTempFile("test-Echo", ".sh", new FileAttribute[0]);
        Path fileB = Files.createTempFile("test-EchoAgain", ".sh", new FileAttribute[0]);
        Path workerTempFiles = Files.createTempDirectory("test-Echoo", new FileAttribute[0]);
        try (FileChannel channel = FileChannel.open(fileA, StandardOpenOption.CREATE, StandardOpenOption.WRITE);){
            channel.write(ByteBuffer.wrap(ExampleEchoPipelineTest.getTestShellEcho().getBytes(StandardCharsets.UTF_8)));
        }
        channel = FileChannel.open(fileB, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        var5_5 = null;
        try {
            channel.write(ByteBuffer.wrap(ExampleEchoPipelineTest.getTestShellEchoAgain().getBytes(StandardCharsets.UTF_8)));
        }
        catch (Throwable throwable) {
            var5_5 = throwable;
            throw throwable;
        }
        finally {
            if (channel != null) {
                ExampleEchoPipelineTest.$closeResource(var5_5, channel);
            }
        }
        SubProcessPipelineOptions options = (SubProcessPipelineOptions)PipelineOptionsFactory.as(SubProcessPipelineOptions.class);
        options.setConcurrency(Integer.valueOf(2));
        options.setSourcePath(fileA.getParent().toString());
        options.setWorkerPath(workerTempFiles.toAbsolutePath().toString());
        ((GcsOptions)this.p.getOptions().as(GcsOptions.class)).setGcsUtil(this.buildMockGcsUtil());
        SubProcessConfiguration configuration = options.getSubProcessConfiguration();
        ArrayList<KV> sampleData = new ArrayList<KV>();
        for (int i = 0; i < 100; ++i) {
            String str = String.valueOf(i);
            sampleData.add(KV.of((Object)str, (Object)str));
        }
        PCollection output = (PCollection)((PCollection)((PCollection)this.p.apply((PTransform)Create.of(sampleData))).apply("Echo inputs round 1", (PTransform)ParDo.of((DoFn)new EchoInputDoFn(configuration, fileA.getFileName().toString())))).apply("Echo inputs round 2", (PTransform)ParDo.of((DoFn)new EchoInputDoFn(configuration, fileB.getFileName().toString())));
        PAssert.that((PCollection)output).containsInAnyOrder(sampleData);
        this.p.run();
    }

    private static String getTestShellEcho() {
        return "#!/bin/sh\nfilename=$1;\necho $2 >> $filename;";
    }

    private static String getTestShellEchoAgain() {
        return "#!/bin/sh\nfilename=$1;\necho $2 >> $filename;";
    }

    private GcsUtil buildMockGcsUtil() throws IOException {
        GcsUtil mockGcsUtil = (GcsUtil)Mockito.mock(GcsUtil.class);
        Mockito.when((Object)mockGcsUtil.open((GcsPath)Mockito.any(GcsPath.class))).then((Answer)new Answer<SeekableByteChannel>(){

            public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable {
                return FileChannel.open(Files.createTempFile("channel-", ".tmp", new FileAttribute[0]), StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE);
            }
        });
        Mockito.when((Object)mockGcsUtil.expand((GcsPath)Mockito.any(GcsPath.class))).then((Answer)new Answer<List<GcsPath>>(){

            public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable {
                return ImmutableList.of((Object)((GcsPath)invocation.getArguments()[0]));
            }
        });
        return mockGcsUtil;
    }

    private static class EchoInputDoFn
    extends DoFn<KV<String, String>, KV<String, String>> {
        private static final Logger LOG = LoggerFactory.getLogger(EchoInputDoFn.class);
        private SubProcessConfiguration configuration;
        private String binaryName;

        public EchoInputDoFn(SubProcessConfiguration configuration, String binary) {
            this.configuration = configuration;
            this.binaryName = binary;
        }

        @DoFn.Setup
        public void setUp() throws Exception {
            CallingSubProcessUtils.setUp((SubProcessConfiguration)this.configuration, (String)this.binaryName);
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) throws Exception {
            try {
                SubProcessCommandLineArgs commands = new SubProcessCommandLineArgs();
                SubProcessCommandLineArgs.Command command = new SubProcessCommandLineArgs.Command(0, String.valueOf(((KV)c.element()).getValue()));
                commands.putCommand(command);
                SubProcessKernel kernel = new SubProcessKernel(this.configuration, this.binaryName);
                List results = kernel.exec(commands);
                for (String s : results) {
                    c.output((Object)KV.of((Object)((String)((KV)c.element()).getKey()), (Object)s));
                }
            }
            catch (Exception ex) {
                LOG.error("Error processing element ", (Throwable)ex);
                throw ex;
            }
        }
    }
}

