package org.apache.flink.runtime.scheduler.adaptivebatch;

import java.util.List;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismDecider.class */
public class DefaultVertexParallelismDecider implements VertexParallelismDecider {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) DefaultVertexParallelismDecider.class);
    private static final double CAP_RATIO_OF_BROADCAST = 0.5d;
    private final int maxParallelism;
    private final int minParallelism;
    private final long dataVolumePerTask;
    private final int defaultSourceParallelism;

    private DefaultVertexParallelismDecider(int i, int i2, MemorySize memorySize, int i3) {
        Preconditions.checkArgument(i2 > 0, "The minimum parallelism must be larger than 0.");
        Preconditions.checkArgument(i >= i2, "Maximum parallelism should be greater than or equal to the minimum parallelism.");
        Preconditions.checkArgument(i3 > 0, "The default source parallelism must be larger than 0.");
        Preconditions.checkNotNull(memorySize);
        this.maxParallelism = i;
        this.minParallelism = i2;
        this.dataVolumePerTask = memorySize.getBytes();
        this.defaultSourceParallelism = i3;
    }

    @Override // org.apache.flink.runtime.scheduler.adaptivebatch.VertexParallelismDecider
    public int decideParallelismForVertex(List<BlockingResultInfo> list) {
        return list.isEmpty() ? this.defaultSourceParallelism : calculateParallelism(list);
    }

    private int calculateParallelism(List<BlockingResultInfo> list) {
        long sum = list.stream().filter((v0) -> {
            return v0.isBroadcast();
        }).mapToLong(blockingResultInfo -> {
            return blockingResultInfo.getBlockingPartitionSizes().stream().reduce(0L, (v0, v1) -> {
                return Long.sum(v0, v1);
            }).longValue();
        }).sum();
        long sum2 = list.stream().filter(blockingResultInfo2 -> {
            return !blockingResultInfo2.isBroadcast();
        }).mapToLong(blockingResultInfo3 -> {
            return blockingResultInfo3.getBlockingPartitionSizes().stream().reduce(0L, (v0, v1) -> {
                return Long.sum(v0, v1);
            }).longValue();
        }).sum();
        long ceil = (long) Math.ceil(this.dataVolumePerTask * 0.5d);
        if (sum > ceil) {
            LOG.info("The size of broadcast data {} is larger than the expected maximum value {} ('{}' * {}). Use {} as the size of broadcast data to decide the parallelism.", new MemorySize(sum), new MemorySize(ceil), JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_AVG_DATA_VOLUME_PER_TASK.key(), Double.valueOf(0.5d), new MemorySize(ceil));
            sum = ceil;
        }
        int ceil2 = (int) Math.ceil(sum2 / (this.dataVolumePerTask - sum));
        int normalizeParallelism = normalizeParallelism(ceil2);
        LOG.debug("The size of broadcast data is {}, the size of non-broadcast data is {}, the initially decided parallelism is {}, after normalize is {}", new MemorySize(sum), new MemorySize(sum2), Integer.valueOf(ceil2), Integer.valueOf(normalizeParallelism));
        if (normalizeParallelism < this.minParallelism) {
            LOG.info("The initially normalized parallelism {} is smaller than the normalized minimum parallelism {}. Use {} as the finally decided parallelism.", Integer.valueOf(normalizeParallelism), Integer.valueOf(this.minParallelism), Integer.valueOf(this.minParallelism));
            normalizeParallelism = this.minParallelism;
        } else if (normalizeParallelism > this.maxParallelism) {
            LOG.info("The initially normalized parallelism {} is larger than the normalized maximum parallelism {}. Use {} as the finally decided parallelism.", Integer.valueOf(normalizeParallelism), Integer.valueOf(this.maxParallelism), Integer.valueOf(this.maxParallelism));
            normalizeParallelism = this.maxParallelism;
        }
        return normalizeParallelism;
    }

    @VisibleForTesting
    int getMaxParallelism() {
        return this.maxParallelism;
    }

    @VisibleForTesting
    int getMinParallelism() {
        return this.minParallelism;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static DefaultVertexParallelismDecider from(Configuration configuration) {
        int normalizedMaxParallelism = getNormalizedMaxParallelism(configuration);
        int normalizedMinParallelism = getNormalizedMinParallelism(configuration);
        Preconditions.checkState(normalizedMaxParallelism >= normalizedMinParallelism, String.format("Invalid configuration: '%s' should be greater than or equal to '%s' and the range must contain at least one power of 2.", JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM.key(), JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MIN_PARALLELISM.key()));
        return new DefaultVertexParallelismDecider(normalizedMaxParallelism, normalizedMinParallelism, (MemorySize) configuration.get(JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_AVG_DATA_VOLUME_PER_TASK), ((Integer) configuration.get(JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DEFAULT_SOURCE_PARALLELISM)).intValue());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static int getNormalizedMaxParallelism(Configuration configuration) {
        return MathUtils.roundDownToPowerOf2(configuration.getInteger(JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM));
    }

    static int getNormalizedMinParallelism(Configuration configuration) {
        return MathUtils.roundUpToPowerOfTwo(configuration.getInteger(JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MIN_PARALLELISM));
    }

    static int normalizeParallelism(int i) {
        int roundDownToPowerOf2 = MathUtils.roundDownToPowerOf2(i);
        int roundUpToPowerOfTwo = MathUtils.roundUpToPowerOfTwo(i);
        return i < (roundUpToPowerOfTwo + roundDownToPowerOf2) / 2 ? roundDownToPowerOf2 : roundUpToPowerOfTwo;
    }
}
