package org.apache.flink.connector.kafka.dynamic.source.enumerator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.kafka.dynamic.metadata.ClusterMetadata;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;
import org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceOptions;
import org.apache.flink.connector.kafka.dynamic.source.GetMetadataUpdateEvent;
import org.apache.flink.connector.kafka.dynamic.source.MetadataUpdateEvent;
import org.apache.flink.connector.kafka.dynamic.source.enumerator.StoppableKafkaEnumContextProxy;
import org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.KafkaStreamSubscriber;
import org.apache.flink.connector.kafka.dynamic.source.split.DynamicKafkaSourceSplit;
import org.apache.flink.connector.kafka.source.KafkaPropertiesUtil;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.shaded.guava31.com.google.common.collect.ArrayListMultimap;
import org.apache.flink.shaded.guava31.com.google.common.collect.MapDifference;
import org.apache.flink.shaded.guava31.com.google.common.collect.Maps;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.common.KafkaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.class */
public class DynamicKafkaSourceEnumerator implements SplitEnumerator<DynamicKafkaSourceSplit, DynamicKafkaSourceEnumState> {
    private static final Logger logger = LoggerFactory.getLogger(DynamicKafkaSourceEnumerator.class);
    private final Map<String, SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState>> clusterEnumeratorMap;
    private final Map<String, StoppableKafkaEnumContextProxy> clusterEnumContextMap;
    private final KafkaStreamSubscriber kafkaStreamSubscriber;
    private final SplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext;
    private final KafkaMetadataService kafkaMetadataService;
    private final Properties properties;
    private final OffsetsInitializer startingOffsetsInitializer;
    private final OffsetsInitializer stoppingOffsetInitializer;
    private final Boundedness boundedness;
    private final StoppableKafkaEnumContextProxy.StoppableKafkaEnumContextProxyFactory stoppableKafkaEnumContextProxyFactory;
    private final long kafkaMetadataServiceDiscoveryIntervalMs;
    private final int kafkaMetadataServiceDiscoveryFailureThreshold;
    private int kafkaMetadataServiceDiscoveryFailureCount;
    private Map<String, Set<String>> latestClusterTopicsMap;
    private Set<KafkaStream> latestKafkaStreams;
    private boolean firstDiscoveryComplete;

    public DynamicKafkaSourceEnumerator(KafkaStreamSubscriber kafkaStreamSubscriber, KafkaMetadataService kafkaMetadataService, SplitEnumeratorContext<DynamicKafkaSourceSplit> splitEnumeratorContext, OffsetsInitializer offsetsInitializer, OffsetsInitializer offsetsInitializer2, Properties properties, Boundedness boundedness, DynamicKafkaSourceEnumState dynamicKafkaSourceEnumState) {
        this(kafkaStreamSubscriber, kafkaMetadataService, splitEnumeratorContext, offsetsInitializer, offsetsInitializer2, properties, boundedness, dynamicKafkaSourceEnumState, StoppableKafkaEnumContextProxy.StoppableKafkaEnumContextProxyFactory.getDefaultFactory());
    }

    @VisibleForTesting
    DynamicKafkaSourceEnumerator(KafkaStreamSubscriber kafkaStreamSubscriber, KafkaMetadataService kafkaMetadataService, SplitEnumeratorContext<DynamicKafkaSourceSplit> splitEnumeratorContext, OffsetsInitializer offsetsInitializer, OffsetsInitializer offsetsInitializer2, Properties properties, Boundedness boundedness, DynamicKafkaSourceEnumState dynamicKafkaSourceEnumState, StoppableKafkaEnumContextProxy.StoppableKafkaEnumContextProxyFactory stoppableKafkaEnumContextProxyFactory) {
        this.kafkaStreamSubscriber = kafkaStreamSubscriber;
        this.boundedness = boundedness;
        this.startingOffsetsInitializer = offsetsInitializer;
        this.stoppingOffsetInitializer = offsetsInitializer2;
        this.properties = properties;
        this.enumContext = splitEnumeratorContext;
        this.kafkaMetadataServiceDiscoveryIntervalMs = ((Long) DynamicKafkaSourceOptions.getOption(properties, DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS, Long::parseLong)).longValue();
        this.kafkaMetadataServiceDiscoveryFailureThreshold = ((Integer) DynamicKafkaSourceOptions.getOption(properties, DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_FAILURE_THRESHOLD, Integer::parseInt)).intValue();
        this.kafkaMetadataServiceDiscoveryFailureCount = 0;
        this.firstDiscoveryComplete = false;
        this.kafkaMetadataService = kafkaMetadataService;
        this.stoppableKafkaEnumContextProxyFactory = stoppableKafkaEnumContextProxyFactory;
        this.clusterEnumeratorMap = new HashMap();
        this.clusterEnumContextMap = new HashMap();
        this.latestKafkaStreams = dynamicKafkaSourceEnumState.getKafkaStreams();
        HashMap hashMap = new HashMap();
        Iterator<KafkaStream> it = this.latestKafkaStreams.iterator();
        while (it.hasNext()) {
            for (Map.Entry<String, ClusterMetadata> entry : it.next().getClusterMetadataMap().entrySet()) {
                hashMap.put(entry.getKey(), entry.getValue().getProperties());
            }
        }
        this.latestClusterTopicsMap = new HashMap();
        for (Map.Entry<String, KafkaSourceEnumState> entry2 : dynamicKafkaSourceEnumState.getClusterEnumeratorStates().entrySet()) {
            this.latestClusterTopicsMap.put(entry2.getKey(), (Set) entry2.getValue().assignedPartitions().stream().map((v0) -> {
                return v0.topic();
            }).collect(Collectors.toSet()));
            createEnumeratorWithAssignedTopicPartitions(entry2.getKey(), this.latestClusterTopicsMap.get(entry2.getKey()), entry2.getValue(), (Properties) hashMap.get(entry2.getKey()));
        }
    }

    public void start() {
        if (!this.clusterEnumeratorMap.isEmpty()) {
            startAllEnumerators();
        }
        if (this.kafkaMetadataServiceDiscoveryIntervalMs <= 0) {
            this.enumContext.callAsync(() -> {
                return this.kafkaStreamSubscriber.getSubscribedStreams(this.kafkaMetadataService);
            }, this::onHandleSubscribedStreamsFetch);
        } else {
            this.enumContext.callAsync(() -> {
                return this.kafkaStreamSubscriber.getSubscribedStreams(this.kafkaMetadataService);
            }, this::onHandleSubscribedStreamsFetch, 0L, this.kafkaMetadataServiceDiscoveryIntervalMs);
        }
    }

    private void handleNoMoreSplits() {
        if (Boundedness.BOUNDED.equals(this.boundedness)) {
            boolean z = true;
            Iterator<StoppableKafkaEnumContextProxy> it = this.clusterEnumContextMap.values().iterator();
            while (it.hasNext()) {
                z = z && it.next().isNoMoreSplits();
            }
            if (!this.firstDiscoveryComplete || !z) {
                logger.info("Not ready to notify no more splits to readers.");
                return;
            }
            logger.info("Signal no more splits to all readers: {}", this.enumContext.registeredReaders().keySet());
            Set keySet = this.enumContext.registeredReaders().keySet();
            SplitEnumeratorContext<DynamicKafkaSourceSplit> splitEnumeratorContext = this.enumContext;
            Objects.requireNonNull(splitEnumeratorContext);
            keySet.forEach((v1) -> {
                r1.signalNoMoreSplits(v1);
            });
        }
    }

    private void onHandleSubscribedStreamsFetch(Set<KafkaStream> set, Throwable th) {
        KafkaSourceEnumState kafkaSourceEnumState;
        this.firstDiscoveryComplete = true;
        Set<KafkaStream> handleFetchSubscribedStreamsError = handleFetchSubscribedStreamsError(set, th);
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        Iterator<KafkaStream> it = handleFetchSubscribedStreamsError.iterator();
        while (it.hasNext()) {
            for (Map.Entry<String, ClusterMetadata> entry : it.next().getClusterMetadataMap().entrySet()) {
                String key = entry.getKey();
                ClusterMetadata value = entry.getValue();
                ((Set) hashMap.computeIfAbsent(key, str -> {
                    return new HashSet();
                })).addAll(value.getTopics());
                hashMap2.put(key, value.getProperties());
            }
        }
        if (this.latestClusterTopicsMap.equals(hashMap)) {
            return;
        }
        if (logger.isInfoEnabled()) {
            MapDifference difference = Maps.difference(this.latestClusterTopicsMap, hashMap);
            logger.info("Common cluster topics after metadata refresh: {}", difference.entriesInCommon());
            logger.info("Removed cluster topics after metadata refresh: {}", difference.entriesOnlyOnLeft());
            logger.info("Additional cluster topics after metadata refresh: {}", difference.entriesOnlyOnRight());
        }
        try {
            DynamicKafkaSourceEnumState m4snapshotState = m4snapshotState(-1L);
            logger.info("Closing enumerators due to metadata change");
            closeAllEnumeratorsAndContexts();
            this.latestClusterTopicsMap = hashMap;
            this.latestKafkaStreams = handleFetchSubscribedStreamsError;
            sendMetadataUpdateEventToAvailableReaders();
            for (Map.Entry<String, Set<String>> entry2 : this.latestClusterTopicsMap.entrySet()) {
                KafkaSourceEnumState kafkaSourceEnumState2 = m4snapshotState.getClusterEnumeratorStates().get(entry2.getKey());
                if (kafkaSourceEnumState2 != null) {
                    Set<String> value2 = entry2.getValue();
                    kafkaSourceEnumState = new KafkaSourceEnumState((Set) kafkaSourceEnumState2.partitions().stream().filter(topicPartitionAndAssignmentStatus -> {
                        return value2.contains(topicPartitionAndAssignmentStatus.topicPartition().topic());
                    }).collect(Collectors.toSet()), kafkaSourceEnumState2.initialDiscoveryFinished(), kafkaSourceEnumState2.noMoreNewPartitionSplits());
                } else {
                    kafkaSourceEnumState = new KafkaSourceEnumState(Collections.emptySet(), false, false);
                }
                createEnumeratorWithAssignedTopicPartitions(entry2.getKey(), entry2.getValue(), kafkaSourceEnumState, (Properties) hashMap2.get(entry2.getKey()));
            }
            startAllEnumerators();
        } catch (Exception e) {
            throw new RuntimeException("unable to snapshot state in metadata change", e);
        }
    }

    private Set<KafkaStream> handleFetchSubscribedStreamsError(Set<KafkaStream> set, @Nullable Throwable th) {
        if (th == null) {
            this.kafkaMetadataServiceDiscoveryFailureCount = 0;
            return set;
        }
        if (!this.latestKafkaStreams.isEmpty()) {
            int i = this.kafkaMetadataServiceDiscoveryFailureCount + 1;
            this.kafkaMetadataServiceDiscoveryFailureCount = i;
            if (i <= this.kafkaMetadataServiceDiscoveryFailureThreshold) {
                logger.warn("Swallowing metadata service error", th);
                return this.latestKafkaStreams;
            }
        }
        throw new RuntimeException("Fetching subscribed Kafka streams failed and no metadata to fallback", th);
    }

    private void sendMetadataUpdateEventToAvailableReaders() {
        Iterator it = this.enumContext.registeredReaders().keySet().iterator();
        while (it.hasNext()) {
            int intValue = ((Integer) it.next()).intValue();
            MetadataUpdateEvent metadataUpdateEvent = new MetadataUpdateEvent(this.latestKafkaStreams);
            logger.info("sending metadata update to reader {}: {}", Integer.valueOf(intValue), metadataUpdateEvent);
            this.enumContext.sendEventToSourceReader(intValue, metadataUpdateEvent);
        }
    }

    private KafkaSourceEnumerator createEnumeratorWithAssignedTopicPartitions(String str, Set<String> set, KafkaSourceEnumState kafkaSourceEnumState, Properties properties) {
        StoppableKafkaEnumContextProxy create = this.stoppableKafkaEnumContextProxyFactory.create(this.enumContext, str, this.kafkaMetadataService, Boundedness.BOUNDED.equals(this.boundedness) ? this::handleNoMoreSplits : null);
        Properties properties2 = new Properties();
        KafkaPropertiesUtil.copyProperties(properties, properties2);
        KafkaPropertiesUtil.copyProperties(this.properties, properties2);
        KafkaPropertiesUtil.setClientIdPrefix(properties2, str);
        KafkaSourceEnumerator kafkaSourceEnumerator = new KafkaSourceEnumerator(KafkaSubscriber.getTopicListSubscriber(new ArrayList(set)), this.startingOffsetsInitializer, this.stoppingOffsetInitializer, properties2, create, this.boundedness, kafkaSourceEnumState, -1L);
        this.clusterEnumContextMap.put(str, create);
        this.clusterEnumeratorMap.put(str, kafkaSourceEnumerator);
        return kafkaSourceEnumerator;
    }

    private void startAllEnumerators() {
        for (String str : this.latestClusterTopicsMap.keySet()) {
            try {
                this.clusterEnumeratorMap.get(str).start();
            } catch (KafkaException e) {
                if (this.kafkaMetadataService.isClusterActive(str)) {
                    throw new RuntimeException(String.format("Failed to create enumerator for %s", str), e);
                }
                logger.info("Found inactive cluster {} while initializing, removing enumerator", str, e);
                try {
                    this.clusterEnumContextMap.remove(str).close();
                    this.clusterEnumeratorMap.remove(str).close();
                } catch (Exception e2) {
                    throw new RuntimeException("Failed to close enum context for " + str, e2);
                }
            }
        }
    }

    private void closeAllEnumeratorsAndContexts() {
        this.clusterEnumeratorMap.forEach((str, splitEnumerator) -> {
            try {
                this.clusterEnumContextMap.get(str).close();
                splitEnumerator.close();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        this.clusterEnumContextMap.clear();
        this.clusterEnumeratorMap.clear();
    }

    public void handleSplitRequest(int i, @Nullable String str) {
        throw new UnsupportedOperationException("Kafka enumerators only assign splits to readers.");
    }

    public void addSplitsBack(List<DynamicKafkaSourceSplit> list, int i) {
        logger.debug("Adding splits back for {}", Integer.valueOf(i));
        ArrayListMultimap create = ArrayListMultimap.create();
        for (DynamicKafkaSourceSplit dynamicKafkaSourceSplit : list) {
            create.put(dynamicKafkaSourceSplit.getKafkaClusterId(), dynamicKafkaSourceSplit.getKafkaPartitionSplit());
        }
        for (String str : create.keySet()) {
            if (this.clusterEnumeratorMap.containsKey(str)) {
                this.clusterEnumeratorMap.get(str).addSplitsBack(create.get(str), i);
            } else {
                logger.warn("Split refers to inactive cluster {} with current clusters being {}", str, this.clusterEnumeratorMap.keySet());
            }
        }
        handleNoMoreSplits();
    }

    public void addReader(int i) {
        logger.debug("Adding reader {}", Integer.valueOf(i));
        this.clusterEnumeratorMap.forEach((str, splitEnumerator) -> {
            splitEnumerator.addReader(i);
        });
        handleNoMoreSplits();
    }

    /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
    public DynamicKafkaSourceEnumState m4snapshotState(long j) throws Exception {
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState>> entry : this.clusterEnumeratorMap.entrySet()) {
            hashMap.put(entry.getKey(), (KafkaSourceEnumState) entry.getValue().snapshotState(j));
        }
        return new DynamicKafkaSourceEnumState(this.latestKafkaStreams, hashMap);
    }

    public void handleSourceEvent(int i, SourceEvent sourceEvent) {
        Preconditions.checkArgument(sourceEvent instanceof GetMetadataUpdateEvent, "Received invalid source event: " + sourceEvent);
        if (!this.enumContext.registeredReaders().containsKey(Integer.valueOf(i))) {
            logger.warn("Got get metadata update but subtask was unavailable");
            return;
        }
        MetadataUpdateEvent metadataUpdateEvent = new MetadataUpdateEvent(this.latestKafkaStreams);
        logger.info("sending metadata update to reader {}: {}", Integer.valueOf(i), metadataUpdateEvent);
        this.enumContext.sendEventToSourceReader(i, metadataUpdateEvent);
    }

    public void close() throws IOException {
        try {
            Iterator<StoppableKafkaEnumContextProxy> it = this.clusterEnumContextMap.values().iterator();
            while (it.hasNext()) {
                it.next().close();
            }
            Iterator<Map.Entry<String, SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState>>> it2 = this.clusterEnumeratorMap.entrySet().iterator();
            while (it2.hasNext()) {
                it2.next().getValue().close();
            }
            this.kafkaMetadataService.close();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
