package com.huawei.streaming.operator.inputstream;

import com.huawei.streaming.config.StreamingConfig;
import com.huawei.streaming.exception.StreamSerDeException;
import com.huawei.streaming.exception.StreamingException;
import com.huawei.streaming.operator.IEmitter;
import com.huawei.streaming.operator.IInputStreamOperator;
import com.huawei.streaming.serde.StreamSerDe;
import com.huawei.streaming.util.StreamingUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Iterator;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/streaming/operator/inputstream/TCPClientInputOperator.class */
public class TCPClientInputOperator implements IInputStreamOperator {
    private static final Logger LOG = LoggerFactory.getLogger(TCPClientInputOperator.class);
    private static final long serialVersionUID = 4927492973217406542L;
    private String tcpServer;
    private Integer tcpPort;
    private Integer tcpConnectSessionTimeout;
    private IEmitter emitter;
    private StreamSerDe serde;
    private int binaryPackageSize = 883;
    private StreamingConfig config;

    @Override // com.huawei.streaming.operator.IStreamOperator
    public void setConfig(StreamingConfig streamingConfig) throws StreamingException {
        this.tcpServer = streamingConfig.getStringValue(StreamingConfig.OPERATOR_TCPCLIENT_SERVER);
        this.tcpPort = Integer.valueOf(streamingConfig.getIntValue(StreamingConfig.OPERATOR_TCPCLIENT_PORT));
        this.tcpConnectSessionTimeout = Integer.valueOf(streamingConfig.getIntValue(StreamingConfig.OPERATOR_TCPCLIENT_SESSIONTIMEOUT));
        this.binaryPackageSize = streamingConfig.getIntValue(StreamingConfig.OPERATOR_TCPCLIENT_PACKAGELENGTH);
        this.config = streamingConfig;
    }

    @Override // com.huawei.streaming.operator.IStreamOperator
    public StreamingConfig getConfig() {
        return this.config;
    }

    @Override // com.huawei.streaming.operator.IStreamOperator
    public void initialize() throws StreamingException {
    }

    @Override // com.huawei.streaming.operator.IInputStreamOperator
    public void execute() throws StreamingException {
        Socket socket = new Socket();
        byte[] bArr = new byte[this.binaryPackageSize];
        try {
            try {
                try {
                    socket.connect(new InetSocketAddress(this.tcpServer, this.tcpPort.intValue()), this.tcpConnectSessionTimeout.intValue());
                    while (socket.getInputStream().read(bArr) != -1) {
                        emitData(bArr);
                    }
                    StreamingUtils.close(socket);
                } catch (Exception e) {
                    LOG.warn("Ignore an Exception.", e);
                    StreamingUtils.close(socket);
                }
            } catch (StreamSerDeException e2) {
                LOG.warn("Ignore a serde exception.", e2);
                StreamingUtils.close(socket);
            } catch (IOException e3) {
                LOG.warn("Ignore a IOException.", e3);
                StreamingUtils.close(socket);
            }
        } catch (Throwable th) {
            StreamingUtils.close(socket);
            throw th;
        }
    }

    @Override // com.huawei.streaming.operator.IStreamOperator
    public void destroy() throws StreamingException {
    }

    private void emitData(Object obj) throws StreamSerDeException, StreamingException {
        List<Object[]> deSerialize = this.serde.deSerialize(obj);
        if (deSerialize == null || deSerialize.size() == 0) {
            return;
        }
        Iterator<Object[]> it = deSerialize.iterator();
        while (it.hasNext()) {
            this.emitter.emit(it.next());
        }
    }

    @Override // com.huawei.streaming.operator.IInputStreamOperator
    public void setEmitter(IEmitter iEmitter) {
        this.emitter = iEmitter;
    }

    @Override // com.huawei.streaming.operator.IInputStreamOperator
    public void setSerDe(StreamSerDe streamSerDe) {
        this.serde = streamSerDe;
    }

    @Override // com.huawei.streaming.operator.IInputStreamOperator
    public StreamSerDe getSerDe() {
        return this.serde;
    }
}
