/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.javaclient.tracking.client;

import io.fluxcapacitor.common.Awaitable;
import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.api.tracking.MessageBatch;
import io.fluxcapacitor.common.api.tracking.TrackingStrategy;
import io.fluxcapacitor.javaclient.publishing.client.GatewayClient;
import io.fluxcapacitor.javaclient.tracking.client.TrackingClient;
import java.beans.ConstructorProperties;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;

public class InMemoryMessageStore
implements GatewayClient,
TrackingClient {
    private final MessageType messageType;
    private final AtomicLong nextIndex = new AtomicLong();
    private final ConcurrentSkipListMap<Long, SerializedMessage> messageLog = new ConcurrentSkipListMap();
    private final Map<String, Long> consumerTokens = new ConcurrentHashMap<String, Long>();
    private final List<Consumer<SerializedMessage>> monitors = new CopyOnWriteArrayList<Consumer<SerializedMessage>>();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Awaitable send(SerializedMessage ... messages) {
        Arrays.stream(messages).forEach(m -> {
            if (m.getIndex() == null) {
                m.setIndex(Long.valueOf(this.nextIndex.getAndIncrement()));
            }
            this.messageLog.put(m.getIndex(), (SerializedMessage)m);
            this.monitors.forEach(monitor -> monitor.accept(m));
        });
        InMemoryMessageStore inMemoryMessageStore = this;
        synchronized (inMemoryMessageStore) {
            this.notifyAll();
        }
        return Awaitable.ready();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public MessageBatch read(String consumer, int channel, int maxSize, Duration maxTimeout, String typeFilter, boolean ignoreMessageTarget, TrackingStrategy strategy) {
        if (channel != 0) {
            return new MessageBatch(new int[]{0, 1}, Collections.emptyList(), null);
        }
        long deadline = System.currentTimeMillis() + maxTimeout.toMillis();
        InMemoryMessageStore inMemoryMessageStore = this;
        synchronized (inMemoryMessageStore) {
            Long lastIndex;
            NavigableMap tailMap = Collections.emptyMap();
            while (System.currentTimeMillis() < deadline && (tailMap = this.messageLog.tailMap((Object)this.getLastIndex(consumer, strategy), false)).isEmpty()) {
                try {
                    this.wait(deadline - System.currentTimeMillis());
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return new MessageBatch(new int[]{0, 1}, Collections.emptyList(), null);
                }
            }
            List<Object> messages = new ArrayList(tailMap.values());
            Long l = lastIndex = messages.isEmpty() ? null : ((SerializedMessage)messages.get(messages.size() - 1)).getIndex();
            if (typeFilter != null) {
                messages = messages.stream().filter(m -> m.getData().getType().matches(typeFilter)).collect(Collectors.toList());
            }
            return new MessageBatch(new int[]{0, 1}, messages, lastIndex);
        }
    }

    private long getLastIndex(String consumer, TrackingStrategy strategy) {
        TrackingStrategy s = strategy == TrackingStrategy.TYPE_DEFAULT ? this.messageType.getDefaultReadStrategy() : strategy;
        return this.consumerTokens.computeIfAbsent(consumer, k -> -1L);
    }

    @Override
    public Awaitable storePosition(String consumer, int[] segment, long lastIndex) {
        this.consumerTokens.put(consumer, lastIndex);
        return Awaitable.ready();
    }

    public Registration registerMonitor(Consumer<SerializedMessage> monitor) {
        this.monitors.add(monitor);
        return () -> this.monitors.remove(monitor);
    }

    @Override
    public void close() {
    }

    @ConstructorProperties(value={"messageType"})
    public InMemoryMessageStore(MessageType messageType) {
        this.messageType = messageType;
    }
}

