/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.javaclient.scheduling.client;

import io.fluxcapacitor.common.Awaitable;
import io.fluxcapacitor.common.TimingUtils;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.api.scheduling.ScheduledMessage;
import io.fluxcapacitor.common.api.tracking.MessageBatch;
import io.fluxcapacitor.common.api.tracking.TrackingStrategy;
import io.fluxcapacitor.javaclient.scheduling.client.SchedulingClient;
import io.fluxcapacitor.javaclient.tracking.client.InMemoryMessageStore;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.stream.Collectors;

/**
 * In-memory {@link SchedulingClient} built on top of {@link InMemoryMessageStore}.
 *
 * <p>Every scheduled message is stored under a unique index, and the mapping from that index to
 * its schedule id is kept in {@link #times}. A message is only handed out by
 * {@link #readAndWait} while its index is still present in that map, which is how cancellation
 * via {@link #cancelSchedule(String)} takes effect.
 */
public class InMemorySchedulingClient extends InMemoryMessageStore implements SchedulingClient {

    /** Index of each pending schedule mapped to its schedule id, ordered by index. */
    private final ConcurrentSkipListMap<Long, String> times = new ConcurrentSkipListMap<>();

    /**
     * Reads a batch from the underlying store and retains only the messages that are still
     * scheduled and due.
     *
     * <p>A message is retained when its index is still registered in {@link #times} and
     * {@code TimingUtils.isMissedDeadline} returns {@code true} for that index (presumably
     * meaning the deadline encoded by the index has passed; confirm in {@code TimingUtils}).
     *
     * @return a batch with the segment of the underlying batch, the retained messages and, as
     *         last index, the index of the last retained message <em>before</em> the type filter
     *         is applied ({@code null} if no message was retained)
     */
    @Override
    public MessageBatch readAndWait(String consumer, String trackerId, int maxSize, Duration maxTimeout,
                                    String typeFilter, boolean ignoreMessageTarget,
                                    TrackingStrategy readStrategy) {
        MessageBatch messageBatch = super.readAndWait(
                consumer, trackerId, maxSize, maxTimeout, typeFilter, ignoreMessageTarget, readStrategy);
        List<SerializedMessage> messages = messageBatch.getMessages().stream()
                .filter(m -> this.times.containsKey(m.getIndex()))
                .filter(m -> TimingUtils.isMissedDeadline(m.getIndex()))
                .collect(Collectors.toList());
        // Determined before type filtering so the reported position also covers due messages
        // that the type filter below removes.
        Long lastIndex = messages.isEmpty() ? null : messages.get(messages.size() - 1).getIndex();
        if (typeFilter != null) {
            messages = messages.stream()
                    .filter(m -> m.getData().getType().matches(typeFilter))
                    .collect(Collectors.toList());
        }
        return new MessageBatch(messageBatch.getSegment(), messages, lastIndex);
    }

    /**
     * Drops the bookkeeping of all schedules with an index strictly below {@code lastIndex} and
     * then stores the position in the underlying store.
     */
    @Override
    public Awaitable storePosition(String consumer, int[] segment, long lastIndex) {
        // headMap(toKey) is exclusive: the entry at lastIndex itself is kept.
        this.times.headMap(lastIndex).clear();
        return super.storePosition(consumer, segment, lastIndex);
    }

    /**
     * Registers the given schedules and appends their messages to the underlying store.
     *
     * <p>Each message gets an index starting at the current time in milliseconds; the index is
     * incremented until it is not yet in use, so every schedule ends up with a unique index.
     *
     * @return an already completed {@link Awaitable}
     */
    @Override
    public Awaitable schedule(ScheduledMessage... schedules) {
        for (ScheduledMessage schedule : schedules) {
            long index = System.currentTimeMillis();
            // putIfAbsent returns non-null when the index is already taken: try the next one.
            while (this.times.putIfAbsent(index, schedule.getScheduleId()) != null) {
                ++index;
            }
            schedule.getMessage().setIndex(index);
        }
        // Call the superclass directly: send(...) of this class is deliberately unsupported.
        super.send(Arrays.stream(schedules).map(ScheduledMessage::getMessage).toArray(SerializedMessage[]::new));
        return Awaitable.ready();
    }

    /**
     * Removes every registered schedule with the given id, so that its message is no longer
     * returned by {@link #readAndWait}. The stored message itself is not deleted here.
     *
     * @return an already completed {@link Awaitable}
     */
    @Override
    public Awaitable cancelSchedule(String scheduleId) {
        this.times.values().removeIf(s -> s.equals(scheduleId));
        return Awaitable.ready();
    }

    /**
     * Not supported: messages must be added via {@link #schedule(ScheduledMessage...)}.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Awaitable send(SerializedMessage... messages) {
        throw new UnsupportedOperationException("Use method #schedule instead");
    }
}

