/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.python;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.ClassUtils;
import org.apache.beam.runners.core.construction.External;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.extensions.python.PythonService;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaProvider;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.SchemaTranslation;
import org.apache.beam.sdk.schemas.logicaltypes.PythonCallable;
import org.apache.beam.sdk.schemas.utils.FieldValueTypeSupplier;
import org.apache.beam.sdk.schemas.utils.StaticSchemaInference;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.PythonCallableSource;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;

public class PythonExternalTransform<InputT extends PInput, OutputT extends POutput>
extends PTransform<InputT, OutputT> {
    private static final SchemaRegistry SCHEMA_REGISTRY = SchemaRegistry.createDefault();
    private String fullyQualifiedName;
    private String expansionService;
    private SortedMap<String, Object> kwargsMap;
    private Map<Class<?>, Schema.FieldType> typeHints;
    private @Nullable Object @NonNull [] argsArray;
    private @Nullable Row providedKwargsRow;
    Map<String, Coder<?>> outputCoders;

    private PythonExternalTransform(String fullyQualifiedName, String expansionService) {
        this.fullyQualifiedName = fullyQualifiedName;
        this.expansionService = expansionService;
        this.kwargsMap = new TreeMap<String, Object>();
        this.typeHints = new HashMap();
        this.typeHints.put(PythonCallableSource.class, Schema.FieldType.logicalType((Schema.LogicalType)new PythonCallable()));
        this.argsArray = new Object[0];
        this.outputCoders = new HashMap();
    }

    public static <InputT extends PInput, OutputT extends POutput> PythonExternalTransform<InputT, OutputT> from(String tranformName) {
        return new PythonExternalTransform<InputT, OutputT>(tranformName, "");
    }

    public static <InputT extends PInput, OutputT extends POutput> PythonExternalTransform<InputT, OutputT> from(String tranformName, String expansionService) {
        return new PythonExternalTransform<InputT, OutputT>(tranformName, expansionService);
    }

    public PythonExternalTransform<InputT, OutputT> withArgs(Object ... args) {
        @Nullable Object @NonNull [] result = Arrays.copyOf(this.argsArray, this.argsArray.length + args.length);
        System.arraycopy(args, 0, result, this.argsArray.length, args.length);
        this.argsArray = result;
        return this;
    }

    public PythonExternalTransform<InputT, OutputT> withKwarg(String name, Object value) {
        if (this.providedKwargsRow != null) {
            throw new IllegalArgumentException("Kwargs were specified both directly and as a Row object");
        }
        this.kwargsMap.put(name, value);
        return this;
    }

    public PythonExternalTransform<InputT, OutputT> withKwargs(Map<String, Object> kwargs) {
        if (this.providedKwargsRow != null) {
            throw new IllegalArgumentException("Kwargs were specified both directly and as a Row object");
        }
        this.kwargsMap.putAll(kwargs);
        return this;
    }

    public PythonExternalTransform<InputT, OutputT> withKwargs(Row kwargs) {
        if (this.kwargsMap.size() > 0) {
            throw new IllegalArgumentException("Kwargs were specified both directly and as a Row object");
        }
        this.providedKwargsRow = kwargs;
        return this;
    }

    public PythonExternalTransform<InputT, OutputT> withTypeHint(Class<?> argType, Schema.FieldType fieldType) {
        if (this.typeHints.containsKey(argType)) {
            throw new IllegalArgumentException(String.format("typehint for arg type %s already exists", argType));
        }
        this.typeHints.put(argType, fieldType);
        return this;
    }

    public PythonExternalTransform<InputT, OutputT> withOutputCoders(Map<String, Coder<?>> outputCoders) {
        if (this.outputCoders.size() > 0) {
            throw new IllegalArgumentException("Output coders were already specified");
        }
        this.outputCoders.putAll(outputCoders);
        return this;
    }

    public PythonExternalTransform<InputT, OutputT> withOutputCoder(Coder<?> outputCoder) {
        if (this.outputCoders.size() > 0) {
            throw new IllegalArgumentException("Output coders were already specified");
        }
        this.outputCoders.put("random_output_key", outputCoder);
        return this;
    }

    @VisibleForTesting
    Row buildOrGetKwargsRow() {
        if (this.providedKwargsRow != null) {
            return this.providedKwargsRow;
        }
        Schema schema = this.generateSchemaFromFieldValues(this.kwargsMap.values().toArray(), this.kwargsMap.keySet().toArray(new String[0]));
        schema.setUUID(UUID.randomUUID());
        return Row.withSchema((Schema)schema).addValues(this.convertComplexTypesToRows(this.kwargsMap.values().toArray())).build();
    }

    private boolean isCustomType(Class<?> type) {
        boolean val = !ClassUtils.isPrimitiveOrWrapper(type) && type != String.class && !this.typeHints.containsKey(type) && !Row.class.isAssignableFrom(type);
        return val;
    }

    private Row convertCustomValue(Object value) {
        SerializableFunction toRowFunc;
        try {
            toRowFunc = SCHEMA_REGISTRY.getToRowFunction(value.getClass());
        }
        catch (NoSuchSchemaException e) {
            SCHEMA_REGISTRY.registerSchemaProvider(value.getClass(), (SchemaProvider)new JavaFieldSchema());
            try {
                toRowFunc = SCHEMA_REGISTRY.getToRowFunction(value.getClass());
            }
            catch (NoSuchSchemaException e1) {
                throw new RuntimeException(e1);
            }
        }
        return (Row)toRowFunc.apply(value);
    }

    private Object[] convertComplexTypesToRows(@Nullable Object @NonNull [] values) {
        Object[] converted = new Object[values.length];
        for (int i = 0; i < values.length; ++i) {
            Object value = values[i];
            if (value == null) {
                throw new RuntimeException("Null values are not supported");
            }
            converted[i] = this.isCustomType(value.getClass()) ? this.convertCustomValue(value) : value;
        }
        return converted;
    }

    @VisibleForTesting
    Row buildOrGetArgsRow() {
        Schema schema = this.generateSchemaFromFieldValues(this.argsArray, null);
        schema.setUUID(UUID.randomUUID());
        Object[] convertedValues = this.convertComplexTypesToRows(this.argsArray);
        return Row.withSchema((Schema)schema).addValues(convertedValues).build();
    }

    private Schema generateSchemaDirectly(@Nullable Object @NonNull [] fieldValues, @NonNull String @Nullable [] fieldNames) {
        Schema.Builder builder = Schema.builder();
        int counter = 0;
        for (Object field : fieldValues) {
            String fieldName;
            if (field == null) {
                throw new RuntimeException("Null field values are not supported");
            }
            String string = fieldName = fieldNames != null ? fieldNames[counter] : "field" + counter;
            if (field instanceof Row) {
                builder.addRowField(fieldName, ((Row)field).getSchema());
            } else if (this.typeHints.containsKey(field.getClass())) {
                builder.addField(fieldName, this.typeHints.get(field.getClass()));
            } else {
                builder.addField(fieldName, StaticSchemaInference.fieldFromType((TypeDescriptor)TypeDescriptor.of(field.getClass()), (FieldValueTypeSupplier)JavaFieldSchema.JavaFieldTypeSupplier.INSTANCE));
            }
            ++counter;
        }
        Schema schema = builder.build();
        return schema;
    }

    private Schema generateSchemaFromFieldValues(@Nullable Object @NonNull [] fieldValues, @NonNull String @Nullable [] fieldNames) {
        return this.generateSchemaDirectly(fieldValues, fieldNames);
    }

    @VisibleForTesting
    ExternalTransforms.ExternalConfigurationPayload generatePayload() {
        Row argsRow = this.buildOrGetArgsRow();
        Row kwargsRow = this.buildOrGetKwargsRow();
        Schema.Builder schemaBuilder = Schema.builder();
        schemaBuilder.addStringField("constructor");
        if (argsRow.getValues().size() > 0) {
            schemaBuilder.addRowField("args", argsRow.getSchema());
        }
        if (kwargsRow.getValues().size() > 0) {
            schemaBuilder.addRowField("kwargs", kwargsRow.getSchema());
        }
        Schema payloadSchema = schemaBuilder.build();
        payloadSchema.setUUID(UUID.randomUUID());
        Row.Builder payloadRowBuilder = Row.withSchema((Schema)payloadSchema);
        payloadRowBuilder.addValue((Object)this.fullyQualifiedName);
        if (argsRow.getValues().size() > 0) {
            payloadRowBuilder.addValue((Object)argsRow);
        }
        if (kwargsRow.getValues().size() > 0) {
            payloadRowBuilder.addValue((Object)kwargsRow);
        }
        try {
            return ExternalTransforms.ExternalConfigurationPayload.newBuilder().setSchema(SchemaTranslation.schemaToProto((Schema)payloadSchema, (boolean)true)).setPayload(ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray((Coder)RowCoder.of((Schema)payloadSchema), (Object)payloadRowBuilder.build()))).build();
        }
        catch (CoderException e) {
            throw new RuntimeException(e);
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public OutputT expand(InputT input) {
        try {
            ExternalTransforms.ExternalConfigurationPayload payload = this.generatePayload();
            if (!Strings.isNullOrEmpty((String)this.expansionService)) {
                PythonService.waitForPort((String)Iterables.get((Iterable)Splitter.on((char)':').split((CharSequence)this.expansionService), (int)0), Integer.parseInt((String)Iterables.get((Iterable)Splitter.on((char)':').split((CharSequence)this.expansionService), (int)1)), 15000);
                return this.apply(input, this.expansionService, payload);
            }
            int port = PythonService.findAvailablePort();
            PythonService service = new PythonService("apache_beam.runners.portability.expansion_service_main", "--port", "" + port, "--fully_qualified_name_glob", "*");
            try (AutoCloseable p = service.start();){
                PythonService.waitForPort("localhost", port, 15000);
                OutputT OutputT = this.apply(input, String.format("localhost:%s", port), payload);
                return OutputT;
            }
        }
        catch (Exception exn) {
            throw new RuntimeException(exn);
        }
    }

    private OutputT apply(InputT input, String expansionService, ExternalTransforms.ExternalConfigurationPayload payload) {
        PCollectionTuple outputs;
        External.MultiOutputExpandableTransform transform = External.of((String)"beam:transforms:python:fully_qualified_named", (byte[])payload.toByteArray(), (String)expansionService).withMultiOutputs().withOutputCoder(this.outputCoders);
        if (input instanceof PCollection) {
            outputs = (PCollectionTuple)((PCollection)input).apply((PTransform)transform);
        } else if (input instanceof PCollectionTuple) {
            outputs = (PCollectionTuple)((PCollectionTuple)input).apply((PTransform)transform);
        } else if (input instanceof PBegin) {
            outputs = (PCollectionTuple)((PBegin)input).apply((PTransform)transform);
        } else {
            throw new RuntimeException("Unhandled input type " + input.getClass());
        }
        Set tags = outputs.getAll().keySet();
        if (tags.size() == 1) {
            return (OutputT)outputs.get((TupleTag)Iterables.getOnlyElement(tags));
        }
        return (OutputT)outputs;
    }
}

