package org.apache.flink.connector.file.src.assigners;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
/* loaded from: input_file:org/apache/flink/connector/file/src/assigners/LocalityAwareSplitAssigner.class */
public class LocalityAwareSplitAssigner implements FileSplitAssigner {
    private static final Logger LOG = LoggerFactory.getLogger(LocalityAwareSplitAssigner.class);
    private final HashSet<SplitWithInfo> unassigned = new HashSet<>();
    private final HashMap<String, LocatableSplitChooser> localPerHost = new HashMap<>();
    private final LocatableSplitChooser remoteSplitChooser;
    private final SimpleCounter localAssignments;
    private final SimpleCounter remoteAssignments;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/file/src/assigners/LocalityAwareSplitAssigner$LocatableSplitChooser.class */
    public static class LocatableSplitChooser {
        private final LinkedList<SplitWithInfo> splits = new LinkedList<>();
        private int minLocalCount = -1;
        private int nextMinLocalCount = -1;
        private int elementCycleCount = 0;

        LocatableSplitChooser() {
        }

        LocatableSplitChooser(Collection<SplitWithInfo> collection) {
            Iterator<SplitWithInfo> it = collection.iterator();
            while (it.hasNext()) {
                addInputSplit(it.next());
            }
        }

        void addInputSplit(SplitWithInfo splitWithInfo) {
            int localCount = splitWithInfo.getLocalCount();
            if (this.minLocalCount == -1) {
                this.minLocalCount = localCount;
                this.elementCycleCount = 1;
                this.splits.offerFirst(splitWithInfo);
            } else {
                if (localCount < this.minLocalCount) {
                    this.nextMinLocalCount = this.minLocalCount;
                    this.minLocalCount = localCount;
                    this.elementCycleCount = 1;
                    this.splits.offerFirst(splitWithInfo);
                    return;
                }
                if (localCount == this.minLocalCount) {
                    this.elementCycleCount++;
                    this.splits.offerFirst(splitWithInfo);
                } else {
                    if (localCount < this.nextMinLocalCount) {
                        this.nextMinLocalCount = localCount;
                    }
                    this.splits.offerLast(splitWithInfo);
                }
            }
        }

        @Nullable
        SplitWithInfo getNextUnassignedMinLocalCountSplit(Set<SplitWithInfo> set) {
            if (this.splits.size() == 0) {
                return null;
            }
            do {
                this.elementCycleCount--;
                SplitWithInfo pollFirst = this.splits.pollFirst();
                if (!set.contains(pollFirst)) {
                    pollFirst = null;
                } else if (pollFirst.getLocalCount() > this.minLocalCount) {
                    this.splits.offerLast(pollFirst);
                    if (this.nextMinLocalCount == -1 || pollFirst.getLocalCount() < this.nextMinLocalCount) {
                        this.nextMinLocalCount = pollFirst.getLocalCount();
                    }
                    pollFirst = null;
                }
                if (this.elementCycleCount == 0) {
                    this.minLocalCount = this.nextMinLocalCount;
                    this.nextMinLocalCount = -1;
                    this.elementCycleCount = this.splits.size();
                }
                if (pollFirst != null) {
                    return pollFirst;
                }
            } while (this.elementCycleCount > 0);
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/file/src/assigners/LocalityAwareSplitAssigner$SplitWithInfo.class */
    public static class SplitWithInfo {
        private final FileSourceSplit split;
        private final String[] normalizedHosts;
        private int localCount = 0;

        public SplitWithInfo(FileSourceSplit fileSourceSplit) {
            this.split = fileSourceSplit;
            this.normalizedHosts = LocalityAwareSplitAssigner.normalizeHostNames(fileSourceSplit.hostnames());
        }

        public void incrementLocalCount() {
            this.localCount++;
        }

        public int getLocalCount() {
            return this.localCount;
        }

        public FileSourceSplit getSplit() {
            return this.split;
        }

        public String[] getNormalizedHosts() {
            return this.normalizedHosts;
        }
    }

    public LocalityAwareSplitAssigner(Collection<FileSourceSplit> collection) {
        Iterator<FileSourceSplit> it = collection.iterator();
        while (it.hasNext()) {
            this.unassigned.add(new SplitWithInfo(it.next()));
        }
        this.remoteSplitChooser = new LocatableSplitChooser(this.unassigned);
        this.localAssignments = new SimpleCounter();
        this.remoteAssignments = new SimpleCounter();
    }

    @Override // org.apache.flink.connector.file.src.assigners.FileSplitAssigner
    public Optional<FileSourceSplit> getNext(@Nullable String str) {
        if (StringUtils.isNullOrWhitespaceOnly(str)) {
            Optional<FileSourceSplit> remoteSplit = getRemoteSplit();
            if (remoteSplit.isPresent()) {
                LOG.info("Assigning split to non-localized request: {}", remoteSplit);
            }
            return remoteSplit;
        }
        String normalizeHostName = normalizeHostName(str);
        SplitWithInfo nextUnassignedMinLocalCountSplit = this.localPerHost.computeIfAbsent(normalizeHostName, str2 -> {
            return buildChooserForHost(str2, this.unassigned);
        }).getNextUnassignedMinLocalCountSplit(this.unassigned);
        if (nextUnassignedMinLocalCountSplit != null) {
            Preconditions.checkState(this.unassigned.remove(nextUnassignedMinLocalCountSplit), "Selected split has already been assigned. This should not happen!");
            LOG.info("Assigning local split to requesting host '{}': {}", normalizeHostName, nextUnassignedMinLocalCountSplit.getSplit());
            this.localAssignments.inc();
            return Optional.of(nextUnassignedMinLocalCountSplit.getSplit());
        }
        Optional<FileSourceSplit> remoteSplit2 = getRemoteSplit();
        if (remoteSplit2.isPresent()) {
            LOG.info("Assigning remote split to requesting host '{}': {}", normalizeHostName, remoteSplit2);
        }
        return remoteSplit2;
    }

    @Override // org.apache.flink.connector.file.src.assigners.FileSplitAssigner
    public void addSplits(Collection<FileSourceSplit> collection) {
        Iterator<FileSourceSplit> it = collection.iterator();
        while (it.hasNext()) {
            SplitWithInfo splitWithInfo = new SplitWithInfo(it.next());
            this.remoteSplitChooser.addInputSplit(splitWithInfo);
            this.unassigned.add(splitWithInfo);
        }
    }

    @Override // org.apache.flink.connector.file.src.assigners.FileSplitAssigner
    public Collection<FileSourceSplit> remainingSplits() {
        return (Collection) this.unassigned.stream().map((v0) -> {
            return v0.getSplit();
        }).collect(Collectors.toList());
    }

    private Optional<FileSourceSplit> getRemoteSplit() {
        SplitWithInfo nextUnassignedMinLocalCountSplit = this.remoteSplitChooser.getNextUnassignedMinLocalCountSplit(this.unassigned);
        if (nextUnassignedMinLocalCountSplit == null) {
            return Optional.empty();
        }
        Preconditions.checkState(this.unassigned.remove(nextUnassignedMinLocalCountSplit), "Selected split has already been assigned. This should not happen!");
        this.remoteAssignments.inc();
        return Optional.of(nextUnassignedMinLocalCountSplit.getSplit());
    }

    @VisibleForTesting
    int getNumberOfLocalAssignments() {
        return MathUtils.checkedDownCast(this.localAssignments.getCount());
    }

    @VisibleForTesting
    int getNumberOfRemoteAssignments() {
        return MathUtils.checkedDownCast(this.remoteAssignments.getCount());
    }

    static String normalizeHostName(String str) {
        if (str == null) {
            return null;
        }
        return NetUtils.getHostnameFromFQDN(str).toLowerCase(Locale.US);
    }

    static String[] normalizeHostNames(String[] strArr) {
        if (strArr == null) {
            return null;
        }
        String[] strArr2 = new String[strArr.length];
        boolean z = false;
        for (int i = 0; i < strArr.length; i++) {
            String str = strArr[i];
            String normalizeHostName = normalizeHostName(str);
            strArr2[i] = normalizeHostName;
            z |= str != normalizeHostName;
        }
        return z ? strArr2 : strArr;
    }

    private static boolean isLocal(String str, String[] strArr) {
        if (str == null || strArr == null) {
            return false;
        }
        for (String str2 : strArr) {
            if (str2 != null && str2.equals(str)) {
                return true;
            }
        }
        return false;
    }

    private static LocatableSplitChooser buildChooserForHost(String str, Set<SplitWithInfo> set) {
        LocatableSplitChooser locatableSplitChooser = new LocatableSplitChooser();
        for (SplitWithInfo splitWithInfo : set) {
            if (isLocal(str, splitWithInfo.getNormalizedHosts())) {
                splitWithInfo.incrementLocalCount();
                locatableSplitChooser.addInputSplit(splitWithInfo);
            }
        }
        return locatableSplitChooser;
    }
}
