package org.apache.flink.streaming.api.graph;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import org.apache.flink.shaded.guava31.com.google.common.hash.HashFunction;
import org.apache.flink.shaded.guava31.com.google.common.hash.Hasher;
import org.apache.flink.shaded.guava31.com.google.common.hash.Hashing;
import org.apache.flink.streaming.api.operators.UdfStreamOperatorFactory;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamGraphHasherV2.class */
public class StreamGraphHasherV2 implements StreamGraphHasher {
    private static final Logger LOG = LoggerFactory.getLogger(StreamGraphHasherV2.class);

    @Override // org.apache.flink.streaming.api.graph.StreamGraphHasher
    public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) {
        HashFunction murmur3_128 = Hashing.murmur3_128(0);
        HashMap hashMap = new HashMap();
        HashSet hashSet = new HashSet();
        ArrayDeque arrayDeque = new ArrayDeque();
        ArrayList<Integer> arrayList = new ArrayList();
        Iterator<Integer> it = streamGraph.getSourceIDs().iterator();
        while (it.hasNext()) {
            arrayList.add(it.next());
        }
        Collections.sort(arrayList);
        for (Integer num : arrayList) {
            arrayDeque.add(streamGraph.getStreamNode(num));
            hashSet.add(num);
        }
        while (true) {
            StreamNode streamNode = (StreamNode) arrayDeque.poll();
            if (streamNode == null) {
                return hashMap;
            }
            if (generateNodeHash(streamNode, murmur3_128, hashMap, streamGraph.isChainingEnabled(), streamGraph)) {
                Iterator<StreamEdge> it2 = streamNode.getOutEdges().iterator();
                while (it2.hasNext()) {
                    StreamNode targetVertex = streamGraph.getTargetVertex(it2.next());
                    if (!hashSet.contains(Integer.valueOf(targetVertex.getId()))) {
                        arrayDeque.add(targetVertex);
                        hashSet.add(Integer.valueOf(targetVertex.getId()));
                    }
                }
            } else {
                hashSet.remove(Integer.valueOf(streamNode.getId()));
            }
        }
    }

    private boolean generateNodeHash(StreamNode streamNode, HashFunction hashFunction, Map<Integer, byte[]> map, boolean z, StreamGraph streamGraph) {
        String transformationUID = streamNode.getTransformationUID();
        if (transformationUID == null) {
            Iterator<StreamEdge> it = streamNode.getInEdges().iterator();
            while (it.hasNext()) {
                if (!map.containsKey(Integer.valueOf(it.next().getSourceId()))) {
                    return false;
                }
            }
            if (map.put(Integer.valueOf(streamNode.getId()), generateDeterministicHash(streamNode, hashFunction.newHasher(), map, z, streamGraph)) != null) {
                throw new IllegalStateException("Unexpected state. Tried to add node hash twice. This is probably a bug in the JobGraph generator.");
            }
            return true;
        }
        byte[] generateUserSpecifiedHash = generateUserSpecifiedHash(streamNode, hashFunction.newHasher());
        Iterator<byte[]> it2 = map.values().iterator();
        while (it2.hasNext()) {
            if (Arrays.equals(it2.next(), generateUserSpecifiedHash)) {
                throw new IllegalArgumentException("Hash collision on user-specified ID \"" + transformationUID + "\". Most likely cause is a non-unique ID. Please check that all IDs specified via `uid(String)` are unique.");
            }
        }
        if (map.put(Integer.valueOf(streamNode.getId()), generateUserSpecifiedHash) != null) {
            throw new IllegalStateException("Unexpected state. Tried to add node hash twice. This is probably a bug in the JobGraph generator.");
        }
        return true;
    }

    private byte[] generateUserSpecifiedHash(StreamNode streamNode, Hasher hasher) {
        hasher.putString(streamNode.getTransformationUID(), Charset.forName("UTF-8"));
        return hasher.hash().asBytes();
    }

    private byte[] generateDeterministicHash(StreamNode streamNode, Hasher hasher, Map<Integer, byte[]> map, boolean z, StreamGraph streamGraph) {
        generateNodeLocalHash(hasher, map.size());
        for (StreamEdge streamEdge : streamNode.getOutEdges()) {
            if (!((streamEdge.getPartitioner() instanceof ForwardPartitioner) && ((ForwardPartitioner) streamEdge.getPartitioner()).isReplacedHashPartitioner()) && isChainable(streamEdge, z, streamGraph)) {
                generateNodeLocalHash(hasher, map.size());
            }
        }
        byte[] asBytes = hasher.hash().asBytes();
        for (StreamEdge streamEdge2 : streamNode.getInEdges()) {
            byte[] bArr = map.get(Integer.valueOf(streamEdge2.getSourceId()));
            if (bArr == null) {
                throw new IllegalStateException("Missing hash for input node " + streamGraph.getSourceVertex(streamEdge2) + ". Cannot generate hash for " + streamNode + ".");
            }
            for (int i = 0; i < asBytes.length; i++) {
                asBytes[i] = (byte) ((asBytes[i] * 37) ^ bArr[i]);
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Generated hash '" + StringUtils.byteToHexString(asBytes) + "' for node '" + streamNode.toString() + "' {id: " + streamNode.getId() + ", parallelism: " + streamNode.getParallelism() + ", user function: " + (streamNode.getOperatorFactory() instanceof UdfStreamOperatorFactory ? ((UdfStreamOperatorFactory) streamNode.getOperatorFactory()).getUserFunctionClassName() : "") + "}");
        }
        return asBytes;
    }

    private void generateNodeLocalHash(Hasher hasher, int i) {
        hasher.putInt(i);
    }

    private boolean isChainable(StreamEdge streamEdge, boolean z, StreamGraph streamGraph) {
        return z && StreamingJobGraphGenerator.isChainable(streamEdge, streamGraph);
    }
}
