/*
 * Decompiled with CFR 0.152.
 */
package com.eventstore.dbclient;

import com.eventstore.dbclient.ResolvedEvent;
import com.eventstore.dbclient.StreamNotFoundException;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ClientCallStreamObserver;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

class ReadSubscription
implements Subscription {
    private final Subscriber<? super ResolvedEvent> subscriber;
    private ClientCallStreamObserver<?> streamObserver;
    private final AtomicLong requested = new AtomicLong(0L);
    private final AtomicBoolean terminated = new AtomicBoolean(false);
    private final Lock lock = new ReentrantLock();
    private final Condition hasRequested = this.lock.newCondition();

    ReadSubscription(Subscriber<? super ResolvedEvent> subscriber) {
        this.subscriber = subscriber;
    }

    public void setStreamObserver(ClientCallStreamObserver<?> streamObserver) {
        this.streamObserver = streamObserver;
    }

    public void onStreamNotFound() {
        this.subscriber.onError((Throwable)new StreamNotFoundException());
    }

    public void onError(Throwable error) {
        StatusRuntimeException statusRuntimeException;
        if (error instanceof StatusRuntimeException && (statusRuntimeException = (StatusRuntimeException)error).getStatus().getCode() == Status.Code.CANCELLED) {
            return;
        }
        this.cancel();
        this.subscriber.onError(error);
    }

    public void onNext(ResolvedEvent event) {
        this.lock.lock();
        while (this.requested.get() == 0L && !this.terminated.get()) {
            this.hasRequested.awaitUninterruptibly();
        }
        if (!this.terminated.get()) {
            this.subscriber.onNext((Object)event);
            this.requested.decrementAndGet();
        }
        this.lock.unlock();
    }

    public void onCompleted() {
        if (!this.terminated.get()) {
            this.subscriber.onComplete();
        }
        this.terminated.compareAndSet(false, true);
    }

    public void request(long n) {
        if (n <= 0L) {
            this.subscriber.onError((Throwable)new IllegalArgumentException("non-positive subscription request: " + n));
        }
        this.lock.lock();
        this.requested.updateAndGet(current -> current + n);
        this.hasRequested.signal();
        this.lock.unlock();
    }

    public void cancel() {
        if (this.terminated.compareAndSet(false, true) && this.streamObserver != null) {
            this.streamObserver.cancel("Stream has been cancelled manually.", null);
        }
    }
}

