package org.apache.flink.runtime.resourceestimator;

import com.esotericsoftware.minlog.Log;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphOperatorDetails;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorDetails;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.history.JobMetricsSnapshot;
import org.apache.flink.runtime.metrics.history.MetricHistoryService;
import org.apache.flink.runtime.metrics.history.OperatorMetricsSnapshot;
import org.apache.flink.runtime.metrics.history.RateMetric;
import org.apache.flink.runtime.metrics.history.TaskMetricsSnapshot;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/resourceestimator/DynamicResourceEstimator.class */
/**
 * Derives a new parallelism for every vertex of a {@link JobGraph} from the metrics recorded by
 * a {@link MetricHistoryService}.
 *
 * <p>For each vertex the estimator combines the estimated cost of the operators chained into it
 * with the backpressure and output buffer pool usage observed around it. When the job declares a
 * resource limitation, the per-vertex upper bounds are lowered step by step until the estimate
 * fits or no bound can be lowered any further.
 */
public class DynamicResourceEstimator {
    private static final Logger LOG = LoggerFactory.getLogger(DynamicResourceEstimator.class);

    /** Estimated CPU cycles that are attributed to one parallel instance of a vertex. */
    public static final long SLOT_CPU_THRESHOLD = 3000000000L;

    /** Estimated state bytes (256 MiB) that are attributed to one parallel instance of a vertex. */
    public static final long SLOT_MEMORY_THRESHOLD = 268435456;

    /** Parallelism assigned to a vertex for which no operator cost is available. */
    public static final int DEFAULT_VERTEX_PARALLELISM = 1;

    /** Hard upper bound for the parallelism of any vertex. */
    public static final int MAX_VERTEX_PARALLELISM = 512;

    /** Record rate assumed for an operator whose measured rate is below one. */
    public static final long DEFAULT_INPUT_RATE = 500;

    /** Input record size assumed for an operator whose measured record size is not positive. */
    public static final long DEFAULT_RECORD_SIZE = 512;

    private final MetricHistoryService metricHistoryService;

    /**
     * Creates an estimator backed by the given metric history.
     *
     * @param metricHistoryService source of historical job metrics, must not be {@code null}
     * @throws NullPointerException if {@code metricHistoryService} is {@code null}
     */
    public DynamicResourceEstimator(MetricHistoryService metricHistoryService) {
        Preconditions.checkNotNull(metricHistoryService, "DynamicResourceEstimator can't work without enabled MetricHistoryService. You need to configure metric reporter and turn on `metrics.reporter.influxdb.enableHistoryClient: true` in flink-conf.yaml");
        this.metricHistoryService = metricHistoryService;
    }

    /**
     * Checks that every operator of the job graph is described by the given operator details.
     *
     * @param jobGraph job graph whose vertices and chained operators are inspected
     * @param jobGraphOperatorDetails operator details to look the operators up in
     * @return {@code true} if the generated ID of every operator is a key of
     *     {@code jobGraphOperatorDetails.getOperators()}, {@code false} otherwise
     */
    public boolean checkMappingJobGraphWithJobGraphOperatorDetails(JobGraph jobGraph, JobGraphOperatorDetails jobGraphOperatorDetails) {
        Set<OperatorID> describedOperators = jobGraphOperatorDetails.getOperators().keySet();
        return jobGraph.getVerticesSortedTopologicallyFromSources().stream()
                .flatMap(vertex -> vertex.getOperatorIDs().stream())
                .allMatch(idPair -> describedOperators.contains(idPair.getGeneratedOperatorID()));
    }

    /**
     * Estimates a new parallelism per vertex and, if it differs from the current one, asks
     * {@code biFunction} to build the rescaled job graph.
     *
     * @param jobGraph job whose parallelism is re-estimated
     * @param list job graphs handed to the metric history service together with {@code jobGraph};
     *     presumably earlier submissions of the same job (TODO confirm against the caller)
     * @param biFunction factory that receives the job graph and the new parallelism per vertex and
     *     returns the rescaled job graph, if it could be created
     * @return the job graph produced by {@code biFunction}, or an empty optional when the job has
     *     no operator details or the estimated parallelism equals the current one
     * @throws IllegalStateException if the operator details do not cover every operator of the job
     */
    public Optional<JobGraph> calcNewParallelismJobGraph(JobGraph jobGraph, List<JobGraph> list, BiFunction<JobGraph, Map<JobVertexID, Integer>, Optional<JobGraph>> biFunction) {
        Optional<JobGraphOperatorDetails> maybeDetails = jobGraph.getJobGraphOperatorDetails();
        if (!maybeDetails.isPresent()) {
            LOG.warn("Job {} doesn't have JobGraphOperatorDetails", jobGraph.getJobID());
            return Optional.empty();
        }
        JobGraphOperatorDetails details = maybeDetails.get();
        Set<OperatorID> sinks = details.getSinks();
        Preconditions.checkState(checkMappingJobGraphWithJobGraphOperatorDetails(jobGraph, details), "JobGraphOperatorDetails should have details for each operator from JobGraph");

        JobMetricsSnapshot jobMetrics = this.metricHistoryService.getJobMetrics(jobGraph, list);
        Map<OperatorID, OperatorCost> operatorCosts =
                collectOperatorCosts(sinks, details.getOperators(), jobMetrics.getOperatorMetrics()).stream()
                        .collect(Collectors.toMap(entry -> entry.f0, entry -> entry.f1));
        LOG.debug("Operator estimations of job {}: {}", jobGraph.getJobID(), operatorCosts);

        Map<JobVertexID, Integer> currentParallelism =
                jobGraph.getVerticesSortedTopologicallyFromSources().stream()
                        .collect(Collectors.toMap(JobVertex::getID, JobVertex::getParallelism));
        Map<JobVertexID, Integer> newParallelism =
                estimateParallelismWithResourceLimitation(jobGraph, operatorCosts, jobMetrics.getTaskMetrics());
        if (currentParallelism.equals(newParallelism)) {
            LOG.debug("New job vertices parallelism is similar to the old job {}: {}", jobGraph.getJobID(), newParallelism);
            return Optional.empty();
        }

        // Logged through the SLF4J logger of this class like every other message.
        LOG.debug("New job vertices parallelism of job {}: {}", jobGraph.getJobID(), newParallelism);
        Optional<JobGraph> rescaledJob = biFunction.apply(jobGraph, newParallelism);
        LOG.debug("Created new job {} based on job {}", rescaledJob.map(JobGraph::getJobID), jobGraph.getJobID());
        return rescaledJob;
    }

    /**
     * Estimates the parallelism of every vertex and, if the job has a resource limitation, lowers
     * the per-vertex upper bounds until the estimate fits into it.
     *
     * @param jobGraph job graph to estimate, must contain at least one vertex
     * @param operatorCosts estimated cost per operator; operators without an entry are ignored
     * @param taskMetrics task metrics per vertex
     * @return estimated parallelism for every vertex of the job graph
     * @throws IllegalArgumentException if the job graph has no vertices
     */
    private Map<JobVertexID, Integer> estimateParallelismWithResourceLimitation(JobGraph jobGraph, Map<OperatorID, OperatorCost> operatorCosts, Map<JobVertexID, TaskMetricsSnapshot> taskMetrics) {
        Preconditions.checkArgument(jobGraph.getVerticesAsArray().length > 0, "JobGraph can't be empty");
        List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();

        // Upper bound per vertex: its own max parallelism when set, never above the global cap.
        Map<JobVertexID, Integer> parallelismCaps = vertices.stream()
                .collect(Collectors.toMap(
                        JobVertex::getID,
                        vertex -> vertex.getMaxParallelism() < 1
                                ? MAX_VERTEX_PARALLELISM
                                : Math.min(vertex.getMaxParallelism(), MAX_VERTEX_PARALLELISM)));
        Optional<JobGraphComputeUnitSlotAdapter> slotAdapter =
                jobGraph.getJobResourceLimitation().map(JobGraphComputeUnitSlotAdapter::new);

        Map<JobVertexID, Integer> estimate = new HashMap<>();
        while (true) {
            for (JobVertex vertex : vertices) {
                int cap = parallelismCaps.get(vertex.getID());
                List<OperatorCost> vertexCosts = vertex.getOperatorIDs().stream()
                        .map(idPair -> idPair.getGeneratedOperatorID())
                        .filter(operatorCosts::containsKey)
                        .map(operatorCosts::get)
                        .collect(Collectors.toList());

                int parallelism = DEFAULT_VERTEX_PARALLELISM;
                if (!vertexCosts.isEmpty()) {
                    JobVertexCost vertexCost = new JobVertexCost(
                            vertexCosts,
                            extractUpstreamBackpressure(vertex, taskMetrics),
                            extractUpstreamOutputPoolUsage(vertex, taskMetrics));
                    parallelism = calcParallelism(vertexCost, vertex.getParallelism(), cap);
                }
                estimate.put(vertex.getID(), parallelism);
            }

            boolean fitsIntoLimit =
                    !slotAdapter.isPresent() || slotAdapter.get().isEnoughResources(jobGraph, estimate);
            // Stop when the estimate fits or when no upper bound can be lowered any further.
            if (fitsIntoLimit || !decreaseMaxParallelismIfPossible(parallelismCaps)) {
                return estimate;
            }
            estimate.clear();
        }
    }

    /**
     * Computes the backpressure value passed to {@link JobVertexCost} for a vertex.
     *
     * <p>All values are per-subtask (backpressure divided by parallelism). With {@code own} being
     * the value of the vertex itself and {@code upstream} the largest value among its direct
     * producers (never below zero), the result is {@code upstream} when {@code own} is not
     * positive or the vertex is an output vertex, and {@code max(0, own - upstream)} otherwise.
     *
     * @param jobVertex vertex to inspect
     * @param taskMetrics task metrics per vertex; vertices without metrics count as zero
     * @return the value described above, or {@code 0} if the vertex has no inputs
     */
    private double extractUpstreamBackpressure(JobVertex jobVertex, Map<JobVertexID, TaskMetricsSnapshot> taskMetrics) {
        List<JobVertex> producers = jobVertex.getInputs().stream()
                .map(edge -> edge.getSource().getProducer())
                .collect(Collectors.toList());
        if (producers.isEmpty()) {
            return 0.0d;
        }
        double ownBackpressure = backpressurePerSubtask(jobVertex, taskMetrics);
        double maxProducerBackpressure = producers.stream()
                .mapToDouble(producer -> backpressurePerSubtask(producer, taskMetrics))
                .reduce(0.0d, Math::max);
        if (ownBackpressure <= 0.0d || jobVertex.isOutputVertex()) {
            return maxProducerBackpressure;
        }
        return Math.max(0.0d, ownBackpressure - maxProducerBackpressure);
    }

    /** Returns the recorded backpressure of a vertex divided by its parallelism, 0 without metrics. */
    private static double backpressurePerSubtask(JobVertex vertex, Map<JobVertexID, TaskMetricsSnapshot> taskMetrics) {
        TaskMetricsSnapshot snapshot = taskMetrics.get(vertex.getID());
        double backpressure = snapshot != null ? snapshot.getBackpressure() : 0.0d;
        return backpressure / vertex.getParallelism();
    }

    /**
     * Returns the largest output buffer pool usage among the direct producers of a vertex.
     *
     * @param jobVertex vertex whose producers are inspected
     * @param taskMetrics task metrics per vertex; producers without metrics count as zero
     * @return the maximum usage, never below zero; {@code 0} if the vertex has no inputs
     */
    private double extractUpstreamOutputPoolUsage(JobVertex jobVertex, Map<JobVertexID, TaskMetricsSnapshot> taskMetrics) {
        return jobVertex.getInputs().stream()
                .map(edge -> edge.getSource().getProducer())
                .mapToDouble(producer -> {
                    TaskMetricsSnapshot snapshot = taskMetrics.get(producer.getID());
                    return snapshot != null ? snapshot.getBuffersOutPoolUsage() : 0.0d;
                })
                .reduce(0.0d, Math::max);
    }

    /**
     * Lowers by one every entry that currently holds the largest value of the map.
     *
     * @param map upper parallelism bound per vertex, modified in place; must not be empty
     * @return {@code true} if entries were lowered, {@code false} if the largest value is already
     *     1 or less and the map was left untouched
     * @throws IllegalArgumentException if the map is empty
     */
    protected boolean decreaseMaxParallelismIfPossible(Map<JobVertexID, Integer> map) {
        Preconditions.checkArgument(map.size() > 0, "Mapping of JobVertexID to MaxParallelism can't be empty");
        int largest = map.values().stream()
                .max(Integer::compareTo)
                .orElseThrow(() -> new IllegalStateException("MaxParallelism can't be determined."));
        if (largest <= 1) {
            return false;
        }
        map.replaceAll((vertexId, cap) -> cap == largest ? cap - 1 : cap);
        return true;
    }

    /**
     * Picks the parallelism of a single vertex.
     *
     * <p>The resource-based estimate is the larger of the CPU-derived and the state-derived
     * instance count, each at least 1. With upstream backpressure the larger of the current
     * parallelism and that estimate is scaled by {@code 1 + min(1, backpressure)}; otherwise, with
     * an upstream output pool usage above 0.5, it is scaled by {@code 0.5 + usage}; otherwise the
     * resource-based estimate is used as is. The result never exceeds {@code maxParallelism}.
     *
     * @param jobVertexCost aggregated cost and upstream pressure of the vertex
     * @param currentParallelism parallelism the vertex currently runs with
     * @param maxParallelism upper bound for the result
     * @return the chosen parallelism
     */
    private int calcParallelism(JobVertexCost jobVertexCost, int currentParallelism, int maxParallelism) {
        int cpuInstances = Math.max(
                (int) (jobVertexCost.getEstimatedCpuCycles() / (double) SLOT_CPU_THRESHOLD),
                DEFAULT_VERTEX_PARALLELISM);
        int stateInstances = Math.max(
                (int) (jobVertexCost.getEstimatedStateBytes() / (double) SLOT_MEMORY_THRESHOLD),
                DEFAULT_VERTEX_PARALLELISM);
        int resourceBased = Math.max(cpuInstances, stateInstances);
        // Scaling up starts from whichever is larger: what runs today or what the costs ask for.
        int scalingBase = Math.max(currentParallelism, resourceBased);

        double backpressure = jobVertexCost.getUpstreamBackpressureContribution();
        if (backpressure > 0.0d) {
            int scaled = (int) Math.ceil((1.0d + Math.min(1.0d, backpressure)) * scalingBase);
            return Math.min(scaled, maxParallelism);
        }
        double outputPoolUsage = jobVertexCost.getUpstreamBuffersOutputPoolUsage();
        if (outputPoolUsage > 0.5d) {
            int scaled = (int) Math.ceil((0.5d + outputPoolUsage) * scalingBase);
            return Math.min(scaled, maxParallelism);
        }
        return Math.min(resourceBased, maxParallelism);
    }

    /**
     * Walks from the given target operators towards the sources and estimates the cost of every
     * operator on the way; inputs are listed before the operator that consumes them.
     *
     * <p>NOTE(review): only the first element of {@code targets} is followed, so further sinks
     * and further inputs of multi-input operators get no cost. Confirm whether that is intended.
     *
     * @param targets operators to start from, must not be empty
     * @param operators details per operator
     * @param operatorMetrics metrics per operator
     * @return pairs of operator ID and estimated cost along the followed path
     * @throws IllegalArgumentException if {@code targets} is empty
     */
    private List<Tuple2<OperatorID, OperatorCost>> collectOperatorCosts(Set<OperatorID> targets, Map<OperatorID, OperatorDetails> operators, Map<OperatorID, OperatorMetricsSnapshot> operatorMetrics) {
        Preconditions.checkArgument(!targets.isEmpty(), "List of target operators can't be empty");
        Iterator<OperatorID> it = targets.iterator();
        if (!it.hasNext()) {
            return Collections.emptyList();
        }
        OperatorID target = it.next();
        OperatorDetails operatorDetails = operators.get(target);
        OperatorID operatorID = operatorDetails.getOperatorID();
        OperatorCost cost = calculateOperatorCost(operatorDetails, operatorMetrics.get(operatorID));
        if (operatorDetails.getInputOperationIDs().isEmpty()) {
            return Collections.singletonList(new Tuple2<>(target, cost));
        }
        List<Tuple2<OperatorID, OperatorCost>> upstreamCosts =
                collectOperatorCosts(operatorDetails.getInputOperationIDs(), operators, operatorMetrics);
        return Stream.concat(upstreamCosts.stream(), Stream.of(new Tuple2<>(operatorID, cost)))
                .collect(Collectors.toList());
    }

    /**
     * Estimates the cost of one operator from its details and recorded metrics.
     *
     * <p>Sources use their output rate, all other operators their input rate; a rate below one
     * falls back to {@link #DEFAULT_INPUT_RATE}. A non-positive state record count falls back to
     * twice the rate, a non-positive record size to {@link #DEFAULT_RECORD_SIZE}. Sources and
     * sinks use a rate factor of 1, other operators the recorded factor or 1 if none is recorded.
     *
     * @param operatorDetails static description of the operator
     * @param operatorMetricsSnapshot recorded metrics of the operator
     * @return the cost computed by {@code OperatorCostUtils.estimate}
     */
    private OperatorCost calculateOperatorCost(OperatorDetails operatorDetails, OperatorMetricsSnapshot operatorMetricsSnapshot) {
        RateMetric rate = operatorMetricsSnapshot.getRate();
        double measuredRate = operatorDetails.isSource() ? rate.getOutputRate() : rate.getInputRate();
        long truncatedRate = (long) measuredRate;
        long recordRate = truncatedRate >= 1 ? truncatedRate : DEFAULT_INPUT_RATE;

        long stateRecords = operatorMetricsSnapshot.getStateRecordCount() > 0
                ? operatorMetricsSnapshot.getStateRecordCount()
                : recordRate * 2;
        long recordSize = operatorMetricsSnapshot.getInputRecordSize() > 0
                ? operatorMetricsSnapshot.getInputRecordSize()
                : DEFAULT_RECORD_SIZE;
        double rateFactor = (operatorDetails.isSink() || operatorDetails.isSource())
                ? 1.0d
                : rate.getRateFactor().orElse(1.0d);

        return OperatorCostUtils.estimate(operatorDetails.getOperationKindTag(), recordRate, stateRecords, recordSize, rateFactor);
    }
}
