/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.shaded.org.apache.zookeeper.server.quorum;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.UnresolvedAddressException;
import java.util.Enumeration;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.shaded.org.apache.zookeeper.server.ZooKeeperThread;
import org.apache.hadoop.shaded.org.apache.zookeeper.server.quorum.QuorumPeer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class QuorumCnxManager {
    /** Logger shared by the manager and its nested worker/listener threads. */
    private static final Logger LOG = LoggerFactory.getLogger(QuorumCnxManager.class);
    /** Capacity of the receive queue; equals the value (100) used when {@link #recvQueue} is created. */
    static final int RECV_CAPACITY = 100;
    /** Capacity of each per-peer send queue; equals the value (1) used when those queues are created. */
    static final int SEND_CAPACITY = 1;
    /** Largest frame length (in bytes) that RecvWorker accepts; longer frames raise an IOException. */
    static final int PACKETMAXSIZE = 524288;
    /** Not referenced anywhere in the code visible in this file. */
    static final int MAX_CONNECTION_ATTEMPTS = 2;
    /** Upper bound on the extra handshake payload length accepted by receiveConnection. */
    public static final int maxBuffer = 2048;
    /**
     * Identifier handed to the next incoming connection that announces
     * {@code Long.MAX_VALUE} as its sid; decremented after each use.
     */
    private long observerCounter = -1L;
    /**
     * Timeout passed to {@code Socket.connect}; defaults to 5000 and is
     * overridden by the {@code zookeeper.cnxTimeout} system property.
     */
    private int cnxTO = 5000;
    /** The local peer: source of our own id, the peer address maps and the tick settings. */
    final QuorumPeer self;
    /** Current SendWorker for each remote server id. */
    final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
    /** Outgoing message queue for each remote server id. */
    final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
    /** Last buffer a SendWorker took off the queue for each server id; re-sent when a new worker starts with an empty queue. */
    final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;
    /** Messages received from peers, plus messages this server addressed to itself. */
    public final ArrayBlockingQueue<Message> recvQueue;
    /** Lock held by addToRecvQueue around its evict-then-add sequence. */
    private final Object recvQLock = new Object();
    /** Set to true by halt(); polled by the worker and listener loops. */
    volatile boolean shutdown = false;
    /** Listener thread object; constructed here but not started by this class. */
    public final Listener listener;
    /** Incremented when a worker's run() starts and decremented in the workers' finish(). */
    private AtomicInteger threadCnt = new AtomicInteger(0);

    /**
     * Creates the connection manager for the given peer.
     *
     * <p>Allocates the receive queue and the per-peer bookkeeping maps, reads
     * the optional {@code zookeeper.cnxTimeout} system property to override
     * the default connect timeout, and constructs (but does not start) the
     * {@link Listener}.
     *
     * @param self the local quorum peer
     * @throws NumberFormatException if {@code zookeeper.cnxTimeout} is set but
     *         is not a valid integer
     */
    public QuorumCnxManager(QuorumPeer self) {
        // Explicit type arguments (no diamond: the class targets Java 6).
        this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
        this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
        this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
        this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();

        String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
        if (cnxToValue != null) {
            // parseInt avoids allocating a throw-away Integer box.
            this.cnxTO = Integer.parseInt(cnxToValue);
        }

        this.self = self;
        this.listener = new Listener();
    }

    /**
     * Opens a socket to the election address that the voting view lists for
     * {@code sid} and runs the connection handshake on it.
     *
     * @param sid id of the server to connect to
     * @throws Exception if configuring or connecting the socket fails
     */
    public void testInitiateConnection(long sid) throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Opening channel to server " + sid);
        }
        Socket channel = new Socket();
        setSockOpts(channel);
        channel.connect(self.getVotingView().get(sid).electionAddr, cnxTO);
        initiateConnection(channel, sid);
    }

    /**
     * Runs the initiator side of the handshake on an already connected socket.
     *
     * <p>Writes this server's id to the socket. If the remote id is larger
     * than ours the socket is closed and the connection dropped; otherwise
     * send/receive workers are created for the socket, any previous sender
     * for that id is finished, and both workers are started.
     *
     * @param sock connected socket to the remote server
     * @param sid  id of the remote server
     * @return {@code true} if worker threads were started on {@code sock},
     *         {@code false} if the socket was closed instead
     */
    public boolean initiateConnection(Socket sock, Long sid) {
        try {
            DataOutputStream dout = new DataOutputStream(sock.getOutputStream());
            dout.writeLong(self.getId());
            dout.flush();
        } catch (IOException e) {
            LOG.warn("Ignoring exception reading or writing challenge: ", e);
            closeSocket(sock);
            return false;
        }

        if (sid > self.getId()) {
            LOG.info("Have smaller server identifier, so dropping the connection: (" + sid + ", " + self.getId() + ")");
            closeSocket(sock);
            return false;
        }

        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, sid, sw);
        sw.setRecv(rw);

        // Retire the worker that currently owns this sid, if any.
        SendWorker vsw = senderWorkerMap.get(sid);
        if (vsw != null) {
            vsw.finish();
        }
        senderWorkerMap.put(sid, sw);

        // Atomic insert: a containsKey/put pair could overwrite a queue that
        // another thread created (and already filled) in between.
        queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));

        sw.start();
        rw.start();
        return true;
    }

    /**
     * Handles a connection accepted by the listener.
     *
     * <p>Reads the remote server id from the socket. A negative first value
     * is followed by the real id, an {@code int} payload length (rejected if
     * negative or larger than {@link #maxBuffer}) and that many payload bytes,
     * which are consumed and discarded. An id of {@code Long.MAX_VALUE} is
     * replaced by the next value of the observer counter.
     *
     * <p>If the remote id is smaller than ours, the incoming socket is closed,
     * any existing sender for that id is finished, and a connection is opened
     * from our side instead. Otherwise send/receive workers are started on
     * the accepted socket.
     *
     * @param sock the accepted socket
     */
    public void receiveConnection(Socket sock) {
        Long sid = null;
        try {
            DataInputStream din = new DataInputStream(sock.getInputStream());
            sid = din.readLong();
            if (sid < 0L) {
                sid = din.readLong();
                int num_remaining_bytes = din.readInt();
                if (num_remaining_bytes < 0 || num_remaining_bytes > maxBuffer) {
                    LOG.error("Unreasonable buffer length: {}", num_remaining_bytes);
                    closeSocket(sock);
                    return;
                }
                // readFully, not read: a single read() may legitimately return
                // fewer bytes than requested, which would leave the tail of the
                // handshake in the stream to be misparsed as a message frame.
                // A truncated handshake now surfaces as an IOException below.
                byte[] b = new byte[num_remaining_bytes];
                din.readFully(b);
            }
            if (sid == Long.MAX_VALUE) {
                sid = observerCounter--;
                LOG.info("Setting arbitrary identifier to observer: " + sid);
            }
        } catch (IOException e) {
            closeSocket(sock);
            LOG.warn("Exception reading or writing challenge: " + e.toString());
            return;
        }

        if (sid < self.getId()) {
            // Shut down any worker still bound to this sid before dialing out.
            SendWorker existing = senderWorkerMap.get(sid);
            if (existing != null) {
                existing.finish();
            }
            LOG.debug("Create new connection to server: " + sid);
            closeSocket(sock);
            connectOne(sid);
            return;
        }

        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, sid, sw);
        sw.setRecv(rw);

        SendWorker vsw = senderWorkerMap.get(sid);
        if (vsw != null) {
            vsw.finish();
        }
        senderWorkerMap.put(sid, sw);

        // Atomic insert so a queue created concurrently is never replaced.
        queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));

        sw.start();
        rw.start();
    }

    /**
     * Queues a message for the given server and makes sure a connection to it
     * exists.
     *
     * <p>A message addressed to this server itself is rewound and placed
     * straight on the receive queue. For any other server the message is
     * added to that server's send queue (created on first use) and
     * {@code connectOne} is called.
     *
     * @param sid destination server id
     * @param b   message to send
     */
    public void toSend(Long sid, ByteBuffer b) {
        if (self.getId() == sid.longValue()) {
            b.position(0);
            addToRecvQueue(new Message(b.duplicate(), sid));
            return;
        }

        ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
        if (bq == null) {
            // putIfAbsent keeps whichever queue won the race, so a message
            // can never be enqueued on a queue that is about to be replaced.
            ArrayBlockingQueue<ByteBuffer> fresh = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);
            bq = queueSendMap.putIfAbsent(sid, fresh);
            if (bq == null) {
                bq = fresh;
            }
        }
        addToSendQueue(bq, b);
        connectOne(sid);
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    /**
     * Opens a connection to server {@code sid} unless a sender for it is
     * already registered.
     *
     * <p>Unknown ids (not present in {@code self.quorumPeers}) are logged and
     * ignored. On a connect failure the peer's socket addresses are recreated
     * (when the id is in the current view) so a later attempt can pick up a
     * changed address.
     *
     * @param sid id of the server to connect to
     * @throws UnresolvedAddressException rethrown after logging if the
     *         election address cannot be resolved
     */
    synchronized void connectOne(long sid) {
        if (senderWorkerMap.get(sid) != null) {
            LOG.debug("There is a connection already for server " + sid);
            return;
        }
        if (!self.quorumPeers.containsKey(sid)) {
            LOG.warn("Invalid server id: " + sid);
            return;
        }

        // Captured up front only for the failure log messages below.
        InetSocketAddress electionAddr = self.quorumPeers.get(sid).electionAddr;
        Socket sock = null;
        try {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Opening channel to server " + sid);
            }
            sock = new Socket();
            setSockOpts(sock);
            sock.connect(self.getView().get(sid).electionAddr, cnxTO);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Connected to server " + sid);
            }
            initiateConnection(sock, sid);
        } catch (UnresolvedAddressException e) {
            LOG.warn("Cannot open channel to " + sid + " at election address " + electionAddr, e);
            // Release the descriptor explicitly; closing twice is harmless.
            if (sock != null) {
                closeSocket(sock);
            }
            if (self.getView().containsKey(sid)) {
                self.getView().get(sid).recreateSocketAddresses();
            }
            throw e;
        } catch (IOException e) {
            LOG.warn("Cannot open channel to " + sid + " at election address " + electionAddr, e);
            if (sock != null) {
                closeSocket(sock);
            }
            if (self.getView().containsKey(sid)) {
                self.getView().get(sid).recreateSocketAddresses();
            }
        }
    }

    /**
     * Calls {@code connectOne} for every server id that currently has a send
     * queue.
     */
    public void connectAll() {
        for (Enumeration<Long> ids = queueSendMap.keys(); ids.hasMoreElements();) {
            long target = ids.nextElement();
            connectOne(target);
        }
    }

    /**
     * Reports whether at least one send queue is currently empty.
     *
     * @return {@code true} as soon as any per-peer send queue has size zero,
     *         {@code false} if every queue holds a message or there are no
     *         queues at all
     */
    boolean haveDelivered() {
        for (ArrayBlockingQueue<ByteBuffer> pending : queueSendMap.values()) {
            LOG.debug("Queue size: " + pending.size());
            if (pending.size() == 0) {
                return true;
            }
        }
        return false;
    }

    /**
     * Stops the manager: raises the shutdown flag, closes the listener's
     * server socket and finishes every sender worker.
     */
    public void halt() {
        shutdown = true;
        LOG.debug("Halting listener");
        listener.halt();
        softHalt();
    }

    /**
     * Finishes every registered sender worker without touching the listener
     * or the shutdown flag.
     */
    public void softHalt() {
        for (SendWorker worker : senderWorkerMap.values()) {
            LOG.debug("Halting sender: " + worker);
            worker.finish();
        }
    }

    /**
     * Applies the standard options to a peer socket: TCP_NODELAY on, and a
     * read timeout of {@code tickTime * syncLimit}.
     *
     * @param sock socket to configure
     * @throws SocketException if an option cannot be set
     */
    private void setSockOpts(Socket sock) throws SocketException {
        sock.setTcpNoDelay(true);
        int readTimeout = self.tickTime * self.syncLimit;
        sock.setSoTimeout(readTimeout);
    }

    /**
     * Closes the socket, logging (not propagating) any I/O failure.
     *
     * @param sock socket to close
     */
    private void closeSocket(Socket sock) {
        try {
            sock.close();
        } catch (IOException closeFailure) {
            LOG.error("Exception while closing", closeFailure);
        }
    }

    /**
     * Returns the current value of the worker-thread counter.
     *
     * @return the counter value, widened to {@code long}
     */
    public long getThreadCount() {
        int liveWorkers = threadCnt.get();
        return liveWorkers;
    }

    /**
     * Returns the peer this manager was created for.
     *
     * @return the local quorum peer
     */
    public QuorumPeer getQuorumPeer() {
        return self;
    }

    /**
     * Adds a message to a send queue, first evicting the head element when
     * the queue has no remaining capacity.
     *
     * @param queue  destination queue
     * @param buffer message to enqueue
     */
    private void addToSendQueue(ArrayBlockingQueue<ByteBuffer> queue, ByteBuffer buffer) {
        boolean full = queue.remainingCapacity() == 0;
        if (full) {
            // Make room by dropping the oldest entry.
            try {
                queue.remove();
            } catch (NoSuchElementException ne) {
                LOG.debug("Trying to remove from an empty Queue. Ignoring exception " + ne);
            }
        }

        try {
            queue.add(buffer);
        } catch (IllegalStateException ie) {
            LOG.error("Unable to insert an element in the queue " + ie);
        }
    }

    /**
     * Tells whether the given send queue holds no messages.
     *
     * @param queue queue to inspect
     * @return {@code true} if the queue is empty
     */
    private boolean isSendQueueEmpty(ArrayBlockingQueue<ByteBuffer> queue) {
        boolean nothingPending = queue.isEmpty();
        return nothingPending;
    }

    /**
     * Takes the head of a send queue, waiting up to the given timeout for a
     * message to become available.
     *
     * @param queue   queue to poll
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return the head message, or {@code null} if the wait timed out
     * @throws InterruptedException if interrupted while waiting
     */
    private ByteBuffer pollSendQueue(ArrayBlockingQueue<ByteBuffer> queue, long timeout, TimeUnit unit) throws InterruptedException {
        ByteBuffer next = queue.poll(timeout, unit);
        return next;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Adds a message to the receive queue, evicting the head element first
     * when the queue has no remaining capacity. The evict-then-add sequence
     * runs while holding {@code recvQLock}.
     *
     * @param msg message to enqueue
     */
    public void addToRecvQueue(Message msg) {
        synchronized (recvQLock) {
            boolean full = recvQueue.remainingCapacity() == 0;
            if (full) {
                try {
                    recvQueue.remove();
                } catch (NoSuchElementException ne) {
                    LOG.debug("Trying to remove from an empty recvQueue. Ignoring exception " + ne);
                }
            }

            try {
                recvQueue.add(msg);
            } catch (IllegalStateException ie) {
                LOG.error("Unable to insert element in the recvQueue " + ie);
            }
        }
    }

    /**
     * Takes the head of the receive queue, waiting up to the given timeout
     * for a message to arrive.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return the head message, or {@code null} if the wait timed out
     * @throws InterruptedException if interrupted while waiting
     */
    public Message pollRecvQueue(long timeout, TimeUnit unit) throws InterruptedException {
        Message received = recvQueue.poll(timeout, unit);
        return received;
    }

    class RecvWorker
    extends ZooKeeperThread {
        Long sid;
        Socket sock;
        volatile boolean running;
        DataInputStream din;
        final SendWorker sw;

        RecvWorker(Socket sock, Long sid, SendWorker sw) {
            super("RecvWorker:" + sid);
            this.running = true;
            this.sid = sid;
            this.sock = sock;
            this.sw = sw;
            try {
                this.din = new DataInputStream(sock.getInputStream());
                sock.setSoTimeout(0);
            }
            catch (IOException e) {
                LOG.error("Error while accessing socket for " + sid, (Throwable)e);
                QuorumCnxManager.this.closeSocket(sock);
                this.running = false;
            }
        }

        synchronized boolean finish() {
            if (!this.running) {
                return this.running;
            }
            this.running = false;
            this.interrupt();
            QuorumCnxManager.this.threadCnt.decrementAndGet();
            return this.running;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        public void run() {
            QuorumCnxManager.this.threadCnt.incrementAndGet();
            try {
                try {}
                catch (Exception e) {
                    LOG.warn("Connection broken for id " + this.sid + ", my id = " + QuorumCnxManager.this.self.getId() + ", error = ", (Throwable)e);
                    Object var5_6 = null;
                    LOG.warn("Interrupting SendWorker");
                    this.sw.finish();
                    if (this.sock == null) return;
                    QuorumCnxManager.this.closeSocket(this.sock);
                    return;
                }
            }
            catch (Throwable throwable) {
                Object var5_7 = null;
                LOG.warn("Interrupting SendWorker");
                this.sw.finish();
                if (this.sock == null) throw throwable;
                QuorumCnxManager.this.closeSocket(this.sock);
                throw throwable;
            }
            while (this.running && !QuorumCnxManager.this.shutdown && this.sock != null) {
                int length = this.din.readInt();
                if (length <= 0) throw new IOException("Received packet with invalid packet: " + length);
                if (length > 524288) {
                    throw new IOException("Received packet with invalid packet: " + length);
                }
                byte[] msgArray = new byte[length];
                this.din.readFully(msgArray, 0, length);
                ByteBuffer message = ByteBuffer.wrap(msgArray);
                QuorumCnxManager.this.addToRecvQueue(new Message(message.duplicate(), this.sid));
            }
            Object var5_5 = null;
            LOG.warn("Interrupting SendWorker");
            this.sw.finish();
            if (this.sock == null) return;
            QuorumCnxManager.this.closeSocket(this.sock);
        }
    }

    class SendWorker
    extends ZooKeeperThread {
        Long sid;
        Socket sock;
        RecvWorker recvWorker;
        volatile boolean running;
        DataOutputStream dout;

        SendWorker(Socket sock, Long sid) {
            super("SendWorker:" + sid);
            this.running = true;
            this.sid = sid;
            this.sock = sock;
            this.recvWorker = null;
            try {
                this.dout = new DataOutputStream(sock.getOutputStream());
            }
            catch (IOException e) {
                LOG.error("Unable to access socket output stream", (Throwable)e);
                QuorumCnxManager.this.closeSocket(sock);
                this.running = false;
            }
            LOG.debug("Address of remote peer: " + this.sid);
        }

        synchronized void setRecv(RecvWorker recvWorker) {
            this.recvWorker = recvWorker;
        }

        synchronized RecvWorker getRecvWorker() {
            return this.recvWorker;
        }

        synchronized boolean finish() {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Calling finish for " + this.sid);
            }
            if (!this.running) {
                return this.running;
            }
            this.running = false;
            QuorumCnxManager.this.closeSocket(this.sock);
            this.interrupt();
            if (this.recvWorker != null) {
                this.recvWorker.finish();
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Removing entry from senderWorkerMap sid=" + this.sid);
            }
            QuorumCnxManager.this.senderWorkerMap.remove(this.sid, this);
            QuorumCnxManager.this.threadCnt.decrementAndGet();
            return this.running;
        }

        synchronized void send(ByteBuffer b) throws IOException {
            byte[] msgBytes = new byte[b.capacity()];
            try {
                b.position(0);
                b.get(msgBytes);
            }
            catch (BufferUnderflowException be) {
                LOG.error("BufferUnderflowException ", (Throwable)be);
                return;
            }
            this.dout.writeInt(b.capacity());
            this.dout.write(b.array());
            this.dout.flush();
        }

        public void run() {
            QuorumCnxManager.this.threadCnt.incrementAndGet();
            try {
                ByteBuffer b;
                ArrayBlockingQueue<ByteBuffer> bq = QuorumCnxManager.this.queueSendMap.get(this.sid);
                if ((bq == null || QuorumCnxManager.this.isSendQueueEmpty(bq)) && (b = QuorumCnxManager.this.lastMessageSent.get(this.sid)) != null) {
                    LOG.debug("Attempting to send lastMessage to sid=" + this.sid);
                    this.send(b);
                }
            }
            catch (IOException e) {
                LOG.error("Failed to send last message. Shutting down thread.", (Throwable)e);
                this.finish();
            }
            block6: while (true) {
                try {
                    while (this.running && !QuorumCnxManager.this.shutdown && this.sock != null) {
                        ByteBuffer b = null;
                        try {
                            ArrayBlockingQueue<ByteBuffer> bq = QuorumCnxManager.this.queueSendMap.get(this.sid);
                            if (bq == null) {
                                LOG.error("No queue of incoming messages for server " + this.sid);
                                break block6;
                            }
                            b = QuorumCnxManager.this.pollSendQueue(bq, 1000L, TimeUnit.MILLISECONDS);
                            if (b == null) continue block6;
                            QuorumCnxManager.this.lastMessageSent.put(this.sid, b);
                            this.send(b);
                            continue block6;
                        }
                        catch (InterruptedException e) {
                            LOG.warn("Interrupted while waiting for message on queue", (Throwable)e);
                        }
                    }
                    break;
                }
                catch (Exception e) {
                    LOG.warn("Exception when using channel: for id " + this.sid + " my id = " + QuorumCnxManager.this.self.getId() + " error = " + e);
                    break;
                }
            }
            this.finish();
            LOG.warn("Send worker leaving thread");
        }
    }

    public class Listener
    extends ZooKeeperThread {
        volatile ServerSocket ss;

        public Listener() {
            super("ListenerThread");
            this.ss = null;
        }

        public void run() {
            int numRetries = 0;
            while (!QuorumCnxManager.this.shutdown && numRetries < 3) {
                try {
                    InetSocketAddress addr;
                    this.ss = new ServerSocket();
                    this.ss.setReuseAddress(true);
                    if (QuorumCnxManager.this.self.getQuorumListenOnAllIPs()) {
                        int port = QuorumCnxManager.this.self.quorumPeers.get((Object)Long.valueOf((long)QuorumCnxManager.this.self.getId())).electionAddr.getPort();
                        addr = new InetSocketAddress(port);
                    } else {
                        addr = QuorumCnxManager.this.self.quorumPeers.get((Object)Long.valueOf((long)QuorumCnxManager.this.self.getId())).electionAddr;
                    }
                    LOG.info("My election bind port: " + addr.toString());
                    this.setName(QuorumCnxManager.this.self.quorumPeers.get((Object)Long.valueOf((long)QuorumCnxManager.this.self.getId())).electionAddr.toString());
                    this.ss.bind(addr);
                    while (!QuorumCnxManager.this.shutdown) {
                        Socket client = this.ss.accept();
                        QuorumCnxManager.this.setSockOpts(client);
                        LOG.info("Received connection request " + client.getRemoteSocketAddress());
                        QuorumCnxManager.this.receiveConnection(client);
                        numRetries = 0;
                    }
                }
                catch (IOException e) {
                    LOG.error("Exception while listening", (Throwable)e);
                    ++numRetries;
                    try {
                        this.ss.close();
                        Thread.sleep(1000L);
                    }
                    catch (IOException ie) {
                        LOG.error("Error closing server socket", (Throwable)ie);
                    }
                    catch (InterruptedException ie) {
                        LOG.error("Interrupted while sleeping. Ignoring exception", (Throwable)ie);
                    }
                }
            }
            LOG.info("Leaving listener");
            if (!QuorumCnxManager.this.shutdown) {
                LOG.error("As I'm leaving the listener thread, I won't be able to participate in leader election any longer: " + QuorumCnxManager.this.self.quorumPeers.get((Object)Long.valueOf((long)QuorumCnxManager.this.self.getId())).electionAddr);
            }
        }

        void halt() {
            try {
                LOG.debug("Trying to close listener: " + this.ss);
                if (this.ss != null) {
                    LOG.debug("Closing listener: " + QuorumCnxManager.this.self.getId());
                    this.ss.close();
                }
            }
            catch (IOException e) {
                LOG.warn("Exception when shutting down listener: " + e);
            }
        }
    }

    /**
     * A message buffer paired with a server id. The receive path stores the
     * id of the server the bytes were read from; {@code toSend} stores the
     * local id for self-addressed messages.
     */
    public static class Message {
        /** Message content. */
        ByteBuffer buffer;

        /** Server id associated with the message. */
        long sid;

        /**
         * Pairs a buffer with a server id; the buffer is stored as given,
         * not copied.
         *
         * @param buffer message content
         * @param sid    server id to associate with it
         */
        Message(ByteBuffer buffer, long sid) {
            this.sid = sid;
            this.buffer = buffer;
        }
    }
}

