/*
 * Decompiled with CFR 0.152.
 */
package com.xxdb.streaming.client;

import com.xxdb.data.Vector;
import com.xxdb.streaming.client.AbstractClient;
import com.xxdb.streaming.client.IMessage;
import com.xxdb.streaming.client.MessageHandler;
import java.io.IOException;
import java.net.SocketException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;

/**
 * Streaming client that hands every subscribed message to a
 * {@link MessageHandler} on a dedicated background thread.
 *
 * <p>Each call to the nine-argument {@code subscribe} starts a new
 * {@link HandlerLopper} thread that drains the queue returned by
 * {@code subscribeInternal}. Only the most recently started thread is
 * remembered in {@link #handlerLopper}; that is the thread
 * {@link #doReconnect} interrupts before resubscribing.
 *
 * <p>NOTE(review): the {@code unsubscribe} methods only delegate to
 * {@code unsubscribeInternal}; nothing in this class stops the handler
 * thread on unsubscribe - confirm whether the base class takes care of it.
 */
public class ThreadedClient
extends AbstractClient {
    /** Port used by the no-argument constructor. */
    private static final int DEFAULT_SUBSCRIBE_PORT = 8849;

    /** Action name used by the overloads that do not take one. */
    private static final String DEFAULT_ACTION_NAME = "javaStreamingApi";

    /** Offset passed along by the overloads that do not take one. */
    private static final long DEFAULT_OFFSET = -1L;

    /** Pattern of the timestamp that prefixes reconnect log lines. */
    private static final String LOG_TIMESTAMP_PATTERN = "yyyy/MM/dd HH:mm:ss";

    /** Most recently started handler thread; {@code null} until the first subscribe. */
    private HandlerLopper handlerLopper = null;

    /**
     * Creates a client on the default subscribe port (8849).
     *
     * @throws SocketException if the superclass constructor fails
     */
    public ThreadedClient() throws SocketException {
        this(DEFAULT_SUBSCRIBE_PORT);
    }

    /**
     * Creates a client on the given subscribe port.
     *
     * @param subscribePort port forwarded to the {@link AbstractClient} constructor
     * @throws SocketException if the superclass constructor fails
     */
    public ThreadedClient(int subscribePort) throws SocketException {
        super(subscribePort);
    }

    /**
     * Interrupts the current handler thread and subscribes again using the
     * parameters stored in {@code site}, starting at {@code site.msgId + 1}.
     * The outcome is printed to standard output with a timestamp.
     *
     * @param site description of the subscription to re-establish
     * @return {@code true} if the new subscription succeeded, {@code false}
     *         if any exception was thrown while subscribing
     * @throws RuntimeException if no handler thread has been started yet
     */
    @Override
    protected boolean doReconnect(AbstractClient.Site site) {
        if (this.handlerLopper == null) {
            throw new RuntimeException("Subscribe thread is not started");
        }
        // Ask the old handler thread to stop; subscribe() below starts its replacement.
        this.handlerLopper.interrupt();
        String target = site.host + ":" + site.port + ":" + site.tableName;
        try {
            this.subscribe(site.host, site.port, site.tableName, site.actionName, site.handler, site.msgId + 1L, true, site.filter, site.allowExistTopic);
            System.out.println(ThreadedClient.logTimestamp() + " Successfully reconnected and subscribed " + target);
            return true;
        }
        catch (Exception ex) {
            System.out.println(ThreadedClient.logTimestamp() + " Unable to subscribe table. Will try again after 1 seconds." + target);
            ex.printStackTrace();
            return false;
        }
    }

    /** Formats the current time for the reconnect log lines. */
    private static String logTimestamp() {
        // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
        return new SimpleDateFormat(LOG_TIMESTAMP_PATTERN).format(new Date());
    }

    /**
     * Subscribes to a table and starts a handler thread that passes every
     * received message to {@code handler}. All other {@code subscribe}
     * overloads end up here.
     *
     * @param host            server host
     * @param port            server port
     * @param tableName       name of the table to subscribe to
     * @param actionName      name of the subscription action
     * @param handler         callback invoked once per message on the handler thread
     * @param offset          offset forwarded to {@code subscribeInternal}
     * @param reconnect       reconnect flag forwarded to {@code subscribeInternal}
     * @param filter          filter forwarded to {@code subscribeInternal}; may be {@code null}
     * @param allowExistTopic flag forwarded to {@code subscribeInternal}
     * @throws IOException if {@code subscribeInternal} fails
     */
    public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, boolean allowExistTopic) throws IOException {
        BlockingQueue<List<IMessage>> queue = this.subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, allowExistTopic);
        HandlerLopper lopper = new HandlerLopper(queue, handler);
        this.handlerLopper = lopper;
        lopper.start();
    }

    /**
     * Subscribes with no filter and {@code allowExistTopic = false}.
     *
     * @throws IOException if the subscription fails
     */
    public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect) throws IOException {
        this.subscribe(host, port, tableName, actionName, handler, offset, reconnect, null, false);
    }

    /**
     * Subscribes with a filter, {@code reconnect = false} and
     * {@code allowExistTopic = false}.
     *
     * @throws IOException if the subscription fails
     */
    public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, Vector filter) throws IOException {
        this.subscribe(host, port, tableName, actionName, handler, offset, false, filter, false);
    }

    /**
     * Subscribes from the given offset with {@code reconnect = false}.
     *
     * @throws IOException if the subscription fails
     */
    public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset) throws IOException {
        this.subscribe(host, port, tableName, actionName, handler, offset, false);
    }

    /**
     * Subscribes with offset {@code -1} and {@code reconnect = false}.
     *
     * @throws IOException if the subscription fails
     */
    public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler) throws IOException {
        this.subscribe(host, port, tableName, actionName, handler, DEFAULT_OFFSET);
    }

    /**
     * Subscribes with offset {@code -1} and the given reconnect flag.
     *
     * @throws IOException if the subscription fails
     */
    public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, boolean reconnect) throws IOException {
        this.subscribe(host, port, tableName, actionName, handler, DEFAULT_OFFSET, reconnect);
    }

    /**
     * Subscribes under the default action name {@code "javaStreamingApi"}
     * with offset {@code -1}.
     *
     * @throws IOException if the subscription fails
     */
    public void subscribe(String host, int port, String tableName, MessageHandler handler) throws IOException {
        this.subscribe(host, port, tableName, DEFAULT_ACTION_NAME, handler, DEFAULT_OFFSET);
    }

    /**
     * Subscribes under the default action name {@code "javaStreamingApi"}
     * with offset {@code -1} and the given reconnect flag.
     *
     * @throws IOException if the subscription fails
     */
    public void subscribe(String host, int port, String tableName, MessageHandler handler, boolean reconnect) throws IOException {
        this.subscribe(host, port, tableName, DEFAULT_ACTION_NAME, handler, DEFAULT_OFFSET, reconnect);
    }

    /**
     * Subscribes under the default action name {@code "javaStreamingApi"}
     * from the given offset.
     *
     * @throws IOException if the subscription fails
     */
    public void subscribe(String host, int port, String tableName, MessageHandler handler, long offset) throws IOException {
        this.subscribe(host, port, tableName, DEFAULT_ACTION_NAME, handler, offset);
    }

    /**
     * Subscribes under the default action name {@code "javaStreamingApi"}
     * from the given offset with the given reconnect flag.
     *
     * @throws IOException if the subscription fails
     */
    public void subscribe(String host, int port, String tableName, MessageHandler handler, long offset, boolean reconnect) throws IOException {
        this.subscribe(host, port, tableName, DEFAULT_ACTION_NAME, handler, offset, reconnect);
    }

    /**
     * Cancels the subscription identified by host, port, table and action name
     * by delegating to {@code unsubscribeInternal}.
     *
     * @throws IOException if {@code unsubscribeInternal} fails
     */
    public void unsubscribe(String host, int port, String tableName, String actionName) throws IOException {
        this.unsubscribeInternal(host, port, tableName, actionName);
    }

    /**
     * Cancels the subscription identified by host, port and table by
     * delegating to the three-argument {@code unsubscribeInternal}.
     *
     * @throws IOException if {@code unsubscribeInternal} fails
     */
    public void unsubscribe(String host, int port, String tableName) throws IOException {
        this.unsubscribeInternal(host, port, tableName);
    }

    /**
     * Background thread that takes message batches from a queue and passes
     * each message to a {@link MessageHandler}, until it is interrupted.
     */
    class HandlerLopper
    extends Thread {
        /** Source of message batches. */
        BlockingQueue<List<IMessage>> queue;
        /** Callback invoked for every message. */
        MessageHandler handler;

        /**
         * @param queue   queue to take message batches from
         * @param handler callback invoked once per message
         */
        HandlerLopper(BlockingQueue<List<IMessage>> queue, MessageHandler handler) {
            this.queue = queue;
            this.handler = handler;
        }

        /**
         * Blocks on the queue and dispatches every message of every batch in
         * order. Returns when the thread is interrupted. An unchecked
         * exception thrown by the handler is not caught here and therefore
         * also ends the thread.
         */
        @Override
        public void run() {
            try {
                while (true) {
                    List<IMessage> batch = this.queue.take();
                    for (IMessage msg : batch) {
                        this.handler.doEvent(msg);
                    }
                }
            }
            catch (InterruptedException e) {
                // Interruption is the stop signal sent by doReconnect(); leave the
                // loop instead of resuming it, otherwise every reconnect would
                // leave one more handler thread running.
                System.out.println("Handler thread stopped.");
                Thread.currentThread().interrupt();
            }
        }
    }
}

