package org.apache.flink.streaming.connectors.kafka.table;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.clients.producer.ProducerRecord;

/* JADX INFO: Access modifiers changed from: package-private */
@Internal
/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.class */
public class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationSchema<RowData> {
    private final Set<String> topics;
    private final Pattern topicPattern;
    private final KafkaPartitioner<RowData> partitioner;

    @Nullable
    private final SerializationSchema<RowData> keySerialization;
    private final SerializationSchema<RowData> valueSerialization;
    private final RowData.FieldGetter[] keyFieldGetters;
    private final RowData.FieldGetter[] valueFieldGetters;
    private final boolean hasMetadata;
    private final int[] metadataPositions;
    private final boolean upsertMode;
    private final Map<String, Boolean> topicPatternMatches;
    private final boolean mergeUpdates;

    /* JADX INFO: Access modifiers changed from: package-private */
    public DynamicKafkaRecordSerializationSchema(@Nullable List<String> list, @Nullable Pattern pattern, @Nullable KafkaPartitioner<RowData> kafkaPartitioner, @Nullable SerializationSchema<RowData> serializationSchema, SerializationSchema<RowData> serializationSchema2, RowData.FieldGetter[] fieldGetterArr, RowData.FieldGetter[] fieldGetterArr2, boolean z, int[] iArr, boolean z2, boolean z3) {
        if (z2) {
            Preconditions.checkArgument(serializationSchema != null && fieldGetterArr.length > 0, "Key must be set in upsert mode for serialization schema.");
        }
        Preconditions.checkArgument((list != null && pattern == null && list.size() > 0) || (list == null && pattern != null), "Either Topic or Topic Pattern must be set.");
        if (list != null) {
            this.topics = new HashSet(list);
        } else {
            this.topics = null;
        }
        this.topicPattern = pattern;
        this.partitioner = kafkaPartitioner;
        this.keySerialization = serializationSchema;
        this.valueSerialization = (SerializationSchema) Preconditions.checkNotNull(serializationSchema2);
        this.keyFieldGetters = fieldGetterArr;
        this.valueFieldGetters = fieldGetterArr2;
        this.hasMetadata = z;
        this.metadataPositions = iArr;
        this.upsertMode = z2;
        this.topicPatternMatches = new HashMap();
        this.mergeUpdates = z3;
    }

    @Override // org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
    public ProducerRecord<byte[], byte[]> serialize(RowData rowData, KafkaRecordSerializationSchema.KafkaSinkContext kafkaSinkContext, Long l) {
        byte[] serialize;
        byte[] serialize2;
        if (this.mergeUpdates && rowData.getRowKind() == RowKind.UPDATE_AFTER) {
            return serializeUpdate((GenericRowData) rowData, kafkaSinkContext);
        }
        if (this.keySerialization == null && !this.hasMetadata) {
            byte[] serialize3 = this.valueSerialization.serialize(rowData);
            String targetTopic = getTargetTopic(rowData);
            return new ProducerRecord<>(targetTopic, extractPartition(rowData, targetTopic, null, serialize3, kafkaSinkContext.getPartitionsForTopic(targetTopic)), (Object) null, serialize3);
        }
        if (this.keySerialization == null) {
            serialize = null;
        } else {
            serialize = this.keySerialization.serialize(createProjectedRow(rowData, RowKind.INSERT, this.keyFieldGetters));
        }
        RowKind rowKind = rowData.getRowKind();
        if (!this.upsertMode) {
            serialize2 = this.valueSerialization.serialize(createProjectedRow(rowData, rowKind, this.valueFieldGetters));
        } else if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE) {
            serialize2 = null;
        } else {
            RowData createProjectedRow = createProjectedRow(rowData, rowKind, this.valueFieldGetters);
            createProjectedRow.setRowKind(RowKind.INSERT);
            serialize2 = this.valueSerialization.serialize(createProjectedRow);
        }
        String targetTopic2 = getTargetTopic(rowData);
        return new ProducerRecord<>(targetTopic2, extractPartition(rowData, targetTopic2, serialize, serialize2, kafkaSinkContext.getPartitionsForTopic(targetTopic2)), (Long) readMetadata(rowData, KafkaDynamicSink.WritableMetadata.TIMESTAMP), serialize, serialize2, (Iterable) readMetadata(rowData, KafkaDynamicSink.WritableMetadata.HEADERS));
    }

    private ProducerRecord<byte[], byte[]> serializeUpdate(GenericRowData genericRowData, KafkaRecordSerializationSchema.KafkaSinkContext kafkaSinkContext) {
        RowData row = genericRowData.getRow(0, -1);
        RowData row2 = genericRowData.getRow(1, -1);
        Preconditions.checkArgument(row.getRowKind() == RowKind.UPDATE_BEFORE);
        Preconditions.checkArgument(row2.getRowKind() == RowKind.UPDATE_AFTER);
        if (this.keySerialization == null && !this.hasMetadata) {
            byte[] serialize = this.valueSerialization.serialize(genericRowData);
            String targetTopic = getTargetTopic(genericRowData);
            return new ProducerRecord<>(targetTopic, extractPartition(row2, targetTopic, null, serialize, kafkaSinkContext.getPartitionsForTopic(targetTopic)), (Object) null, serialize);
        }
        byte[] serialize2 = this.keySerialization == null ? null : this.keySerialization.serialize(createProjectedRow(row2, RowKind.INSERT, this.keyFieldGetters));
        RowData createProjectedRow = createProjectedRow(row, RowKind.UPDATE_BEFORE, this.valueFieldGetters);
        RowData createProjectedRow2 = createProjectedRow(row2, RowKind.UPDATE_AFTER, this.valueFieldGetters);
        genericRowData.setField(0, createProjectedRow);
        genericRowData.setField(1, createProjectedRow2);
        byte[] serialize3 = this.valueSerialization.serialize(genericRowData);
        String targetTopic2 = getTargetTopic(genericRowData);
        return new ProducerRecord<>(targetTopic2, extractPartition(createProjectedRow2, targetTopic2, serialize2, serialize3, kafkaSinkContext.getPartitionsForTopic(targetTopic2)), (Long) readMetadata(createProjectedRow2, KafkaDynamicSink.WritableMetadata.TIMESTAMP), serialize2, serialize3, (Iterable) readMetadata(createProjectedRow2, KafkaDynamicSink.WritableMetadata.HEADERS));
    }

    @Override // org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
    public void open(SerializationSchema.InitializationContext initializationContext, KafkaRecordSerializationSchema.KafkaSinkContext kafkaSinkContext) throws Exception {
        if (this.keySerialization != null) {
            this.keySerialization.open(initializationContext);
        }
        if (this.partitioner != null) {
            this.partitioner.open(kafkaSinkContext.getParallelInstanceId(), kafkaSinkContext.getNumberOfParallelInstances());
        }
        this.valueSerialization.open(initializationContext);
    }

    private String getTargetTopic(RowData rowData) {
        if (this.topics != null && this.topics.size() == 1) {
            return this.topics.stream().findFirst().get();
        }
        String str = (String) readMetadata(rowData, KafkaDynamicSink.WritableMetadata.TOPIC);
        if (str == null) {
            throw new IllegalArgumentException("The topic of the sink record is not valid. Expected a single topic but no topic is set.");
        }
        if (this.topics != null && !this.topics.contains(str)) {
            throw new IllegalArgumentException(String.format("The topic of the sink record is not valid. Expected topic to be in: %s but was: %s", this.topics, str));
        }
        if (this.topicPattern == null || cachedTopicPatternMatch(str)) {
            return str;
        }
        throw new IllegalArgumentException(String.format("The topic of the sink record is not valid. Expected topic to match: %s but was: %s", this.topicPattern, str));
    }

    private boolean cachedTopicPatternMatch(String str) {
        return this.topicPatternMatches.computeIfAbsent(str, str2 -> {
            return Boolean.valueOf(this.topicPattern.matcher(str2).matches());
        }).booleanValue();
    }

    private Integer extractPartition(RowData rowData, String str, @Nullable byte[] bArr, byte[] bArr2, int[] iArr) {
        if (this.partitioner != null) {
            return Integer.valueOf(this.partitioner.partition(rowData, bArr, bArr2, str, iArr));
        }
        return null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static RowData createProjectedRow(RowData rowData, RowKind rowKind, RowData.FieldGetter[] fieldGetterArr) {
        int length = fieldGetterArr.length;
        GenericRowData genericRowData = new GenericRowData(rowKind, length);
        for (int i = 0; i < length; i++) {
            genericRowData.setField(i, fieldGetterArr[i].getFieldOrNull(rowData));
        }
        return genericRowData;
    }

    private <T> T readMetadata(RowData rowData, KafkaDynamicSink.WritableMetadata writableMetadata) {
        int i = this.metadataPositions[writableMetadata.ordinal()];
        if (i < 0) {
            return null;
        }
        return (T) writableMetadata.converter.read(rowData, i);
    }
}
