/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.examples.cookbook;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.ArrayList;
import java.util.logging.Logger;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Mean;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

public class FilterExamples {
    /**
     * Fully qualified name of the sample weather table. Holds the same value as the
     * default declared for {@code Options.getInput()}.
     */
    private static final String WEATHER_SAMPLES_TABLE = "clouddataflow-readonly:samples.weather_stations";
    /** Logger named after this class. */
    static final Logger LOG = Logger.getLogger(FilterExamples.class.getName());
    /** Month number (7); the same value as the default declared for {@code Options.getMonthFilter()}. */
    static final int MONTH_TO_FILTER = 7;

    /**
     * Builds the schema of the projected weather rows.
     *
     * @return a schema with the columns {@code year}, {@code month} and {@code day}
     *     (all {@code INTEGER}) followed by {@code mean_temp} ({@code FLOAT}), in that order
     */
    private static TableSchema buildWeatherSchemaProjection() {
        // Column name / column type pairs, listed in output order.
        String[][] columns = {
            {"year", "INTEGER"},
            {"month", "INTEGER"},
            {"day", "INTEGER"},
            {"mean_temp", "FLOAT"},
        };

        ArrayList<TableFieldSchema> fields = new ArrayList<>();
        for (String[] column : columns) {
            TableFieldSchema field = new TableFieldSchema();
            field.setName(column[0]);
            field.setType(column[1]);
            fields.add(field);
        }

        TableSchema schema = new TableSchema();
        schema.setFields(fields);
        return schema;
    }

    /**
     * Command-line entry point: parses and validates the pipeline options from
     * {@code args}, then runs the filter pipeline with them.
     *
     * @param args command-line arguments, interpreted as pipeline options
     * @throws Exception declared for callers; propagates whatever option parsing or the run raises
     */
    public static void main(String[] args) throws Exception {
        runFilterExamples(
            PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class));
    }

    /**
     * Builds and runs the pipeline, blocking until it has finished.
     *
     * <p>Stages: read table rows from {@code options.getInput()}, project them with
     * {@link ProjectionFn}, apply {@link BelowGlobalMean} for the configured month, and
     * write the result to {@code options.getOutput()} (table created if needed, existing
     * contents truncated).
     *
     * @param options input table, output table and month to filter on
     */
    static void runFilterExamples(Options options) {
        Pipeline pipeline = Pipeline.create(options);
        TableSchema outputSchema = buildWeatherSchemaProjection();

        PCollection<TableRow> sourceRows =
            pipeline.apply(BigQueryIO.readTableRows().from(options.getInput()));
        PCollection<TableRow> projectedRows = sourceRows.apply(ParDo.of(new ProjectionFn()));
        PCollection<TableRow> belowMeanRows =
            projectedRows.apply(new BelowGlobalMean(options.getMonthFilter()));

        belowMeanRows.apply(
            BigQueryIO.writeTableRows()
                .to(options.getOutput())
                .withSchema(outputSchema)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

        pipeline.run().waitUntilFinish();
    }

    /**
     * Pipeline options for this example: the table to read, the table to write, and the
     * month to filter on.
     *
     * <p>The defaults refer to the constants of the enclosing class so that each value is
     * defined in exactly one place.
     */
    public interface Options extends PipelineOptions {
        /** Returns the table to read from, as {@code <project_id>:<dataset_id>.<table_id>}. */
        @Description("Table to read from, specified as <project_id>:<dataset_id>.<table_id>")
        @Default.String(WEATHER_SAMPLES_TABLE)
        String getInput();

        /** Sets the table to read from. */
        void setInput(String value);

        /** Returns the table to write to; this option is required. */
        @Description("Table to write to, specified as <project_id>:<dataset_id>.<table_id>. The dataset_id must already exist")
        @Validation.Required
        String getOutput();

        /** Sets the table to write to. */
        void setOutput(String value);

        /** Returns the numeric month that rows are filtered on. */
        @Description("Numeric value of month to filter on")
        @Default.Integer(MONTH_TO_FILTER)
        Integer getMonthFilter();

        /** Sets the numeric month that rows are filtered on. */
        void setMonthFilter(Integer value);
    }

    static class BelowGlobalMean
    extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {
        Integer monthFilter;

        public BelowGlobalMean(Integer monthFilter) {
            this.monthFilter = monthFilter;
        }

        public PCollection<TableRow> expand(PCollection<TableRow> rows) {
            PCollection meanTemps = (PCollection)rows.apply((PTransform)ParDo.of((DoFn)new ExtractTempFn()));
            final PCollectionView globalMeanTemp = (PCollectionView)((PCollection)meanTemps.apply((PTransform)Mean.globally())).apply((PTransform)View.asSingleton());
            PCollection monthFilteredRows = (PCollection)rows.apply((PTransform)ParDo.of((DoFn)new FilterSingleMonthDataFn(this.monthFilter)));
            PCollection filteredRows = (PCollection)monthFilteredRows.apply("ParseAndFilter", (PTransform)ParDo.of((DoFn)new DoFn<TableRow, TableRow>(){

                @DoFn.ProcessElement
                public void processElement(DoFn.ProcessContext c) {
                    Double meanTemp = Double.parseDouble(((TableRow)c.element()).get((Object)"mean_temp").toString());
                    Double gTemp = (Double)c.sideInput(globalMeanTemp);
                    if (meanTemp < gTemp) {
                        c.output((Object)((TableRow)c.element()));
                    }
                }
            }).withSideInputs(new PCollectionView[]{globalMeanTemp}));
            return filteredRows;
        }
    }

    /**
     * Emits the {@code mean_temp} field of each row as a {@code Double}, parsed from the
     * field's string form.
     */
    static class ExtractTempFn
    extends DoFn<TableRow, Double> {
        /**
         * Parses {@code mean_temp} of the current row and outputs it.
         *
         * <p>The context is declared with its full generic type so that no casts are needed.
         * NOTE(review): Beam is understood to validate this parameter's generic type
         * reflectively; a raw {@code DoFn.ProcessContext} should be avoided.
         *
         * @param c context giving access to the current row and the output
         */
        @DoFn.ProcessElement
        public void processElement(DoFn<TableRow, Double>.ProcessContext c) {
            TableRow row = c.element();
            Double meanTemp = Double.parseDouble(row.get("mean_temp").toString());
            c.output(meanTemp);
        }
    }

    /**
     * Passes through only the rows whose {@code month} field equals the configured month.
     *
     * <p>The month may be stored as an {@code Integer} or as any value whose string form
     * is a decimal integer (for example a {@code Long} or a {@code String}). Rows without
     * a {@code month} are dropped, and nothing is emitted when no month is configured.
     */
    static class FilterSingleMonthDataFn
    extends DoFn<TableRow, TableRow> {
        /** Month a row must have to be emitted. */
        Integer monthFilter;

        /**
         * @param monthFilter month a row must have to be emitted
         */
        public FilterSingleMonthDataFn(Integer monthFilter) {
            this.monthFilter = monthFilter;
        }

        /**
         * Outputs the current row when its month matches {@code monthFilter}.
         *
         * <p>NOTE(review): Beam is understood to validate this parameter's generic type
         * reflectively; a raw {@code DoFn.ProcessContext} should be avoided.
         *
         * @param c context giving access to the current row and the output
         * @throws NumberFormatException if {@code month} is present but not an integer
         */
        @DoFn.ProcessElement
        public void processElement(DoFn<TableRow, TableRow>.ProcessContext c) {
            TableRow row = c.element();
            Object rawMonth = row.get("month");
            // A row without a month, or an unset filter, can never produce a match.
            if (rawMonth == null || this.monthFilter == null) {
                return;
            }
            // Avoid a hard (Integer) cast: the value type depends on how the row was built.
            int month = rawMonth instanceof Integer
                ? (Integer) rawMonth
                : Integer.parseInt(rawMonth.toString());
            if (month == this.monthFilter.intValue()) {
                c.output(row);
            }
        }
    }

    /**
     * Projects each input row onto the four fields {@code year}, {@code month},
     * {@code day} (emitted as {@code Integer}) and {@code mean_temp} (emitted as
     * {@code Double}).
     *
     * <p>All four inputs are parsed from their string form, so the integer fields may
     * arrive as strings or as numeric values.
     */
    static class ProjectionFn
    extends DoFn<TableRow, TableRow> {
        /**
         * Builds the projected row for the current element and outputs it.
         *
         * <p>NOTE(review): Beam is understood to validate this parameter's generic type
         * reflectively; a raw {@code DoFn.ProcessContext} should be avoided.
         *
         * @param c context giving access to the current row and the output
         * @throws NumberFormatException if a field is not numeric, or an integer field is missing
         * @throws NullPointerException if {@code mean_temp} is missing
         */
        @DoFn.ProcessElement
        public void processElement(DoFn<TableRow, TableRow>.ProcessContext c) {
            TableRow row = c.element();
            Integer year = parseIntField(row, "year");
            Integer month = parseIntField(row, "month");
            Integer day = parseIntField(row, "day");
            Double meanTemp = Double.parseDouble(row.get("mean_temp").toString());
            TableRow outRow = new TableRow()
                .set("year", year)
                .set("month", month)
                .set("day", day)
                .set("mean_temp", meanTemp);
            c.output(outRow);
        }

        /**
         * Parses the named field as an int from its string form. String.valueOf turns a
         * missing field into "null", so the failure is still a NumberFormatException.
         */
        private static int parseIntField(TableRow row, String field) {
            return Integer.parseInt(String.valueOf(row.get(field)));
        }
    }
}

