package org.apache.flink.runtime.taskmanager;

import akka.dispatch.OnFailure;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/taskmanager/ActorGatewayResultPartitionConsumableNotifier.class */
/**
 * {@link ResultPartitionConsumableNotifier} that forwards partition notifications to the
 * JobManager through an {@link ActorGateway}.
 *
 * <p>Each notification is sent as a {@link JobManagerMessages.ScheduleOrUpdateConsumers} message
 * via an ask bounded by the configured timeout. If that ask fails, the failure is logged and the
 * {@link TaskActions} supplied with the notification is failed externally.
 */
public class ActorGatewayResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {

    private static final Logger LOG = LoggerFactory.getLogger(ActorGatewayResultPartitionConsumableNotifier.class);

    /** Context on which the failure callback of the ask is registered. */
    private final ExecutionContext executionContext;

    /** Gateway used to send messages to the JobManager. */
    private final ActorGateway jobManager;

    /** Timeout passed to every ask sent to the JobManager. */
    private final FiniteDuration jobManagerMessageTimeout;

    /**
     * Creates a notifier bound to the given JobManager gateway.
     *
     * <p>Every argument is passed through {@link Preconditions#checkNotNull}.
     *
     * @param executionContext context handed to the failure callback registration
     * @param actorGateway gateway used to reach the JobManager
     * @param finiteDuration timeout applied to each ask sent to the JobManager
     */
    public ActorGatewayResultPartitionConsumableNotifier(ExecutionContext executionContext, ActorGateway actorGateway, FiniteDuration finiteDuration) {
        this.executionContext = Preconditions.checkNotNull(executionContext);
        this.jobManager = Preconditions.checkNotNull(actorGateway);
        this.jobManagerMessageTimeout = Preconditions.checkNotNull(finiteDuration);
    }

    /**
     * Sends a {@link JobManagerMessages.ScheduleOrUpdateConsumers} message for the given partition
     * to the JobManager and registers a callback that fails the task if the ask fails.
     *
     * @param jobID job the partition belongs to
     * @param resultPartitionID partition named in the message
     * @param taskActions failed externally when the ask to the JobManager fails
     */
    @Override
    public void notifyPartitionConsumable(JobID jobID, ResultPartitionID resultPartitionID, final TaskActions taskActions) {
        final JobManagerMessages.ScheduleOrUpdateConsumers request =
                new JobManagerMessages.ScheduleOrUpdateConsumers(jobID, resultPartitionID);

        // The ask is issued first; the callback is only built once a future is available.
        this.jobManager
                .ask(request, this.jobManagerMessageTimeout)
                .onFailure(failTaskOnError(taskActions), this.executionContext);
    }

    /**
     * Builds the callback that logs a failed JobManager ask and fails the task with it as cause.
     *
     * @param taskActions task handle to fail externally
     * @return the failure callback
     */
    private static OnFailure failTaskOnError(final TaskActions taskActions) {
        return new OnFailure() {
            @Override
            public void onFailure(Throwable cause) {
                LOG.error("Could not schedule or update consumers at the JobManager.", cause);
                taskActions.failExternally(new RuntimeException("Could not notify JobManager to schedule or update consumers", cause));
            }
        };
    }
}
