/*
 * Decompiled with CFR 0.152.
 */
package com.abiquo.apiclient.stream;

import com.abiquo.event.json.module.AbiquoModule;
import com.abiquo.event.model.Event;
import com.fasterxml.jackson.databind.AnnotationIntrospector;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair;
import com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.AsyncHttpClientConfig;
import com.ning.http.client.Realm;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import org.atmosphere.wasync.ClientFactory;
import org.atmosphere.wasync.Function;
import org.atmosphere.wasync.Request;
import org.atmosphere.wasync.Socket;
import org.atmosphere.wasync.impl.AtmosphereClient;
import org.atmosphere.wasync.impl.AtmosphereRequest;
import org.atmosphere.wasync.impl.DefaultOptionsBuilder;
import rx.Observable;
import rx.Subscriber;

public class StreamClient
implements Closeable {
    private static final Logger LOG = Logger.getLogger("abiquo.stream");
    private final AsyncHttpClientConfig clientConfig;
    private final String endpoint;
    private final ObjectMapper json;
    private final AtomicInteger subscriberCount = new AtomicInteger(0);
    private Socket socket;
    private AsyncHttpClient asyncClient;

    private StreamClient(String endpoint, String username, String password, SSLConfiguration sslConfiguration) {
        this.endpoint = (String)Preconditions.checkNotNull((Object)endpoint, (Object)"endpoint cannot be null");
        Preconditions.checkNotNull((Object)username, (Object)"username cannot be null");
        Preconditions.checkNotNull((Object)password, (Object)"password cannot be null");
        AsyncHttpClientConfig.Builder config = new AsyncHttpClientConfig.Builder();
        config.setRequestTimeoutInMs(-1);
        config.setIdleConnectionTimeoutInMs(-1);
        if (sslConfiguration != null) {
            config.setHostnameVerifier(sslConfiguration.hostnameVerifier());
            config.setSSLContext(sslConfiguration.sslContext());
        }
        config.setRealm(new Realm.RealmBuilder().setPrincipal(username).setPassword(password).setUsePreemptiveAuth(true).setScheme(Realm.AuthScheme.BASIC).build());
        this.clientConfig = config.build();
        this.json = new ObjectMapper().setAnnotationIntrospector((AnnotationIntrospector)new AnnotationIntrospectorPair((AnnotationIntrospector)new JacksonAnnotationIntrospector(), (AnnotationIntrospector)new JaxbAnnotationIntrospector(TypeFactory.defaultInstance()))).registerModule((Module)new AbiquoModule());
    }

    private void connect() throws IOException {
        Preconditions.checkState((this.socket == null ? 1 : 0) != 0, (Object)"the client is already listening to events");
        Preconditions.checkState((this.asyncClient == null ? 1 : 0) != 0, (Object)"the client is already listening to events");
        LOG.fine("Connecting to " + this.endpoint + "...");
        AtmosphereClient client = (AtmosphereClient)ClientFactory.getDefault().newClient(AtmosphereClient.class);
        AtmosphereRequest request = ((AtmosphereRequest.AtmosphereRequestBuilder)((AtmosphereRequest.AtmosphereRequestBuilder)client.newRequestBuilder().method(Request.METHOD.GET)).uri(this.endpoint + "?Content-Type=application/json")).transport(Request.TRANSPORT.SSE).transport(Request.TRANSPORT.LONG_POLLING).build();
        this.asyncClient = new AsyncHttpClient(this.clientConfig);
        this.socket = client.create(((DefaultOptionsBuilder)client.newOptionsBuilder().runtime(this.asyncClient)).build());
        this.socket.open((Request)request);
        LOG.fine("Connected!");
    }

    public boolean isConnected() {
        return this.socket != null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Observable<Event> newEventStream() throws IOException {
        Observable observable = Observable.create((Observable.OnSubscribe)new Observable.OnSubscribe<Event>(){

            public void call(final Subscriber<? super Event> subscriber) {
                StreamClient.this.subscriberCount.incrementAndGet();
                StreamClient.this.socket.on(org.atmosphere.wasync.Event.MESSAGE, (Function)new Function<String>(){

                    public void on(String rawEvent) {
                        if (subscriber.isUnsubscribed()) {
                            StreamClient.this.subscriberGone((Subscriber<? super Event>)subscriber);
                        } else {
                            try {
                                Event event = (Event)StreamClient.this.json.readValue(rawEvent, Event.class);
                                subscriber.onNext((Object)event);
                            }
                            catch (IOException ex) {
                                subscriber.onError((Throwable)new RuntimeException("Error parsing event: " + rawEvent, ex));
                            }
                        }
                    }
                }).on(org.atmosphere.wasync.Event.ERROR, (Function)new Function<String>(){

                    public void on(String rawEvent) {
                        if (subscriber.isUnsubscribed()) {
                            StreamClient.this.subscriberGone((Subscriber<? super Event>)subscriber);
                        } else {
                            subscriber.onError((Throwable)new RuntimeException("Unexpected error: " + rawEvent));
                        }
                    }
                }).on(org.atmosphere.wasync.Event.CLOSE, (Function)new Function<String>(){

                    public void on(String rawEvent) {
                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onCompleted();
                        }
                    }
                });
            }
        });
        StreamClient streamClient = this;
        synchronized (streamClient) {
            if (!this.isConnected()) {
                this.connect();
            }
        }
        return observable;
    }

    private void subscriberGone(Subscriber<? super Event> subscriber) {
        if (this.subscriberCount.decrementAndGet() == 0) {
            LOG.fine("There are no subscribers left. Will disconnect.");
            try {
                this.close();
            }
            catch (IOException ex) {
                throw Throwables.propagate((Throwable)ex);
            }
        }
    }

    @Override
    public synchronized void close() throws IOException {
        LOG.fine("Disconnecting...");
        if (this.asyncClient != null) {
            this.asyncClient.closeAsynchronously();
        }
        if (this.socket != null) {
            this.socket.close();
        }
        this.asyncClient = null;
        this.socket = null;
        LOG.fine("Disconnected!");
    }

    public static Builder builder() {
        return new Builder();
    }

    public static interface SSLConfiguration {
        public SSLContext sslContext();

        public HostnameVerifier hostnameVerifier();
    }

    public static class Builder {
        private String endpoint;
        private String username;
        private String password;
        private SSLConfiguration sslConfiguration;

        public Builder endpoint(String endpoint) {
            this.endpoint = endpoint;
            return this;
        }

        public Builder credentials(String username, String password) {
            this.username = username;
            this.password = password;
            return this;
        }

        public Builder sslConfiguration(SSLConfiguration sslConfiguration) {
            this.sslConfiguration = sslConfiguration;
            return this;
        }

        public StreamClient build() {
            return new StreamClient(this.endpoint, this.username, this.password, this.sslConfiguration);
        }
    }
}

