package com.huawei.hudi.hst.source.enumerator;

import com.huawei.hudi.hst.source.split.HoodieSourceSplit;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.util.Preconditions;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.hst.MetaKafka;
import org.apache.hudi.common.util.Option;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/hudi/hst/source/enumerator/HoodieSplitEnumerator.class */
public class HoodieSplitEnumerator<SplitT extends InputSplit> implements SplitEnumerator<HoodieSourceSplit<SplitT>, Collection<HoodieSourceSplit<SplitT>>> {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieSplitEnumerator.class);
    private final SplitEnumeratorContext<HoodieSourceSplit<SplitT>> context;
    private final HoodieSplitAssigner<SplitT> splitAssigner;

    @Nullable
    private final MetaKafka lastCommitKafkaMetadata;

    public HoodieSplitEnumerator(SplitEnumeratorContext<HoodieSourceSplit<SplitT>> splitEnumeratorContext, HoodieSplitAssigner<SplitT> hoodieSplitAssigner, @Nullable List<HoodieCommitMetadata> list, boolean z) {
        this.context = splitEnumeratorContext;
        this.splitAssigner = hoodieSplitAssigner;
        String str = list != null ? list.stream().filter(hoodieCommitMetadata -> {
            return !hoodieCommitMetadata.getCompacted().booleanValue();
        }).reduce((hoodieCommitMetadata2, hoodieCommitMetadata3) -> {
            return hoodieCommitMetadata3;
        }).orElseThrow(() -> {
            return new IllegalArgumentException("Non-compaction commits metadata must not be empty.");
        }).getExtraMetadata().get(MetaKafka.JSON_FIELD_KEY) : null;
        if (str != null) {
            this.lastCommitKafkaMetadata = MetaKafka.fromString(str);
        } else {
            Preconditions.checkState(!z || hoodieSplitAssigner.remainingSplits().isEmpty(), "If there are non-empty splits Hudi source in hybrid mode requires Kafka offsets information stored in the last commit metadata.");
            this.lastCommitKafkaMetadata = null;
        }
    }

    public void start() {
    }

    public void handleSplitRequest(int i, @Nullable String str) {
        if (this.context.registeredReaders().containsKey(Integer.valueOf(i))) {
            if (LOG.isInfoEnabled()) {
                LOG.info("Subtask {} {} is requesting a file source split", Integer.valueOf(i), str == null ? "(no host locality info)" : "(on host '" + str + "')");
            }
            Option<HoodieSourceSplit<SplitT>> next = this.splitAssigner.getNext(str);
            if (!next.isPresent()) {
                this.context.signalNoMoreSplits(i);
                LOG.info("No more splits available for subtask {}", Integer.valueOf(i));
            } else {
                HoodieSourceSplit<SplitT> hoodieSourceSplit = next.get();
                this.context.assignSplit(hoodieSourceSplit, i);
                LOG.info("Assigned split to subtask {} : {}", Integer.valueOf(i), hoodieSourceSplit);
            }
        }
    }

    public void addSplitsBack(List<HoodieSourceSplit<SplitT>> list, int i) {
        LOG.debug("File Source Enumerator adds splits back: {}", list);
        this.splitAssigner.addSplits(list);
    }

    public void addReader(int i) {
    }

    /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
    public Collection<HoodieSourceSplit<SplitT>> m108snapshotState(long j) throws Exception {
        return this.splitAssigner.remainingSplits();
    }

    public void close() throws IOException {
    }

    @Nullable
    public MetaKafka getLastCommitKafkaMetadata() {
        return this.lastCommitKafkaMetadata;
    }
}
