/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.examples.complete;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.avro.reflect.Nullable;
import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
import org.apache.beam.examples.common.ExampleOptions;
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class TrafficMaxLaneFlow {
    // Same value (60) as the default of the windowDuration option declared below, which is
    // described there as being in minutes. Not referenced by the code visible in this file.
    static final int WINDOW_DURATION = 60;
    // Same value (5) as the default of the windowSlideEvery option declared below (minutes).
    // Not referenced by the code visible in this file.
    static final int WINDOW_SLIDE_EVERY = 5;

    /**
     * Builds and runs the pipeline.
     *
     * <p>Stages, in order: read the input file and attach timestamps to each line, turn each
     * line into per-lane records keyed by station id, assign sliding windows, keep the
     * highest-flow lane record per station and window, and write the resulting rows to the
     * configured BigQuery table.
     *
     * @param args command-line arguments, parsed into {@link TrafficMaxLaneFlowOptions}
     * @throws IOException if one of the calls made here fails with an I/O error
     */
    public static void main(String[] args) throws IOException {
        TrafficMaxLaneFlowOptions options =
            PipelineOptionsFactory.fromArgs(args)
                .withValidation()
                .as(TrafficMaxLaneFlowOptions.class);
        options.setBigQuerySchema(FormatMaxesFn.getSchema());

        ExampleUtils exampleUtils = new ExampleUtils(options);
        exampleUtils.setup();

        Pipeline pipeline = Pipeline.create(options);

        // Destination table is assembled from the project / dataset / table options.
        TableReference tableRef = new TableReference();
        tableRef.setProjectId(options.getProject());
        tableRef.setDatasetId(options.getBigQueryDataset());
        tableRef.setTableId(options.getBigQueryTable());

        PCollection<String> timestampedLines =
            pipeline.apply("ReadLines", new ReadFileAndExtractTimestamps(options.getInputFile()));

        PCollection<KV<String, LaneInfo>> laneRecords =
            timestampedLines.apply(ParDo.of(new ExtractFlowInfoFn()));

        // Both window parameters are given in minutes by the options.
        PCollection<KV<String, LaneInfo>> windowedLaneRecords =
            laneRecords.apply(
                Window.<KV<String, LaneInfo>>into(
                    SlidingWindows.of(
                            Duration.standardMinutes(options.getWindowDuration().intValue()))
                        .every(
                            Duration.standardMinutes(options.getWindowSlideEvery().intValue()))));

        PCollection<TableRow> maxRows = windowedLaneRecords.apply(new MaxLaneFlow());

        maxRows.apply(
            BigQueryIO.writeTableRows().to(tableRef).withSchema(FormatMaxesFn.getSchema()));

        PipelineResult result = pipeline.run();
        exampleUtils.waitToFinish(result);
    }

    /**
     * Parses {@code number} as a decimal {@code int} without throwing on bad input.
     *
     * @param number text to parse
     * @return the parsed value, or {@code null} when the text is rejected with a
     *     {@link NumberFormatException}
     */
    private static Integer tryIntParse(String number) {
        Integer parsed;
        try {
            parsed = Integer.valueOf(number);
        } catch (NumberFormatException ignored) {
            // Unparseable text is reported to the caller as null instead of an exception.
            parsed = null;
        }
        return parsed;
    }

    /**
     * Parses {@code number} as a {@code double} without throwing on bad input.
     *
     * <p>{@link Double#parseDouble(String)} throws {@link NullPointerException} (not
     * {@link NumberFormatException}) for a {@code null} argument, so {@code null} is checked
     * explicitly. This keeps the method consistent with {@code tryIntParse}, which yields
     * {@code null} for {@code null} input.
     *
     * @param number text to parse; may be {@code null}
     * @return the parsed value, or {@code null} when the text is {@code null} or is rejected
     *     with a {@link NumberFormatException}
     */
    private static Double tryDoubleParse(String number) {
        if (number == null) {
            return null;
        }
        try {
            return Double.parseDouble(number);
        } catch (NumberFormatException ignored) {
            // Unparseable text is reported to the caller as null instead of an exception.
            return null;
        }
    }

    /**
     * Pipeline options for {@link TrafficMaxLaneFlow}: the input file location and the two
     * sliding-window parameters, in addition to the inherited example and BigQuery table
     * options.
     */
    public interface TrafficMaxLaneFlowOptions extends ExampleOptions, ExampleBigQueryTableOptions {
        /** Returns the path of the input file. */
        @Description("Path of the file to read from")
        @Default.String("gs://apache-beam-samples/traffic_sensor/Freeways-5Minaa2010-01-01_to_2010-02-15_test2.csv")
        String getInputFile();

        /** Sets the path of the input file. */
        void setInputFile(String value);

        /** Returns the sliding window duration, in minutes. */
        @Description("Numeric value of sliding window duration, in minutes")
        @Default.Integer(60)
        Integer getWindowDuration();

        /** Sets the sliding window duration, in minutes. */
        void setWindowDuration(Integer value);

        /** Returns how often a new sliding window starts, in minutes. */
        @Description("Numeric value of window 'slide every' setting, in minutes")
        @Default.Integer(5)
        Integer getWindowSlideEvery();

        /** Sets how often a new sliding window starts, in minutes. */
        void setWindowSlideEvery(Integer value);
    }

    static class ReadFileAndExtractTimestamps
    extends PTransform<PBegin, PCollection<String>> {
        private final String inputFile;

        public ReadFileAndExtractTimestamps(String inputFile) {
            this.inputFile = inputFile;
        }

        public PCollection<String> expand(PBegin begin) {
            return (PCollection)((PCollection)begin.apply((PTransform)TextIO.read().from(this.inputFile))).apply((PTransform)ParDo.of((DoFn)new ExtractTimestamps()));
        }
    }

    static class MaxLaneFlow
    extends PTransform<PCollection<KV<String, LaneInfo>>, PCollection<TableRow>> {
        MaxLaneFlow() {
        }

        public PCollection<TableRow> expand(PCollection<KV<String, LaneInfo>> flowInfo) {
            PCollection flowMaxes = (PCollection)flowInfo.apply((PTransform)Combine.perKey((SerializableFunction)new MaxFlow()));
            PCollection results = (PCollection)flowMaxes.apply((PTransform)ParDo.of((DoFn)new FormatMaxesFn()));
            return results;
        }
    }

    /**
     * Converts a station-keyed {@link LaneInfo} into a BigQuery {@link TableRow} and provides
     * the matching table schema.
     */
    static class FormatMaxesFn extends DoFn<KV<String, LaneInfo>, TableRow> {
        FormatMaxesFn() {}

        /**
         * Emits one row per input element. The {@code window_timestamp} column carries the
         * element timestamp rendered with {@code toString()}.
         */
        @ProcessElement
        public void processElement(ProcessContext c) {
            KV<String, LaneInfo> stationMax = c.element();
            LaneInfo laneInfo = stationMax.getValue();

            TableRow row = new TableRow();
            row.set("station_id", stationMax.getKey());
            row.set("direction", laneInfo.getDirection());
            row.set("freeway", laneInfo.getFreeway());
            row.set("lane_max_flow", laneInfo.getLaneFlow());
            row.set("lane", laneInfo.getLane());
            row.set("avg_occ", laneInfo.getLaneAO());
            row.set("avg_speed", laneInfo.getLaneAS());
            row.set("total_flow", laneInfo.getTotalFlow());
            row.set("recorded_timestamp", laneInfo.getRecordedTimestamp());
            row.set("window_timestamp", c.timestamp().toString());

            c.output(row);
        }

        /**
         * Builds the schema of the output table.
         *
         * @return a schema listing every column written by {@link #processElement}
         */
        static TableSchema getSchema() {
            // {column name, BigQuery type}; the order here is the order of the schema fields.
            String[][] columns = {
                {"station_id", "STRING"},
                {"direction", "STRING"},
                {"freeway", "STRING"},
                {"lane_max_flow", "INTEGER"},
                {"lane", "STRING"},
                {"avg_occ", "FLOAT"},
                {"avg_speed", "FLOAT"},
                {"total_flow", "INTEGER"},
                {"window_timestamp", "TIMESTAMP"},
                {"recorded_timestamp", "STRING"},
            };

            ArrayList<TableFieldSchema> fields = new ArrayList<TableFieldSchema>();
            for (String[] column : columns) {
                fields.add(new TableFieldSchema().setName(column[0]).setType(column[1]));
            }
            return new TableSchema().setFields(fields);
        }
    }

    public static class MaxFlow
    implements SerializableFunction<Iterable<LaneInfo>, LaneInfo> {
        public LaneInfo apply(Iterable<LaneInfo> input) {
            Integer max = 0;
            LaneInfo maxInfo = new LaneInfo();
            for (LaneInfo item : input) {
                Integer flow = item.getLaneFlow();
                if (flow == null || flow < max) continue;
                max = flow;
                maxInfo = item;
            }
            return maxInfo;
        }
    }

    /**
     * Splits a comma-separated sensor line into one {@link LaneInfo} per lane, keyed by the
     * station id found in the second column.
     *
     * <p>Column layout as read by this function: 0 = timestamp, 1 = station id, 2 = freeway,
     * 3 = direction, 7 = total flow; for lane {@code i} (1..8) the flow, average occupancy and
     * average speed are at {@code 6 + 5 * i}, {@code 7 + 5 * i} and {@code 8 + 5 * i}.
     */
    static class ExtractFlowInfoFn extends DoFn<String, KV<String, LaneInfo>> {
        /** Number of lanes read from each line. */
        private static final int LANE_COUNT = 8;

        /**
         * Minimum number of columns a line must have. The highest index read is the average
         * speed of the last lane, {@code 8 + 5 * 8 = 48}, so 49 columns are required. The
         * previous guard ({@code < 48}) let 48-column lines through, which then failed with an
         * {@link ArrayIndexOutOfBoundsException} on the last lane.
         */
        private static final int MIN_FIELD_COUNT = 8 + 5 * LANE_COUNT + 1;

        ExtractFlowInfoFn() {}

        /**
         * Emits up to eight {@code (stationId, LaneInfo)} pairs for the current line.
         *
         * <p>Lines with too few columns produce no output. Processing of a line stops at the
         * first lane whose flow, occupancy or speed cannot be parsed; lanes before it have
         * already been emitted. The total flow is passed through even when it is unparseable
         * ({@code null}).
         */
        @ProcessElement
        public void processElement(ProcessContext c) {
            String[] items = c.element().split(",", -1);
            if (items.length < MIN_FIELD_COUNT) {
                // Skip lines that do not contain every column read below.
                return;
            }
            String timestamp = items[0];
            String stationId = items[1];
            String freeway = items[2];
            String direction = items[3];
            Integer totalFlow = TrafficMaxLaneFlow.tryIntParse(items[7]);
            for (int i = 1; i <= LANE_COUNT; ++i) {
                Integer laneFlow = TrafficMaxLaneFlow.tryIntParse(items[6 + 5 * i]);
                Double laneAvgOccupancy = TrafficMaxLaneFlow.tryDoubleParse(items[7 + 5 * i]);
                Double laneAvgSpeed = TrafficMaxLaneFlow.tryDoubleParse(items[8 + 5 * i]);
                if (laneFlow == null || laneAvgOccupancy == null || laneAvgSpeed == null) {
                    return;
                }
                LaneInfo laneInfo =
                    new LaneInfo(
                        stationId,
                        "lane" + i,
                        direction,
                        freeway,
                        timestamp,
                        laneFlow,
                        laneAvgOccupancy,
                        laneAvgSpeed,
                        totalFlow);
                c.output(KV.of(stationId, laneInfo));
            }
        }
    }

    /**
     * Re-emits each input line with an element timestamp parsed from the line's first
     * comma-separated column, using the pattern {@code MM/dd/yyyy HH:mm:ss}.
     */
    static class ExtractTimestamps extends DoFn<String, String> {
        private static final DateTimeFormatter dateTimeFormat =
            DateTimeFormat.forPattern("MM/dd/yyyy HH:mm:ss");

        ExtractTimestamps() {}

        /**
         * Outputs the current line stamped with the instant parsed from its first column. A line
         * is dropped without output when an {@link IllegalArgumentException} is raised while
         * parsing the column or emitting the element.
         */
        @ProcessElement
        public void processElement(ProcessContext c) throws Exception {
            String[] items = c.element().split(",", -1);
            if (items.length == 0) {
                return;
            }
            try {
                String line = c.element();
                Instant eventTime = new Instant(dateTimeFormat.parseMillis(items[0]));
                c.outputWithTimestamp(line, eventTime);
            } catch (IllegalArgumentException ignored) {
                // Deliberately ignored: the offending line is skipped rather than failing
                // the whole pipeline.
            }
        }
    }

    /**
     * Value object describing one lane reading of a station. Every field may be {@code null};
     * a default-constructed instance has all fields unset.
     */
    @DefaultCoder(AvroCoder.class)
    static class LaneInfo {
        @Nullable String stationId;
        @Nullable String lane;
        @Nullable String direction;
        @Nullable String freeway;
        @Nullable String recordedTimestamp;
        @Nullable Integer laneFlow;
        @Nullable Integer totalFlow;
        @Nullable Double laneAO;
        @Nullable Double laneAS;

        /** Creates an instance with every field left {@code null}. */
        public LaneInfo() {}

        /**
         * Creates a fully specified lane reading.
         *
         * @param stationId identifier of the station
         * @param lane lane label
         * @param direction direction value of the record
         * @param freeway freeway value of the record
         * @param timestamp timestamp text, stored as the recorded timestamp
         * @param laneFlow flow of this lane
         * @param laneAO average occupancy of this lane
         * @param laneAS average speed of this lane
         * @param totalFlow total flow of the record
         */
        public LaneInfo(
            String stationId,
            String lane,
            String direction,
            String freeway,
            String timestamp,
            Integer laneFlow,
            Double laneAO,
            Double laneAS,
            Integer totalFlow) {
            this.stationId = stationId;
            this.lane = lane;
            this.direction = direction;
            this.freeway = freeway;
            this.recordedTimestamp = timestamp;
            this.laneFlow = laneFlow;
            this.laneAO = laneAO;
            this.laneAS = laneAS;
            this.totalFlow = totalFlow;
        }

        /** Returns the station identifier. */
        public String getStationId() {
            return stationId;
        }

        /** Returns the lane label. */
        public String getLane() {
            return lane;
        }

        /** Returns the direction value. */
        public String getDirection() {
            return direction;
        }

        /** Returns the freeway value. */
        public String getFreeway() {
            return freeway;
        }

        /** Returns the timestamp text given at construction. */
        public String getRecordedTimestamp() {
            return recordedTimestamp;
        }

        /** Returns the flow of this lane. */
        public Integer getLaneFlow() {
            return laneFlow;
        }

        /** Returns the average occupancy of this lane. */
        public Double getLaneAO() {
            return laneAO;
        }

        /** Returns the average speed of this lane. */
        public Double getLaneAS() {
            return laneAS;
        }

        /** Returns the total flow of the record. */
        public Integer getTotalFlow() {
            return totalFlow;
        }
    }
}

