package org.apache.flink.connector.kafka.source;

import java.io.Serializable;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializerValidator;
import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
/* loaded from: input_file:org/apache/flink/connector/kafka/source/KafkaSourceBuilder.class */
public class KafkaSourceBuilder<OUT> implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceBuilder.class);
    private static final String[] REQUIRED_CONFIGS = {"bootstrap.servers"};
    private KafkaSubscriber subscriber = null;
    private OffsetsInitializer startingOffsetsInitializer = OffsetsInitializer.earliest();
    private OffsetsInitializer stoppingOffsetsInitializer = new NoStoppingOffsetsInitializer();
    private Boundedness boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
    private KafkaRecordDeserializationSchema<OUT> deserializationSchema = null;
    protected Properties props = new Properties();
    private long recordsPerSecondRateLimit;
    private long lastHistoricalDataTimestamp;

    public KafkaSourceBuilder<OUT> setBootstrapServers(String str) {
        return setProperty("bootstrap.servers", str);
    }

    public KafkaSourceBuilder<OUT> setGroupId(String str) {
        return setProperty("group.id", str);
    }

    public KafkaSourceBuilder<OUT> setTopics(List<String> list) {
        ensureSubscriberIsNull("topics");
        this.subscriber = KafkaSubscriber.getTopicListSubscriber(list);
        return this;
    }

    public KafkaSourceBuilder<OUT> setTopics(String... strArr) {
        return setTopics(Arrays.asList(strArr));
    }

    public KafkaSourceBuilder<OUT> setTopicPattern(Pattern pattern) {
        ensureSubscriberIsNull("topic pattern");
        this.subscriber = KafkaSubscriber.getTopicPatternSubscriber(pattern);
        return this;
    }

    public KafkaSourceBuilder<OUT> setPartitions(Set<TopicPartition> set) {
        ensureSubscriberIsNull("partitions");
        this.subscriber = KafkaSubscriber.getPartitionSetSubscriber(set);
        return this;
    }

    public KafkaSourceBuilder<OUT> setKafkaSubscriber(KafkaSubscriber kafkaSubscriber) {
        ensureSubscriberIsNull("custom");
        this.subscriber = (KafkaSubscriber) Preconditions.checkNotNull(kafkaSubscriber);
        return this;
    }

    public KafkaSourceBuilder<OUT> setStartingOffsets(OffsetsInitializer offsetsInitializer) {
        this.startingOffsetsInitializer = offsetsInitializer;
        return this;
    }

    public KafkaSourceBuilder<OUT> setUnbounded(OffsetsInitializer offsetsInitializer) {
        this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
        this.stoppingOffsetsInitializer = offsetsInitializer;
        return this;
    }

    public KafkaSourceBuilder<OUT> setBounded(OffsetsInitializer offsetsInitializer) {
        this.boundedness = Boundedness.BOUNDED;
        this.stoppingOffsetsInitializer = offsetsInitializer;
        return this;
    }

    public KafkaSourceBuilder<OUT> setDeserializer(KafkaRecordDeserializationSchema<OUT> kafkaRecordDeserializationSchema) {
        this.deserializationSchema = kafkaRecordDeserializationSchema;
        return this;
    }

    public KafkaSourceBuilder<OUT> setValueOnlyDeserializer(DeserializationSchema<OUT> deserializationSchema) {
        this.deserializationSchema = KafkaRecordDeserializationSchema.valueOnly(deserializationSchema);
        return this;
    }

    public KafkaSourceBuilder<OUT> setClientIdPrefix(String str) {
        return setProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key(), str);
    }

    public KafkaSourceBuilder<OUT> setProperty(String str, String str2) {
        this.props.setProperty(str, str2);
        return this;
    }

    public KafkaSourceBuilder<OUT> setProperties(Properties properties) {
        this.props.putAll(properties);
        return this;
    }

    public KafkaSourceBuilder<OUT> setRecordsPerSecondRateLimit(long j) {
        this.recordsPerSecondRateLimit = j;
        return this;
    }

    public KafkaSourceBuilder<OUT> setLastHistoricalDataTimestamp(long j) {
        this.lastHistoricalDataTimestamp = j;
        return this;
    }

    public KafkaSource<OUT> build() {
        sanityCheck();
        parseAndSetRequiredProperties();
        return new KafkaSource<>(this.subscriber, this.startingOffsetsInitializer, this.stoppingOffsetsInitializer, this.boundedness, this.deserializationSchema, this.props, this.recordsPerSecondRateLimit, this.lastHistoricalDataTimestamp);
    }

    private void ensureSubscriberIsNull(String str) {
        if (this.subscriber != null) {
            throw new IllegalStateException(String.format("Cannot use %s for consumption because a %s is already set for consumption.", str, this.subscriber.getClass().getSimpleName()));
        }
    }

    private void parseAndSetRequiredProperties() {
        maybeOverride("key.deserializer", ByteArrayDeserializer.class.getName(), false);
        maybeOverride("value.deserializer", ByteArrayDeserializer.class.getName(), false);
        if (!this.props.containsKey("group.id")) {
            LOG.warn("Offset commit on checkpoint is disabled because {} is not specified", "group.id");
            maybeOverride(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false", false);
        }
        maybeOverride("enable.auto.commit", "false", false);
        maybeOverride("auto.offset.reset", this.startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase(), true);
        maybeOverride(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), "-1", this.boundedness == Boundedness.BOUNDED);
        maybeOverride(KafkaSourceOptions.CLIENT_ID_PREFIX.key(), this.props.containsKey("group.id") ? this.props.getProperty("group.id") : "KafkaSource-" + new Random().nextLong(), false);
        maybeOverride(KafkaSourceOptions.DEFAULT_REPLICATION_FACTOR.key(), "1", false);
    }

    private boolean maybeOverride(String str, String str2, boolean z) {
        boolean z2 = false;
        String property = this.props.getProperty(str);
        if (property == null) {
            this.props.setProperty(str, str2);
        } else if (z) {
            LOG.warn(String.format("Property %s is provided but will be overridden from %s to %s", str, property, str2));
            this.props.setProperty(str, str2);
            z2 = true;
        }
        return z2;
    }

    private void sanityCheck() {
        for (String str : REQUIRED_CONFIGS) {
            Preconditions.checkNotNull(this.props.getProperty(str), String.format("Property %s is required but not provided", str));
        }
        Preconditions.checkNotNull(this.subscriber, "No subscribe mode is specified, should be one of topics, topic pattern and partition set.");
        Preconditions.checkNotNull(this.deserializationSchema, "Deserialization schema is required but not provided.");
        Preconditions.checkState(this.props.containsKey("group.id") || !offsetCommitEnabledManually(), String.format("Property %s is required when offset commit is enabled", "group.id"));
        if (this.startingOffsetsInitializer instanceof OffsetsInitializerValidator) {
            ((OffsetsInitializerValidator) this.startingOffsetsInitializer).validate(this.props);
        }
        if (this.stoppingOffsetsInitializer instanceof OffsetsInitializerValidator) {
            ((OffsetsInitializerValidator) this.stoppingOffsetsInitializer).validate(this.props);
        }
        if (this.props.containsKey("key.deserializer")) {
            checkDeserializer(this.props.getProperty("key.deserializer"));
        }
        if (this.props.containsKey("value.deserializer")) {
            checkDeserializer(this.props.getProperty("value.deserializer"));
        }
    }

    private void checkDeserializer(String str) {
        try {
            Class<?> cls = Class.forName(str);
            if (!Deserializer.class.isAssignableFrom(cls)) {
                throw new IllegalArgumentException(String.format("Deserializer class %s is not a subclass of %s", str, Deserializer.class.getName()));
            }
            for (Type type : cls.getGenericInterfaces()) {
                if (type instanceof ParameterizedType) {
                    ParameterizedType parameterizedType = (ParameterizedType) type;
                    if (parameterizedType.getRawType() == Deserializer.class) {
                        Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
                        if (actualTypeArguments.length != 1 || actualTypeArguments[0] != byte[].class) {
                            throw new IllegalArgumentException(String.format("Deserializer class %s does not deserialize byte[]", str));
                        }
                    } else {
                        continue;
                    }
                }
            }
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException(String.format("Deserializer class %s not found", str), e);
        }
    }

    private boolean offsetCommitEnabledManually() {
        return (this.props.containsKey("enable.auto.commit") && Boolean.parseBoolean(this.props.getProperty("enable.auto.commit"))) || (this.props.containsKey(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key()) && Boolean.parseBoolean(this.props.getProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key())));
    }
}
