/*
 * Decompiled with CFR 0.152.
 */
package com.uber.cherami.example;

import com.uber.cherami.client.CheramiConsumer;
import com.uber.cherami.client.CheramiDelivery;
import com.uber.cherami.client.CheramiPublisher;
import com.uber.cherami.client.CreateConsumerRequest;
import com.uber.cherami.client.CreatePublisherRequest;
import com.uber.cherami.client.PublisherMessage;
import com.uber.cherami.client.SendReceipt;
import com.uber.cherami.example.AppData;
import com.uber.cherami.example.Config;
import com.uber.cherami.example.Context;
import com.uber.cherami.example.Daemon;
import com.uber.cherami.example.Stats;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class Sync {

    public static class Consumer
    implements Runnable,
    Daemon {
        private static final int PREFETCH_COUNT = 256;
        private final String name;
        private final Context context;
        private final CountDownLatch quitter = new CountDownLatch(1);
        private final CountDownLatch stopped = new CountDownLatch(1);
        private CheramiConsumer consumer;

        public Consumer(String name, Context context) {
            this.name = name;
            this.context = context;
        }

        @Override
        public void start() {
            new Thread(this).start();
        }

        @Override
        public void stop() {
            this.quitter.countDown();
            if (this.consumer != null) {
                this.consumer.close();
            }
            try {
                if (!this.stopped.await(1L, TimeUnit.SECONDS)) {
                    System.out.println(this.name + ": shutdown timed out");
                }
            }
            catch (InterruptedException e) {
                return;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                System.out.println(this.name + " started");
                Config config = this.context.config;
                CreateConsumerRequest.Builder builder = new CreateConsumerRequest.Builder(config.destinationPath, config.consumergroupName);
                CreateConsumerRequest request = builder.setPrefetchCount(256).build();
                this.consumer = this.context.client.createConsumer(request);
                this.consumer.open();
                Stats.Profiler readLatencyProfiler = new Stats.Profiler();
                while (this.quitter.getCount() > 0L) {
                    CheramiDelivery delivery = null;
                    try {
                        readLatencyProfiler.start();
                        delivery = this.consumer.read();
                        this.context.stats.readLatency.add(readLatencyProfiler.elapsed());
                    }
                    catch (InterruptedException e) {
                        continue;
                    }
                    byte[] data = delivery.getMessage().getPayload().getData();
                    AppData appData = AppData.deserialize(data);
                    if (this.context.consumedMsgIds.putIfAbsent(appData.id, true) != null) {
                        this.context.stats.messagesInDupCount.incrementAndGet();
                    }
                    delivery.ack();
                    this.context.stats.messagesInCount.incrementAndGet();
                    this.context.stats.bytesInCount.addAndGet(data.length);
                }
            }
            catch (Throwable e) {
                System.out.println(this.name + " caught unexpected exception:" + e);
            }
            finally {
                if (this.consumer != null) {
                    this.consumer.close();
                }
                System.out.println(this.name + " stopped");
                this.stopped.countDown();
            }
        }
    }

    public static class Publisher
    implements Runnable,
    Daemon {
        private final String name;
        private final Context context;
        private final Random random = new Random();
        private final CountDownLatch quitter = new CountDownLatch(1);
        private final CountDownLatch stopped = new CountDownLatch(1);

        public Publisher(String name, Context context) {
            this.name = name;
            this.context = context;
        }

        @Override
        public void start() {
            new Thread(this).start();
        }

        @Override
        public void stop() {
            this.quitter.countDown();
            try {
                if (!this.stopped.await(1L, TimeUnit.SECONDS)) {
                    System.out.println(this.name + ": shutdown timed out");
                }
            }
            catch (InterruptedException e) {
                return;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            CheramiPublisher publisher = null;
            try {
                System.out.println(this.name + " started");
                Config config = this.context.config;
                CreatePublisherRequest.Builder builder = new CreatePublisherRequest.Builder(config.destinationPath);
                publisher = this.context.client.createPublisher(builder.build());
                publisher.open();
                long remaining = config.nMessagesToSend;
                Stats.Profiler writeLatencyProfiler = new Stats.Profiler();
                while (remaining > 0L && this.quitter.getCount() > 0L) {
                    long id = this.context.msgIdCounter.incrementAndGet();
                    PublisherMessage message = this.createMessage(id);
                    writeLatencyProfiler.start();
                    SendReceipt receipt = null;
                    do {
                        receipt = publisher.write(message);
                        switch (receipt.getStatus()) {
                            case OK: {
                                --remaining;
                                this.context.stats.writeLatency.add(writeLatencyProfiler.elapsed());
                                this.context.stats.messagesOutCount.incrementAndGet();
                                this.context.stats.bytesOutCount.addAndGet(message.getData().length);
                                break;
                            }
                            case ERR_THROTTLED: {
                                this.context.stats.messagesOutThrottledCount.incrementAndGet();
                                this.context.stats.messagesOutErrCount.incrementAndGet();
                                this.sleep(100L);
                                break;
                            }
                            default: {
                                this.context.stats.messagesOutErrCount.incrementAndGet();
                            }
                        }
                    } while (receipt.getStatus() != SendReceipt.ReceiptStatus.OK && this.quitter.getCount() > 0L);
                }
            }
            catch (Throwable e) {
                System.out.println("Publisher caught unexpected exception: " + e);
            }
            finally {
                if (publisher != null) {
                    publisher.close();
                }
                System.out.println(this.name + " closed");
                this.stopped.countDown();
            }
        }

        private void sleep(long millis) {
            try {
                Thread.sleep(millis);
            }
            catch (InterruptedException e) {
                return;
            }
        }

        private PublisherMessage createMessage(long id) {
            byte[] payload = new byte[this.context.config.messageSize];
            this.random.nextBytes(payload);
            AppData data = new AppData(id, payload);
            return new PublisherMessage(data.serialize());
        }
    }
}

