package org.apache.flink.connector.pulsar.source.enumerator.assigner;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerImpl.class */
/**
 * {@link SplitAssigner} implementation that keeps a per-reader queue of pending
 * {@link PulsarPartitionSplit}s.
 *
 * <p>Every newly registered {@link TopicPartition} is turned into one split and queued under the
 * reader index computed by {@link #calculatePartitionOwner(String, int, int)}. Queued splits are
 * handed out through {@link #createAssignment(List)} and removed from the queue at that point.
 */
public class SplitAssignerImpl implements SplitAssigner {
    /** Stop cursor passed to every split created by this assigner. */
    private final StopCursor stopCursor;

    /** When {@code true}, {@link #noMoreSplits(Integer)} never reports completion. */
    private final boolean enablePartitionDiscovery;

    /** Enumerator context; only its current parallelism is read by this class. */
    private final SplitEnumeratorContext<PulsarPartitionSplit> context;

    /** Partitions that have already been turned into splits (this is the snapshotted state). */
    private final Set<TopicPartition> appendedPartitions;

    /** Splits waiting to be assigned, keyed by the owning reader index. */
    private final Map<Integer, Set<PulsarPartitionSplit>> pendingPartitionSplits;

    /** Set once {@link #registerTopicPartitions(Set)} has been called at least once. */
    private boolean initialized = false;

    /**
     * Creates the assigner.
     *
     * @param stopCursor stop cursor used for every split created later
     * @param z whether partition discovery is enabled
     * @param splitEnumeratorContext enumerator context used to query the current parallelism
     * @param pulsarSourceEnumState state whose appended-partition set is adopted as-is (the set
     *     returned by {@code getAppendedPartitions()} is stored directly, not copied)
     */
    public SplitAssignerImpl(StopCursor stopCursor, boolean z, SplitEnumeratorContext<PulsarPartitionSplit> splitEnumeratorContext, PulsarSourceEnumState pulsarSourceEnumState) {
        this.stopCursor = stopCursor;
        this.enablePartitionDiscovery = z;
        this.context = splitEnumeratorContext;
        this.appendedPartitions = pulsarSourceEnumState.getAppendedPartitions();
        this.pendingPartitionSplits = new HashMap<>(splitEnumeratorContext.currentParallelism());
    }

    /**
     * Registers the given partitions and queues a split for each one not seen before.
     *
     * @param set partitions to register
     * @return the partitions from {@code set} that were not registered before this call
     */
    @Override
    public List<TopicPartition> registerTopicPartitions(Set<TopicPartition> set) {
        List<TopicPartition> newPartitions = new ArrayList<>();
        for (TopicPartition partition : set) {
            // Set.add returns true only when the partition was not present yet.
            if (this.appendedPartitions.add(partition)) {
                newPartitions.add(partition);
                addSplitToPendingList(partitionOwner(partition), new PulsarPartitionSplit(partition, this.stopCursor));
            }
        }
        this.initialized = true;
        return newPartitions;
    }

    /**
     * Puts the given splits back into the pending queues.
     *
     * <p>The owner of each split is recomputed from its partition; the subtask id argument is
     * not used.
     *
     * @param list splits to re-queue
     * @param i id of the subtask the splits come from (ignored)
     */
    @Override
    public void addSplitsBack(List<PulsarPartitionSplit> list, int i) {
        for (PulsarPartitionSplit split : list) {
            addSplitToPendingList(partitionOwner(split.getPartition()), split);
        }
    }

    /**
     * Builds an assignment for the given readers from the pending queues.
     *
     * <p>The pending entry of every listed reader is removed, whether or not it contained splits.
     *
     * @param list reader indexes that may receive splits
     * @return the assignment, or an empty {@link Optional} when there is nothing to assign
     */
    @Override
    public Optional<SplitsAssignment<PulsarPartitionSplit>> createAssignment(List<Integer> list) {
        if (this.pendingPartitionSplits.isEmpty() || list.isEmpty()) {
            return Optional.empty();
        }
        Map<Integer, List<PulsarPartitionSplit>> assignment = new HashMap<>(this.pendingPartitionSplits.size());
        for (Integer reader : list) {
            Set<PulsarPartitionSplit> splits = this.pendingPartitionSplits.remove(reader);
            if (splits != null && !splits.isEmpty()) {
                assignment.put(reader, new ArrayList<>(splits));
            }
        }
        if (assignment.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(new SplitsAssignment<>(assignment));
    }

    /**
     * Tells whether the given reader will receive no further splits.
     *
     * @param num reader index
     * @return {@code true} only if partition discovery is disabled, partitions have been
     *     registered at least once, and no pending entry exists for the reader
     */
    @Override
    public boolean noMoreSplits(Integer num) {
        return !this.enablePartitionDiscovery
                && this.initialized
                && !this.pendingPartitionSplits.containsKey(num);
    }

    /**
     * Creates the enumerator state from the registered partitions.
     *
     * <p>The live partition set is passed to the state constructor without copying it here.
     *
     * @return a new state object wrapping the registered partitions
     */
    @Override
    public PulsarSourceEnumState snapshotState() {
        return new PulsarSourceEnumState(this.appendedPartitions);
    }

    /**
     * Counts the splits that are still pending across all readers.
     *
     * @return the total number of queued splits
     */
    @Override
    public long getUnassignedSplitCount() {
        return this.pendingPartitionSplits.values().stream().mapToLong(Set::size).sum();
    }

    /** Adds the split to the pending set of the given reader, creating the set if needed. */
    private void addSplitToPendingList(int readerId, PulsarPartitionSplit split) {
        this.pendingPartitionSplits.computeIfAbsent(readerId, key -> new HashSet<>()).add(split);
    }

    /** Computes the owning reader index of a partition for the current parallelism. */
    private int partitionOwner(TopicPartition topicPartition) {
        return calculatePartitionOwner(topicPartition.getTopic(), topicPartition.getPartitionId(), this.context.currentParallelism());
    }

    /**
     * Computes the reader index that owns a partition.
     *
     * <p>A non-negative start index is derived from the topic name's hash code, the partition id
     * is added to it, and the sum is wrapped with {@link Math#floorMod(int, int)} so that, for a
     * positive {@code i2}, the result lies in {@code [0, i2)} even when the sum is negative.
     *
     * @param str topic name
     * @param i partition id
     * @param i2 parallelism (number of readers)
     * @return the owning reader index
     * @throws ArithmeticException if {@code i2} is zero
     */
    @VisibleForTesting
    static int calculatePartitionOwner(String str, int i, int i2) {
        // Masking with Integer.MAX_VALUE clears the sign bit so the remainder is non-negative.
        int startIndex = ((str.hashCode() * 31) & Integer.MAX_VALUE) % i2;
        return Math.floorMod(startIndex + i, i2);
    }
}
