/*
 * Decompiled with CFR 0.152.
 */
package io.contextmap.spring.runtime.scanner.events.kafka;

import io.contextmap.spring.runtime.model.Event;
import io.contextmap.spring.runtime.model.Scan;
import io.contextmap.spring.runtime.model.ScanApplicationContext;
import io.contextmap.spring.runtime.scanner.AbstractRuntimeScanner;
import io.contextmap.spring.runtime.scanner.events.EventFunctions;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ContainerProperties;

public class KafkaScanner
extends AbstractRuntimeScanner {
    private static final Logger logger = LoggerFactory.getLogger(KafkaScanner.class);
    private final ScanApplicationContext context;

    public KafkaScanner(ScanApplicationContext context) {
        this.context = context;
    }

    @Override
    public void scan(Scan data) {
        Set<String> subscribedTopics = this.getSubscribedTopics();
        Set<Event> publishedEvents = this.getPublishedTopics(subscribedTopics);
        data.addPublishedEvents(publishedEvents);
        data.getExecution().setScannedPublishedEvents(true);
        Set<Event> subscribedEvents = subscribedTopics.stream().map(name -> new Event((String)name, (String)name)).collect(Collectors.toSet());
        data.addSubscribedEvents(subscribedEvents);
        data.getExecution().setScannedSubscribedEvents(true);
    }

    private Set<String> getSubscribedTopics() {
        Optional<Object> optionalBean = this.context.getBeanByName("org.springframework.kafka.config.internalKafkaListenerEndpointRegistry");
        if (optionalBean.isPresent()) {
            KafkaListenerEndpointRegistry reg = (KafkaListenerEndpointRegistry)optionalBean.get();
            return reg.getListenerContainers().stream().map(lc -> {
                try {
                    ContainerProperties props = lc.getContainerProperties();
                    if (props.getTopics() != null && props.getTopics().length > 0) {
                        return props.getTopics()[0];
                    }
                    return null;
                }
                catch (Exception e) {
                    return null;
                }
            }).filter(Objects::nonNull).collect(Collectors.toSet());
        }
        return Collections.emptySet();
    }

    private Set<Event> getPublishedTopics(Set<String> subscribedTopics) {
        HashMap<String, Event> publishedEvents = new HashMap<String, Event>();
        this.getPublishedTopicsFromKafkaTemplates(subscribedTopics).forEach(e -> publishedEvents.put(e.getName(), (Event)e));
        this.getPublishedTopicsFromNewTopics(subscribedTopics).forEach(e -> publishedEvents.put(e.getName(), (Event)e));
        Map<Class<?>, Set<EventFunctions.PayloadExchangeProperties>> eventClassToPublishers = EventFunctions.getPayloadProperties(this.context, this::resolveTopicNameFromPublishedByName);
        EventFunctions.addPayloadsToEvents(eventClassToPublishers, publishedEvents);
        this.addPropertiesFromKafkaTemplates(publishedEvents);
        this.addPropertiesFromNewTopics(publishedEvents);
        return new HashSet<Event>(publishedEvents.values());
    }

    private Set<Event> getPublishedTopicsFromNewTopics(Set<String> subscribedTopics) {
        HashSet<Event> events = new HashSet<Event>();
        Map<String, ?> topicBeans = this.context.getBeansOfType("org.apache.kafka.clients.admin.NewTopic");
        if (topicBeans.isEmpty()) {
            return Collections.emptySet();
        }
        topicBeans.values().forEach(ex -> {
            NewTopic newTopic = (NewTopic)ex;
            String topicName = newTopic.name();
            if (topicName != null && !topicName.isEmpty() && !subscribedTopics.contains(topicName)) {
                events.add(new Event(topicName, topicName));
            }
        });
        return events;
    }

    private Set<Event> getPublishedTopicsFromKafkaTemplates(Set<String> subscribedTopics) {
        Map<String, ?> kafkaTemplateBeans = this.context.getBeansOfType("org.springframework.kafka.core.KafkaTemplate");
        if (kafkaTemplateBeans.isEmpty()) {
            return Collections.emptySet();
        }
        HashSet<Event> events = new HashSet<Event>();
        kafkaTemplateBeans.values().forEach(tmpl -> {
            KafkaTemplate kafkaTemplate = (KafkaTemplate)tmpl;
            String topicName = kafkaTemplate.getDefaultTopic();
            if (topicName != null && !topicName.isEmpty() && !subscribedTopics.contains(topicName)) {
                events.add(new Event(topicName, topicName));
            }
        });
        return events;
    }

    private void addPropertiesFromKafkaTemplates(Map<String, Event> publishedEvents) {
        Map<String, ?> kafkaTemplateBeans = this.context.getBeansOfType("org.springframework.kafka.core.KafkaTemplate");
        kafkaTemplateBeans.values().forEach(tmpl -> {
            KafkaTemplate kafkaTemplate = (KafkaTemplate)tmpl;
            String topicName = kafkaTemplate.getDefaultTopic();
            if (topicName != null && publishedEvents.containsKey(topicName) && !topicName.isEmpty()) {
                Event event = (Event)publishedEvents.get(topicName);
                event.addPropertyIfValueNotBlank("Message Broker", "Kafka");
                event.addPropertyIfValueNotBlank("Topic Name", topicName);
            }
        });
    }

    private void addPropertiesFromNewTopics(Map<String, Event> publishedEvents) {
        Map<String, ?> topicBeans = this.context.getBeansOfType("org.apache.kafka.clients.admin.NewTopic");
        topicBeans.values().forEach(ex -> {
            NewTopic newTopic = (NewTopic)ex;
            String topicName = newTopic.name();
            if (topicName != null && publishedEvents.containsKey(topicName) && !topicName.isEmpty()) {
                Event event = (Event)publishedEvents.get(topicName);
                event.addPropertyIfValueNotBlank("Partitions", String.valueOf(newTopic.numPartitions()));
                event.addPropertyIfValueNotBlank("Replication Factor", String.valueOf(newTopic.replicationFactor()));
                event.addPropertyIfValueNotBlank("Message Broker", "Kafka");
                event.addPropertyIfValueNotBlank("Topic Name", topicName);
            }
        });
    }

    private String resolveTopicNameFromPublishedByName(Class<?> eventClass, String publishedByName) {
        Optional<Object> optionalBean = this.context.getBeanByName(publishedByName);
        if (optionalBean.isPresent()) {
            Object bean = optionalBean.get();
            if (bean instanceof NewTopic) {
                return ((NewTopic)bean).name();
            }
            if (bean instanceof KafkaTemplate) {
                String name = ((KafkaTemplate)bean).getDefaultTopic();
                if (name == null || name.isEmpty()) {
                    logger.warn("Unable to extract topic name from KafkaTemplate, since the default topic property is blank.");
                }
                return name;
            }
            logger.warn("Unable to extract topic name of ContextEvent {} since the bean is not a KafkaTemplate or NewTopic, but it's of type {}", eventClass, bean.getClass());
            return "";
        }
        return this.context.resolveSpELStringValue(publishedByName);
    }
}

