/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.javaclient.persisting.caching;

import io.fluxcapacitor.common.IndexUtils;
import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.javaclient.common.Message;
import io.fluxcapacitor.javaclient.common.serialization.DeserializingMessage;
import io.fluxcapacitor.javaclient.common.serialization.DeserializingObject;
import io.fluxcapacitor.javaclient.common.serialization.Serializer;
import io.fluxcapacitor.javaclient.configuration.client.Client;
import io.fluxcapacitor.javaclient.modeling.AggregateIdResolver;
import io.fluxcapacitor.javaclient.modeling.AggregateRepository;
import io.fluxcapacitor.javaclient.modeling.AggregateRoot;
import io.fluxcapacitor.javaclient.modeling.AggregateTypeResolver;
import io.fluxcapacitor.javaclient.persisting.caching.Cache;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.EventSourcingHandler;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.EventSourcingHandlerFactory;
import io.fluxcapacitor.javaclient.tracking.ConsumerConfiguration;
import io.fluxcapacitor.javaclient.tracking.client.DefaultTracker;
import io.fluxcapacitor.javaclient.tracking.handling.validation.ValidationUtils;
import java.beans.ConstructorProperties;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CachingAggregateRepository
implements AggregateRepository {
    private static final Logger log = LoggerFactory.getLogger(CachingAggregateRepository.class);
    /** Maps an aggregate id to its cache key: this class's simple name, a colon, then the id. */
    private static final Function<String, String> keyFunction = aggregateId -> CachingAggregateRepository.class.getSimpleName() + ":" + aggregateId;
    /** When waiting for the tracker in {@code doLoad} takes longer than this, a warning is logged. */
    public static final Duration slowTrackingThreshold = Duration.ofSeconds(5L);
    /** Repository that is consulted first and that performs every load this class does not serve from its cache. */
    private final AggregateRepository delegate;
    /** Supplies the handler used to apply events to cached aggregate models. */
    private final EventSourcingHandlerFactory handlerFactory;
    /** Holds the cached aggregates; its monitor is also used for wait/notify between loaders and the tracker. */
    private final Cache cache;
    /** Passed to {@code DefaultTracker.start} when the tracker is started. */
    private final Client client;
    /** Deserializes tracked messages and is handed to every cached aggregate. */
    private final Serializer serializer;
    /** Flipped to {@code true} by the first {@code doLoad} call, which then starts the tracker. */
    private final AtomicBoolean started = new AtomicBoolean();
    /** Index of the last message of the most recent batch given to {@code handleEvents}; -1 until the tracker is started. */
    private volatile long lastEventIndex = -1L;

    /**
     * Loads an aggregate, serving read-only loads from this repository's cache when the delegate allows
     * caching for the given type.
     *
     * <p>Loads that are not read-only, or for types the delegate does not allow caching for, go straight
     * to the delegate. Otherwise the delegate is first asked for an already cached instance; if it has
     * none, the aggregate held by this repository is used, provided its model is absent or assignable to
     * {@code aggregateType}. In every remaining case the delegate performs a read-only load.
     *
     * @param aggregateId   id of the aggregate, must not be {@code null}
     * @param aggregateType expected type of the aggregate, must not be {@code null}
     * @param readOnly      whether the caller only intends to read the aggregate
     * @param onlyCached    whether only already cached aggregates may be returned
     * @return the loaded aggregate, or whatever the delegate returns when this repository cannot serve it
     * @throws NullPointerException if {@code aggregateId} or {@code aggregateType} is {@code null}
     */
    @Override
    public <T> AggregateRoot<T> load(@NonNull String aggregateId, @NonNull Class<T> aggregateType, boolean readOnly, boolean onlyCached) {
        Objects.requireNonNull(aggregateId, "aggregateId is marked non-null but is null");
        Objects.requireNonNull(aggregateType, "aggregateType is marked non-null but is null");

        // Only read-only loads of cacheable types are served from this repository.
        boolean servedFromCache = this.delegate.cachingAllowed(aggregateType) && readOnly;
        if (!servedFromCache) {
            return this.delegate.load(aggregateId, aggregateType, readOnly, onlyCached);
        }

        AggregateRoot<T> fromDelegate = this.delegate.load(aggregateId, aggregateType, true, true);
        if (fromDelegate != null) {
            return fromDelegate;
        }

        RefreshingAggregateRoot<T> cached = this.doLoad(aggregateId, aggregateType, onlyCached);
        if (cached != null) {
            T model = cached.get();
            // A cached aggregate without a model is accepted; one with a model must match the requested type.
            if (model == null || aggregateType.isAssignableFrom(model.getClass())) {
                return cached;
            }
        }
        return this.delegate.load(aggregateId, aggregateType, true, onlyCached);
    }

    /**
     * Returns the aggregate held in this repository's cache, loading it through the delegate when needed.
     *
     * <p>The first invocation starts a notification tracker that feeds {@link #handleEvents(List)},
     * beginning at the index for the current time. When called while an event or notification is being
     * handled, the method blocks on the cache monitor until {@code lastEventIndex} has reached that
     * message's index, so the cached aggregate is not older than the message being handled.
     *
     * @param aggregateId id of the aggregate
     * @param type        type of the aggregate
     * @param onlyCached  if {@code true}, only look in the cache and never invoke the delegate
     * @return the cached aggregate; {@code null} if it is absent, if the delegate has no such aggregate,
     *         or if the thread was interrupted while waiting for the tracker
     */
    @SuppressWarnings("unchecked")
    private <T> RefreshingAggregateRoot<T> doLoad(String aggregateId, Class<T> type, boolean onlyCached) {
        if (this.started.compareAndSet(false, true)) {
            this.lastEventIndex = IndexUtils.indexForCurrentTime();
            DefaultTracker.start(
                    this::handleEvents,
                    ConsumerConfiguration.builder()
                            .messageType(MessageType.NOTIFICATION)
                            .lastIndex(this.lastEventIndex)
                            .name(CachingAggregateRepository.class.getSimpleName())
                            .build(),
                    this.client);
            // Wake up loaders that started waiting before the initial index was assigned.
            synchronized (this.cache) {
                this.cache.notifyAll();
            }
        }

        DeserializingMessage current = DeserializingMessage.getCurrent();
        if (current != null) {
            switch (current.getMessageType()) {
                case EVENT:
                case NOTIFICATION:
                    Long eventIndex = current.getSerializedObject().getIndex();
                    if (eventIndex != null && this.lastEventIndex < eventIndex) {
                        synchronized (this.cache) {
                            Instant waitStart = Instant.now();
                            while (this.lastEventIndex < eventIndex) {
                                try {
                                    this.cache.wait(5000L);
                                } catch (InterruptedException e) {
                                    Thread.currentThread().interrupt();
                                    log.warn("Failed to load aggregate for event {}", current, e);
                                    return null;
                                }
                            }
                            Duration waited = Duration.between(waitStart, Instant.now());
                            if (waited.compareTo(slowTrackingThreshold) > 0) {
                                log.warn("It took over {} to load aggregate {} of type {}. This indicates that the tracker in the caching aggregate repo has trouble keeping up.", waited, aggregateId, type);
                            }
                        }
                    }
                    break;
                default:
                    break;
            }
        }

        String cacheKey = keyFunction.apply(aggregateId);
        if (onlyCached) {
            return (RefreshingAggregateRoot<T>) this.cache.getIfPresent(cacheKey);
        }
        return this.cache.get(cacheKey, key -> {
            AggregateRoot<T> loaded = this.delegate.load(aggregateId, type);
            if (loaded == null) {
                return null;
            }
            EventSourcingHandler<T> handler = this.handlerFactory.forType(type);
            // Aggregates loaded on demand stay UNVERIFIED until handleEvent has compared them with an event.
            return new RefreshingAggregateRoot<T>(loaded.get(), handler, this.serializer, aggregateId, type,
                    loaded.previous(), loaded.lastEventId(), loaded.lastEventIndex(), loaded.timestamp(),
                    Status.UNVERIFIED);
        });
    }

    /**
     * Tracker callback: deserializes a batch of messages as events and updates the cache for every
     * event that resolves to an aggregate id and a cacheable aggregate type.
     *
     * <p>A failure while handling a single event is logged and does not stop the rest of the batch.
     * Whatever happens, the index of the last message in the batch (when it has one) is recorded as
     * {@code lastEventIndex} and all threads waiting on the cache monitor are notified.
     *
     * @param messages the batch of serialized messages delivered by the tracker
     */
    protected void handleEvents(List<SerializedMessage> messages) {
        try {
            this.serializer.deserializeMessages(messages.stream(), false, MessageType.EVENT).forEach(m -> {
                String aggregateId = AggregateIdResolver.getAggregateId(m);
                Class<?> aggregateType = AggregateTypeResolver.getAggregateType(m);
                if (aggregateId == null || aggregateType == null || !this.delegate.cachingAllowed(aggregateType)) {
                    return;
                }
                try {
                    this.handleEvent((DeserializingMessage) m, aggregateId, aggregateType);
                } catch (Exception e) {
                    log.error("Failed to handle event for aggregate with id {} of type {}", aggregateId, aggregateType, e);
                }
            });
        } finally {
            Optional<Long> lastIndex = messages.stream()
                    .reduce((earlier, later) -> later)
                    .map(SerializedMessage::getIndex);
            if (lastIndex.isPresent()) {
                this.lastEventIndex = lastIndex.get();
                synchronized (this.cache) {
                    this.cache.notifyAll();
                }
            }
        }
    }

    /**
     * Updates the cache entry of a single aggregate in response to an event.
     *
     * <ul>
     *   <li>No cached aggregate, or one that is {@code UNVERIFIED}: the aggregate is reloaded from the
     *       delegate and marked {@code IN_SYNC} when its last event id equals the id of this event,
     *       {@code AHEAD} otherwise.</li>
     *   <li>{@code IN_SYNC}: the event is applied to the cached model through the event sourcing
     *       handler; if that fails the cache entry is dropped.</li>
     *   <li>{@code AHEAD}: the aggregate becomes {@code IN_SYNC} once the event whose id equals the
     *       aggregate's last event id comes by; until then it is left untouched.</li>
     * </ul>
     *
     * <p>Event ids are compared null-safely in every branch, so an event without a message id no
     * longer triggers a {@link NullPointerException} for an {@code AHEAD} aggregate.
     *
     * @param event       the event to process
     * @param aggregateId id of the aggregate the event belongs to
     * @param type        type of the aggregate
     */
    @SuppressWarnings("unchecked")
    protected <T> void handleEvent(DeserializingMessage event, String aggregateId, Class<T> type) {
        EventSourcingHandler<T> handler = this.handlerFactory.forType(type);
        String cacheKey = keyFunction.apply(aggregateId);
        String eventId = event.getSerializedObject().getMessageId();
        Long eventIndex = event.getSerializedObject().getIndex();
        Instant timestamp = Instant.ofEpochMilli(event.getSerializedObject().getTimestamp());
        RefreshingAggregateRoot<T> aggregate = (RefreshingAggregateRoot<T>) this.cache.getIfPresent(cacheKey);

        if (aggregate == null || aggregate.status == Status.UNVERIFIED) {
            AggregateRoot<T> loaded = this.delegate.load(aggregateId, type, true, false);
            if (loaded == null) {
                log.warn("Delegate repository did not contain aggregate with id {} of type {}", aggregateId, type);
                aggregate = null;
            } else {
                aggregate = new RefreshingAggregateRoot<T>(loaded.get(), handler, this.serializer, loaded.id(),
                        loaded.type(), loaded.previous(), loaded.lastEventId(), loaded.lastEventIndex(),
                        loaded.timestamp(),
                        Objects.equals(loaded.lastEventId(), eventId) ? Status.IN_SYNC : Status.AHEAD);
            }
        } else if (aggregate.status == Status.IN_SYNC) {
            try {
                aggregate = new RefreshingAggregateRoot<T>(handler.invoke(aggregate.get(), event), handler,
                        this.serializer, aggregate.id(), aggregate.type(), aggregate, eventId, eventIndex,
                        timestamp, Status.IN_SYNC);
            } catch (Exception e) {
                log.error("Failed to update aggregate with id {} of type {}", aggregateId, type, e);
                aggregate = null;
            }
        } else if (Objects.equals(eventId, aggregate.lastEventId)) {
            // Null-safe, matching the comparison used when the aggregate is (re)loaded above.
            aggregate = aggregate.toBuilder().status(Status.IN_SYNC).build();
        }

        if (aggregate == null) {
            this.cache.invalidate(cacheKey);
        } else {
            this.cache.put(cacheKey, aggregate);
        }
    }

    /**
     * Reports whether aggregates of the given type are supported; the answer comes from the delegate.
     *
     * @param aggregateType the aggregate type to check
     * @return the delegate's answer for {@code aggregateType}
     */
    @Override
    public boolean supports(Class<?> aggregateType) {
        boolean supportedByDelegate = delegate.supports(aggregateType);
        return supportedByDelegate;
    }

    /**
     * Reports whether aggregates of the given type may be cached; the answer comes from the delegate.
     *
     * @param aggregateType the aggregate type to check
     * @return the delegate's answer for {@code aggregateType}
     */
    @Override
    public boolean cachingAllowed(Class<?> aggregateType) {
        boolean allowedByDelegate = delegate.cachingAllowed(aggregateType);
        return allowedByDelegate;
    }

    /**
     * Creates a caching repository on top of another repository. Nothing is started here; the tracker
     * that keeps the cache up to date is started by the first load that reaches {@code doLoad}.
     *
     * @param delegate       repository that performs the actual loading
     * @param handlerFactory factory for the handlers that apply events to aggregate models
     * @param cache          cache in which aggregates are kept
     * @param client         client handed to the tracker when it is started
     * @param serializer     serializer used for tracked messages and cached aggregates
     */
    @ConstructorProperties(value={"delegate", "handlerFactory", "cache", "client", "serializer"})
    public CachingAggregateRepository(
            AggregateRepository delegate,
            EventSourcingHandlerFactory handlerFactory,
            Cache cache,
            Client client,
            Serializer serializer) {
        this.serializer = serializer;
        this.client = client;
        this.cache = cache;
        this.handlerFactory = handlerFactory;
        this.delegate = delegate;
    }

    /**
     * Immutable, read-only aggregate as stored in the cache of the enclosing repository.
     *
     * <p>Besides the aggregate data it carries the handler and serializer needed to apply events to the
     * model, and a {@link Status} describing how the instance relates to the tracked event stream.
     * Applying events through {@link #apply(Message)} is rejected.
     *
     * @param <T> type of the aggregate model
     */
    private static final class RefreshingAggregateRoot<T> implements AggregateRoot<T> {
        private final T model;
        private final EventSourcingHandler<T> handler;
        private final Serializer serializer;
        private final String id;
        private final Class<T> type;
        private final AggregateRoot<T> previous;
        private final String lastEventId;
        private final Long lastEventIndex;
        private final Instant timestamp;
        private final Status status;

        /** Creates an aggregate with every property given explicitly. */
        @ConstructorProperties(value={"model", "handler", "serializer", "id", "type", "previous", "lastEventId", "lastEventIndex", "timestamp", "status"})
        public RefreshingAggregateRoot(T model, EventSourcingHandler<T> handler, Serializer serializer, String id,
                                       Class<T> type, AggregateRoot<T> previous, String lastEventId,
                                       Long lastEventIndex, Instant timestamp, Status status) {
            this.model = model;
            this.handler = handler;
            this.serializer = serializer;
            this.id = id;
            this.type = type;
            this.previous = previous;
            this.lastEventId = lastEventId;
            this.lastEventIndex = lastEventIndex;
            this.timestamp = timestamp;
            this.status = status;
        }

        /** Returns a new, empty builder. */
        public static <T> RefreshingAggregateRootBuilder<T> builder() {
            return new RefreshingAggregateRootBuilder<>();
        }

        /** Returns a builder pre-populated with every property of this instance. */
        public RefreshingAggregateRootBuilder<T> toBuilder() {
            RefreshingAggregateRootBuilder<T> copy = new RefreshingAggregateRootBuilder<T>();
            return copy.model(model)
                    .handler(handler)
                    .serializer(serializer)
                    .id(id)
                    .type(type)
                    .previous(previous)
                    .lastEventId(lastEventId)
                    .lastEventIndex(lastEventIndex)
                    .timestamp(timestamp)
                    .status(status);
        }

        /** Returns the aggregate model. */
        @Override
        public T get() {
            return model;
        }

        /**
         * Always fails: cached aggregates are read-only.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public AggregateRoot<T> apply(Message eventMessage) {
            String reason = String.format("Not allowed to apply a %s. The aggregate is readonly.", eventMessage);
            throw new UnsupportedOperationException(reason);
        }

        /**
         * Checks the given commands against this aggregate, in order.
         *
         * <p>With more than one command, each command except the last is applied to a temporary copy
         * after it has been checked, so the next command is checked against the resulting model. This
         * instance itself never changes and is always the return value.
         *
         * @param commands the commands to check
         * @return this aggregate
         */
        @Override
        public <E extends Exception> AggregateRoot<T> assertLegal(Object ... commands) throws E {
            if (commands.length == 0) {
                return this;
            }
            if (commands.length == 1) {
                ValidationUtils.assertLegal(commands[0], model);
                return this;
            }
            RefreshingAggregateRoot<T> intermediate = this;
            int lastPosition = commands.length - 1;
            for (int position = 0; position <= lastPosition; position++) {
                Object command = commands[position];
                ValidationUtils.assertLegal(command, intermediate.get());
                if (position < lastPosition) {
                    intermediate = intermediate.forceApply(command);
                }
            }
            return this;
        }

        /**
         * Returns a copy of this aggregate whose model is the result of letting the handler apply the
         * given object as an event. The object is wrapped in a {@link Message} unless it already is one,
         * and the aggregate id and type name are added to its metadata. The copy has no event index.
         */
        private RefreshingAggregateRoot<T> forceApply(Object event) {
            Message message;
            if (event instanceof Message) {
                message = (Message) event;
            } else {
                message = new Message(event);
            }
            Message eventMessage = message.withMetadata(message.getMetadata().with(new Object[]{"$aggregateId", this.id, "$aggregateType", this.type.getName()}));
            DeserializingMessage deserializingMessage = new DeserializingMessage(
                    new DeserializingObject<byte[], SerializedMessage>(
                            eventMessage.serialize(this.serializer),
                            targetType -> this.serializer.convert(eventMessage.getPayload(), targetType)),
                    MessageType.EVENT);
            T updatedModel = this.handler.invoke(this.model, deserializingMessage);
            return new RefreshingAggregateRoot<T>(updatedModel, this.handler, this.serializer, this.id, this.type,
                    this, eventMessage.getMessageId(), null, this.timestamp, this.status);
        }

        public T model() {
            return model;
        }

        public EventSourcingHandler<T> handler() {
            return handler;
        }

        public Serializer serializer() {
            return serializer;
        }

        @Override
        public String id() {
            return id;
        }

        @Override
        public Class<T> type() {
            return type;
        }

        @Override
        public AggregateRoot<T> previous() {
            return previous;
        }

        @Override
        public String lastEventId() {
            return lastEventId;
        }

        @Override
        public Long lastEventIndex() {
            return lastEventIndex;
        }

        @Override
        public Instant timestamp() {
            return timestamp;
        }

        public Status status() {
            return status;
        }

        /** Null-tolerant equality of two property values; always delegates to {@code left.equals(right)}. */
        private static boolean sameValue(Object left, Object right) {
            return left == null ? right == null : left.equals(right);
        }

        /** Two aggregates are equal when all ten properties are equal. */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof RefreshingAggregateRoot)) {
                return false;
            }
            RefreshingAggregateRoot<?> that = (RefreshingAggregateRoot<?>) o;
            return sameValue(lastEventIndex, that.lastEventIndex)
                    && sameValue(model, that.model)
                    && sameValue(handler, that.handler)
                    && sameValue(serializer, that.serializer)
                    && sameValue(id, that.id)
                    && sameValue(type, that.type)
                    && sameValue(previous, that.previous)
                    && sameValue(lastEventId, that.lastEventId)
                    && sameValue(timestamp, that.timestamp)
                    && sameValue(status, that.status);
        }

        /** Combines the hash codes of all ten properties with multiplier 59, using 43 for {@code null}. */
        @Override
        public int hashCode() {
            Object[] properties = {lastEventIndex, model, handler, serializer, id, type, previous, lastEventId,
                    timestamp, status};
            int result = 1;
            for (Object property : properties) {
                result = result * 59 + (property == null ? 43 : property.hashCode());
            }
            return result;
        }

        /** Renders all properties except the model and the previous aggregate. */
        @Override
        public String toString() {
            return new StringBuilder("CachingAggregateRepository.RefreshingAggregateRoot(handler=").append(handler)
                    .append(", serializer=").append(serializer)
                    .append(", id=").append(id)
                    .append(", type=").append(type)
                    .append(", lastEventId=").append(lastEventId)
                    .append(", lastEventIndex=").append(lastEventIndex)
                    .append(", timestamp=").append(timestamp)
                    .append(", status=").append(status)
                    .append(")")
                    .toString();
        }

        /**
         * Mutable builder for {@link RefreshingAggregateRoot}. Every setter stores its argument and
         * returns the builder itself.
         *
         * @param <T> type of the aggregate model
         */
        public static class RefreshingAggregateRootBuilder<T> {
            private T model;
            private EventSourcingHandler<T> handler;
            private Serializer serializer;
            private String id;
            private Class<T> type;
            private AggregateRoot<T> previous;
            private String lastEventId;
            private Long lastEventIndex;
            private Instant timestamp;
            private Status status;

            RefreshingAggregateRootBuilder() {
            }

            public RefreshingAggregateRootBuilder<T> model(T model) {
                this.model = model;
                return this;
            }

            public RefreshingAggregateRootBuilder<T> handler(EventSourcingHandler<T> handler) {
                this.handler = handler;
                return this;
            }

            public RefreshingAggregateRootBuilder<T> serializer(Serializer serializer) {
                this.serializer = serializer;
                return this;
            }

            public RefreshingAggregateRootBuilder<T> id(String id) {
                this.id = id;
                return this;
            }

            public RefreshingAggregateRootBuilder<T> type(Class<T> type) {
                this.type = type;
                return this;
            }

            public RefreshingAggregateRootBuilder<T> previous(AggregateRoot<T> previous) {
                this.previous = previous;
                return this;
            }

            public RefreshingAggregateRootBuilder<T> lastEventId(String lastEventId) {
                this.lastEventId = lastEventId;
                return this;
            }

            public RefreshingAggregateRootBuilder<T> lastEventIndex(Long lastEventIndex) {
                this.lastEventIndex = lastEventIndex;
                return this;
            }

            public RefreshingAggregateRootBuilder<T> timestamp(Instant timestamp) {
                this.timestamp = timestamp;
                return this;
            }

            public RefreshingAggregateRootBuilder<T> status(Status status) {
                this.status = status;
                return this;
            }

            /** Creates the aggregate from the values collected so far. */
            public RefreshingAggregateRoot<T> build() {
                return new RefreshingAggregateRoot<T>(model, handler, serializer, id, type, previous, lastEventId,
                        lastEventIndex, timestamp, status);
            }

            @Override
            public String toString() {
                return new StringBuilder("CachingAggregateRepository.RefreshingAggregateRoot.RefreshingAggregateRootBuilder(model=").append(model)
                        .append(", handler=").append(handler)
                        .append(", serializer=").append(serializer)
                        .append(", id=").append(id)
                        .append(", type=").append(type)
                        .append(", previous=").append(previous)
                        .append(", lastEventId=").append(lastEventId)
                        .append(", lastEventIndex=").append(lastEventIndex)
                        .append(", timestamp=").append(timestamp)
                        .append(", status=").append(status)
                        .append(")")
                        .toString();
            }
        }
    }

    private static enum Status {
        IN_SYNC,
        AHEAD,
        UNVERIFIED;

    }
}

