package org.apache.storm.sql.runtime.datasource.socket.trident;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import org.apache.storm.Config;
import org.apache.storm.spout.Scheme;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IBatchSpout;
import org.apache.storm.tuple.Fields;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/sql/runtime/datasource/socket/trident/TridentSocketSpout.class */
/**
 * Trident batch spout that reads newline-delimited text from a TCP socket.
 *
 * <p>{@link #open} connects to {@code host:port} and starts a background thread that reads
 * lines from the socket into an in-memory queue. {@link #emitBatch} drains that queue, converts
 * each line to a tuple with the configured {@link Scheme}, and remembers the tuples per batch id
 * so that a re-emitted batch replays the same tuples until {@link #ack} removes it.
 *
 * <p>If the reader thread hits end-of-stream or any other failure while the spout is still
 * running, the whole JVM is halted through {@link #die(Throwable)}.
 */
public class TridentSocketSpout implements IBatchSpout {
    private static final Logger LOG = LoggerFactory.getLogger(TridentSocketSpout.class);
    private final String host;
    private final int port;
    private final Scheme scheme;
    // Written by close(), read by the reader thread; volatile for cross-thread visibility.
    private volatile boolean _running = true;
    // Lines read from the socket that have not yet been assigned to a batch.
    private BlockingDeque<String> queue;
    private Socket socket;
    private Thread readerThread;
    private BufferedReader in;
    // Created in open(); not read anywhere in this class.
    private ObjectMapper objectMapper;
    // Tuples already produced for each batch id, kept until the batch is acked.
    private Map<Long, List<List<Object>>> batches;

    /** Background task that moves lines from the socket reader into the queue. */
    private class SocketReaderRunnable implements Runnable {
        private SocketReaderRunnable() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (TridentSocketSpout.this._running) {
                try {
                    String line = TridentSocketSpout.this.in.readLine();
                    if (line == null) {
                        // Thrown inside the try so that EOF is routed through die() like any other failure.
                        throw new RuntimeException("EOF reached from the socket. We can't read the data any more.");
                    }
                    // Append at the tail: emitBatch() polls from the head, so lines keep arrival order.
                    TridentSocketSpout.this.queue.addLast(line.trim());
                } catch (Throwable th) {
                    // During close() the reader is closed under us; die() only halts while still running.
                    TridentSocketSpout.this.die(th);
                }
            }
        }
    }

    /**
     * Creates a spout for the given endpoint. No connection is made until {@link #open}.
     *
     * @param scheme converts each received line into a tuple and supplies the output fields
     * @param host   host name or address to connect to
     * @param port   TCP port to connect to
     */
    public TridentSocketSpout(Scheme scheme, String host, int port) {
        this.scheme = scheme;
        this.host = host;
        this.port = port;
    }

    /**
     * Connects to the socket and starts the reader thread.
     *
     * @param conf    topology configuration (not used here)
     * @param context topology context (not used here)
     * @throws RuntimeException if the socket cannot be opened; the {@link IOException} is the cause
     */
    public void open(Map conf, TopologyContext context) {
        this.queue = new LinkedBlockingDeque<>();
        this.objectMapper = new ObjectMapper();
        this.batches = new HashMap<>();
        try {
            this.socket = new Socket(this.host, this.port);
            this.in = new BufferedReader(
                    new InputStreamReader(this.socket.getInputStream(), StandardCharsets.UTF_8));
        } catch (IOException e) {
            // Do not leak a connected socket when obtaining its stream fails.
            closeQuietly(this.socket);
            throw new RuntimeException("Error opening socket: host " + this.host + " port " + this.port, e);
        }
        this.readerThread = new Thread(new SocketReaderRunnable());
        this.readerThread.start();
    }

    /**
     * Emits the tuples of the given batch.
     *
     * <p>For a batch id seen for the first time, every line currently queued is converted and
     * stored under that id. For an id that is already stored, the stored tuples are emitted again.
     *
     * @param batchId   id of the batch to emit
     * @param collector collector that receives the tuples
     */
    public void emitBatch(long batchId, TridentCollector collector) {
        List<List<Object>> tuples = this.batches.get(batchId);
        if (tuples == null) {
            tuples = new ArrayList<>();
            String line;
            while ((line = this.queue.poll()) != null) {
                List<Object> tuple = convertLineToTuple(line);
                // Lines for which the scheme returns null are dropped.
                if (tuple != null) {
                    tuples.add(tuple);
                }
            }
            this.batches.put(batchId, tuples);
        }
        for (List<Object> tuple : tuples) {
            collector.emit(tuple);
        }
    }

    /** Encodes the line as UTF-8 bytes and hands it to the scheme for deserialization. */
    private List<Object> convertLineToTuple(String line) {
        return this.scheme.deserialize(ByteBuffer.wrap(line.getBytes(StandardCharsets.UTF_8)));
    }

    /**
     * Forgets the stored tuples of an acknowledged batch.
     *
     * @param batchId id of the batch that was acknowledged
     */
    public void ack(long batchId) {
        this.batches.remove(batchId);
    }

    /**
     * Stops the reader thread and releases the socket. Safe to call when {@link #open} was never
     * called or failed part-way.
     */
    public void close() {
        // Clear the flag first so the reader's resulting exception does not halt the JVM in die().
        this._running = false;
        if (this.readerThread != null) {
            this.readerThread.interrupt();
        }
        if (this.queue != null) {
            this.queue.clear();
        }
        closeQuietly(this.in);
        closeQuietly(this.socket);
    }

    /**
     * Returns the component configuration with the maximum task parallelism set to 1.
     *
     * @return configuration map for this component
     */
    public Map<String, Object> getComponentConfiguration() {
        Config config = new Config();
        config.setMaxTaskParallelism(1);
        return config;
    }

    /**
     * Returns the output fields declared by the configured scheme.
     *
     * @return fields of the emitted tuples
     */
    public Fields getOutputFields() {
        return this.scheme.getOutputFields();
    }

    /**
     * Logs the failure and halts the JVM with exit code 11 when the spout is still running or the
     * failure is an {@link Error}. After {@link #close()} a non-{@code Error} failure is only logged.
     *
     * @param th failure raised in the reader thread
     */
    public void die(Throwable th) {
        LOG.error("Halting process: TridentSocketSpout died.", th);
        if (this._running || (th instanceof Error)) {
            System.exit(11);
        }
    }

    /** Closes the resource if it is non-null, ignoring any {@link IOException}. */
    private void closeQuietly(Closeable closeable) {
        if (closeable != null) {
            try {
                closeable.close();
            } catch (IOException ignored) {
                // Best-effort cleanup during shutdown; nothing useful can be done on failure.
            }
        }
    }
}
