/*
 * Decompiled with CFR 0.152.
 */
package com.abiquo.apiclient.stream;

import com.abiquo.tracing.model.Trace;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.AsyncHttpClientConfig;
import com.ning.http.client.Realm;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.logging.Logger;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import org.atmosphere.wasync.ClientFactory;
import org.atmosphere.wasync.Event;
import org.atmosphere.wasync.Function;
import org.atmosphere.wasync.Request;
import org.atmosphere.wasync.Socket;
import org.atmosphere.wasync.impl.AtmosphereClient;
import org.atmosphere.wasync.impl.AtmosphereRequest;
import org.atmosphere.wasync.impl.DefaultOptionsBuilder;

public class StreamClient
implements Closeable {
    private static final Logger LOG = Logger.getLogger("abiquo.stream");
    private final String endpoint;
    private final String username;
    private final String password;
    private final SSLConfiguration sslConfiguration;
    private final Consumer<Trace> consumer;
    private boolean reconnect = false;
    private int reconnectAttempts = 0;
    private int pauseBeforeReconnectInSeconds = 0;
    private final Runnable beforeReconnection;
    private final Runnable afterReconnection;
    private final ObjectMapper json;
    private AsyncHttpClient asyncClient;
    private Socket socket;
    private AtomicBoolean manuallyClosing = new AtomicBoolean(false);
    private Executor reconnectionExecutor = Executors.newCachedThreadPool();

    private StreamClient(String endpoint, String username, String password, SSLConfiguration sslConfiguration, Consumer<Trace> consumer, Runnable beforeReconnection, Runnable afterReconnection, boolean reconnect, int reconnectAttempts, int pauseBeforeReconnectInSeconds) {
        this.endpoint = (String)Preconditions.checkNotNull((Object)endpoint, (Object)"endpoint cannot be null");
        this.username = (String)Preconditions.checkNotNull((Object)username, (Object)"username cannot be null");
        this.password = (String)Preconditions.checkNotNull((Object)password, (Object)"password cannot be null");
        this.consumer = (Consumer)Preconditions.checkNotNull(consumer, (Object)"consumer cannot be null");
        this.sslConfiguration = sslConfiguration;
        this.reconnect = (Boolean)Preconditions.checkNotNull((Object)reconnect);
        if (this.reconnect) {
            this.reconnectAttempts = (Integer)Preconditions.checkNotNull((Object)reconnectAttempts, (Object)"reconnect attempts cannot be null if reconnection has been enabled");
            this.pauseBeforeReconnectInSeconds = (Integer)Preconditions.checkNotNull((Object)pauseBeforeReconnectInSeconds, (Object)"pause seconds before reconnect cannot be null if reconnection has been enabled");
            this.beforeReconnection = beforeReconnection;
            this.afterReconnection = afterReconnection;
        } else {
            this.beforeReconnection = null;
            this.afterReconnection = null;
        }
        this.json = new ObjectMapper();
    }

    public void connect() throws IOException {
        Preconditions.checkState((this.socket == null ? 1 : 0) != 0, (Object)"the client is already listening to events");
        Preconditions.checkState((this.asyncClient == null ? 1 : 0) != 0, (Object)"the client is already listening to events");
        LOG.fine("Connecting to " + this.endpoint + "...");
        AtmosphereClient client = (AtmosphereClient)ClientFactory.getDefault().newClient(AtmosphereClient.class);
        AtmosphereRequest request = ((AtmosphereRequest.AtmosphereRequestBuilder)((AtmosphereRequest.AtmosphereRequestBuilder)client.newRequestBuilder().method(Request.METHOD.GET)).uri(this.endpoint + "?Content-Type=application/json")).transport(Request.TRANSPORT.SSE).transport(Request.TRANSPORT.LONG_POLLING).build();
        AsyncHttpClientConfig.Builder config = new AsyncHttpClientConfig.Builder();
        config.setRequestTimeoutInMs(-1);
        config.setIdleConnectionTimeoutInMs(-1);
        if (this.sslConfiguration != null) {
            config.setHostnameVerifier(this.sslConfiguration.hostnameVerifier());
            config.setSSLContext(this.sslConfiguration.sslContext());
        }
        config.setRealm(new Realm.RealmBuilder().setPrincipal(this.username).setPassword(this.password).setUsePreemptiveAuth(true).setScheme(Realm.AuthScheme.BASIC).build());
        this.asyncClient = new AsyncHttpClient(config.build());
        this.socket = client.create(((DefaultOptionsBuilder)client.newOptionsBuilder().runtime(this.asyncClient)).build());
        this.socket.open((Request)request);
        this.configure();
        LOG.fine("Connected!");
    }

    private void configure() {
        this.socket.on(Event.MESSAGE, (Function)new Function<String>(){

            public void on(String rawEvent) {
                try {
                    Trace event = (Trace)StreamClient.this.json.readValue(rawEvent, Trace.class);
                    StreamClient.this.consumer.accept(event);
                }
                catch (IOException ex) {
                    LOG.warning(String.format("Unexpected error processing event: %s\n%s", ex.getMessage(), Throwables.getStackTraceAsString((Throwable)ex)));
                }
            }
        }).on(Event.CLOSE, (Function)new Function<String>(){

            public void on(String rawEvent) {
                if (!StreamClient.this.manuallyClosing.get() && StreamClient.this.reconnect) {
                    StreamClient.this.reconnectionExecutor.execute(new Runnable(){

                        @Override
                        public void run() {
                            try {
                                if (StreamClient.this.beforeReconnection != null) {
                                    StreamClient.this.beforeReconnection.run();
                                }
                                LOG.warning("Connection lost, going to reconnect");
                                StreamClient.this.reconnect();
                                LOG.fine("Reconnected");
                                if (StreamClient.this.afterReconnection != null) {
                                    StreamClient.this.afterReconnection.run();
                                }
                            }
                            catch (IOException e) {
                                throw Throwables.propagate((Throwable)e);
                            }
                        }
                    });
                }
            }
        });
    }

    private void reconnect() throws IOException {
        for (int retry = 0; retry < this.reconnectAttempts; ++retry) {
            try {
                this.closeConnection();
                this.connect();
                return;
            }
            catch (IOException e) {
                try {
                    Thread.sleep(this.pauseBeforeReconnectInSeconds * 1000);
                    continue;
                }
                catch (InterruptedException e1) {
                    throw Throwables.propagate((Throwable)e1);
                }
            }
        }
        LOG.severe("Reconnection failed");
        this.closeConnection();
    }

    @Override
    public synchronized void close() throws IOException {
        try {
            this.manuallyClosing.set(true);
            this.closeConnection();
        }
        finally {
            this.manuallyClosing.set(false);
        }
    }

    private synchronized void closeConnection() throws IOException {
        LOG.fine("Disconnecting...");
        if (this.asyncClient != null) {
            this.asyncClient.close();
        }
        if (this.socket != null) {
            this.socket.close();
        }
        this.asyncClient = null;
        this.socket = null;
        LOG.fine("Disconnected!");
    }

    public static Builder builder() {
        return new Builder();
    }

    public static interface SSLConfiguration {
        public SSLContext sslContext();

        public HostnameVerifier hostnameVerifier();
    }

    public static class Builder {
        private String endpoint;
        private String username;
        private String password;
        private SSLConfiguration sslConfiguration;
        private Consumer<Trace> consumer;
        private boolean reconnect = false;
        private int reconnectAttempts = 10;
        private int pauseBeforeReconnectInSeconds = 5;
        private Runnable beforeReconnect;
        private Runnable afterReconnect;

        public Builder endpoint(String endpoint) {
            this.endpoint = endpoint;
            return this;
        }

        public Builder credentials(String username, String password) {
            this.username = username;
            this.password = password;
            return this;
        }

        public Builder sslConfiguration(SSLConfiguration sslConfiguration) {
            this.sslConfiguration = sslConfiguration;
            return this;
        }

        public Builder consumer(Consumer<Trace> consumer) {
            this.consumer = consumer;
            return this;
        }

        public Builder reconnect(boolean reconnect) {
            this.reconnect = reconnect;
            return this;
        }

        public Builder reconnectAttempts(int attempts) {
            this.reconnectAttempts = attempts;
            return this;
        }

        public Builder pauseBeforeReconnectInSeconds(int seconds) {
            this.pauseBeforeReconnectInSeconds = seconds;
            return this;
        }

        public Builder beforeReconnect(Runnable beforeReconnect) {
            this.beforeReconnect = beforeReconnect;
            return this;
        }

        public Builder afterReconnect(Runnable afterReconnect) {
            this.afterReconnect = afterReconnect;
            return this;
        }

        public StreamClient build() {
            return new StreamClient(this.endpoint, this.username, this.password, this.sslConfiguration, this.consumer, this.beforeReconnect, this.afterReconnect, this.reconnect, this.reconnectAttempts, this.pauseBeforeReconnectInSeconds);
        }
    }
}

