/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.python;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.ClassUtils;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.extensions.python.PythonExternalTransformOptions;
import org.apache.beam.sdk.extensions.python.PythonService;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaProvider;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.SchemaTranslation;
import org.apache.beam.sdk.schemas.logicaltypes.PythonCallable;
import org.apache.beam.sdk.schemas.utils.FieldValueTypeSupplier;
import org.apache.beam.sdk.schemas.utils.StaticSchemaInference;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transformservice.launcher.TransformServiceLauncher;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.PythonCallableSource;
import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.sdk.util.construction.External;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PythonExternalTransform<@UnknownKeyFor InputT extends @UnknownKeyFor @NonNull @Initialized PInput, @UnknownKeyFor OutputT extends @UnknownKeyFor @NonNull @Initialized POutput>
extends PTransform<InputT, OutputT> {
    // Shared registry used by convertCustomValue to look up (and, on a miss, register) to-Row functions.
    private static final @UnknownKeyFor @NonNull @Initialized SchemaRegistry SCHEMA_REGISTRY = SchemaRegistry.createDefault();
    // Value written into the "constructor" field of the payload built by generatePayload.
    private @UnknownKeyFor @NonNull @Initialized String fullyQualifiedName;
    // Expansion service address in "<host>:<port>" form; an empty string makes expand() start a service itself.
    private @UnknownKeyFor @NonNull @Initialized String expansionService;
    // Package names written one per line into a temporary requirements file by expand().
    private @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> extraPackages;
    // Keyword arguments added via withKwarg/withKwargs(Map); a TreeMap, so iteration follows sorted key order.
    private @UnknownKeyFor @NonNull @Initialized SortedMap<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> kwargsMap;
    // Explicit Java-class -> Beam field type overrides used during schema generation; the constructor
    // pre-registers PythonCallableSource here.
    private /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized Class<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>, // Could not load outer class - annotation placement on inner may be incorrect
    @UnknownKeyFor @NonNull @Initialized Schema.FieldType> typeHints;
    // Positional arguments accumulated by withArgs; starts as an empty array.
    private @Nullable @UnknownKeyFor @Initialized Object @NonNull @UnknownKeyFor @Initialized [] argsArray;
    // Keyword arguments supplied as a ready-made Row; mutually exclusive with entries in kwargsMap.
    private @Nullable @UnknownKeyFor @Initialized Row providedKwargsRow;
    // Output coders handed to the external transform in apply(); package-private (no access modifier).
    /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> outputCoders;
    // Class logger; used by expand() to report when the Docker Compose based transform service is chosen.
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(PythonExternalTransform.class);

    private PythonExternalTransform(@UnknownKeyFor @NonNull @Initialized String fullyQualifiedName, @UnknownKeyFor @NonNull @Initialized String expansionService) {
        this.fullyQualifiedName = fullyQualifiedName;
        this.expansionService = expansionService;
        this.extraPackages = new ArrayList<String>();
        this.kwargsMap = new TreeMap<String, Object>();
        this.typeHints = new HashMap();
        this.typeHints.put(PythonCallableSource.class, Schema.FieldType.logicalType((Schema.LogicalType)new PythonCallable()));
        this.argsArray = new Object[0];
        this.outputCoders = new HashMap();
    }

    /**
     * Creates a transform for the given name with an empty expansion service address.
     *
     * @param transformName name passed through as the transform's fully qualified name
     * @return a new transform instance
     */
    public static <InputT extends PInput, OutputT extends POutput> @UnknownKeyFor @NonNull @Initialized PythonExternalTransform<InputT, OutputT> from(@UnknownKeyFor @NonNull @Initialized String transformName) {
        // An empty address is what expand() treats as "no expansion service given".
        return PythonExternalTransform.<InputT, OutputT>from(transformName, "");
    }

    /**
     * Creates a transform for the given name that expands against the given service address.
     *
     * @param transformName name passed through as the transform's fully qualified name
     * @param expansionService expansion service address; may be empty
     * @return a new transform instance
     */
    public static <InputT extends PInput, OutputT extends POutput> @UnknownKeyFor @NonNull @Initialized PythonExternalTransform<InputT, OutputT> from(@UnknownKeyFor @NonNull @Initialized String transformName, @UnknownKeyFor @NonNull @Initialized String expansionService) {
        PythonExternalTransform<InputT, OutputT> created = new PythonExternalTransform<>(transformName, expansionService);
        return created;
    }

    /**
     * Appends positional arguments after any that were added by earlier calls.
     *
     * @param args values to append, in order
     * @return this transform, for chaining
     */
    public @UnknownKeyFor @NonNull @Initialized PythonExternalTransform<InputT, OutputT> withArgs(Object... args) {
        final int existingCount = this.argsArray.length;
        final int addedCount = args.length;
        // Copy the current arguments into a larger array, then place the new ones behind them.
        @Nullable Object @NonNull [] merged = Arrays.copyOf(this.argsArray, existingCount + addedCount);
        for (int offset = 0; offset < addedCount; offset++) {
            merged[existingCount + offset] = args[offset];
        }
        this.argsArray = merged;
        return this;
    }

    /**
     * Adds (or overwrites) a single keyword argument.
     *
     * @param name keyword name
     * @param value keyword value
     * @return this transform, for chaining
     * @throws IllegalArgumentException if keyword arguments were already supplied as a {@link Row}
     */
    public @UnknownKeyFor @NonNull @Initialized PythonExternalTransform<InputT, OutputT> withKwarg(@UnknownKeyFor @NonNull @Initialized String name, @UnknownKeyFor @NonNull @Initialized Object value) {
        boolean rowAlreadyProvided = this.providedKwargsRow != null;
        if (!rowAlreadyProvided) {
            this.kwargsMap.put(name, value);
            return this;
        }
        throw new IllegalArgumentException("Kwargs were specified both directly and as a Row object");
    }

    /**
     * Adds every entry of the given map as a keyword argument.
     *
     * @param kwargs keyword names mapped to their values
     * @return this transform, for chaining
     * @throws IllegalArgumentException if keyword arguments were already supplied as a {@link Row}
     */
    public @UnknownKeyFor @NonNull @Initialized PythonExternalTransform<InputT, OutputT> withKwargs(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> kwargs) {
        boolean rowAlreadyProvided = this.providedKwargsRow != null;
        if (!rowAlreadyProvided) {
            this.kwargsMap.putAll(kwargs);
            return this;
        }
        throw new IllegalArgumentException("Kwargs were specified both directly and as a Row object");
    }

    /**
     * Supplies all keyword arguments as a single pre-built {@link Row}.
     *
     * @param kwargs row holding the keyword arguments
     * @return this transform, for chaining
     * @throws IllegalArgumentException if keyword arguments were already added individually
     */
    public @UnknownKeyFor @NonNull @Initialized PythonExternalTransform<InputT, OutputT> withKwargs(@UnknownKeyFor @NonNull @Initialized Row kwargs) {
        if (this.kwargsMap.isEmpty()) {
            this.providedKwargsRow = kwargs;
            return this;
        }
        throw new IllegalArgumentException("Kwargs were specified both directly and as a Row object");
    }

    /**
     * Registers the Beam field type to use for arguments of the given Java class.
     *
     * @param argType Java class of the argument values
     * @param fieldType schema field type to use for that class
     * @return this transform, for chaining
     * @throws IllegalArgumentException if a hint for {@code argType} is already registered
     */
    public @UnknownKeyFor @NonNull @Initialized PythonExternalTransform<InputT, OutputT> withTypeHint(@UnknownKeyFor @NonNull @Initialized Class<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> argType, @UnknownKeyFor @NonNull @Initialized Schema.FieldType fieldType) {
        boolean alreadyHinted = this.typeHints.containsKey(argType);
        if (!alreadyHinted) {
            this.typeHints.put(argType, fieldType);
            return this;
        }
        String message = String.format("typehint for arg type %s already exists", argType);
        throw new IllegalArgumentException(message);
    }

    /**
     * Sets the output coders from a map; allowed only while no output coder has been set.
     *
     * @param outputCoders coders keyed by string
     * @return this transform, for chaining
     * @throws IllegalArgumentException if output coders were already specified
     */
    public @UnknownKeyFor @NonNull @Initialized PythonExternalTransform<InputT, OutputT> withOutputCoders(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> outputCoders) {
        if (this.outputCoders.isEmpty()) {
            this.outputCoders.putAll(outputCoders);
            return this;
        }
        throw new IllegalArgumentException("Output coders were already specified");
    }

    /**
     * Sets a single output coder; allowed only while no output coder has been set.
     *
     * <p>The coder is stored under the fixed key {@code "random_output_key"}.
     *
     * @param outputCoder coder to register
     * @return this transform, for chaining
     * @throws IllegalArgumentException if output coders were already specified
     */
    public @UnknownKeyFor @NonNull @Initialized PythonExternalTransform<InputT, OutputT> withOutputCoder(@UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> outputCoder) {
        if (this.outputCoders.isEmpty()) {
            this.outputCoders.put("random_output_key", outputCoder);
            return this;
        }
        throw new IllegalArgumentException("Output coders were already specified");
    }

    /**
     * Replaces the list of extra Python packages to install for an auto-started expansion service.
     *
     * <p>An empty list is a no-op and leaves any previously set packages untouched. The packages are
     * copied, so later changes to the caller's list do not affect this transform.
     *
     * @param extraPackages package names to use
     * @return this transform, for chaining
     * @throws IllegalStateException if a non-empty list is given while an explicit expansion service
     *     address is set
     */
    public @UnknownKeyFor @NonNull @Initialized PythonExternalTransform<InputT, OutputT> withExtraPackages(@UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> extraPackages) {
        if (extraPackages.isEmpty()) {
            return this;
        }
        Preconditions.checkState((boolean)Strings.isNullOrEmpty((String)this.expansionService), (Object)"Extra packages only apply to auto-started expansion service.");
        // Defensive copy: do not alias a list the caller may keep mutating.
        this.extraPackages = new ArrayList<>(extraPackages);
        return this;
    }

    /**
     * Returns the keyword arguments as a {@link Row}.
     *
     * <p>A row supplied through {@code withKwargs(Row)} is returned as-is; otherwise one is built
     * from the keyword map, using the map keys as field names.
     *
     * @return the keyword-argument row
     */
    @VisibleForTesting
    @UnknownKeyFor @NonNull @Initialized Row buildOrGetKwargsRow() {
        Row supplied = this.providedKwargsRow;
        if (supplied != null) {
            return supplied;
        }
        // Keys and values come from the same sorted map, so their positions line up.
        Object[] kwargValues = this.kwargsMap.values().toArray();
        String[] kwargNames = this.kwargsMap.keySet().toArray(new String[0]);
        Schema kwargsSchema = this.generateSchemaFromFieldValues(kwargValues, kwargNames);
        Object[] rowValues = this.convertComplexTypesToRows(kwargValues);
        return Row.withSchema(kwargsSchema).addValues(rowValues).build();
    }

    /**
     * Tells whether values of the given class must be converted to a {@link Row} before being sent.
     *
     * @param type class to inspect
     * @return {@code false} for primitives and their wrappers, {@link String}, classes with a
     *     registered type hint, and {@link Row} types; {@code true} otherwise
     */
    private @UnknownKeyFor @NonNull @Initialized boolean isCustomType(@UnknownKeyFor @NonNull @Initialized Class<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> type) {
        if (ClassUtils.isPrimitiveOrWrapper(type)) {
            return false;
        }
        if (type == String.class) {
            return false;
        }
        if (this.typeHints.containsKey(type)) {
            return false;
        }
        return !Row.class.isAssignableFrom(type);
    }

    /**
     * Converts an arbitrary Java object into a {@link Row} via the shared schema registry.
     *
     * <p>If the registry has no schema for the value's class, a {@link JavaFieldSchema} provider is
     * registered for that class and the lookup is retried once.
     *
     * @param value object to convert
     * @return the row produced by the registry's to-row function
     * @throws RuntimeException wrapping {@link NoSuchSchemaException} if the retry also fails
     */
    private @UnknownKeyFor @NonNull @Initialized Row convertCustomValue(@UnknownKeyFor @NonNull @Initialized Object value) {
        SerializableFunction converter;
        try {
            converter = SCHEMA_REGISTRY.getToRowFunction(value.getClass());
        }
        catch (NoSuchSchemaException firstMiss) {
            // No schema known yet: fall back to field-based inference and look it up again.
            SCHEMA_REGISTRY.registerSchemaProvider(value.getClass(), (SchemaProvider)new JavaFieldSchema());
            try {
                converter = SCHEMA_REGISTRY.getToRowFunction(value.getClass());
            }
            catch (NoSuchSchemaException secondMiss) {
                throw new RuntimeException(secondMiss);
            }
        }
        Object asRow = converter.apply(value);
        return (Row)asRow;
    }

    /**
     * Returns a copy of {@code values} in which every custom-typed element is replaced by its
     * {@link Row} form; all other elements are carried over unchanged.
     *
     * @param values argument values to convert
     * @return a new array of the same length
     * @throws RuntimeException if any element is {@code null}
     */
    private @UnknownKeyFor @NonNull @Initialized Object @UnknownKeyFor @NonNull @Initialized [] convertComplexTypesToRows(@Nullable @UnknownKeyFor @Initialized Object @NonNull @UnknownKeyFor @Initialized [] values) {
        final int count = values.length;
        Object[] rows = new Object[count];
        int position = 0;
        while (position < count) {
            Object candidate = values[position];
            if (candidate == null) {
                throw new RuntimeException("Null values are not supported");
            }
            if (this.isCustomType(candidate.getClass())) {
                rows[position] = this.convertCustomValue(candidate);
            } else {
                rows[position] = candidate;
            }
            position++;
        }
        return rows;
    }

    /**
     * Builds a {@link Row} from the positional arguments, with generated field names.
     *
     * @return the positional-argument row
     */
    @VisibleForTesting
    @UnknownKeyFor @NonNull @Initialized Row buildOrGetArgsRow() {
        // Schema first, then value conversion: both reject nulls, and this order decides which
        // error is reported.
        Schema argsSchema = this.generateSchemaFromFieldValues(this.argsArray, null);
        Object[] rowValues = this.convertComplexTypesToRows(this.argsArray);
        Row.Builder rowBuilder = Row.withSchema(argsSchema);
        return rowBuilder.addValues(rowValues).build();
    }

    /**
     * Derives a schema with one field per value.
     *
     * <p>Field names come from {@code fieldNames} when given, otherwise {@code "field<index>"}.
     * A {@link Row} value becomes a row field with that row's schema; a value whose class has a
     * registered type hint uses the hinted type; anything else is inferred from its Java class.
     *
     * @param fieldValues values to describe
     * @param fieldNames names matching {@code fieldValues} by position, or {@code null}
     * @return the generated schema
     * @throws RuntimeException if any value is {@code null}
     */
    private @UnknownKeyFor @NonNull @Initialized Schema generateSchemaDirectly(@Nullable @UnknownKeyFor @Initialized Object @NonNull @UnknownKeyFor @Initialized [] fieldValues, @NonNull @UnknownKeyFor @Initialized String @Nullable @UnknownKeyFor @Initialized [] fieldNames) {
        Schema.Builder schemaBuilder = Schema.builder();
        for (int position = 0; position < fieldValues.length; position++) {
            Object value = fieldValues[position];
            if (value == null) {
                throw new RuntimeException("Null field values are not supported");
            }
            String name = fieldNames == null ? "field" + position : fieldNames[position];
            if (value instanceof Row) {
                schemaBuilder.addRowField(name, ((Row)value).getSchema());
                continue;
            }
            if (this.typeHints.containsKey(value.getClass())) {
                schemaBuilder.addField(name, this.typeHints.get(value.getClass()));
                continue;
            }
            schemaBuilder.addField(name, StaticSchemaInference.fieldFromType((TypeDescriptor)TypeDescriptor.of(value.getClass()), (FieldValueTypeSupplier)JavaFieldSchema.JavaFieldTypeSupplier.INSTANCE));
        }
        return schemaBuilder.build();
    }

    /**
     * Generates a schema for the given values by delegating to {@code generateSchemaDirectly}.
     *
     * @param fieldValues values to describe
     * @param fieldNames names matching {@code fieldValues} by position, or {@code null}
     * @return the generated schema
     */
    private @UnknownKeyFor @NonNull @Initialized Schema generateSchemaFromFieldValues(@Nullable @UnknownKeyFor @Initialized Object @NonNull @UnknownKeyFor @Initialized [] fieldValues, @NonNull @UnknownKeyFor @Initialized String @Nullable @UnknownKeyFor @Initialized [] fieldNames) {
        Schema generated = this.generateSchemaDirectly(fieldValues, fieldNames);
        return generated;
    }

    /**
     * Builds the external configuration payload for this transform.
     *
     * <p>The payload row always carries the fully qualified name in a {@code "constructor"} field.
     * An {@code "args"} and/or {@code "kwargs"} row field is added only when the corresponding row
     * has at least one value.
     *
     * @return the payload containing the schema proto and the encoded row
     * @throws RuntimeException wrapping {@link CoderException} if the row cannot be encoded
     */
    @VisibleForTesting
    @UnknownKeyFor @NonNull @Initialized ExternalTransforms.ExternalConfigurationPayload generatePayload() {
        Row positional = this.buildOrGetArgsRow();
        Row keyword = this.buildOrGetKwargsRow();
        boolean hasArgs = !positional.getValues().isEmpty();
        boolean hasKwargs = !keyword.getValues().isEmpty();

        // Schema: "constructor", then the optional "args" and "kwargs" in that order.
        Schema.Builder schemaBuilder = Schema.builder();
        schemaBuilder.addStringField("constructor");
        if (hasArgs) {
            schemaBuilder.addRowField("args", positional.getSchema());
        }
        if (hasKwargs) {
            schemaBuilder.addRowField("kwargs", keyword.getSchema());
        }
        Schema payloadSchema = schemaBuilder.build();

        // Values must be added in the same order as the schema fields above.
        Row.Builder rowBuilder = Row.withSchema(payloadSchema);
        rowBuilder.addValue((Object)this.fullyQualifiedName);
        if (hasArgs) {
            rowBuilder.addValue((Object)positional);
        }
        if (hasKwargs) {
            rowBuilder.addValue((Object)keyword);
        }

        try {
            ExternalTransforms.ExternalConfigurationPayload.Builder payloadBuilder = ExternalTransforms.ExternalConfigurationPayload.newBuilder();
            payloadBuilder.setSchema(SchemaTranslation.schemaToProto((Schema)payloadSchema, (boolean)true));
            byte[] encodedRow = CoderUtils.encodeToByteArray((Coder)RowCoder.of((Schema)payloadSchema), (Object)rowBuilder.build());
            payloadBuilder.setPayload(ByteString.copyFrom((byte[])encodedRow));
            return payloadBuilder.build();
        }
        catch (CoderException encodingFailure) {
            throw new RuntimeException(encodingFailure);
        }
    }

    /**
     * Probes for a Python interpreter by running {@code python3 --version}, then
     * {@code python --version}.
     *
     * <p>Only the ability to launch the process and wait for it is checked; the exit code is not
     * inspected.
     *
     * @return {@code true} if one of the executables could be started and waited for; {@code false}
     *     if neither could be launched or the calling thread was interrupted while waiting
     */
    private @UnknownKeyFor @NonNull @Initialized boolean isPythonAvailable() {
        for (String executable : ImmutableList.of("python3", "python")) {
            try {
                new ProcessBuilder(executable, "--version").start().waitFor();
                return true;
            }
            catch (IOException ignored) {
                // This executable could not be launched; try the next candidate.
            }
            catch (InterruptedException interrupted) {
                // Do not lose the interruption: restore the flag and stop probing, since any further
                // waitFor() on this thread would be interrupted again right away.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    /**
     * Probes for Docker by running {@code docker --version}.
     *
     * <p>Only the ability to launch the process and wait for it is checked; the exit code is not
     * inspected.
     *
     * @return {@code true} if the executable could be started and waited for; {@code false} if it
     *     could not be launched or the calling thread was interrupted while waiting
     */
    private @UnknownKeyFor @NonNull @Initialized boolean isDockerAvailable() {
        String executable = "docker";
        try {
            new ProcessBuilder(executable, "--version").start().waitFor();
            return true;
        }
        catch (IOException ignored) {
            // The executable could not be launched, so Docker is treated as unavailable.
            return false;
        }
        catch (InterruptedException interrupted) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Expands this transform through a Python expansion service.
     *
     * <p>Three paths exist:
     * <ol>
     *   <li>If an expansion service address was given, it must look like {@code <host>:<port>};
     *       the method waits for that port and expands against it.
     *   <li>Otherwise, if the pipeline options request the transform service, or no Python
     *       executable is found but Docker is, a {@link TransformServiceLauncher} is started on a
     *       free port and shut down again once expansion finishes.
     *   <li>Otherwise a local {@link PythonService} running
     *       {@code apache_beam.runners.portability.expansion_service_main} is started on a free
     *       port and closed once expansion finishes.
     * </ol>
     *
     * <p>When extra packages are configured they are written, one per line, to a temporary
     * requirements file that is handed to whichever service is started.
     *
     * @param input the input to apply the external transform to
     * @return the single output collection if the expansion has exactly one output, otherwise the
     *     tuple of outputs
     * @throws IllegalArgumentException if the configured address has no {@code :} after the host
     * @throws RuntimeException wrapping any checked exception raised during expansion; if that
     *     exception is an {@link InterruptedException} the thread's interrupt flag is restored first
     */
    @Override
    public OutputT expand(InputT input) {
        try {
            ExternalTransforms.ExternalConfigurationPayload payload = this.generatePayload();
            if (!Strings.isNullOrEmpty(this.expansionService)) {
                // Explicit address: split at the last ':' so the host part may itself contain colons.
                int portIndex = this.expansionService.lastIndexOf(':');
                if (portIndex <= 0) {
                    throw new IllegalArgumentException("Unexpected expansion service address. Expected to be in the format \"<host>:<port>\"");
                }
                String host = this.expansionService.substring(0, portIndex);
                int explicitPort = Integer.parseInt(this.expansionService.substring(portIndex + 1));
                // NOTE(review): third argument is presumably a timeout in milliseconds - confirm against PythonService.
                PythonService.waitForPort(host, explicitPort, 15000);
                return this.apply(input, this.expansionService, payload);
            }

            int port = PythonService.findAvailablePort();
            PipelineOptionsFactory.register(PythonExternalTransformOptions.class);
            PythonExternalTransformOptions options = (PythonExternalTransformOptions)input.getPipeline().getOptions().as(PythonExternalTransformOptions.class);
            boolean useTransformService = options.getUseTransformService();
            @Nullable String customBeamRequirement = options.getCustomBeamRequirement();
            boolean pythonAvailable = this.isPythonAvailable();
            boolean dockerAvailable = this.isDockerAvailable();

            // Materialize the extra packages as a requirements file, removed again on JVM exit.
            File requirementsFile = null;
            if (!this.extraPackages.isEmpty()) {
                requirementsFile = File.createTempFile("requirements", ".txt");
                requirementsFile.deleteOnExit();
                try (OutputStreamWriter fout = new OutputStreamWriter((OutputStream)new FileOutputStream(requirementsFile.getAbsolutePath()), StandardCharsets.UTF_8)) {
                    for (String pkg : this.extraPackages) {
                        fout.write(pkg);
                        fout.write('\n');
                    }
                }
            }

            if (useTransformService || (!pythonAvailable && dockerAvailable)) {
                String projectName = UUID.randomUUID().toString();
                String messageAppend = useTransformService ? "it was explicitly requested" : "a Python executable is not available in the system";
                LOG.info("Using the Docker Compose based transform service since {}. Service will have the project name {} and will be made available at the port {}", new Object[]{messageAppend, projectName, port});
                String pythonRequirementsFile = requirementsFile != null ? requirementsFile.getAbsolutePath() : null;
                TransformServiceLauncher transformService = TransformServiceLauncher.forProject((String)projectName, (int)port, (String)pythonRequirementsFile);
                transformService.setBeamVersion(ReleaseInfo.getReleaseInfo().getSdkVersion());
                try {
                    transformService.start();
                    transformService.waitTillUp(-1);
                    return this.apply(input, String.format("localhost:%s", port), payload);
                }
                finally {
                    // Always tear the service down, whether expansion succeeded or not.
                    transformService.shutdown();
                }
            }

            ImmutableList.Builder<String> args = ImmutableList.builder();
            args.add("--port=" + port, "--fully_qualified_name_glob=*", "--pickle_library=cloudpickle");
            if (requirementsFile != null) {
                args.add("--requirements_file=" + requirementsFile.getAbsolutePath());
            }
            PythonService pythonService = new PythonService("apache_beam.runners.portability.expansion_service_main", (List<String>)args.build()).withExtraPackages(this.extraPackages);
            if (!Strings.isNullOrEmpty(customBeamRequirement)) {
                pythonService = pythonService.withCustomBeamRequirement(customBeamRequirement);
            }
            // The handle is held only so that the service is closed when expansion is done.
            try (AutoCloseable serviceHandle = pythonService.start()) {
                PythonService.waitForPort("localhost", port, 60000);
                return this.apply(input, String.format("localhost:%s", port), payload);
            }
        }
        catch (RuntimeException exn) {
            throw exn;
        }
        catch (Exception exn) {
            if (exn instanceof InterruptedException) {
                // Wrapping would otherwise hide the interruption from the rest of the thread.
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(exn);
        }
    }

    /**
     * Applies the external Python transform to {@code input} using the given expansion service.
     *
     * @param input a {@link PCollection}, {@link PCollectionTuple} or {@link PBegin}
     * @param expansionService address of the expansion service to expand against
     * @param payload configuration payload describing the Python transform
     * @return the only output collection when exactly one output tag exists, otherwise the whole
     *     output tuple
     * @throws RuntimeException if {@code input} is none of the supported input types
     */
    private OutputT apply(InputT input, @UnknownKeyFor @NonNull @Initialized String expansionService, @UnknownKeyFor @NonNull @Initialized ExternalTransforms.ExternalConfigurationPayload payload) {
        External.MultiOutputExpandableTransform transform = External.of((String)"beam:transforms:python:fully_qualified_named", (byte[])payload.toByteArray(), (String)expansionService).withMultiOutputs().withOutputCoder(this.outputCoders);

        // Each supported input type needs its own cast before the transform can be applied.
        PCollectionTuple expanded;
        if (input instanceof PCollection) {
            expanded = (PCollectionTuple)((PCollection)input).apply((PTransform)transform);
        } else if (input instanceof PCollectionTuple) {
            expanded = (PCollectionTuple)((PCollectionTuple)input).apply((PTransform)transform);
        } else if (input instanceof PBegin) {
            expanded = (PCollectionTuple)((PBegin)input).apply((PTransform)transform);
        } else {
            throw new RuntimeException("Unhandled input type " + input.getClass());
        }

        Set outputTags = expanded.getAll().keySet();
        if (outputTags.size() != 1) {
            return (OutputT)expanded;
        }
        // A single output is unwrapped so callers get the collection itself.
        TupleTag onlyTag = (TupleTag)Iterables.getOnlyElement(outputTags);
        return (OutputT)expanded.get(onlyTag);
    }
}

