package com.huawei.streaming.operator.outputstream;

import com.google.common.base.Strings;
import com.huawei.streaming.config.StreamingConfig;
import com.huawei.streaming.event.TupleEvent;
import com.huawei.streaming.exception.ErrorCode;
import com.huawei.streaming.exception.StreamingException;
import com.huawei.streaming.operator.IOutputStreamOperator;
import com.huawei.streaming.serde.StreamSerDe;
import java.util.Locale;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/streaming/operator/outputstream/KafkaFunctionOp.class */
/**
 * Output-stream operator whose lifecycle methods ({@link #initialize()},
 * {@link #execute(String, TupleEvent)}, {@link #destroy()}) are no-ops in this class.
 *
 * <p>The only active behaviour is in {@link #setConfig(StreamingConfig)}: when the
 * configuration carries {@link StreamingConfig#OPERATOR_KAFKA_KEY_FIELD}, its value is
 * validated against the attribute names of the serde's schema and rewritten to the
 * schema's own spelling.
 *
 * <p>Because that validation reads the schema from the serde, {@link #setSerDe(StreamSerDe)}
 * must be called before {@link #setConfig(StreamingConfig)} whenever a non-empty key field
 * is configured.
 */
public class KafkaFunctionOp implements IOutputStreamOperator {
    private static final long serialVersionUID = -3954667394060365585L;
    private static final Logger LOG = LoggerFactory.getLogger(KafkaFunctionOp.class);

    /** Serde supplied through {@link #setSerDe(StreamSerDe)}; its schema is used to validate the key field. */
    private StreamSerDe serde;

    /** Configuration stored by {@link #setConfig(StreamingConfig)} after the key field has been validated. */
    private StreamingConfig config;

    /**
     * Validates/normalises the Kafka key field entry of {@code streamingConfig} and then
     * stores the configuration on this operator.
     *
     * <p>Note that the passed configuration object itself is modified in place (the key
     * field entry may be removed or rewritten).
     *
     * @param streamingConfig the configuration to validate and keep
     * @throws StreamingException if a non-empty key field is configured that matches no
     *         attribute name of the serde's schema
     * @throws IllegalStateException if a non-empty key field is configured but no serde
     *         (or no schema) is available to validate it against
     */
    @Override
    public void setConfig(StreamingConfig streamingConfig) throws StreamingException {
        initKeyField(streamingConfig);
        this.config = streamingConfig;
    }

    /**
     * @return the configuration last passed to {@link #setConfig(StreamingConfig)},
     *         or {@code null} if none has been set
     */
    @Override
    public StreamingConfig getConfig() {
        return this.config;
    }

    /** No-op in this operator. */
    @Override
    public void initialize() throws StreamingException {
    }

    /** No-op in this operator; the stream name and event are ignored. */
    @Override
    public void execute(String str, TupleEvent tupleEvent) throws StreamingException {
    }

    /** No-op in this operator. */
    @Override
    public void destroy() throws StreamingException {
    }

    /**
     * Stores the serde whose schema is later used to validate the Kafka key field.
     *
     * @param streamSerDe the serde to use
     */
    @Override
    public void setSerDe(StreamSerDe streamSerDe) {
        this.serde = streamSerDe;
    }

    /**
     * @return the serde last passed to {@link #setSerDe(StreamSerDe)}, or {@code null} if none has been set
     */
    @Override
    public StreamSerDe getSerDe() {
        return this.serde;
    }

    /**
     * Normalises the {@link StreamingConfig#OPERATOR_KAFKA_KEY_FIELD} entry of the given configuration.
     *
     * <ul>
     *   <li>Entry absent: nothing happens.</li>
     *   <li>Entry present but null/empty: the entry is removed.</li>
     *   <li>Entry present and matching a schema attribute (case-insensitively): the entry is
     *       replaced with the attribute name exactly as the schema spells it.</li>
     *   <li>Entry present but matching no attribute: an error is logged and a
     *       {@link StreamingException} is thrown.</li>
     * </ul>
     */
    private void initKeyField(StreamingConfig streamingConfig) throws StreamingException {
        if (!streamingConfig.containsKey(StreamingConfig.OPERATOR_KAFKA_KEY_FIELD)) {
            return;
        }

        String configuredField = streamingConfig.getStringValue(StreamingConfig.OPERATOR_KAFKA_KEY_FIELD);
        if (Strings.isNullOrEmpty(configuredField)) {
            streamingConfig.remove(StreamingConfig.OPERATOR_KAFKA_KEY_FIELD);
            return;
        }

        // Fail fast with a clear message instead of an anonymous NullPointerException when the
        // serde (or its schema) has not been provided before the configuration.
        if (this.serde == null || this.serde.getSchema() == null) {
            throw new IllegalStateException(
                "A serde with a schema must be set via setSerDe before setConfig can validate the Kafka key field");
        }

        String schemaField =
            findAttributeIgnoreCase(this.serde.getSchema().getAllAttributeNames(), configuredField);
        if (schemaField == null) {
            LOG.error("Failed to read config value {}", configuredField);
            throw new StreamingException(ErrorCode.CONFIG_VALUE_ERROR, configuredField);
        }

        // Store the schema's own spelling so later lookups do not need to be case-insensitive.
        streamingConfig.put(StreamingConfig.OPERATOR_KAFKA_KEY_FIELD, schemaField);
    }

    /**
     * Returns the first entry of {@code attributeNames} equal to {@code wanted} when both are
     * lower-cased with {@link Locale#US}, or {@code null} if there is none.
     */
    private static String findAttributeIgnoreCase(String[] attributeNames, String wanted) {
        // Lower-case the searched name once rather than on every iteration.
        String wantedLower = wanted.toLowerCase(Locale.US);
        for (String attributeName : attributeNames) {
            if (attributeName.toLowerCase(Locale.US).equals(wantedLower)) {
                return attributeName;
            }
        }
        return null;
    }
}
