/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.task.cdfw;

import com.google.protobuf.Any;
import com.google.protobuf.Message;
import edu.iu.dsc.tws.common.driver.IDriverMessenger;
import edu.iu.dsc.tws.proto.system.job.CDFWJobAPI;
import edu.iu.dsc.tws.task.cdfw.CDFWEnv;
import edu.iu.dsc.tws.task.cdfw.DataFlowGraph;
import edu.iu.dsc.tws.task.cdfw.DefaultScheduler;
import edu.iu.dsc.tws.task.cdfw.DriveEventType;
import edu.iu.dsc.tws.task.cdfw.DriverEvent;
import edu.iu.dsc.tws.task.cdfw.DriverState;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.logging.Level;
import java.util.logging.Logger;

public final class CDFWExecutor {
    private static final Logger LOG = Logger.getLogger(CDFWExecutor.class.getName());
    private BlockingQueue<DriverEvent> driverEvents = new LinkedBlockingDeque<DriverEvent>();
    private DriverState driverState = DriverState.INITIALIZE;
    private IDriverMessenger driverMessenger;
    private CDFWEnv executionEnv;

    public CDFWExecutor(CDFWEnv executionEnv, IDriverMessenger messenger) {
        this.driverMessenger = messenger;
        this.executionEnv = executionEnv;
    }

    public void execute(DataFlowGraph graph) {
        LOG.info("Starting task graph Requirements:" + graph.getGraphName());
        if (this.driverState != DriverState.JOB_FINISHED && this.driverState != DriverState.INITIALIZE) {
            throw new RuntimeException("Invalid state to execute a job: " + (Object)((Object)this.driverState));
        }
        DefaultScheduler defaultScheduler = new DefaultScheduler(this.executionEnv.getWorkerInfoList());
        Set<Integer> workerIDs = defaultScheduler.schedule(graph);
        if (this.driverState == DriverState.INITIALIZE || this.driverState == DriverState.JOB_FINISHED) {
            try {
                DataFlowGraph dataFlowGraph = this.buildCDFWSchedulePlan(graph, workerIDs);
                CDFWJobAPI.SubGraph job = this.buildCDFWJob(dataFlowGraph);
                this.submitJob(job);
                this.driverState = DriverState.JOB_SUBMITTED;
                this.waitForEvent(DriveEventType.FINISHED_JOB);
                this.driverState = DriverState.JOB_FINISHED;
            }
            catch (Exception e) {
                throw new RuntimeException("Driver is not initialized", e);
            }
        } else {
            throw new RuntimeException("Failed to submit job while in this state: " + (Object)((Object)this.driverState));
        }
    }

    @Deprecated
    public void executeCDFW(DataFlowGraph ... graph) {
        if (this.driverState != DriverState.JOB_FINISHED && this.driverState != DriverState.INITIALIZE) {
            throw new RuntimeException("Invalid state to execute a job: " + (Object)((Object)this.driverState));
        }
        DefaultScheduler defaultScheduler = new DefaultScheduler(this.executionEnv.getWorkerInfoList());
        Map<DataFlowGraph, Set<Integer>> scheduleGraphMap = defaultScheduler.schedule(graph);
        for (Map.Entry<DataFlowGraph, Set<Integer>> dataFlowGraphEntry : scheduleGraphMap.entrySet()) {
            if (this.driverState != DriverState.INITIALIZE && this.driverState != DriverState.JOB_FINISHED) continue;
            try {
                DataFlowGraph dataFlowGraph = dataFlowGraphEntry.getKey();
                Set<Integer> workerIDs = dataFlowGraphEntry.getValue();
                dataFlowGraph = this.buildCDFWSchedulePlan(dataFlowGraph, workerIDs);
                CDFWJobAPI.SubGraph job = this.buildCDFWJob(dataFlowGraph);
                this.submitJob(job);
                this.driverState = DriverState.JOB_SUBMITTED;
                this.waitForEvent(DriveEventType.FINISHED_JOB);
                this.driverState = DriverState.JOB_FINISHED;
            }
            catch (Exception e) {
                throw new RuntimeException("Driver is not initialized", e);
            }
        }
    }

    void close() {
        this.sendCloseMessage();
    }

    private DataFlowGraph buildCDFWSchedulePlan(DataFlowGraph dataFlowGraph, Set<Integer> workerIDs) {
        dataFlowGraph.setCdfwSchedulePlans(CDFWJobAPI.CDFWSchedulePlan.newBuilder().addAllWorkers(workerIDs).build());
        return dataFlowGraph;
    }

    private void sendCloseMessage() {
        CDFWJobAPI.CDFWJobCompletedMessage.Builder builder = CDFWJobAPI.CDFWJobCompletedMessage.newBuilder().setHtgJobname("");
        this.driverMessenger.broadcastToAllWorkers((Message)builder.build());
    }

    private void submitJob(CDFWJobAPI.SubGraph job) {
        LOG.log(Level.INFO, "Sending graph to workers for execution: " + job.getName());
        CDFWJobAPI.ExecuteMessage.Builder builder = CDFWJobAPI.ExecuteMessage.newBuilder();
        builder.setSubgraphName(job.getName());
        builder.setGraph(job);
        this.driverMessenger.broadcastToAllWorkers((Message)builder.build());
    }

    private CDFWJobAPI.SubGraph buildCDFWJob(DataFlowGraph job) {
        return job.build();
    }

    void workerMessageReceived(Any anyMessage, int senderWorkerID) {
        LOG.log(Level.INFO, String.format("Received worker message %d: %s", senderWorkerID, anyMessage.getClass().getName()));
        this.driverEvents.offer(new DriverEvent(DriveEventType.FINISHED_JOB, anyMessage, senderWorkerID));
    }

    private DriverEvent waitForEvent(DriveEventType type) throws Exception {
        try {
            DriverEvent event = this.driverEvents.take();
            if (event.getType() != type) {
                throw new Exception("Un-expected event: " + (Object)((Object)type));
            }
            return event;
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Failed to take event", e);
        }
    }
}

