/*
 * Decompiled with CFR 0.152.
 */
package com.spikeify.taskqueue.service;

import com.spikeify.Spikeify;
import com.spikeify.Work;
import com.spikeify.commands.AcceptFilter;
import com.spikeify.taskqueue.entities.QueueInfo;
import com.spikeify.taskqueue.entities.QueueInfoUpdater;
import com.spikeify.taskqueue.entities.QueueSettings;
import com.spikeify.taskqueue.entities.TaskState;
import com.spikeify.taskqueue.service.DefaultTaskExecutorService;
import com.spikeify.taskqueue.service.QueuePurger;
import com.spikeify.taskqueue.service.QueueScheduler;
import com.spikeify.taskqueue.service.TaskExecutorService;
import com.spikeify.taskqueue.service.TaskQueueManager;
import com.spikeify.taskqueue.service.TaskQueueService;
import com.spikeify.taskqueue.service.TaskThreadPoolContext;
import com.spikeify.taskqueue.utils.Assert;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

public class DefaultTaskQueueManager
implements TaskQueueManager {
    public static final Logger log = Logger.getLogger(DefaultTaskExecutorService.class.getSimpleName());
    private final Spikeify sfy;
    private final TaskQueueService queues;
    private final Map<String, ScheduledExecutorService> threadPool = new HashMap<String, ScheduledExecutorService>();

    public DefaultTaskQueueManager(Spikeify spikeify, TaskQueueService queueService) {
        Assert.notNull(spikeify, "Missing spikeify!");
        this.sfy = spikeify;
        this.queues = queueService;
    }

    @Override
    public QueueInfo register(String queueName, boolean autoStart) {
        return this.register(queueName, null, autoStart);
    }

    @Override
    public QueueInfo register(final String queueName, final QueueSettings settings, final boolean autoStart) {
        Assert.notNullOrEmpty(queueName, "Missing queue name!");
        QueueInfo queue = (QueueInfo)this.sfy.transact(5, (Work)new Work<QueueInfo>(){

            public QueueInfo run() {
                QueueInfo original = (QueueInfo)DefaultTaskQueueManager.this.sfy.get(QueueInfo.class).key(queueName.trim()).now();
                if (original != null) {
                    if (settings != null) {
                        original.setSettings(settings);
                        DefaultTaskQueueManager.this.sfy.update((Object)original).now();
                    }
                    return original;
                }
                QueueInfo newQueue = new QueueInfo(queueName);
                if (settings != null) {
                    newQueue.setSettings(settings);
                }
                newQueue.setStarted(autoStart);
                DefaultTaskQueueManager.this.sfy.create((Object)newQueue).now();
                return newQueue;
            }
        });
        log.info("Queue: " + queueName + ", registered!");
        return queue;
    }

    @Override
    public QueueInfo info(String queueName) {
        Assert.notNullOrEmpty(queueName, "Missing queue name!");
        String name = queueName.trim();
        return (QueueInfo)this.sfy.get(QueueInfo.class).key(name).now();
    }

    @Override
    public void resetStatistics(String queueName, final boolean force) {
        this.save(queueName, new QueueInfoUpdater(){

            @Override
            public void update(QueueInfo info) {
                info.reset(force);
            }
        });
    }

    @Override
    public List<QueueInfo> list(final Boolean active) {
        return this.sfy.scanAll(QueueInfo.class).filter((AcceptFilter)new AcceptFilter<QueueInfo>(){

            public boolean accept(QueueInfo queueInfo) {
                return active == null || active.booleanValue() == queueInfo.isEnabled();
            }
        }).now();
    }

    @Override
    public void unregister(String queueName) {
        QueueInfo found = this.info(queueName);
        if (found != null) {
            try {
                this.stop(queueName);
                this.queues.purge(TaskState.queued, 0, queueName);
                this.queues.purge(TaskState.failed, 0, queueName);
                this.queues.purge(TaskState.finished, 0, queueName);
                this.queues.purge(TaskState.interrupted, 0, queueName);
                log.info("Queue: " + queueName + ", unregistered!");
                this.sfy.delete((Object)found).now();
            }
            catch (InterruptedException e) {
                log.log(Level.SEVERE, "Failed to stop queue: " + queueName + ", can't unregister!", e);
            }
        }
    }

    @Override
    public void start(String ... queueNames) throws InterruptedException {
        List<QueueInfo> found = this.getQueues(queueNames);
        for (QueueInfo queue : found) {
            String name = queue.getName();
            QueueSettings settings = queue.getSettings();
            this.startQueue(name, true);
            this.stopRunningThreads(name, settings);
            ScheduledThreadPoolExecutor executorService = new ScheduledThreadPoolExecutor(settings.getMaxThreads());
            TaskThreadPoolContext context = new TaskThreadPoolContext(executorService);
            for (int i = 0; i < settings.getMaxThreads(); ++i) {
                TaskExecutorService executor = this.getExecutor(name);
                executorService.scheduleAtFixedRate(new QueueScheduler(executor, settings.getTaskTimeoutSeconds(), settings.getTaskInterruptTimeoutSeconds(), context), settings.getQueueMaxSleepTimeSeconds() + (long)(100 * i), settings.getQueueMaxSleepTimeSeconds() * 1000L, TimeUnit.MILLISECONDS);
            }
            executorService.scheduleAtFixedRate(new QueuePurger(this.queues, name, settings), settings.getQueuePurgeSleepTimeSeconds(), settings.getQueuePurgeSleepTimeSeconds(), TimeUnit.SECONDS);
            this.threadPool.put(name, executorService);
            log.info("Started queue: " + name);
        }
    }

    @Override
    public TaskExecutorService getExecutor(String queueName) {
        return new DefaultTaskExecutorService(this.queues, queueName);
    }

    @Override
    public void stop(String ... queueNames) throws InterruptedException {
        List<QueueInfo> found = this.getQueues(queueNames);
        for (QueueInfo queue : found) {
            String name = queue.getName();
            this.startQueue(name, false);
            this.stopRunningThreads(name, queue.getSettings());
            log.info("Stopped queue: " + name);
        }
    }

    @Override
    public boolean isRunning(String queueName) {
        return this.threadPool.get(queueName) != null;
    }

    @Override
    public QueueInfo enable(String queueName) {
        return this.save(queueName, new QueueInfoUpdater(){

            @Override
            public void update(QueueInfo info) {
                info.setEnabled(true);
            }
        });
    }

    @Override
    public QueueInfo disable(String queueName) {
        return this.save(queueName, new QueueInfoUpdater(){

            @Override
            public void update(QueueInfo info) {
                info.setEnabled(false);
                info.setStarted(false);
            }
        });
    }

    @Override
    public void check(String ... queueNames) throws InterruptedException {
        List<QueueInfo> queues = this.getQueues(queueNames);
        for (QueueInfo info : queues) {
            QueueInfo original = (QueueInfo)this.sfy.get(QueueInfo.class).key(info.getName()).now();
            boolean isStarted = original.isStarted();
            boolean isInPool = this.threadPool.containsKey(original.getName());
            if (isStarted && !isInPool) {
                this.start(original.getName());
                continue;
            }
            if (isStarted || !isInPool) continue;
            this.stop(original.getName());
        }
    }

    @Override
    public void set(String queueName, final QueueSettings settings) {
        Assert.notNullOrEmpty(queueName, "Missing queue name!");
        this.save(queueName, new QueueInfoUpdater(){

            @Override
            public void update(QueueInfo info) {
                info.setSettings(settings);
            }
        });
    }

    private List<QueueInfo> getQueues(String ... queueNames) {
        if (queueNames == null || queueNames.length == 0) {
            return this.list(true);
        }
        ArrayList<QueueInfo> list = new ArrayList<QueueInfo>();
        for (String name : queueNames) {
            QueueInfo queue = this.info(name);
            Assert.notNull(queue, "Queue: " + name + ", is not registered!");
            if (!queue.isEnabled()) continue;
            list.add(queue);
        }
        return list;
    }

    private QueueInfo startQueue(String queueName, final boolean start) {
        return this.save(queueName, new QueueInfoUpdater(){

            @Override
            public void update(QueueInfo info) {
                info.setStarted(start);
            }
        });
    }

    private QueueInfo save(final String queueName, final QueueInfoUpdater updater) {
        return (QueueInfo)this.sfy.transact(5, (Work)new Work<QueueInfo>(){

            public QueueInfo run() {
                QueueInfo original = (QueueInfo)DefaultTaskQueueManager.this.sfy.get(QueueInfo.class).key(queueName).now();
                updater.update(original);
                DefaultTaskQueueManager.this.sfy.update((Object)original).now();
                return original;
            }
        });
    }

    protected void stopRunningThreads(String queueName, QueueSettings settings) throws InterruptedException {
        ScheduledExecutorService running = this.threadPool.get(queueName);
        if (running != null) {
            running.shutdown();
            if (!running.awaitTermination(settings.getTaskInterruptTimeoutSeconds(), TimeUnit.SECONDS)) {
                log.warning("Executor did not terminate in the specified time.");
                List<Runnable> droppedTasks = running.shutdownNow();
                log.warning("Executor was abruptly shut down. " + droppedTasks.size() + " tasks will not be executed.");
            }
        }
        this.threadPool.remove(queueName);
    }
}

