package org.apache.flink.test.misc;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.PartitionOperator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.examples.java.clustering.KMeans;
import org.apache.flink.examples.java.clustering.util.KMeansData;
import org.apache.flink.examples.java.graph.ConnectedComponents;
import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.class */
public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
    private static final int PARALLELISM = 16;

    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(getConfiguration()).setNumberTaskManagers(2).setNumberSlotsPerTaskManager(8).build());

    private static Configuration getConfiguration() {
        Configuration configuration = new Configuration();
        configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "80m");
        configuration.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 800);
        return configuration;
    }

    @Test
    public void testSuccessfulProgramAfterFailure() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        runConnectedComponents(executionEnvironment);
        try {
            runKMeans(executionEnvironment);
            Assert.fail("This program execution should have failed.");
        } catch (JobExecutionException e) {
            Assert.assertTrue(e.getCause().getMessage().contains("Insufficient number of network buffers"));
        }
        runConnectedComponents(executionEnvironment);
    }

    private static void runConnectedComponents(ExecutionEnvironment executionEnvironment) throws Exception {
        executionEnvironment.setParallelism(PARALLELISM);
        executionEnvironment.getConfig().disableSysoutLogging();
        PartitionOperator rebalance = ConnectedComponentsData.getDefaultVertexDataSet(executionEnvironment).rebalance();
        FlatMapOperator flatMap = ConnectedComponentsData.getDefaultEdgeDataSet(executionEnvironment).rebalance().flatMap(new ConnectedComponents.UndirectEdge());
        MapOperator map = rebalance.map(new ConnectedComponents.DuplicateValue());
        DeltaIteration iterateDelta = map.iterateDelta(map, 100, new int[]{0});
        JoinOperator.EquiJoin with = iterateDelta.getWorkset().join(flatMap).where(new int[]{0}).equalTo(new int[]{0}).with(new ConnectedComponents.NeighborWithComponentIDJoin()).groupBy(new int[]{0}).aggregate(Aggregations.MIN, 1).join(iterateDelta.getSolutionSet()).where(new int[]{0}).equalTo(new int[]{0}).with(new ConnectedComponents.ComponentIdFilter());
        iterateDelta.closeWith(with, with).output(new DiscardingOutputFormat());
        executionEnvironment.execute();
    }

    private static void runKMeans(ExecutionEnvironment executionEnvironment) throws Exception {
        executionEnvironment.setParallelism(PARALLELISM);
        executionEnvironment.getConfig().disableSysoutLogging();
        PartitionOperator rebalance = KMeansData.getDefaultPointDataSet(executionEnvironment).rebalance();
        IterativeDataSet iterate = KMeansData.getDefaultCentroidDataSet(executionEnvironment).rebalance().iterate(20);
        rebalance.map(new KMeans.SelectNearestCenter()).withBroadcastSet(iterate.closeWith(rebalance.map(new KMeans.SelectNearestCenter()).withBroadcastSet(iterate, "centroids").rebalance().map(new KMeans.CountAppender()).groupBy(new int[]{0}).reduce(new KMeans.CentroidAccumulator()).rebalance().map(new KMeans.CentroidAverager())), "centroids").output(new DiscardingOutputFormat());
        executionEnvironment.execute("KMeans Example");
    }
}
