/*
 * Decompiled with CFR 0.152.
 */
package org.noear.socketd.transport.stream.impl;

import java.util.concurrent.ScheduledFuture;
import java.util.function.Consumer;
import org.noear.socketd.exception.SocketdTimeoutException;
import org.noear.socketd.transport.stream.Stream;
import org.noear.socketd.transport.stream.StreamInternal;
import org.noear.socketd.transport.stream.StreamManger;
import org.noear.socketd.utils.RunUtils;
import org.noear.socketd.utils.TriConsumer;

/**
 * Base implementation of {@link StreamInternal} holding the state shared by
 * concrete stream types: the stream id, the demands value, the timeout, the
 * optional error/progress callbacks and the handle of the scheduled timeout
 * ("insurance") task.
 *
 * <p>NOTE(review): the timeout task is scheduled through {@code RunUtils.delay}
 * and presumably runs on a scheduler thread, while callbacks are registered by
 * the caller. The fields are not {@code volatile}; cross-thread visibility
 * should be confirmed against how callers use this class.
 *
 * @param <T> the concrete stream type returned by the fluent setters
 */
public abstract class StreamBase<T extends Stream> implements StreamInternal<T> {
    /** Handle of the scheduled timeout task; {@code null} until {@link #insuranceStart} runs. */
    private ScheduledFuture<?> insuranceFuture;
    private final String sid;
    private final int demands;
    private long timeout;
    /** Callback invoked by {@link #onError}; may be {@code null}. */
    private Consumer<Throwable> doOnError;
    /** Callback invoked by {@link #onProgress}; may be {@code null}. */
    private TriConsumer<Boolean, Integer, Integer> doOnProgress;

    /**
     * Creates a stream with the given identity and settings.
     *
     * @param sid     the stream id
     * @param demands the demands value reported by {@link #demands()}
     * @param timeout the initial timeout reported by {@link #timeout()}
     */
    public StreamBase(String sid, int demands, long timeout) {
        this.sid = sid;
        this.demands = demands;
        this.timeout = timeout;
    }

    /**
     * Returns this instance typed as {@code T} for fluent chaining.
     *
     * <p>The cast is unchecked: it relies on every concrete subclass binding
     * {@code T} to its own type.
     */
    @SuppressWarnings("unchecked")
    private T self() {
        return (T) this;
    }

    /** Returns the stream id given at construction. */
    @Override
    public String sid() {
        return this.sid;
    }

    /** Returns the demands value given at construction. */
    @Override
    public int demands() {
        return this.demands;
    }

    /**
     * Replaces the timeout value.
     *
     * @param timeout the new timeout
     * @return this stream, for chaining
     */
    public T timeout(long timeout) {
        this.timeout = timeout;
        return self();
    }

    /** Returns the current timeout value. */
    @Override
    public long timeout() {
        return this.timeout;
    }

    /**
     * Schedules the timeout task for this stream. When the task fires it
     * removes this stream from {@code streamManger} by sid and reports a
     * {@link SocketdTimeoutException} through {@link #onError}.
     *
     * <p>Does nothing if a task has already been scheduled for this stream.
     *
     * @param streamManger  the manager this stream is removed from on timeout
     * @param streamTimeout the delay passed to {@code RunUtils.delay}
     *                      (presumably milliseconds; confirm against RunUtils)
     */
    @Override
    public void insuranceStart(StreamManger streamManger, long streamTimeout) {
        if (this.insuranceFuture != null) {
            return;
        }
        this.insuranceFuture = RunUtils.delay(() -> {
            streamManger.removeStream(this.sid);
            this.onError(new SocketdTimeoutException("The stream response timeout, sid=" + this.sid));
        }, streamTimeout);
    }

    /**
     * Cancels the scheduled timeout task, if one exists, without interrupting
     * it when it is already running.
     */
    @Override
    public void insuranceCancel() {
        // Snapshot the field so the null check and the call use the same reference.
        ScheduledFuture<?> future = this.insuranceFuture;
        if (future != null) {
            future.cancel(false);
        }
    }

    /**
     * Forwards {@code error} to the callback registered with
     * {@link #thenError}; does nothing when no callback is registered.
     *
     * @param error the error to report
     */
    @Override
    public void onError(Throwable error) {
        // Read the field once: it can be reassigned by thenError between a
        // null check and the invocation, which would otherwise risk an NPE.
        Consumer<Throwable> handler = this.doOnError;
        if (handler != null) {
            handler.accept(error);
        }
    }

    /**
     * Forwards a progress notification to the callback registered with
     * {@link #thenProgress}; does nothing when no callback is registered.
     *
     * @param isSend passed through as the callback's first argument
     * @param val    passed through as the callback's second argument
     * @param max    passed through as the callback's third argument
     */
    @Override
    public void onProgress(boolean isSend, int val, int max) {
        // Read the field once so the null check and the call agree.
        TriConsumer<Boolean, Integer, Integer> handler = this.doOnProgress;
        if (handler != null) {
            handler.accept(isSend, val, max);
        }
    }

    /**
     * Registers the error callback, replacing any previous one.
     *
     * @param onError the callback to invoke from {@link #onError}; may be {@code null}
     * @return this stream, for chaining
     */
    @Override
    public T thenError(Consumer<Throwable> onError) {
        this.doOnError = onError;
        return self();
    }

    /**
     * Registers the progress callback, replacing any previous one.
     *
     * @param onProgress the callback to invoke from {@link #onProgress}; may be {@code null}
     * @return this stream, for chaining
     */
    @Override
    public T thenProgress(TriConsumer<Boolean, Integer, Integer> onProgress) {
        this.doOnProgress = onProgress;
        return self();
    }
}

