/*
 * Decompiled with CFR 0.152.
 */
package io.reactivesocket.client.filter;

import io.reactivesocket.Payload;
import io.reactivesocket.ReactiveSocket;
import io.reactivesocket.client.ReactiveSocketClient;
import io.reactivesocket.stat.Ewma;
import io.reactivesocket.util.Clock;
import io.reactivesocket.util.ReactiveSocketProxy;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FailureAwareClient
implements ReactiveSocketClient {
    private static final double EPSILON = 1.0E-4;
    private final ReactiveSocketClient delegate;
    private final long tau;
    private long stamp;
    private final Ewma errorPercentage;

    public FailureAwareClient(ReactiveSocketClient delegate, long halfLife, TimeUnit unit) {
        this.delegate = delegate;
        this.tau = Clock.unit().convert((long)((double)halfLife / Math.log(2.0)), unit);
        this.stamp = Clock.now();
        this.errorPercentage = new Ewma(halfLife, unit, 1.0);
    }

    public FailureAwareClient(ReactiveSocketClient delegate) {
        this(delegate, 5L, TimeUnit.SECONDS);
    }

    public Mono<? extends ReactiveSocket> connect() {
        return this.delegate.connect().doOnNext(o -> this.updateErrorPercentage(1.0)).doOnError(t -> this.updateErrorPercentage(0.0)).map(socket -> new ReactiveSocketProxy((ReactiveSocket)socket){

            public Mono<Void> fireAndForget(Payload payload) {
                return this.source.fireAndForget(payload).doOnError(t -> FailureAwareClient.this.errorPercentage.insert(0.0)).doOnSuccess(v -> FailureAwareClient.this.updateErrorPercentage(1.0));
            }

            public Mono<Payload> requestResponse(Payload payload) {
                return this.source.requestResponse(payload).doOnError(t -> FailureAwareClient.this.errorPercentage.insert(0.0)).doOnSuccess(p -> FailureAwareClient.this.updateErrorPercentage(1.0));
            }

            public Flux<Payload> requestStream(Payload payload) {
                return this.source.requestStream(payload).doOnError(th -> FailureAwareClient.this.errorPercentage.insert(0.0)).doOnComplete(() -> FailureAwareClient.this.updateErrorPercentage(1.0));
            }

            public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
                return this.source.requestChannel(payloads).doOnError(th -> FailureAwareClient.this.errorPercentage.insert(0.0)).doOnComplete(() -> FailureAwareClient.this.updateErrorPercentage(1.0));
            }

            public Mono<Void> metadataPush(Payload payload) {
                return this.source.metadataPush(payload).doOnError(t -> FailureAwareClient.this.errorPercentage.insert(0.0)).doOnSuccess(v -> FailureAwareClient.this.updateErrorPercentage(1.0));
            }

            public double availability() {
                if (Clock.now() - FailureAwareClient.this.stamp > FailureAwareClient.this.tau) {
                    FailureAwareClient.this.updateErrorPercentage(1.0);
                }
                return this.source.availability() * FailureAwareClient.this.errorPercentage.value();
            }
        });
    }

    public double availability() {
        double e = this.errorPercentage.value();
        if (Clock.now() - this.stamp > this.tau) {
            double a = Math.min(1.0, e + 0.5);
            this.errorPercentage.reset(a);
        }
        if (e < 1.0E-4) {
            e = 0.0;
        } else if (0.9999 < e) {
            e = 1.0;
        }
        return e;
    }

    private synchronized void updateErrorPercentage(double value) {
        this.errorPercentage.insert(value);
        this.stamp = Clock.now();
    }

    public String toString() {
        return "FailureAwareClient(" + this.errorPercentage.value() + ")->" + this.delegate;
    }
}

