package org.apache.flink.runtime.resourceestimator.predictions;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.estimator.EstimationConfidenceLevel;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.resourceestimator.JobVertexEstimations;
import org.apache.flink.runtime.resourceestimator.metrics.MetricSnapshot;
import org.apache.flink.runtime.resourceestimator.metrics.MetricsRequester;
import org.apache.flink.runtime.resourceestimator.predictions.predictors.LatestPredictor;
import org.apache.flink.runtime.resourceestimator.predictions.predictors.MetricWithTimestamp;
import org.apache.flink.runtime.resourceestimator.predictions.predictors.Predictor;
import org.apache.flink.runtime.resourceestimator.predictions.predictors.ProcessingTimePredictor;
import org.apache.flink.runtime.resourceestimator.predictions.predictors.ProcessingTimePredictorImpl;
import org.apache.flink.runtime.resourceestimator.predictions.predictors.SourceProcessingTimePredictor;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/resourceestimator/predictions/PredictionCoordinatorImpl.class */
public class PredictionCoordinatorImpl implements PredictionCoordinator {

    @Nonnull
    private final MetricsRequester metricsRequester;

    @Nonnull
    private final EstimationConfidenceLevel confidenceLevel;
    protected final Logger log = LoggerFactory.getLogger(PredictionCoordinatorImpl.class);
    private int iteration = 0;
    private final HashMap<JobVertexID, Predictor<Double>> ratePredictor = new HashMap<>();
    private final HashMap<JobVertexID, ProcessingTimePredictor> processingTime = new HashMap<>();

    public PredictionCoordinatorImpl(@NotNull MetricsRequester metricsRequester, @NotNull EstimationConfidenceLevel estimationConfidenceLevel) {
        this.metricsRequester = metricsRequester;
        this.confidenceLevel = estimationConfidenceLevel;
    }

    @Override // org.apache.flink.runtime.resourceestimator.predictions.PredictionCoordinator
    public void updateModels(JobGraph jobGraph) {
        this.iteration++;
        HashMap<JobVertexID, List<List<MetricSnapshot>>> requestMetrics = this.metricsRequester.requestMetrics(jobGraph.getJobID(), this.ratePredictor.keySet());
        for (JobVertex jobVertex : jobGraph.getVerticesSortedTopologicallyFromSources()) {
            if (requestMetrics.containsKey(jobVertex.getID())) {
                this.processingTime.get(jobVertex.getID()).addEntries(requestMetrics.getOrDefault(jobVertex.getID(), new ArrayList()));
                LinkedList linkedList = new LinkedList();
                if (jobVertex.isInputVertex()) {
                    Iterator<List<MetricSnapshot>> it = requestMetrics.get(jobVertex.getID()).iterator();
                    while (it.hasNext()) {
                        for (MetricSnapshot metricSnapshot : it.next()) {
                            linkedList.add(new MetricWithTimestamp(metricSnapshot.getTimestamp(), Double.valueOf(metricSnapshot.getNumRecordsOutPerSec())));
                        }
                    }
                } else {
                    Iterator<List<MetricSnapshot>> it2 = requestMetrics.get(jobVertex.getID()).iterator();
                    while (it2.hasNext()) {
                        for (MetricSnapshot metricSnapshot2 : it2.next()) {
                            if (metricSnapshot2.getNumRecordsIn() == 0) {
                                linkedList.add(new MetricWithTimestamp(metricSnapshot2.getTimestamp(), Double.valueOf(0.0d)));
                            } else {
                                this.log.info("Adding value: " + ((metricSnapshot2.getNumRecordsOut() * 1.0d) / metricSnapshot2.getNumRecordsIn()) + " for vertex: " + jobVertex.getID().toHexString());
                                linkedList.add(new MetricWithTimestamp(metricSnapshot2.getTimestamp(), Double.valueOf((metricSnapshot2.getNumRecordsOut() * 1.0d) / metricSnapshot2.getNumRecordsIn())));
                            }
                        }
                    }
                }
                this.ratePredictor.get(jobVertex.getID()).add(linkedList);
                this.ratePredictor.get(jobVertex.getID()).update();
            }
        }
    }

    @Override // org.apache.flink.runtime.resourceestimator.predictions.PredictionCoordinator
    public HashMap<JobVertexID, JobVertexEstimations> getJobVerticesInformation(long j) {
        this.log.info("Iterations: " + this.iteration);
        HashMap<JobVertexID, JobVertexEstimations> hashMap = new HashMap<>();
        for (JobVertexID jobVertexID : this.ratePredictor.keySet()) {
            hashMap.put(jobVertexID, new JobVertexEstimations(this.ratePredictor.get(jobVertexID).predict(j).doubleValue(), 1.0E9d / this.processingTime.get(jobVertexID).predictTime()));
        }
        return hashMap;
    }

    @Override // org.apache.flink.runtime.resourceestimator.predictions.PredictionCoordinator
    public void updateParallelism(Map<JobVertexID, Integer> map) {
        for (JobVertexID jobVertexID : map.keySet()) {
            this.processingTime.get(jobVertexID).rescale(map.get(jobVertexID).intValue());
        }
    }

    @Override // org.apache.flink.runtime.resourceestimator.predictions.PredictionCoordinator
    public void addPredictableList(@Nonnull List<JobVertex> list) {
        for (JobVertex jobVertex : list) {
            if (jobVertex.isInputVertex()) {
                this.processingTime.put(jobVertex.getID(), new SourceProcessingTimePredictor());
            } else {
                this.processingTime.put(jobVertex.getID(), new ProcessingTimePredictorImpl(jobVertex.getParallelism(), this.confidenceLevel));
            }
            this.ratePredictor.put(jobVertex.getID(), new LatestPredictor());
        }
    }

    @Override // org.apache.flink.runtime.resourceestimator.predictions.PredictionCoordinator
    public void cleanMetricsForJob(JobID jobID) {
        this.metricsRequester.cleanMetricsForJob(jobID);
    }

    @VisibleForTesting
    public HashMap<JobVertexID, Integer> getParallelism() {
        HashMap<JobVertexID, Integer> hashMap = new HashMap<>();
        for (Map.Entry<JobVertexID, ProcessingTimePredictor> entry : this.processingTime.entrySet()) {
            hashMap.put(entry.getKey(), Integer.valueOf(entry.getValue().getParallelism()));
        }
        return hashMap;
    }

    @VisibleForTesting
    public HashMap<JobVertexID, List<Long>> getProcessingTimePredictionLastTimestamps() {
        HashMap<JobVertexID, List<Long>> hashMap = new HashMap<>();
        for (Map.Entry<JobVertexID, ProcessingTimePredictor> entry : this.processingTime.entrySet()) {
            hashMap.put(entry.getKey(), entry.getValue().getLastTimestamps());
        }
        return hashMap;
    }
}
