/*
 * Decompiled with CFR 0.152.
 */
package net.lightapi.portal.user.command.handler;

import com.networknt.config.Config;
import com.networknt.config.JsonMapper;
import com.networknt.httpstring.AttachmentConstants;
import com.networknt.kafka.common.KafkaConsumerConfig;
import com.networknt.rpc.HybridHandler;
import com.networknt.rpc.router.ServiceHandler;
import com.networknt.utility.NioUtils;
import io.undertow.server.HttpServerExchange;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ServiceHandler(id="lightapi.net/user/exportPortalEvent/0.1.0")
public class ExportPortalEvent
implements HybridHandler {
    private static final Logger logger = LoggerFactory.getLogger(ExportPortalEvent.class);
    protected static final String INCORRECT_TOKEN_TYPE = "ERR11601";
    protected static final String KAFKA_ACCESS_ERROR = "ERR11643";
    protected static final String INVALID_VARIABLE_FORMAT = "ERR11644";
    public static final KafkaConsumerConfig config = (KafkaConsumerConfig)Config.getInstance().getJsonObjectConfig("kafka-consumer", KafkaConsumerConfig.class);
    private static final Class<?> keyDeserializer = ByteArrayDeserializer.class;
    private static final Class<?> valueDeserializer = ByteArrayDeserializer.class;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ByteBuffer handle(HttpServerExchange exchange, Object input) {
        long endTimestampMs;
        long startTimestampMs;
        Map auditInfo;
        String userId;
        Map map = (Map)input;
        String hostId = (String)map.get("hostId");
        String startTsStr = (String)map.get("startTs");
        String endTsStr = (String)map.get("endTs");
        String portalServices = (String)map.get("portalServices");
        if (logger.isTraceEnabled()) {
            logger.trace("hostId = {}, startTs = {}, endTs = {}, portalServices = {}", new Object[]{hostId, startTsStr, endTsStr, portalServices});
        }
        if ((userId = (String)(auditInfo = (Map)exchange.getAttachment(AttachmentConstants.AUDIT_INFO)).get("user_id")) == null) {
            logger.error("Incorrect token type: userId is null. Must be Authorization Code Token.");
            return NioUtils.toByteBuffer((String)this.getStatus(exchange, INCORRECT_TOKEN_TYPE, new Object[]{"Authorization Code Token"}));
        }
        try {
            startTimestampMs = OffsetDateTime.parse(startTsStr).toInstant().toEpochMilli();
            endTimestampMs = OffsetDateTime.parse(endTsStr).toInstant().toEpochMilli();
        }
        catch (Exception e) {
            logger.error("Invalid timestamp format. startTs={}, endTs={}", new Object[]{startTsStr, endTsStr, e});
            return NioUtils.toByteBuffer((String)this.getStatus(exchange, INVALID_VARIABLE_FORMAT, new Object[]{"startTs or endTs", startTsStr + " or " + endTsStr}));
        }
        Properties props = new Properties();
        props.putAll((Map<?, ?>)config.getProperties());
        props.put("key.deserializer", keyDeserializer);
        props.put("value.deserializer", valueDeserializer);
        props.put("group.id", "export-group-" + userId + "-" + System.currentTimeMillis());
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "none");
        StringBuilder resultBuilder = new StringBuilder();
        KafkaConsumer consumer = null;
        if (logger.isTraceEnabled()) {
            logger.trace("props = {}", (Object)JsonMapper.toJson((Object)props));
        }
        try {
            List partitionInfoList;
            consumer = new KafkaConsumer(props);
            if (logger.isTraceEnabled()) {
                logger.trace("Kafka consumer created.");
            }
            if ((partitionInfoList = consumer.partitionsFor(config.getTopic())) == null || partitionInfoList.isEmpty()) {
                logger.warn("No partitions found for topic: {}", (Object)config.getTopic());
                ByteBuffer byteBuffer = NioUtils.toByteBuffer((String)"");
                return byteBuffer;
            }
            if (logger.isTraceEnabled()) {
                logger.trace("Partitions for topic {}: {}", (Object)config.getTopic(), (Object)partitionInfoList);
            }
            List topicPartitions = partitionInfoList.stream().map(pi -> new TopicPartition(config.getTopic(), pi.partition())).collect(Collectors.toList());
            if (logger.isTraceEnabled()) {
                logger.trace("Topic partitions: {}", topicPartitions);
            }
            Map<TopicPartition, Long> timestampsToSearch = topicPartitions.stream().collect(Collectors.toMap(tp -> tp, tp -> startTimestampMs));
            Map startingOffsets = consumer.offsetsForTimes(timestampsToSearch);
            if (logger.isTraceEnabled()) {
                logger.trace("Starting offsets for timestamps: {}", (Object)startingOffsets);
            }
            consumer.assign(topicPartitions);
            KafkaConsumer finalConsumer = consumer;
            if (logger.isTraceEnabled()) {
                logger.trace("Assigned partitions: {}", topicPartitions);
            }
            startingOffsets.forEach((tp, offsetAndTimestamp) -> {
                if (offsetAndTimestamp != null) {
                    logger.debug("Seeking partition {} to offset {}", (Object)tp.partition(), (Object)offsetAndTimestamp.offset());
                    finalConsumer.seek(tp, offsetAndTimestamp.offset());
                } else {
                    logger.debug("No offset found for partition {} >= startTs {}, seeking to end.", (Object)tp.partition(), (Object)startTsStr);
                    finalConsumer.seekToEnd(Collections.singletonList(tp));
                }
            });
            int giveUp = 5;
            int noRecordsCount = 0;
            boolean continuePolling = true;
            while (continuePolling) {
                ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(1000L));
                if (consumerRecords.isEmpty()) {
                    if (++noRecordsCount < 5) continue;
                    logger.info("No more records found after {} polls, stopping.", (Object)5);
                    continuePolling = false;
                    continue;
                }
                noRecordsCount = 0;
                block9: for (TopicPartition partition : consumerRecords.partitions()) {
                    List recordsForPartition = consumerRecords.records(partition);
                    for (ConsumerRecord record : recordsForPartition) {
                        if (record.timestamp() <= endTimestampMs) {
                            String keyStr = new String((byte[])record.key(), StandardCharsets.UTF_8);
                            String valueStr = new String((byte[])record.value(), StandardCharsets.UTF_8);
                            resultBuilder.append("key=").append(keyStr).append(" value=").append(valueStr).append("\n");
                            continue;
                        }
                        logger.debug("Record timestamp {} exceeds endTs {} for partition {}, stopping processing for this partition in this poll.", new Object[]{record.timestamp(), endTimestampMs, partition.partition()});
                        continue block9;
                    }
                }
            }
        }
        catch (Exception e) {
            logger.error("Error during Kafka export processing", (Throwable)e);
            ByteBuffer byteBuffer = NioUtils.toByteBuffer((String)this.getStatus(exchange, KAFKA_ACCESS_ERROR, new Object[]{"Kafka processing error: " + e.getMessage()}));
            return byteBuffer;
        }
        finally {
            if (consumer != null) {
                consumer.close();
                logger.debug("Kafka consumer closed.");
            }
        }
        String resultString = resultBuilder.toString();
        if (logger.isTraceEnabled()) {
            logger.trace("Export result length = {}", (Object)resultString.length());
        }
        return NioUtils.toByteBuffer((String)resultString);
    }

    private static class ExportConsumerRebalanceListener
    implements ConsumerRebalanceListener {
        private ExportConsumerRebalanceListener() {
        }

        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            System.out.println("Called onPartitionsRevoked with partitions:" + String.valueOf(partitions));
        }

        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            System.out.println("Called onPartitionsAssigned with partitions:" + String.valueOf(partitions));
        }
    }
}

