/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.javaclient.scheduling;

import io.fluxcapacitor.common.IndexUtils;
import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.api.Metadata;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.handling.Handler;
import io.fluxcapacitor.common.reflection.ReflectionUtils;
import io.fluxcapacitor.javaclient.FluxCapacitor;
import io.fluxcapacitor.javaclient.common.Message;
import io.fluxcapacitor.javaclient.common.serialization.DeserializingMessage;
import io.fluxcapacitor.javaclient.publishing.DispatchInterceptor;
import io.fluxcapacitor.javaclient.scheduling.Periodic;
import io.fluxcapacitor.javaclient.scheduling.Schedule;
import io.fluxcapacitor.javaclient.tracking.handling.HandleSchedule;
import io.fluxcapacitor.javaclient.tracking.handling.HandlerInterceptor;
import io.fluxcapacitor.javaclient.tracking.handling.authentication.User;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAccessor;
import java.time.temporal.TemporalAmount;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SchedulingInterceptor
implements DispatchInterceptor,
HandlerInterceptor {
    private static final Logger log = LoggerFactory.getLogger(SchedulingInterceptor.class);

    @Override
    public Handler<DeserializingMessage> wrap(Handler<DeserializingMessage> handler, String consumer) {
        Object target = handler.getTarget();
        List methods = ReflectionUtils.getAnnotatedMethods((Object)target, HandleSchedule.class);
        for (Method method : methods) {
            Periodic periodic = method.getAnnotation(Periodic.class);
            if (method.getParameterCount() <= 0) continue;
            Class<?> type = method.getParameters()[0].getType();
            if (periodic == null) {
                periodic = (Periodic)ReflectionUtils.getTypeAnnotation(type, Periodic.class);
            }
            if (periodic == null) continue;
            try {
                this.initializePeriodicSchedule(type, periodic);
            }
            catch (Exception e) {
                log.error("Failed to initialize periodic schedule on method {}. Continuing...", (Object)method, (Object)e);
            }
        }
        return HandlerInterceptor.super.wrap(handler, consumer);
    }

    protected void initializePeriodicSchedule(Class<?> payloadType, Periodic periodic) {
        if (periodic.value() <= 0L) {
            throw new IllegalStateException(String.format("Periodic annotation on type %s is invalid. Period should be a positive number of  milliseconds.", payloadType));
        }
        if (periodic.autoStart()) {
            String scheduleId = periodic.scheduleId().isEmpty() ? payloadType.getName() : periodic.scheduleId();
            FluxCapacitor fluxCapacitor = FluxCapacitor.get();
            if (fluxCapacitor.keyValueStore().storeIfAbsent("SchedulingInterceptor:initialized:" + scheduleId, true)) {
                Object payload;
                try {
                    payload = ((Constructor)ReflectionUtils.ensureAccessible(payloadType.getConstructor(new Class[0]))).newInstance(new Object[0]);
                }
                catch (Exception e) {
                    log.error("No default constructor found on @Periodic type: {}. Add a public default constructor or initialize this periodic schedule by hand", payloadType, (Object)e);
                    return;
                }
                Clock clock = fluxCapacitor.clock();
                Metadata metadata = Optional.ofNullable(fluxCapacitor.userProvider()).flatMap(p -> Optional.ofNullable(p.getSystemUser()).map(u -> p.addToMetadata(Metadata.empty(), (User)u))).orElse(Metadata.empty());
                fluxCapacitor.scheduler().schedule(new Schedule(payload, metadata, scheduleId, clock.instant().plusMillis(periodic.initialDelay())));
            }
        }
    }

    @Override
    public Function<Message, SerializedMessage> interceptDispatch(Function<Message, SerializedMessage> function, MessageType messageType) {
        return message -> {
            if (messageType == MessageType.SCHEDULE) {
                message = message.withMetadata(message.getMetadata().with(Schedule.scheduleIdMetadataKey, (Object)((Schedule)message).getScheduleId()));
            }
            return (SerializedMessage)function.apply((Message)message);
        };
    }

    @Override
    public Function<DeserializingMessage, Object> interceptHandling(Function<DeserializingMessage, Object> function, Handler<DeserializingMessage> handler, String consumer) {
        return m -> {
            if (m.getMessageType() == MessageType.SCHEDULE) {
                Object result;
                long deadline = IndexUtils.millisFromIndex((long)m.getSerializedObject().getIndex());
                Periodic periodic = Optional.ofNullable(handler.getMethod(m)).map(method -> method.getAnnotation(Periodic.class)).orElse((Periodic)ReflectionUtils.getTypeAnnotation(m.getPayloadClass(), Periodic.class));
                Instant now = Instant.ofEpochMilli(deadline);
                try {
                    result = function.apply((DeserializingMessage)m);
                }
                catch (Exception e) {
                    if (periodic != null && periodic.continueOnError()) {
                        this.reschedule((DeserializingMessage)m, now.plusMillis(periodic.value()));
                    }
                    throw e;
                }
                if (result instanceof TemporalAmount) {
                    this.reschedule((DeserializingMessage)m, now.plus((TemporalAmount)result));
                } else if (result instanceof TemporalAccessor) {
                    this.reschedule((DeserializingMessage)m, Instant.from((TemporalAccessor)result));
                } else if (result instanceof Schedule) {
                    Schedule schedule = (Schedule)result;
                    this.reschedule(schedule.getPayload(), schedule.getMetadata(), schedule.getDeadline());
                } else if (result != null) {
                    Metadata metadata = m.getMetadata();
                    Object nextPayload = result;
                    if (result instanceof Message) {
                        metadata = ((Message)result).getMetadata();
                        nextPayload = ((Message)result).getPayload();
                    }
                    if (nextPayload != null && m.getPayloadClass().isAssignableFrom(nextPayload.getClass())) {
                        if (periodic == null) {
                            Instant dispatched = Instant.ofEpochMilli(m.getSerializedObject().getTimestamp());
                            Duration previousDelay = Duration.between(dispatched, now);
                            if (previousDelay.compareTo(Duration.ZERO) > 0) {
                                this.reschedule(nextPayload, metadata, now.plus(previousDelay));
                            } else {
                                log.info("Delay between the time this schedule was created and scheduled is <= 0, rescheduling with delay of 1 minute");
                                this.reschedule(nextPayload, metadata, now.plus(Duration.of(1L, ChronoUnit.MINUTES)));
                            }
                        } else {
                            this.reschedule(nextPayload, metadata, now.plusMillis(periodic.value()));
                        }
                    } else if (periodic != null) {
                        this.reschedule((DeserializingMessage)m, now.plusMillis(periodic.value()));
                    }
                } else if (periodic != null) {
                    this.reschedule((DeserializingMessage)m, now.plusMillis(periodic.value()));
                }
                return result;
            }
            return function.apply((DeserializingMessage)m);
        };
    }

    private void reschedule(DeserializingMessage message, Instant instant) {
        this.reschedule(message.getPayload(), message.getMetadata(), instant);
    }

    private void reschedule(Object payload, Metadata metadata, Instant instant) {
        try {
            FluxCapacitor.get().scheduler().schedule(new Schedule(payload, metadata.getOrDefault(Schedule.scheduleIdMetadataKey, UUID.randomUUID().toString()), instant));
        }
        catch (Exception e) {
            log.error("Failed to reschedule a {}", payload.getClass(), (Object)e);
        }
    }
}

