/*
 * Decompiled with CFR 0.152.
 */
package org.darkphoenixs.kafka.core;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.darkphoenixs.kafka.core.KafkaMessageReceiver;

public class KafkaMessageNewReceiver<K, V>
implements KafkaMessageReceiver<K, V> {
    protected final AtomicReference<KafkaConsumer<K, V>> consumer = new AtomicReference();

    public KafkaMessageNewReceiver(Properties props) {
        this.consumer.set(new KafkaConsumer(props));
    }

    @Override
    public synchronized List<V> receive(String topic, int partition, long beginOffset, long readOffset) {
        long latestOffset;
        if (readOffset <= 0L) {
            throw new IllegalArgumentException("read offset must be greater than 0");
        }
        long earliestOffset = this.getEarliestOffset(topic, partition);
        if (beginOffset < earliestOffset) {
            beginOffset = earliestOffset;
        }
        if (beginOffset + readOffset > (latestOffset = this.getLatestOffset(topic, partition))) {
            readOffset = latestOffset - beginOffset;
        }
        ArrayList<Object> list = new ArrayList<Object>();
        KafkaConsumer<K, V> kafkaConsumer = this.consumer.get();
        kafkaConsumer.assign(Arrays.asList(new TopicPartition(topic, partition)));
        kafkaConsumer.seek(new TopicPartition(topic, partition), beginOffset);
        boolean flag = true;
        block0: while (flag) {
            ConsumerRecords records = kafkaConsumer.poll(0L);
            for (ConsumerRecord record : records) {
                long currentOffset = record.offset();
                if (currentOffset == latestOffset - 1L || currentOffset > beginOffset + readOffset - 1L) {
                    flag = false;
                    continue block0;
                }
                list.add(record.value());
            }
        }
        return list;
    }

    @Override
    public synchronized Map<K, V> receiveWithKey(String topic, int partition, long beginOffset, long readOffset) {
        long latestOffset;
        if (readOffset <= 0L) {
            throw new IllegalArgumentException("read offset must be greater than 0");
        }
        long earliestOffset = this.getEarliestOffset(topic, partition);
        if (beginOffset < earliestOffset) {
            beginOffset = earliestOffset;
        }
        if (beginOffset + readOffset > (latestOffset = this.getLatestOffset(topic, partition))) {
            readOffset = latestOffset - beginOffset;
        }
        HashMap<Object, Object> map = new HashMap<Object, Object>();
        KafkaConsumer<K, V> kafkaConsumer = this.consumer.get();
        kafkaConsumer.assign(Arrays.asList(new TopicPartition(topic, partition)));
        kafkaConsumer.seek(new TopicPartition(topic, partition), beginOffset);
        boolean flag = true;
        block0: while (flag) {
            ConsumerRecords records = kafkaConsumer.poll(0L);
            for (ConsumerRecord record : records) {
                long currentOffset = record.offset();
                if (currentOffset == latestOffset - 1L || currentOffset > beginOffset + readOffset - 1L) {
                    flag = false;
                    continue block0;
                }
                map.put(record.key(), record.value());
            }
        }
        return map;
    }

    @Override
    public synchronized long getLatestOffset(String topic, int partition) {
        KafkaConsumer<K, V> kafkaConsumer = this.consumer.get();
        kafkaConsumer.assign(Arrays.asList(new TopicPartition(topic, partition)));
        kafkaConsumer.seekToEnd(new TopicPartition[]{new TopicPartition(topic, partition)});
        long latestOffset = kafkaConsumer.position(new TopicPartition(topic, partition));
        return latestOffset;
    }

    @Override
    public synchronized long getEarliestOffset(String topic, int partition) {
        KafkaConsumer<K, V> kafkaConsumer = this.consumer.get();
        kafkaConsumer.assign(Arrays.asList(new TopicPartition(topic, partition)));
        kafkaConsumer.seekToBeginning(new TopicPartition[]{new TopicPartition(topic, partition)});
        long earliestOffset = kafkaConsumer.position(new TopicPartition(topic, partition));
        return earliestOffset;
    }

    @Override
    public int getPartitionCount(String topic) {
        KafkaConsumer<K, V> kafkaConsumer = this.consumer.get();
        return kafkaConsumer.partitionsFor(topic).size();
    }

    @Override
    public synchronized void shutDown() {
        if (this.consumer.get() != null) {
            this.consumer.get().close();
            this.consumer.set(null);
        }
    }
}

