package org.apache.flink.runtime.rest.handler.legacy.backpressure;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImpl.class */
public class BackPressureStatsTrackerImpl implements BackPressureStatsTracker {
    private static final Logger LOG;
    private final BackPressureRequestCoordinator coordinator;
    private final Cache<ExecutionJobVertex, OperatorBackPressureStats> operatorStatsCache;
    private final int backPressureStatsRefreshInterval;

    @GuardedBy("lock")
    private boolean shutDown;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final Object lock = new Object();
    private final Set<ExecutionJobVertex> pendingStats = new HashSet();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImpl$BackPressureRequestCompletionCallback.class */
    public class BackPressureRequestCompletionCallback implements BiFunction<BackPressureStats, Throwable, Void> {
        private final ExecutionJobVertex vertex;

        BackPressureRequestCompletionCallback(ExecutionJobVertex executionJobVertex) {
            this.vertex = executionJobVertex;
        }

        @Override // java.util.function.BiFunction
        public Void apply(BackPressureStats backPressureStats, Throwable th) {
            synchronized (BackPressureStatsTrackerImpl.this.lock) {
                try {
                    try {
                    } catch (Throwable th2) {
                        BackPressureStatsTrackerImpl.LOG.error("Error during stats completion.", th2);
                        BackPressureStatsTrackerImpl.this.pendingStats.remove(this.vertex);
                    }
                    if (BackPressureStatsTrackerImpl.this.shutDown) {
                        return null;
                    }
                    JobStatus state = this.vertex.getGraph().getState();
                    if (state.isGloballyTerminalState()) {
                        BackPressureStatsTrackerImpl.LOG.debug("Ignoring stats, because job is in state " + state + ScopeFormat.SCOPE_SEPARATOR);
                    } else if (backPressureStats != null) {
                        BackPressureStatsTrackerImpl.this.operatorStatsCache.put(this.vertex, createOperatorBackPressureStats(backPressureStats));
                    } else {
                        BackPressureStatsTrackerImpl.LOG.debug("Failed to gather back pressure stats.", th);
                    }
                    BackPressureStatsTrackerImpl.this.pendingStats.remove(this.vertex);
                    return null;
                } finally {
                    BackPressureStatsTrackerImpl.this.pendingStats.remove(this.vertex);
                }
            }
        }

        private OperatorBackPressureStats createOperatorBackPressureStats(BackPressureStats backPressureStats) {
            Map<ExecutionAttemptID, Double> backPressureRatios = backPressureStats.getBackPressureRatios();
            HashMap newHashMapWithExpectedSize = Maps.newHashMapWithExpectedSize(backPressureRatios.size());
            Set<ExecutionAttemptID> keySet = backPressureRatios.keySet();
            for (ExecutionVertex executionVertex : this.vertex.getTaskVertices()) {
                ExecutionAttemptID attemptId = executionVertex.getCurrentExecutionAttempt().getAttemptId();
                if (keySet.contains(attemptId)) {
                    newHashMapWithExpectedSize.put(attemptId, Integer.valueOf(executionVertex.getParallelSubtaskIndex()));
                } else {
                    BackPressureStatsTrackerImpl.LOG.debug("Outdated stats. A task, which is part of the request has been reset.");
                }
            }
            double[] dArr = new double[backPressureRatios.size()];
            for (Map.Entry<ExecutionAttemptID, Double> entry : backPressureRatios.entrySet()) {
                dArr[((Integer) newHashMapWithExpectedSize.get(entry.getKey())).intValue()] = entry.getValue().doubleValue();
            }
            return new OperatorBackPressureStats(backPressureStats.getRequestId(), backPressureStats.getEndTime(), dArr);
        }
    }

    public BackPressureStatsTrackerImpl(BackPressureRequestCoordinator backPressureRequestCoordinator, int i, int i2) {
        Preconditions.checkArgument(i >= 0, "The cleanup interval must be non-negative.");
        Preconditions.checkArgument(i2 >= 0, "The back pressure stats refresh interval must be non-negative.");
        this.coordinator = (BackPressureRequestCoordinator) Preconditions.checkNotNull(backPressureRequestCoordinator);
        this.backPressureStatsRefreshInterval = i2;
        this.operatorStatsCache = CacheBuilder.newBuilder().concurrencyLevel(1).expireAfterAccess(i, TimeUnit.MILLISECONDS).build();
    }

    @Override // org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker
    public Optional<OperatorBackPressureStats> getOperatorBackPressureStats(ExecutionJobVertex executionJobVertex) {
        Optional<OperatorBackPressureStats> ofNullable;
        synchronized (this.lock) {
            OperatorBackPressureStats operatorBackPressureStats = (OperatorBackPressureStats) this.operatorStatsCache.getIfPresent(executionJobVertex);
            if (operatorBackPressureStats == null || this.backPressureStatsRefreshInterval <= System.currentTimeMillis() - operatorBackPressureStats.getEndTimestamp()) {
                triggerBackPressureRequestInternal(executionJobVertex);
            }
            ofNullable = Optional.ofNullable(operatorBackPressureStats);
        }
        return ofNullable;
    }

    private void triggerBackPressureRequestInternal(ExecutionJobVertex executionJobVertex) {
        Executor futureExecutor;
        if (!$assertionsDisabled && !Thread.holdsLock(this.lock)) {
            throw new AssertionError();
        }
        if (this.shutDown || this.pendingStats.contains(executionJobVertex) || executionJobVertex.getGraph().getState().isGloballyTerminalState() || (futureExecutor = executionJobVertex.getGraph().getFutureExecutor()) == null) {
            return;
        }
        this.pendingStats.add(executionJobVertex);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Triggering back pressure request for tasks: " + Arrays.toString(executionJobVertex.getTaskVertices()));
        }
        this.coordinator.triggerBackPressureRequest(executionJobVertex.getTaskVertices()).handleAsync((BiFunction<? super BackPressureStats, Throwable, ? extends U>) new BackPressureRequestCompletionCallback(executionJobVertex), futureExecutor);
    }

    @Override // org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker
    public void cleanUpOperatorStatsCache() {
        this.operatorStatsCache.cleanUp();
    }

    @Override // org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker
    public void shutDown() {
        synchronized (this.lock) {
            if (!this.shutDown) {
                this.operatorStatsCache.invalidateAll();
                this.pendingStats.clear();
                this.shutDown = true;
            }
        }
    }

    static {
        $assertionsDisabled = !BackPressureStatsTrackerImpl.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(BackPressureStatsTrackerImpl.class);
    }
}
