package org.apache.flink.runtime.resourceestimator;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.commons.configuration2.tree.DefaultExpressionEngineSymbols;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rescaling.RedeploymentUtils;
import org.apache.flink.runtime.resourceestimator.predictions.PredictionCoordinator;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/resourceestimator/DataFlowEstimator.class */
public class DataFlowEstimator {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) DataFlowEstimator.class);

    @Nonnull
    private final PredictionCoordinator coordinator;
    private final ScheduledExecutorService modelUpdater = Executors.newScheduledThreadPool(1);
    private final long timeout = 15000;
    private JobGraph oldGraph;

    public DataFlowEstimator(PredictionCoordinator predictionCoordinator, @Nonnull JobGraph jobGraph) {
        this.coordinator = predictionCoordinator;
        predictionCoordinator.addPredictableList(jobGraph.getVerticesSortedTopologicallyFromSources());
        this.oldGraph = jobGraph;
        this.modelUpdater.scheduleWithFixedDelay(() -> {
            synchronized (predictionCoordinator) {
                predictionCoordinator.updateModels(this.oldGraph);
            }
        }, 15000L, 15000L, TimeUnit.MILLISECONDS);
    }

    public Optional<JobGraph> evaluate(JobGraph jobGraph, BlobServer blobServer, int i, int i2) throws FlinkException {
        HashMap<JobVertexID, JobVertexEstimations> jobVerticesInformation;
        LOG.info("Evaluating for an old graph at: " + System.currentTimeMillis());
        Map map = (Map) jobGraph.getVerticesSortedTopologicallyFromSources().stream().collect(Collectors.toMap((v0) -> {
            return v0.getID();
        }, (v0) -> {
            return v0.getParallelism();
        }));
        synchronized (this.coordinator) {
            jobVerticesInformation = this.coordinator.getJobVerticesInformation(86400L);
        }
        LOG.info("Estimated metrics: " + ((String) jobGraph.getVerticesSortedTopologicallyFromSources().stream().map((v0) -> {
            return v0.getID();
        }).map(jobVertexID -> {
            return jobVertexID.toHexString() + ": [" + ((JobVertexEstimations) jobVerticesInformation.get(jobVertexID)).getEstimatedOutputRatio() + " " + ((JobVertexEstimations) jobVerticesInformation.get(jobVertexID)).getEstimatedRecordsThroughput() + DefaultExpressionEngineSymbols.DEFAULT_ATTRIBUTE_END;
        }).collect(Collectors.joining(", "))));
        HashMap<JobVertexID, Integer> calcParallelism = calcParallelism(jobGraph, jobVerticesInformation, i, i2);
        StringBuilder sb = new StringBuilder();
        for (JobVertex jobVertex : jobGraph.getVerticesSortedTopologicallyFromSources()) {
            sb.append(jobVertex.getID()).append(": ").append(calcParallelism.get(jobVertex.getID()).intValue()).append("/").append(jobVertex.getMaxParallelism()).append("\n");
        }
        LOG.info("Calculated parallelism: " + ((Object) sb));
        if (map.equals(calcParallelism)) {
            return Optional.empty();
        }
        LOG.debug("Rescaled Graph: " + calcParallelism.toString());
        return RedeploymentUtils.modifyParallelismForJobGraphIgnoringOperators(jobGraph, calcParallelism, blobServer);
    }

    @VisibleForTesting
    public HashMap<JobVertexID, Integer> calcParallelism(JobGraph jobGraph, HashMap<JobVertexID, JobVertexEstimations> hashMap, int i, int i2) {
        if (i2 == -1) {
            i2 = 1000000000;
        }
        if (i < 0) {
            i = 0;
        }
        Preconditions.checkState(i <= i2, "Number of required slots should not exceed number of available slots");
        if (RedeploymentUtils.calculateMinimalSlotRequirements(jobGraph).intValue() > i2) {
            LOG.info("Cannot fit the {} limitation", Integer.valueOf(i2));
            HashMap<JobVertexID, Integer> hashMap2 = new HashMap<>();
            Iterator<JobVertex> it = jobGraph.getVerticesSortedTopologicallyFromSources().iterator();
            while (it.hasNext()) {
                hashMap2.put(it.next().getID(), 1);
            }
            return hashMap2;
        }
        HashMap<JobVertexID, Integer> calculateParallelismForRate = calculateParallelismForRate(jobGraph, hashMap, i2, 1.0d);
        if (calculateParallelismForRate == null) {
            double d = 0.0d;
            double d2 = 1.0d;
            for (int i3 = 0; i3 < 30; i3++) {
                double d3 = (d + d2) / 2.0d;
                if (calculateParallelismForRate(jobGraph, hashMap, i2, d3) == null) {
                    d2 = d3;
                } else {
                    d = d3;
                }
            }
            LOG.info("Estimated data rate: " + d);
            return calculateParallelismForRate(jobGraph, hashMap, i2, d);
        }
        LOG.info("Can process whole data stream using at most {} slots", Integer.valueOf(i2));
        if (RedeploymentUtils.calculateSlotRequirements(jobGraph, calculateParallelismForRate).intValue() >= i) {
            LOG.info("Rescaled job parallelism fits minimal {} slots requirement.", Integer.valueOf(i));
            return calculateParallelismForRate;
        }
        double d4 = 1.0d;
        double d5 = 1.0E9d;
        for (int i4 = 0; i4 < 60; i4++) {
            double d6 = (d4 + d5) / 2.0d;
            HashMap<JobVertexID, Integer> calculateParallelismForRate2 = calculateParallelismForRate(jobGraph, hashMap, i2, d6);
            if (calculateParallelismForRate2 == null) {
                d5 = d6;
            } else {
                if (RedeploymentUtils.calculateSlotRequirements(jobGraph, calculateParallelismForRate2).intValue() >= i) {
                    LOG.info("Rescaled job using {} processing {} ratio of stream", RedeploymentUtils.calculateSlotRequirements(jobGraph, calculateParallelismForRate2), Double.valueOf(d6));
                    return calculateParallelismForRate2;
                }
                d4 = d6;
            }
        }
        LOG.info("This message should not appear");
        return calculateParallelismForRate(jobGraph, hashMap, i2, (d5 + d4) / 2.0d);
    }

    private HashMap<JobVertexID, Integer> calculateParallelismForRate(JobGraph jobGraph, HashMap<JobVertexID, JobVertexEstimations> hashMap, int i, double d) {
        double estimatedOutputRatio;
        HashMap hashMap2 = new HashMap();
        HashMap<JobVertexID, Integer> hashMap3 = new HashMap<>();
        for (JobVertex jobVertex : jobGraph.getVerticesSortedTopologicallyFromSources()) {
            JobVertexEstimations jobVertexEstimations = hashMap.get(jobVertex.getID());
            int i2 = 1;
            if (jobVertex.getInputs().size() == 0) {
                estimatedOutputRatio = jobVertexEstimations.getEstimatedRecordsThroughput() * d;
            } else {
                double d2 = 0.0d;
                Iterator<JobEdge> it = jobVertex.getInputs().iterator();
                while (it.hasNext()) {
                    d2 += ((Double) hashMap2.get(it.next().getSource().getProducer().getID())).doubleValue();
                }
                i2 = Math.max(1, (int) Math.ceil(d2 / jobVertexEstimations.getEstimatedRecordsThroughput()));
                estimatedOutputRatio = d2 * jobVertexEstimations.getEstimatedOutputRatio();
            }
            if (jobVertex.getMaxParallelism() != -1 && jobVertex.getMaxParallelism() < i2) {
                LOG.debug("Failed to assign parallelism for rate: {}, vertex: {}, estimated: {}, limit: {}", Double.valueOf(d), jobVertex.getID().toHexString(), Integer.valueOf(i2), Integer.valueOf(jobVertex.getMaxParallelism()));
                return null;
            }
            hashMap3.put(jobVertex.getID(), Integer.valueOf(i2));
            hashMap2.put(jobVertex.getID(), Double.valueOf(estimatedOutputRatio));
        }
        if (RedeploymentUtils.calculateSlotRequirements(jobGraph, hashMap3).intValue() <= i) {
            return hashMap3;
        }
        return null;
    }

    public void updateParallelism(JobGraph jobGraph) {
        HashMap hashMap = new HashMap();
        for (JobVertex jobVertex : jobGraph.getVerticesSortedTopologicallyFromSources()) {
            hashMap.put(jobVertex.getID(), Integer.valueOf(jobVertex.getParallelism()));
        }
        synchronized (this.coordinator) {
            this.coordinator.updateParallelism(hashMap);
            this.coordinator.cleanMetricsForJob(this.oldGraph.getJobID());
            this.oldGraph = jobGraph;
        }
    }
}
