/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.task.impl;

import edu.iu.dsc.tws.api.checkpointing.CheckpointingClient;
import edu.iu.dsc.tws.api.comms.Communicator;
import edu.iu.dsc.tws.api.compute.executor.ExecutionPlan;
import edu.iu.dsc.tws.api.compute.executor.IExecution;
import edu.iu.dsc.tws.api.compute.executor.INodeInstance;
import edu.iu.dsc.tws.api.compute.graph.ComputeGraph;
import edu.iu.dsc.tws.api.compute.modifiers.Collector;
import edu.iu.dsc.tws.api.compute.modifiers.Receptor;
import edu.iu.dsc.tws.api.compute.nodes.INode;
import edu.iu.dsc.tws.api.compute.nodes.ISink;
import edu.iu.dsc.tws.api.compute.nodes.ISource;
import edu.iu.dsc.tws.api.compute.schedule.elements.TaskSchedulePlan;
import edu.iu.dsc.tws.api.compute.schedule.elements.Worker;
import edu.iu.dsc.tws.api.compute.schedule.elements.WorkerPlan;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.dataset.DataObject;
import edu.iu.dsc.tws.api.dataset.DataPartition;
import edu.iu.dsc.tws.api.resource.WorkerEnvironment;
import edu.iu.dsc.tws.dataset.DataObjectImpl;
import edu.iu.dsc.tws.dataset.EmptyDataObject;
import edu.iu.dsc.tws.executor.core.ExecutionPlanBuilder;
import edu.iu.dsc.tws.executor.threading.Executor;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import edu.iu.dsc.tws.tsched.taskscheduler.TaskScheduler;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

/**
 * Worker-side helper that turns {@link ComputeGraph}s into {@link ExecutionPlan}s and runs them.
 *
 * <p>A single {@link Executor} is created lazily on the first call to one of the execute
 * methods and is then reused for every later call on this instance. As a consequence, the
 * configuration and operation mode of the <em>first</em> executed graph are the ones the
 * executor is constructed with.
 *
 * <p>NOTE(review): nothing in this class synchronizes access to the cached executor, so the
 * class is presumably meant to be used from a single thread - confirm against callers.
 */
public class TaskExecutor {
    private static final Logger LOG = Logger.getLogger(TaskExecutor.class.getName());

    private final Config config;
    private final int workerID;
    private final List<JobMasterAPI.WorkerInfo> workerInfoList;
    private final Communicator communicator;
    private final CheckpointingClient checkpointingClient;

    /** Lazily created by the execute methods; {@code null} until the first execution. */
    private Executor executor;

    /**
     * Creates a task executor from its individual collaborators.
     *
     * @param cfg base configuration used for scheduling, plan building and execution
     * @param wId id of the worker this executor runs in
     * @param workerInfoList descriptions of the workers taking part in the job
     * @param net communicator whose channel is handed to the {@link Executor}
     * @param checkpointingClient client passed to every {@link ExecutionPlanBuilder}
     */
    public TaskExecutor(Config cfg, int wId, List<JobMasterAPI.WorkerInfo> workerInfoList, Communicator net, CheckpointingClient checkpointingClient) {
        this.config = cfg;
        this.workerID = wId;
        this.workerInfoList = workerInfoList;
        this.communicator = net;
        this.checkpointingClient = checkpointingClient;
    }

    /**
     * Creates a task executor by reading all collaborators from a worker environment.
     *
     * @param workerEnv environment supplying the config, worker id, worker list, communicator
     *     and (through its worker controller) the checkpointing client
     */
    public TaskExecutor(WorkerEnvironment workerEnv) {
        this(workerEnv.getConfig(),
            workerEnv.getWorkerId(),
            workerEnv.getWorkerList(),
            workerEnv.getCommunicator(),
            workerEnv.getWorkerController().getCheckpointingClient());
    }

    /**
     * Schedules a single graph onto the known workers and builds its execution plan.
     *
     * @param graph graph to schedule
     * @return the execution plan built for {@code graph}
     */
    public ExecutionPlan plan(ComputeGraph graph) {
        TaskScheduler taskScheduler = this.newScheduler();
        WorkerPlan workerPlan = this.createWorkerPlan();
        TaskSchedulePlan taskSchedulePlan = taskScheduler.schedule(graph, workerPlan);
        return this.buildPlan(graph, taskSchedulePlan);
    }

    /**
     * Schedules several graphs in one scheduler call and builds one execution plan per graph.
     *
     * @param graph graphs to schedule
     * @return execution plans keyed by graph name, in the order the graphs were passed
     */
    public Map<String, ExecutionPlan> plan(ComputeGraph ... graph) {
        WorkerPlan workerPlan = this.createWorkerPlan();
        TaskScheduler taskScheduler = this.newScheduler();
        // Wildcard map: the schedule plans are looked up by graph name and cast below.
        Map<?, ?> schedulePlanMap = taskScheduler.schedule(workerPlan, graph);
        // LinkedHashMap keeps the plans in the order the graphs were supplied.
        Map<String, ExecutionPlan> executionPlanMap = new LinkedHashMap<String, ExecutionPlan>();
        for (ComputeGraph aGraph : graph) {
            String graphName = aGraph.getGraphName();
            TaskSchedulePlan taskSchedulePlan = (TaskSchedulePlan) schedulePlanMap.get(graphName);
            executionPlanMap.put(graphName, this.buildPlan(aGraph, taskSchedulePlan));
        }
        return executionPlanMap;
    }

    /**
     * Builds an execution plan from an already computed task schedule.
     *
     * @param graph graph the schedule belongs to
     * @param taskSchedulePlan schedule to build the plan from
     * @return the execution plan
     */
    public ExecutionPlan executionPlan(ComputeGraph graph, TaskSchedulePlan taskSchedulePlan) {
        return this.buildPlan(graph, taskSchedulePlan);
    }

    /**
     * Executes the plan and then calls {@link Executor#waitFor} on it.
     *
     * <p>{@code taskConfig} is merged on top of the base configuration only when this call
     * creates the executor; if an executor already exists it is reused unchanged and
     * {@code taskConfig} is not read.
     *
     * @param taskConfig extra configuration layered over the base config for a new executor
     * @param graph graph whose operation mode is used when the executor is created
     * @param plan plan to execute
     */
    public void execute(Config taskConfig, ComputeGraph graph, ExecutionPlan plan) {
        if (this.executor == null) {
            // Only build the merged config when it is actually going to be used.
            Config newCfg = Config.newBuilder().putAll(this.config).putAll(taskConfig).build();
            this.executor = this.newExecutor(newCfg, graph);
        }
        this.executor.execute(plan);
        this.executor.waitFor(plan);
    }

    /**
     * Executes the plan and then calls {@link Executor#waitFor} on it.
     *
     * @param graph graph whose operation mode is used when the executor is created
     * @param plan plan to execute
     */
    public void execute(ComputeGraph graph, ExecutionPlan plan) {
        Executor exec = this.ensureExecutor(graph);
        exec.execute(plan);
        exec.waitFor(plan);
    }

    /**
     * Executes the plan without the follow-up {@code waitFor} call; pair with
     * {@link #waitFor(ComputeGraph, ExecutionPlan)}.
     *
     * @param graph graph whose operation mode is used when the executor is created
     * @param plan plan to execute
     */
    public void itrExecute(ComputeGraph graph, ExecutionPlan plan) {
        this.ensureExecutor(graph).execute(plan);
    }

    /**
     * Calls {@link Executor#waitFor} for a plan on the existing executor.
     *
     * @param graph unused by this method
     * @param plan plan to wait for
     * @throws IllegalStateException if no execute method has created the executor yet
     */
    public void waitFor(ComputeGraph graph, ExecutionPlan plan) {
        if (this.executor == null) {
            throw new IllegalStateException("Cannot call waitFor before calling execute");
        }
        this.executor.waitFor(plan);
    }

    /**
     * Starts the plan through {@link Executor#iExecute} and hands back the resulting handle.
     *
     * @param graph graph whose operation mode is used when the executor is created
     * @param plan plan to execute
     * @return the {@link IExecution} returned by the executor
     */
    public IExecution iExecute(ComputeGraph graph, ExecutionPlan plan) {
        return this.ensureExecutor(graph).iExecute(plan);
    }

    /**
     * Adds an input to every instance of the named task in the plan.
     *
     * <p>Does nothing when the plan has no node map for {@code taskName}.
     *
     * @param graph unused by this method
     * @param plan plan holding the task instances
     * @param taskName name of the task to feed
     * @param inputKey key the input is registered under
     * @param input data to add
     * @throws RuntimeException if an instance of the task is not a {@link Receptor}
     */
    public void addInput(ComputeGraph graph, ExecutionPlan plan, String taskName, String inputKey, DataObject<?> input) {
        Map<?, ?> nodes = plan.getNodes(taskName);
        if (nodes == null) {
            return;
        }
        for (Object value : nodes.values()) {
            INodeInstance node = (INodeInstance) value;
            INode task = node.getNode();
            if (!(task instanceof Receptor)) {
                throw new RuntimeException("Cannot add input to non input instance: " + node);
            }
            ((Receptor) task).add(inputKey, input);
        }
    }

    /**
     * Adds an input to every node in the plan that is both a {@link Receptor} and an
     * {@link ISource}; all other nodes are skipped.
     *
     * @param graph unused by this method
     * @param plan plan holding the node instances
     * @param inputKey key the input is registered under
     * @param input data to add
     * @throws RuntimeException if the plan returns no node map
     */
    public void addSourceInput(ComputeGraph graph, ExecutionPlan plan, String inputKey, DataObject<Object> input) {
        Map<?, ?> nodes = plan.getNodes();
        if (nodes == null) {
            throw new RuntimeException(String.format("%d Failed to set input for non-existing sources: %s", this.workerID, plan.getNodeNames()));
        }
        for (Object value : nodes.values()) {
            INode task = ((INodeInstance) value).getNode();
            if (task instanceof Receptor && task instanceof ISource) {
                ((Receptor) task).add(inputKey, input);
            }
        }
    }

    /**
     * Collects the default output partition of every instance of the named task.
     *
     * @param graph unused by this method
     * @param plan plan holding the task instances
     * @param taskName name of the task to collect from
     * @param <T> element type expected by the caller
     * @return the collected partitions, or an {@link EmptyDataObject} when the plan has no
     *     node map for {@code taskName}
     * @throws RuntimeException if an instance of the task is not a {@link Collector}
     */
    // NOTE(review): DataObjectImpl/EmptyDataObject are kept raw because their type parameters
    // are not visible from this file; parameterize them once confirmed.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public <T> DataObject<T> getOutput(ComputeGraph graph, ExecutionPlan plan, String taskName) {
        Map<?, ?> nodes = plan.getNodes(taskName);
        if (nodes == null) {
            return new EmptyDataObject();
        }
        DataObjectImpl dataSet = new DataObjectImpl(taskName, this.config);
        for (Object value : nodes.values()) {
            INodeInstance node = (INodeInstance) value;
            INode task = node.getNode();
            if (!(task instanceof Collector)) {
                throw new RuntimeException("Cannot collect from node because it is not a collector: " + node);
            }
            DataPartition partition = ((Collector) task).get();
            dataSet.addPartition(partition);
        }
        return dataSet;
    }

    /**
     * Collects the named output from every node that is both a {@link Collector} and an
     * {@link ISink}; all other nodes are skipped.
     *
     * <p>A node that returns {@code null} for {@code dataName} is logged as a warning and
     * contributes no partition.
     *
     * @param graph unused by this method
     * @param plan plan holding the node instances
     * @param dataName name of the data to collect
     * @param <T> element type expected by the caller
     * @return the collected partitions, or an {@link EmptyDataObject} when the plan returns
     *     no node map
     */
    // NOTE(review): DataObjectImpl/EmptyDataObject are kept raw because their type parameters
    // are not visible from this file; parameterize them once confirmed.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public <T> DataObject<T> getSinkOutput(ComputeGraph graph, ExecutionPlan plan, String dataName) {
        Map<?, ?> nodes = plan.getNodes();
        if (nodes == null) {
            // addSourceInput treats getNodes() as nullable; mirror the getOutput overloads
            // instead of failing with a NullPointerException.
            return new EmptyDataObject();
        }
        DataObjectImpl dataSet = new DataObjectImpl(this.config);
        for (Object value : nodes.values()) {
            INodeInstance node = (INodeInstance) value;
            INode task = node.getNode();
            if (!(task instanceof Collector) || !(task instanceof ISink)) {
                continue;
            }
            DataPartition partition = ((Collector) task).get(dataName);
            if (partition == null) {
                LOG.warning(String.format("Task id %d returned null for data %s", node.getId(), dataName));
                continue;
            }
            dataSet.addPartition(partition);
        }
        return dataSet;
    }

    /**
     * Collects the named output of every instance of the named task.
     *
     * <p>An instance that returns {@code null} for {@code dataName} is logged as a warning
     * and contributes no partition.
     *
     * @param graph unused by this method
     * @param plan plan holding the task instances
     * @param taskName name of the task to collect from
     * @param dataName name of the data to collect
     * @param <T> element type expected by the caller
     * @return the collected partitions, or an {@link EmptyDataObject} when the plan has no
     *     node map for {@code taskName}
     * @throws RuntimeException if an instance of the task is not a {@link Collector}
     */
    // NOTE(review): DataObjectImpl/EmptyDataObject are kept raw because their type parameters
    // are not visible from this file; parameterize them once confirmed.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public <T> DataObject<T> getOutput(ComputeGraph graph, ExecutionPlan plan, String taskName, String dataName) {
        Map<?, ?> nodes = plan.getNodes(taskName);
        if (nodes == null) {
            return new EmptyDataObject();
        }
        DataObjectImpl dataSet = new DataObjectImpl(this.config);
        for (Object value : nodes.values()) {
            INodeInstance node = (INodeInstance) value;
            INode task = node.getNode();
            if (!(task instanceof Collector)) {
                throw new RuntimeException("Cannot collect from node because it is not a collector: " + node);
            }
            DataPartition partition = ((Collector) task).get(dataName);
            if (partition == null) {
                LOG.warning(String.format("Task id %d returned null for data %s", node.getId(), dataName));
                continue;
            }
            dataSet.addPartition(partition);
        }
        return dataSet;
    }

    /** Closes the executor if one was ever created; otherwise does nothing. */
    public void close() {
        if (this.executor != null) {
            this.executor.close();
        }
    }

    /** Builds a worker plan containing one {@link Worker} per known worker id. */
    private WorkerPlan createWorkerPlan() {
        ArrayList<Worker> workers = new ArrayList<Worker>(this.workerInfoList.size());
        for (JobMasterAPI.WorkerInfo workerInfo : this.workerInfoList) {
            workers.add(new Worker(workerInfo.getWorkerID()));
        }
        return new WorkerPlan(workers);
    }

    /** Creates a task scheduler initialized with the base configuration. */
    private TaskScheduler newScheduler() {
        TaskScheduler taskScheduler = new TaskScheduler();
        taskScheduler.initialize(this.config);
        return taskScheduler;
    }

    /** Builds an execution plan for the graph and schedule using a fresh plan builder. */
    private ExecutionPlan buildPlan(ComputeGraph graph, TaskSchedulePlan taskSchedulePlan) {
        ExecutionPlanBuilder executionPlanBuilder = new ExecutionPlanBuilder(this.workerID, this.workerInfoList, this.communicator, this.checkpointingClient);
        return executionPlanBuilder.build(this.config, graph, taskSchedulePlan);
    }

    /** Returns the cached executor, creating it with the base configuration on first use. */
    private Executor ensureExecutor(ComputeGraph graph) {
        if (this.executor == null) {
            this.executor = this.newExecutor(this.config, graph);
        }
        return this.executor;
    }

    /** Creates an executor for this worker using the given config and the graph's operation mode. */
    private Executor newExecutor(Config cfg, ComputeGraph graph) {
        return new Executor(cfg, this.workerID, this.communicator.getChannel(), graph.getOperationMode());
    }
}

