package org.apache.flink.connector.pulsar.source.enumerator;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.pulsar.common.config.PulsarClientFactory;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Split enumerator of the Pulsar source.
 *
 * <p>The enumerator asks the {@link PulsarSubscriber} for the subscribed topic partitions (once,
 * or periodically when partition discovery is enabled), positions the subscription of every
 * partition handed back by {@link SplitAssigner#registerTopicPartitions} with the configured
 * {@link StartCursor}, and delegates the actual split-to-reader assignment to the
 * {@link SplitAssigner}.
 */
@Internal
public class PulsarSourceEnumerator
        implements SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumState> {

    private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceEnumerator.class);

    private final PulsarClient pulsarClient;
    private final PulsarSubscriber subscriber;
    private final StartCursor startCursor;
    private final RangeGenerator rangeGenerator;
    private final SourceConfiguration sourceConfiguration;
    private final SplitEnumeratorContext<PulsarPartitionSplit> context;
    private final SplitAssigner splitAssigner;
    private final SplitEnumeratorMetricGroup metricGroup;

    /**
     * Creates an enumerator that starts from {@link PulsarSourceEnumState#initialState()}.
     *
     * @throws PulsarClientException if the Pulsar client cannot be created
     */
    public PulsarSourceEnumerator(
            PulsarSubscriber pulsarSubscriber,
            StartCursor startCursor,
            StopCursor stopCursor,
            RangeGenerator rangeGenerator,
            SourceConfiguration sourceConfiguration,
            SplitEnumeratorContext<PulsarPartitionSplit> splitEnumeratorContext)
            throws PulsarClientException {
        this(
                pulsarSubscriber,
                startCursor,
                stopCursor,
                rangeGenerator,
                sourceConfiguration,
                splitEnumeratorContext,
                PulsarSourceEnumState.initialState());
    }

    /**
     * Creates an enumerator whose split assigner is built from the given enumerator state.
     *
     * @param pulsarSubscriber used to list the subscribed topic partitions
     * @param startCursor used to position the subscription of registered partitions
     * @param stopCursor handed to the split assigner
     * @param rangeGenerator opened in {@link #start()} and passed to the subscriber on discovery
     * @param sourceConfiguration source configuration; also used to create the Pulsar client
     * @param splitEnumeratorContext Flink enumerator context used for assignment and async calls
     * @param pulsarSourceEnumState enumerator state handed to the split assigner
     * @throws PulsarClientException if the Pulsar client cannot be created
     */
    public PulsarSourceEnumerator(
            PulsarSubscriber pulsarSubscriber,
            StartCursor startCursor,
            StopCursor stopCursor,
            RangeGenerator rangeGenerator,
            SourceConfiguration sourceConfiguration,
            SplitEnumeratorContext<PulsarPartitionSplit> splitEnumeratorContext,
            PulsarSourceEnumState pulsarSourceEnumState)
            throws PulsarClientException {
        this.pulsarClient = PulsarClientFactory.createClient(sourceConfiguration);
        this.subscriber = pulsarSubscriber;
        this.startCursor = startCursor;
        this.rangeGenerator = rangeGenerator;
        this.sourceConfiguration = sourceConfiguration;
        this.context = splitEnumeratorContext;
        this.splitAssigner =
                SplitAssigner.createAssigner(
                        stopCursor,
                        sourceConfiguration,
                        splitEnumeratorContext,
                        pulsarSourceEnumState);
        this.metricGroup = splitEnumeratorContext.metricGroup();
    }

    /**
     * Opens the subscriber and the range generator, registers the unassigned-splits gauge when a
     * metric group is available, and schedules the topic partition lookup.
     */
    @Override
    public void start() {
        subscriber.open(pulsarClient);
        rangeGenerator.open(sourceConfiguration);

        // The metric group obtained from the context may be null; only then skip the gauge.
        if (metricGroup != null) {
            metricGroup.setUnassignedSplitsGauge(splitAssigner::getUnassignedSplitCount);
        }

        if (sourceConfiguration.isEnablePartitionDiscovery()) {
            LOG.info(
                    "Starting the PulsarSourceEnumerator for subscription {} with partition discovery interval of {} ms.",
                    sourceConfiguration.getSubscriptionDesc(),
                    sourceConfiguration.getPartitionDiscoveryIntervalMs());
            // Initial delay 0, then repeated with the configured discovery interval.
            context.callAsync(
                    this::getSubscribedTopicPartitions,
                    this::checkPartitionChanges,
                    0L,
                    sourceConfiguration.getPartitionDiscoveryIntervalMs());
        } else {
            LOG.info(
                    "Starting the PulsarSourceEnumerator for subscription {} without periodic partition discovery.",
                    sourceConfiguration.getSubscriptionDesc());
            context.callAsync(this::getSubscribedTopicPartitions, this::checkPartitionChanges);
        }
    }

    /** Intentionally a no-op: this enumerator does not react to split requests. */
    @Override
    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
        // Nothing to do.
    }

    /**
     * Returns the given splits to the split assigner and, if the reader is currently registered,
     * immediately runs an assignment round for all registered readers.
     *
     * @param splits the splits to hand back to the assigner
     * @param subtaskId id of the reader the splits were previously assigned to
     */
    @Override
    public void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId) {
        splitAssigner.addSplitsBack(splits, subtaskId);

        if (context.registeredReaders().containsKey(subtaskId)) {
            LOG.debug(
                    "Reader {} has been restarted after crashing, we will put splits back to it.",
                    subtaskId);
            assignPendingPartitionSplits(new ArrayList<>(context.registeredReaders().keySet()));
        }
    }

    /**
     * Runs an assignment round for the newly added reader.
     *
     * @param subtaskId id of the reader that was added
     */
    @Override
    public void addReader(int subtaskId) {
        LOG.debug(
                "Adding reader {} to PulsarSourceEnumerator for subscription {}.",
                subtaskId,
                sourceConfiguration.getSubscriptionDesc());
        assignPendingPartitionSplits(Collections.singletonList(subtaskId));
    }

    /**
     * Returns the state snapshot produced by the split assigner.
     *
     * @param checkpointId id of the checkpoint; not used by this implementation
     */
    @Override
    public PulsarSourceEnumState snapshotState(long checkpointId) {
        return splitAssigner.snapshotState();
    }

    /**
     * Decompiler-generated alias of {@link #snapshotState(long)}, kept only so that existing
     * references to this name keep compiling.
     *
     * @deprecated use {@link #snapshotState(long)}
     */
    @Deprecated
    public PulsarSourceEnumState m24snapshotState(long j) {
        return snapshotState(j);
    }

    /**
     * Closes the Pulsar client created by this enumerator.
     *
     * @throws PulsarClientException if closing the client fails
     */
    @Override
    public void close() throws PulsarClientException {
        if (pulsarClient != null) {
            pulsarClient.close();
        }
    }

    /**
     * Asks the subscriber for the subscribed topic partitions, passing the range generator and
     * the current parallelism reported by the enumerator context.
     */
    private Set<TopicPartition> getSubscribedTopicPartitions() throws Exception {
        return subscriber.getSubscribedTopicPartitions(rangeGenerator, context.currentParallelism());
    }

    /**
     * Handler for the result of {@link #getSubscribedTopicPartitions()}.
     *
     * <p>Registers the fetched partitions on the split assigner, sets up the subscription position
     * for every partition the assigner returns (presumably the newly added ones), and then runs
     * an assignment round for all registered readers.
     *
     * @param fetchedPartitions the partitions returned by the lookup
     * @param failure the lookup failure, or {@code null} when the lookup succeeded
     * @throws FlinkRuntimeException if the lookup failed or a subscription position cannot be set
     */
    private void checkPartitionChanges(Set<TopicPartition> fetchedPartitions, Throwable failure) {
        if (failure != null) {
            throw new FlinkRuntimeException(
                    "Failed to list subscribed topic partitions due to: " + failure.getMessage(),
                    failure);
        }

        String subscriptionName = sourceConfiguration.getSubscriptionName();
        for (TopicPartition partition : splitAssigner.registerTopicPartitions(fetchedPartitions)) {
            String fullTopicName = partition.getFullTopicName();
            try {
                startCursor
                        .position(partition.getTopic(), partition.getPartitionId())
                        .setupSubPosition(pulsarClient, fullTopicName, subscriptionName);
            } catch (PulsarClientException e) {
                throw new FlinkRuntimeException(e);
            }
        }

        assignPendingPartitionSplits(new ArrayList<>(context.registeredReaders().keySet()));
    }

    /**
     * Runs one assignment round for the given readers: validates that all of them are registered,
     * hands the assignment created by the split assigner (if any) to the context, and signals
     * "no more splits" to every reader for which the assigner reports so.
     *
     * @param pendingReaders ids of the readers to assign splits to; an empty list is a no-op
     * @throws IllegalStateException if one of the readers is not registered in the context
     */
    private void assignPendingPartitionSplits(List<Integer> pendingReaders) {
        if (pendingReaders.isEmpty()) {
            return;
        }

        // Validate every reader before anything is assigned.
        for (Integer reader : pendingReaders) {
            if (!context.registeredReaders().containsKey(reader)) {
                throw new IllegalStateException(
                        "Reader " + reader + " is not registered to source coordinator");
            }
        }

        splitAssigner
                .createAssignment(pendingReaders)
                .ifPresent(
                        assignment -> {
                            LOG.info(
                                    "The split assignment results are: {}",
                                    assignment.assignment());
                            context.assignSplits(assignment);
                        });

        for (Integer reader : pendingReaders) {
            if (splitAssigner.noMoreSplits(reader)) {
                LOG.debug(
                        "No more PulsarPartitionSplits to assign. Sending NoMoreSplitsEvent to reader {} in subscription {}.",
                        reader,
                        sourceConfiguration.getSubscriptionDesc());
                context.signalNoMoreSplits(reader);
            }
        }
    }
}
