package org.apache.flink.runtime.source.coordinator;

import java.lang.Thread;
import java.util.concurrent.ThreadFactory;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.class */
public class SourceCoordinatorProvider<SplitT extends SourceSplit> extends RecreateOnResetOperatorCoordinator.Provider {
    private static final long serialVersionUID = -1921681440009738462L;
    private final String operatorName;
    private final Source<?, SplitT, ?> source;
    private final int numWorkerThreads;
    private final WatermarkAlignmentParams alignmentParams;

    @Nullable
    private final String coordinatorListeningID;

    /* loaded from: input_file:org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider$CoordinatorExecutorThreadFactory.class */
    public static class CoordinatorExecutorThreadFactory implements ThreadFactory, Thread.UncaughtExceptionHandler {
        private static final Logger LOG = LoggerFactory.getLogger(SourceCoordinatorProvider.class);
        private final String coordinatorThreadName;
        private final ClassLoader cl;
        private final Thread.UncaughtExceptionHandler errorHandler;

        @Nullable
        private Thread t;

        CoordinatorExecutorThreadFactory(String str, OperatorCoordinator.Context context) {
            this(str, context.getUserCodeClassloader(), (thread, th) -> {
                LOG.error("Thread '{}' produced an uncaught exception. Failing the job.", thread.getName(), th);
                context.failJob(th);
            });
        }

        CoordinatorExecutorThreadFactory(String str, ClassLoader classLoader, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
            this.coordinatorThreadName = str;
            this.cl = classLoader;
            this.errorHandler = uncaughtExceptionHandler;
        }

        @Override // java.util.concurrent.ThreadFactory
        public synchronized Thread newThread(Runnable runnable) {
            Preconditions.checkState(this.t == null, "Please using the new CoordinatorExecutorThreadFactory, this factory cannot new multiple threads.");
            this.t = new Thread(runnable, this.coordinatorThreadName);
            this.t.setContextClassLoader(this.cl);
            this.t.setUncaughtExceptionHandler(this);
            return this.t;
        }

        @Override // java.lang.Thread.UncaughtExceptionHandler
        public synchronized void uncaughtException(Thread thread, Throwable th) {
            this.errorHandler.uncaughtException(thread, th);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public String getCoordinatorThreadName() {
            return this.coordinatorThreadName;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public boolean isCurrentThreadCoordinatorThread() {
            return Thread.currentThread() == this.t;
        }
    }

    public SourceCoordinatorProvider(String str, OperatorID operatorID, Source<?, SplitT, ?> source, int i, WatermarkAlignmentParams watermarkAlignmentParams, @Nullable String str2) {
        super(operatorID);
        this.operatorName = str;
        this.source = source;
        this.numWorkerThreads = i;
        this.alignmentParams = watermarkAlignmentParams;
        this.coordinatorListeningID = str2;
    }

    @Override // org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.Provider
    public OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) {
        return new SourceCoordinator(context.getJobID(), this.operatorName, this.source, new SourceCoordinatorContext(context.getJobID(), new CoordinatorExecutorThreadFactory("SourceCoordinator-" + this.operatorName, context), this.numWorkerThreads, context, this.source.getSplitSerializer(), context.isConcurrentExecutionAttemptsSupported()), context.getCoordinatorStore(), this.alignmentParams, this.coordinatorListeningID);
    }
}
