package org.apache.flink.runtime.source.coordinator;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.ThrowableCatchingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.class */
public class SourceCoordinatorContext<SplitT extends SourceSplit> implements SplitEnumeratorContext<SplitT>, AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) SourceCoordinatorContext.class);
    private final ExecutorService coordinatorExecutor;
    private final ExecutorNotifier notifier;
    private final OperatorCoordinator.Context operatorCoordinatorContext;
    private final SimpleVersionedSerializer<SplitT> splitSerializer;
    private final ConcurrentMap<Integer, ReaderInfo> registeredReaders;
    private final SplitAssignmentTracker<SplitT> assignmentTracker;
    private final SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory;
    private final String coordinatorThreadName;

    public SourceCoordinatorContext(ExecutorService executorService, SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorExecutorThreadFactory, int i, OperatorCoordinator.Context context, SimpleVersionedSerializer<SplitT> simpleVersionedSerializer) {
        this(executorService, coordinatorExecutorThreadFactory, i, context, simpleVersionedSerializer, new SplitAssignmentTracker());
    }

    SourceCoordinatorContext(ExecutorService executorService, SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorExecutorThreadFactory, int i, OperatorCoordinator.Context context, SimpleVersionedSerializer<SplitT> simpleVersionedSerializer, SplitAssignmentTracker<SplitT> splitAssignmentTracker) {
        this.coordinatorExecutor = executorService;
        this.coordinatorThreadFactory = coordinatorExecutorThreadFactory;
        this.operatorCoordinatorContext = context;
        this.splitSerializer = simpleVersionedSerializer;
        this.registeredReaders = new ConcurrentHashMap();
        this.assignmentTracker = splitAssignmentTracker;
        this.coordinatorThreadName = coordinatorExecutorThreadFactory.getCoordinatorThreadName();
        this.notifier = new ExecutorNotifier(Executors.newScheduledThreadPool(i, new ExecutorThreadFactory(this.coordinatorThreadName + "-worker")), runnable -> {
            executorService.execute(new ThrowableCatchingRunnable(this::handleUncaughtExceptionFromAsyncCall, runnable));
        });
    }

    @Override // org.apache.flink.api.connector.source.SplitEnumeratorContext
    public MetricGroup metricGroup() {
        return null;
    }

    @Override // org.apache.flink.api.connector.source.SplitEnumeratorContext
    public void sendEventToSourceReader(int i, SourceEvent sourceEvent) {
        callInCoordinatorThread(() -> {
            try {
                this.operatorCoordinatorContext.sendEvent(new SourceEventWrapper(sourceEvent), i);
                return null;
            } catch (TaskNotRunningException e) {
                throw new FlinkRuntimeException(String.format("Failed to send event %s to subtask %d", sourceEvent, Integer.valueOf(i)), e);
            }
        }, String.format("Failed to send event %s to subtask %d", sourceEvent, Integer.valueOf(i)));
    }

    @Override // org.apache.flink.api.connector.source.SplitEnumeratorContext
    public int currentParallelism() {
        return this.operatorCoordinatorContext.currentParallelism();
    }

    @Override // org.apache.flink.api.connector.source.SplitEnumeratorContext
    public Map<Integer, ReaderInfo> registeredReaders() {
        return Collections.unmodifiableMap(this.registeredReaders);
    }

    @Override // org.apache.flink.api.connector.source.SplitEnumeratorContext
    public void assignSplits(SplitsAssignment<SplitT> splitsAssignment) {
        callInCoordinatorThread(() -> {
            for (Integer num : splitsAssignment.assignment().keySet()) {
                if (!this.registeredReaders.containsKey(num)) {
                    throw new IllegalArgumentException(String.format("Cannot assign splits %s to subtask %d because the subtask is not registered.", this.registeredReaders.get(num), num));
                }
            }
            this.assignmentTracker.recordSplitAssignment(splitsAssignment);
            splitsAssignment.assignment().forEach((num2, list) -> {
                try {
                    this.operatorCoordinatorContext.sendEvent(new AddSplitEvent(list, this.splitSerializer), num2.intValue());
                } catch (IOException e) {
                    throw new FlinkRuntimeException("Failed to serialize splits.", e);
                } catch (TaskNotRunningException e2) {
                    throw new FlinkRuntimeException(String.format("Failed to assign splits %s to reader %d.", list, num2), e2);
                }
            });
            return null;
        }, String.format("Failed to assign splits %s due to ", splitsAssignment));
    }

    @Override // org.apache.flink.api.connector.source.SplitEnumeratorContext
    public void signalNoMoreSplits(int i) {
        callInCoordinatorThread(() -> {
            try {
                this.operatorCoordinatorContext.sendEvent(new NoMoreSplitsEvent(), i);
                return null;
            } catch (TaskNotRunningException e) {
                throw new FlinkRuntimeException("Failed to send 'NoMoreSplits' to reader " + i, e);
            }
        }, "Failed to send 'NoMoreSplits' to reader " + i);
    }

    @Override // org.apache.flink.api.connector.source.SplitEnumeratorContext
    public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> biConsumer, long j, long j2) {
        this.notifier.notifyReadyAsync(callable, biConsumer, j, j2);
    }

    @Override // org.apache.flink.api.connector.source.SplitEnumeratorContext
    public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> biConsumer) {
        this.notifier.notifyReadyAsync(callable, biConsumer);
    }

    @Override // org.apache.flink.api.connector.source.SplitEnumeratorContext
    public void runInCoordinatorThread(Runnable runnable) {
        this.coordinatorExecutor.execute(runnable);
    }

    @Override // java.lang.AutoCloseable
    public void close() throws InterruptedException {
        this.notifier.close();
        this.coordinatorExecutor.shutdown();
        this.coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void failJob(Throwable th) {
        this.operatorCoordinatorContext.failJob(th);
    }

    void handleUncaughtExceptionFromAsyncCall(Throwable th) {
        ExceptionUtils.rethrowIfFatalErrorOrOOM(th);
        LOG.error("Exception while handling result from async call in {}. Triggering job failover.", this.coordinatorThreadName, th);
        failJob(th);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void snapshotState(long j, SimpleVersionedSerializer<SplitT> simpleVersionedSerializer, DataOutputStream dataOutputStream) throws Exception {
        dataOutputStream.writeInt(0);
        this.assignmentTracker.snapshotState(j, simpleVersionedSerializer, dataOutputStream);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void restoreState(SimpleVersionedSerializer<SplitT> simpleVersionedSerializer, DataInputStream dataInputStream) throws Exception {
        SourceCoordinatorSerdeUtils.readRegisteredReaders(dataInputStream);
        this.assignmentTracker.restoreState(simpleVersionedSerializer, dataInputStream);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void registerSourceReader(ReaderInfo readerInfo) {
        ReaderInfo put = this.registeredReaders.put(Integer.valueOf(readerInfo.getSubtaskId()), readerInfo);
        if (put != null) {
            throw new IllegalStateException("Overwriting " + put + " with " + readerInfo);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void unregisterSourceReader(int i) {
        this.registeredReaders.remove(Integer.valueOf(i));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public List<SplitT> getAndRemoveUncheckpointedAssignment(int i, long j) {
        return this.assignmentTracker.getAndRemoveUncheckpointedAssignment(i, j);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void onCheckpointComplete(long j) {
        this.assignmentTracker.onCheckpointComplete(j);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public OperatorCoordinator.Context getCoordinatorContext() {
        return this.operatorCoordinatorContext;
    }

    private <V> V callInCoordinatorThread(Callable<V> callable, String str) {
        if (this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
            try {
                return callable.call();
            } catch (Exception e) {
                throw new FlinkRuntimeException(str, e);
            }
        }
        try {
            return (V) this.coordinatorExecutor.submit(callable).get();
        } catch (InterruptedException | ExecutionException e2) {
            throw new FlinkRuntimeException(str, e2);
        }
    }
}
