package org.apache.flink.runtime.jobmaster.slotpool;

import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.runtime.concurrent.ExponentialBackoffRetryStrategy;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.jobmaster.slotpool.DeclareResourceRequirementServiceConnectionManager;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.slots.ResourceRequirements;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclareResourceRequirementServiceConnectionManager.class */
class DefaultDeclareResourceRequirementServiceConnectionManager extends AbstractServiceConnectionManager<DeclareResourceRequirementServiceConnectionManager.DeclareResourceRequirementsService> implements DeclareResourceRequirementServiceConnectionManager {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) DefaultDeclareResourceRequirementServiceConnectionManager.class);
    private final ScheduledExecutor scheduledExecutor;

    @GuardedBy("lock")
    @Nullable
    private ResourceRequirements currentResourceRequirements;

    private DefaultDeclareResourceRequirementServiceConnectionManager(ScheduledExecutor scheduledExecutor) {
        this.scheduledExecutor = scheduledExecutor;
    }

    @Override // org.apache.flink.runtime.jobmaster.slotpool.DeclareResourceRequirementServiceConnectionManager
    public void declareResourceRequirements(ResourceRequirements resourceRequirements) {
        synchronized (this.lock) {
            checkNotClosed();
            if (isConnected()) {
                this.currentResourceRequirements = resourceRequirements;
                triggerResourceRequirementsSubmission(Duration.ofMillis(1L), Duration.ofMillis(10000L), this.currentResourceRequirements);
            }
        }
    }

    @GuardedBy("lock")
    private void triggerResourceRequirementsSubmission(Duration duration, Duration duration2, ResourceRequirements resourceRequirements) {
        FutureUtils.retryWithDelay(() -> {
            return sendResourceRequirements(resourceRequirements);
        }, new ExponentialBackoffRetryStrategy(Integer.MAX_VALUE, duration, duration2), (Predicate<Throwable>) th -> {
            return !(th instanceof CancellationException);
        }, this.scheduledExecutor);
    }

    private CompletableFuture<Acknowledge> sendResourceRequirements(ResourceRequirements resourceRequirements) {
        synchronized (this.lock) {
            if (!isConnected()) {
                LOG.debug("Stop sending resource requirements to ResourceManager because it is not connected.");
                return FutureUtils.completedExceptionally(new CancellationException());
            }
            if (resourceRequirements == this.currentResourceRequirements) {
                return ((DeclareResourceRequirementServiceConnectionManager.DeclareResourceRequirementsService) this.service).declareResourceRequirements(resourceRequirements);
            }
            LOG.debug("Newer resource requirements found. Stop sending old requirements.");
            return FutureUtils.completedExceptionally(new CancellationException());
        }
    }

    public static DeclareResourceRequirementServiceConnectionManager create(ScheduledExecutor scheduledExecutor) {
        return new DefaultDeclareResourceRequirementServiceConnectionManager(scheduledExecutor);
    }
}
