package org.apache.flink.runtime.scheduler;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.function.ThrowingConsumer;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/ExecutionGraphHandler.class */
/**
 * Handles requests that are answered from the state of a single {@link ExecutionGraph}:
 * checkpoint-related messages, partition state lookups and input split requests.
 *
 * <p>Checkpoint messages are forwarded to the graph's {@link CheckpointCoordinator} on the
 * I/O executor supplied at construction time; the remaining requests are answered on the
 * calling thread.
 */
public class ExecutionGraphHandler {
    private final ExecutionGraph executionGraph;
    private final Logger log;
    private final Executor ioExecutor;
    private final ComponentMainThreadExecutor mainThreadExecutor;

    /**
     * Creates a handler bound to the given execution graph.
     *
     * @param executionGraph the graph all requests are resolved against
     * @param logger the logger used for every message emitted by this handler
     * @param executor the executor on which checkpoint coordinator messages are processed
     * @param componentMainThreadExecutor used to assert that checkpoint messages are submitted
     *     from the main thread
     */
    public ExecutionGraphHandler(ExecutionGraph executionGraph, Logger logger, Executor executor, ComponentMainThreadExecutor componentMainThreadExecutor) {
        this.executionGraph = executionGraph;
        this.log = logger;
        this.ioExecutor = executor;
        this.mainThreadExecutor = componentMainThreadExecutor;
    }

    /**
     * Forwards checkpoint statistics to the checkpoint coordinator.
     *
     * @param executionAttemptID the execution attempt the metrics belong to
     * @param j passed as the first argument of {@code reportStats}; presumably the checkpoint id
     * @param checkpointMetrics the metrics to report
     */
    public void reportCheckpointMetrics(ExecutionAttemptID executionAttemptID, long j, CheckpointMetrics checkpointMetrics) {
        processCheckpointCoordinatorMessage(
                "ReportCheckpointStats",
                coordinator -> coordinator.reportStats(j, executionAttemptID, checkpointMetrics));
    }

    /**
     * Wraps the arguments into an {@link AcknowledgeCheckpoint} message and hands it to the
     * checkpoint coordinator together with the task manager location of the acknowledging
     * execution attempt.
     *
     * @param jobID the job the checkpoint belongs to
     * @param executionAttemptID the acknowledging execution attempt
     * @param j passed to the {@link AcknowledgeCheckpoint} constructor; presumably the checkpoint id
     * @param checkpointMetrics metrics reported with the acknowledgement
     * @param taskStateSnapshot the state snapshot reported with the acknowledgement
     */
    public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long j, CheckpointMetrics checkpointMetrics, TaskStateSnapshot taskStateSnapshot) {
        processCheckpointCoordinatorMessage(
                "AcknowledgeCheckpoint",
                coordinator -> {
                    AcknowledgeCheckpoint ack =
                            new AcknowledgeCheckpoint(jobID, executionAttemptID, j, checkpointMetrics, taskStateSnapshot);
                    coordinator.receiveAcknowledgeMessage(ack, retrieveTaskManagerLocation(executionAttemptID));
                });
    }

    /**
     * Hands a decline message to the checkpoint coordinator together with the task manager
     * location of the declining execution attempt.
     *
     * @param declineCheckpoint the decline message to forward
     */
    public void declineCheckpoint(DeclineCheckpoint declineCheckpoint) {
        processCheckpointCoordinatorMessage(
                "DeclineCheckpoint",
                coordinator ->
                        coordinator.receiveDeclineMessage(
                                declineCheckpoint,
                                retrieveTaskManagerLocation(declineCheckpoint.getTaskExecutionId())));
    }

    /**
     * Runs {@code throwingConsumer} against the graph's checkpoint coordinator on the I/O
     * executor. Exceptions thrown by the consumer are logged at WARN level and not propagated.
     *
     * <p>If the graph has no checkpoint coordinator the message is dropped and the fact is
     * logged: at ERROR level while the job is {@link JobStatus#RUNNING}, at DEBUG level
     * otherwise.
     *
     * @param str the message name used in log output
     * @param throwingConsumer the action to apply to the coordinator
     */
    private void processCheckpointCoordinatorMessage(String str, ThrowingConsumer<CheckpointCoordinator, Exception> throwingConsumer) {
        mainThreadExecutor.assertRunningInMainThread();

        final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
        if (checkpointCoordinator == null) {
            final String errorMessage = "Received " + str + " message for job {} with no CheckpointCoordinator";
            if (executionGraph.getState() == JobStatus.RUNNING) {
                log.error(errorMessage, executionGraph.getJobID());
            } else {
                log.debug(errorMessage, executionGraph.getJobID());
            }
            return;
        }

        ioExecutor.execute(
                () -> {
                    try {
                        throwingConsumer.accept(checkpointCoordinator);
                    } catch (Exception e) {
                        log.warn("Error while processing " + str + " message", e);
                    }
                });
    }

    /**
     * Returns the string form of the resource location assigned to the given execution
     * attempt, or {@code "Unknown location"} when the attempt is not registered or has no
     * assigned location.
     *
     * @param executionAttemptID the attempt to look up among the registered executions
     * @return the location string or the fallback text
     */
    private String retrieveTaskManagerLocation(ExecutionAttemptID executionAttemptID) {
        return Optional.ofNullable(executionGraph.getRegisteredExecutions().get(executionAttemptID))
                .map(Execution::getAssignedResourceLocation)
                .map(Object::toString)
                .orElse("Unknown location");
    }

    /**
     * Looks up the state of the execution producing the given result partition.
     *
     * <p>The producer is first searched among the registered executions. If it is not found
     * there, the current execution attempt of the partition's producer is used, provided its
     * attempt id matches the producer id of {@code resultPartitionID}.
     *
     * @param intermediateDataSetID the intermediate result the partition belongs to
     * @param resultPartitionID the partition whose producer state is requested
     * @return the state of the producing execution
     * @throws IllegalArgumentException if the intermediate result is unknown to the graph
     * @throws PartitionProducerDisposedException if the producer is not registered and the
     *     current execution attempt is a different one
     */
    public ExecutionState requestPartitionState(IntermediateDataSetID intermediateDataSetID, ResultPartitionID resultPartitionID) throws PartitionProducerDisposedException {
        final Execution registeredProducer =
                executionGraph.getRegisteredExecutions().get(resultPartitionID.getProducerId());
        if (registeredProducer != null) {
            return registeredProducer.getState();
        }

        final IntermediateResult intermediateResult =
                executionGraph.getAllIntermediateResults().get(intermediateDataSetID);
        if (intermediateResult == null) {
            throw new IllegalArgumentException("Intermediate data set with ID " + intermediateDataSetID + " not found.");
        }

        final Execution currentProducer =
                intermediateResult
                        .getPartitionById(resultPartitionID.getPartitionId())
                        .getProducer()
                        .getCurrentExecutionAttempt();
        if (currentProducer.getAttemptId().equals(resultPartitionID.getProducerId())) {
            return currentProducer.getState();
        }
        throw new PartitionProducerDisposedException(resultPartitionID);
    }

    /**
     * Obtains the next input split for the given execution attempt and returns it in
     * serialized form. A {@code null} split (no more splits) is serialized as well.
     *
     * @param jobVertexID the vertex the requesting attempt belongs to
     * @param executionAttemptID the requesting execution attempt
     * @return the serialized next input split
     * @throws IllegalArgumentException if the attempt is not registered or the vertex is unknown
     * @throws IllegalStateException if the vertex has no split assigner
     * @throws IOException if the split cannot be serialized; the job vertex is failed with the
     *     same exception before it is thrown
     */
    public SerializedInputSplit requestNextInputSplit(JobVertexID jobVertexID, ExecutionAttemptID executionAttemptID) throws IOException {
        final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttemptID);
        if (execution == null) {
            log.debug("Can not find Execution for attempt {}.", executionAttemptID);
            throw new IllegalArgumentException("Can not find Execution for attempt " + executionAttemptID);
        }

        final ExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexID);
        if (jobVertex == null) {
            throw new IllegalArgumentException("Cannot find execution vertex for vertex ID " + jobVertexID);
        }
        if (jobVertex.getSplitAssigner() == null) {
            throw new IllegalStateException("No InputSplitAssigner for vertex ID " + jobVertexID);
        }

        final InputSplit nextInputSplit = execution.getNextInputSplit();
        if (nextInputSplit != null) {
            log.debug("Send next input split {}.", nextInputSplit);
        } else {
            log.debug("No more input splits available");
        }

        try {
            return new SerializedInputSplit(InstantiationUtil.serializeObject(nextInputSplit));
        } catch (Exception e) {
            // nextInputSplit may legitimately be null here; dereferencing it would raise a
            // NullPointerException that hides the real cause and skips failing the vertex.
            final String splitClass =
                    nextInputSplit == null ? "null" : String.valueOf(nextInputSplit.getClass());
            final IOException reason =
                    new IOException("Could not serialize the next input split of class " + splitClass + ".", e);
            jobVertex.fail(reason);
            throw reason;
        }
    }
}
