/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.examples.subprocess;

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.examples.subprocess.SubProcessPipelineOptions;
import org.apache.beam.examples.subprocess.configuration.SubProcessConfiguration;
import org.apache.beam.examples.subprocess.kernel.SubProcessCommandLineArgs;
import org.apache.beam.examples.subprocess.kernel.SubProcessKernel;
import org.apache.beam.examples.subprocess.utils.CallingSubProcessUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExampleEchoPipeline {
    static final Logger LOG = LoggerFactory.getLogger(ExampleEchoPipeline.class);

    public static void main(String[] args) throws Exception {
        SubProcessPipelineOptions options = (SubProcessPipelineOptions)PipelineOptionsFactory.fromArgs((String[])args).withValidation().as(SubProcessPipelineOptions.class);
        Pipeline p = Pipeline.create((PipelineOptions)options);
        SubProcessConfiguration configuration = options.getSubProcessConfiguration();
        ArrayList<KV> sampleData = new ArrayList<KV>();
        for (int i = 0; i < 10000; ++i) {
            String str = String.valueOf(i);
            sampleData.add(KV.of((Object)str, (Object)str));
        }
        ((PCollection)((PCollection)p.apply((PTransform)Create.of(sampleData))).apply("Echo inputs round 1", (PTransform)ParDo.of((DoFn)new EchoInputDoFn(configuration, "Echo")))).apply("Echo inputs round 2", (PTransform)ParDo.of((DoFn)new EchoInputDoFn(configuration, "EchoAgain")));
        p.run();
    }

    private static String getTestShellEcho() {
        return "#!/bin/sh\nfilename=$1;\necho $2 >> $filename;";
    }

    private static String getTestShellEchoAgain() {
        return "#!/bin/sh\nfilename=$1;\necho \"You again? Well ok, here is your word again.\" >> $2 >> $filename;";
    }

    public static class EchoInputDoFn
    extends DoFn<KV<String, String>, KV<String, String>> {
        static final Logger LOG = LoggerFactory.getLogger(EchoInputDoFn.class);
        private SubProcessConfiguration configuration;
        private String binaryName;

        public EchoInputDoFn(SubProcessConfiguration configuration, String binary) {
            this.configuration = configuration;
            this.binaryName = binary;
        }

        @DoFn.Setup
        public void setUp() throws Exception {
            CallingSubProcessUtils.setUp(this.configuration, this.binaryName);
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) throws Exception {
            try {
                SubProcessCommandLineArgs commands = new SubProcessCommandLineArgs();
                SubProcessCommandLineArgs.Command command = new SubProcessCommandLineArgs.Command(0, String.valueOf(((KV)c.element()).getValue()));
                commands.putCommand(command);
                SubProcessKernel kernel = new SubProcessKernel(this.configuration, this.binaryName);
                List<String> results = kernel.exec(commands);
                for (String s : results) {
                    c.output((Object)KV.of((Object)((String)((KV)c.element()).getKey()), (Object)s));
                }
            }
            catch (Exception ex) {
                LOG.error("Error processing element ", (Throwable)ex);
                throw ex;
            }
        }
    }
}

