package org.apache.flink.runtime.rescale;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.InternalExecutionGraphAccessor;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rescale.plan.RuntimeRescalePlan;
import org.apache.flink.runtime.state.rescale.RuntimeRescaleStorageLocation;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/rescale/RuntimeRescaleTasksHandler.class */
public class RuntimeRescaleTasksHandler {
    public static final Logger LOG = LoggerFactory.getLogger(RuntimeRescaleTasksHandler.class);
    private final RuntimeRescaleCoordinator coordinator;
    private RuntimeRescaleStorageLocation location;
    private final Object acknowledgeTasksLock = new Object();

    @GuardedBy("acknowledgeTasksLock")
    private final Map<JobVertexID, AtomicInteger> numNotYetAcknowledgedTaskProducersByJobVertices = new HashMap();

    @GuardedBy("acknowledgeTasksLock")
    private final Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks = new HashMap();

    @GuardedBy("acknowledgeTasksLock")
    private final Map<JobVertexID, List<ExecutionJobVertex>> currentNextJobVertices = new HashMap();
    private CompletableFuture<Acknowledge> allTasksAcknowledgedFuture = new CompletableFuture<>();

    public RuntimeRescaleTasksHandler(RuntimeRescaleCoordinator runtimeRescaleCoordinator) {
        this.coordinator = (RuntimeRescaleCoordinator) Preconditions.checkNotNull(runtimeRescaleCoordinator);
    }

    public boolean acknowledgeTask(ExecutionAttemptID executionAttemptID) {
        boolean isEmpty;
        synchronized (this.acknowledgeTasksLock) {
            Preconditions.checkState(!this.notYetAcknowledgedTasks.isEmpty());
            ExecutionVertex remove = this.notYetAcknowledgedTasks.remove(executionAttemptID);
            Preconditions.checkNotNull(remove);
            LOG.info("Get acknowledge for rescale id {} and subtask {}", Long.valueOf(this.coordinator.getRuntimeRescaleActionId()), remove.getTaskNameWithSubtaskIndex());
            LOG.info("Rest tasks: {}", this.notYetAcknowledgedTasks.keySet());
            notifyExecutionsStartedByRuntimeRescale(remove.getJobVertex().getJobVertexId());
            isEmpty = this.notYetAcknowledgedTasks.isEmpty();
        }
        return isEmpty;
    }

    private void notifyExecutionsStartedByRuntimeRescale(JobVertexID jobVertexID) {
        Preconditions.checkState(Thread.holdsLock(this.acknowledgeTasksLock));
        for (ExecutionJobVertex executionJobVertex : this.currentNextJobVertices.get(jobVertexID)) {
            AtomicInteger atomicInteger = this.numNotYetAcknowledgedTaskProducersByJobVertices.get(executionJobVertex.getJobVertexId());
            Preconditions.checkNotNull(atomicInteger);
            int decrementAndGet = atomicInteger.decrementAndGet();
            Preconditions.checkState(decrementAndGet >= 0);
            if (decrementAndGet == 0) {
                executionJobVertex.getVerticesToAddByUpscaling().forEach(executionVertex -> {
                    executionVertex.getCurrentExecutionAttempt().notifyExecutionStartedByRuntimeRescaleForContinue();
                });
            }
        }
    }

    public void reset() {
        this.allTasksAcknowledgedFuture = new CompletableFuture<>();
        this.location = null;
        this.currentNextJobVertices.clear();
        this.numNotYetAcknowledgedTaskProducersByJobVertices.clear();
        this.notYetAcknowledgedTasks.clear();
    }

    public void finishAcknowledging() {
        Preconditions.checkNotNull(this.allTasksAcknowledgedFuture);
        if (this.allTasksAcknowledgedFuture.isCompletedExceptionally()) {
            LOG.warn("Runtime rescale action with id {} was completed exceptionally.", Long.valueOf(this.coordinator.getRuntimeRescaleActionId()));
        } else {
            Preconditions.checkState(!this.allTasksAcknowledgedFuture.isDone());
            this.allTasksAcknowledgedFuture.complete(Acknowledge.get());
        }
    }

    public void failAcknowledging(Throwable th) {
        Preconditions.checkNotNull(this.allTasksAcknowledgedFuture);
        if (this.allTasksAcknowledgedFuture.isDone()) {
            LOG.warn("Runtime rescale action with id {} was already finished.", Long.valueOf(this.coordinator.getRuntimeRescaleActionId()), th);
        } else {
            this.allTasksAcknowledgedFuture.completeExceptionally(th);
        }
    }

    public CompletableFuture<Acknowledge> getAllTasksAcknowledgedFuture() {
        return this.allTasksAcknowledgedFuture;
    }

    public void cleanupTasksHandler() {
        try {
            this.location.dispose();
        } catch (IOException e) {
            LOG.warn("Cannot dispose location for runtime rescale action {} with next error", Long.valueOf(this.coordinator.getRuntimeRescaleActionId()), e);
        }
    }

    public void initTasksHandler(RuntimeRescalePlan runtimeRescalePlan) throws IOException {
        Preconditions.checkState(null == this.location);
        this.location = this.coordinator.initializeRuntimeRescaleLocation();
        synchronized (this.acknowledgeTasksLock) {
            Preconditions.checkNotNull(runtimeRescalePlan);
            Preconditions.checkState(this.currentNextJobVertices.isEmpty());
            for (ExecutionJobVertex executionJobVertex : runtimeRescalePlan.getJobVerticesInTopologicalOrder()) {
                ArrayList arrayList = new ArrayList();
                InternalExecutionGraphAccessor graph = executionJobVertex.getGraph();
                for (IntermediateResult intermediateResult : executionJobVertex.getProducedDataSets()) {
                    Stream<JobVertexID> stream = intermediateResult.getConsumerVertices().stream();
                    Objects.requireNonNull(graph);
                    arrayList.addAll((Collection) stream.map(graph::getJobVertex).collect(Collectors.toList()));
                }
                this.currentNextJobVertices.put(executionJobVertex.getJobVertexId(), arrayList);
            }
            Preconditions.checkState(this.numNotYetAcknowledgedTaskProducersByJobVertices.isEmpty());
            Preconditions.checkState(this.notYetAcknowledgedTasks.isEmpty());
            for (Execution execution : runtimeRescalePlan.getTasksForAcknowledging()) {
                Iterator<ExecutionJobVertex> it = this.currentNextJobVertices.get(execution.getVertex().getJobvertexId()).iterator();
                while (it.hasNext()) {
                    this.numNotYetAcknowledgedTaskProducersByJobVertices.compute(it.next().getJobVertexId(), (jobVertexID, atomicInteger) -> {
                        AtomicInteger atomicInteger = atomicInteger;
                        if (null == atomicInteger) {
                            atomicInteger = new AtomicInteger();
                        }
                        atomicInteger.incrementAndGet();
                        return atomicInteger;
                    });
                }
                this.notYetAcknowledgedTasks.put(execution.getAttemptId(), execution.getVertex());
            }
        }
    }
}
