/*
 * Decompiled with CFR 0.152.
 */
package com.uber.cherami.example;

import com.uber.cherami.client.CheramiConsumer;
import com.uber.cherami.client.CheramiDelivery;
import com.uber.cherami.client.CheramiPublisher;
import com.uber.cherami.client.CreateConsumerRequest;
import com.uber.cherami.client.CreatePublisherRequest;
import com.uber.cherami.client.PublisherMessage;
import com.uber.cherami.client.SendReceipt;
import com.uber.cherami.example.AppData;
import com.uber.cherami.example.Config;
import com.uber.cherami.example.Context;
import com.uber.cherami.example.Daemon;
import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Example publisher and consumer daemons that talk to Cherami through the
 * asynchronous client calls ({@code writeAsync} / {@code readAsync}).
 *
 * <p>Each daemon runs on its own thread, records latency and throughput
 * counters on the shared {@link Context}, and is shut down cooperatively via
 * {@link Daemon#stop()}.
 */
public class Async {
    /**
     * Maximum number of publishes awaiting a receipt before the publisher
     * blocks on the oldest one; also used as the consumer prefetch count.
     */
    private static final int MAX_INFLIGHT_MESSAGES = 4096;

    /** Number of times one message may be published before the publisher gives up. */
    private static final int MAX_PUBLISH_ATTEMPTS = 16;

    /** Book-keeping for one published message whose receipt has not been examined yet. */
    private static class InFlightMsgState {
        /** How many times this message id has been published, including this attempt. */
        public final int attempts;
        /** Application-level message id carried in the payload. */
        public final long id;
        /** Wall-clock time (ms) at which this attempt was recorded, used for write latency. */
        public final long sendTime;
        /** Pending result of the asynchronous write. */
        public final Future<SendReceipt> future;

        InFlightMsgState(long id, Future<SendReceipt> future, int attempts) {
            this.id = id;
            this.future = future;
            this.attempts = attempts;
            this.sendTime = System.currentTimeMillis();
        }
    }

    /**
     * Daemon that reads messages asynchronously, acks each one and updates the
     * inbound counters (count, bytes, duplicates, read latency) on the context.
     */
    public static class Consumer
    implements Runnable,
    Daemon {
        /** How long a single wait on a pending read lasts before the stop flag is re-checked. */
        private static final long READ_TIMEOUT_MILLIS = 500L;
        private final String name;
        private final Context context;
        /** Counted down by {@link #stop()} to ask the worker thread to exit. */
        private final CountDownLatch quitter = new CountDownLatch(1);
        /** Counted down by the worker thread once it has cleaned up. */
        private final CountDownLatch stopped = new CountDownLatch(1);

        /**
         * @param name    label used as a prefix in console output
         * @param context shared client, configuration and statistics
         */
        public Consumer(String name, Context context) {
            this.name = name;
            this.context = context;
        }

        /** Starts the consume loop on a new thread. */
        @Override
        public void start() {
            new Thread(this).start();
        }

        /**
         * Requests shutdown and waits up to one second for the worker to finish.
         * If the calling thread is interrupted while waiting, the interrupt
         * status is restored and the method returns immediately.
         */
        @Override
        public void stop() {
            this.quitter.countDown();
            try {
                if (!this.stopped.await(1L, TimeUnit.SECONDS)) {
                    System.out.println(this.name + ": shutdown timed out");
                }
            }
            catch (InterruptedException e) {
                // Do not swallow the interrupt; let the caller observe it.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Consume loop: opens a consumer, then repeatedly issues an async read,
         * waits for it in {@link #READ_TIMEOUT_MILLIS} slices (so a stop request
         * is noticed even when no message arrives), and processes the delivery.
         * Any failure is printed and ends the loop; the consumer is always
         * closed and the {@code stopped} latch released on exit.
         */
        @Override
        public void run() {
            CheramiConsumer consumer = null;
            try {
                System.out.println(this.name + " started");
                Config config = this.context.config;
                CreateConsumerRequest.Builder builder = new CreateConsumerRequest.Builder(config.destinationPath, config.consumergroupName);
                CreateConsumerRequest request = builder.setPrefetchCount(MAX_INFLIGHT_MESSAGES).build();
                consumer = this.context.client.createConsumer(request);
                consumer.open();
                while (!this.isStopRequested()) {
                    long startTime = System.currentTimeMillis();
                    Future<CheramiDelivery> future = consumer.readAsync();
                    CheramiDelivery delivery;
                    while (true) {
                        try {
                            delivery = future.get(READ_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
                            break;
                        }
                        catch (TimeoutException e) {
                            // Nothing arrived in this slice: exit if asked to, otherwise keep waiting
                            // on the same pending read.
                            if (this.isStopRequested()) {
                                return;
                            }
                        }
                    }
                    this.context.stats.readLatency.add(System.currentTimeMillis() - startTime);
                    this.process(delivery);
                }
            }
            catch (Throwable e) {
                System.out.println(this.name + " caught unexpected exception:" + e);
            }
            finally {
                if (consumer != null) {
                    consumer.close();
                }
                System.out.println(this.name + " stopped");
                this.stopped.countDown();
            }
        }

        /** Records a duplicate if the message id was seen before, acks the delivery and updates inbound counters. */
        private void process(CheramiDelivery delivery) throws Exception {
            AppData appData = AppData.deserialize(delivery.getMessage().getPayload().getData());
            if (this.context.consumedMsgIds.putIfAbsent(appData.id, true) != null) {
                this.context.stats.messagesInDupCount.incrementAndGet();
            }
            delivery.ack();
            this.context.stats.messagesInCount.incrementAndGet();
            this.context.stats.bytesInCount.addAndGet(delivery.getMessage().getPayload().getData().length);
        }

        /** Returns true once {@link #stop()} has been called. */
        private boolean isStopRequested() {
            return this.quitter.getCount() == 0L;
        }
    }

    /**
     * Daemon that publishes {@code config.nMessagesToSend} random-payload
     * messages asynchronously, keeping at most {@link #MAX_INFLIGHT_MESSAGES}
     * writes outstanding and re-publishing messages whose receipt is not OK.
     */
    public static class Publisher
    implements Runnable,
    Daemon {
        private final String name;
        private final Context context;
        /** Counted down by {@link #stop()} to ask the worker thread to exit. */
        private final CountDownLatch quitter = new CountDownLatch(1);
        /** Counted down by the worker thread once it has cleaned up. */
        private final CountDownLatch stopped = new CountDownLatch(1);

        /**
         * @param name    label used as a prefix in console output
         * @param context shared client, configuration and statistics
         */
        public Publisher(String name, Context context) {
            this.name = name;
            this.context = context;
        }

        /** Starts the publish loop on a new thread. */
        @Override
        public void start() {
            new Thread(this).start();
        }

        /**
         * Requests shutdown and waits up to one second for the worker to finish.
         * If the calling thread is interrupted while waiting, the interrupt
         * status is restored and the method returns immediately.
         */
        @Override
        public void stop() {
            this.quitter.countDown();
            try {
                if (!this.stopped.await(1L, TimeUnit.SECONDS)) {
                    System.out.println(this.name + ": shutdown timed out");
                }
            }
            catch (InterruptedException e) {
                // Do not swallow the interrupt; let the caller observe it.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Publish loop. Each iteration optionally settles the oldest in-flight
         * write (when the window is full or there is nothing new left to send)
         * and then publishes either a retry of that message or a fresh one.
         * The loop ends when everything has been sent and settled, when a
         * message fails {@link #MAX_PUBLISH_ATTEMPTS} times, when a stop is
         * requested after a publish, or on any unexpected failure. The
         * publisher is always closed and the {@code stopped} latch released.
         */
        @Override
        public void run() {
            CheramiPublisher publisher = null;
            try {
                System.out.println(this.name + " started");
                Config config = this.context.config;
                CreatePublisherRequest.Builder builder = new CreatePublisherRequest.Builder(config.destinationPath);
                publisher = this.context.client.createPublisher(builder.build());
                publisher.open();
                Random rand = new Random(System.currentTimeMillis());
                int remaining = config.nMessagesToSend;
                LinkedList<InFlightMsgState> inflight = new LinkedList<InFlightMsgState>();
                while (!inflight.isEmpty() || remaining > 0) {
                    int attempts = 0;
                    long retryMsgId = 0L;
                    // Explicit flag instead of treating id 0 as "no retry".
                    boolean retry = false;
                    if (inflight.size() >= MAX_INFLIGHT_MESSAGES || remaining == 0) {
                        // The loop condition guarantees the queue is non-empty here.
                        InFlightMsgState oldest = inflight.poll();
                        SendReceipt receipt = oldest.future.get();
                        this.context.stats.writeLatency.add(System.currentTimeMillis() - oldest.sendTime);
                        if (receipt.getStatus() != SendReceipt.ReceiptStatus.OK) {
                            this.context.stats.messagesOutErrCount.incrementAndGet();
                            if (oldest.attempts >= MAX_PUBLISH_ATTEMPTS) {
                                System.out.println("Max attempts exceeded at sending a message");
                                return;
                            }
                            if (receipt.getStatus() == SendReceipt.ReceiptStatus.ERR_THROTTLED) {
                                // Brief pause before re-publishing a throttled message.
                                Thread.sleep(1L);
                                this.context.stats.messagesOutThrottledCount.incrementAndGet();
                            }
                            attempts = oldest.attempts;
                            retryMsgId = oldest.id;
                            retry = true;
                        }
                    }
                    if (!retry && remaining == 0) {
                        // Nothing to re-publish and nothing new to send: keep draining receipts.
                        continue;
                    }
                    long msgId = retry ? retryMsgId : this.context.msgIdCounter.incrementAndGet();
                    PublisherMessage message = this.createPubMessage(msgId, rand);
                    inflight.add(new InFlightMsgState(msgId, publisher.writeAsync(message), attempts + 1));
                    this.context.stats.messagesOutCount.incrementAndGet();
                    this.context.stats.bytesOutCount.addAndGet(message.getData().length);
                    if (!retry) {
                        // Only first-time sends consume the budget; retries do not.
                        remaining--;
                    }
                    if (this.quitter.getCount() == 0L) {
                        break;
                    }
                }
            }
            catch (Throwable t) {
                System.out.println(this.name + " caught unexpected exception:" + t);
            }
            finally {
                if (publisher != null) {
                    publisher.close();
                }
                System.out.println(this.name + " stopped");
                this.stopped.countDown();
            }
        }

        /**
         * Builds a message whose payload is {@code config.messageSize} random
         * bytes wrapped, together with {@code id}, in a serialized {@link AppData}.
         */
        private PublisherMessage createPubMessage(long id, Random random) {
            byte[] payload = new byte[this.context.config.messageSize];
            random.nextBytes(payload);
            AppData data = new AppData(id, payload);
            return new PublisherMessage(data.serialize());
        }
    }
}

