package org.apache.flink.streaming.tests;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/streaming/tests/NettyShuffleMemoryControlTestProgram.class */
public class NettyShuffleMemoryControlTestProgram {
    private static final int RECORD_LENGTH = 2048;
    private static final ConfigOption<Integer> RUNNING_TIME_IN_SECONDS = ConfigOptions.key("test.running_time_in_seconds").defaultValue(120).withDescription("The time to run.");
    private static final ConfigOption<Integer> MAP_PARALLELISM = ConfigOptions.key("test.map_parallelism").defaultValue(1).withDescription("The number of map tasks.");
    private static final ConfigOption<Integer> REDUCE_PARALLELISM = ConfigOptions.key("test.reduce_parallelism").defaultValue(1).withDescription("The number of reduce tasks.");

    /* loaded from: input_file:org/apache/flink/streaming/tests/NettyShuffleMemoryControlTestProgram$DummySink.class */
    private static class DummySink extends RichSinkFunction<String> {
        private DummySink() {
        }

        public void invoke(String str, SinkFunction.Context context) throws Exception {
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/tests/NettyShuffleMemoryControlTestProgram$StringSourceFunction.class */
    private static class StringSourceFunction extends RichParallelSourceFunction<String> {
        private static final long serialVersionUID = 1;
        private volatile boolean isRunning;
        private final long runningTimeInSeconds;
        private transient long stopTime;

        public StringSourceFunction(long j) {
            this.runningTimeInSeconds = j;
        }

        public void open(Configuration configuration) {
            this.isRunning = true;
            this.stopTime = System.nanoTime() + (this.runningTimeInSeconds * 1000000000);
        }

        public void run(SourceFunction.SourceContext<String> sourceContext) {
            byte[] bArr = new byte[NettyShuffleMemoryControlTestProgram.RECORD_LENGTH];
            for (int i = 0; i < NettyShuffleMemoryControlTestProgram.RECORD_LENGTH; i++) {
                bArr[i] = 97;
            }
            String str = new String(bArr);
            while (this.isRunning && System.nanoTime() < this.stopTime) {
                sourceContext.collect(str);
            }
        }

        public void cancel() {
            this.isRunning = false;
        }
    }

    public static void main(String[] strArr) throws Exception {
        ParameterTool fromArgs = ParameterTool.fromArgs(strArr);
        int i = fromArgs.getInt(RUNNING_TIME_IN_SECONDS.key(), ((Integer) RUNNING_TIME_IN_SECONDS.defaultValue()).intValue());
        int i2 = fromArgs.getInt(MAP_PARALLELISM.key(), ((Integer) MAP_PARALLELISM.defaultValue()).intValue());
        int i3 = fromArgs.getInt(REDUCE_PARALLELISM.key(), ((Integer) REDUCE_PARALLELISM.defaultValue()).intValue());
        Preconditions.checkArgument(i > 0, "The running time in seconds should be positive, but it is {}", new Object[]{Integer.valueOf(i)});
        Preconditions.checkArgument(i2 > 0, "The number of map tasks should be positive, but it is {}", new Object[]{Integer.valueOf(i2)});
        Preconditions.checkArgument(i3 > 0, "The number of reduce tasks should be positve, but it is {}", new Object[]{Integer.valueOf(i3)});
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.addSource(new StringSourceFunction(i)).setParallelism(i2).slotSharingGroup("a").shuffle().addSink(new DummySink()).setParallelism(i3).slotSharingGroup("b");
        executionEnvironment.execute("Netty Shuffle Memory Control Test");
    }
}
