package org.apache.flink.runtime.resourceestimator;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rescaling.RescalingUtils;
import org.apache.flink.runtime.resourceestimator.predictions.PredictionCoordinator;
import org.apache.flink.runtime.resourceestimator.predictions.VertexEstimations;
import org.apache.flink.util.CollectionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/resourceestimator/DataFlowEstimator.class */
public class DataFlowEstimator {
    private static final Logger LOG = LoggerFactory.getLogger(DataFlowEstimator.class);

    @Nonnull
    private final PredictionCoordinator coordinator;
    private final ScheduledFuture<?> modelUpdateFuture;
    private JobGraph oldGraph;
    private final ScheduledExecutorService modelUpdater = Executors.newScheduledThreadPool(1);
    private final long timeout = 15000;

    public DataFlowEstimator(PredictionCoordinator predictionCoordinator, @Nonnull JobGraph jobGraph) {
        this.coordinator = predictionCoordinator;
        predictionCoordinator.addPredictableList(jobGraph.getVerticesSortedTopologicallyFromSources());
        this.oldGraph = jobGraph;
        this.modelUpdateFuture = this.modelUpdater.scheduleWithFixedDelay(() -> {
            synchronized (predictionCoordinator) {
                try {
                    predictionCoordinator.updateModels(this.oldGraph);
                } catch (Exception e) {
                    LOG.error("Error during metric update " + e.getClass() + ": " + e.getMessage());
                }
            }
        }, 15000L, 15000L, TimeUnit.MILLISECONDS);
    }

    public Map<JobVertexID, Integer> evaluate(JobGraph jobGraph, int i, int i2) {
        Map<JobVertexID, VertexEstimations> jobVerticesInformation;
        LOG.info("Evaluating for an old graph at: " + System.currentTimeMillis());
        Map map = (Map) jobGraph.getVerticesSortedTopologicallyFromSources().stream().collect(Collectors.toMap((v0) -> {
            return v0.getID();
        }, (v0) -> {
            return v0.getParallelism();
        }));
        synchronized (this.coordinator) {
            jobVerticesInformation = this.coordinator.getJobVerticesInformation(86400L);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug((String) jobGraph.getVerticesSortedTopologicallyFromSources().stream().map(jobVertex -> {
                return jobVertex.getName() + ": " + ((VertexEstimations) jobVerticesInformation.get(jobVertex.getID())).toString();
            }).collect(Collectors.joining(",\n")));
        }
        Map<JobVertexID, Integer> calcParallelism = calcParallelism(jobGraph, jobVerticesInformation, i, i2);
        if (CollectionUtil.isNullOrEmpty(calcParallelism)) {
            LOG.info("Fail to calculate new parallelism for job {}", jobGraph.getJobID());
            return calcParallelism;
        }
        if (LOG.isDebugEnabled()) {
            StringBuilder sb = new StringBuilder();
            for (JobVertex jobVertex2 : jobGraph.getVerticesSortedTopologicallyFromSources()) {
                sb.append(jobVertex2.getID()).append(": ").append(calcParallelism.get(jobVertex2.getID()).intValue()).append("/").append(jobVertex2.getMaxParallelism()).append("\n");
            }
            LOG.info("Calculated parallelism: " + ((Object) sb));
        }
        if (!map.equals(calcParallelism)) {
            LOG.debug("Rescaled Graph: " + calcParallelism);
            return calcParallelism;
        }
        LOG.info("After calculation, parallelism for job {} needs no change", jobGraph.getJobID());
        calcParallelism.clear();
        return calcParallelism;
    }

    @VisibleForTesting
    public Map<JobVertexID, Integer> calcParallelism(JobGraph jobGraph, Map<JobVertexID, VertexEstimations> map, int i, int i2) {
        if (RescalingUtils.calculateMinimalSlotRequirements(jobGraph).intValue() > i2) {
            LOG.info("Cannot fit the {} limitation", Integer.valueOf(i2));
            HashMap hashMap = new HashMap();
            Iterator<JobVertex> it = jobGraph.getVerticesSortedTopologicallyFromSources().iterator();
            while (it.hasNext()) {
                hashMap.put(it.next().getID(), 1);
            }
            return hashMap;
        }
        Map<JobVertexID, Integer> calculateParallelismForRate = calculateParallelismForRate(jobGraph, map, i2, 1.0d);
        if (calculateParallelismForRate != null) {
            try {
                LOG.info("Can process whole data stream using at most {} slots", Integer.valueOf(i2));
                if (RescalingUtils.calculateSlotRequirements(jobGraph, calculateParallelismForRate).intValue() >= i) {
                    LOG.info("Rescaled job parallelism fits minimal {} slots requirement.", Integer.valueOf(i));
                    return calculateParallelismForRate;
                }
                double d = 1.0d;
                double d2 = 1.0E9d;
                for (int i3 = 0; i3 < 60; i3++) {
                    double d3 = (d + d2) / 2.0d;
                    Map<JobVertexID, Integer> calculateParallelismForRate2 = calculateParallelismForRate(jobGraph, map, i2, d3);
                    if (calculateParallelismForRate2 == null) {
                        d2 = d3;
                    } else {
                        if (RescalingUtils.calculateSlotRequirements(jobGraph, calculateParallelismForRate2).intValue() >= i) {
                            LOG.info("Rescaled job using {} processing {} ratio of stream", RescalingUtils.calculateSlotRequirements(jobGraph, calculateParallelismForRate2), Double.valueOf(d3));
                            return calculateParallelismForRate2;
                        }
                        d = d3;
                    }
                }
                return calculateParallelismForRate(jobGraph, map, i2, (d2 + d) / 2.0d);
            } catch (Exception e) {
                LOG.info("Exception during estimation: " + e.getMessage());
            }
        }
        double d4 = 0.0d;
        double d5 = 1.0d;
        for (int i4 = 0; i4 < 30; i4++) {
            double d6 = (d4 + d5) / 2.0d;
            if (calculateParallelismForRate(jobGraph, map, i2, d6) == null) {
                d5 = d6;
            } else {
                d4 = d6;
            }
        }
        LOG.info("Estimated data rate: " + d4);
        return calculateParallelismForRate(jobGraph, map, i2, d4);
    }

    private Map<JobVertexID, Integer> calculateParallelismForRate(JobGraph jobGraph, Map<JobVertexID, VertexEstimations> map, int i, double d) {
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        for (JobVertex jobVertex : jobGraph.getVerticesSortedTopologicallyFromSources()) {
            VertexEstimations vertexEstimations = map.get(jobVertex.getID());
            double extraInputPerSecond = vertexEstimations.getExtraInputPerSecond();
            Iterator<JobEdge> it = jobVertex.getInputs().iterator();
            while (it.hasNext()) {
                extraInputPerSecond += ((Double) hashMap.get(it.next().getSource().getProducer().getID())).doubleValue();
            }
            double outputRate = vertexEstimations.getOutputRate();
            if (jobVertex.isInputVertex()) {
                if (d > 1.0d) {
                    extraInputPerSecond = Math.max(1.0d, extraInputPerSecond);
                }
                outputRate *= d;
            }
            double d2 = extraInputPerSecond * outputRate;
            int max = Math.max(1, (int) Math.ceil(extraInputPerSecond / vertexEstimations.getThroughput()));
            if (jobVertex.getMaxParallelism() != -1 && jobVertex.getMaxParallelism() < max) {
                LOG.debug("Failed to assign parallelism for rate: {}, vertex: {}, estimated: {}, limit: {}", new Object[]{Double.valueOf(d), jobVertex.getID().toHexString(), Integer.valueOf(max), Integer.valueOf(jobVertex.getMaxParallelism())});
                return null;
            }
            hashMap2.put(jobVertex.getID(), Integer.valueOf(max));
            hashMap.put(jobVertex.getID(), Double.valueOf(d2));
        }
        if (RescalingUtils.calculateSlotRequirements(jobGraph, hashMap2).intValue() <= i) {
            return hashMap2;
        }
        return null;
    }

    public void updateParallelism(JobGraph jobGraph) {
        HashMap hashMap = new HashMap();
        for (JobVertex jobVertex : jobGraph.getVerticesSortedTopologicallyFromSources()) {
            hashMap.put(jobVertex.getID(), Integer.valueOf(jobVertex.getParallelism()));
        }
        synchronized (this.coordinator) {
            this.coordinator.updateParallelism(hashMap);
            this.coordinator.cleanMetricsForJob(this.oldGraph.getJobID());
            this.oldGraph = jobGraph;
        }
    }

    public void cancel() {
        this.modelUpdateFuture.cancel(true);
        this.modelUpdater.shutdown();
    }
}
