/*
 * Decompiled with CFR 0.152.
 */
package com.spikeify.taskqueue.service;

import com.spikeify.taskqueue.TaskContext;
import com.spikeify.taskqueue.TaskResult;
import com.spikeify.taskqueue.entities.TaskResultState;
import com.spikeify.taskqueue.service.TaskExecutorService;
import com.spikeify.taskqueue.utils.Assert;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

public class QueueScheduler
implements Runnable {
    private static final Logger log = Logger.getLogger(QueueScheduler.class.getSimpleName());
    private final TaskExecutorService executor;
    private final TaskContext context;
    private final int taskTimeout;
    private final int taskInterruptTimeout;

    public QueueScheduler(TaskExecutorService executorService, int timeoutInSeconds, int interruptTimeoutSeconds, TaskContext threadContext) {
        Assert.notNull(executorService, "Missing queue executor service!");
        this.executor = executorService;
        this.taskTimeout = timeoutInSeconds;
        this.taskInterruptTimeout = interruptTimeoutSeconds;
        this.context = threadContext;
    }

    @Override
    public void run() {
        TaskResult result;
        TaskContext context = this.getContext();
        log.fine("Starting task execution ...");
        int successCount = 0;
        int allCount = 0;
        do {
            ExecutorService service = Executors.newSingleThreadExecutor();
            if (context.interrupted()) {
                log.info("Execution was interrupted from outside!");
                break;
            }
            WorkerThread worker = new WorkerThread(context);
            service.execute(worker);
            try {
                service.shutdown();
                if (!service.awaitTermination(this.taskTimeout, TimeUnit.SECONDS)) {
                    context.interrupt();
                    if (!service.awaitTermination(this.taskInterruptTimeout, TimeUnit.SECONDS)) {
                        log.warning("Failed to gracefully interrupt task, killing task instead!");
                        service.shutdownNow();
                        result = worker.getResult();
                        if (result == null) {
                            result = TaskResult.failed();
                            worker.reset();
                        }
                    } else {
                        result = worker.getResult();
                    }
                } else {
                    result = worker.getResult();
                }
            }
            catch (InterruptedException e) {
                log.log(Level.SEVERE, "Task has been timed out ...");
                result = TaskResult.failed();
            }
            if (result != null) {
                if (TaskResultState.ok.equals((Object)result.getState())) {
                    ++successCount;
                }
                ++allCount;
            }
            if (result == null) {
                log.info("No new tasks found ... exiting");
            } else {
                log.info("Last task result: " + result);
            }
            context.reset();
        } while (result != null);
        log.fine("No new tasks found, stopping after: " + successCount + "/" + allCount + " execution(s).");
    }

    private TaskContext getContext() {
        return this.context;
    }

    private class WorkerThread
    implements Runnable {
        private TaskContext context;
        private TaskResult result;

        public WorkerThread(TaskContext context) {
            this.context = context;
        }

        public TaskResult getResult() {
            return this.result;
        }

        @Override
        public void run() {
            this.result = QueueScheduler.this.executor.execute(this.context);
        }

        public void reset() {
            QueueScheduler.this.executor.reset();
        }
    }
}

