/*
 * Decompiled with CFR 0.152.
 */
package com.spikeify.taskqueue.service;

import com.aerospike.client.AerospikeException;
import com.spikeify.ResultSet;
import com.spikeify.Spikeify;
import com.spikeify.SpikeifyService;
import com.spikeify.Work;
import com.spikeify.taskqueue.Job;
import com.spikeify.taskqueue.TaskQueueError;
import com.spikeify.taskqueue.entities.QueueInfo;
import com.spikeify.taskqueue.entities.QueueTask;
import com.spikeify.taskqueue.entities.TaskState;
import com.spikeify.taskqueue.entities.TaskStatistics;
import com.spikeify.taskqueue.service.TaskQueueService;
import com.spikeify.taskqueue.utils.Assert;
import java.util.Collections;
import java.util.Comparator;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Random;
import java.util.logging.Level;
import java.util.logging.Logger;

public class DefaultTaskQueueService
implements TaskQueueService {
    private static final Logger log = Logger.getLogger(DefaultTaskQueueService.class.getSimpleName());
    public static final String DEFAULT_QUEUE_NAME = "default";
    private static final int CHOOSE_NEXT_TASK_RETRIES = 10;
    private static final int MAX_TOP_ITEMS = 5;
    private final Spikeify sfy;

    public DefaultTaskQueueService(Spikeify spikeify) {
        Assert.notNull(spikeify, "Missing spikeify!");
        this.sfy = spikeify;
        SpikeifyService.register(QueueTask.class);
    }

    @Override
    public QueueTask add(Job job, String queueName) {
        Assert.notNull(job, "Missing job!");
        Assert.notNullOrEmpty(queueName, "Missing queue name!");
        QueueTask task = new QueueTask(job, queueName);
        this.sfy.create((Object)task).now();
        this.setQueueInfoCount(queueName, null, TaskState.queued);
        return task;
    }

    @Override
    public QueueTask next(String queueName) {
        Assert.notNullOrEmpty(queueName, "Missing queue name!");
        ResultSet openTasks = this.sfy.query(QueueTask.class).filter("lockFilter", QueueTask.getLockedFilter(queueName, false)).now();
        List list = openTasks.toList();
        if (list.size() == 0) {
            return null;
        }
        Collections.sort(list, new Comparator<QueueTask>(){

            @Override
            public int compare(QueueTask o1, QueueTask o2) {
                return o1.getUpdateTime().compareTo(o2.getUpdateTime());
            }
        });
        int size = Math.min(5, list.size());
        QueueTask proposed = null;
        for (int i = 1; i <= 10; ++i) {
            Random rand = new Random();
            int idx = rand.nextInt(size);
            proposed = this.transition((QueueTask)list.get(idx), TaskState.running);
            if (proposed != null && TaskState.running.equals((Object)proposed.getState())) {
                return proposed;
            }
            size = Math.min(10 * i, list.size());
        }
        return proposed;
    }

    @Override
    public List<QueueTask> list(TaskState state, String queueName) {
        Assert.notNull((Object)state, "Missing job state!");
        Assert.notNullOrEmpty(queueName, "Missing queue name!");
        ResultSet query = this.sfy.query(QueueTask.class).filter("stateFilter", QueueTask.getStateFilter(queueName, state)).now();
        return query.toList();
    }

    @Override
    public QueueTask transition(QueueTask task, final TaskState newState) {
        Assert.notNull(task, "Missing job!");
        Assert.notNull((Object)newState, "Missing state!");
        try {
            final String taskId = task.getId();
            final long updateTime = task.getUpdateTime();
            QueueTask updated = (QueueTask)this.sfy.transact(1, (Work)new Work<QueueTask>(){

                public QueueTask run() {
                    QueueTask original = (QueueTask)DefaultTaskQueueService.this.sfy.get(QueueTask.class).key(taskId).now();
                    if (original == null) {
                        return null;
                    }
                    if (!original.getUpdateTime().equals(updateTime)) {
                        throw new TaskQueueError("Thread collision, some other thread already modified task!");
                    }
                    original.setState(newState);
                    DefaultTaskQueueService.this.sfy.update((Object)original).now();
                    return original;
                }
            });
            if (updated != null) {
                this.setQueueInfoCount(updated.getQueue(), task.getState(), newState);
            }
            return updated;
        }
        catch (AerospikeException | ConcurrentModificationException e) {
            log.fine("Could not transition job: " + task + " to: " + (Object)((Object)newState) + ", thread collision!");
            return null;
        }
        catch (TaskQueueError e) {
            log.log(Level.SEVERE, "Transition failed, thread collision.", e);
            return null;
        }
    }

    private void setQueueInfoCount(String queue, TaskState oldState, TaskState newState) {
        QueueInfo exists = (QueueInfo)this.sfy.get(QueueInfo.class).key(queue).now();
        if (exists == null) {
            return;
        }
        try {
            if (oldState != null) {
                this.sfy.command(QueueInfo.class).key(queue).add(oldState.name(), -1L).now();
            }
            if (newState != null) {
                this.sfy.command(QueueInfo.class).key(queue).add(newState.name(), 1L).now();
            }
            if (TaskState.queued.equals((Object)newState)) {
                this.sfy.command(QueueInfo.class).key(queue).add("totalTasks", 1L).now();
            }
            if (TaskState.finished.equals((Object)newState)) {
                this.sfy.command(QueueInfo.class).key(queue).add("totalFinished", 1L).now();
            }
            if (TaskState.failed.equals((Object)newState)) {
                this.sfy.command(QueueInfo.class).key(queue).add("totalFailed", 1L).now();
            }
            if (TaskState.failed.equals((Object)oldState) && TaskState.running.equals((Object)newState)) {
                this.sfy.command(QueueInfo.class).key(queue).add("totalRetries", 1L).now();
            }
        }
        catch (Exception e) {
            log.log(Level.SEVERE, "Failed to count tasks!", e);
        }
    }

    protected boolean remove(QueueTask task) {
        Assert.notNull(task, "Missing job to be removed!");
        Assert.notNull(task.getId(), "Missing job id: " + task);
        task = this.transition(task, TaskState.purge);
        if (task != null && TaskState.purge.equals((Object)task.getState())) {
            this.sfy.delete((Object)task).now();
            return true;
        }
        return false;
    }

    @Override
    public TaskStatistics purge(TaskState state, int taskAge, String queueName) {
        Assert.notNull((Object)state, "Missing job state!");
        Assert.isTrue(state.canTransition(TaskState.purge), "Can't purge tasks in: " + (Object)((Object)state) + " state!");
        List<QueueTask> list = this.list(state, queueName);
        TaskStatistics.Builder statistics = new TaskStatistics.Builder();
        for (QueueTask item : list) {
            if (!item.isLocked() || !item.isOlderThan(taskAge) || !this.remove(item)) continue;
            statistics.include(item);
        }
        TaskStatistics output = statistics.build();
        this.setQueueInfoStatistics(state, queueName, output);
        return output;
    }

    private void setQueueInfoStatistics(final TaskState state, final String queueName, final TaskStatistics output) {
        this.sfy.transact(5, (Work)new Work<QueueInfo>(){

            public QueueInfo run() {
                QueueInfo original = (QueueInfo)DefaultTaskQueueService.this.sfy.get(QueueInfo.class).key(queueName).now();
                if (original == null) {
                    return null;
                }
                original.setStatistics(state, output);
                DefaultTaskQueueService.this.sfy.update((Object)original).now();
                return original;
            }
        });
    }
}

