package com.huawei.streaming.operator.outputstream;

import com.huawei.streaming.config.StreamingConfig;
import com.huawei.streaming.event.TupleEvent;
import com.huawei.streaming.exception.StreamSerDeException;
import com.huawei.streaming.exception.StreamingException;
import com.huawei.streaming.operator.IOutputStreamOperator;
import com.huawei.streaming.serde.BaseSerDe;
import com.huawei.streaming.serde.StreamSerDe;
import com.huawei.streaming.util.StreamingUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Iterator;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/streaming/operator/outputstream/TCPSenderFuncOp.class */
/**
 * Output stream operator that sends serialized tuple events to a TCP server.
 *
 * <p>Every {@link #execute(String, TupleEvent)} call opens a new socket to the
 * configured server and port, serializes the event with the configured
 * {@link StreamSerDe}, writes the resulting bytes and closes the socket again.
 * Connection, write and serialization failures are logged at WARN level and
 * swallowed.
 */
public class TCPSenderFuncOp implements IOutputStreamOperator {
    private static final Logger LOG = LoggerFactory.getLogger(TCPSenderFuncOp.class);
    private static final long serialVersionUID = 5433931984789791657L;

    /** Socket of the execute call in progress; created and closed on every call. */
    private transient Socket s;

    /** Server address, read from {@code OPERATOR_TCPCLIENT_SERVER}. */
    private String tcpServer;

    /** Server port, read from {@code OPERATOR_TCPCLIENT_PORT}. */
    private Integer tcpPort;

    /** Value of {@code OPERATOR_TCPCLIENT_SESSIONTIMEOUT}; passed as the connect timeout. */
    private Integer tcpConnectSessionTimeout;

    /** Serializer that turns tuple events into the bytes to send. */
    private StreamSerDe serde;

    /** Configuration most recently handed to {@link #setConfig(StreamingConfig)}. */
    private StreamingConfig config;

    /**
     * Reads the server address, port and session timeout from the given
     * configuration and keeps a reference to the configuration itself.
     *
     * @param streamingConfig configuration holding the TCP client settings
     * @throws StreamingException declared by the operator interface
     */
    @Override
    public void setConfig(StreamingConfig streamingConfig) throws StreamingException {
        this.tcpServer = streamingConfig.getStringValue(StreamingConfig.OPERATOR_TCPCLIENT_SERVER);
        this.tcpPort = Integer.valueOf(streamingConfig.getIntValue(StreamingConfig.OPERATOR_TCPCLIENT_PORT));
        this.tcpConnectSessionTimeout = Integer.valueOf(streamingConfig.getIntValue(StreamingConfig.OPERATOR_TCPCLIENT_SESSIONTIMEOUT));
        this.config = streamingConfig;
    }

    /**
     * @return the configuration passed to the last {@link #setConfig(StreamingConfig)}
     *         call, or {@code null} if it has not been called
     */
    @Override
    public StreamingConfig getConfig() {
        return this.config;
    }

    /** No-op: the socket is created per {@link #execute(String, TupleEvent)} call. */
    @Override
    public void initialize() throws StreamingException {
    }

    /**
     * Connects to the configured server, serializes {@code tupleEvent} and
     * writes the serialized bytes to the socket.
     *
     * <p>The socket is closed before this method returns, whatever the outcome.
     * An {@link IOException} (connect or write) or a {@link StreamSerDeException}
     * is logged and ignored; a {@code null} serialization result is logged and
     * nothing is written.
     *
     * @param str not used by this implementation
     * @param tupleEvent event to serialize and send
     * @throws StreamingException declared by the operator interface
     */
    @Override
    public void execute(String str, TupleEvent tupleEvent) throws StreamingException {
        LOG.debug("In execute of TCPSenderFuncOp");
        try {
            this.s = new Socket();
            this.s.connect(new InetSocketAddress(this.tcpServer, this.tcpPort.intValue()), this.tcpConnectSessionTimeout.intValue());
            Object serialized = this.serde.serialize(BaseSerDe.changeEventsToList(tupleEvent));
            if (serialized == null) {
                LOG.warn("Ignore a null result in output.");
                return;
            }
            writePayload(serialized);
        } catch (IOException e) {
            LOG.warn("Ignore a IOException.", e);
        } catch (StreamSerDeException e) {
            LOG.warn("Ignore a serde exception.", e);
        } finally {
            // Single cleanup point: runs on success, early return and every exception.
            StreamingUtils.close(this.s);
        }
        LOG.debug("TCPSenderOp sending end.");
    }

    /**
     * Writes a serialization result to the current socket: either a single
     * {@code byte[]} or a {@link List} whose elements are each a {@code byte[]}.
     */
    private void writePayload(Object serialized) throws IOException {
        if (serialized instanceof List) {
            for (Object chunk : (List<?>) serialized) {
                this.s.getOutputStream().write((byte[]) chunk);
            }
        } else {
            this.s.getOutputStream().write((byte[]) serialized);
        }
    }

    /** No-op: no resources are held between {@link #execute(String, TupleEvent)} calls. */
    @Override
    public void destroy() throws StreamingException {
    }

    /**
     * @param streamSerDe serializer used by {@link #execute(String, TupleEvent)}
     */
    @Override
    public void setSerDe(StreamSerDe streamSerDe) {
        this.serde = streamSerDe;
    }

    /**
     * @return the serializer set through {@link #setSerDe(StreamSerDe)}, or
     *         {@code null} if none has been set
     */
    @Override
    public StreamSerDe getSerDe() {
        return this.serde;
    }
}
