/*
 * Decompiled with CFR 0.152.
 */
package no.skatteetaten.fastsetting.formueinntekt.felles.task.processor;

import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import no.skatteetaten.fastsetting.formueinntekt.felles.task.api.Task;
import no.skatteetaten.fastsetting.formueinntekt.felles.task.processor.TaskCallback;
import no.skatteetaten.fastsetting.formueinntekt.felles.task.processor.TaskHandler;
import no.skatteetaten.fastsetting.formueinntekt.felles.task.processor.TaskHandlerShutdownException;

public class BufferingTaskHandlerFactory<TRANSACTION, EXCEPTION extends Exception, BAGGAGE>
implements Function<String, TaskHandler<TRANSACTION, EXCEPTION>> {
    private final Executor executor;
    private final boolean failPendingOnClose;
    private final int buffer;
    private final int workers;
    private final long poll;
    private final TimeUnit timeUnit;
    private final Function<String, ? extends TaskHandler<TRANSACTION, EXCEPTION>> delegate;
    private final Supplier<BAGGAGE> baggageSupplier;
    private final Consumer<BAGGAGE> baggageConsumer;

    public BufferingTaskHandlerFactory(Executor executor, boolean failPendingOnClose, int buffer, int workers, long poll, TimeUnit timeUnit, Function<String, ? extends TaskHandler<TRANSACTION, EXCEPTION>> delegate, Supplier<BAGGAGE> baggageSupplier, Consumer<BAGGAGE> baggageConsumer) {
        this.executor = executor;
        this.failPendingOnClose = failPendingOnClose;
        this.buffer = buffer;
        this.workers = workers;
        this.poll = poll;
        this.timeUnit = timeUnit;
        this.delegate = delegate;
        this.baggageConsumer = baggageConsumer;
        this.baggageSupplier = baggageSupplier;
    }

    public static <TRANSACTION, EXCEPTION extends Exception> BufferingTaskHandlerFactory<TRANSACTION, EXCEPTION, Void> withoutBaggage(Executor executor, boolean failPendingOnClose, int buffer, int workers, long poll, TimeUnit timeUnit, Function<String, ? extends TaskHandler<TRANSACTION, EXCEPTION>> delegate) {
        return new BufferingTaskHandlerFactory<TRANSACTION, EXCEPTION, Void>(executor, failPendingOnClose, buffer, workers, poll, timeUnit, delegate, () -> null, ignored -> {});
    }

    @Override
    public TaskHandler<TRANSACTION, EXCEPTION> apply(String topic) {
        final ArrayBlockingQueue queue = new ArrayBlockingQueue(this.buffer);
        final CountDownLatch latch = new CountDownLatch(this.workers);
        final ConcurrentHashMap.KeySetView threads = ConcurrentHashMap.newKeySet();
        final AtomicBoolean closed = new AtomicBoolean();
        for (int index = 0; index < this.workers; ++index) {
            TaskHandler<TRANSACTION, EXCEPTION> handler = this.delegate.apply(topic);
            this.executor.execute(() -> {
                threads.add(Thread.currentThread());
                try {
                    Unit unit;
                    while (!closed.get() && !Thread.interrupted()) {
                        try {
                            unit = (Unit)queue.poll(this.poll, this.timeUnit);
                            if (unit == null) continue;
                            try {
                                this.baggageConsumer.accept(unit.getBaggage());
                                handler.accept(unit.getTasks(), unit.getCallback(), unit.getOnFailure());
                            }
                            catch (Throwable throwable) {
                                try {
                                    unit.getOnFailure().accept(throwable);
                                }
                                catch (Throwable throwable2) {
                                    // empty catch block
                                }
                                if (!(throwable instanceof InterruptedException)) continue;
                                throw (InterruptedException)throwable;
                            }
                        }
                        catch (InterruptedException ignored) {
                            // empty catch block
                            break;
                        }
                    }
                    while ((unit = (Unit)queue.poll()) != null) {
                        Thread.currentThread().interrupt();
                        try {
                            this.baggageConsumer.accept(unit.getBaggage());
                            if (this.failPendingOnClose) {
                                unit.getOnFailure().accept(new TaskHandlerShutdownException());
                                continue;
                            }
                            try {
                                handler.accept(unit.getTasks(), unit.getCallback(), unit.getOnFailure());
                            }
                            catch (Throwable throwable) {
                                unit.getOnFailure().accept(throwable);
                            }
                        }
                        catch (Throwable throwable) {}
                    }
                    return;
                }
                finally {
                    latch.countDown();
                }
            });
        }
        return new TaskHandler<TRANSACTION, EXCEPTION>(){

            @Override
            public void accept(Set<Task> tasks, TaskCallback<TRANSACTION, EXCEPTION> callback, Consumer<Throwable> onFailure) {
                Unit unit = new Unit(tasks, callback, onFailure, BufferingTaskHandlerFactory.this.baggageSupplier.get());
                try {
                    queue.put(unit);
                    return;
                }
                catch (InterruptedException interruptedException) {
                    if (BufferingTaskHandlerFactory.this.failPendingOnClose) {
                        try {
                            unit.getOnFailure().accept(new TaskHandlerShutdownException());
                        }
                        catch (Throwable throwable) {}
                    } else {
                        while (true) {
                            try {
                                queue.put(unit);
                            }
                            catch (Throwable throwable) {
                                continue;
                            }
                            break;
                        }
                    }
                    return;
                }
            }

            @Override
            public void close() {
                closed.set(true);
                threads.forEach(Thread::interrupt);
                while (true) {
                    try {
                        latch.await();
                    }
                    catch (InterruptedException interruptedException) {
                        continue;
                    }
                    break;
                }
            }
        };
    }

    static class Unit<TRANSACTION, EXCEPTION extends Exception, BAGGAGE> {
        private final Set<Task> tasks;
        private final TaskCallback<TRANSACTION, EXCEPTION> callback;
        private final Consumer<Throwable> onFailure;
        private final BAGGAGE baggage;

        Unit(Set<Task> tasks, TaskCallback<TRANSACTION, EXCEPTION> callback, Consumer<Throwable> onFailure, BAGGAGE baggage) {
            this.tasks = tasks;
            this.callback = callback;
            this.onFailure = onFailure;
            this.baggage = baggage;
        }

        Set<Task> getTasks() {
            return this.tasks;
        }

        TaskCallback<TRANSACTION, EXCEPTION> getCallback() {
            return this.callback;
        }

        Consumer<Throwable> getOnFailure() {
            return this.onFailure;
        }

        public BAGGAGE getBaggage() {
            return this.baggage;
        }
    }
}

