package org.apache.flink.connector.kafka.source.enumerator;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.class */
public class KafkaSourceEnumerator implements SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceEnumerator.class);
    private final KafkaSubscriber subscriber;
    private final OffsetsInitializer startingOffsetInitializer;
    private final OffsetsInitializer stoppingOffsetInitializer;
    private final Properties properties;
    private final long partitionDiscoveryIntervalMs;
    private final SplitEnumeratorContext<KafkaPartitionSplit> context;
    private final Set<TopicPartition> discoveredPartitions;
    private final Map<Integer, Set<KafkaPartitionSplit>> readerIdToSplitAssignments;
    private final Map<Integer, Set<KafkaPartitionSplit>> pendingPartitionSplitAssignment;
    private final String consumerGroupId;
    private KafkaConsumer<byte[], byte[]> consumer;
    private AdminClient adminClient;
    private boolean noMoreNewPartitionSplits;

    /**
     * {@link OffsetsInitializer.PartitionOffsetsRetriever} backed by a {@link KafkaConsumer}
     * (beginning/end/timestamp lookups) and an {@link AdminClient} (committed offsets of a
     * consumer group).
     */
    @VisibleForTesting
    public static class PartitionOffsetsRetrieverImpl implements OffsetsInitializer.PartitionOffsetsRetriever, AutoCloseable {
        private final KafkaConsumer<?, ?> consumer;
        private final AdminClient adminClient;
        private final String groupId;

        /**
         * @param kafkaConsumer consumer used for beginning, end and timestamp based offset lookups
         * @param adminClient admin client used to list the committed offsets of the group
         * @param str the consumer group id whose committed offsets are queried
         */
        public PartitionOffsetsRetrieverImpl(KafkaConsumer<?, ?> kafkaConsumer, AdminClient adminClient, String str) {
            this.consumer = kafkaConsumer;
            this.adminClient = adminClient;
            this.groupId = str;
        }

        /**
         * Looks up the committed offsets of the configured consumer group for the given partitions.
         *
         * @param collection the partitions to query
         * @return committed offset per partition; partitions for which the broker reports no
         *     committed offset (a {@code null} entry) are left out of the result
         * @throws FlinkRuntimeException if the lookup fails or the calling thread is interrupted
         */
        @Override // org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer.PartitionOffsetsRetriever
        public Map<TopicPartition, Long> committedOffsets(Collection<TopicPartition> collection) {
            ListConsumerGroupOffsetsOptions options =
                    new ListConsumerGroupOffsetsOptions().topicPartitions(new ArrayList<>(collection));
            try {
                return this.adminClient
                        .listConsumerGroupOffsets(this.groupId, options)
                        .partitionsToOffsetAndMetadata()
                        .thenApply(committed -> {
                            Map<TopicPartition, Long> offsets = new HashMap<>();
                            committed.forEach((topicPartition, offsetAndMetadata) -> {
                                // A null value means the group has nothing committed for this partition.
                                if (offsetAndMetadata != null) {
                                    offsets.put(topicPartition, offsetAndMetadata.offset());
                                }
                            });
                            return offsets;
                        })
                        .get();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up can still observe it.
                Thread.currentThread().interrupt();
                throw new FlinkRuntimeException("Interrupted while listing offsets for consumer group " + this.groupId, e);
            } catch (ExecutionException e2) {
                throw new FlinkRuntimeException("Failed to fetch committed offsets for consumer group " + this.groupId + " due to", e2);
            }
        }

        /** Delegates to {@link KafkaConsumer#endOffsets(Collection)}. */
        @Override // org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer.PartitionOffsetsRetriever
        public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> collection) {
            return this.consumer.endOffsets(collection);
        }

        /** Delegates to {@link KafkaConsumer#beginningOffsets(Collection)}. */
        @Override // org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer.PartitionOffsetsRetriever
        public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> collection) {
            return this.consumer.beginningOffsets(collection);
        }

        /** Delegates to {@link KafkaConsumer#offsetsForTimes(Map)}. */
        @Override // org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer.PartitionOffsetsRetriever
        public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> map) {
            return this.consumer.offsetsForTimes(map);
        }

        /**
         * Closes both underlying clients without waiting ({@link Duration#ZERO}). The admin
         * client is closed even when closing the consumer throws.
         */
        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
            try {
                this.consumer.close(Duration.ZERO);
            } finally {
                this.adminClient.close(Duration.ZERO);
            }
        }
    }

    /**
     * Immutable pair describing the outcome of one discovery round: the splits created for
     * newly found partitions and the partitions reported as removed. Both sets are exposed
     * through unmodifiable views of the sets handed to the constructor.
     */
    private static class PartitionSplitChange {
        private final Set<KafkaPartitionSplit> newPartitionSplits;
        private final Set<TopicPartition> removedPartitions;

        private PartitionSplitChange(Set<KafkaPartitionSplit> addedSplits, Set<TopicPartition> gonePartitions) {
            Set<KafkaPartitionSplit> addedView = Collections.unmodifiableSet(addedSplits);
            Set<TopicPartition> goneView = Collections.unmodifiableSet(gonePartitions);
            this.newPartitionSplits = addedView;
            this.removedPartitions = goneView;
        }
    }

    /**
     * Creates an enumerator that starts without any previously assigned splits; delegates to
     * the six-argument constructor with an empty assignment map.
     */
    public KafkaSourceEnumerator(KafkaSubscriber kafkaSubscriber, OffsetsInitializer offsetsInitializer, OffsetsInitializer offsetsInitializer2, Properties properties, SplitEnumeratorContext<KafkaPartitionSplit> splitEnumeratorContext) {
        this(
                kafkaSubscriber,
                offsetsInitializer,
                offsetsInitializer2,
                properties,
                splitEnumeratorContext,
                new HashMap<>());
    }

    public KafkaSourceEnumerator(KafkaSubscriber kafkaSubscriber, OffsetsInitializer offsetsInitializer, OffsetsInitializer offsetsInitializer2, Properties properties, SplitEnumeratorContext<KafkaPartitionSplit> splitEnumeratorContext, Map<Integer, Set<KafkaPartitionSplit>> map) {
        this.noMoreNewPartitionSplits = false;
        this.subscriber = kafkaSubscriber;
        this.startingOffsetInitializer = offsetsInitializer;
        this.stoppingOffsetInitializer = offsetsInitializer2;
        this.properties = properties;
        this.context = splitEnumeratorContext;
        this.discoveredPartitions = new HashSet();
        this.readerIdToSplitAssignments = new HashMap(map);
        this.readerIdToSplitAssignments.forEach((num, set) -> {
            set.forEach(kafkaPartitionSplit -> {
                this.discoveredPartitions.add(kafkaPartitionSplit.getTopicPartition());
            });
        });
        this.pendingPartitionSplitAssignment = new HashMap();
        this.partitionDiscoveryIntervalMs = ((Long) KafkaSourceOptions.getOption(properties, KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS, Long::parseLong)).longValue();
        this.consumerGroupId = properties.getProperty("group.id");
    }

    /**
     * Creates the Kafka consumer and admin client, then schedules partition discovery through
     * the context: repeatedly (first run immediately) when the discovery interval is positive,
     * otherwise exactly once.
     */
    public void start() {
        this.consumer = getKafkaConsumer();
        this.adminClient = getKafkaAdminClient();

        final boolean periodicDiscovery = this.partitionDiscoveryIntervalMs > 0;
        if (!periodicDiscovery) {
            LOG.info("Starting the KafkaSourceEnumerator for consumer group {} without periodic partition discovery.", this.consumerGroupId);
            this.context.callAsync(this::discoverAndInitializePartitionSplit, this::handlePartitionSplitChanges);
            return;
        }

        LOG.info("Starting the KafkaSourceEnumerator for consumer group {} with partition discovery interval of {} ms.", this.consumerGroupId, this.partitionDiscoveryIntervalMs);
        this.context.callAsync(this::discoverAndInitializePartitionSplit, this::handlePartitionSplitChanges, 0L, this.partitionDiscoveryIntervalMs);
    }

    /**
     * Intentionally a no-op: this enumerator does not react to split requests. Splits are
     * handed out from {@code assignPendingPartitionSplits()} instead.
     *
     * @param i unused
     * @param str unused, may be {@code null}
     */
    public void handleSplitRequest(int i, @Nullable String str) {
        // Nothing to do.
    }

    /**
     * Puts the given splits back into the pending assignments and immediately tries to
     * assign whatever is pending.
     *
     * @param list the splits to re-queue; their owner is recomputed from the current parallelism
     * @param i not used by this implementation
     */
    public void addSplitsBack(List<KafkaPartitionSplit> list, int i) {
        // Queue first, then assign: the assignment step only hands out queued splits.
        addPartitionSplitChangeToPendingAssignments(list);
        assignPendingPartitionSplits();
    }

    /**
     * Logs the newly added reader and tries to assign any pending splits.
     *
     * @param i id of the reader that was added (only used for logging here)
     */
    public void addReader(int i) {
        final Integer readerId = i;
        LOG.debug("Adding reader {} to KafkaSourceEnumerator for consumer group {}.", readerId, this.consumerGroupId);
        assignPendingPartitionSplits();
    }

    /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
    /**
     * Builds the enumerator checkpoint state from the current reader-to-splits assignments.
     *
     * <p>NOTE(review): the live {@code readerIdToSplitAssignments} map is passed as-is; whether
     * {@code KafkaSourceEnumState} copies it is not visible from this file - confirm.
     *
     * <p>NOTE(review): the {@code m4} prefix looks like a decompiler artefact (see the comment
     * above); the method was presumably named {@code snapshotState} - confirm before relying on
     * this name.
     *
     * @return a new state object wrapping the current assignments
     */
    public KafkaSourceEnumState m4snapshotState() throws Exception {
        return new KafkaSourceEnumState(this.readerIdToSplitAssignments);
    }

    /**
     * Closes the Kafka consumer and the admin client if they were created.
     *
     * <p>The admin client is closed in a {@code finally} block so that it is released even
     * when closing the consumer throws; the consumer's exception still propagates.
     */
    public void close() {
        try {
            if (this.consumer != null) {
                this.consumer.close();
            }
        } finally {
            if (this.adminClient != null) {
                this.adminClient.close();
            }
        }
    }

    /**
     * Asks the subscriber for partition changes relative to the partitions discovered so far,
     * builds a split with starting and stopping offsets for every new partition, and records
     * the new partitions as discovered.
     *
     * @return the new splits together with the partitions the subscriber reported as removed
     * @throws IllegalStateException if the starting offsets initializer returned no offset for
     *     a newly discovered partition
     */
    private PartitionSplitChange discoverAndInitializePartitionSplit() {
        KafkaSubscriber.PartitionChange partitionChanges = this.subscriber.getPartitionChanges(this.adminClient, Collections.unmodifiableSet(this.discoveredPartitions));
        Set<TopicPartition> newPartitions = Collections.unmodifiableSet(partitionChanges.getNewPartitions());

        OffsetsInitializer.PartitionOffsetsRetriever offsetsRetriever = getOffsetsRetriever();
        Map<TopicPartition, Long> startingOffsets = this.startingOffsetInitializer.getPartitionOffsets(newPartitions, offsetsRetriever);
        Map<TopicPartition, Long> stoppingOffsets = this.stoppingOffsetInitializer.getPartitionOffsets(newPartitions, offsetsRetriever);

        Set<KafkaPartitionSplit> newSplits = new HashSet<>(newPartitions.size());
        for (TopicPartition topicPartition : newPartitions) {
            Long startingOffset = startingOffsets.get(topicPartition);
            if (startingOffset == null) {
                // Fail with a descriptive message instead of a bare NPE from unboxing.
                throw new IllegalStateException("No starting offset was provided for partition " + topicPartition);
            }
            // Long.MIN_VALUE is passed when no stopping offset was supplied for the partition.
            long stoppingOffset = stoppingOffsets.getOrDefault(topicPartition, Long.MIN_VALUE);
            newSplits.add(new KafkaPartitionSplit(topicPartition, startingOffset, stoppingOffset));
        }

        this.discoveredPartitions.addAll(newPartitions);
        return new PartitionSplitChange(newSplits, partitionChanges.getRemovedPartitions());
    }

    /**
     * Callback for a finished discovery round: queues the newly created splits and tries to
     * assign them.
     *
     * <p>When periodic discovery is off (interval {@code <= 0}, matching the one-shot branch
     * in {@code start()}), this single round is the only one, so the enumerator records that
     * no further splits will appear. Without including {@code 0} here, an interval of zero
     * would run discovery once but never signal readers that splits are exhausted.
     *
     * @param partitionSplitChange result of the discovery round; only read when {@code th} is null
     * @param th failure raised by the discovery round, or {@code null} on success
     * @throws FlinkRuntimeException if the discovery round failed
     */
    private void handlePartitionSplitChanges(PartitionSplitChange partitionSplitChange, Throwable th) {
        if (th != null) {
            throw new FlinkRuntimeException("Failed to handle partition splits change due to ", th);
        }
        if (this.partitionDiscoveryIntervalMs <= 0) {
            LOG.debug("Partition discovery is disabled.");
            this.noMoreNewPartitionSplits = true;
        }
        addPartitionSplitChangeToPendingAssignments(partitionSplitChange.newPartitionSplits);
        assignPendingPartitionSplits();
    }

    /**
     * Adds each split to the pending set of the reader chosen by {@code getSplitOwner} for
     * the context's current parallelism. Nothing is sent to readers here.
     *
     * @param collection the splits to queue
     */
    private void addPartitionSplitChangeToPendingAssignments(Collection<KafkaPartitionSplit> collection) {
        final int numReaders = this.context.currentParallelism();
        collection.forEach(split -> {
            Integer ownerReader = getSplitOwner(split.getTopicPartition(), numReaders);
            Set<KafkaPartitionSplit> pendingForOwner =
                    this.pendingPartitionSplitAssignment.computeIfAbsent(ownerReader, ignored -> new HashSet<>());
            pendingForOwner.add(split);
        });
        LOG.debug("Assigned {} to {} readers of consumer group {}.", collection, numReaders, this.consumerGroupId);
    }

    /**
     * Sends all pending splits whose owning reader is currently registered, then moves them
     * from the pending map into the assigned map.
     *
     * <p>If no further splits are expected, each reader that received splits in this call is
     * also told that there are no more splits. Readers that received nothing in this call are
     * not signalled.
     */
    private void assignPendingPartitionSplits() {
        Map<Integer, List<KafkaPartitionSplit>> incrementalAssignment = new HashMap<>();
        this.pendingPartitionSplitAssignment.forEach((readerId, pendingSplits) -> {
            // Skip readers with nothing pending or that have not registered yet.
            if (pendingSplits.isEmpty() || !this.context.registeredReaders().containsKey(readerId)) {
                return;
            }
            incrementalAssignment.computeIfAbsent(readerId, ignored -> new ArrayList<>()).addAll(pendingSplits);
        });
        if (incrementalAssignment.isEmpty()) {
            return;
        }

        LOG.info("Assigning splits to readers {}", incrementalAssignment);
        this.context.assignSplits(new SplitsAssignment<>(incrementalAssignment));

        incrementalAssignment.forEach((readerId, assignedSplits) -> {
            this.readerIdToSplitAssignments.computeIfAbsent(readerId, ignored -> new HashSet<>()).addAll(assignedSplits);
            this.pendingPartitionSplitAssignment.remove(readerId);
            if (this.noMoreNewPartitionSplits) {
                LOG.debug("No more KafkaPartitionSplits to assign. Sending NoMoreSplitsEvent to the readers in consumer group {}.", this.consumerGroupId);
                this.context.signalNoMoreSplits(readerId.intValue());
            }
        });
    }

    /**
     * Builds the byte-array consumer used by the enumerator from a copy of the configured
     * properties, with its own client id, byte-array deserializers and topic auto-creation
     * switched off.
     */
    private KafkaConsumer<byte[], byte[]> getKafkaConsumer() {
        final Properties consumerProps = new Properties();
        deepCopyProperties(this.properties, consumerProps);

        final String clientIdPrefix = consumerProps.getProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key());
        final String byteArrayDeserializer = ByteArrayDeserializer.class.getName();

        consumerProps.setProperty("client.id", clientIdPrefix + "-enumerator-consumer");
        consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, byteArrayDeserializer);
        consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, byteArrayDeserializer);
        consumerProps.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
        return new KafkaConsumer<>(consumerProps);
    }

    /**
     * Builds the admin client used by the enumerator from a copy of the configured
     * properties, with its own client id derived from the client id prefix.
     */
    private AdminClient getKafkaAdminClient() {
        final Properties adminProps = new Properties();
        deepCopyProperties(this.properties, adminProps);

        final String clientIdPrefix = adminProps.getProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key());
        adminProps.setProperty("client.id", clientIdPrefix + "-enumerator-admin-client");
        return AdminClient.create(adminProps);
    }

    /**
     * Creates a new offsets retriever on top of the enumerator's consumer and admin client,
     * using the {@code group.id} currently found in the configured properties.
     */
    private OffsetsInitializer.PartitionOffsetsRetriever getOffsetsRetriever() {
        final String groupId = this.properties.getProperty("group.id");
        return new PartitionOffsetsRetrieverImpl(this.consumer, this.adminClient, groupId);
    }

    /**
     * Computes the reader that owns a partition. The topic name hash picks a start index and
     * the partition number is added to it, so consecutive partitions of one topic map to
     * consecutive readers (modulo the reader count).
     *
     * @param topicPartition the partition to place
     * @param i the number of readers; must be positive
     * @return a reader index in {@code [0, i)} for non-negative partition numbers
     */
    @VisibleForTesting
    static int getSplitOwner(TopicPartition topicPartition, int i) {
        // Masking with Integer.MAX_VALUE clears the sign bit so the modulo result is non-negative.
        int startIndex = ((topicPartition.topic().hashCode() * 31) & Integer.MAX_VALUE) % i;
        return (startIndex + topicPartition.partition()) % i;
    }

    @VisibleForTesting
    /*
     * Copies every string-valued property of the first argument, including those inherited
     * from its defaults, into the second argument as a direct entry. Existing entries of the
     * target with other keys are kept; entries with the same key are overwritten.
     */
    static void deepCopyProperties(Properties properties, Properties properties2) {
        properties.stringPropertyNames()
                .forEach(key -> properties2.setProperty(key, properties.getProperty(key)));
    }
}
