/*
 * Decompiled with CFR 0.152.
 */
package io.reactivesocket.client;

import io.reactivesocket.Availability;
import io.reactivesocket.Payload;
import io.reactivesocket.ReactiveSocket;
import io.reactivesocket.client.LoadBalancerSocketMetrics;
import io.reactivesocket.client.ReactiveSocketClient;
import io.reactivesocket.exceptions.NoAvailableReactiveSocketException;
import io.reactivesocket.exceptions.TimeoutException;
import io.reactivesocket.exceptions.TransportException;
import io.reactivesocket.stat.Ewma;
import io.reactivesocket.stat.FrugalQuantile;
import io.reactivesocket.stat.Median;
import io.reactivesocket.stat.Quantile;
import io.reactivesocket.util.Clock;
import io.reactivesocket.util.ReactiveSocketProxy;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSource;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.MonoSource;
import reactor.core.publisher.Operators;

public class LoadBalancer
implements ReactiveSocket {
    private static final Logger logger = LoggerFactory.getLogger(LoadBalancer.class);
    public static final double DEFAULT_EXP_FACTOR = 4.0;
    public static final double DEFAULT_LOWER_QUANTILE = 0.2;
    public static final double DEFAULT_HIGHER_QUANTILE = 0.8;
    public static final double DEFAULT_MIN_PENDING = 1.0;
    public static final double DEFAULT_MAX_PENDING = 2.0;
    public static final int DEFAULT_MIN_APERTURE = 3;
    public static final int DEFAULT_MAX_APERTURE = 100;
    public static final long DEFAULT_MAX_REFRESH_PERIOD_MS = TimeUnit.MILLISECONDS.convert(5L, TimeUnit.MINUTES);
    private static final long APERTURE_REFRESH_PERIOD = Clock.unit().convert(15L, TimeUnit.SECONDS);
    private static final int EFFORT = 5;
    private static final long DEFAULT_INITIAL_INTER_ARRIVAL_TIME = Clock.unit().convert(1L, TimeUnit.SECONDS);
    private static final int DEFAULT_INTER_ARRIVAL_FACTOR = 500;
    private final double minPendings;
    private final double maxPendings;
    private final int minAperture;
    private final int maxAperture;
    private final long maxRefreshPeriod;
    private final double expFactor;
    private final Quantile lowerQuantile;
    private final Quantile higherQuantile;
    private Runnable readyCallback;
    private int pendingSockets;
    private final ArrayList<WeightedSocket> activeSockets;
    private final ArrayList<ReactiveSocketClient> activeFactories;
    private final FactoriesRefresher factoryRefresher;
    private final Mono<ReactiveSocket> selectSocket;
    private final Ewma pendings;
    private volatile int targetAperture;
    private long lastApertureRefresh;
    private long refreshPeriod;
    private volatile long lastRefresh;
    private final MonoProcessor<Void> closeSubject = MonoProcessor.create();
    private static final FailingReactiveSocket FAILING_REACTIVE_SOCKET = new FailingReactiveSocket();

    public LoadBalancer(Publisher<? extends Collection<ReactiveSocketClient>> factories, double expFactor, double lowQuantile, double highQuantile, double minPendings, double maxPendings, int minAperture, int maxAperture, long maxRefreshPeriodMs) {
        this.expFactor = expFactor;
        this.lowerQuantile = new FrugalQuantile(lowQuantile);
        this.higherQuantile = new FrugalQuantile(highQuantile);
        this.activeSockets = new ArrayList();
        this.activeFactories = new ArrayList();
        this.pendingSockets = 0;
        this.factoryRefresher = new FactoriesRefresher();
        this.selectSocket = Mono.fromCallable(this::select);
        this.minPendings = minPendings;
        this.maxPendings = maxPendings;
        this.pendings = new Ewma(15L, TimeUnit.SECONDS, (minPendings + maxPendings) / 2.0);
        this.minAperture = minAperture;
        this.maxAperture = maxAperture;
        this.targetAperture = minAperture;
        this.maxRefreshPeriod = Clock.unit().convert(maxRefreshPeriodMs, TimeUnit.MILLISECONDS);
        this.lastApertureRefresh = Clock.now();
        this.refreshPeriod = Clock.unit().convert(15L, TimeUnit.SECONDS);
        this.lastRefresh = Clock.now();
        factories.subscribe((Subscriber)this.factoryRefresher);
    }

    public LoadBalancer(Publisher<? extends Collection<ReactiveSocketClient>> factories) {
        this(factories, 4.0, 0.2, 0.8, 1.0, 2.0, 3, 100, DEFAULT_MAX_REFRESH_PERIOD_MS);
    }

    LoadBalancer(Publisher<? extends Collection<ReactiveSocketClient>> factories, Runnable readyCallback) {
        this(factories, 4.0, 0.2, 0.8, 1.0, 2.0, 3, 100, DEFAULT_MAX_REFRESH_PERIOD_MS);
        this.readyCallback = readyCallback;
    }

    public Mono<Void> fireAndForget(Payload payload) {
        return this.selectSocket.then(socket -> socket.fireAndForget(payload));
    }

    public Mono<Payload> requestResponse(Payload payload) {
        return this.selectSocket.then(socket -> socket.requestResponse(payload));
    }

    public Flux<Payload> requestStream(Payload payload) {
        return this.selectSocket.flatMap(socket -> socket.requestStream(payload));
    }

    public Mono<Void> metadataPush(Payload payload) {
        return this.selectSocket.then(socket -> socket.metadataPush(payload));
    }

    public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
        return this.selectSocket.flatMap(socket -> socket.requestChannel(payloads));
    }

    private synchronized void addSockets(int numberOfNewSocket) {
        int n = numberOfNewSocket;
        if (n > this.activeFactories.size()) {
            n = this.activeFactories.size();
            logger.debug("addSockets({}) restricted by the number of factories, i.e. addSockets({})", (Object)numberOfNewSocket, (Object)n);
        }
        ThreadLocalRandom rng = ThreadLocalRandom.current();
        while (n > 0) {
            int size = this.activeFactories.size();
            if (size == 1) {
                ReactiveSocketClient factory = this.activeFactories.get(0);
                if (!(factory.availability() > 0.0)) break;
                this.activeFactories.remove(0);
                ++this.pendingSockets;
                factory.connect().subscribe((Subscriber)new SocketAdder(factory));
                break;
            }
            ReactiveSocketClient factory0 = null;
            ReactiveSocketClient factory1 = null;
            int i0 = 0;
            int i1 = 0;
            for (int i = 0; i < 5; ++i) {
                i0 = ((Random)rng).nextInt(size);
                i1 = ((Random)rng).nextInt(size - 1);
                if (i1 >= i0) {
                    ++i1;
                }
                factory0 = this.activeFactories.get(i0);
                factory1 = this.activeFactories.get(i1);
                if (factory0.availability() > 0.0 && factory1.availability() > 0.0) break;
            }
            if (factory0.availability() < factory1.availability()) {
                --n;
                ++this.pendingSockets;
                if (i1 < size - 1) {
                    this.activeFactories.set(i1, this.activeFactories.get(size - 1));
                }
                this.activeFactories.remove(size - 1);
                factory1.connect().subscribe((Subscriber)new SocketAdder(factory1));
                continue;
            }
            --n;
            ++this.pendingSockets;
            if (i0 < size - 1) {
                this.activeFactories.set(i0, this.activeFactories.get(size - 1));
            }
            this.activeFactories.remove(size - 1);
            factory0.connect().subscribe((Subscriber)new SocketAdder(factory0));
        }
    }

    private synchronized void refreshAperture() {
        boolean underRateLimit;
        int n = this.activeSockets.size();
        if (n == 0) {
            return;
        }
        double p = 0.0;
        for (WeightedSocket wrs : this.activeSockets) {
            p += (double)wrs.getPending();
        }
        this.pendings.insert(p /= (double)(n + this.pendingSockets));
        double avgPending = this.pendings.value();
        long now = Clock.now();
        boolean bl = underRateLimit = now - this.lastApertureRefresh > APERTURE_REFRESH_PERIOD;
        if (avgPending < 1.0 && underRateLimit) {
            this.updateAperture(this.targetAperture - 1, now);
        } else if (2.0 < avgPending && underRateLimit) {
            this.updateAperture(this.targetAperture + 1, now);
        }
    }

    private void updateAperture(int newValue, long now) {
        int previous = this.targetAperture;
        this.targetAperture = newValue;
        this.targetAperture = Math.max(this.minAperture, this.targetAperture);
        int maxAperture = Math.min(this.maxAperture, this.activeSockets.size() + this.activeFactories.size());
        this.targetAperture = Math.min(maxAperture, this.targetAperture);
        this.lastApertureRefresh = now;
        this.pendings.reset((this.minPendings + this.maxPendings) / 2.0);
        if (this.targetAperture != previous) {
            logger.debug("Current pending={}, new target={}, previous target={}", new Object[]{this.pendings.value(), this.targetAperture, previous});
        }
    }

    private synchronized void refreshSockets() {
        this.refreshAperture();
        int n = this.pendingSockets + this.activeSockets.size();
        if (n < this.targetAperture && !this.activeFactories.isEmpty()) {
            logger.debug("aperture {} is below target {}, adding {} sockets", new Object[]{n, this.targetAperture, this.targetAperture - n});
            this.addSockets(this.targetAperture - n);
        } else if (this.targetAperture < this.activeSockets.size()) {
            logger.debug("aperture {} is above target {}, quicking 1 socket", (Object)n, (Object)this.targetAperture);
            this.quickSlowestRS();
        }
        long now = Clock.now();
        if (now - this.lastRefresh >= this.refreshPeriod) {
            long prev = this.refreshPeriod;
            this.refreshPeriod = (long)Math.min((double)this.refreshPeriod * 1.5, (double)this.maxRefreshPeriod);
            logger.debug("Bumping refresh period, {}->{}", (Object)(prev / 1000L), (Object)(this.refreshPeriod / 1000L));
            this.lastRefresh = now;
            this.addSockets(1);
        }
    }

    private synchronized void quickSlowestRS() {
        if (this.activeSockets.size() <= 1) {
            return;
        }
        WeightedSocket slowest = null;
        double lowestAvailability = Double.MAX_VALUE;
        for (WeightedSocket socket : this.activeSockets) {
            double load = socket.availability();
            if (load == 0.0) {
                slowest = socket;
                break;
            }
            if (socket.getPredictedLatency() != 0.0) {
                load *= 1.0 / socket.getPredictedLatency();
            }
            if (!(load < lowestAvailability)) continue;
            lowestAvailability = load;
            slowest = socket;
        }
        if (slowest != null) {
            this.removeSocket(slowest, false);
        }
    }

    private synchronized void removeSocket(WeightedSocket socket, boolean refresh) {
        try {
            logger.debug("Removing socket: -> " + socket);
            this.activeSockets.remove(socket);
            this.activeFactories.add(socket.getFactory());
            socket.close().subscribe();
            if (refresh) {
                this.refreshSockets();
            }
        }
        catch (Exception e) {
            logger.warn("Exception while closing a ReactiveSocket", (Throwable)e);
        }
    }

    public synchronized double availability() {
        double currentAvailability = 0.0;
        if (!this.activeSockets.isEmpty()) {
            for (WeightedSocket rs : this.activeSockets) {
                currentAvailability += rs.availability();
            }
            currentAvailability /= (double)this.activeSockets.size();
        }
        return currentAvailability;
    }

    private synchronized ReactiveSocket select() {
        double w2;
        double w1;
        if (this.activeSockets.isEmpty()) {
            return FAILING_REACTIVE_SOCKET;
        }
        this.refreshSockets();
        int size = this.activeSockets.size();
        if (size == 1) {
            return (ReactiveSocket)this.activeSockets.get(0);
        }
        WeightedSocket rsc1 = null;
        WeightedSocket rsc2 = null;
        ThreadLocalRandom rng = ThreadLocalRandom.current();
        for (int i = 0; i < 5; ++i) {
            int i1 = ((Random)rng).nextInt(size);
            int i2 = ((Random)rng).nextInt(size - 1);
            if (i2 >= i1) {
                ++i2;
            }
            rsc1 = this.activeSockets.get(i1);
            rsc2 = this.activeSockets.get(i2);
            if (rsc1.availability() > 0.0 && rsc2.availability() > 0.0) break;
            if (i + 1 != 5 || this.activeFactories.isEmpty()) continue;
            this.addSockets(1);
        }
        if ((w1 = this.algorithmicWeight(rsc1)) < (w2 = this.algorithmicWeight(rsc2))) {
            return rsc2;
        }
        return rsc1;
    }

    private double algorithmicWeight(WeightedSocket socket) {
        if (socket == null || socket.availability() == 0.0) {
            return 0.0;
        }
        int pendings = socket.getPending();
        double latency = socket.getPredictedLatency();
        double low = this.lowerQuantile.estimation();
        double high = Math.max(this.higherQuantile.estimation(), low * 1.001);
        double bandWidth = Math.max(high - low, 1.0);
        if (latency < low) {
            double alpha = (low - latency) / bandWidth;
            double bonusFactor = Math.pow(1.0 + alpha, this.expFactor);
            latency /= bonusFactor;
        } else if (latency > high) {
            double alpha = (latency - high) / bandWidth;
            double penaltyFactor = Math.pow(1.0 + alpha, this.expFactor);
            latency *= penaltyFactor;
        }
        return socket.availability() * 1.0 / (1.0 + latency * (double)(pendings + 1));
    }

    public synchronized String toString() {
        return "LoadBalancer(a:" + this.activeSockets.size() + ", f: " + this.activeFactories.size() + ", avgPendings=" + this.pendings.value() + ", targetAperture=" + this.targetAperture + ", band=[" + this.lowerQuantile.estimation() + ", " + this.higherQuantile.estimation() + "])";
    }

    public Mono<Void> close() {
        return MonoSource.wrap(subscriber -> {
            subscriber.onSubscribe(Operators.emptySubscription());
            LoadBalancer loadBalancer = this;
            synchronized (loadBalancer) {
                this.factoryRefresher.close();
                this.activeFactories.clear();
                final AtomicInteger n = new AtomicInteger(this.activeSockets.size());
                this.activeSockets.forEach(rs -> rs.close().subscribe((Subscriber)new Subscriber<Void>(){

                    public void onSubscribe(Subscription s) {
                        s.request(Long.MAX_VALUE);
                    }

                    public void onNext(Void aVoid) {
                    }

                    public void onError(Throwable t) {
                        logger.warn("Exception while closing a ReactiveSocket", t);
                        this.onComplete();
                    }

                    public void onComplete() {
                        if (n.decrementAndGet() == 0) {
                            subscriber.onComplete();
                            LoadBalancer.this.closeSubject.onComplete();
                        }
                    }
                }));
            }
        });
    }

    public Mono<Void> onClose() {
        return this.closeSubject;
    }

    private class ActiveList<T extends Availability> {
        private final ArrayList<T> holder;
        private final boolean server;

        public ActiveList(boolean server) {
            this.server = server;
            this.holder = new ArrayList(128);
        }

        public void add(T item) {
            this.holder.add(item);
        }

        public T remove(int index) {
            Availability item = (Availability)this.holder.remove(index);
            return (T)item;
        }

        public boolean remove(T item) {
            boolean removed = this.holder.remove(item);
            return removed;
        }

        public T set(int index, T item) {
            Availability prev = (Availability)this.holder.set(index, item);
            return (T)prev;
        }

        public void addAll(Collection<? extends T> toAdd) {
            this.holder.addAll(toAdd);
        }

        public void clear() {
            this.holder.clear();
        }

        public int size() {
            return this.holder.size();
        }
    }

    private class WeightedSocket
    extends ReactiveSocketProxy
    implements LoadBalancerSocketMetrics {
        private static final double STARTUP_PENALTY = 2.251799813685247E15;
        private ReactiveSocketClient factory;
        private final Quantile lowerQuantile;
        private final Quantile higherQuantile;
        private final long inactivityFactor;
        private volatile int pending;
        private long stamp;
        private long stamp0;
        private long duration;
        private Median median;
        private Ewma interArrivalTime;
        private AtomicLong pendingStreams;

        WeightedSocket(ReactiveSocket child, ReactiveSocketClient factory, Quantile lowerQuantile, Quantile higherQuantile, int inactivityFactor) {
            long now;
            super(child);
            this.factory = factory;
            this.lowerQuantile = lowerQuantile;
            this.higherQuantile = higherQuantile;
            this.inactivityFactor = inactivityFactor;
            this.stamp = now = Clock.now();
            this.stamp0 = now;
            this.duration = 0L;
            this.pending = 0;
            this.median = new Median();
            this.interArrivalTime = new Ewma(1L, TimeUnit.MINUTES, DEFAULT_INITIAL_INTER_ARRIVAL_TIME);
            this.pendingStreams = new AtomicLong();
            child.onClose().doFinally(signalType -> LoadBalancer.this.removeSocket(this, true)).subscribe();
        }

        WeightedSocket(ReactiveSocket child, ReactiveSocketClient factory, Quantile lowerQuantile, Quantile higherQuantile) {
            this(child, factory, lowerQuantile, higherQuantile, 500);
        }

        public Mono<Payload> requestResponse(Payload payload) {
            return MonoSource.wrap(subscriber -> this.source.requestResponse(payload).subscribe(new LatencySubscriber(subscriber, this)));
        }

        public Flux<Payload> requestStream(Payload payload) {
            return FluxSource.wrap(subscriber -> this.source.requestStream(payload).subscribe(new CountingSubscriber(subscriber, this)));
        }

        public Mono<Void> fireAndForget(Payload payload) {
            return MonoSource.wrap(subscriber -> this.source.fireAndForget(payload).subscribe(new CountingSubscriber(subscriber, this)));
        }

        public Mono<Void> metadataPush(Payload payload) {
            return MonoSource.wrap(subscriber -> this.source.metadataPush(payload).subscribe(new CountingSubscriber(subscriber, this)));
        }

        public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
            return FluxSource.wrap(subscriber -> this.source.requestChannel(payloads).subscribe(new CountingSubscriber(subscriber, this)));
        }

        ReactiveSocketClient getFactory() {
            return this.factory;
        }

        synchronized double getPredictedLatency() {
            double weight;
            long now = Clock.now();
            long elapsed = Math.max(now - this.stamp, 1L);
            double prediction = this.median.estimation();
            if (prediction == 0.0) {
                weight = this.pending == 0 ? 0.0 : 2.251799813685247E15 + (double)this.pending;
            } else if (this.pending == 0 && (double)elapsed > (double)this.inactivityFactor * this.interArrivalTime.value()) {
                this.median.insert(0.0);
                weight = this.median.estimation();
            } else {
                double predicted = prediction * (double)this.pending;
                double instant = this.instantaneous(now);
                weight = predicted < instant ? instant / (double)this.pending : prediction;
            }
            return weight;
        }

        int getPending() {
            return this.pending;
        }

        private synchronized long instantaneous(long now) {
            return this.duration + (now - this.stamp0) * (long)this.pending;
        }

        private synchronized long incr() {
            long now = Clock.now();
            this.interArrivalTime.insert(now - this.stamp);
            this.duration += Math.max(0L, now - this.stamp0) * (long)this.pending;
            ++this.pending;
            this.stamp = now;
            this.stamp0 = now;
            return now;
        }

        private synchronized long decr(long timestamp) {
            long now = Clock.now();
            this.duration += Math.max(0L, now - this.stamp0) * (long)this.pending - (now - timestamp);
            --this.pending;
            this.stamp0 = now;
            return now;
        }

        private synchronized void observe(double rtt) {
            this.median.insert(rtt);
            this.lowerQuantile.insert(rtt);
            this.higherQuantile.insert(rtt);
        }

        public Mono<Void> close() {
            return this.source.close();
        }

        public String toString() {
            return "WeightedSocket(median=" + this.median.estimation() + " quantile-low=" + this.lowerQuantile.estimation() + " quantile-high=" + this.higherQuantile.estimation() + " inter-arrival=" + this.interArrivalTime.value() + " duration/pending=" + (this.pending == 0 ? 0.0 : (double)this.duration / (double)this.pending) + " pending=" + this.pending + " availability= " + this.availability() + ")->" + this.source;
        }

        @Override
        public double medianLatency() {
            return this.median.estimation();
        }

        @Override
        public double lowerQuantileLatency() {
            return this.lowerQuantile.estimation();
        }

        @Override
        public double higherQuantileLatency() {
            return this.higherQuantile.estimation();
        }

        @Override
        public double interArrivalTime() {
            return this.interArrivalTime.value();
        }

        @Override
        public int pending() {
            return this.pending;
        }

        @Override
        public long lastTimeUsedMillis() {
            return this.stamp0;
        }

        private class CountingSubscriber<U>
        implements Subscriber<U> {
            private final Subscriber<U> child;
            private final WeightedSocket socket;

            CountingSubscriber(Subscriber<U> child, WeightedSocket socket) {
                this.child = child;
                this.socket = socket;
            }

            public void onSubscribe(Subscription s) {
                this.socket.pendingStreams.incrementAndGet();
                this.child.onSubscribe(s);
            }

            public void onNext(U u) {
                this.child.onNext(u);
            }

            public void onError(Throwable t) {
                this.socket.pendingStreams.decrementAndGet();
                this.child.onError(t);
                if (t instanceof TransportException || t instanceof ClosedChannelException) {
                    LoadBalancer.this.removeSocket(this.socket, true);
                }
            }

            public void onComplete() {
                this.socket.pendingStreams.decrementAndGet();
                this.child.onComplete();
            }
        }

        private class LatencySubscriber<U>
        implements Subscriber<U> {
            private final Subscriber<U> child;
            private final WeightedSocket socket;
            private final AtomicBoolean done;
            private long start;

            LatencySubscriber(Subscriber<U> child, WeightedSocket socket) {
                this.child = child;
                this.socket = socket;
                this.done = new AtomicBoolean(false);
            }

            public void onSubscribe(final Subscription s) {
                this.start = WeightedSocket.this.incr();
                this.child.onSubscribe(new Subscription(){

                    public void request(long n) {
                        s.request(n);
                    }

                    public void cancel() {
                        if (LatencySubscriber.this.done.compareAndSet(false, true)) {
                            s.cancel();
                            WeightedSocket.this.decr(LatencySubscriber.this.start);
                        }
                    }
                });
            }

            public void onNext(U u) {
                this.child.onNext(u);
            }

            public void onError(Throwable t) {
                if (this.done.compareAndSet(false, true)) {
                    this.child.onError(t);
                    long now = WeightedSocket.this.decr(this.start);
                    if (t instanceof TransportException || t instanceof ClosedChannelException) {
                        LoadBalancer.this.removeSocket(this.socket, true);
                    } else if (t instanceof TimeoutException) {
                        WeightedSocket.this.observe(now - this.start);
                    }
                }
            }

            public void onComplete() {
                if (this.done.compareAndSet(false, true)) {
                    long now = WeightedSocket.this.decr(this.start);
                    WeightedSocket.this.observe(now - this.start);
                    this.child.onComplete();
                }
            }
        }
    }

    private static class FailingReactiveSocket
    implements ReactiveSocket {
        private static final NoAvailableReactiveSocketException NO_AVAILABLE_RS_EXCEPTION = new NoAvailableReactiveSocketException();
        private static final Mono<Void> errorVoid = Mono.error((Throwable)NO_AVAILABLE_RS_EXCEPTION);
        private static final Mono<Payload> errorPayload = Mono.error((Throwable)NO_AVAILABLE_RS_EXCEPTION);

        private FailingReactiveSocket() {
        }

        public Mono<Void> fireAndForget(Payload payload) {
            return errorVoid;
        }

        public Mono<Payload> requestResponse(Payload payload) {
            return errorPayload;
        }

        public Flux<Payload> requestStream(Payload payload) {
            return errorPayload.flux();
        }

        public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
            return errorPayload.flux();
        }

        public Mono<Void> metadataPush(Payload payload) {
            return errorVoid;
        }

        public double availability() {
            return 0.0;
        }

        public Mono<Void> close() {
            return Mono.empty();
        }

        public Mono<Void> onClose() {
            return Mono.empty();
        }
    }

    private class SocketAdder
    implements Subscriber<ReactiveSocket> {
        private final ReactiveSocketClient factory;
        private int errors;

        private SocketAdder(ReactiveSocketClient factory) {
            this.factory = factory;
        }

        public void onSubscribe(Subscription s) {
            s.request(1L);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onNext(ReactiveSocket rs) {
            LoadBalancer loadBalancer = LoadBalancer.this;
            synchronized (loadBalancer) {
                if (LoadBalancer.this.activeSockets.size() >= LoadBalancer.this.targetAperture) {
                    LoadBalancer.this.quickSlowestRS();
                }
                WeightedSocket weightedSocket = new WeightedSocket(rs, this.factory, LoadBalancer.this.lowerQuantile, LoadBalancer.this.higherQuantile);
                logger.debug("Adding new WeightedSocket {}", (Object)weightedSocket);
                LoadBalancer.this.activeSockets.add(weightedSocket);
                if (LoadBalancer.this.readyCallback != null) {
                    LoadBalancer.this.readyCallback.run();
                }
                LoadBalancer.this.pendingSockets = LoadBalancer.this.pendingSockets - 1;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onError(Throwable t) {
            logger.warn("Exception while subscribing to the ReactiveSocket source", t);
            LoadBalancer loadBalancer = LoadBalancer.this;
            synchronized (loadBalancer) {
                LoadBalancer.this.pendingSockets = LoadBalancer.this.pendingSockets - 1;
                if (++this.errors < 5) {
                    LoadBalancer.this.activeFactories.add(this.factory);
                } else {
                    logger.warn("Exception count greater than 5, not re-adding factory {}", (Object)this.factory.toString());
                }
            }
        }

        public void onComplete() {
        }
    }

    private class FactoriesRefresher
    implements Subscriber<Collection<ReactiveSocketClient>> {
        private Subscription subscription;

        private FactoriesRefresher() {
        }

        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            subscription.request(Long.MAX_VALUE);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onNext(Collection<ReactiveSocketClient> newFactories) {
            LoadBalancer loadBalancer = LoadBalancer.this;
            synchronized (loadBalancer) {
                HashSet<ReactiveSocketClient> current = new HashSet<ReactiveSocketClient>(LoadBalancer.this.activeFactories.size() + LoadBalancer.this.activeSockets.size());
                current.addAll(LoadBalancer.this.activeFactories);
                for (WeightedSocket socket : LoadBalancer.this.activeSockets) {
                    ReactiveSocketClient factory = socket.getFactory();
                    current.add(factory);
                }
                HashSet removed = new HashSet(current);
                removed.removeAll(newFactories);
                HashSet<ReactiveSocketClient> added = new HashSet<ReactiveSocketClient>(newFactories);
                added.removeAll(current);
                boolean changed = false;
                Iterator it0 = LoadBalancer.this.activeSockets.iterator();
                while (it0.hasNext()) {
                    WeightedSocket socket = (WeightedSocket)it0.next();
                    if (!removed.contains(socket.getFactory())) continue;
                    it0.remove();
                    try {
                        changed = true;
                        socket.close();
                    }
                    catch (Exception e) {
                        logger.warn("Exception while closing a ReactiveSocket", (Throwable)e);
                    }
                }
                Iterator it1 = LoadBalancer.this.activeFactories.iterator();
                while (it1.hasNext()) {
                    ReactiveSocketClient factory = (ReactiveSocketClient)it1.next();
                    if (!removed.contains(factory)) continue;
                    it1.remove();
                    changed = true;
                }
                LoadBalancer.this.activeFactories.addAll(added);
                if (changed && logger.isDebugEnabled()) {
                    StringBuilder msgBuilder = new StringBuilder();
                    msgBuilder.append("\nUpdated active factories (size: " + LoadBalancer.this.activeFactories.size() + ")\n");
                    for (ReactiveSocketClient f : LoadBalancer.this.activeFactories) {
                        msgBuilder.append(" + ").append(f).append('\n');
                    }
                    msgBuilder.append("Active sockets:\n");
                    for (WeightedSocket socket : LoadBalancer.this.activeSockets) {
                        msgBuilder.append(" + ").append(socket).append('\n');
                    }
                    logger.debug(msgBuilder.toString());
                }
            }
            LoadBalancer.this.refreshSockets();
        }

        public void onError(Throwable t) {
            logger.error("Error refreshing ReactiveSocket factories. They would no longer be refreshed.", t);
        }

        public void onComplete() {
            logger.warn("ReactiveSocket factories source completed. They would no longer be refreshed.");
        }

        void close() {
            this.subscription.cancel();
        }
    }
}

