/*
 * Decompiled with CFR 0.152.
 */
package org.cometd.benchmark.client;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicStampedReference;
import java.util.concurrent.atomic.LongAdder;
import org.HdrHistogram.AbstractHistogram;
import org.HdrHistogram.Histogram;
import org.cometd.bayeux.ChannelId;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSessionChannel;
import org.cometd.benchmark.Atomics;
import org.cometd.benchmark.Config;
import org.cometd.benchmark.MonitoringQueuedThreadPool;
import org.cometd.client.BayeuxClient;
import org.cometd.client.ext.AckExtension;
import org.cometd.client.http.jetty.JettyHttpClientTransport;
import org.cometd.client.transport.ClientTransport;
import org.cometd.client.transport.TransportListener;
import org.cometd.client.websocket.javax.WebSocketTransport;
import org.cometd.client.websocket.jetty.JettyWebSocketTransport;
import org.cometd.common.JacksonJSONContextClient;
import org.eclipse.jetty.client.AbstractHttpClientTransport;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.jmx.MBeanContainer;
import org.eclipse.jetty.toolchain.perf.HistogramSnapshot;
import org.eclipse.jetty.toolchain.perf.MeasureConverter;
import org.eclipse.jetty.toolchain.perf.PlatformMonitor;
import org.eclipse.jetty.util.SocketAddressResolver;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.jsr356.ClientContainer;

public class CometDLoadClient
implements MeasureConverter {
    private static final String START_FIELD = "start";
    private final Collection<Histogram> allHistograms = new CopyOnWriteArrayList<Histogram>();
    private final ThreadLocal<Histogram> histogram = ThreadLocal.withInitial(() -> {
        Histogram histogram = new Histogram(TimeUnit.MICROSECONDS.toNanos(1L), TimeUnit.MINUTES.toNanos(1L), 3);
        this.allHistograms.add(histogram);
        return histogram;
    });
    private final PlatformMonitor monitor = new PlatformMonitor();
    private final AtomicLong ids = new AtomicLong();
    private final List<LoadBayeuxClient> bayeuxClients = Collections.synchronizedList(new ArrayList());
    private final ConcurrentMap<String, ChannelId> channelIds = new ConcurrentHashMap<String, ChannelId>();
    private final ConcurrentMap<Integer, AtomicInteger> roomMap = new ConcurrentHashMap<Integer, AtomicInteger>();
    private final AtomicLong start = new AtomicLong();
    private final AtomicLong end = new AtomicLong();
    private final AtomicLong responses = new AtomicLong();
    private final AtomicLong messages = new AtomicLong();
    private final AtomicLong minLatency = new AtomicLong();
    private final AtomicLong maxLatency = new AtomicLong();
    private final AtomicLong totLatency = new AtomicLong();
    private final AtomicStampedReference<String> maxTime = new AtomicStampedReference<Object>(null, 0);
    private final Map<String, AtomicStampedReference<Long>> sendTimes = new ConcurrentHashMap<String, AtomicStampedReference<Long>>();
    private final Map<String, AtomicStampedReference<List<Long>>> arrivalTimes = new ConcurrentHashMap<String, AtomicStampedReference<List<Long>>>();
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(8);
    private final MonitoringQueuedThreadPool threadPool = new MonitoringQueuedThreadPool(0);
    private final DynamicConnectionStatistics connectionStatistics = new DynamicConnectionStatistics();
    private HttpClient httpClient;
    private WebSocketClient webSocketClient;
    private ClientContainer webSocketContainer;
    private LatencyListener latencyListener;
    private HandshakeListener handshakeListener;
    private DisconnectListener disconnectListener;
    private boolean interactive = true;
    private String host = "localhost";
    private int port = 8080;
    private ClientTransportType transport = ClientTransportType.LONG_POLLING;
    private boolean http2 = false;
    private boolean tls = false;
    private int selectors = 1;
    private int maxThreads = 256;
    private String context = "/cometd";
    private String channel = "/a";
    private int rooms = 100;
    private int roomsPerClient = 10;
    private boolean ackExtension = false;
    private int iterations = 5;
    private int clients = 1000;
    private int batches = 1000;
    private int batchSize = 10;
    private long batchPause = 10000L;
    private int messageSize = 50;
    private boolean randomize = false;
    private String file = "./result.json";

    public static void main(String[] args) throws Exception {
        CometDLoadClient client = new CometDLoadClient();
        CometDLoadClient.parseArguments(args, client);
        client.run();
    }

    private static void parseArguments(String[] args, CometDLoadClient client) {
        for (String arg : args) {
            if (arg.equals("--auto")) {
                client.interactive = false;
                continue;
            }
            if (arg.startsWith("--host=")) {
                client.host = arg.substring("--host=".length());
                continue;
            }
            if (arg.startsWith("--port=")) {
                client.port = Integer.parseInt(arg.substring("--port=".length()));
                continue;
            }
            if (arg.startsWith("--transport=")) {
                client.transport = ClientTransportType.valueOf(arg.substring("--transport=".length()));
                continue;
            }
            if (arg.equals("--http2")) {
                client.http2 = true;
                continue;
            }
            if (arg.equals("--tls")) {
                client.tls = true;
                continue;
            }
            if (arg.startsWith("--selectors=")) {
                client.selectors = Integer.parseInt(arg.substring("--selectors=".length()));
                continue;
            }
            if (arg.startsWith("--maxThreads=")) {
                client.maxThreads = Integer.parseInt(arg.substring("--maxThreads=".length()));
                continue;
            }
            if (arg.startsWith("--context=")) {
                client.context = arg.substring("--context=".length());
                continue;
            }
            if (arg.startsWith("--channel=")) {
                client.channel = arg.substring("--channel=".length());
                continue;
            }
            if (arg.startsWith("--rooms=")) {
                client.rooms = Integer.parseInt(arg.substring("--rooms=".length()));
                continue;
            }
            if (arg.startsWith("--roomsPerClient=")) {
                client.roomsPerClient = Integer.parseInt(arg.substring("--roomsPerClient=".length()));
                continue;
            }
            if (arg.equals("--ackExtension")) {
                client.ackExtension = true;
                continue;
            }
            if (arg.startsWith("--iterations=")) {
                client.iterations = Integer.parseInt(arg.substring("--iterations=".length()));
                continue;
            }
            if (arg.startsWith("--clients=")) {
                client.clients = Integer.parseInt(arg.substring("--clients=".length()));
                continue;
            }
            if (arg.startsWith("--batches=")) {
                client.batches = Integer.parseInt(arg.substring("--batches=".length()));
                continue;
            }
            if (arg.startsWith("--batchSize=")) {
                client.batchSize = Integer.parseInt(arg.substring("--batchSize=".length()));
                continue;
            }
            if (arg.startsWith("--batchPause=")) {
                client.batchPause = Long.parseLong(arg.substring("--batchPause=".length()));
                continue;
            }
            if (arg.startsWith("--messageSize=")) {
                client.messageSize = Integer.parseInt(arg.substring("--messageSize=".length()));
                continue;
            }
            if (arg.equals("--randomize")) {
                client.randomize = true;
                continue;
            }
            if (!arg.startsWith("--file=")) continue;
            client.file = arg.substring("--file=".length());
        }
    }

    public void run() throws Exception {
        BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
        String host = this.host;
        if (this.interactive) {
            host = System.getProperty("cometd.server", host);
            System.err.printf("server [%s]: ", host);
            String value = console.readLine().trim();
            if (value.length() == 0) {
                value = host;
            }
            host = value;
        }
        int port = this.port;
        if (this.interactive) {
            port = Integer.parseInt(System.getProperty("cometd.port", String.valueOf(port)));
            System.err.printf("port [%d]: ", port);
            String value = console.readLine().trim();
            if (value.length() == 0) {
                value = String.valueOf(port);
            }
            port = Integer.parseInt(value);
        }
        boolean tls = this.tls;
        if (this.interactive) {
            System.err.printf("use tls [%b]: ", tls);
            String value = console.readLine().trim();
            if (value.length() == 0) {
                value = String.valueOf(tls);
            }
            tls = Boolean.parseBoolean(value);
        }
        int selectors = this.selectors;
        if (this.interactive) {
            System.err.printf("selectors [%d]: ", selectors);
            String value = console.readLine().trim();
            if (value.length() == 0) {
                value = String.valueOf(selectors);
            }
            selectors = Integer.parseInt(value);
        }
        int maxThreads = this.maxThreads;
        if (this.interactive) {
            maxThreads = Integer.parseInt(System.getProperty("cometd.threads", String.valueOf(maxThreads)));
            System.err.printf("max threads [%d]: ", maxThreads);
            String value = console.readLine().trim();
            if (value.length() == 0) {
                value = String.valueOf(maxThreads);
            }
            maxThreads = Integer.parseInt(value);
        }
        ClientTransportType transport = this.transport;
        if (this.interactive) {
            System.err.printf("transports:%n", new Object[0]);
            for (ClientTransportType type : ClientTransportType.values()) {
                System.err.printf("  %d - %s%n", type.ordinal(), type.getName());
            }
            System.err.printf("transport [%d]: ", transport.ordinal());
            String value = console.readLine().trim();
            if (value.length() == 0) {
                value = String.valueOf(transport.ordinal());
            }
            transport = ClientTransportType.values()[Integer.parseInt(value)];
        }
        boolean http2 = this.http2;
        if (transport == ClientTransportType.LONG_POLLING) {
            if (this.interactive) {
                System.err.printf("use HTTP/2 [%b]: ", http2);
                String value = console.readLine().trim();
                if (value.length() == 0) {
                    value = String.valueOf(http2);
                }
                http2 = Boolean.parseBoolean(value);
            }
        } else {
            http2 = false;
        }
        String contextPath = this.context;
        if (this.interactive) {
            System.err.printf("context [%s]: ", contextPath);
            String value = console.readLine().trim();
            if (value.length() == 0) {
                value = contextPath;
            }
            contextPath = value;
        }
        String url = (tls ? "https" : "http") + "://" + host + ":" + port + contextPath + "/cometd";
        String channel = this.channel;
        if (this.interactive) {
            channel = System.getProperty("cometd.channel", channel);
            System.err.printf("channel [%s]: ", channel);
            String value = console.readLine().trim();
            if (value.length() == 0) {
                value = channel;
            }
            channel = value;
        }
        channel = "/bench/" + (channel.startsWith("/") ? channel.substring(1) : channel);
        int rooms = this.rooms;
        if (this.interactive) {
            rooms = Integer.parseInt(System.getProperty("cometd.rooms", String.valueOf(rooms)));
            System.err.printf("rooms [%d]: ", rooms);
            String value = console.readLine().trim();
            if (value.length() == 0) {
                value = String.valueOf(rooms);
            }
            rooms = Integer.parseInt(value);
        }
        int roomsPerClient = this.roomsPerClient;
        if (this.interactive) {
            System.err.printf("rooms per client [%d]: ", roomsPerClient);
            String value = console.readLine().trim();
            if (value.length() == 0) {
                value = String.valueOf(roomsPerClient);
            }
            roomsPerClient = Integer.parseInt(value);
        }
        boolean ackExtension = this.ackExtension;
        if (this.interactive) {
            System.err.printf("enable ack extension [%b]: ", ackExtension);
            String value = console.readLine().trim();
            if (value.length() == 0) {
                value = String.valueOf(ackExtension);
            }
            ackExtension = Boolean.parseBoolean(value);
        }
        MBeanContainer mbeanContainer = new MBeanContainer(ManagementFactory.getPlatformMBeanServer());
        mbeanContainer.beanAdded(null, this);
        this.threadPool.setMaxThreads(maxThreads);
        this.threadPool.setDaemon(true);
        this.threadPool.start();
        mbeanContainer.beanAdded(null, this.threadPool);
        AbstractHttpClientTransport httpClientTransport = new HttpClientTransportOverHTTP(selectors);
        if (http2) {
            HTTP2Client http2Client = new HTTP2Client();
            http2Client.setSelectors(selectors);
            httpClientTransport = new HttpClientTransportOverHTTP2(http2Client);
        }
        this.httpClient = new HttpClient(httpClientTransport, new SslContextFactory.Client(true));
        this.httpClient.setMaxConnectionsPerDestination(60000);
        this.httpClient.setMaxRequestsQueuedPerDestination(10000);
        this.httpClient.setExecutor(this.threadPool);
        this.httpClient.setIdleTimeout(30000L);
        this.httpClient.setSocketAddressResolver(new SocketAddressResolver.Sync());
        this.httpClient.addBean(mbeanContainer);
        this.httpClient.addBean(this.connectionStatistics);
        LifeCycle.start(this.httpClient);
        mbeanContainer.beanAdded(null, this.httpClient);
        this.webSocketClient = new WebSocketClient(this.httpClient);
        this.webSocketClient.getPolicy().setInputBufferSize(8192);
        this.webSocketClient.getPolicy().setMaxTextMessageSize(-1);
        this.webSocketClient.addBean(mbeanContainer);
        this.webSocketClient.addBean(this.connectionStatistics);
        LifeCycle.start(this.webSocketClient);
        mbeanContainer.beanAdded(null, this.webSocketClient);
        this.webSocketContainer = new ClientContainer(this.httpClient);
        this.webSocketContainer.addBean(mbeanContainer);
        this.webSocketContainer.addBean(this.connectionStatistics);
        LifeCycle.start(this.webSocketContainer);
        mbeanContainer.beanAdded(null, this.webSocketContainer);
        this.latencyListener = new LatencyListener();
        this.handshakeListener = new HandshakeListener(channel, rooms, roomsPerClient);
        this.disconnectListener = new DisconnectListener();
        LoadBayeuxClient statsClient = new LoadBayeuxClient(url, this.scheduler, this.newClientTransport(transport));
        statsClient.handshake();
        int clients = this.clients;
        int batches = this.batches;
        int batchSize = this.batchSize;
        long batchPause = this.batchPause;
        int messageSize = this.messageSize;
        boolean randomize = this.randomize;
        while (true) {
            String value;
            int currentSize;
            int i;
            System.err.println();
            System.err.println("-----");
            if (this.interactive) {
                System.err.printf("clients [%d]: ", clients);
                String value2 = console.readLine();
                if (value2 == null) break;
                if ((value2 = value2.trim()).length() == 0) {
                    value2 = String.valueOf(clients);
                }
                clients = Integer.parseInt(value2);
            } else if (this.iterations-- == 0) {
                clients = 0;
            }
            System.err.println("Waiting for clients to be ready...");
            int currentClients = this.bayeuxClients.size();
            if (currentClients < clients) {
                for (i = 0; i < clients - currentClients; ++i) {
                    this.bayeuxClients.add(this.handshakeClient(url, transport, ackExtension));
                }
            } else if (currentClients > clients) {
                for (i = 0; i < currentClients - clients; ++i) {
                    this.disconnectClient(this.bayeuxClients.remove(currentClients - i - 1));
                }
            }
            if ((currentSize = this.bayeuxClients.size()) == 0) {
                System.err.println("All clients disconnected, exiting");
                break;
            }
            System.err.printf("Clients ready: %d%n", currentSize);
            this.reset();
            if (this.interactive) {
                System.err.printf("batch count [%d]: ", batches);
                value = console.readLine().trim();
                if (value.length() == 0) {
                    value = String.valueOf(batches);
                }
                batches = Integer.parseInt(value);
            }
            if (this.interactive) {
                System.err.printf("batch size [%d]: ", batchSize);
                value = console.readLine().trim();
                if (value.length() == 0) {
                    value = String.valueOf(batchSize);
                }
                batchSize = Integer.parseInt(value);
            }
            if (this.interactive) {
                System.err.printf("batch pause (\u00b5s) [%d]: ", batchPause);
                value = console.readLine().trim();
                if (value.length() == 0) {
                    value = String.valueOf(batchPause);
                }
                batchPause = Long.parseLong(value);
            }
            if (this.interactive) {
                System.err.printf("message size [%d]: ", messageSize);
                value = console.readLine().trim();
                if (value.length() == 0) {
                    value = String.valueOf(messageSize);
                }
                messageSize = Integer.parseInt(value);
            }
            char[] chars = new char[messageSize];
            Arrays.fill(chars, 'x');
            String chat = new String(chars);
            if (this.interactive) {
                System.err.printf("randomize sends [%b]: ", randomize);
                String value3 = console.readLine().trim();
                if (value3.length() == 0) {
                    value3 = String.valueOf(randomize);
                }
                randomize = Boolean.parseBoolean(value3);
            }
            statsClient.begin();
            PlatformMonitor.Start start = this.monitor.start();
            System.err.println();
            System.err.println(start);
            System.err.printf("Testing %d clients in %d rooms, %d rooms/client%n", this.bayeuxClients.size(), rooms, roomsPerClient);
            System.err.printf("Sending %d batches of %dx%d bytes messages every %d \u00b5s%n", batches, batchSize, messageSize, batchPause);
            long begin = System.nanoTime();
            long expected = this.runBatches(channel, batches, batchSize, TimeUnit.MICROSECONDS.toNanos(batchPause), chat, randomize);
            long end = System.nanoTime();
            long sendElapsed = end - begin;
            PlatformMonitor.Stop stop = this.monitor.stop();
            System.err.println(stop);
            this.waitForMessages(expected);
            long messages = this.messages.get();
            long receiveElapsed = this.end.get() - this.start.get();
            statsClient.end();
            Histogram histogram = this.printResults(messages, expected, sendElapsed, receiveElapsed);
            if (this.interactive) continue;
            LinkedHashMap<String, Object> run = new LinkedHashMap<String, Object>();
            LinkedHashMap<String, Object> config = new LinkedHashMap<String, Object>();
            run.put("config", config);
            config.put("cores", start.cores);
            config.put("totalMemory", new Measure(Float.valueOf(start.gibiBytes(start.totalMemory)), "GiB"));
            config.put("os", start.os);
            config.put("jvm", start.jvm);
            config.put("totalHeap", new Measure(Float.valueOf(start.gibiBytes(start.heap.getMax())), "GiB"));
            config.put("date", new Date(start.date).toString());
            config.put("transport", transport.getName());
            config.put("clients", this.bayeuxClients.size());
            config.put("rooms", rooms);
            config.put("roomsPerClient", roomsPerClient);
            config.put("batches", batches);
            config.put("batchSize", batchSize);
            config.put("batchPause", new Measure(batchPause, "\u00b5s"));
            config.put("messageSize", new Measure((Object)messageSize, "B"));
            LinkedHashMap<String, Serializable> results = new LinkedHashMap<String, Serializable>();
            run.put("results", results);
            results.put("cpu", new Measure(Float.valueOf(stop.percent(stop.cpuTime, stop.time) / (float)start.cores), "%"));
            results.put("jitTime", new Measure(stop.jitTime, "ms"));
            results.put("messages", Long.valueOf(messages));
            results.put("sendTime", new Measure(TimeUnit.NANOSECONDS.toMillis(sendElapsed), "ms"));
            results.put("sendRate", new Measure(messages * 1000L * 1000L * 1000L / sendElapsed, "messages/s"));
            results.put("receiveTime", new Measure(TimeUnit.NANOSECONDS.toMillis(receiveElapsed), "ms"));
            results.put("receiveRate", new Measure(messages * 1000L * 1000L * 1000L / receiveElapsed, "messages/s"));
            LinkedHashMap<String, Measure> latency = new LinkedHashMap<String, Measure>();
            results.put("latency", latency);
            latency.put("min", new Measure(this.convert(histogram.getMinValue()), "\u00b5s"));
            latency.put("p50", new Measure(this.convert(histogram.getValueAtPercentile(50.0)), "\u00b5s"));
            latency.put("p99", new Measure(this.convert(histogram.getValueAtPercentile(99.0)), "\u00b5s"));
            latency.put("max", new Measure(this.convert(histogram.getMaxValue()), "\u00b5s"));
            LinkedHashMap<String, Serializable> threadPool = new LinkedHashMap<String, Serializable>();
            results.put("threadPool", threadPool);
            threadPool.put("tasks", Long.valueOf(this.threadPool.getTasks()));
            threadPool.put("queueSizeMax", Integer.valueOf(this.threadPool.getMaxQueueSize()));
            threadPool.put("activeThreadsMax", Integer.valueOf(this.threadPool.getMaxActiveThreads()));
            threadPool.put("queueLatencyAverage", new Measure(TimeUnit.NANOSECONDS.toMillis(this.threadPool.getAverageQueueLatency()), "ms"));
            threadPool.put("queueLatencyMax", new Measure(TimeUnit.NANOSECONDS.toMillis(this.threadPool.getMaxQueueLatency()), "ms"));
            threadPool.put("taskTimeAverage", new Measure(TimeUnit.NANOSECONDS.toMillis(this.threadPool.getAverageTaskLatency()), "ms"));
            threadPool.put("taskTimeMax", new Measure(TimeUnit.NANOSECONDS.toMillis(this.threadPool.getMaxTaskLatency()), "ms"));
            LinkedHashMap<String, Serializable> gc = new LinkedHashMap<String, Serializable>();
            results.put("gc", gc);
            gc.put("youngCount", Long.valueOf(stop.youngCount));
            gc.put("youngTime", new Measure(stop.youngTime, "ms"));
            gc.put("oldCount", Long.valueOf(stop.oldCount));
            gc.put("oldTime", new Measure(stop.oldTime, "ms"));
            gc.put("youngGarbage", new Measure(Float.valueOf(stop.mebiBytes(stop.edenBytes + stop.survivorBytes)), "MiB"));
            gc.put("oldGarbage", new Measure(Float.valueOf(stop.mebiBytes(stop.tenuredBytes)), "MiB"));
            this.saveResults(run, this.file);
        }
        statsClient.exit();
        LifeCycle.stop(this.webSocketContainer);
        LifeCycle.stop(this.webSocketClient);
        LifeCycle.stop(this.httpClient);
        this.scheduler.shutdown();
    }

    private long runBatches(String channel, int batchCount, int batchSize, long batchPauseNanos, String chat, boolean randomize) {
        long begin = System.nanoTime();
        int index = -1;
        long expected = 0L;
        for (int i = 1; i <= batchCount; ++i) {
            long pause = begin + (long)i * batchPauseNanos - System.nanoTime();
            if (pause > 0L) {
                this.nanoSleep(pause);
            }
            if (randomize) {
                index = this.nextRandom(this.bayeuxClients.size());
            } else if (++index == this.bayeuxClients.size()) {
                index = 0;
            }
            LoadBayeuxClient client = this.bayeuxClients.get(index);
            expected += this.sendBatch(client, channel, batchSize, chat);
        }
        return expected;
    }

    protected LoadBayeuxClient handshakeClient(String url, ClientTransportType transport, boolean ackExtension) {
        LoadBayeuxClient client = new LoadBayeuxClient(url, this.scheduler, this.newClientTransport(transport));
        if (ackExtension) {
            client.addExtension(new AckExtension());
        }
        client.getChannel("/meta/handshake").addListener(this.handshakeListener);
        client.getChannel("/meta/disconnect").addListener(this.disconnectListener);
        client.handshake();
        client.waitForInit();
        return client;
    }

    protected void disconnectClient(LoadBayeuxClient client) {
        client.disconnect();
        client.waitFor(1000L, BayeuxClient.State.DISCONNECTED, new BayeuxClient.State[0]);
    }

    private void nanoSleep(long pause) {
        try {
            TimeUnit.NANOSECONDS.sleep(pause);
        }
        catch (InterruptedException x) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(x);
        }
    }

    private long sendBatch(LoadBayeuxClient client, String channel, int batchSize, String chat) {
        long expected = 0L;
        ArrayList rooms = new ArrayList(this.roomMap.keySet());
        client.startBatch();
        for (int b = 0; b < batchSize; ++b) {
            int room = -1;
            AtomicInteger clientsPerRoom = null;
            while (clientsPerRoom == null || clientsPerRoom.get() == 0) {
                room = (Integer)rooms.get(this.nextRandom(rooms.size()));
                clientsPerRoom = (AtomicInteger)this.roomMap.get(room);
            }
            HashMap<String, Object> message = new HashMap<String, Object>(5);
            message.put("room", room);
            message.put("user", client.hashCode());
            message.put("chat", chat);
            message.put(START_FIELD, System.nanoTime());
            message.put("msg_id", this.ids.incrementAndGet() + channel);
            ClientSessionChannel clientChannel = client.getChannel(this.getChannelId(channel + "/" + room));
            clientChannel.publish(message);
            clientChannel.release();
            expected += (long)clientsPerRoom.get();
        }
        client.endBatch();
        return expected;
    }

    private ClientTransport newClientTransport(ClientTransportType clientTransportType) {
        switch (clientTransportType) {
            case LONG_POLLING: {
                HashMap<String, Object> options = new HashMap<String, Object>();
                options.put("jsonContext", new JacksonJSONContextClient());
                options.put("maxNetworkDelay", 5000L);
                options.put("maxMessageSize", Integer.MAX_VALUE);
                return new JettyHttpClientTransport(options, this.httpClient){

                    @Override
                    protected void customize(Request request) {
                        super.customize(request);
                        if (request.getPath().endsWith("/disconnect")) {
                            request.header(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString());
                        }
                    }
                };
            }
            case JSR_WEBSOCKET: {
                HashMap<String, Object> options = new HashMap<String, Object>();
                options.put("jsonContext", new JacksonJSONContextClient());
                options.put("maxNetworkDelay", 5000L);
                options.put("maxMessageSize", Integer.MAX_VALUE);
                options.put("idleTimeout", 20000L + this.httpClient.getIdleTimeout());
                return new WebSocketTransport(options, this.scheduler, this.webSocketContainer);
            }
            case JETTY_WEBSOCKET: {
                HashMap<String, Object> options = new HashMap<String, Object>();
                options.put("jsonContext", new JacksonJSONContextClient());
                options.put("maxNetworkDelay", 5000L);
                options.put("maxMessageSize", Integer.MAX_VALUE);
                options.put("idleTimeout", 20000L + this.httpClient.getIdleTimeout());
                return new JettyWebSocketTransport(options, this.scheduler, this.webSocketClient);
            }
        }
        throw new IllegalArgumentException();
    }

    private int nextRandom(int limit) {
        return ThreadLocalRandom.current().nextInt(limit);
    }

    private void updateLatencies(long startTime, long sendTime, long arrivalTime, long endTime) {
        long wallLatency = endTime - startTime;
        this.histogram.get().recordValue(wallLatency);
        long latency = TimeUnit.MICROSECONDS.toNanos(TimeUnit.NANOSECONDS.toMicros(arrivalTime - sendTime));
        Atomics.updateMin(this.minLatency, latency);
        Atomics.updateMax(this.maxLatency, latency);
        this.totLatency.addAndGet(latency);
    }

    private void waitForMessages(long expected) throws InterruptedException {
        int maxRetries;
        long arrived = this.messages.get();
        long lastArrived = 0L;
        int retries = maxRetries = 20;
        while (arrived < expected) {
            System.err.printf("Waiting for messages to arrive %d/%d%n", arrived, expected);
            Thread.sleep(500L);
            if (lastArrived == arrived) {
                if (--retries == 0) {
                    break;
                }
            } else {
                lastArrived = arrived;
                retries = maxRetries;
            }
            arrived = this.messages.get();
        }
        if (arrived < expected) {
            System.err.printf("Interrupting wait for messages %d/%d%n", arrived, expected);
        } else {
            System.err.printf("All messages arrived %d/%d%n", arrived, expected);
        }
    }

    private Histogram printResults(long messageCount, long expectedCount, long sendElapsed, long receiveElapsed) {
        System.err.printf("Messages - Success/Expected = %d/%d%n", messageCount, expectedCount);
        DynamicConnectionStatistics.Data data = this.connectionStatistics.collect();
        if (sendElapsed > 0L) {
            long batchRate = (long)this.batches * 1000L * 1000L * 1000L / sendElapsed;
            float uploadRate = (float)data.sentBytes * 1000.0f * 1000.0f * 1000.0f / (float)sendElapsed / 1024.0f / 1024.0f;
            System.err.printf("Outgoing: Elapsed = %d ms | Rate = %d messages/s - %d batches/s - %.3f MiB/s%n", TimeUnit.NANOSECONDS.toMillis(sendElapsed), (long)this.batchSize * batchRate, batchRate, Float.valueOf(uploadRate));
        }
        if (receiveElapsed > 0L) {
            float downloadRate = (float)data.receivedBytes * 1000.0f * 1000.0f * 1000.0f / (float)receiveElapsed / 1024.0f / 1024.0f;
            System.err.printf("Incoming - Elapsed = %d ms | Rate = %d messages/s - %d batches/s(%.2f%%) - %.3f MiB/s%n", TimeUnit.NANOSECONDS.toMillis(receiveElapsed), messageCount * 1000L * 1000L * 1000L / receiveElapsed, this.responses.get() * 1000L * 1000L * 1000L / receiveElapsed, Float.valueOf(100.0f * (float)this.responses.get() / (float)messageCount), Float.valueOf(downloadRate));
        }
        Histogram histogram = this.allHistograms.stream().reduce(new Histogram(TimeUnit.MICROSECONDS.toNanos(1L), TimeUnit.MINUTES.toNanos(1L), 3), (h1, h2) -> {
            h1.add((AbstractHistogram)h2);
            return h1;
        });
        System.err.println(new HistogramSnapshot(histogram, 20L, "Messages - Latency", "\u00b5s", this));
        System.err.printf("Messages - Network Latency Min/Ave/Max = %d/%d/%d ms%n", TimeUnit.NANOSECONDS.toMillis(this.minLatency.get()), messageCount == 0L ? -1L : TimeUnit.NANOSECONDS.toMillis(this.totLatency.get() / messageCount), TimeUnit.NANOSECONDS.toMillis(this.maxLatency.get()));
        System.err.printf("Slowest Message ID = %s time = %d ms%n", this.maxTime.getReference(), this.maxTime.getStamp());
        Config.printThreadPool("Thread Pool", this.threadPool);
        return histogram;
    }

    private void saveResults(Map<String, Object> run, String path) {
        try {
            File file = new File(path);
            ObjectMapper mapper = new ObjectMapper();
            mapper.enable(SerializationFeature.INDENT_OUTPUT);
            mapper.writeValue(file, run);
            System.err.printf("Results saved to file %s%n", file.getAbsolutePath());
        }
        catch (IOException x) {
            System.err.printf("Could not save results to file %s%n", path);
        }
    }

    @Override
    public long convert(long measure) {
        return TimeUnit.NANOSECONDS.toMicros(measure);
    }

    private void reset() {
        this.allHistograms.forEach(AbstractHistogram::reset);
        this.threadPool.reset();
        this.start.set(0L);
        this.end.set(0L);
        this.responses.set(0L);
        this.messages.set(0L);
        this.minLatency.set(Long.MAX_VALUE);
        this.maxLatency.set(0L);
        this.totLatency.set(0L);
        this.maxTime.set(null, 0);
        this.sendTimes.clear();
        this.arrivalTimes.clear();
        this.connectionStatistics.reset();
    }

    private ChannelId getChannelId(String channelName) {
        ChannelId existing;
        ChannelId result = (ChannelId)this.channelIds.get(channelName);
        if (result == null && (existing = this.channelIds.putIfAbsent(channelName, result = new ChannelId(channelName))) != null) {
            result = existing;
        }
        return result;
    }

    private class LatencyListener
    implements ClientSessionChannel.MessageListener {
        private LatencyListener() {
        }

        @Override
        public void onMessage(ClientSessionChannel channel, Message message) {
            long arrivalTime;
            long sendTime;
            String id;
            long endTime;
            long startTime;
            Map<String, Object> data = message.getDataAsMap();
            if (data != null) {
                startTime = ((Number)data.get(CometDLoadClient.START_FIELD)).longValue();
                endTime = System.nanoTime();
                CometDLoadClient.this.start.compareAndSet(0L, endTime);
                CometDLoadClient.this.end.set(endTime);
                CometDLoadClient.this.messages.incrementAndGet();
                id = (String)data.get("msg_id");
                AtomicStampedReference sendTimeRef = (AtomicStampedReference)CometDLoadClient.this.sendTimes.get(id);
                sendTime = (Long)sendTimeRef.getReference();
                if (Atomics.decrement(sendTimeRef) == 0) {
                    CometDLoadClient.this.sendTimes.remove(id);
                }
                AtomicStampedReference arrivalTimeRef = (AtomicStampedReference)CometDLoadClient.this.arrivalTimes.get(id);
                arrivalTime = (Long)((List)arrivalTimeRef.getReference()).remove(0);
                if (Atomics.decrement(arrivalTimeRef) == 0) {
                    CometDLoadClient.this.arrivalTimes.remove(id);
                }
            } else {
                throw new IllegalStateException("No 'data' field in message " + message);
            }
            long delayMs = TimeUnit.NANOSECONDS.toMillis(endTime - startTime);
            Atomics.updateMax(CometDLoadClient.this.maxTime, id, (int)delayMs);
            CometDLoadClient.this.updateLatencies(startTime, sendTime, arrivalTime, endTime);
        }
    }

    private static class DynamicConnectionStatistics
    implements Connection.Listener {
        private final Set<Connection> _connections = Collections.newSetFromMap(new ConcurrentHashMap());
        private final LongAdder _rcvdBytes = new LongAdder();
        private final LongAdder _sentBytes = new LongAdder();
        private Data _lastData = new Data(0L, 0L);

        private DynamicConnectionStatistics() {
        }

        @Override
        public void onOpened(Connection connection) {
            this._connections.add(connection);
        }

        @Override
        public void onClosed(Connection connection) {
            this._connections.remove(connection);
            this.collect(connection);
        }

        public void reset() {
            this._lastData = new Data(this._rcvdBytes.sumThenReset(), this._sentBytes.sumThenReset());
        }

        public Data collect() {
            this._connections.forEach(this::collect);
            return new Data(this._rcvdBytes.longValue() - this._lastData.receivedBytes, this._sentBytes.longValue() - this._lastData.sentBytes);
        }

        private void collect(Connection connection) {
            long bytesOut;
            long bytesIn = connection.getBytesIn();
            if (bytesIn > 0L) {
                this._rcvdBytes.add(bytesIn);
            }
            if ((bytesOut = connection.getBytesOut()) > 0L) {
                this._sentBytes.add(bytesOut);
            }
        }

        public static class Data {
            public final long receivedBytes;
            public final long sentBytes;

            private Data(long receivedBytes, long sentBytes) {
                this.receivedBytes = receivedBytes;
                this.sentBytes = sentBytes;
            }
        }
    }

    private static enum ClientTransportType {
        LONG_POLLING("long-polling"),
        JSR_WEBSOCKET("jsr-websocket"),
        JETTY_WEBSOCKET("jetty-websocket");

        private final String name;

        private ClientTransportType(String name) {
            this.name = name;
        }

        public String getName() {
            return this.name;
        }
    }

    private class HandshakeListener
    implements ClientSessionChannel.MessageListener {
        private static final String SESSION_ID_ATTRIBUTE = "session_id";
        private final String channel;
        private final int rooms;
        private final int roomsPerClient;

        private HandshakeListener(String channel, int rooms, int roomsPerClient) {
            this.channel = channel;
            this.rooms = rooms;
            this.roomsPerClient = roomsPerClient;
        }

        @Override
        public void onMessage(ClientSessionChannel handshakeChannel, Message handshakeReply) {
            if (handshakeReply.isSuccessful()) {
                LoadBayeuxClient client = (LoadBayeuxClient)handshakeChannel.getSession();
                String sessionId = (String)client.getAttribute(SESSION_ID_ATTRIBUTE);
                if (sessionId == null) {
                    client.setAttribute(SESSION_ID_ATTRIBUTE, client.getId());
                    client.batch(() -> {
                        ArrayList<Integer> roomsSubscribedTo = new ArrayList<Integer>();
                        for (int j = 0; j < this.roomsPerClient; ++j) {
                            int room = CometDLoadClient.this.nextRandom(this.rooms);
                            while (roomsSubscribedTo.contains(room)) {
                                room = CometDLoadClient.this.nextRandom(this.rooms);
                            }
                            roomsSubscribedTo.add(room);
                            client.setupRoom(room);
                            client.getChannel(this.channel + "/" + room).subscribe(CometDLoadClient.this.latencyListener);
                        }
                        client.init();
                    });
                } else {
                    System.err.printf("Second handshake for client %s: old session %s, new session %s%n", this, sessionId, client.getId());
                }
            }
        }
    }

    private class DisconnectListener
    implements ClientSessionChannel.MessageListener {
        private DisconnectListener() {
        }

        @Override
        public void onMessage(ClientSessionChannel channel, Message message) {
            if (message.isSuccessful()) {
                LoadBayeuxClient client = (LoadBayeuxClient)channel.getSession();
                client.destroy();
            }
        }
    }

    private class LoadBayeuxClient
    extends BayeuxClient {
        private final List<Integer> subscriptions;
        private final CountDownLatch initLatch;

        private LoadBayeuxClient(String url, ScheduledExecutorService scheduler, ClientTransport transport) {
            super(url, scheduler, transport, new ClientTransport[0]);
            this.subscriptions = new ArrayList<Integer>();
            this.initLatch = new CountDownLatch(1);
            this.addTransportListener(new TransportListener(){

                @Override
                public void onSending(List<? extends Message> messages) {
                    LoadBayeuxClient.this.recordSentMessages(messages);
                }

                @Override
                public void onMessages(List<Message.Mutable> messages) {
                    LoadBayeuxClient.this.recordReceivedMessages(messages);
                }
            });
        }

        public void setupRoom(int room) {
            AtomicInteger clientsPerRoom = (AtomicInteger)CometDLoadClient.this.roomMap.get(room);
            if (clientsPerRoom == null) {
                clientsPerRoom = new AtomicInteger();
                AtomicInteger existing = CometDLoadClient.this.roomMap.putIfAbsent(room, clientsPerRoom);
                if (existing != null) {
                    clientsPerRoom = existing;
                }
            }
            clientsPerRoom.incrementAndGet();
            this.subscriptions.add(room);
        }

        public void init() {
            this.getChannel("/service/init").publish(new HashMap(), message -> this.initLatch.countDown());
        }

        public void waitForInit() {
            try {
                this.initLatch.await();
            }
            catch (InterruptedException x) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(x);
            }
        }

        public void destroy() {
            for (Integer room : this.subscriptions) {
                AtomicInteger clientsPerRoom = (AtomicInteger)CometDLoadClient.this.roomMap.get(room);
                clientsPerRoom.decrementAndGet();
            }
            this.subscriptions.clear();
        }

        public void begin() throws InterruptedException {
            this.notifyServer("/service/statistics/start");
        }

        public void end() throws InterruptedException {
            this.notifyServer("/service/statistics/stop");
        }

        public void exit() throws InterruptedException {
            this.notifyServer("/service/statistics/exit");
        }

        private void notifyServer(String channelName) throws InterruptedException {
            CountDownLatch latch = new CountDownLatch(1);
            ClientSessionChannel channel = this.getChannel(channelName);
            channel.publish(new HashMap(1), message -> latch.countDown());
            latch.await();
        }

        private void recordSentMessages(List<? extends Message> messages) {
            long now = System.nanoTime();
            for (Message message : messages) {
                Map<String, Object> data = message.getDataAsMap();
                if (data == null || !message.getChannelId().isBroadcast()) continue;
                int room = (Integer)data.get("room");
                int clientsInRoom = ((AtomicInteger)CometDLoadClient.this.roomMap.get(room)).get();
                String id = (String)data.get("msg_id");
                CometDLoadClient.this.sendTimes.put(id, new AtomicStampedReference<Long>(now, clientsInRoom));
                CometDLoadClient.this.arrivalTimes.put(id, new AtomicStampedReference(Collections.synchronizedList(new LinkedList()), clientsInRoom));
            }
        }

        private void recordReceivedMessages(List<Message.Mutable> messages) {
            long now = System.nanoTime();
            boolean response = false;
            for (Message message : messages) {
                Map<String, Object> data = message.getDataAsMap();
                if (data == null) continue;
                response = true;
                String id = (String)data.get("msg_id");
                ((List)((AtomicStampedReference)CometDLoadClient.this.arrivalTimes.get(id)).getReference()).add(now);
            }
            if (response) {
                CometDLoadClient.this.responses.incrementAndGet();
            }
        }
    }

    private static class Measure
    extends HashMap<String, Object> {
        public Measure(Object value, String unit) {
            super(2);
            this.put("value", value);
            this.put("unit", unit);
        }
    }
}

