package org.apache.flink.yarn;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.Map;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/yarn/FlinkYarnSessionCliTest.class */
public class FlinkYarnSessionCliTest extends TestLogger {
    private static final String TEST_YARN_JOB_MANAGER_ADDRESS = "22.33.44.55";
    private static final int TEST_YARN_JOB_MANAGER_PORT = 6655;
    private static final String invalidPropertiesFile = "jasfobManager=22.33.44.55:asf6655";

    @Rule
    public TemporaryFolder tmp = new TemporaryFolder();
    private static final ApplicationId TEST_YARN_APPLICATION_ID = ApplicationId.newInstance(System.currentTimeMillis(), 42);
    private static final ApplicationId TEST_YARN_APPLICATION_ID_2 = ApplicationId.newInstance(System.currentTimeMillis(), 43);
    private static final String validPropertiesFile = "applicationID=" + TEST_YARN_APPLICATION_ID;

    @Test
    public void testDynamicProperties() throws Exception {
        FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(new Configuration(), this.tmp.getRoot().getAbsolutePath(), "", "", false);
        Options options = new Options();
        flinkYarnSessionCli.addGeneralOptions(options);
        flinkYarnSessionCli.addRunOptions(options);
        AbstractYarnClusterDescriptor createClusterDescriptor = flinkYarnSessionCli.createClusterDescriptor(new DefaultParser().parse(options, new String[]{"run", "-j", "fake.jar", "-n", "15", "-D", "akka.ask.timeout=5 min", "-D", "env.java.opts=-DappName=foobar"}));
        Assert.assertNotNull(createClusterDescriptor);
        Map dynamicProperties = FlinkYarnSessionCli.getDynamicProperties(createClusterDescriptor.getDynamicPropertiesEncoded());
        Assert.assertEquals(2L, dynamicProperties.size());
        Assert.assertEquals("5 min", dynamicProperties.get("akka.ask.timeout"));
        Assert.assertEquals("-DappName=foobar", dynamicProperties.get("env.java.opts"));
    }

    @Test
    public void testCorrectSettingOfMaxSlots() throws Exception {
        FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(new Configuration(), this.tmp.getRoot().getAbsolutePath(), "y", "yarn");
        CommandLine parseCommandLineOptions = flinkYarnSessionCli.parseCommandLineOptions(new String[]{"-yn", "2", "-ys", "3"}, true);
        flinkYarnSessionCli.createClusterDescriptor(parseCommandLineOptions);
        ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(parseCommandLineOptions);
        Assert.assertEquals(3L, clusterSpecification.getSlotsPerTaskManager());
        Assert.assertEquals(2L, clusterSpecification.getNumberTaskManagers());
    }

    @Test
    public void testCorrectSettingOfDetachedMode() throws Exception {
        FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(new Configuration(), this.tmp.getRoot().getAbsolutePath(), "y", "yarn");
        Assert.assertTrue(flinkYarnSessionCli.createClusterDescriptor(flinkYarnSessionCli.parseCommandLineOptions(new String[]{"-yd"}, true)).isDetachedMode());
    }

    @Test
    public void testZookeeperNamespaceProperty() throws Exception {
        FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(new Configuration(), this.tmp.getRoot().getAbsolutePath(), "y", "yarn");
        Assert.assertEquals("flink_test_namespace", flinkYarnSessionCli.createClusterDescriptor(flinkYarnSessionCli.parseCommandLineOptions(new String[]{"-yn", "2", "-yz", "flink_test_namespace"}, true)).getZookeeperNamespace());
    }

    @Test
    public void testNodeLabelProperty() throws Exception {
        FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(new Configuration(), this.tmp.getRoot().getAbsolutePath(), "y", "yarn");
        Assert.assertEquals("flink_test_nodelabel", flinkYarnSessionCli.createClusterDescriptor(flinkYarnSessionCli.parseCommandLineOptions(new String[]{"-yn", "2", "-ynl", "flink_test_nodelabel"}, true)).getNodeLabel());
    }

    @Test
    public void testResumeFromYarnPropertiesFile() throws Exception {
        File writeYarnPropertiesFile = writeYarnPropertiesFile(validPropertiesFile);
        Configuration configuration = new Configuration();
        configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, writeYarnPropertiesFile.getAbsolutePath());
        FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(configuration, this.tmp.getRoot().getAbsolutePath(), "y", "yarn");
        Assert.assertEquals(TEST_YARN_APPLICATION_ID, flinkYarnSessionCli.getClusterId(flinkYarnSessionCli.parseCommandLineOptions(new String[0], true)));
    }

    @Test(expected = FlinkException.class)
    public void testInvalidYarnPropertiesFile() throws Exception {
        File writeYarnPropertiesFile = writeYarnPropertiesFile(invalidPropertiesFile);
        Configuration configuration = new Configuration();
        configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, writeYarnPropertiesFile.getAbsolutePath());
        new FlinkYarnSessionCli(configuration, this.tmp.getRoot().getAbsolutePath(), "y", "yarn");
    }

    @Test
    public void testResumeFromYarnID() throws Exception {
        FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(new Configuration(), this.tmp.getRoot().getAbsolutePath(), "y", "yarn");
        Assert.assertEquals(TEST_YARN_APPLICATION_ID, flinkYarnSessionCli.getClusterId(flinkYarnSessionCli.parseCommandLineOptions(new String[]{"-yid", TEST_YARN_APPLICATION_ID.toString()}, true)));
    }

    @Test
    public void testResumeFromYarnIDZookeeperNamespace() throws Exception {
        FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(new Configuration(), this.tmp.getRoot().getAbsolutePath(), "y", "yarn");
        Assert.assertTrue(flinkYarnSessionCli.createClusterDescriptor(flinkYarnSessionCli.parseCommandLineOptions(new String[]{"-yid", TEST_YARN_APPLICATION_ID.toString()}, true)).getFlinkConfiguration().getValue(HighAvailabilityOptions.HA_CLUSTER_ID).matches("application_\\d+_0042"));
    }

    @Test
    public void testResumeFromYarnIDZookeeperNamespaceOverride() throws Exception {
        FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(new Configuration(), this.tmp.getRoot().getAbsolutePath(), "y", "yarn");
        Assert.assertEquals("my_cluster", flinkYarnSessionCli.createClusterDescriptor(flinkYarnSessionCli.parseCommandLineOptions(new String[]{"-yid", TEST_YARN_APPLICATION_ID.toString(), "-yz", "my_cluster"}, true)).getFlinkConfiguration().getValue(HighAvailabilityOptions.HA_CLUSTER_ID));
    }

    @Test
    public void testYarnIDOverridesPropertiesFile() throws Exception {
        File writeYarnPropertiesFile = writeYarnPropertiesFile(validPropertiesFile);
        Configuration configuration = new Configuration();
        configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, writeYarnPropertiesFile.getAbsolutePath());
        FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(configuration, this.tmp.getRoot().getAbsolutePath(), "y", "yarn");
        Assert.assertEquals(TEST_YARN_APPLICATION_ID_2, flinkYarnSessionCli.getClusterId(flinkYarnSessionCli.parseCommandLineOptions(new String[]{"-yid", TEST_YARN_APPLICATION_ID_2.toString()}, true)));
    }

    @Test
    public void testCommandLineClusterSpecification() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, "1337m");
        configuration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, "7331m");
        configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 30);
        String[] strArr = {"-yjm", String.valueOf(1337) + "m", "-ytm", String.valueOf(7331) + "m", "-ys", String.valueOf(30)};
        FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(configuration, this.tmp.getRoot().getAbsolutePath(), "y", "yarn");
        ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(flinkYarnSessionCli.parseCommandLineOptions(strArr, false));
        Assert.assertThat(Integer.valueOf(clusterSpecification.getMasterMemoryMB()), Matchers.is(1337));
        Assert.assertThat(Integer.valueOf(clusterSpecification.getTaskManagerMemoryMB()), Matchers.is(7331));
        Assert.assertThat(Integer.valueOf(clusterSpecification.getSlotsPerTaskManager()), Matchers.is(30));
    }

    @Test
    public void testConfigurationClusterSpecification() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, "1337m");
        configuration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, "7331m");
        configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 42);
        FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(configuration, this.tmp.getRoot().getAbsolutePath(), "y", "yarn");
        ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(flinkYarnSessionCli.parseCommandLineOptions(new String[0], false));
        Assert.assertThat(Integer.valueOf(clusterSpecification.getMasterMemoryMB()), Matchers.is(1337));
        Assert.assertThat(Integer.valueOf(clusterSpecification.getTaskManagerMemoryMB()), Matchers.is(7331));
        Assert.assertThat(Integer.valueOf(clusterSpecification.getSlotsPerTaskManager()), Matchers.is(42));
    }

    @Test
    public void testHeapMemoryPropertyWithoutUnit() throws Exception {
        FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(new Configuration(), this.tmp.getRoot().getAbsolutePath(), "y", "yarn");
        ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(flinkYarnSessionCli.parseCommandLineOptions(new String[]{"-yn", "2", "-yjm", "1024", "-ytm", "2048"}, false));
        Assert.assertThat(Integer.valueOf(clusterSpecification.getMasterMemoryMB()), Matchers.is(1024));
        Assert.assertThat(Integer.valueOf(clusterSpecification.getTaskManagerMemoryMB()), Matchers.is(2048));
    }

    @Test
    public void testHeapMemoryPropertyWithUnitMB() throws Exception {
        FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(new Configuration(), this.tmp.getRoot().getAbsolutePath(), "y", "yarn");
        ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(flinkYarnSessionCli.parseCommandLineOptions(new String[]{"-yn", "2", "-yjm", "1024m", "-ytm", "2048m"}, false));
        Assert.assertThat(Integer.valueOf(clusterSpecification.getMasterMemoryMB()), Matchers.is(1024));
        Assert.assertThat(Integer.valueOf(clusterSpecification.getTaskManagerMemoryMB()), Matchers.is(2048));
    }

    @Test
    public void testHeapMemoryPropertyWithArbitraryUnit() throws Exception {
        FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(new Configuration(), this.tmp.getRoot().getAbsolutePath(), "y", "yarn");
        ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(flinkYarnSessionCli.parseCommandLineOptions(new String[]{"-yn", "2", "-yjm", "1g", "-ytm", "2g"}, false));
        Assert.assertThat(Integer.valueOf(clusterSpecification.getMasterMemoryMB()), Matchers.is(1024));
        Assert.assertThat(Integer.valueOf(clusterSpecification.getTaskManagerMemoryMB()), Matchers.is(2048));
    }

    @Test
    public void testHeapMemoryPropertyWithOldConfigKey() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB, 2048);
        configuration.setInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB, 4096);
        FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(configuration, this.tmp.getRoot().getAbsolutePath(), "y", "yarn");
        ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(flinkYarnSessionCli.parseCommandLineOptions(new String[0], false));
        Assert.assertThat(Integer.valueOf(clusterSpecification.getMasterMemoryMB()), Matchers.is(2048));
        Assert.assertThat(Integer.valueOf(clusterSpecification.getTaskManagerMemoryMB()), Matchers.is(4096));
    }

    @Test
    public void testHeapMemoryPropertyWithConfigDefaultValue() throws Exception {
        FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(new Configuration(), this.tmp.getRoot().getAbsolutePath(), "y", "yarn");
        ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(flinkYarnSessionCli.parseCommandLineOptions(new String[0], false));
        Assert.assertThat(Integer.valueOf(clusterSpecification.getMasterMemoryMB()), Matchers.is(1024));
        Assert.assertThat(Integer.valueOf(clusterSpecification.getTaskManagerMemoryMB()), Matchers.is(1024));
    }

    private File writeYarnPropertiesFile(String str) throws IOException {
        File newFolder = this.tmp.newFolder();
        Files.write(new File(newFolder, ".yarn-properties-" + System.getProperty("user.name")).toPath(), str.getBytes(), StandardOpenOption.CREATE);
        return newFolder.getAbsoluteFile();
    }

    @Test
    public void testMultiDirSupportyt() throws Exception {
        File file = new File("../yt_dir");
        file.mkdirs();
        File file2 = new File("../yt_dir1");
        file2.mkdirs();
        FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(new Configuration(), this.tmp.getRoot().getAbsolutePath(), "y", "yarn");
        AbstractYarnClusterDescriptor createClusterDescriptor = flinkYarnSessionCli.createClusterDescriptor(flinkYarnSessionCli.parseCommandLineOptions(new String[]{"-yn", "2", "-yt", "../yt_dir@@../yt_dir1"}, true));
        Assert.assertEquals(file, createClusterDescriptor.shipFiles.get(0));
        Assert.assertEquals(file2, createClusterDescriptor.shipFiles.get(1));
    }
}
