package org.apache.flink.connector.base.source.hybrid;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.function.BiConsumer;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.api.connector.source.SupportsIntermediateNoMoreSplits;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.class */
public class HybridSourceSplitEnumerator implements SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> {
    /** Logger for this enumerator. */
    private static final Logger LOG = LoggerFactory.getLogger(HybridSourceSplitEnumerator.class);
    /** Context of the hybrid enumerator itself; used to send events, assign wrapped splits and query parallelism. */
    private final SplitEnumeratorContext<HybridSourceSplit> context;
    /** Configured source entries; indexed by {@link #currentSourceIndex} when an enumerator is created. */
    private final List<HybridSource.SourceListEntry> sources;
    /** Sources that have been created so far, registered under their source index in {@code switchEnumerator()}. */
    private final SwitchedSources switchedSources = new SwitchedSources();
    /**
     * Splits handed back for a source other than the current one, keyed by subtask id and then by
     * source index. They are re-assigned when the subtask is switched to the matching source.
     */
    private final Map<Integer, TreeMap<Integer, List<HybridSourceSplit>>> pendingSplits;
    /** Subtasks that reported a finished event for the current source; cleared when the enumerator is switched. */
    private final Set<Integer> finishedReaders;
    /**
     * Source index recorded per subtask: written when a switch event is sent or when a reader first
     * reports a finished event, and removed again in {@code addReader}.
     */
    private final Map<Integer, Integer> readerSourceIndex;
    /** Index into {@link #sources} of the source whose enumerator is currently active. */
    private int currentSourceIndex;
    /** State handed to the constructor; consumed (and set to null) by the first enumerator restore. */
    private HybridSourceEnumeratorState restoredEnumeratorState;
    /** Enumerator of the current underlying source; null until {@code switchEnumerator()} has run. */
    private SplitEnumerator<SourceSplit, Object> currentEnumerator;
    /** Checkpoint serializer obtained from the current underlying source. */
    private SimpleVersionedSerializer<Object> currentEnumeratorCheckpointSerializer;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator$SplitEnumeratorContextProxy.class */
    public static class SplitEnumeratorContextProxy<SplitT extends SourceSplit> implements SplitEnumeratorContext<SplitT> {
        private static final Logger LOG = LoggerFactory.getLogger(SplitEnumeratorContextProxy.class);
        private final SplitEnumeratorContext<HybridSourceSplit> realContext;
        private final int sourceIndex;
        private final Map<Integer, Integer> readerSourceIndex;
        private final SwitchedSources switchedSources;
        private final int sourceSize;

        private SplitEnumeratorContextProxy(int i, SplitEnumeratorContext<HybridSourceSplit> splitEnumeratorContext, Map<Integer, Integer> map, SwitchedSources switchedSources, int i2) {
            this.realContext = splitEnumeratorContext;
            this.sourceIndex = i;
            this.readerSourceIndex = map;
            this.switchedSources = switchedSources;
            this.sourceSize = i2;
        }

        public SplitEnumeratorMetricGroup metricGroup() {
            return this.realContext.metricGroup();
        }

        public void sendEventToSourceReader(int i, SourceEvent sourceEvent) {
            this.realContext.sendEventToSourceReader(i, sourceEvent);
        }

        public int currentParallelism() {
            return this.realContext.currentParallelism();
        }

        public Map<Integer, ReaderInfo> registeredReaders() {
            Map<Integer, ReaderInfo> registeredReaders = this.realContext.registeredReaders();
            if (registeredReaders.size() != this.readerSourceIndex.size()) {
                return filterRegisteredReaders(registeredReaders);
            }
            Integer num = null;
            for (Integer num2 : this.readerSourceIndex.values()) {
                if (num != null && num != num2) {
                    return filterRegisteredReaders(registeredReaders);
                }
                num = num2;
            }
            return registeredReaders;
        }

        private Map<Integer, ReaderInfo> filterRegisteredReaders(Map<Integer, ReaderInfo> map) {
            HashMap hashMap = new HashMap(map.size());
            for (Map.Entry<Integer, ReaderInfo> entry : map.entrySet()) {
                if (this.readerSourceIndex.get(entry.getKey()) == Integer.valueOf(this.sourceIndex)) {
                    hashMap.put(entry.getKey(), entry.getValue());
                }
            }
            return hashMap;
        }

        public void assignSplits(SplitsAssignment<SplitT> splitsAssignment) {
            HashMap hashMap = new HashMap();
            for (Map.Entry entry : splitsAssignment.assignment().entrySet()) {
                hashMap.put(entry.getKey(), HybridSourceSplit.wrapSplits((List) entry.getValue(), this.sourceIndex, this.switchedSources));
            }
            SplitsAssignment splitsAssignment2 = new SplitsAssignment(hashMap);
            LOG.debug("Assigning splits sourceIndex={} {}", Integer.valueOf(this.sourceIndex), splitsAssignment2);
            this.realContext.assignSplits(splitsAssignment2);
        }

        public void assignSplit(SplitT splitt, int i) {
            this.realContext.assignSplit(HybridSourceSplit.wrapSplit(splitt, this.sourceIndex, this.switchedSources), i);
        }

        public void signalNoMoreSplits(int i) {
            HybridSourceSplitEnumerator.checkAndSignalNoMoreSplits(this.realContext, i, this.sourceIndex, this.sourceSize);
        }

        public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> biConsumer) {
            this.realContext.callAsync(callable, biConsumer);
        }

        public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> biConsumer, long j, long j2) {
            this.realContext.callAsync(callable, biConsumer, j, j2);
        }

        public void runInCoordinatorThread(Runnable runnable) {
            this.realContext.runInCoordinatorThread(runnable);
        }

        public void setIsProcessingBacklog(boolean z) {
            this.realContext.setIsProcessingBacklog(z);
        }

        public void signalBacklogIsOver(int i) {
            this.realContext.signalBacklogIsOver(i);
        }
    }

    public HybridSourceSplitEnumerator(SplitEnumeratorContext<HybridSourceSplit> splitEnumeratorContext, List<HybridSource.SourceListEntry> list, int i, HybridSourceEnumeratorState hybridSourceEnumeratorState) {
        Preconditions.checkArgument(i < list.size());
        this.context = splitEnumeratorContext;
        this.sources = list;
        this.currentSourceIndex = i;
        this.pendingSplits = new HashMap();
        this.finishedReaders = new HashSet();
        this.readerSourceIndex = new HashMap();
        this.restoredEnumeratorState = hybridSourceEnumeratorState;
    }

    /** Starts the enumerator by creating (or restoring) and starting the enumerator of the current source. */
    public void start() {
        this.switchEnumerator();
    }

    /**
     * Forwards a split request to the current underlying enumerator.
     *
     * @param i subtask id of the requesting reader
     * @param str host name passed through to the underlying enumerator
     * @throws IllegalStateException if splits are still pending for the subtask
     */
    public void handleSplitRequest(int i, String str) {
        LOG.debug("handleSplitRequest subtask={} sourceIndex={} pendingSplits={}", i, currentSourceIndex, pendingSplits);
        // An empty map contains no key, so this single lookup covers the "nothing pending" case too.
        final boolean hasPendingSplits = pendingSplits.containsKey(i);
        Preconditions.checkState(!hasPendingSplits);
        currentEnumerator.handleSplitRequest(i, str);
    }

    /**
     * Takes back splits that were assigned to a reader.
     *
     * <p>The splits are grouped by their source index. Splits of the current source are unwrapped
     * and handed to the current underlying enumerator; splits of any other source are parked in
     * {@code pendingSplits} for the subtask until it is switched to that source.
     *
     * @param splits wrapped splits being returned
     * @param subtaskId subtask the splits were assigned to
     */
    public void addSplitsBack(List<HybridSourceSplit> splits, int subtaskId) {
        LOG.debug("Adding splits back for subtask={} splits={}", subtaskId, splits);
        // Typed map: a raw TreeMap would turn the forEach parameters into Object.
        TreeMap<Integer, List<HybridSourceSplit>> splitsBySourceIndex = new TreeMap<>();
        for (HybridSourceSplit split : splits) {
            splitsBySourceIndex
                    .computeIfAbsent(split.sourceIndex(), unused -> new ArrayList<>())
                    .add(split);
        }
        splitsBySourceIndex.forEach((sourceIndex, sourceSplits) -> {
            if (sourceIndex == currentSourceIndex) {
                currentEnumerator.addSplitsBack(
                        HybridSourceSplit.unwrapSplits(sourceSplits, switchedSources), subtaskId);
            } else {
                // The inner lambda parameter must not reuse a name from the enclosing lambda.
                pendingSplits
                        .computeIfAbsent(subtaskId, unused -> new TreeMap<>())
                        .put(sourceIndex, sourceSplits);
            }
        });
    }

    /**
     * Handles a (re-)registered reader by dropping the source index recorded for it. The reader is
     * not forwarded to the underlying enumerator here.
     *
     * @param i subtask id of the reader
     */
    public void addReader(int i) {
        final Integer subtask = Integer.valueOf(i);
        LOG.debug("addReader subtaskId={}", subtask);
        readerSourceIndex.remove(subtask);
    }

    /**
     * Switches a reader to the given source: records the index, sends the switch event, re-assigns
     * any splits parked for that subtask and source, and registers the reader with the current
     * enumerator when the target is the current source.
     */
    private void sendSwitchSourceEvent(int subtaskId, int sourceIndex) {
        final Integer subtaskKey = Integer.valueOf(subtaskId);
        final Integer sourceKey = Integer.valueOf(sourceIndex);

        readerSourceIndex.put(subtaskKey, sourceKey);
        context.sendEventToSourceReader(
                subtaskId,
                new SwitchSourceEvent(sourceIndex, switchedSources.sourceOf(sourceIndex), sourceIndex >= sources.size() - 1));

        // Hand back splits that were parked for this subtask and source, if any.
        final TreeMap<Integer, List<HybridSourceSplit>> parkedBySource = pendingSplits.get(subtaskKey);
        if (parkedBySource != null) {
            final List<HybridSourceSplit> parked = parkedBySource.remove(sourceKey);
            final boolean hasParkedSplits = parked != null && !parked.isEmpty();
            if (hasParkedSplits) {
                LOG.debug("Restoring splits to subtask={} {}", subtaskKey, parked);
                final Map<Integer, List<HybridSourceSplit>> assignment = Collections.singletonMap(subtaskKey, parked);
                context.assignSplits(new SplitsAssignment<>(assignment));
                checkAndSignalNoMoreSplits(context, subtaskId, sourceIndex, sources.size());
            }
            if (parkedBySource.isEmpty()) {
                pendingSplits.remove(subtaskKey);
            }
        }

        if (sourceIndex != currentSourceIndex) {
            return;
        }
        LOG.debug("adding reader subtask={} sourceIndex={}", subtaskKey, Integer.valueOf(currentSourceIndex));
        currentEnumerator.addReader(subtaskId);
    }

    /**
     * Snapshots the current source index together with the serialized checkpoint of the current
     * underlying enumerator and the version of the serializer used.
     *
     * <p>This restores the interface method name; the decompiler had renamed it, which left
     * {@code SplitEnumerator#snapshotState(long)} unimplemented.
     *
     * @param checkpointId id of the checkpoint being taken
     * @return state wrapping the current source index and the underlying enumerator's state
     * @throws Exception if the underlying enumerator or its serializer fails
     */
    @Override
    public HybridSourceEnumeratorState snapshotState(long checkpointId) throws Exception {
        final int sourceIndex = currentSourceIndex;
        final Object enumeratorState = currentEnumerator.snapshotState(checkpointId);
        final byte[] serializedState = currentEnumeratorCheckpointSerializer.serialize(enumeratorState);
        return new HybridSourceEnumeratorState(sourceIndex, serializedState, currentEnumeratorCheckpointSerializer.getVersion());
    }

    /**
     * Decompiler-generated alias kept only for backward compatibility.
     *
     * @deprecated use {@link #snapshotState(long)}
     */
    @Deprecated
    public HybridSourceEnumeratorState m10snapshotState(long j) throws Exception {
        return snapshotState(j);
    }

    /**
     * Forwards the checkpoint-complete notification to the current underlying enumerator.
     *
     * @param j id of the completed checkpoint
     * @throws Exception if the underlying enumerator fails
     */
    public void notifyCheckpointComplete(long j) throws Exception {
        final SplitEnumerator<SourceSplit, Object> delegate = currentEnumerator;
        delegate.notifyCheckpointComplete(j);
    }

    /**
     * Forwards the checkpoint-aborted notification to the current underlying enumerator.
     *
     * @param j id of the aborted checkpoint
     * @throws Exception if the underlying enumerator fails
     */
    public void notifyCheckpointAborted(long j) throws Exception {
        final SplitEnumerator<SourceSplit, Object> delegate = currentEnumerator;
        delegate.notifyCheckpointAborted(j);
    }

    /**
     * Handles an event from a reader.
     *
     * <p>Events other than {@code SourceReaderFinishedEvent} go straight to the current underlying
     * enumerator. A finished event either is ignored (its index is below the one recorded for the
     * reader), advances a lagging reader to its next source, or marks the reader as finished for
     * the current source. Once as many readers have finished as the current parallelism and
     * another source remains, the enumerator is switched and every subtask is sent a switch event.
     *
     * @param i subtask id of the sending reader
     * @param sourceEvent event received from the reader
     */
    public void handleSourceEvent(int i, SourceEvent sourceEvent) {
        LOG.debug("handleSourceEvent {} subtask={} pendingSplits={}", sourceEvent, i, pendingSplits);
        if (sourceEvent instanceof SourceReaderFinishedEvent) {
            final SourceReaderFinishedEvent finished = (SourceReaderFinishedEvent) sourceEvent;

            // First event seen from this reader records the index it reports.
            final int recordedIndex = readerSourceIndex.computeIfAbsent(i, subtask -> {
                LOG.debug("New reader subtask={} sourceIndex={}", i, finished.sourceIndex());
                return finished.sourceIndex();
            });

            if (finished.sourceIndex() < recordedIndex) {
                // Event refers to a source the reader has already been switched away from.
                return;
            }

            if (recordedIndex < currentSourceIndex) {
                // Reader lags behind the enumerator: move it forward by one source.
                final int nextIndex;
                if (recordedIndex == -1) {
                    nextIndex = switchedSources.getFirstSourceIndex();
                } else {
                    nextIndex = recordedIndex + 1;
                }
                sendSwitchSourceEvent(i, nextIndex);
                return;
            }

            finishedReaders.add(i);
            if (finishedReaders.size() != context.currentParallelism()) {
                return;
            }
            LOG.debug("All readers finished, ready to switch enumerator!");
            if (currentSourceIndex + 1 < sources.size()) {
                switchEnumerator();
                for (int subtask = 0; subtask < context.currentParallelism(); subtask++) {
                    sendSwitchSourceEvent(subtask, currentSourceIndex);
                }
            }
        } else {
            currentEnumerator.handleSourceEvent(i, sourceEvent);
        }
    }

    /**
     * Closes the current underlying enumerator, if one exists.
     *
     * @throws IOException if closing the underlying enumerator fails
     */
    public void close() throws IOException {
        if (currentEnumerator == null) {
            return;
        }
        currentEnumerator.close();
    }

    /**
     * Creates and starts the enumerator for the current source.
     *
     * <p>If an enumerator is already active it is closed first, the finished-reader set is cleared
     * and the source index is advanced. The new source is created through its factory (which can
     * see the previous enumerator), registered in {@code switchedSources}, and its enumerator is
     * either restored from the pending restored state or created fresh.
     */
    private void switchEnumerator() {
        final SplitEnumerator<SourceSplit, Object> previousEnumerator = currentEnumerator;
        if (previousEnumerator != null) {
            try {
                previousEnumerator.close();
                finishedReaders.clear();
                currentEnumerator = null;
                currentSourceIndex++;
            } catch (Exception closeFailure) {
                throw new RuntimeException(closeFailure);
            }
        }

        Source source = sources.get(currentSourceIndex).factory.create(new HybridSource.SourceSwitchContext<Object>() {
            @Override
            public Object getPreviousEnumerator() {
                return previousEnumerator;
            }
        });
        switchedSources.put(currentSourceIndex, source);
        currentEnumeratorCheckpointSerializer = source.getEnumeratorCheckpointSerializer();

        SplitEnumeratorContextProxy proxyContext =
                new SplitEnumeratorContextProxy(currentSourceIndex, context, readerSourceIndex, switchedSources, sources.size());
        try {
            if (restoredEnumeratorState != null) {
                LOG.info("Restoring enumerator for sourceIndex={}", currentSourceIndex);
                final Object restoredState = currentEnumeratorCheckpointSerializer.deserialize(
                        restoredEnumeratorState.getWrappedStateSerializerVersion(),
                        restoredEnumeratorState.getWrappedState());
                currentEnumerator = source.restoreEnumerator(proxyContext, restoredState);
                // Restored state applies to one enumerator only.
                restoredEnumeratorState = null;
            } else {
                currentEnumerator = source.createEnumerator(proxyContext);
            }
            LOG.info("Starting enumerator for sourceIndex={}", currentSourceIndex);
            // Every source except the last one is flagged as processing backlog.
            final boolean notLastSource = currentSourceIndex < sources.size() - 1;
            context.setIsProcessingBacklog(notLastSource);
            currentEnumerator.start();
        } catch (Exception startFailure) {
            throw new RuntimeException("Failed to create enumerator for sourceIndex=" + currentSourceIndex, startFailure);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Signals "no more splits" to a reader: the regular signal when {@code i2} is the last source
     * index, the intermediate signal otherwise.
     *
     * @param splitEnumeratorContext context to signal through; must implement
     *     {@code SupportsIntermediateNoMoreSplits} regardless of which signal is sent
     * @param i subtask id of the reader
     * @param i2 index of the source the signal is for
     * @param i3 total number of sources
     * @throws IllegalStateException if the context does not implement
     *     {@code SupportsIntermediateNoMoreSplits}
     */
    public static void checkAndSignalNoMoreSplits(SplitEnumeratorContext<HybridSourceSplit> splitEnumeratorContext, int i, int i2, int i3) {
        Preconditions.checkState(
                splitEnumeratorContext instanceof SupportsIntermediateNoMoreSplits,
                "The split enumerator context %s must implement SupportsIntermediateNoMoreSplits to be used in hybrid source scenario.",
                splitEnumeratorContext.getClass().getCanonicalName());

        final boolean isLastSource = i2 >= i3 - 1;
        if (!isLastSource) {
            ((SupportsIntermediateNoMoreSplits) splitEnumeratorContext).signalIntermediateNoMoreSplits(i);
            return;
        }
        splitEnumeratorContext.signalNoMoreSplits(i);
    }
}
