/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.task.impl.cdfw;

import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.ProtocolStringList;
import edu.iu.dsc.tws.api.comms.Communicator;
import edu.iu.dsc.tws.api.compute.executor.ExecutionPlan;
import edu.iu.dsc.tws.api.compute.graph.ComputeGraph;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.dataset.DataObject;
import edu.iu.dsc.tws.api.resource.JobListener;
import edu.iu.dsc.tws.api.util.KryoSerializer;
import edu.iu.dsc.tws.master.worker.JMWorkerAgent;
import edu.iu.dsc.tws.master.worker.JMWorkerMessenger;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import edu.iu.dsc.tws.proto.system.job.CDFWJobAPI;
import edu.iu.dsc.tws.task.impl.TaskExecutor;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Worker-side runtime that executes sub-graphs sent by the driver.
 *
 * <p>Messages handed to {@link #driverMessageReceived(Any)} are queued and consumed by
 * {@link #execute()}, which blocks on that queue. For every
 * {@code CDFWJobAPI.ExecuteMessage} the serialized graph is deserialized, planned and executed
 * through the {@link TaskExecutor}; the outputs named by the sub-graph are remembered per
 * sub-graph name so that later sub-graphs can consume them as inputs.
 */
public class CDFWRuntime implements JobListener {
    private static final Logger LOG = Logger.getLogger(CDFWRuntime.class.getName());

    /** Messages received from the driver, waiting to be processed by {@link #execute()}. */
    private final BlockingQueue<Any> executeMessageQueue;

    /** Id of this worker; used as a prefix in log lines. */
    private final int workerId;

    /** Deserializes the graph bytes carried inside an execute message. */
    private final KryoSerializer serializer;

    /** Outputs of already executed sub-graphs: sub-graph name -> (output name -> data). */
    private final Map<String, Map<String, DataObject<Object>>> outPuts = new HashMap<String, Map<String, DataObject<Object>>>();

    /** Plans and runs the deserialized graphs. */
    private final TaskExecutor taskExecutor;

    /**
     * Creates the runtime and its task executor.
     *
     * @param cfg configuration handed to the task executor
     * @param wId id of this worker
     * @param workerInfoList worker information handed to the task executor
     * @param net communicator handed to the task executor
     */
    public CDFWRuntime(Config cfg, int wId, List<JobMasterAPI.WorkerInfo> workerInfoList, Communicator net) {
        // The last TaskExecutor constructor argument is passed as null, as in the original code.
        this.taskExecutor = new TaskExecutor(cfg, wId, workerInfoList, net, null);
        this.executeMessageQueue = new LinkedBlockingQueue<Any>();
        this.workerId = wId;
        this.serializer = new KryoSerializer();
    }

    /**
     * Processes queued driver messages until a job-completed message arrives.
     *
     * <p>Execute messages are run one at a time; messages of an unknown type are logged and
     * skipped.
     *
     * @return {@code true} when a {@code CDFWJobCompletedMessage} ended the loop; {@code false}
     *     when a sub-graph could not be deserialized or when the calling thread was interrupted
     *     while waiting for a message
     */
    public boolean execute() {
        while (true) {
            Any msg;
            try {
                msg = this.executeMessageQueue.take();
            }
            catch (InterruptedException e) {
                // Restore the flag and stop: looping again would either spin (flag set) or make
                // this thread impossible to shut down (flag swallowed).
                Thread.currentThread().interrupt();
                LOG.log(Level.SEVERE, this.workerId + " Interrupted while waiting for a message from the queue", e);
                return false;
            }

            if (msg.is(CDFWJobAPI.ExecuteMessage.class)) {
                // handleExecuteMessage returns true only when the loop has to be abandoned.
                if (this.handleExecuteMessage(msg)) {
                    return false;
                }
            } else if (msg.is(CDFWJobAPI.CDFWJobCompletedMessage.class)) {
                LOG.log(Level.INFO, this.workerId + "Received CDFW job completed message. Leaving execution loop");
                break;
            } else {
                LOG.log(Level.WARNING, this.workerId + "Unknown message for cdfw task execution");
            }
        }
        LOG.log(Level.INFO, this.workerId + " Execution Completed");
        return true;
    }

    /**
     * Executes the sub-graph contained in an execute message and reports completion to the driver.
     *
     * @param msg a message for which {@code msg.is(CDFWJobAPI.ExecuteMessage.class)} holds
     * @return {@code true} if the serialized graph deserialized to {@code null} and the execution
     *     loop must stop; {@code false} otherwise, including when the message could not be
     *     unpacked (that failure is only logged)
     * @throws RuntimeException if an input of the sub-graph refers to a parent graph or an output
     *     name that has not been recorded by a previous execution
     */
    private boolean handleExecuteMessage(Any msg) {
        JMWorkerMessenger workerMessenger = JMWorkerAgent.getJMWorkerAgent().getJMWorkerMessenger();
        try {
            CDFWJobAPI.ExecuteMessage executeMessage = (CDFWJobAPI.ExecuteMessage)msg.unpack(CDFWJobAPI.ExecuteMessage.class);
            CDFWJobAPI.SubGraph subGraph = executeMessage.getGraph();
            ComputeGraph taskGraph = (ComputeGraph)this.serializer.deserialize(subGraph.getGraphSerialized().toByteArray());
            if (taskGraph == null) {
                LOG.severe(this.workerId + " Unable to find the subgraph " + subGraph.getName());
                return true;
            }
            ExecutionPlan executionPlan = this.taskExecutor.plan(taskGraph);

            // Wire every declared input exactly once, validating that it exists first.
            for (CDFWJobAPI.Input input : subGraph.getInputsList()) {
                DataObject<Object> inputData = this.findInput(input);
                this.taskExecutor.addSourceInput(taskGraph, executionPlan, input.getName(), inputData);
            }

            this.taskExecutor.execute(taskGraph, executionPlan);
            LOG.log(Level.INFO, this.workerId + " Completed subgraph : " + subGraph.getName());
            CDFWJobAPI.ExecuteCompletedMessage completedMessage = CDFWJobAPI.ExecuteCompletedMessage.newBuilder().setSubgraphName(subGraph.getName()).build();

            // Remember the outputs under the sub-graph name before notifying the driver, so they
            // are available when a dependent sub-graph is executed.
            ProtocolStringList outPutNames = subGraph.getOutputsList();
            Map<String, DataObject<Object>> outs = new HashMap<String, DataObject<Object>>();
            for (String out : outPutNames) {
                DataObject<Object> outPut = this.taskExecutor.getSinkOutput(taskGraph, executionPlan, out);
                outs.put(out, outPut);
            }
            this.outPuts.put(subGraph.getName(), outs);

            if (!workerMessenger.sendToDriver((Message)completedMessage)) {
                LOG.severe("Unable to send the subgraph completed message :" + completedMessage);
            }
        }
        catch (InvalidProtocolBufferException e) {
            LOG.log(Level.SEVERE, "Unable to unpack received message ", e);
        }
        return false;
    }

    /**
     * Looks up the stored output that a sub-graph input refers to.
     *
     * @param input the input descriptor naming the parent graph and the output
     * @return the stored data object (may be {@code null} if a {@code null} output was recorded)
     * @throws RuntimeException if the parent graph or the named output is unknown
     */
    private DataObject<Object> findInput(CDFWJobAPI.Input input) {
        String inputName = input.getName();
        String inputGraph = input.getParentGraph();
        // Values of outPuts are never null, so a null here means the graph is unknown.
        Map<String, DataObject<Object>> outsPerGraph = this.outPuts.get(inputGraph);
        if (outsPerGraph == null) {
            throw new RuntimeException("We cannot find the input graph: " + inputGraph);
        }
        // containsKey (not get) because a recorded output value may itself be null.
        if (!outsPerGraph.containsKey(inputName)) {
            throw new RuntimeException("We cannot find the input: " + inputName);
        }
        return outsPerGraph.get(inputName);
    }

    /**
     * Logs that workers were added; no other action is taken.
     *
     * @param instancesAdded number of instances reported as added
     */
    public void workersScaledUp(int instancesAdded) {
        LOG.log(Level.INFO, this.workerId + "Workers scaled up msg received. Instances added: " + instancesAdded);
    }

    /**
     * Logs that workers were removed; no other action is taken.
     *
     * @param instancesRemoved number of instances reported as removed
     */
    public void workersScaledDown(int instancesRemoved) {
        LOG.log(Level.INFO, this.workerId + "Workers scaled down msg received. Instances removed: " + instancesRemoved);
    }

    /**
     * Queues a driver message for processing by {@link #execute()}.
     *
     * <p>If the calling thread is interrupted the message is not queued; the failure is logged
     * and the thread's interrupt status is restored.
     *
     * @param anyMessage the message received from the driver
     */
    public void driverMessageReceived(Any anyMessage) {
        try {
            this.executeMessageQueue.put(anyMessage);
        }
        catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of silently clearing it.
            Thread.currentThread().interrupt();
            LOG.log(Level.SEVERE, "Unable to insert message to the queue", e);
        }
    }

    /**
     * Logs that all workers have joined; the worker list itself is not used.
     *
     * @param workerList the workers reported as joined
     */
    public void allWorkersJoined(List<JobMasterAPI.WorkerInfo> workerList) {
        LOG.log(Level.INFO, this.workerId + "All workers joined msg received");
    }
}

