/*
 * Decompiled with CFR 0.152.
 */
package org.codetome.riptide.core.service.impl;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import kotlin.Metadata;
import kotlin.collections.IntIterator;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlin.ranges.IntRange;
import org.codetome.riptide.core.api.exception.MissingNameException;
import org.codetome.riptide.core.domain.Message;
import org.codetome.riptide.core.domain.OperationDescriptor;
import org.codetome.riptide.core.domain.ProcessEvent;
import org.codetome.riptide.core.service.impl.SingleThreadDispatchMessageReceiver$WhenMappings;
import org.codetome.riptide.core.service.messaging.MessagePublisher;
import org.codetome.riptide.core.service.messaging.MessageReceiver;
import org.codetome.riptide.core.service.process.ProcessEventService;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Metadata(mv={1, 1, 6}, bv={1, 0, 1}, k=1, d1={"\u0000R\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\b\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0000\n\u0002\u0010!\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\u0010\u0000\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0003\b\u0000\u0018\u00002\u00020\u00012\u00020\u0002B\u001f\u0012\u0006\u0010\u0003\u001a\u00020\u0004\u0012\u0006\u0010\u0005\u001a\u00020\u0006\u0012\b\b\u0002\u0010\u0007\u001a\u00020\b\u00a2\u0006\u0002\u0010\tJ\b\u0010\u0011\u001a\u00020\u0012H\u0002J\b\u0010\u0013\u001a\u00020\u0012H\u0016J\u001e\u0010\u0014\u001a\u00020\u00122\f\u0010\u0015\u001a\b\u0012\u0004\u0012\u00020\u00170\u00162\u0006\u0010\u0018\u001a\u00020\u0019H\u0002J\u0016\u0010\u001a\u001a\u00020\b2\f\u0010\u0015\u001a\b\u0012\u0004\u0012\u00020\u00170\u0016H\u0002J\u001e\u0010\u001b\u001a\u00020\u00122\f\u0010\u0015\u001a\b\u0012\u0004\u0012\u00020\u00170\u00162\u0006\u0010\u0018\u001a\u00020\u0019H\u0016R\u000e\u0010\n\u001a\u00020\u000bX\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u0014\u0010\f\u001a\b\u0012\u0004\u0012\u00020\u000e0\rX\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u000f\u001a\u00020\u0010X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0005\u001a\u00020\u0006X\u0082\u000e\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0007\u001a\u00020\bX\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0003\u001a\u00020\u0004X\u0082\u0004\u00a2\u0006\u0002\n\u0000\u00a8\u0006\u001c"}, d2={"Lorg/codetome/riptide/core/service/impl/SingleThreadDispatchMessageReceiver;", "Lorg/codetome/riptide/core/service/messaging/MessageReceiver;", "Ljava/io/Closeable;", "processEventService", "Lorg/codetome/riptide/core/service/process/ProcessEventService;", "messagePublisher", "Lorg/codetome/riptide/core/service/messaging/MessagePublisher;", "numThreads", "", 
"(Lorg/codetome/riptide/core/service/process/ProcessEventService;Lorg/codetome/riptide/core/service/messaging/MessagePublisher;I)V", "closed", "Ljava/util/concurrent/atomic/AtomicBoolean;", "executors", "", "Ljava/util/concurrent/ExecutorService;", "logger", "Lorg/slf4j/Logger;", "checkClosed", "", "close", "doReceive", "message", "Lorg/codetome/riptide/core/domain/Message;", "", "operation", "Lorg/codetome/riptide/core/domain/OperationDescriptor;", "fetchSubjectIndexForMessage", "receive", "riptide.core.java_main"})
/*
 * MessageReceiver that dispatches every received message to one of a fixed
 * number of single-threaded executors. The executor is selected from the
 * message's pid (see fetchSubjectIndexForMessage), so messages carrying the
 * same pid are always handed to the same executor and therefore run one after
 * another on a single thread.
 *
 * Closing the receiver shuts all executors down; afterwards receive() fails
 * with an IllegalStateException.
 */
public final class SingleThreadDispatchMessageReceiver
implements MessageReceiver,
Closeable {
    private final Logger logger;
    /** One single-threaded executor per dispatch slot; holds exactly {@code numThreads} entries. */
    private final List<ExecutorService> executors;
    /** Set to {@code true} by {@link #close()}; checked at the start of every {@code receive} call. */
    private final AtomicBoolean closed;
    private final ProcessEventService processEventService;
    private final MessagePublisher messagePublisher;
    private final int numThreads;

    /**
     * Schedules {@code message} for asynchronous handling on the executor selected by its pid.
     *
     * <p>NOTE(review): the closed check and the submit are not atomic. If {@link #close()} runs
     * in between, the executor itself rejects the task (per {@code ExecutorService} this is a
     * {@code RejectedExecutionException}) instead of the {@link IllegalStateException} below.
     *
     * @param message   the message to handle; must not be {@code null}
     * @param operation describes what to do with the message; must not be {@code null}
     * @throws IllegalStateException if this receiver has already been closed
     */
    @Override
    public void receive(@NotNull final Message<Object> message, @NotNull final OperationDescriptor operation) {
        Intrinsics.checkParameterIsNotNull(message, "message");
        Intrinsics.checkParameterIsNotNull(operation, "operation");
        this.checkClosed();
        final ExecutorService executor = this.executors.get(this.fetchSubjectIndexForMessage(message));
        executor.submit(new Runnable() {
            @Override
            public void run() {
                SingleThreadDispatchMessageReceiver.this.doReceive(message, operation);
            }
        });
    }

    /**
     * Performs the actual handling of a message on an executor thread.
     *
     * <p>The operation type's ordinal is translated through the compiler-generated
     * {@code WhenMappings} table (not part of this class): branch 1 passes the message to the
     * operation's consumer function, branch 2 passes it to the processor function and publishes
     * the result. Any other value is logged and reported as a process event.
     *
     * <p>Every {@link Exception} raised while handling is logged and reported as a process event
     * rather than rethrown.
     */
    private final void doReceive(Message<Object> message, OperationDescriptor operation) {
        try {
            switch (SingleThreadDispatchMessageReceiver$WhenMappings.$EnumSwitchMapping$0[operation.getOperationType().ordinal()]) {
                case 1: {
                    operation.getConsumerFn().invoke(message);
                    break;
                }
                case 2: {
                    this.messagePublisher.publish((Message)operation.getProcessorFn().invoke(message));
                    break;
                }
                default: {
                    String msg = "No operation found for received message: " + message + "!";
                    this.logger.error(msg);
                    this.processEventService.sendProcessEvent(ProcessEvent.Companion.createFromException$riptide_core_java_main(new MissingNameException(), msg));
                    break;
                }
            }
        }
        catch (Exception e) {
            String msg = "Failed to receive message!";
            this.logger.error(msg, (Throwable)e);
            this.processEventService.sendProcessEvent(ProcessEvent.Companion.createFromException$riptide_core_java_main(e, msg));
        }
    }

    /**
     * Marks this receiver as closed and calls {@code shutdown()} on every executor.
     * This method does not wait for the executors to terminate.
     */
    @Override
    public void close() {
        this.closed.set(true);
        for (ExecutorService executor : this.executors) {
            executor.shutdown();
        }
    }

    /**
     * Guards entry points that must not be used after {@link #close()}.
     *
     * @throws IllegalStateException if this receiver has been closed
     */
    private final void checkClosed() {
        if (this.closed.get()) {
            throw new IllegalStateException("This MessageReceiver is closed!");
        }
    }

    /**
     * Maps a message to an index into {@code executors}.
     *
     * <p>The remainder is taken before {@code Math.abs}, so its magnitude is always strictly
     * smaller than {@code numThreads}; the absolute value therefore cannot overflow and the
     * narrowing cast to {@code int} is lossless.
     *
     * @return a value in {@code [0, numThreads)}
     */
    private final int fetchSubjectIndexForMessage(Message<Object> message) {
        // presumably getPid() is a java.util.UUID (it exposes getMostSignificantBits) - confirm in Message
        return (int)Math.abs(message.getPid().getMostSignificantBits() % (long)this.numThreads);
    }

    /**
     * Creates a receiver backed by {@code numThreads} single-threaded executors.
     *
     * @param processEventService receives a process event whenever handling a message fails
     * @param messagePublisher    used to publish the result of processor operations
     * @param numThreads          number of single-threaded executors to create; must be at least 1
     * @throws IllegalArgumentException if {@code numThreads} is smaller than 1
     */
    public SingleThreadDispatchMessageReceiver(@NotNull ProcessEventService processEventService, @NotNull MessagePublisher messagePublisher, int numThreads) {
        Intrinsics.checkParameterIsNotNull(processEventService, "processEventService");
        Intrinsics.checkParameterIsNotNull(messagePublisher, "messagePublisher");
        // Fail fast: with zero or negative threads no executor exists and every receive() call
        // would fail later with an ArithmeticException or IndexOutOfBoundsException.
        if (numThreads < 1) {
            throw new IllegalArgumentException("numThreads must be at least 1, but was " + numThreads + "!");
        }
        this.processEventService = processEventService;
        this.messagePublisher = messagePublisher;
        this.numThreads = numThreads;
        this.logger = LoggerFactory.getLogger(this.getClass());
        this.closed = new AtomicBoolean(false);
        final List<ExecutorService> created = new ArrayList<ExecutorService>(numThreads);
        for (int i = 0; i < numThreads; i++) {
            created.add(Executors.newSingleThreadExecutor());
        }
        this.executors = created;
    }

    /**
     * Compiler-generated bridge for Kotlin default arguments.
     *
     * <p>When bit {@code 4} of {@code n2} is set, the thread count argument {@code n} is ignored
     * and twice the number of available processors is used instead.
     */
    public /* synthetic */ SingleThreadDispatchMessageReceiver(ProcessEventService processEventService, MessagePublisher messagePublisher, int n, int n2, DefaultConstructorMarker defaultConstructorMarker) {
        // The default is resolved inside the argument list because this(...) has to be the
        // first statement of a constructor.
        this(processEventService, messagePublisher, (n2 & 4) != 0 ? Runtime.getRuntime().availableProcessors() * 2 : n);
    }

    /**
     * Compiler-generated static accessor for the private {@code doReceive} method; retained so
     * that previously compiled callers keep linking.
     */
    public static final /* synthetic */ void access$doReceive(SingleThreadDispatchMessageReceiver $this, @NotNull Message message, @NotNull OperationDescriptor operation) {
        $this.doReceive(message, operation);
    }
}

