package org.apache.flink.runtime.resourceestimator.predictions;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.flink.runtime.rescaling.provider.dataflow.EstimationConfidenceLevel;
import org.apache.flink.runtime.resourceestimator.metrics.MetricSnapshot;
import org.apache.flink.runtime.resourceestimator.metrics.SourceMetricSnapshot;
import org.apache.flink.runtime.resourceestimator.predictions.predictors.MetricWithTimestamp;
import org.apache.flink.runtime.resourceestimator.predictions.predictors.Predictor;
import org.apache.flink.runtime.resourceestimator.predictions.predictors.ProcessingTimePredictor;
import org.apache.flink.runtime.resourceestimator.predictions.predictors.ProcessingTimePredictorImpl;
import org.apache.flink.runtime.resourceestimator.predictions.predictors.timeseries.ExponentialSmoothing;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/resourceestimator/predictions/SourceEstimator.class */
/**
 * {@link VertexEstimator} for a source vertex.
 *
 * <p>Each call to {@link #updateEstimator(List)} turns the supplied {@link SourceMetricSnapshot}s
 * into input samples for an {@link ExponentialSmoothing} predictor and forwards the snapshots'
 * processing-time entries to a {@link ProcessingTimePredictor}. {@link #estimate()} combines the
 * two predictions into a {@link VertexEstimations}.
 *
 * <p>The class keeps mutable state (latest snapshot, per-partition carry-over values) without any
 * synchronization of its own.
 */
public class SourceEstimator implements VertexEstimator {
    private static final Logger LOG = LoggerFactory.getLogger(SourceEstimator.class);
    private final ProcessingTimePredictor processingTimePredictor;
    /** Snapshot seen most recently; used as the baseline for computing deltas. */
    private SourceMetricSnapshot latestSnapshot;
    /** When set, the next snapshot carrying partition info only becomes the baseline. */
    private boolean skipFirst;
    private int parallelism;
    private final String sourceName;
    /** Last computed value per partition index, reused while a partition reports no change. */
    private final List<Double> lastPartitionValues = new ArrayList<>();
    private final Predictor<Double> inputRecordsPredictor = new ExponentialSmoothing(false);

    /**
     * Creates an estimator for the named source and resets it via {@code rescale(1)}.
     *
     * @param str name of the source, used in debug logging
     * @param estimationConfidenceLevel confidence level handed to the processing-time predictor
     */
    public SourceEstimator(String str, EstimationConfidenceLevel estimationConfidenceLevel) {
        this.sourceName = str;
        // NOTE(review): this.parallelism still holds its default 0 here; the predictor is
        // rescaled to 1 by the rescale(1) call right below.
        this.processingTimePredictor = new ProcessingTimePredictorImpl(this.parallelism, estimationConfidenceLevel);
        rescale(1);
    }

    /**
     * Feeds a batch of snapshots into both predictors.
     *
     * <p>Snapshots that carry partition info contribute one sample summed over the partitions;
     * snapshots without partition info contribute one sample derived from the emit-time and
     * records-out deltas against the previous snapshot.
     *
     * @param list snapshot lists; must contain exactly one list, whose elements must be
     *     {@link SourceMetricSnapshot}s
     * @throws IllegalStateException if {@code list} does not contain exactly one element
     */
    @Override
    public void updateEstimator(List<List<MetricSnapshot>> list) {
        Preconditions.checkState(list.size() == 1, "Snapshot size doesn't fit vertex parallelism.");
        List<MetricWithTimestamp<Double>> samples = new ArrayList<>();
        for (MetricSnapshot rawSnapshot : list.get(0)) {
            SourceMetricSnapshot snapshot = (SourceMetricSnapshot) rawSnapshot;
            if (snapshot.getPartitionInfo() != null) {
                if (!this.skipFirst) {
                    addPartitionSample(snapshot, samples);
                }
                this.skipFirst = false;
                // Merge samples that share a timestamp (and order them by timestamp).
                samples = squash(samples);
            } else {
                addEmitTimeSample(snapshot, samples);
            }
            this.latestSnapshot = snapshot;
        }
        if (!samples.isEmpty()) {
            this.inputRecordsPredictor.add(samples);
            this.inputRecordsPredictor.update();
        }
        // The raw cast is kept because the processing-time entry type is declared outside this file.
        this.processingTimePredictor.addEntries((List) list.stream()
                .map(snapshots -> snapshots.stream()
                        .map(MetricSnapshot::toProcessingTimeEntry)
                        .collect(Collectors.toList()))
                .collect(Collectors.toList()));
    }

    /**
     * Adds one sample summed over all partitions of {@code snapshot}, provided at least one
     * partition changed its {@code f0} value since the previous snapshot.
     *
     * <p>Presumably {@code f0} is a millisecond timestamp and {@code f1} a cumulative record
     * count or offset, which would make each per-partition value a records-per-second rate;
     * confirm against the producer of the partition info.
     */
    private void addPartitionSample(SourceMetricSnapshot snapshot, List<MetricWithTimestamp<Double>> samples) {
        // Only partitions present in BOTH snapshots can produce a delta; indexing the previous
        // snapshot blindly would fail when it has no partition info or fewer partitions.
        int previousPartitionCount = this.latestSnapshot.getPartitionInfo() == null
                ? 0
                : this.latestSnapshot.getPartitionInfo().size();
        int comparablePartitions = Math.min(snapshot.getPartitionInfo().size(), previousPartitionCount);

        double total = 0.0d;
        boolean anyPartitionChanged = false;
        for (int i = 0; i < comparablePartitions; i++) {
            long currentF0 = ((Long) snapshot.getPartitionInfo().get(i).f0).longValue();
            long previousF0 = ((Long) this.latestSnapshot.getPartitionInfo().get(i).f0).longValue();
            if (currentF0 != previousF0) {
                anyPartitionChanged = true;
                long f1Delta = ((Long) snapshot.getPartitionInfo().get(i).f1).longValue()
                        - ((Long) this.latestSnapshot.getPartitionInfo().get(i).f1).longValue();
                long f0Delta = currentF0 - previousF0;
                double partitionValue = ((f1Delta * 1.0d) / f0Delta) * 1000.0d;
                total += partitionValue;
                // Pad so the value lands at index i even if lower-indexed partitions have not
                // produced a value yet; appending blindly would misalign the list with the
                // partition indices. A padded 0.0 contributes nothing when carried over.
                while (this.lastPartitionValues.size() <= i) {
                    this.lastPartitionValues.add(0.0d);
                }
                this.lastPartitionValues.set(i, partitionValue);
            } else if (this.lastPartitionValues.size() > i) {
                // Unchanged partition: carry its last known value forward.
                total += this.lastPartitionValues.get(i).doubleValue();
            }
        }
        if (anyPartitionChanged) {
            samples.add(new MetricWithTimestamp<>(snapshot.getTimestamp(), Double.valueOf(total)));
        }
    }

    /**
     * Adds one sample computed from the emit-time and records-out deltas between
     * {@code snapshot} and the previous snapshot; nothing is added unless the emit-time delta
     * is positive. The sample is stamped with the current wall-clock time.
     */
    private void addEmitTimeSample(SourceMetricSnapshot snapshot, List<MetricWithTimestamp<Double>> samples) {
        long emitTimeNs = snapshot.getEmitTimeNs();
        long emitTimeDeltaNs = emitTimeNs - this.latestSnapshot.getEmitTimeNs();
        long recordsOutDelta = snapshot.getNumRecordsOut() - this.latestSnapshot.getNumRecordsOut();
        LOG.debug("Current metrics for {} are: {} ns, {}\nPrevious metrics are: {}ns, {}\nDeltas: {}ns {}\nAdd: {}",
                this.sourceName,
                emitTimeNs,
                snapshot.getNumRecordsOut(),
                this.latestSnapshot.getEmitTimeNs(),
                this.latestSnapshot.getNumRecordsOut(),
                emitTimeDeltaNs,
                recordsOutDelta,
                emitTimeDeltaNs > 0);
        if (emitTimeDeltaNs > 0) {
            // Records per 1e9 ns of emit time, plus a small constant offset of 1.0E-4.
            samples.add(new MetricWithTimestamp<>(System.currentTimeMillis(),
                    Double.valueOf(1.0E-4d + (((recordsOutDelta * 1.0d) / emitTimeDeltaNs) * 1.0E9d))));
        }
    }

    /**
     * Collapses samples that share a timestamp into a single sample holding the sum of their
     * values.
     *
     * @param collection samples to merge
     * @return a new list with one sample per distinct timestamp, in ascending timestamp order
     */
    private List<MetricWithTimestamp<Double>> squash(Collection<MetricWithTimestamp<Double>> collection) {
        TreeMap<Long, Double> sumsByTimestamp = new TreeMap<>();
        for (MetricWithTimestamp<Double> metric : collection) {
            sumsByTimestamp.merge(Long.valueOf(metric.getTimestamp()), metric.getValue(), Double::sum);
        }
        List<MetricWithTimestamp<Double>> squashed = new ArrayList<>(sumsByTimestamp.size());
        for (Map.Entry<Long, Double> entry : sumsByTimestamp.entrySet()) {
            squashed.add(new MetricWithTimestamp<>(entry.getKey().longValue(), entry.getValue()));
        }
        return squashed;
    }

    /**
     * Builds the current estimations from both predictors.
     *
     * @return estimations made of the input-records prediction, {@code 1e9} divided by the
     *     predicted processing time, and the constant {@code 1.0}
     */
    @Override
    public VertexEstimations estimate() {
        // NOTE(review): the meaning of the 2000000.0 and 500.0 arguments is defined by the
        // predictors, not visible here.
        double inverseProcessingTime = 1.0E9d / this.processingTimePredictor.predictTime(2000000.0d);
        double predictedInputRecords = this.inputRecordsPredictor.predict(500.0d).doubleValue();
        // Guarded so the message is only concatenated when debug logging is enabled.
        if (LOG.isDebugEnabled()) {
            LOG.debug("Source metrics: {" + predictedInputRecords + ", " + inverseProcessingTime + "}");
        }
        return new VertexEstimations(predictedInputRecords, inverseProcessingTime, 1.0d);
    }

    /**
     * Records the new parallelism and resets the delta baseline: the latest snapshot becomes an
     * empty snapshot, the next partitioned snapshot is skipped, and per-partition carry-over
     * values are dropped.
     *
     * @param i new parallelism reported by {@link #getCurrentParallelism()}
     */
    @Override
    public void rescale(int i) {
        // The processing-time predictor is always rescaled to 1, independent of i; this matches
        // updateEstimator, which accepts exactly one snapshot list.
        this.processingTimePredictor.rescale(1);
        this.latestSnapshot = SourceMetricSnapshot.emptySnapshot();
        this.skipFirst = true;
        this.parallelism = i;
        this.lastPartitionValues.clear();
    }

    /** Returns the parallelism passed to the most recent {@link #rescale(int)} call. */
    @Override
    public int getCurrentParallelism() {
        return this.parallelism;
    }

    /** Returns a single-element list holding the timestamp of the latest snapshot. */
    @Override
    public List<Long> getLatestTimestamps() {
        return Collections.singletonList(Long.valueOf(this.latestSnapshot.getTimestamp()));
    }
}
