/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.javaclient.eventsourcing.client;

import io.fluxcapacitor.common.Awaitable;
import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.api.eventsourcing.EventBatch;
import io.fluxcapacitor.javaclient.eventsourcing.client.EventStoreClient;
import io.fluxcapacitor.javaclient.tracking.client.InMemoryMessageStore;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Stream;

/**
 * {@link EventStoreClient} implementation that keeps all event batches in memory.
 *
 * <p>Batches are grouped per aggregate id in {@link #domainEvents}. Every stored event is
 * additionally passed to the inherited {@code send(...)} of {@link InMemoryMessageStore}, which is
 * constructed here with {@link MessageType#EVENT}.
 *
 * <p>The per-aggregate batch lists are {@link CopyOnWriteArrayList}s, so a stream returned by
 * {@link #getEvents(String, long)} iterates over a snapshot and is not disturbed by batches that
 * are stored while the stream is being consumed.
 */
public class InMemoryEventStoreClient extends InMemoryMessageStore implements EventStoreClient {

    /** Event batches per aggregate id, in the order in which they were stored. */
    private final Map<String, List<EventBatch>> domainEvents = new ConcurrentHashMap<>();

    /** Creates an empty store whose underlying message store is of type {@link MessageType#EVENT}. */
    public InMemoryEventStoreClient() {
        super(MessageType.EVENT);
    }

    /**
     * Appends the given events as one {@link EventBatch} to the batches of the aggregate and then
     * hands the events to the inherited {@code send(...)}.
     *
     * @param aggregateId        id of the aggregate the events belong to; used as the map key
     * @param domain             domain recorded on the created batch
     * @param lastSequenceNumber sequence number recorded on the created batch
     * @param events             events to store; must not be {@code null}
     * @return the {@link Awaitable} returned by the inherited {@code send(...)}
     */
    @Override
    public Awaitable storeEvents(String aggregateId, String domain, long lastSequenceNumber,
                                 List<SerializedMessage> events) {
        // compute() runs atomically per key, so creating the list and appending to it cannot race
        // with another store for the same aggregate.
        domainEvents.compute(aggregateId, (id, batches) -> {
            if (batches == null) {
                // Copy-on-write: readers streaming this list see a stable snapshot.
                batches = new CopyOnWriteArrayList<>();
            }
            batches.add(new EventBatch(aggregateId, domain, lastSequenceNumber, events));
            return batches;
        });
        return super.send(events.toArray(new SerializedMessage[0]));
    }

    /**
     * Returns the stored events of an aggregate that come after the given sequence number.
     *
     * <p>Batches whose last sequence number is not greater than {@code lastSequenceNumber} are
     * dropped entirely. A batch that starts after {@code lastSequenceNumber} is returned in full;
     * a batch that straddles it has its leading
     * {@code lastSequenceNumber - firstSequenceNumber + 1} events skipped.
     *
     * @param aggregateId        id of the aggregate to read
     * @param lastSequenceNumber sequence number after which events should be returned
     * @return a stream of the matching events; empty if nothing is stored for the aggregate
     */
    @Override
    public Stream<SerializedMessage> getEvents(String aggregateId, long lastSequenceNumber) {
        return domainEvents.getOrDefault(aggregateId, Collections.emptyList()).stream()
                .filter(batch -> batch.getLastSequenceNumber() > lastSequenceNumber)
                .flatMap(batch -> {
                    List<SerializedMessage> events = batch.getEvents();
                    long firstSequenceNumber = batch.getFirstSequenceNumber();
                    if (firstSequenceNumber > lastSequenceNumber) {
                        return events.stream();
                    }
                    // Batch overlaps the requested position: skip the events up to and including it.
                    return events.stream().skip(lastSequenceNumber - firstSequenceNumber + 1L);
                });
    }
}

