package org.apache.flink.cep.dynamic.coordinator;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.flink.cep.dynamic.coordinator.DynamicCepOperatorCoordinatorProvider;
import org.apache.flink.runtime.operators.coordination.ComponentClosingUtils;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Context object used by the dynamic CEP operator coordinator.
 *
 * <p>It owns two scheduled executors (a coordinator executor and a worker executor), keeps the
 * {@link OperatorCoordinator.SubtaskGateway} registered for each subtask index, and offers helpers
 * to run work on the coordinator executor, send events to subtasks and fail the coordinator.
 *
 * <p>Both executors are shut down by {@link #close()}.
 */
@Internal
public class DynamicCepOperatorCoordinatorContext implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(DynamicCepOperatorCoordinatorContext.class);

    /** Executor on which coordinator tasks, periodic tasks and event sends are run. */
    private final ScheduledExecutorService coordinatorExecutor;

    /** Secondary executor; this class only holds it and shuts it down in {@link #close()}. */
    private final ScheduledExecutorService workerExecutor;

    /** Factory used to recognise the coordinator thread and to fail it. */
    private final DynamicCepOperatorCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory;

    /** The coordinator context supplied at construction time. */
    private final OperatorCoordinator.Context operatorCoordinatorContext;

    /**
     * Gateways by subtask index. A key mapped to {@code null} denotes a subtask that was marked
     * as not ready via {@link #subtaskNotReady(int)}.
     */
    private final Map<Integer, OperatorCoordinator.SubtaskGateway> subtaskGateways;

    /**
     * Creates a context with two freshly created single-threaded scheduled executors. The
     * coordinator executor uses the given thread factory; the worker executor's threads are named
     * after the coordinator thread name with a {@code "-worker"} suffix.
     *
     * @param coordinatorExecutorThreadFactory thread factory for the coordinator executor
     * @param context the operator coordinator context
     */
    public DynamicCepOperatorCoordinatorContext(DynamicCepOperatorCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorExecutorThreadFactory, OperatorCoordinator.Context context) {
        this(Executors.newScheduledThreadPool(1, coordinatorExecutorThreadFactory), Executors.newScheduledThreadPool(1, new ExecutorThreadFactory(coordinatorExecutorThreadFactory.getCoordinatorThreadName() + "-worker")), coordinatorExecutorThreadFactory, context);
    }

    /**
     * Creates a context from explicitly supplied executors.
     *
     * @param scheduledExecutorService the coordinator executor
     * @param scheduledExecutorService2 the worker executor
     * @param coordinatorExecutorThreadFactory factory used to identify and fail the coordinator thread
     * @param context the operator coordinator context; its current parallelism is used as the
     *     initial capacity of the gateway map
     */
    public DynamicCepOperatorCoordinatorContext(ScheduledExecutorService scheduledExecutorService, ScheduledExecutorService scheduledExecutorService2, DynamicCepOperatorCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorExecutorThreadFactory, OperatorCoordinator.Context context) {
        this.coordinatorExecutor = scheduledExecutorService;
        this.workerExecutor = scheduledExecutorService2;
        this.coordinatorThreadFactory = coordinatorExecutorThreadFactory;
        this.operatorCoordinatorContext = context;
        this.subtaskGateways = new HashMap<>(context.currentParallelism());
    }

    /**
     * Shuts down the worker executor and then the coordinator executor, each with a timeout of
     * {@code Long.MAX_VALUE} nanoseconds.
     *
     * @throws InterruptedException as declared by the original signature
     */
    @Override
    public void close() throws InterruptedException {
        final Duration timeout = Duration.ofNanos(Long.MAX_VALUE);
        ComponentClosingUtils.shutdownExecutorForcefully(this.workerExecutor, timeout);
        ComponentClosingUtils.shutdownExecutorForcefully(this.coordinatorExecutor, timeout);
    }

    /**
     * Hands the given action to the coordinator executor without waiting for it to finish.
     *
     * @param runnable the action to execute
     */
    public void runInCoordinatorThread(Runnable runnable) {
        this.coordinatorExecutor.execute(runnable);
    }

    /**
     * Schedules a fixed-rate periodic task on the coordinator executor.
     *
     * @param runnable the task to run; must not be {@code null}
     * @param j the initial delay
     * @param j2 the period between successive executions
     * @param timeUnit the unit of {@code j} and {@code j2}
     * @return the future representing the scheduled task
     * @throws NullPointerException if {@code runnable} is {@code null}
     */
    public ScheduledFuture<?> schedulePeriodTask(Runnable runnable, long j, long j2, TimeUnit timeUnit) {
        Objects.requireNonNull(runnable);
        return this.coordinatorExecutor.scheduleAtFixedRate(runnable, j, j2, timeUnit);
    }

    /**
     * Returns the user-code class loader of the underlying operator coordinator context.
     *
     * @return the user-code class loader
     */
    public ClassLoader getUserCodeClassloader() {
        return this.operatorCoordinatorContext.getUserCodeClassloader();
    }

    /**
     * Registers the gateway of a subtask.
     *
     * @param subtaskGateway the gateway to register under its own subtask index
     * @throws IllegalStateException if a non-null gateway is already registered for that subtask
     */
    public void subtaskReady(OperatorCoordinator.SubtaskGateway subtaskGateway) {
        final int subtask = subtaskGateway.getSubtask();
        if (this.subtaskGateways.get(subtask) != null) {
            throw new IllegalStateException("Already have a subtask gateway for " + subtask);
        }
        this.subtaskGateways.put(subtask, subtaskGateway);
    }

    /**
     * Marks a subtask as not ready by mapping its index to {@code null}.
     *
     * <p>The index deliberately stays in the map, so it is still reported by
     * {@link #getSubtasks()}; sending an event to it only logs a warning.
     *
     * @param i the subtask index
     */
    public void subtaskNotReady(int i) {
        this.subtaskGateways.put(i, null);
    }

    /**
     * Returns the subtask indices currently present in the gateway map, including those mapped
     * to {@code null}.
     *
     * <p>The returned set is the live key-set view of the internal map, not a copy.
     *
     * @return the known subtask indices
     */
    public Set<Integer> getSubtasks() {
        return this.subtaskGateways.keySet();
    }

    /**
     * Sends an event to the given subtask via the coordinator thread. If no gateway is registered
     * for the subtask, a warning is logged and the event is dropped.
     *
     * @param i the target subtask index
     * @param operatorEvent the event to send
     * @throws FlinkRuntimeException if the send action fails or the wait for it is interrupted
     */
    public void sendEventToOperator(int i, OperatorEvent operatorEvent) {
        final String failureMessage = String.format("Failed to send event %s to subtask %d", operatorEvent, Integer.valueOf(i));
        callInCoordinatorThread(() -> {
            final OperatorCoordinator.SubtaskGateway subtaskGateway = this.subtaskGateways.get(i);
            if (subtaskGateway == null) {
                LOG.warn("Subtask {} is not ready yet to receive events.", i);
                return null;
            }
            subtaskGateway.sendEvent(operatorEvent);
            return null;
        }, failureMessage);
    }

    /**
     * Submits a task to the coordinator executor that fails the coordinator thread with the given
     * cause, and waits for that task to complete.
     *
     * @param th the failure cause
     * @throws FlinkRuntimeException if the task fails or the wait is interrupted; in the latter
     *     case the calling thread's interrupt flag is restored first
     */
    public void failCoordinator(Throwable th) {
        try {
            this.coordinatorExecutor.submit(() -> {
                this.coordinatorThreadFactory.failCoordinatorThread(th);
            }).get();
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new FlinkRuntimeException("Error failing DynamicCepOperatorCoordinator", e);
        } catch (ExecutionException e) {
            throw new FlinkRuntimeException("Error failing DynamicCepOperatorCoordinator", e);
        }
    }

    /**
     * Runs the callable and returns its result.
     *
     * <p>The callable runs directly on the calling thread when that thread is the coordinator
     * thread or when the coordinator executor has already been shut down; otherwise it is
     * submitted to the coordinator executor and the caller blocks until it completes.
     *
     * @param callable the action to run
     * @param errorMessage message of the {@link FlinkRuntimeException} thrown on failure
     * @param <V> the result type
     * @return the callable's result
     * @throws FlinkRuntimeException if the callable throws or the wait is interrupted; in the
     *     latter case the calling thread's interrupt flag is restored first
     */
    private <V> V callInCoordinatorThread(Callable<V> callable, String errorMessage) {
        if (this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread() || this.coordinatorExecutor.isShutdown()) {
            try {
                return callable.call();
            } catch (Throwable th) {
                LOG.error("Uncaught Exception in DynamicCepOperatorCoordinator Executor", th);
                throw new FlinkRuntimeException(errorMessage, th);
            }
        }

        // Log inside the executor thread, then rethrow so the failure reaches the waiting caller.
        final Callable<V> loggingCallable = () -> {
            try {
                return callable.call();
            } catch (Throwable th2) {
                LOG.error("Uncaught Exception in DynamicCepOperatorCoordinator Executor", th2);
                ExceptionUtils.rethrowException(th2);
                return null;
            }
        };
        try {
            return this.coordinatorExecutor.submit(loggingCallable).get();
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new FlinkRuntimeException(errorMessage, e);
        } catch (ExecutionException e) {
            throw new FlinkRuntimeException(errorMessage, e);
        }
    }
}
