/*
 * Decompiled with CFR 0.152.
 */
package org.darkphoenixs.kafka.pool;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.darkphoenixs.kafka.core.KafkaMessageAdapter;
import org.darkphoenixs.kafka.core.KafkaMessageNewReceiver;
import org.darkphoenixs.kafka.core.KafkaMessageReceiver;
import org.darkphoenixs.kafka.pool.KafkaPoolThreadFactory;
import org.darkphoenixs.kafka.pool.MessageReceiverPool;
import org.darkphoenixs.mq.exception.MQException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PropertiesLoaderUtils;

/**
 * Pool of consumer threads that read a single Kafka topic and pass the polled
 * records to a {@link KafkaMessageAdapter}.
 *
 * <p>Two processing models are supported:
 * <ul>
 *   <li>{@code MODEL_1} - every receiver thread polls and invokes the adapter itself;</li>
 *   <li>{@code MODEL_2} - receiver threads only poll and hand each polled batch over
 *       to a separate handler pool.</li>
 * </ul>
 *
 * <p>{@link #init()} and {@link #destroy()} are synchronized on this instance. The
 * setters are not synchronized and are expected to be called before {@link #init()}.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class KafkaMessageNewReceiverPool<K, V> implements MessageReceiverPool<K, V> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaMessageNewReceiverPool.class);

    /** Executor running the {@link ReceiverThread}s; created by {@link #init()}. */
    protected ExecutorService receivPool;
    /** Executor running the {@link HandlerThread}s; only created for {@code MODEL_2}. */
    protected ExecutorService handlePool;
    /** Receiver tasks started by {@link #init()} that have not been destroyed yet. */
    protected List<ReceiverThread> threads = new ArrayList<ReceiverThread>();

    private MODEL model = MODEL.MODEL_1;
    private COMMIT commit = COMMIT.AUTO_COMMIT;
    private BATCH batch = BATCH.NON_BATCH;
    private Properties props = new Properties();
    private Resource config;
    private int poolSize;
    private int handleMultiple = 2;
    private int retryCount = 3;
    private KafkaMessageAdapter<?, ?> messageAdapter;

    /** @return the consumer properties used as the template for every receiver */
    public Properties getProps() {
        return this.props;
    }

    /** @param props the consumer properties; replaces the current instance */
    public void setProps(Properties props) {
        this.props = props;
    }

    /** @return multiplier applied to the pool size when sizing the handler pool */
    public int getHandleMultiple() {
        return this.handleMultiple;
    }

    /** @param handleMultiple multiplier applied to the pool size when sizing the handler pool */
    public void setHandleMultiple(int handleMultiple) {
        this.handleMultiple = handleMultiple;
    }

    /** @return number of consecutive failures tolerated before a commit is forced */
    public int getRetryCount() {
        return this.retryCount;
    }

    /** @param retryCount number of consecutive failures tolerated before a commit is forced */
    public void setRetryCount(int retryCount) {
        this.retryCount = retryCount;
    }

    /** @return number of receiver threads; may be adjusted by {@link #init()} */
    public int getPoolSize() {
        return this.poolSize;
    }

    /** @param poolSize requested number of receiver threads; a value {@code <= 0} means "one per partition" */
    public void setPoolSize(int poolSize) {
        this.poolSize = poolSize;
    }

    /** @return the resource the properties were last loaded from, or {@code null} */
    public Resource getConfig() {
        return this.config;
    }

    /**
     * Remembers the resource and merges its entries into the current properties.
     * A load failure is logged and otherwise ignored.
     *
     * @param config resource holding consumer properties
     */
    public void setConfig(Resource config) {
        this.config = config;
        try {
            PropertiesLoaderUtils.fillProperties(this.props, this.config);
        } catch (IOException e) {
            // Pass the exception itself so the stack trace is not lost.
            logger.error(e.getMessage(), e);
        }
    }

    /** @return name of the active {@link MODEL} */
    public String getModel() {
        return this.model.name();
    }

    /**
     * @param model name of a {@link MODEL} constant
     * @throws IllegalArgumentException if the name matches no constant
     */
    public void setModel(String model) {
        this.model = MODEL.valueOf(model);
    }

    /** @return name of the active {@link BATCH} mode */
    public String getBatch() {
        return this.batch.name();
    }

    /**
     * @param batch name of a {@link BATCH} constant
     * @throws IllegalArgumentException if the name matches no constant
     */
    public void setBatch(String batch) {
        this.batch = BATCH.valueOf(batch);
    }

    /** @return name of the active {@link COMMIT} mode */
    public String getCommit() {
        return this.commit.name();
    }

    /**
     * Selects the commit mode. For any mode other than {@code AUTO_COMMIT} the
     * {@code enable.auto.commit} property is set to {@code "false"}.
     *
     * @param commit name of a {@link COMMIT} constant
     * @throws IllegalArgumentException if the name matches no constant
     */
    public void setCommit(String commit) {
        this.commit = COMMIT.valueOf(commit);
        // Compare the parsed enum, not the String argument: String.equals(enum) is always false.
        if (this.commit != COMMIT.AUTO_COMMIT) {
            this.props.setProperty("enable.auto.commit", "false");
        }
    }

    /** @return the adapter that receives the consumed records */
    public KafkaMessageAdapter<?, ?> getMessageAdapter() {
        return this.messageAdapter;
    }

    /** @param messageAdapter the adapter that receives the consumed records */
    public void setMessageAdapter(KafkaMessageAdapter<?, ?> messageAdapter) {
        this.messageAdapter = messageAdapter;
    }

    /** @return the configured {@code client.id}, or {@code "client_new_consumer"} if unset */
    public String getClientId() {
        return this.props.getProperty("client.id", "client_new_consumer");
    }

    /** @return the configured {@code group.id}, or {@code "group_new_consumer"} if unset */
    public String getGroupId() {
        return this.props.getProperty("group.id", "group_new_consumer");
    }

    /**
     * Creates a stand-alone receiver from a copy of the pool properties. The copy
     * always uses the fixed ids {@code group_new_consumer} / {@code client_new_consumer},
     * regardless of what is configured on the pool.
     *
     * @return a new receiver; hand it back through {@link #returnReceiver(KafkaMessageReceiver)}
     */
    @Override
    // The generic signature of KafkaMessageNewReceiver is not visible from this file,
    // so the raw construction of the original code is kept.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public KafkaMessageReceiver<K, V> getReceiver() {
        Properties properties = (Properties) this.props.clone();
        properties.setProperty("group.id", "group_new_consumer");
        properties.setProperty("client.id", "client_new_consumer");
        return new KafkaMessageNewReceiver(properties);
    }

    /**
     * Shuts the given receiver down.
     *
     * @param receiver receiver obtained from {@link #getReceiver()}; {@code null} is ignored
     */
    @Override
    public void returnReceiver(KafkaMessageReceiver<K, V> receiver) {
        if (receiver != null) {
            receiver.shutDown();
        }
    }

    /**
     * Sizes the pool from the topic's partition count, creates the executors
     * required by the active model and starts one {@link ReceiverThread} per slot.
     *
     * @throws IllegalArgumentException if the resulting pool size is not positive
     *         (for example because the topic reports no partitions)
     */
    @Override
    public synchronized void init() {
        String topic = this.messageAdapter.getDestination().getDestinationName();

        int partSize;
        KafkaMessageReceiver<K, V> receiver = this.getReceiver();
        try {
            partSize = receiver.getPartitionCount(topic);
        } finally {
            // Release the probe receiver even when the partition lookup fails.
            this.returnReceiver(receiver);
        }

        // A non-positive size means "not configured"; more threads than partitions would sit idle.
        if (this.poolSize <= 0 || this.poolSize > partSize) {
            this.setPoolSize(partSize);
        }
        if (this.poolSize <= 0) {
            // Executors.newFixedThreadPool would throw the same exception type, but without a message.
            throw new IllegalArgumentException("Cannot start receiver pool for topic '" + topic
                    + "': partition count is " + partSize);
        }

        switch (this.model) {
            case MODEL_1: {
                this.receivPool = Executors.newFixedThreadPool(this.poolSize, new KafkaPoolThreadFactory("ReceiverThread-" + topic));
                logger.info("Message Receiver Pool initializing. poolSize : " + this.poolSize);
                break;
            }
            case MODEL_2: {
                int handSize = this.poolSize * this.handleMultiple + 1;
                this.receivPool = Executors.newFixedThreadPool(this.poolSize, new KafkaPoolThreadFactory("ReceiverThread-" + topic));
                this.handlePool = Executors.newFixedThreadPool(handSize, new KafkaPoolThreadFactory("HandlerThread-" + topic));
                logger.info("Message Receiver Pool initializing poolSize : " + this.poolSize);
                logger.info("Message Handler Pool initializing poolSize : " + handSize);
                break;
            }
            default:
                break;
        }

        for (int i = 0; i < this.poolSize; ++i) {
            Properties properties = (Properties) this.props.clone();
            properties.setProperty("client.id", this.getClientId() + "-" + topic + "-" + i);
            if (this.commit != COMMIT.AUTO_COMMIT) {
                // Enforced here as well: setProps() after setCommit() would otherwise drop the setting.
                properties.setProperty("enable.auto.commit", "false");
            }
            ReceiverThread receiverThread = new ReceiverThread(properties, topic, this.messageAdapter);
            this.threads.add(receiverThread);
            this.receivPool.submit(receiverThread);
        }
    }

    /**
     * Asks every receiver to stop and shuts both executors down. The method does
     * not wait for running tasks to finish.
     */
    @Override
    public synchronized void destroy() {
        // Stop the producers of handler work first, so that fewer hand-offs can hit
        // an already shut down handler pool. This narrows the race; it does not close it.
        for (ReceiverThread thread : this.threads) {
            thread.shutdown();
        }
        this.threads.clear();
        if (this.receivPool != null) {
            this.receivPool.shutdown();
        }
        if (this.handlePool != null) {
            this.handlePool.shutdown();
        }
    }

    /**
     * Commits the position directly after {@code record} for the record's partition.
     * Does nothing for {@code AUTO_COMMIT}.
     */
    private void commit(KafkaConsumer<K, V> consumer, ConsumerRecord<K, V> record, COMMIT commit) {
        TopicPartition partition = new TopicPartition(record.topic(), record.partition());
        // The committed offset is the next one to read, hence "+ 1".
        OffsetAndMetadata next = new OffsetAndMetadata(record.offset() + 1L);
        switch (commit) {
            case SYNC_COMMIT: {
                consumer.commitSync(Collections.singletonMap(partition, next));
                break;
            }
            case ASYNC_COMMIT: {
                consumer.commitAsync(Collections.singletonMap(partition, next), (OffsetCommitCallback) new ConsumerCoordinator.DefaultOffsetCommitCallback());
                break;
            }
            default:
                // AUTO_COMMIT: nothing to do here.
                break;
        }
    }

    /**
     * Commits the consumer's current positions using the no-argument commit calls.
     * Does nothing for {@code AUTO_COMMIT}.
     */
    private void batchCommit(KafkaConsumer<K, V> consumer, COMMIT commit) {
        switch (commit) {
            case SYNC_COMMIT: {
                consumer.commitSync();
                break;
            }
            case ASYNC_COMMIT: {
                consumer.commitAsync();
                break;
            }
            default:
                // AUTO_COMMIT: nothing to do here.
                break;
        }
    }

    /**
     * Task used in {@code MODEL_2}: passes one polled batch to the adapter, either
     * as a whole or record by record depending on the pool's batch mode.
     */
    class HandlerThread implements Runnable {
        public static final String tagger = "HandlerThread";
        private final KafkaMessageAdapter<?, ?> adapter;
        private final ConsumerRecords<K, V> records;

        /**
         * @param adapter adapter to invoke
         * @param records batch returned by one poll
         */
        public HandlerThread(KafkaMessageAdapter<?, ?> adapter, ConsumerRecords<K, V> records) {
            this.adapter = adapter;
            this.records = records;
        }

        /** Delivers the batch; an {@link MQException} is logged and does not abort the task. */
        @Override
        public void run() {
            logger.info(Thread.currentThread().getName() + " start.");
            switch (KafkaMessageNewReceiverPool.this.batch) {
                case BATCH: {
                    try {
                        this.adapter.messageAdapter(this.records);
                    } catch (MQException e) {
                        logger.error(Thread.currentThread().getName() + " failNumber: " + this.records.count() + " Exception: " + e.getMessage());
                    }
                    break;
                }
                case NON_BATCH: {
                    for (ConsumerRecord<K, V> record : this.records) {
                        try {
                            this.adapter.messageAdapter(record);
                        } catch (MQException e) {
                            logger.error(Thread.currentThread().getName() + " topic: " + record.topic() + " offset: " + record.offset() + " partition: " + record.partition() + " Exception: " + e.getMessage());
                        }
                    }
                    break;
                }
                default:
                    break;
            }
            logger.info(Thread.currentThread().getName() + " end.");
        }
    }

    /**
     * Task owning one {@link KafkaConsumer}: subscribes to the topic and polls until
     * {@link #shutdown()} is called, dispatching every polled batch according to the
     * pool's model, batch and commit settings.
     */
    class ReceiverThread implements Runnable {
        public static final String tagger = "ReceiverThread";
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaConsumer<K, V> consumer;
        private final KafkaMessageAdapter<?, ?> adapter;
        private final String topic;
        /** Failure counter for NON_BATCH handling; only touched by the thread executing {@link #run()}. */
        private int failCount;

        /**
         * @param props   consumer configuration for this receiver
         * @param topic   topic to subscribe to
         * @param adapter adapter to invoke (or to pass on to the handler pool)
         */
        public ReceiverThread(Properties props, String topic, KafkaMessageAdapter<?, ?> adapter) {
            this.topic = topic;
            this.adapter = adapter;
            this.consumer = new KafkaConsumer<K, V>(props);
        }

        /**
         * Poll loop. A {@link WakeupException} raised after {@link #shutdown()} ends the
         * loop quietly; any other runtime failure is logged and rethrown. The consumer
         * is closed in every case.
         */
        @Override
        public void run() {
            logger.info(Thread.currentThread().getName() + " start.");
            try {
                this.consumer.subscribe(Arrays.asList(this.topic));
                while (!this.closed.get()) {
                    ConsumerRecords<K, V> records = this.consumer.poll(Long.MAX_VALUE);
                    switch (KafkaMessageNewReceiverPool.this.model) {
                        case MODEL_1: {
                            this.handleInline(records);
                            break;
                        }
                        case MODEL_2: {
                            // Offsets are committed right after the hand-off, not after handling finished.
                            KafkaMessageNewReceiverPool.this.handlePool.execute(new HandlerThread(this.adapter, records));
                            KafkaMessageNewReceiverPool.this.batchCommit(this.consumer, KafkaMessageNewReceiverPool.this.commit);
                            break;
                        }
                        default:
                            break;
                    }
                }
            } catch (WakeupException e) {
                if (!this.closed.get()) {
                    // The task's Future is discarded by the pool, so log before propagating.
                    logger.error(Thread.currentThread().getName() + " woken up without a shutdown request.", e);
                    throw e;
                }
            } catch (RuntimeException e) {
                // Without this log a failing receiver would disappear silently.
                logger.error(Thread.currentThread().getName() + " terminated unexpectedly.", e);
                throw e;
            } finally {
                this.consumer.close();
            }
            logger.info(Thread.currentThread().getName() + " end.");
        }

        /** MODEL_1: handles the polled records on the receiver thread itself. */
        private void handleInline(ConsumerRecords<K, V> records) {
            switch (KafkaMessageNewReceiverPool.this.batch) {
                case BATCH: {
                    this.handleBatch(records);
                    break;
                }
                case NON_BATCH: {
                    this.handleEach(records);
                    break;
                }
                default:
                    break;
            }
        }

        /** Passes the whole batch to the adapter and commits afterwards, whether or not it failed. */
        private void handleBatch(ConsumerRecords<K, V> records) {
            try {
                this.adapter.messageAdapter(records);
            } catch (MQException e) {
                logger.error(Thread.currentThread().getName() + " failNumber: " + records.count() + " Exception: " + e.getMessage());
            } finally {
                // Must not jump out of this finally block: that would discard an in-flight exception.
                KafkaMessageNewReceiverPool.this.batchCommit(this.consumer, KafkaMessageNewReceiverPool.this.commit);
            }
        }

        /**
         * Passes the records to the adapter one by one. A record's offset is committed
         * only while no failure is pending or once the failure counter exceeds the
         * configured retry count; the counter is reset on every commit.
         */
        private void handleEach(ConsumerRecords<K, V> records) {
            for (ConsumerRecord<K, V> record : records) {
                try {
                    this.adapter.messageAdapter(record);
                } catch (MQException e) {
                    ++this.failCount;
                    logger.error(Thread.currentThread().getName() + " failCount: " + this.failCount + " topic: " + record.topic() + " offset: " + record.offset() + " partition: " + record.partition() + " Exception: " + e.getMessage());
                } finally {
                    // NOTE(review): a failed record is not re-delivered here and the counter also
                    // delays commits for later successful records - confirm this is the intended
                    // meaning of retryCount.
                    if (this.failCount == 0 || this.failCount > KafkaMessageNewReceiverPool.this.retryCount) {
                        this.failCount = 0;
                        KafkaMessageNewReceiverPool.this.commit(this.consumer, record, KafkaMessageNewReceiverPool.this.commit);
                    }
                }
            }
        }

        /** Marks the receiver as closed and wakes the consumer so a blocking poll returns. */
        public void shutdown() {
            this.closed.set(true);
            this.consumer.wakeup();
        }
    }

    /** How consumed offsets are committed. */
    public static enum COMMIT {
        AUTO_COMMIT,
        SYNC_COMMIT,
        ASYNC_COMMIT;
    }

    /** Whether the adapter receives a whole polled batch or single records. */
    public static enum BATCH {
        NON_BATCH,
        BATCH;
    }

    /** Threading model: handle on the receiver thread (MODEL_1) or on a handler pool (MODEL_2). */
    public static enum MODEL {
        MODEL_1,
        MODEL_2;
    }
}

