/*
 * Decompiled with CFR 0.152.
 */
package io.reactivesocket.client.filter;

import io.reactivesocket.Payload;
import io.reactivesocket.ReactiveSocket;
import io.reactivesocket.util.ReactiveSocketProxy;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public final class ReactiveSockets {
    private ReactiveSockets() {
    }

    public static Function<ReactiveSocket, ReactiveSocket> timeout(final Duration timeout) {
        return source -> new ReactiveSocketProxy((ReactiveSocket)source){

            public Mono<Void> fireAndForget(Payload payload) {
                return this.source.fireAndForget(payload).timeout(timeout);
            }

            public Mono<Payload> requestResponse(Payload payload) {
                return this.source.requestResponse(payload).timeout(timeout);
            }

            public Flux<Payload> requestStream(Payload payload) {
                return this.source.requestStream(payload).timeout(timeout);
            }

            public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
                return this.source.requestChannel(payloads).timeout(timeout);
            }

            public Mono<Void> metadataPush(Payload payload) {
                return this.source.metadataPush(payload).timeout(timeout);
            }
        };
    }

    public static Function<ReactiveSocket, ReactiveSocket> safeClose() {
        return source -> new ReactiveSocketProxy((ReactiveSocket)source){
            final AtomicInteger count = new AtomicInteger();
            final AtomicBoolean closed = new AtomicBoolean();

            public Mono<Void> fireAndForget(Payload payload) {
                return this.source.fireAndForget(payload).doOnSubscribe(s -> this.count.incrementAndGet()).doFinally(signalType -> {
                    if (this.count.decrementAndGet() == 0 && this.closed.get()) {
                        this.source.close().subscribe();
                    }
                });
            }

            public Mono<Payload> requestResponse(Payload payload) {
                return this.source.requestResponse(payload).doOnSubscribe(s -> this.count.incrementAndGet()).doFinally(signalType -> {
                    if (this.count.decrementAndGet() == 0 && this.closed.get()) {
                        this.source.close().subscribe();
                    }
                });
            }

            public Flux<Payload> requestStream(Payload payload) {
                return this.source.requestStream(payload).doOnSubscribe(s -> this.count.incrementAndGet()).doFinally(signalType -> {
                    if (this.count.decrementAndGet() == 0 && this.closed.get()) {
                        this.source.close().subscribe();
                    }
                });
            }

            public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
                return this.source.requestChannel(payloads).doOnSubscribe(s -> this.count.incrementAndGet()).doFinally(signalType -> {
                    if (this.count.decrementAndGet() == 0 && this.closed.get()) {
                        this.source.close().subscribe();
                    }
                });
            }

            public Mono<Void> metadataPush(Payload payload) {
                return this.source.metadataPush(payload).doOnSubscribe(s -> this.count.incrementAndGet()).doFinally(signalType -> {
                    if (this.count.decrementAndGet() == 0 && this.closed.get()) {
                        this.source.close().subscribe();
                    }
                });
            }

            public Mono<Void> close() {
                return Mono.defer(() -> {
                    if (this.closed.compareAndSet(false, true)) {
                        if (this.count.get() == 0) {
                            return this.source.close();
                        }
                        return this.source.onClose();
                    }
                    return this.source.onClose();
                });
            }
        };
    }
}

