/*
 * Decompiled with CFR 0.152.
 */
package com.robrua.easyjava.net.replication.multipaxos;

import com.robrua.easyjava.io.output.Logger;
import com.robrua.easyjava.net.replication.multipaxos.PaxosCommand;
import com.robrua.easyjava.net.replication.multipaxos.message.AcceptDenial;
import com.robrua.easyjava.net.replication.multipaxos.message.AcceptRequest;
import com.robrua.easyjava.net.replication.multipaxos.message.Accepted;
import com.robrua.easyjava.net.replication.multipaxos.message.LearnedCommand;
import com.robrua.easyjava.net.replication.multipaxos.message.PaxosMessage;
import com.robrua.easyjava.net.replication.multipaxos.message.Promise;
import com.robrua.easyjava.net.replication.multipaxos.message.Proposal;
import com.robrua.easyjava.net.replication.multipaxos.message.ProposeDenial;
import com.robrua.easyjava.net.udp.UDPListener;
import com.robrua.easyjava.net.udp.UDPSender;
import com.robrua.easyjava.type.Pair;
import java.io.IOException;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public abstract class PaxosNode<E extends PaxosCommand> {
    private final Node<E> node;
    public final int nodeID;

    public PaxosNode(int nodeID, int port, Map<Integer, String> nodeHostPorts) throws NumberFormatException, UnknownHostException, SocketException {
        this(nodeID, port, nodeHostPorts, Logger.nullLogger());
    }

    public PaxosNode(int nodeID, int port, Map<Integer, String> nodeHostPorts, Logger logger) throws NumberFormatException, UnknownHostException, SocketException {
        this.nodeID = nodeID;
        if (logger == null) {
            logger = Logger.nullLogger();
        }
        this.node = new Node(this, port, nodeHostPorts, logger);
        new Thread(this.node).start();
    }

    protected abstract void handleCommand(E var1);

    public boolean propose(E command) {
        return this.node.propose(command);
    }

    public void stop() {
        this.node.stop();
    }

    private static final class Node<E extends PaxosCommand>
    implements Runnable {
        private static final long QUORUM_TIMEOUT_MILLIS = 100L;
        private static final long UPDATE_MILLIS = 100L;
        private long currentCommandSlot;
        private final Map<Long, Long> highestProposalNumbers;
        private final Map<Long, E> learnedCommands;
        private final Logger logger;
        private final Map<Long, UDPSender<PaxosMessage>> nodes;
        private final PaxosNode<E> owner;
        private final PriorityQueue<Pair<Long, E>> pendingCommands;
        private final Set<Long> pendingCommandSlots;
        private final BlockingQueue<PaxosMessage> proposeMessages;
        private boolean stopped;
        private final UDPListener<PaxosMessage> UDP;

        public Node(PaxosNode<E> owner, int port, Map<Integer, String> nodeHostPorts, Logger logger) throws NumberFormatException, UnknownHostException, SocketException {
            this.owner = owner;
            this.logger = logger;
            this.UDP = new UDPListener<PaxosMessage>(port, PaxosMessage.class);
            this.proposeMessages = new LinkedBlockingQueue<PaxosMessage>();
            this.highestProposalNumbers = new HashMap<Long, Long>();
            this.pendingCommands = new PriorityQueue();
            this.learnedCommands = new HashMap<Long, E>();
            this.pendingCommandSlots = new HashSet<Long>();
            this.stopped = false;
            this.currentCommandSlot = 1L;
            this.nodes = new HashMap<Long, UDPSender<PaxosMessage>>();
            for (Integer ID : nodeHostPorts.keySet()) {
                String[] parts = nodeHostPorts.get(ID).split(":");
                this.nodes.put(new Long(ID.intValue()), new UDPSender(parts[0], Integer.parseInt(parts[1])));
            }
        }

        private Pair<Set<PaxosMessage>, Pair<Quorum, AcceptState>> awaitQuorum(boolean forProposal, E command, long commandSlot, long proposalNumber) {
            HashSet<PaxosMessage> responses = new HashSet<PaxosMessage>();
            Quorum quorum = Quorum.INSUFFICIENT;
            while (quorum == Quorum.INSUFFICIENT) {
                PaxosMessage msg;
                try {
                    msg = this.proposeMessages.poll(100L, TimeUnit.MILLISECONDS);
                    if (msg == null) {
                        this.logger.verbose("Quorum timed out");
                        return new Pair<Set<PaxosMessage>, Pair<Quorum, AcceptState>>(responses, new Pair<Quorum, AcceptState>(Quorum.NO, AcceptState.NONE));
                    }
                }
                catch (InterruptedException e) {
                    this.logger.warn(e);
                    continue;
                }
                if (msg instanceof LearnedCommand && msg.commandSlot == commandSlot) {
                    LearnedCommand message = (LearnedCommand)msg;
                    this.logger.verbose("Reprocessing " + message);
                    if (message.command.equals(command)) {
                        return new Pair<Set<PaxosMessage>, Pair<Quorum, AcceptState>>(responses, new Pair<Quorum, AcceptState>(Quorum.NO, AcceptState.PROPOSED));
                    }
                    return new Pair<Set<PaxosMessage>, Pair<Quorum, AcceptState>>(responses, new Pair<Quorum, AcceptState>(Quorum.NO, AcceptState.OTHER));
                }
                if ((!forProposal || !(msg instanceof PaxosMessage.ProposeResponse)) && (forProposal || !(msg instanceof PaxosMessage.AcceptResponse)) || msg.commandSlot != commandSlot || msg.proposalNumber != proposalNumber) continue;
                this.logger.verbose("Processing " + msg);
                responses.add(msg);
                quorum = this.isQuorum(responses);
            }
            return new Pair<Set<PaxosMessage>, Pair<Quorum, AcceptState>>(responses, new Pair<Quorum, AcceptState>(quorum, AcceptState.NONE));
        }

        private void broadcast(PaxosMessage message) {
            this.nodes.values().forEach(sender -> {
                try {
                    sender.send(message);
                }
                catch (IOException e) {
                    this.logger.error(e);
                }
            });
        }

        private long getHighestDenialNumber(boolean forProposal, Set<PaxosMessage> responses, long proposalNumber) {
            long highest = responses.parallelStream().filter(response -> forProposal && response instanceof ProposeDenial || !forProposal && response instanceof AcceptDenial).map(response -> {
                if (response instanceof ProposeDenial) {
                    return ((ProposeDenial)response).highestSeenProposalNumber;
                }
                return ((AcceptDenial)response).highestSeenProposalNumber;
            }).max(Comparator.naturalOrder()).get();
            return highest;
        }

        private Quorum isQuorum(Set<PaxosMessage> responses) {
            int quorumSize = this.nodes.size() / 2 + 1;
            boolean isEven = this.nodes.size() % 2 == 0;
            int numDenials = 0;
            int numPromises = 0;
            for (PaxosMessage response : responses) {
                if (response instanceof PaxosMessage.Denial) {
                    if ((!isEven || ++numDenials < quorumSize - 1) && (isEven || numDenials < quorumSize)) continue;
                    return Quorum.NO;
                }
                if (!(response instanceof PaxosMessage.Acceptance) || ++numPromises < quorumSize) continue;
                return Quorum.YES;
            }
            return Quorum.INSUFFICIENT;
        }

        private long nextProposalNumber(long prevProposalNumber) {
            long otherID = prevProposalNumber % (long)this.nodes.size();
            if (otherID < (long)this.owner.nodeID) {
                return prevProposalNumber + (long)this.owner.nodeID - otherID;
            }
            return prevProposalNumber + (long)this.nodes.size() + (long)this.owner.nodeID - otherID;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void processCommand(E command, long commandSlot) {
            if (this.learnedCommands.get(commandSlot) != null) {
                return;
            }
            if (commandSlot == 1L || this.learnedCommands.get(commandSlot - 1L) != null) {
                this.learnedCommands.put(commandSlot, command);
                Map<Long, Long> map = this.highestProposalNumbers;
                synchronized (map) {
                    this.highestProposalNumbers.remove(commandSlot);
                }
                this.logger.verbose("Committing command " + command + " in slot " + commandSlot);
                this.owner.handleCommand(command);
                ++this.currentCommandSlot;
                if (!this.pendingCommands.isEmpty() && commandSlot == this.pendingCommands.peek().getLeft() - 1L) {
                    Pair<Long, E> newCommand = this.pendingCommands.poll();
                    this.pendingCommandSlots.remove(newCommand.getLeft());
                    this.processCommand((PaxosCommand)newCommand.getRight(), newCommand.getLeft());
                }
            } else if (!this.pendingCommandSlots.contains(commandSlot)) {
                this.pendingCommands.offer(new Pair<Long, E>(commandSlot, command));
                this.pendingCommandSlots.add(commandSlot);
                new Thread(new SlotChecker(commandSlot - 1L)).start();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private PaxosMessage processMessage(PaxosMessage msg) {
            if (msg instanceof LearnedCommand) {
                LearnedCommand message = (LearnedCommand)msg;
                this.logger.verbose("Processing " + message);
                this.processCommand(message.command, message.commandSlot);
                this.proposeMessages.offer(message);
                return null;
            }
            if (msg instanceof Proposal) {
                Proposal message = (Proposal)msg;
                this.logger.verbose("Processing " + message);
                if (message.commandSlot < this.currentCommandSlot) {
                    LearnedCommand<PaxosCommand> fillIn = new LearnedCommand<PaxosCommand>(this.owner.nodeID, message.commandSlot, -1L, (PaxosCommand)this.learnedCommands.get(message.commandSlot));
                    this.logger.verbose("Sending fill-in " + fillIn);
                    return fillIn;
                }
                if (message.proposalNumber < 0L) {
                    return null;
                }
                Map<Long, Long> fillIn = this.highestProposalNumbers;
                synchronized (fillIn) {
                    Long highest = this.highestProposalNumbers.get(message.commandSlot);
                    if (highest == null) {
                        this.highestProposalNumbers.put(message.commandSlot, message.proposalNumber);
                        Promise<Object> promise = new Promise<Object>(this.owner.nodeID, message.commandSlot, message.proposalNumber, null, -1L);
                        this.logger.verbose("Sending " + promise);
                        return promise;
                    }
                    if (message.proposalNumber > highest) {
                        this.highestProposalNumbers.put(message.commandSlot, message.proposalNumber);
                        Promise<Object> promise = new Promise<Object>(this.owner.nodeID, message.commandSlot, message.proposalNumber, null, highest);
                        this.logger.verbose("Sending " + promise);
                        return promise;
                    }
                    ProposeDenial denial = new ProposeDenial(this.owner.nodeID, message.commandSlot, message.proposalNumber, highest);
                    this.logger.verbose("Sending " + denial);
                    return denial;
                }
            }
            if (msg instanceof AcceptRequest) {
                AcceptRequest message = (AcceptRequest)msg;
                this.logger.verbose("Processing " + message);
                if (message.commandSlot < this.currentCommandSlot) {
                    LearnedCommand<PaxosCommand> fillIn = new LearnedCommand<PaxosCommand>(this.owner.nodeID, message.commandSlot, -1L, (PaxosCommand)this.learnedCommands.get(message.commandSlot));
                    this.logger.verbose("Sending " + fillIn);
                    return fillIn;
                }
                Long highest = this.highestProposalNumbers.get(message.commandSlot);
                if (highest == null || highest <= message.proposalNumber) {
                    Accepted accepted = new Accepted(this.owner.nodeID, message.commandSlot, message.proposalNumber);
                    this.logger.verbose("Sending " + accepted);
                    return accepted;
                }
                AcceptDenial denial = new AcceptDenial(this.owner.nodeID, message.commandSlot, message.proposalNumber, highest);
                this.logger.verbose("Sending " + denial);
                return denial;
            }
            this.proposeMessages.offer(msg);
            return null;
        }

        public boolean propose(E command) {
            long commandSlot = this.currentCommandSlot;
            Pair<AcceptState, Long> result = new Pair<AcceptState, Long>(AcceptState.NONE, -1L);
            while (result.getLeft() == AcceptState.NONE) {
                result = this.propose(command, commandSlot, this.nextProposalNumber(result.getRight()));
            }
            if (result.getLeft() == AcceptState.PROPOSED) {
                this.logger.verbose("Proposed " + command + " successfully");
                return true;
            }
            this.logger.verbose("Proposed " + command + " unsuccessfully");
            return false;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private synchronized Pair<AcceptState, Long> propose(E command, long commandSlot, long proposalNumber) {
            Proposal proposal = new Proposal(this.owner.nodeID, commandSlot, proposalNumber);
            this.logger.verbose("Broadcasting " + proposal);
            this.broadcast(proposal);
            Pair<Set<PaxosMessage>, Pair<Quorum, AcceptState>> result = this.awaitQuorum(true, command, commandSlot, proposalNumber);
            this.logger.verbose("Quorum reached: " + (Object)((Object)result.getRight().getLeft()));
            if (result.getRight().getRight() != AcceptState.NONE) {
                return new Pair<AcceptState, Long>(result.getRight().getRight(), -1L);
            }
            if (result.getRight().getLeft() == Quorum.NO) {
                long highestDenialNumber = this.getHighestDenialNumber(true, result.getLeft(), proposalNumber);
                return new Pair<AcceptState, Long>(AcceptState.NONE, highestDenialNumber);
            }
            long highest = Long.MIN_VALUE;
            PaxosCommand requestValue = null;
            for (PaxosMessage response : result.getLeft()) {
                if (!(response instanceof Promise)) continue;
                Promise promise = (Promise)response;
                if (promise.previousProposalNumber <= highest) continue;
                highest = promise.previousProposalNumber;
                requestValue = promise.command;
            }
            if (requestValue == null) {
                requestValue = (PaxosCommand)command;
            }
            AcceptRequest<Object> request = new AcceptRequest<Object>(this.owner.nodeID, commandSlot, proposalNumber, requestValue);
            this.logger.verbose("Broadcasting " + request);
            this.broadcast(request);
            result = this.awaitQuorum(false, command, commandSlot, proposalNumber);
            this.logger.verbose("Quorum reached: " + (Object)((Object)result.getRight().getLeft()));
            if (result.getRight().getRight() != AcceptState.NONE) {
                return new Pair<AcceptState, Long>(result.getRight().getRight(), -1L);
            }
            if (result.getRight().getLeft() == Quorum.NO) {
                long highestDenialNumber = this.getHighestDenialNumber(false, result.getLeft(), proposalNumber);
                return new Pair<AcceptState, Long>(AcceptState.NONE, highestDenialNumber);
            }
            Map<Long, Long> highestDenialNumber = this.highestProposalNumbers;
            synchronized (highestDenialNumber) {
                Long highestProposal = this.highestProposalNumbers.get(commandSlot);
                if (highestProposal == null) {
                    return new Pair<AcceptState, Long>(AcceptState.OTHER, -1L);
                }
                if (highestProposal > proposalNumber) {
                    return new Pair<AcceptState, Long>(AcceptState.NONE, highestProposal);
                }
                this.processCommand(requestValue, commandSlot);
            }
            this.logger.verbose("Accepted as leader for slot " + commandSlot);
            LearnedCommand<PaxosCommand> edict = new LearnedCommand<PaxosCommand>(this.owner.nodeID, commandSlot, proposalNumber, requestValue);
            this.logger.verbose("Broadcasting " + edict);
            this.broadcast(edict);
            if (requestValue.equals(command)) {
                return new Pair<AcceptState, Long>(AcceptState.PROPOSED, -1L);
            }
            return new Pair<AcceptState, Long>(AcceptState.OTHER, -1L);
        }

        @Override
        public void run() {
            while (!this.stopped) {
                UDPListener.UDPMessage<PaxosMessage> udpMessage;
                try {
                    udpMessage = this.UDP.receive(100L, TimeUnit.MILLISECONDS);
                }
                catch (InterruptedException e) {
                    continue;
                }
                if (udpMessage == null) {
                    new Thread(new SlotChecker(this.currentCommandSlot)).start();
                    continue;
                }
                PaxosMessage message = (PaxosMessage)udpMessage.data;
                if (message == null) continue;
                Long sourceNodeID = new Long(message.sourceNodeID);
                if ((message = this.processMessage(message)) == null) continue;
                UDPSender<PaxosMessage> sender = this.nodes.get(sourceNodeID);
                try {
                    sender.send(message);
                }
                catch (IOException e) {
                    this.logger.error(e);
                }
            }
        }

        public synchronized void stop() {
            this.stopped = true;
            this.UDP.close();
            this.nodes.values().forEach(sender -> sender.close());
        }

        private class SlotChecker
        implements Runnable {
            private final long commandSlot;

            public SlotChecker(long commandSlot) {
                this.commandSlot = commandSlot;
            }

            @Override
            public void run() {
                Proposal proposal = new Proposal(((Node)Node.this).owner.nodeID, this.commandSlot, -1L);
                Node.this.logger.verbose("Checking empty command slot " + this.commandSlot);
                Node.this.logger.verbose("Broadcasting " + proposal);
                Node.this.broadcast(proposal);
            }
        }

        private static enum Quorum {
            INSUFFICIENT,
            NO,
            YES;

        }

        private static enum AcceptState {
            NONE,
            OTHER,
            PROPOSED;

        }
    }
}

