/*
 * Decompiled with CFR 0.152.
 */
package org.apache.zeppelin.python;

import io.grpc.ManagedChannelBuilder;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteException;
import org.apache.commons.exec.ExecuteResultHandler;
import org.apache.commons.exec.ExecuteStreamHandler;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.LogOutputStream;
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.exec.environment.EnvironmentUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.zeppelin.interpreter.BaseZeppelinContext;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
import org.apache.zeppelin.python.IPythonClient;
import org.apache.zeppelin.python.Py4JUtils;
import org.apache.zeppelin.python.PythonZeppelinContext;
import org.apache.zeppelin.python.proto.CancelRequest;
import org.apache.zeppelin.python.proto.CompletionRequest;
import org.apache.zeppelin.python.proto.CompletionResponse;
import org.apache.zeppelin.python.proto.ExecuteRequest;
import org.apache.zeppelin.python.proto.ExecuteResponse;
import org.apache.zeppelin.python.proto.ExecuteStatus;
import org.apache.zeppelin.python.proto.IPythonStatus;
import org.apache.zeppelin.python.proto.StatusRequest;
import org.apache.zeppelin.python.proto.StatusResponse;
import org.apache.zeppelin.python.proto.StopRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import py4j.GatewayServer;

public class IPythonInterpreter
extends Interpreter
implements ExecuteResultHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(IPythonInterpreter.class);
    private ExecuteWatchdog watchDog;
    private IPythonClient ipythonClient;
    private GatewayServer gatewayServer;
    protected BaseZeppelinContext zeppelinContext;
    private String pythonExecutable;
    private long ipythonLaunchTimeout;
    private String additionalPythonPath;
    private String additionalPythonInitFile;
    private boolean useBuiltinPy4j = true;
    private boolean useAuth = false;
    private String secret;
    private volatile boolean pythonProcessFailed = false;
    private InterpreterOutputStream interpreterOutput = new InterpreterOutputStream(LOGGER);

    public IPythonInterpreter(Properties properties) {
        super(properties);
    }

    public void setAdditionalPythonPath(String additionalPythonPath) {
        LOGGER.info("setAdditionalPythonPath: " + additionalPythonPath);
        this.additionalPythonPath = additionalPythonPath;
    }

    public void setAdditionalPythonInitFile(String additionalPythonInitFile) {
        this.additionalPythonInitFile = additionalPythonInitFile;
    }

    public void setAddBulitinPy4j(boolean add) {
        this.useBuiltinPy4j = add;
    }

    public BaseZeppelinContext buildZeppelinContext() {
        return new PythonZeppelinContext(this.getInterpreterGroup().getInterpreterHookRegistry(), Integer.parseInt(this.getProperty("zeppelin.python.maxResult", "1000")));
    }

    public void open() throws InterpreterException {
        try {
            if (this.ipythonClient != null) {
                return;
            }
            this.pythonExecutable = this.getProperty("zeppelin.python", "python");
            LOGGER.info("Python Exec: " + this.pythonExecutable);
            String checkPrerequisiteResult = this.checkIPythonPrerequisite(this.pythonExecutable);
            if (!StringUtils.isEmpty((String)checkPrerequisiteResult)) {
                throw new InterpreterException("IPython prerequisite is not meet: " + checkPrerequisiteResult);
            }
            this.ipythonLaunchTimeout = Long.parseLong(this.getProperty("zeppelin.ipython.launch.timeout", "30000"));
            this.zeppelinContext = this.buildZeppelinContext();
            int ipythonPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
            int jvmGatewayPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
            LOGGER.info("Launching IPython Kernel at port: " + ipythonPort);
            LOGGER.info("Launching JVM Gateway at port: " + jvmGatewayPort);
            int message_size = Integer.parseInt(this.getProperty("zeppelin.ipython.grpc.message_size", "33554432"));
            this.ipythonClient = new IPythonClient(ManagedChannelBuilder.forAddress((String)"127.0.0.1", (int)ipythonPort).usePlaintext(true).maxInboundMessageSize(message_size));
            this.useAuth = Boolean.parseBoolean(this.getProperty("zeppelin.py4j.useAuth", "false"));
            this.secret = Py4JUtils.createSecret(256);
            this.launchIPythonKernel(ipythonPort);
            this.setupJVMGateway(jvmGatewayPort, this.secret, this.useAuth);
        }
        catch (Exception e) {
            throw new RuntimeException("Fail to open IPythonInterpreter", e);
        }
    }

    public String checkIPythonPrerequisite(String pythonExec) {
        ProcessBuilder processBuilder = new ProcessBuilder(pythonExec, "-m", "pip", "freeze");
        try {
            File stderrFile = File.createTempFile("zeppelin", ".txt");
            processBuilder.redirectError(stderrFile);
            File stdoutFile = File.createTempFile("zeppelin", ".txt");
            processBuilder.redirectOutput(stdoutFile);
            Process proc = processBuilder.start();
            int ret = proc.waitFor();
            if (ret != 0) {
                return "Fail to run pip freeze.\n" + IOUtils.toString((InputStream)new FileInputStream(stderrFile));
            }
            String freezeOutput = IOUtils.toString((InputStream)new FileInputStream(stdoutFile));
            LOGGER.debug("Installed python packages:\n" + freezeOutput);
            if (!freezeOutput.contains("jupyter-client=")) {
                return "jupyter-client is not installed.";
            }
            if (!freezeOutput.contains("ipykernel=")) {
                return "ipkernel is not installed";
            }
            if (!freezeOutput.contains("ipython=")) {
                return "ipython is not installed";
            }
            if (!freezeOutput.contains("grpcio=")) {
                return "grpcio is not installed";
            }
            if (!freezeOutput.contains("protobuf=")) {
                return "protobuf is not installed";
            }
            LOGGER.info("IPython prerequisite is met");
        }
        catch (Exception e) {
            LOGGER.warn("Fail to checkIPythonPrerequisite", (Throwable)e);
            return "Fail to checkIPythonPrerequisite: " + ExceptionUtils.getStackTrace((Throwable)e);
        }
        return "";
    }

    private void setupJVMGateway(int jvmGatewayPort, String secret, boolean useAuth) throws IOException {
        this.gatewayServer = Py4JUtils.createGatewayServer((Object)this, "127.0.0.1", jvmGatewayPort, secret, useAuth);
        this.gatewayServer.start();
        InputStream input = ((Object)((Object)this)).getClass().getClassLoader().getResourceAsStream("grpc/python/zeppelin_python.py");
        List lines = IOUtils.readLines((InputStream)input);
        ExecuteResponse response = this.ipythonClient.block_execute(ExecuteRequest.newBuilder().setCode(StringUtils.join((Collection)lines, (String)System.lineSeparator()).replace("${JVM_GATEWAY_PORT}", jvmGatewayPort + "")).build());
        if (response.getStatus() == ExecuteStatus.ERROR) {
            throw new IOException("Fail to setup JVMGateway\n" + response.getOutput());
        }
        if (this.additionalPythonInitFile != null) {
            input = ((Object)((Object)this)).getClass().getClassLoader().getResourceAsStream(this.additionalPythonInitFile);
            lines = IOUtils.readLines((InputStream)input);
            response = this.ipythonClient.block_execute(ExecuteRequest.newBuilder().setCode(StringUtils.join((Collection)lines, (String)System.lineSeparator()).replace("${JVM_GATEWAY_PORT}", jvmGatewayPort + "")).build());
            if (response.getStatus() == ExecuteStatus.ERROR) {
                throw new IOException("Fail to run additional Python init file: " + this.additionalPythonInitFile + "\n" + response.getOutput());
            }
        }
    }

    private void launchIPythonKernel(int ipythonPort) throws IOException, URISyntaxException {
        String[] ipythonScripts;
        File tmpPythonScriptFolder = Files.createTempDirectory("zeppelin_ipython", new FileAttribute[0]).toFile();
        for (String ipythonScript : ipythonScripts = new String[]{"ipython_server.py", "ipython_pb2.py", "ipython_pb2_grpc.py"}) {
            URL url = ((Object)((Object)this)).getClass().getClassLoader().getResource("grpc/python/" + ipythonScript);
            FileUtils.copyURLToFile((URL)url, (File)new File(tmpPythonScriptFolder, ipythonScript));
        }
        CommandLine cmd = CommandLine.parse((String)this.pythonExecutable);
        cmd.addArgument(tmpPythonScriptFolder.getAbsolutePath() + "/ipython_server.py");
        cmd.addArgument(ipythonPort + "");
        DefaultExecutor executor = new DefaultExecutor();
        ProcessLogOutputStream processOutput = new ProcessLogOutputStream(LOGGER);
        executor.setStreamHandler((ExecuteStreamHandler)new PumpStreamHandler((OutputStream)((Object)processOutput)));
        this.watchDog = new ExecuteWatchdog(-1L);
        executor.setWatchdog(this.watchDog);
        if (this.useBuiltinPy4j) {
            String py4jLibPath = null;
            if (System.getenv("ZEPPELIN_HOME") != null) {
                py4jLibPath = System.getenv("ZEPPELIN_HOME") + File.separator + "interpreter/python/py4j-0.9.2/src";
            } else {
                Path workingPath = Paths.get("..", new String[0]).toAbsolutePath();
                py4jLibPath = workingPath + File.separator + "interpreter/python/py4j-0.9.2/src";
            }
            this.additionalPythonPath = this.additionalPythonPath != null ? this.additionalPythonPath + ":" + py4jLibPath : py4jLibPath;
        }
        Map<String, String> envs = this.setupIPythonEnv();
        executor.execute(cmd, envs, (ExecuteResultHandler)this);
        long startTime = System.currentTimeMillis();
        while (!this.pythonProcessFailed) {
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException e) {
                LOGGER.error("Interrupted by something", (Throwable)e);
            }
            try {
                StatusResponse response = this.ipythonClient.status(StatusRequest.newBuilder().build());
                if (response.getStatus() == IPythonStatus.RUNNING) {
                    LOGGER.info("IPython Kernel is Running");
                    break;
                }
                LOGGER.info("Wait for IPython Kernel to be started");
            }
            catch (Exception e) {
                LOGGER.info("Wait for IPython Kernel to be started");
            }
            if (System.currentTimeMillis() - startTime <= this.ipythonLaunchTimeout) continue;
            throw new IOException("Fail to launch IPython Kernel in " + this.ipythonLaunchTimeout / 1000L + " seconds");
        }
        if (this.pythonProcessFailed) {
            throw new IOException("Fail to launch IPython Kernel as the python process is failed");
        }
    }

    protected Map<String, String> setupIPythonEnv() throws IOException {
        Map envs = EnvironmentUtils.getProcEnvironment();
        if (envs.containsKey("PYTHONPATH")) {
            if (this.additionalPythonPath != null) {
                envs.put("PYTHONPATH", this.additionalPythonPath + ":" + (String)envs.get("PYTHONPATH"));
            }
        } else {
            envs.put("PYTHONPATH", this.additionalPythonPath);
        }
        if (this.useAuth) {
            envs.put("PY4J_GATEWAY_SECRET", this.secret);
        }
        LOGGER.info("PYTHONPATH:" + (String)envs.get("PYTHONPATH"));
        return envs;
    }

    public void close() throws InterpreterException {
        if (this.watchDog != null) {
            LOGGER.debug("Kill IPython Process");
            this.ipythonClient.stop(StopRequest.newBuilder().build());
            try {
                this.ipythonClient.shutdown();
            }
            catch (InterruptedException e) {
                LOGGER.warn("Fail to shutdown IPythonClient");
            }
            this.watchDog.destroyProcess();
            this.gatewayServer.shutdown();
        }
    }

    public InterpreterResult interpret(String st, InterpreterContext context) {
        this.zeppelinContext.setGui(context.getGui());
        this.zeppelinContext.setNoteGui(context.getNoteGui());
        this.zeppelinContext.setInterpreterContext(context);
        this.interpreterOutput.setInterpreterOutput(context.out);
        ExecuteResponse response = this.ipythonClient.stream_execute(ExecuteRequest.newBuilder().setCode(st).build(), this.interpreterOutput);
        try {
            this.interpreterOutput.getInterpreterOutput().flush();
        }
        catch (IOException e) {
            throw new RuntimeException("Fail to write output", e);
        }
        InterpreterResult result = new InterpreterResult(InterpreterResult.Code.valueOf((String)response.getStatus().name()));
        return result;
    }

    public void cancel(InterpreterContext context) throws InterpreterException {
        this.ipythonClient.cancel(CancelRequest.newBuilder().build());
    }

    public Interpreter.FormType getFormType() {
        return Interpreter.FormType.SIMPLE;
    }

    public int getProgress(InterpreterContext context) throws InterpreterException {
        return 0;
    }

    public List<InterpreterCompletion> completion(String buf, int cursor, InterpreterContext interpreterContext) {
        LOGGER.debug("Call completion for: " + buf);
        ArrayList<InterpreterCompletion> completions = new ArrayList<InterpreterCompletion>();
        CompletionRequest.getDefaultInstance();
        CompletionResponse response = this.ipythonClient.complete(CompletionRequest.newBuilder().setCode(buf).setCursor(cursor).build());
        for (int i = 0; i < response.getMatchesCount(); ++i) {
            String match = response.getMatches(i);
            int lastIndexOfDot = match.lastIndexOf(".");
            if (lastIndexOfDot != -1) {
                match = match.substring(lastIndexOfDot + 1);
            }
            completions.add(new InterpreterCompletion(match, match, ""));
        }
        return completions;
    }

    public BaseZeppelinContext getZeppelinContext() {
        return this.zeppelinContext;
    }

    public void onProcessComplete(int exitValue) {
        LOGGER.warn("Python Process is completed with exitValue: " + exitValue);
    }

    public void onProcessFailed(ExecuteException e) {
        LOGGER.warn("Exception happens in Python Process", (Throwable)e);
        this.pythonProcessFailed = true;
    }

    private static class ProcessLogOutputStream
    extends LogOutputStream {
        private Logger logger;

        public ProcessLogOutputStream(Logger logger) {
            this.logger = logger;
        }

        protected void processLine(String s, int i) {
            this.logger.debug("Process Output: " + s);
        }
    }
}

