package org.apache.flink.runtime.rescaling;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphOperatorDetails;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorDetails;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.resourceestimator.DynamicResourceEstimator;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/rescaling/RescaleUtils.class */
public class RescaleUtils {
    private static final Logger LOG = LoggerFactory.getLogger(RescaleUtils.class);

    /**
     * Computes the number of slots required by the given job graph using the parallelism
     * currently configured on each vertex.
     *
     * <p>Vertices are grouped by the string form of their slot sharing group id. Each group
     * contributes the largest parallelism found among its vertices, and the result is the sum
     * of these per-group maxima.
     *
     * @param jobGraph job graph whose vertices are inspected
     * @return sum over all slot sharing groups of the maximum vertex parallelism in the group;
     *     {@code 0} if the graph has no vertices
     * @throws FlinkRuntimeException if any vertex has a co-location group, which is not supported
     */
    public static Integer calculateSlotRequirements(JobGraph jobGraph) throws FlinkRuntimeException {
        final Map<String, Integer> maxParallelismPerGroup = new HashMap<>();
        for (JobVertex jobVertex : jobGraph.getVerticesSortedTopologicallyFromSources()) {
            final String groupId = jobVertex.getSlotSharingGroup().getSlotSharingGroupId().toString();
            // Keep the largest parallelism seen so far for this slot sharing group.
            maxParallelismPerGroup.merge(groupId, jobVertex.getParallelism(), Integer::max);
            if (jobVertex.getCoLocationGroup() != null) {
                throw new FlinkRuntimeException("CoLocationGroup for vertex is not supported in Rescale Manager.");
            }
        }
        return maxParallelismPerGroup.values().stream().reduce(0, Integer::sum);
    }

    /**
     * Computes the minimal number of slots for the given job graph: one per distinct slot
     * sharing group referenced by its vertices.
     *
     * <p>Groups are identified by the string form of their slot sharing group id, the same key
     * used by the {@code calculateSlotRequirements} overloads in this class, rather than by the
     * group's {@code toString()} output.
     *
     * @param jobGraph job graph whose vertices are inspected
     * @return number of distinct slot sharing groups; {@code 0} if the graph has no vertices
     */
    public static Integer calculateMinimalSlotRequirements(JobGraph jobGraph) {
        final long distinctGroups = jobGraph.getVerticesSortedTopologicallyFromSources().stream()
                .map(vertex -> vertex.getSlotSharingGroup().getSlotSharingGroupId().toString())
                .distinct()
                .count();
        // The count is bounded by the number of vertices in the list, so it fits in an int.
        return (int) distinctGroups;
    }

    /**
     * Computes the number of slots required by the given job graph when vertex parallelism is
     * taken from the supplied map instead of from the vertices themselves.
     *
     * <p>Vertices are grouped by the string form of their slot sharing group id. Each group
     * contributes the largest parallelism found among its vertices, and the result is the sum
     * of these per-group maxima. A vertex without an entry in {@code hashMap} counts with a
     * parallelism of {@code 1}.
     *
     * @param jobGraph job graph whose vertices are inspected
     * @param hashMap parallelism to assume per vertex id
     * @return sum over all slot sharing groups of the maximum assumed parallelism in the group;
     *     {@code 0} if the graph has no vertices
     * @throws FlinkRuntimeException if any vertex has a co-location group, which is not supported
     */
    public static Integer calculateSlotRequirements(JobGraph jobGraph, HashMap<JobVertexID, Integer> hashMap) throws FlinkRuntimeException {
        final Map<String, Integer> maxParallelismPerGroup = new HashMap<>();
        for (JobVertex jobVertex : jobGraph.getVerticesSortedTopologicallyFromSources()) {
            final String groupId = jobVertex.getSlotSharingGroup().getSlotSharingGroupId().toString();
            final Integer assumedParallelism = hashMap.getOrDefault(jobVertex.getID(), 1);
            // Keep the largest parallelism seen so far for this slot sharing group.
            maxParallelismPerGroup.merge(groupId, assumedParallelism, Integer::max);
            if (jobVertex.getCoLocationGroup() != null) {
                throw new FlinkRuntimeException("CoLocationGroup for vertex is not supported in Rescale Manager.");
            }
        }
        return maxParallelismPerGroup.values().stream().reduce(0, Integer::sum);
    }

    /**
     * Copies an object by writing it with Java serialization into an in-memory buffer and
     * reading it back.
     *
     * @param t object to copy; must not be {@code null}
     * @param <T> serializable type of the object
     * @return the object read back from the serialized bytes
     * @throws IllegalArgumentException if {@code t} is {@code null}
     * @throws IOException if writing or reading the object fails; a {@link ClassNotFoundException}
     *     raised while reading is wrapped as the cause of an {@code IOException}
     */
    public static <T extends Serializable> T createCopySerializable(T t) throws IOException {
        if (t == null) {
            throw new IllegalArgumentException("Object to copy must not be null.");
        }
        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // Closing the object stream flushes its block data into the buffer before it is read.
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(t);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
            // Safe: the stream contains exactly the object of type T that was written above.
            @SuppressWarnings("unchecked")
            final T copy = (T) in.readObject();
            return copy;
        } catch (ClassNotFoundException e) {
            throw new IOException(e);
        }
    }

    /**
     * Builds a new job graph, under a freshly generated job id, that mirrors the given one.
     *
     * <p>The input graph is first duplicated through a serialization round-trip. Name,
     * checkpointing settings, schedule mode, vertices, user artifacts, classpaths and user jars
     * are then taken from that duplicate. Each user jar blob key of the source job is copied to
     * the new job id through the blob server, and the resulting key is registered on the new graph.
     *
     * @param jobGraph job graph to copy
     * @param blobServer blob server used to copy the user jar blobs to the new job id
     * @return the new job graph
     * @throws IOException declared for failures of the serialization round-trip or of the calls
     *     made while populating the new graph
     */
    public static JobGraph copyJobGraph(JobGraph jobGraph, BlobServer blobServer) throws IOException {
        final JobGraph detachedCopy = (JobGraph) createCopySerializable(jobGraph);
        final JobGraph rescaledGraph = new JobGraph(JobID.generate(), detachedCopy.getName());

        rescaledGraph.setSnapshotSettings(detachedCopy.getCheckpointingSettings());
        rescaledGraph.setScheduleMode(detachedCopy.getScheduleMode());

        // NOTE(review): operator details are read from the ORIGINAL graph, not from the
        // deserialized copy, so the new graph references the same details object as the input.
        // Confirm this sharing is intended.
        final Optional<JobGraphOperatorDetails> operatorDetails = jobGraph.getJobGraphOperatorDetails();
        operatorDetails.ifPresent(rescaledGraph::setJobGraphOperatorDetails);

        for (JobVertex vertex : detachedCopy.getVertices()) {
            rescaledGraph.addVertex(vertex);
        }

        for (Map.Entry<String, DistributedCache.DistributedCacheEntry> artifact : detachedCopy.getUserArtifacts().entrySet()) {
            rescaledGraph.addUserArtifact(artifact.getKey(), artifact.getValue());
        }
        rescaledGraph.writeUserArtifactEntriesToConfiguration();

        rescaledGraph.setClasspaths(detachedCopy.getClasspaths());
        for (Path userJar : detachedCopy.getUserJars()) {
            rescaledGraph.addJar(userJar);
        }

        // Blobs are stored per job id, so each user jar blob is copied over to the new job id.
        for (PermanentBlobKey sourceKey : detachedCopy.getUserJarBlobKeys()) {
            final PermanentBlobKey copiedKey =
                    blobServer.copyFileToOtherJob(detachedCopy.getJobID(), rescaledGraph.getJobID(), sourceKey);
            rescaledGraph.addUserJarBlobKey(copiedKey);
        }
        return rescaledGraph;
    }

    /**
     * Creates a copy of the given job graph in which the job-level parallelism and the
     * parallelism of every non-input vertex are set to {@code i}.
     *
     * <p>Vertices for which {@code isInputVertex()} returns {@code true} keep their parallelism.
     * Every vertex that is changed also has its max parallelism set to {@code -1}.
     *
     * @param jobGraph job graph to copy; it is read for its execution config and is not
     *     modified by this method directly
     * @param i parallelism to apply
     * @param blobServer blob server passed on to {@code copyJobGraph}
     * @return the modified copy
     * @throws FlinkException if the execution config cannot be deserialized, or if an
     *     {@link IOException} occurs while copying or updating the graph (kept as the cause)
     */
    public static JobGraph modifyParallelismForJobGraph(JobGraph jobGraph, int i, BlobServer blobServer) throws FlinkException {
        try {
            final JobGraph rescaledGraph = copyJobGraph(jobGraph, blobServer);
            final ExecutionConfig executionConfig = getJobExecutionConfig(jobGraph);
            executionConfig.setParallelism(i);
            rescaledGraph.setExecutionConfig(executionConfig);
            for (JobVertex vertex : rescaledGraph.getVerticesSortedTopologicallyFromSources()) {
                if (!vertex.isInputVertex()) {
                    vertex.setParallelism(i);
                    // NOTE(review): -1 presumably resets max parallelism to "not set" - confirm.
                    vertex.setMaxParallelism(-1);
                }
            }
            return rescaledGraph;
        } catch (IOException e) {
            // Chain the I/O failure as the cause so it shows up in the regular stack trace.
            throw new FlinkException("Cannot serialize ExecutionConfig of jobGraph.", e);
        }
    }

    /**
     * Creates a copy of the given job graph with per-vertex parallelism taken from {@code map},
     * and propagates the new parallelism to the operator details of each changed vertex.
     *
     * <p>The job-level parallelism of the copy is set to the largest positive value in
     * {@code map}; if there is none it is left as deserialized from the original graph. A vertex
     * is changed only when its entry is positive and differs from its current parallelism.
     * Vertices with no entry, a {@code null} entry or a non-positive entry are left untouched.
     *
     * @param jobGraph job graph to copy
     * @param map requested parallelism per vertex id
     * @param blobServer blob server passed on to {@code copyJobGraph}
     * @return the modified copy, or an empty optional if {@code map} is empty
     * @throws FlinkException if the execution config cannot be deserialized, or if an
     *     {@link IOException} occurs while copying or updating the graph (kept as the cause)
     */
    public static Optional<JobGraph> modifyParallelismForJobGraph(JobGraph jobGraph, Map<JobVertexID, Integer> map, BlobServer blobServer) throws FlinkException {
        if (map.isEmpty()) {
            return Optional.empty();
        }
        try {
            final JobGraph rescaledGraph = copyJobGraph(jobGraph, blobServer);
            LOG.debug("creating rescaled jobGraph with id " + rescaledGraph.getJobID());

            final ExecutionConfig executionConfig = getJobExecutionConfig(jobGraph);
            map.values().stream()
                    .filter(requested -> requested != null && requested > 0)
                    .max(Integer::compare)
                    .ifPresent(executionConfig::setParallelism);
            rescaledGraph.setExecutionConfig(executionConfig);

            final Map<OperatorID, OperatorDetails> operatorDetailsById = rescaledGraph.getJobGraphOperatorDetails()
                    .<Map<OperatorID, OperatorDetails>>map(JobGraphOperatorDetails::getOperators)
                    .orElseGet(Collections::emptyMap);

            for (JobVertex vertex : rescaledGraph.getVerticesSortedTopologicallyFromSources()) {
                final Integer requested = map.get(vertex.getID());
                // A missing or non-positive entry means "leave this vertex as it is".
                if (requested == null || requested <= 0 || requested == vertex.getParallelism()) {
                    continue;
                }
                final int newParallelism = requested;
                vertex.setParallelism(newParallelism);
                // NOTE(review): re-assigns the current value; kept from the original code.
                vertex.setMaxParallelism(vertex.getMaxParallelism());
                vertex.getOperatorIDs().forEach(operatorIdPair -> {
                    final OperatorDetails details = operatorDetailsById.get(operatorIdPair.getGeneratedOperatorID());
                    if (details != null) {
                        details.setParallelism(newParallelism);
                    }
                });
            }
            return Optional.of(rescaledGraph);
        } catch (IOException e) {
            // Chain the I/O failure as the cause so it shows up in the regular stack trace.
            throw new FlinkException("Cannot serialize ExecutionConfig of jobGraph.", e);
        }
    }

    /**
     * Creates a copy of the given job graph with per-vertex parallelism taken from {@code map},
     * without touching any operator details.
     *
     * <p>The job-level parallelism of the copy is set to the largest positive value in
     * {@code map}; if there is none it is left as deserialized from the original graph. A vertex
     * is changed only when its entry is positive and differs from its current parallelism.
     * Vertices with no entry, a {@code null} entry or a non-positive entry are left untouched.
     *
     * @param jobGraph job graph to copy
     * @param map requested parallelism per vertex id
     * @param blobServer blob server passed on to {@code copyJobGraph}
     * @return the modified copy, or an empty optional if {@code map} is empty
     * @throws FlinkException if the execution config cannot be deserialized, or if an
     *     {@link IOException} occurs while copying or updating the graph (kept as the cause)
     */
    public static Optional<JobGraph> modifyParallelismForJobGraphIgnoringOperators(JobGraph jobGraph, Map<JobVertexID, Integer> map, BlobServer blobServer) throws FlinkException {
        if (map.isEmpty()) {
            return Optional.empty();
        }
        try {
            final JobGraph rescaledGraph = copyJobGraph(jobGraph, blobServer);
            LOG.info("creating rescaled jobGraph with id " + rescaledGraph.getJobID());

            final ExecutionConfig executionConfig = getJobExecutionConfig(jobGraph);
            // Only positive values take part, matching the per-vertex rule applied below.
            map.values().stream()
                    .filter(requested -> requested != null && requested > 0)
                    .max(Integer::compare)
                    .ifPresent(executionConfig::setParallelism);
            rescaledGraph.setExecutionConfig(executionConfig);

            for (JobVertex vertex : rescaledGraph.getVerticesSortedTopologicallyFromSources()) {
                final Integer requested = map.get(vertex.getID());
                // A missing or non-positive entry means "leave this vertex as it is".
                if (requested == null || requested <= 0 || requested == vertex.getParallelism()) {
                    continue;
                }
                vertex.setParallelism(requested);
                // NOTE(review): re-assigns the current value; kept from the original code.
                vertex.setMaxParallelism(vertex.getMaxParallelism());
            }
            return Optional.of(rescaledGraph);
        } catch (IOException e) {
            // Chain the I/O failure as the cause so it shows up in the regular stack trace.
            throw new FlinkException("Cannot serialize ExecutionConfig of jobGraph.", e);
        }
    }

    /**
     * Asks the resource estimator for a rescaled job graph, supplying it with a callback that
     * applies a per-vertex parallelism map through {@code modifyParallelismForJobGraph}.
     *
     * <p>This method is best-effort: any exception raised by the estimator or by the callback is
     * logged at WARN level and turned into an empty result.
     *
     * @param jobGraph job graph handed to the estimator
     * @param list further job graphs handed to the estimator
     * @param dynamicResourceEstimator estimator that calculates the new parallelism
     * @param blobServer blob server used when the callback copies the job graph
     * @return the estimator's result, or an empty optional if anything failed
     * @throws FlinkException declared in the signature; every exception raised inside is caught
     */
    public static Optional<JobGraph> modifyParallelismByResourceEstimation(JobGraph jobGraph, List<JobGraph> list, DynamicResourceEstimator dynamicResourceEstimator, BlobServer blobServer) throws FlinkException {
        Optional<JobGraph> outcome;
        try {
            outcome = dynamicResourceEstimator.calcNewParallelismJobGraph(
                    jobGraph,
                    list,
                    (graph, parallelismByVertex) ->
                            applyParallelismOrEmpty(graph, (Map<JobVertexID, Integer>) parallelismByVertex, blobServer));
        } catch (Exception estimationFailure) {
            LOG.warn("Error during calculation of job parallelism", estimationFailure);
            outcome = Optional.empty();
        }
        return outcome;
    }

    /** Applies the parallelism map to a copy of the graph; logs and returns empty on any failure. */
    private static Optional<JobGraph> applyParallelismOrEmpty(JobGraph graph, Map<JobVertexID, Integer> parallelismByVertex, BlobServer blobServer) {
        try {
            return modifyParallelismForJobGraph(graph, parallelismByVertex, blobServer);
        } catch (Exception modificationFailure) {
            LOG.warn("Error during modification of job parallelism", modificationFailure);
            return Optional.empty();
        }
    }

    /**
     * Deserializes the execution config stored in the given job graph, using the context class
     * loader of the calling thread.
     *
     * @param jobGraph job graph holding the serialized execution config
     * @return the deserialized execution config
     * @throws FlinkException if deserialization fails with an {@link IOException} or a
     *     {@link ClassNotFoundException}; the original exception is kept as the cause
     */
    public static ExecutionConfig getJobExecutionConfig(JobGraph jobGraph) throws FlinkException {
        final ExecutionConfig executionConfig;
        try {
            final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
            executionConfig = (ExecutionConfig) jobGraph.getSerializedExecutionConfig().deserializeValue(contextClassLoader);
        } catch (ClassNotFoundException | IOException failure) {
            throw new FlinkException("JobExecutionConfig cannot be deserialized", failure);
        }
        return executionConfig;
    }

    /**
     * Checks whether two job graphs have the same parallelism at every vertex position.
     *
     * <p>Vertices are matched by their index in each graph's topologically sorted vertex list,
     * not by vertex id.
     *
     * @param jobGraph first job graph
     * @param jobGraph2 second job graph
     * @return {@code true} if every pair of vertices at the same position has equal parallelism,
     *     {@code false} at the first mismatch
     * @throws IllegalStateException via {@code Preconditions.checkState} if the two graphs
     *     report a different number of vertices
     */
    public static boolean compareParallelism(JobGraph jobGraph, JobGraph jobGraph2) {
        final boolean sameVertexCount = jobGraph.getNumberOfVertices() == jobGraph2.getNumberOfVertices();
        Preconditions.checkState(sameVertexCount, "Jobs should contain the same number of vertices");

        final List<JobVertex> firstVertices = jobGraph.getVerticesSortedTopologicallyFromSources();
        final List<JobVertex> secondVertices = jobGraph2.getVerticesSortedTopologicallyFromSources();

        int position = 0;
        while (position < firstVertices.size()) {
            final int firstParallelism = firstVertices.get(position).getParallelism();
            final int secondParallelism = secondVertices.get(position).getParallelism();
            if (firstParallelism != secondParallelism) {
                return false;
            }
            position++;
        }
        return true;
    }
}
