/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.python;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Launches a Python module as a local subprocess.
 *
 * <p>{@link #start()} first runs the bundled {@code bootstrap_beam_venv.py} script with a Python
 * interpreter found on the {@code PATH}; the last non-empty line that script prints is used as the
 * Python executable for running {@code python -m <module> <args...>}.
 *
 * <p>Instances are not modified after construction; {@link #withExtraPackages(List)} returns a new
 * instance instead of mutating this one.
 */
public class PythonService {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(PythonService.class);
    private final @UnknownKeyFor @NonNull @Initialized String module;
    private final @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> args;
    private final @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> extraPackages;

    /**
     * Creates a service description.
     *
     * @param module the Python module passed to {@code python -m}
     * @param args command-line arguments appended after the module name
     * @param extraPackages packages forwarded to the bootstrap script via {@code --extra_packages}
     */
    public PythonService(@UnknownKeyFor @NonNull @Initialized String module, @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> args, @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> extraPackages) {
        this.module = module;
        this.args = args;
        this.extraPackages = extraPackages;
    }

    /**
     * Creates a service description with no extra packages.
     *
     * @param module the Python module passed to {@code python -m}
     * @param args command-line arguments appended after the module name
     */
    public PythonService(@UnknownKeyFor @NonNull @Initialized String module, @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> args) {
        this(module, args, ImmutableList.<String>of());
    }

    /**
     * Creates a service description with no extra packages.
     *
     * @param module the Python module passed to {@code python -m}
     * @param args command-line arguments appended after the module name
     */
    public PythonService(@UnknownKeyFor @NonNull @Initialized String module, String ... args) {
        this(module, Arrays.asList(args));
    }

    /**
     * Returns a copy of this service whose extra packages are the current ones followed by the
     * given ones.
     *
     * @param extraPackages additional packages to forward to the bootstrap script
     * @return a new {@code PythonService}; this instance is left unchanged
     */
    public @UnknownKeyFor @NonNull @Initialized PythonService withExtraPackages(@UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> extraPackages) {
        // The explicit <String> witness is required: without it the builder is inferred as
        // ImmutableList.Builder<Object>, which cannot be passed as List<String>.
        List<String> combined = ImmutableList.<String>builder().addAll(this.extraPackages).addAll(extraPackages).build();
        return new PythonService(this.module, this.args, combined);
    }

    /**
     * Runs the bootstrap script and then starts the Python module as a subprocess.
     *
     * @return a handle whose {@code close()} destroys the started Python process
     * @throws IOException if the bootstrap script cannot be materialized or a process cannot be
     *     started or read from
     * @throws InterruptedException if the calling thread is interrupted while waiting for a
     *     subprocess
     * @throws RuntimeException if no Python interpreter is found, the bootstrap exits with a
     *     non-zero status, or the bootstrap prints no executable path
     */
    public @UnknownKeyFor @NonNull @Initialized AutoCloseable start() throws @UnknownKeyFor @NonNull @Initialized IOException, @UnknownKeyFor @NonNull @Initialized InterruptedException {
        File bootstrapScript = File.createTempFile("bootstrap_beam_venv", ".py");
        bootstrapScript.deleteOnExit();
        // Copy the bundled script to disk so the interpreter can execute it; both streams are
        // closed even if the copy fails.
        try (InputStream resource = this.getClass().getResourceAsStream("bootstrap_beam_venv.py")) {
            if (resource == null) {
                throw new IOException("Unable to find resource bootstrap_beam_venv.py relative to " + this.getClass().getName());
            }
            try (FileOutputStream fout = new FileOutputStream(bootstrapScript.getAbsolutePath())) {
                ByteStreams.copy(resource, fout);
            }
        }

        List<String> bootstrapCommand = new ArrayList<>();
        bootstrapCommand.add(this.whichPython());
        bootstrapCommand.add(bootstrapScript.getAbsolutePath());
        bootstrapCommand.add("--beam_version=" + PythonService.getMatchingStablePythonSDKVersion(ReleaseInfo.getReleaseInfo().getSdkVersion()));
        if (!this.extraPackages.isEmpty()) {
            bootstrapCommand.add("--extra_packages=" + String.join(";", this.extraPackages));
        }
        LOG.info("Running bootstrap command {}", bootstrapCommand);
        Process bootstrap = new ProcessBuilder(bootstrapCommand).redirectError(ProcessBuilder.Redirect.INHERIT).start();
        // The bootstrap script gets no input; close its stdin right away.
        bootstrap.getOutputStream().close();

        // Echo the bootstrap's stdout to the log and remember the last non-empty line, which is
        // used below as the path of the Python executable.
        String lastNonEmptyLine = null;
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(bootstrap.getInputStream(), Charsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                LOG.info(line);
                if (!line.isEmpty()) {
                    lastNonEmptyLine = line;
                }
            }
        }
        int result = bootstrap.waitFor();
        if (result != 0) {
            throw new RuntimeException("Python bootstrap failed with error " + result + ", " + lastNonEmptyLine);
        }
        if (lastNonEmptyLine == null) {
            // Without this check a null/empty executable would surface later as an obscure
            // failure from ProcessBuilder.
            throw new RuntimeException("Python bootstrap succeeded but did not print the path of a Python executable.");
        }

        String pythonExecutable = lastNonEmptyLine;
        List<String> command = new ArrayList<>();
        command.add(pythonExecutable);
        command.add("-m");
        command.add(this.module);
        command.addAll(this.args);
        LOG.info("Starting python service with arguments {}", command);
        Process p = new ProcessBuilder(command).redirectError(ProcessBuilder.Redirect.INHERIT).redirectOutput(ProcessBuilder.Redirect.INHERIT).start();
        return p::destroy;
    }

    /**
     * Returns the first of {@code python3} / {@code python} that can be launched with
     * {@code --version}. The exit status of that probe is not inspected.
     *
     * @throws InterruptedException if interrupted while waiting for the probe process
     * @throws RuntimeException if neither executable can be started
     */
    private @UnknownKeyFor @NonNull @Initialized String whichPython() throws @UnknownKeyFor @NonNull @Initialized InterruptedException {
        for (String executable : ImmutableList.<String>of("python3", "python")) {
            try {
                new ProcessBuilder(executable, "--version").start().waitFor();
                return executable;
            }
            catch (IOException exn) {
                // Failing to launch this candidate is expected when it is not installed;
                // fall through and try the next one.
                LOG.debug("Unable to run {}: {}", executable, exn.toString());
            }
        }
        throw new RuntimeException("Unable to find a suitable Python executable.");
    }

    /**
     * Maps a Java SDK version string to the version passed to the bootstrap script.
     *
     * @param javaSDKVersion the Java SDK version; may be {@code null} at runtime
     * @return {@code "latest"} for {@code null} or versions ending in {@code .dev}, otherwise the
     *     given version unchanged
     */
    @VisibleForTesting
    static @UnknownKeyFor @NonNull @Initialized String getMatchingStablePythonSDKVersion(@UnknownKeyFor @NonNull @Initialized String javaSDKVersion) {
        if (javaSDKVersion == null || javaSDKVersion.endsWith(".dev")) {
            return "latest";
        }
        return javaSDKVersion;
    }

    /**
     * Finds a port that was free at the time of the call by binding an ephemeral server socket
     * and closing it again, then pausing for 100 ms before returning.
     *
     * <p>If the thread is interrupted during the pause, the interrupt flag is restored and the
     * port is still returned.
     *
     * @return the port number the temporary socket was bound to
     * @throws IOException if the socket cannot be opened or closed
     */
    public static @UnknownKeyFor @NonNull @Initialized int findAvailablePort() throws @UnknownKeyFor @NonNull @Initialized IOException {
        int port;
        try (ServerSocket s = new ServerSocket(0)) {
            port = s.getLocalPort();
        }
        try {
            Thread.sleep(100L);
        }
        catch (InterruptedException exn) {
            // This method does not declare InterruptedException, so preserve the interrupt
            // for the caller instead of silently dropping it.
            Thread.currentThread().interrupt();
        }
        return port;
    }

    /**
     * Repeatedly tries to open a TCP connection to {@code host:port} until one succeeds or the
     * timeout elapses. The delay between attempts starts at 10 ms and grows by a factor of 1.2,
     * but never extends past the remaining time.
     *
     * @param host the host to connect to
     * @param port the port to connect to
     * @param timeoutMs how long to keep trying, in milliseconds; no attempt is made if this is
     *     not positive
     * @throws TimeoutException if no connection succeeded within {@code timeoutMs}
     * @throws InterruptedException if interrupted while sleeping between attempts
     */
    public static void waitForPort(@UnknownKeyFor @NonNull @Initialized String host, @UnknownKeyFor @NonNull @Initialized int port, @UnknownKeyFor @NonNull @Initialized int timeoutMs) throws @UnknownKeyFor @NonNull @Initialized TimeoutException, @UnknownKeyFor @NonNull @Initialized InterruptedException {
        long start = System.currentTimeMillis();
        long backoffMs = 10L;
        while (System.currentTimeMillis() - start < (long)timeoutMs) {
            try (Socket ignored = new Socket(host, port)) {
                return;
            }
            catch (IOException exn) {
                // Not reachable yet: back off, but do not sleep beyond the deadline.
                long remainingMs = (long)timeoutMs - (System.currentTimeMillis() - start);
                if (remainingMs > 0L) {
                    Thread.sleep(Math.min(backoffMs, remainingMs));
                }
                backoffMs = (long)((double)backoffMs * 1.2);
            }
        }
        throw new TimeoutException("Timeout waiting for Python service startup after " + (System.currentTimeMillis() - start) + " milliseconds.");
    }
}

