package org.apache.flink.runtime.early;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/early/EarlyStoppingCoordinator.class */
/**
 * Tracks early-stopping messages per job vertex and, once one vertex has reported as many
 * messages as its parallelism, sends an early-stopping event to the current execution attempt
 * of every task of every registered job vertex.
 *
 * <p>The two lookup maps are populated only in the constructor; the per-vertex counters are
 * {@link AtomicInteger}s.
 */
public class EarlyStoppingCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(EarlyStoppingCoordinator.class);

    /** The job this coordinator accepts messages for. */
    private final JobID jobID;

    /** Number of early-stopping messages received so far, keyed by job vertex. */
    private final Map<JobVertexID, AtomicInteger> requestedEarlyStoppedTasksCount = new HashMap<>();

    /** All job vertices registered at construction time, keyed by their ID. */
    private final Map<JobVertexID, ExecutionJobVertex> jobVertices = new HashMap<>();

    /**
     * Creates a coordinator for the given job and registers a zeroed counter for each vertex.
     *
     * @param jobID the job whose messages are accepted; must not be null
     * @param iterable the job vertices to track; must not be null
     * @throws NullPointerException if {@code jobID} or {@code iterable} is null
     */
    public EarlyStoppingCoordinator(JobID jobID, Iterable<ExecutionJobVertex> iterable) {
        this.jobID = Preconditions.checkNotNull(jobID);
        Preconditions.checkNotNull(iterable);
        for (ExecutionJobVertex executionJobVertex : iterable) {
            JobVertexID vertexId = executionJobVertex.getJobVertexId();
            this.requestedEarlyStoppedTasksCount.put(vertexId, new AtomicInteger());
            this.jobVertices.put(vertexId, executionJobVertex);
        }
    }

    /**
     * Records one early-stopping message. When the message count of the reporting vertex
     * becomes equal to that vertex's parallelism, early-stopping events are sent to all
     * registered vertices and this call blocks until the combined future completes.
     *
     * <p>Messages for a different job, or for a job vertex that was not registered at
     * construction time, are logged and ignored.
     *
     * @param jobID the job the message claims to belong to
     * @param executionAttemptID the execution attempt that reported early stopping
     * @param str identifier of the sender, used only in the log output for rejected messages
     */
    public void processEarlyStoppingMessage(JobID jobID, ExecutionAttemptID executionAttemptID, String str) {
        if (!this.jobID.equals(jobID)) {
            LOG.error("Received wrong EarlyStopping message for job {} from {}", jobID, str);
            return;
        }
        LOG.info("Received early stopping message for job {} and execution attempt {}", jobID, executionAttemptID);

        JobVertexID jobVertexId = executionAttemptID.getJobVertexId();
        AtomicInteger receivedCount = this.requestedEarlyStoppedTasksCount.get(jobVertexId);
        ExecutionJobVertex jobVertex = this.jobVertices.get(jobVertexId);
        if (receivedCount == null || jobVertex == null) {
            // Map.get returns null for a vertex that was never registered; reject the message
            // the same way a wrong job ID is rejected instead of failing with an NPE.
            LOG.error(
                    "Received EarlyStopping message for unknown job vertex {} of job {} from {}",
                    jobVertexId,
                    jobID,
                    str);
            return;
        }

        // Equality (not >=) means the request is sent once per vertex, at the moment the
        // counter reaches the parallelism; later duplicates do not trigger it again.
        if (receivedCount.incrementAndGet() == jobVertex.getParallelism()) {
            // NOTE(review): join() blocks the calling thread until every release future is
            // done - confirm this is never invoked from a thread that must not block.
            sendEarlyStoppingRequest().join();
        }
    }

    /**
     * Sends early-stopping events to every registered job vertex.
     *
     * @return a future combining the per-vertex futures
     */
    private CompletableFuture<?> sendEarlyStoppingRequest() {
        Collection<CompletableFuture<?>> vertexRequests =
                this.jobVertices.values().stream()
                        .<CompletableFuture<?>>map(this::sendEarlyStoppingRequestToVertex)
                        .collect(Collectors.toList());
        return FutureUtils.combineAll(vertexRequests);
    }

    /**
     * Sends an early-stopping event to the current execution attempt of each task of the given
     * vertex. A failed send is logged and replaced by an acknowledgement, so the chain always
     * continues with that execution's release future.
     *
     * @param executionJobVertex the vertex whose tasks should be stopped
     * @return a future combining the per-execution futures
     */
    private CompletableFuture<?> sendEarlyStoppingRequestToVertex(ExecutionJobVertex executionJobVertex) {
        Collection<CompletableFuture<?>> executionRequests =
                Arrays.stream(executionJobVertex.getTaskVertices())
                        .map(taskVertex -> taskVertex.getCurrentExecutionAttempt())
                        .<CompletableFuture<?>>map(
                                execution ->
                                        execution
                                                .sendEarlyStoppingEvent()
                                                .exceptionally(
                                                        th -> {
                                                            LOG.error("Error during early stopping of {}", execution, th);
                                                            return Acknowledge.get();
                                                        })
                                                .thenCompose(acknowledge -> execution.getReleaseFuture()))
                        .collect(Collectors.toList());
        return FutureUtils.combineAll(executionRequests);
    }
}
