/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.python;

import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.ClassUtils;
import org.apache.beam.runners.core.construction.External;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.extensions.python.PythonService;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaProvider;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.SchemaTranslation;
import org.apache.beam.sdk.schemas.logicaltypes.PythonCallable;
import org.apache.beam.sdk.schemas.utils.FieldValueTypeSupplier;
import org.apache.beam.sdk.schemas.utils.StaticSchemaInference;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.PythonCallableSource;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;

public class PythonExternalTransform<@UnknownKeyFor InputT extends @UnknownKeyFor @NonNull @Initialized PInput, @UnknownKeyFor OutputT extends @UnknownKeyFor @NonNull @Initialized POutput>
extends PTransform<InputT, OutputT> {
    private static final @UnknownKeyFor @NonNull @Initialized SchemaRegistry SCHEMA_REGISTRY = SchemaRegistry.createDefault();
    private @UnknownKeyFor @NonNull @Initialized String fullyQualifiedName;
    private @UnknownKeyFor @NonNull @Initialized String expansionService;
    private @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> extraPackages;
    private @UnknownKeyFor @NonNull @Initialized SortedMap<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> kwargsMap;
    private /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized Class<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>, // Could not load outer class - annotation placement on inner may be incorrect
    @UnknownKeyFor @NonNull @Initialized Schema.FieldType> typeHints;
    private @Nullable @UnknownKeyFor @Initialized Object @NonNull @UnknownKeyFor @Initialized [] argsArray;
    private @Nullable @UnknownKeyFor @Initialized Row providedKwargsRow;
    /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> outputCoders;

    private PythonExternalTransform(@UnknownKeyFor @NonNull @Initialized String fullyQualifiedName, @UnknownKeyFor @NonNull @Initialized String expansionService) {
        this.fullyQualifiedName = fullyQualifiedName;
        this.expansionService = expansionService;
        this.extraPackages = new ArrayList<String>();
        this.kwargsMap = new TreeMap<String, Object>();
        this.typeHints = new HashMap();
        this.typeHints.put(PythonCallableSource.class, Schema.FieldType.logicalType((Schema.LogicalType)new PythonCallable()));
        this.argsArray = new Object[0];
        this.outputCoders = new HashMap();
    }

    public static <InputT extends PInput, OutputT extends POutput> @UnknownKeyFor @NonNull @Initialized PythonExternalTransform<InputT, OutputT> from(@UnknownKeyFor @NonNull @Initialized String transformName) {
        return new PythonExternalTransform<InputT, OutputT>(transformName, "");
    }

    public static <InputT extends PInput, OutputT extends POutput> @UnknownKeyFor @NonNull @Initialized PythonExternalTransform<InputT, OutputT> from(@UnknownKeyFor @NonNull @Initialized String transformName, @UnknownKeyFor @NonNull @Initialized String expansionService) {
        return new PythonExternalTransform<InputT, OutputT>(transformName, expansionService);
    }

    public @UnknownKeyFor @NonNull @Initialized PythonExternalTransform<InputT, OutputT> withArgs(Object ... args) {
        @Nullable Object @NonNull [] result = Arrays.copyOf(this.argsArray, this.argsArray.length + args.length);
        System.arraycopy(args, 0, result, this.argsArray.length, args.length);
        this.argsArray = result;
        return this;
    }

    public @UnknownKeyFor @NonNull @Initialized PythonExternalTransform<InputT, OutputT> withKwarg(@UnknownKeyFor @NonNull @Initialized String name, @UnknownKeyFor @NonNull @Initialized Object value) {
        if (this.providedKwargsRow != null) {
            throw new IllegalArgumentException("Kwargs were specified both directly and as a Row object");
        }
        this.kwargsMap.put(name, value);
        return this;
    }

    public @UnknownKeyFor @NonNull @Initialized PythonExternalTransform<InputT, OutputT> withKwargs(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> kwargs) {
        if (this.providedKwargsRow != null) {
            throw new IllegalArgumentException("Kwargs were specified both directly and as a Row object");
        }
        this.kwargsMap.putAll(kwargs);
        return this;
    }

    public @UnknownKeyFor @NonNull @Initialized PythonExternalTransform<InputT, OutputT> withKwargs(@UnknownKeyFor @NonNull @Initialized Row kwargs) {
        if (this.kwargsMap.size() > 0) {
            throw new IllegalArgumentException("Kwargs were specified both directly and as a Row object");
        }
        this.providedKwargsRow = kwargs;
        return this;
    }

    public @UnknownKeyFor @NonNull @Initialized PythonExternalTransform<InputT, OutputT> withTypeHint(/*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Class<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> argType, // Could not load outer class - annotation placement on inner may be incorrect
    @UnknownKeyFor @NonNull @Initialized Schema.FieldType fieldType) {
        if (this.typeHints.containsKey(argType)) {
            throw new IllegalArgumentException(String.format("typehint for arg type %s already exists", argType));
        }
        this.typeHints.put(argType, fieldType);
        return this;
    }

    public @UnknownKeyFor @NonNull @Initialized PythonExternalTransform<InputT, OutputT> withOutputCoders(/*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> outputCoders) {
        if (this.outputCoders.size() > 0) {
            throw new IllegalArgumentException("Output coders were already specified");
        }
        this.outputCoders.putAll(outputCoders);
        return this;
    }

    public @UnknownKeyFor @NonNull @Initialized PythonExternalTransform<InputT, OutputT> withOutputCoder(/*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> outputCoder) {
        if (this.outputCoders.size() > 0) {
            throw new IllegalArgumentException("Output coders were already specified");
        }
        this.outputCoders.put("random_output_key", outputCoder);
        return this;
    }

    public @UnknownKeyFor @NonNull @Initialized PythonExternalTransform<InputT, OutputT> withExtraPackages(@UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> extraPackages) {
        if (extraPackages.isEmpty()) {
            return this;
        }
        Preconditions.checkState((boolean)Strings.isNullOrEmpty((String)this.expansionService), (Object)"Extra packages only apply to auto-started expansion service.");
        this.extraPackages = extraPackages;
        return this;
    }

    @VisibleForTesting
    @UnknownKeyFor @NonNull @Initialized Row buildOrGetKwargsRow() {
        if (this.providedKwargsRow != null) {
            return this.providedKwargsRow;
        }
        Schema schema = this.generateSchemaFromFieldValues(this.kwargsMap.values().toArray(), this.kwargsMap.keySet().toArray(new String[0]));
        schema.setUUID(UUID.randomUUID());
        return Row.withSchema((Schema)schema).addValues(this.convertComplexTypesToRows(this.kwargsMap.values().toArray())).build();
    }

    private @UnknownKeyFor @NonNull @Initialized boolean isCustomType(/*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Class<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> type) {
        boolean val = !ClassUtils.isPrimitiveOrWrapper(type) && type != String.class && !this.typeHints.containsKey(type) && !Row.class.isAssignableFrom(type);
        return val;
    }

    private @UnknownKeyFor @NonNull @Initialized Row convertCustomValue(@UnknownKeyFor @NonNull @Initialized Object value) {
        SerializableFunction toRowFunc;
        try {
            toRowFunc = SCHEMA_REGISTRY.getToRowFunction(value.getClass());
        }
        catch (NoSuchSchemaException e) {
            SCHEMA_REGISTRY.registerSchemaProvider(value.getClass(), (SchemaProvider)new JavaFieldSchema());
            try {
                toRowFunc = SCHEMA_REGISTRY.getToRowFunction(value.getClass());
            }
            catch (NoSuchSchemaException e1) {
                throw new RuntimeException(e1);
            }
        }
        return (Row)toRowFunc.apply(value);
    }

    private @UnknownKeyFor @NonNull @Initialized Object @UnknownKeyFor @NonNull @Initialized [] convertComplexTypesToRows(@Nullable @UnknownKeyFor @Initialized Object @NonNull @UnknownKeyFor @Initialized [] values) {
        Object[] converted = new Object[values.length];
        for (int i = 0; i < values.length; ++i) {
            Object value = values[i];
            if (value == null) {
                throw new RuntimeException("Null values are not supported");
            }
            converted[i] = this.isCustomType(value.getClass()) ? this.convertCustomValue(value) : value;
        }
        return converted;
    }

    @VisibleForTesting
    @UnknownKeyFor @NonNull @Initialized Row buildOrGetArgsRow() {
        Schema schema = this.generateSchemaFromFieldValues(this.argsArray, null);
        schema.setUUID(UUID.randomUUID());
        Object[] convertedValues = this.convertComplexTypesToRows(this.argsArray);
        return Row.withSchema((Schema)schema).addValues(convertedValues).build();
    }

    private @UnknownKeyFor @NonNull @Initialized Schema generateSchemaDirectly(@Nullable @UnknownKeyFor @Initialized Object @NonNull @UnknownKeyFor @Initialized [] fieldValues, @NonNull @UnknownKeyFor @Initialized String @Nullable @UnknownKeyFor @Initialized [] fieldNames) {
        Schema.Builder builder = Schema.builder();
        int counter = 0;
        for (Object field : fieldValues) {
            String fieldName;
            if (field == null) {
                throw new RuntimeException("Null field values are not supported");
            }
            String string = fieldName = fieldNames != null ? fieldNames[counter] : "field" + counter;
            if (field instanceof Row) {
                builder.addRowField(fieldName, ((Row)field).getSchema());
            } else if (this.typeHints.containsKey(field.getClass())) {
                builder.addField(fieldName, this.typeHints.get(field.getClass()));
            } else {
                builder.addField(fieldName, StaticSchemaInference.fieldFromType((TypeDescriptor)TypeDescriptor.of(field.getClass()), (FieldValueTypeSupplier)JavaFieldSchema.JavaFieldTypeSupplier.INSTANCE));
            }
            ++counter;
        }
        Schema schema = builder.build();
        return schema;
    }

    private @UnknownKeyFor @NonNull @Initialized Schema generateSchemaFromFieldValues(@Nullable @UnknownKeyFor @Initialized Object @NonNull @UnknownKeyFor @Initialized [] fieldValues, @NonNull @UnknownKeyFor @Initialized String @Nullable @UnknownKeyFor @Initialized [] fieldNames) {
        return this.generateSchemaDirectly(fieldValues, fieldNames);
    }

    @VisibleForTesting
    // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized ExternalTransforms.ExternalConfigurationPayload generatePayload() {
        Row argsRow = this.buildOrGetArgsRow();
        Row kwargsRow = this.buildOrGetKwargsRow();
        Schema.Builder schemaBuilder = Schema.builder();
        schemaBuilder.addStringField("constructor");
        if (argsRow.getValues().size() > 0) {
            schemaBuilder.addRowField("args", argsRow.getSchema());
        }
        if (kwargsRow.getValues().size() > 0) {
            schemaBuilder.addRowField("kwargs", kwargsRow.getSchema());
        }
        Schema payloadSchema = schemaBuilder.build();
        payloadSchema.setUUID(UUID.randomUUID());
        Row.Builder payloadRowBuilder = Row.withSchema((Schema)payloadSchema);
        payloadRowBuilder.addValue((Object)this.fullyQualifiedName);
        if (argsRow.getValues().size() > 0) {
            payloadRowBuilder.addValue((Object)argsRow);
        }
        if (kwargsRow.getValues().size() > 0) {
            payloadRowBuilder.addValue((Object)kwargsRow);
        }
        try {
            return ExternalTransforms.ExternalConfigurationPayload.newBuilder().setSchema(SchemaTranslation.schemaToProto((Schema)payloadSchema, (boolean)true)).setPayload(ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray((Coder)RowCoder.of((Schema)payloadSchema), (Object)payloadRowBuilder.build()))).build();
        }
        catch (CoderException e) {
            throw new RuntimeException(e);
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public OutputT expand(InputT input) {
        try {
            Throwable throwable;
            ExternalTransforms.ExternalConfigurationPayload payload = this.generatePayload();
            if (!Strings.isNullOrEmpty((String)this.expansionService)) {
                PythonService.waitForPort((String)Iterables.get((Iterable)Splitter.on((char)':').split((CharSequence)this.expansionService), (int)0), Integer.parseInt((String)Iterables.get((Iterable)Splitter.on((char)':').split((CharSequence)this.expansionService), (int)1)), 15000);
                return this.apply(input, this.expansionService, payload);
            }
            int port = PythonService.findAvailablePort();
            ImmutableList.Builder args = ImmutableList.builder();
            args.add((Object[])new String[]{"--port=" + port, "--fully_qualified_name_glob=*", "--pickle_library=cloudpickle"});
            if (!this.extraPackages.isEmpty()) {
                File requirementsFile = File.createTempFile("requirements", ".txt");
                requirementsFile.deleteOnExit();
                throwable = null;
                try (OutputStreamWriter fout = new OutputStreamWriter((OutputStream)new FileOutputStream(requirementsFile.getAbsolutePath()), Charsets.UTF_8);){
                    for (String pkg : this.extraPackages) {
                        fout.write(pkg);
                        ((Writer)fout).write(10);
                    }
                }
                catch (Throwable throwable2) {
                    throwable = throwable2;
                    throw throwable2;
                }
                args.add((Object)("--requirements_file=" + requirementsFile.getAbsolutePath()));
            }
            PythonService service = new PythonService("apache_beam.runners.portability.expansion_service_main", (List<String>)args.build()).withExtraPackages(this.extraPackages);
            throwable = null;
            try (AutoCloseable p = service.start();){
                PythonService.waitForPort("localhost", port, 60000);
                OutputT OutputT = this.apply(input, String.format("localhost:%s", port), payload);
                return OutputT;
            }
            catch (Throwable throwable3) {
                throwable = throwable3;
                throw throwable3;
            }
        }
        catch (RuntimeException exn) {
            throw exn;
        }
        catch (Exception exn) {
            throw new RuntimeException(exn);
        }
    }

    private OutputT apply(InputT input, @UnknownKeyFor @NonNull @Initialized String expansionService, // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized ExternalTransforms.ExternalConfigurationPayload payload) {
        PCollectionTuple outputs;
        External.MultiOutputExpandableTransform transform = External.of((String)"beam:transforms:python:fully_qualified_named", (byte[])payload.toByteArray(), (String)expansionService).withMultiOutputs().withOutputCoder(this.outputCoders);
        if (input instanceof PCollection) {
            outputs = (PCollectionTuple)((PCollection)input).apply((PTransform)transform);
        } else if (input instanceof PCollectionTuple) {
            outputs = (PCollectionTuple)((PCollectionTuple)input).apply((PTransform)transform);
        } else if (input instanceof PBegin) {
            outputs = (PCollectionTuple)((PBegin)input).apply((PTransform)transform);
        } else {
            throw new RuntimeException("Unhandled input type " + input.getClass());
        }
        Set tags = outputs.getAll().keySet();
        if (tags.size() == 1) {
            return (OutputT)outputs.get((TupleTag)Iterables.getOnlyElement(tags));
        }
        return (OutputT)outputs;
    }
}

