/*
 * Decompiled with CFR 0.152.
 */
package io.split.engine.sse;

import io.split.engine.common.PushManager;
import io.split.engine.sse.PushStatusTracker;
import io.split.engine.sse.client.SSEClient;
import io.split.engine.sse.dtos.ControlNotification;
import io.split.engine.sse.dtos.ControlType;
import io.split.engine.sse.dtos.ErrorNotification;
import io.split.engine.sse.dtos.OccupancyNotification;
import io.split.telemetry.domain.StreamingEvent;
import io.split.telemetry.domain.enums.StreamEventsEnum;
import io.split.telemetry.storage.TelemetryRuntimeProducer;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import split.com.google.common.base.Preconditions;
import split.com.google.common.collect.Maps;

/**
 * Tracks the state of the push (streaming) subsystem and publishes
 * {@link PushManager.Status} values on the queue supplied at construction.
 *
 * <p>Three pieces of state are kept: the last SSE client status, the last
 * backend control status, and whether at least one publisher is online
 * (derived from the per-channel occupancy counts in {@code regions}).
 * Handlers combine an incoming event with that state to decide whether a
 * status message must be queued.
 *
 * <p>State lives in atomics and a concurrent map; only {@link #forcePushDisable()}
 * and the private {@code reset()} are {@code synchronized}.
 */
public class PushStatusTrackerImp implements PushStatusTracker {
    // The logger is registered under the interface name; kept that way so that
    // existing logger configuration keeps matching.
    private static final Logger _log = LoggerFactory.getLogger(PushStatusTracker.class);
    private static final String CONTROL_PRI_CHANNEL = "control_pri";
    private static final String CONTROL_SEC_CHANNEL = "control_sec";

    // Ably error codes in this range make the tracker request a streaming backoff.
    private static final int ABLY_BACKOFF_CODE_MIN = 40140;
    private static final int ABLY_BACKOFF_CODE_MAX = 40149;
    // Any other Ably error code in this range makes the tracker turn streaming off.
    private static final int ABLY_OFF_CODE_MIN = 40000;
    private static final int ABLY_OFF_CODE_MAX = 49999;

    private final AtomicBoolean _publishersOnline = new AtomicBoolean(true);
    private final AtomicReference<SSEClient.StatusMessage> _sseStatus = new AtomicReference<SSEClient.StatusMessage>(SSEClient.StatusMessage.INITIALIZATION_IN_PROGRESS);
    private final AtomicReference<ControlType> _backendStatus = new AtomicReference<ControlType>(ControlType.STREAMING_RESUMED);
    private final LinkedBlockingQueue<PushManager.Status> _statusMessages;
    // Last reported publisher count, keyed by occupancy channel name.
    private final ConcurrentMap<String, Integer> regions = Maps.newConcurrentMap();
    private final TelemetryRuntimeProducer _telemetryRuntimeProducer;

    /**
     * @param statusMessages queue on which status changes are offered; stored as-is
     *                       (it is not null-checked here)
     * @param telemetryRuntimeProducer sink for streaming telemetry events; must not be null
     * @throws NullPointerException if {@code telemetryRuntimeProducer} is null
     */
    public PushStatusTrackerImp(LinkedBlockingQueue<PushManager.Status> statusMessages, TelemetryRuntimeProducer telemetryRuntimeProducer) {
        _statusMessages = statusMessages;
        _telemetryRuntimeProducer = Preconditions.checkNotNull(telemetryRuntimeProducer);
    }

    /**
     * Restores the three state holders to their initial values. The per-channel
     * occupancy counts in {@code regions} are left untouched.
     */
    private synchronized void reset() {
        _publishersOnline.set(true);
        _sseStatus.set(SSEClient.StatusMessage.INITIALIZATION_IN_PROGRESS);
        _backendStatus.set(ControlType.STREAMING_RESUMED);
    }

    /**
     * Applies a status reported by the SSE client. Each transition is only taken
     * from the specific current states checked below; a status arriving in any
     * other state is ignored.
     *
     * @param newStatus status reported by the SSE client
     */
    @Override
    public void handleSseStatus(SSEClient.StatusMessage newStatus) {
        _log.debug("Current status: {}. New status: {}", _sseStatus.get(), newStatus);
        switch (newStatus) {
            case FIRST_EVENT:
                if (SSEClient.StatusMessage.CONNECTED.equals(_sseStatus.get())) {
                    _statusMessages.offer(PushManager.Status.STREAMING_READY);
                    _telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.CONNECTION_ESTABLISHED.getType(), 0L, System.currentTimeMillis()));
                }
                // Intentional fall-through: a first event also moves
                // INITIALIZATION_IN_PROGRESS to CONNECTED, exactly like CONNECTED does.
            case CONNECTED:
                _sseStatus.compareAndSet(SSEClient.StatusMessage.INITIALIZATION_IN_PROGRESS, SSEClient.StatusMessage.CONNECTED);
                break;
            case RETRYABLE_ERROR:
                if (_sseStatus.compareAndSet(SSEClient.StatusMessage.CONNECTED, SSEClient.StatusMessage.RETRYABLE_ERROR)) {
                    _statusMessages.offer(PushManager.Status.STREAMING_BACKOFF);
                }
                break;
            case NONRETRYABLE_ERROR:
                if (_sseStatus.compareAndSet(SSEClient.StatusMessage.CONNECTED, SSEClient.StatusMessage.NONRETRYABLE_ERROR)
                        || _sseStatus.compareAndSet(SSEClient.StatusMessage.RETRYABLE_ERROR, SSEClient.StatusMessage.NONRETRYABLE_ERROR)) {
                    _statusMessages.offer(PushManager.Status.STREAMING_OFF);
                }
                break;
            case FORCED_STOP:
                if (_sseStatus.compareAndSet(SSEClient.StatusMessage.CONNECTED, SSEClient.StatusMessage.FORCED_STOP)
                        || _sseStatus.compareAndSet(SSEClient.StatusMessage.RETRYABLE_ERROR, SSEClient.StatusMessage.FORCED_STOP)
                        || _sseStatus.compareAndSet(SSEClient.StatusMessage.INITIALIZATION_IN_PROGRESS, SSEClient.StatusMessage.FORCED_STOP)) {
                    _statusMessages.offer(PushManager.Status.STREAMING_DOWN);
                }
                break;
            case INITIALIZATION_IN_PROGRESS:
                reset();
                break;
        }
    }

    /**
     * Applies a backend control notification. Once the backend status is
     * {@code STREAMING_DISABLED}, every further control notification is ignored.
     *
     * @param controlNotification control message received from the backend
     */
    @Override
    public void handleIncomingControlEvent(ControlNotification controlNotification) {
        _log.debug("handleIncomingControlEvent: {}", controlNotification.getControlType());
        if (_backendStatus.get().equals(ControlType.STREAMING_DISABLED)) {
            return;
        }
        switch (controlNotification.getControlType()) {
            case STREAMING_RESUMED:
                // Only announce readiness when we actually leave PAUSED and publishers are online.
                if (_backendStatus.compareAndSet(ControlType.STREAMING_PAUSED, ControlType.STREAMING_RESUMED) && _publishersOnline.get()) {
                    _statusMessages.offer(PushManager.Status.STREAMING_READY);
                }
                break;
            case STREAMING_PAUSED:
                // Telemetry is recorded for every PAUSED notification, even if the state does not change.
                _telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.STREAMING_STATUS.getType(), StreamEventsEnum.StreamingStatusValues.STREAMING_PAUSED.getValue(), System.currentTimeMillis()));
                if (_backendStatus.compareAndSet(ControlType.STREAMING_RESUMED, ControlType.STREAMING_PAUSED) && _publishersOnline.get()) {
                    _statusMessages.offer(PushManager.Status.STREAMING_DOWN);
                }
                break;
            case STREAMING_DISABLED:
                _telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.STREAMING_STATUS.getType(), StreamEventsEnum.StreamingStatusValues.STREAMING_DISABLED.getValue(), System.currentTimeMillis()));
                _backendStatus.set(ControlType.STREAMING_DISABLED);
                _statusMessages.offer(PushManager.Status.STREAMING_OFF);
                break;
        }
    }

    /**
     * Records the publisher count reported for a channel and, when the overall
     * "any publisher online" flag flips while the backend status is
     * {@code STREAMING_RESUMED}, queues {@code STREAMING_DOWN} or {@code STREAMING_READY}.
     *
     * @param occupancyNotification occupancy message carrying the channel and its publisher count
     */
    @Override
    public void handleIncomingOccupancyEvent(OccupancyNotification occupancyNotification) {
        int publishers = occupancyNotification.getMetrics().getPublishers();
        _log.debug("handleIncomingOccupancyEvent: publishers={}", publishers);
        recordTelemetryOccupancy(occupancyNotification, publishers);
        regions.put(occupancyNotification.getChannel(), publishers);

        boolean anyPublisher = isPublishers();
        // The compareAndSet runs before the backend check, so the online flag is
        // updated even when no status message ends up being queued.
        if (!anyPublisher && _publishersOnline.compareAndSet(true, false) && _backendStatus.get().equals(ControlType.STREAMING_RESUMED)) {
            _statusMessages.offer(PushManager.Status.STREAMING_DOWN);
        } else if (anyPublisher && _publishersOnline.compareAndSet(false, true) && _backendStatus.get().equals(ControlType.STREAMING_RESUMED)) {
            _statusMessages.offer(PushManager.Status.STREAMING_READY);
        }
    }

    /**
     * Records an Ably error in telemetry and, unless streaming has been disabled
     * by the backend, maps its code to a status: codes 40140-40149 queue
     * {@code STREAMING_BACKOFF}; any other code in 40000-49999 queues
     * {@code STREAMING_OFF}; codes outside that range queue nothing.
     *
     * @param notification error message received on the stream
     */
    @Override
    public void handleIncomingAblyError(ErrorNotification notification) {
        _log.debug("handleIncomingAblyError: {}", notification.getMessage());
        _telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.ABLY_ERROR.getType(), notification.getCode(), System.currentTimeMillis()));
        if (_backendStatus.get().equals(ControlType.STREAMING_DISABLED)) {
            return;
        }
        if (notification.getCode() >= ABLY_BACKOFF_CODE_MIN && notification.getCode() <= ABLY_BACKOFF_CODE_MAX) {
            _statusMessages.offer(PushManager.Status.STREAMING_BACKOFF);
            return;
        }
        if (notification.getCode() >= ABLY_OFF_CODE_MIN && notification.getCode() <= ABLY_OFF_CODE_MAX) {
            _statusMessages.offer(PushManager.Status.STREAMING_OFF);
        }
    }

    /**
     * Marks publishers offline, returns the SSE status to
     * {@code INITIALIZATION_IN_PROGRESS}, sets the backend status to
     * {@code STREAMING_DISABLED} and queues {@code STREAMING_OFF}.
     */
    @Override
    public synchronized void forcePushDisable() {
        _log.debug("forcePushDisable");
        _publishersOnline.set(false);
        _sseStatus.set(SSEClient.StatusMessage.INITIALIZATION_IN_PROGRESS);
        _backendStatus.set(ControlType.STREAMING_DISABLED);
        _statusMessages.offer(PushManager.Status.STREAMING_OFF);
    }

    /**
     * @return {@code true} if any tracked channel currently reports more than zero publishers
     */
    private boolean isPublishers() {
        for (Integer publisherCount : regions.values()) {
            if (publisherCount > 0) {
                return true;
            }
        }
        return false;
    }

    /**
     * Emits an occupancy telemetry event for the primary or secondary control
     * channel; notifications for any other channel produce no telemetry.
     */
    private void recordTelemetryOccupancy(OccupancyNotification occupancyNotification, int publishers) {
        String channel = occupancyNotification.getChannel();
        if (CONTROL_PRI_CHANNEL.equals(channel)) {
            _telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.OCCUPANCY_PRI.getType(), publishers, System.currentTimeMillis()));
        } else if (CONTROL_SEC_CHANNEL.equals(channel)) {
            _telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.OCCUPANCY_SEC.getType(), publishers, System.currentTimeMillis()));
        }
    }
}

