package org.apache.kafka.streams.processor.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.MissingSourceTopicException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.internals.generated.SubscriptionInfoData;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.CopartitionedTopicsEnforcer;
import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.StateSerdes;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.class */
public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Configurable {
    private Logger log;
    private String logPrefix;
    private static final UUID FUTURE_ID = UUID.randomUUID();
    protected static final Comparator<TopicPartition> PARTITION_COMPARATOR = Comparator.comparing((v0) -> {
        return v0.topic();
    }).thenComparingInt((v0) -> {
        return v0.partition();
    });
    private String userEndPoint;
    private AssignorConfiguration.AssignmentConfigs assignmentConfigs;
    private Supplier<Consumer<byte[], byte[]>> mainConsumerSupplier;
    private Admin adminClient;
    private TaskManager taskManager;
    private StreamsMetadataState streamsMetadataState;
    private PartitionGrouper partitionGrouper;
    private AtomicInteger assignmentErrorCode;
    private AtomicLong nextScheduledRebalanceMs;
    private Queue<StreamsException> nonFatalExceptionsToHandle;
    private Time time;
    protected int usedSubscriptionMetadataVersion = 11;
    private InternalTopicManager internalTopicManager;
    private CopartitionedTopicsEnforcer copartitionedTopicsEnforcer;
    private ConsumerPartitionAssignor.RebalanceProtocol rebalanceProtocol;
    private AssignorConfiguration.AssignmentListener assignmentListener;
    private Supplier<TaskAssignor> taskAssignorSupplier;
    private byte uniqueField;
    private Map<String, String> clientTags;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor$AssignedPartition.class */
    public static class AssignedPartition implements Comparable<AssignedPartition> {
        private final TaskId taskId;
        private final TopicPartition partition;

        AssignedPartition(TaskId taskId, TopicPartition topicPartition) {
            this.taskId = taskId;
            this.partition = topicPartition;
        }

        @Override // java.lang.Comparable
        public int compareTo(AssignedPartition assignedPartition) {
            return StreamsPartitionAssignor.PARTITION_COMPARATOR.compare(this.partition, assignedPartition.partition);
        }

        public boolean equals(Object obj) {
            return (obj instanceof AssignedPartition) && compareTo((AssignedPartition) obj) == 0;
        }

        public int hashCode() {
            return this.partition.hashCode();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor$ClientMetadata.class */
    public static class ClientMetadata {
        private final HostInfo hostInfo;
        private final ClientState state;
        private final SortedSet<String> consumers = new TreeSet();

        ClientMetadata(UUID uuid, String str, Map<String, String> map) {
            this.hostInfo = HostInfo.buildFromEndpoint(str);
            this.state = new ClientState(uuid, map);
        }

        void addConsumer(String str, List<TopicPartition> list) {
            this.consumers.add(str);
            this.state.incrementCapacity();
            this.state.addOwnedPartitions(list, str);
        }

        void addPreviousTasksAndOffsetSums(String str, Map<TaskId, Long> map) {
            this.state.addPreviousTasksAndOffsetSums(str, map);
        }

        public String toString() {
            return "ClientMetadata{hostInfo=" + this.hostInfo + ", consumers=" + this.consumers + ", state=" + this.state + '}';
        }
    }

    public void configure(Map<String, ?> map) {
        AssignorConfiguration assignorConfiguration = new AssignorConfiguration(map);
        this.logPrefix = assignorConfiguration.logPrefix();
        this.log = new LogContext(this.logPrefix).logger(getClass());
        this.usedSubscriptionMetadataVersion = assignorConfiguration.configuredMetadataVersion(this.usedSubscriptionMetadataVersion);
        ReferenceContainer referenceContainer = assignorConfiguration.referenceContainer();
        this.mainConsumerSupplier = () -> {
            return (Consumer) Objects.requireNonNull(referenceContainer.mainConsumer, "Main consumer was not specified");
        };
        this.adminClient = (Admin) Objects.requireNonNull(referenceContainer.adminClient, "Admin client was not specified");
        this.taskManager = (TaskManager) Objects.requireNonNull(referenceContainer.taskManager, "TaskManager was not specified");
        this.streamsMetadataState = (StreamsMetadataState) Objects.requireNonNull(referenceContainer.streamsMetadataState, "StreamsMetadataState was not specified");
        this.assignmentErrorCode = referenceContainer.assignmentErrorCode;
        this.nextScheduledRebalanceMs = referenceContainer.nextScheduledRebalanceMs;
        this.nonFatalExceptionsToHandle = referenceContainer.nonFatalExceptionsToHandle;
        this.time = (Time) Objects.requireNonNull(referenceContainer.time, "Time was not specified");
        this.assignmentConfigs = assignorConfiguration.assignmentConfigs();
        this.partitionGrouper = new PartitionGrouper();
        this.userEndPoint = assignorConfiguration.userEndPoint();
        this.internalTopicManager = assignorConfiguration.internalTopicManager();
        this.copartitionedTopicsEnforcer = assignorConfiguration.copartitionedTopicsEnforcer();
        this.rebalanceProtocol = assignorConfiguration.rebalanceProtocol();
        Objects.requireNonNull(assignorConfiguration);
        this.taskAssignorSupplier = assignorConfiguration::taskAssignor;
        this.assignmentListener = assignorConfiguration.assignmentListener();
        this.uniqueField = (byte) 0;
        this.clientTags = referenceContainer.clientTags;
    }

    public String name() {
        return StreamsMetricsImpl.GROUP_PREFIX_WO_DELIMITER;
    }

    public List<ConsumerPartitionAssignor.RebalanceProtocol> supportedProtocols() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(ConsumerPartitionAssignor.RebalanceProtocol.EAGER);
        if (this.rebalanceProtocol == ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE) {
            arrayList.add(this.rebalanceProtocol);
        }
        return arrayList;
    }

    public ByteBuffer subscriptionUserData(Set<String> set) {
        handleRebalanceStart(set);
        this.uniqueField = (byte) (this.uniqueField + 1);
        Set<String> namedTopologiesView = this.taskManager.topologyMetadata().namedTopologiesView();
        return new SubscriptionInfo(this.usedSubscriptionMetadataVersion, 11, this.taskManager.processId(), this.userEndPoint, this.taskManager.topologyMetadata().hasNamedTopologies() ? Utils.filterMap(this.taskManager.getTaskOffsetSums(), entry -> {
            return namedTopologiesView.contains(((TaskId) entry.getKey()).topologyName());
        }) : this.taskManager.getTaskOffsetSums(), this.uniqueField, this.assignmentErrorCode.get(), this.clientTags).encode();
    }

    private Map<String, ConsumerPartitionAssignor.Assignment> errorAssignment(Map<UUID, ClientMetadata> map, int i) {
        HashMap hashMap = new HashMap();
        Iterator<ClientMetadata> it = map.values().iterator();
        while (it.hasNext()) {
            Iterator it2 = it.next().consumers.iterator();
            while (it2.hasNext()) {
                hashMap.put((String) it2.next(), new ConsumerPartitionAssignor.Assignment(Collections.emptyList(), new AssignmentInfo(11, Collections.emptyList(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), i).encode()));
            }
        }
        return hashMap;
    }

    public ConsumerPartitionAssignor.GroupAssignment assign(Cluster cluster, ConsumerPartitionAssignor.GroupSubscription groupSubscription) {
        UUID processId;
        Map groupSubscription2 = groupSubscription.groupSubscription();
        HashMap hashMap = new HashMap();
        HashSet hashSet = new HashSet();
        HashMap hashMap2 = new HashMap();
        int i = 11;
        int i2 = 11;
        boolean z = false;
        boolean z2 = false;
        int i3 = -1;
        for (Map.Entry entry : groupSubscription2.entrySet()) {
            String str = (String) entry.getKey();
            ConsumerPartitionAssignor.Subscription subscription = (ConsumerPartitionAssignor.Subscription) entry.getValue();
            SubscriptionInfo decode = SubscriptionInfo.decode(subscription.userData());
            int version = decode.version();
            if (decode.errorCode() == AssignorError.SHUTDOWN_REQUESTED.code()) {
                z = true;
            }
            i = updateMinReceivedVersion(version, i);
            i2 = updateMinSupportedVersion(decode.latestSupportedVersion(), i2);
            if (version > 11) {
                i3 = version;
                processId = FUTURE_ID;
                if (!hashMap.containsKey(FUTURE_ID)) {
                    hashMap.put(FUTURE_ID, new ClientMetadata(FUTURE_ID, null, Collections.emptyMap()));
                }
            } else {
                processId = decode.processId();
            }
            hashMap2.computeIfAbsent(processId, uuid -> {
                return new HashMap();
            }).put(str, subscription.rackId());
            ClientMetadata clientMetadata = hashMap.get(processId);
            if (clientMetadata == null) {
                clientMetadata = new ClientMetadata(decode.processId(), decode.userEndPoint(), decode.clientTags());
                hashMap.put(decode.processId(), clientMetadata);
            }
            clientMetadata.addConsumer(str, subscription.ownedPartitions());
            int size = hashSet.size();
            hashSet.addAll(subscription.ownedPartitions());
            if (hashSet.size() < size + subscription.ownedPartitions().size()) {
                z2 = true;
            }
            clientMetadata.addPreviousTasksAndOffsetSums(str, decode.taskOffsetSums());
        }
        if (z2) {
            this.log.warn("The previous assignment contains a partition more than once. \t Mapping: {}", groupSubscription2);
        }
        try {
            boolean checkMetadataVersions = checkMetadataVersions(i, i2, i3);
            this.log.debug("Constructed client metadata {} from the member subscriptions.", hashMap);
            if (z) {
                return new ConsumerPartitionAssignor.GroupAssignment(errorAssignment(hashMap, AssignorError.SHUTDOWN_REQUESTED.code()));
            }
            RepartitionTopics prepareRepartitionTopics = prepareRepartitionTopics(cluster);
            Map<TopicPartition, PartitionInfo> map = prepareRepartitionTopics.topicPartitionsInfo();
            Cluster withPartitions = cluster.withPartitions(map);
            this.log.debug("Created repartition topics {} from the parsed topology.", map.values());
            Map<TopologyMetadata.Subtopology, InternalTopologyBuilder.TopicsInfo> subtopologyTopicsInfoMapExcluding = this.taskManager.topologyMetadata().subtopologyTopicsInfoMapExcluding(prepareRepartitionTopics.topologiesWithMissingInputTopics());
            HashSet hashSet2 = new HashSet();
            HashMap hashMap3 = new HashMap();
            for (Map.Entry<TopologyMetadata.Subtopology, InternalTopologyBuilder.TopicsInfo> entry2 : subtopologyTopicsInfoMapExcluding.entrySet()) {
                hashSet2.addAll(entry2.getValue().sourceTopics);
                hashMap3.put(entry2.getKey(), entry2.getValue().sourceTopics);
            }
            Map<TaskId, Set<TopicPartition>> partitionGroups = this.partitionGrouper.partitionGroups(hashMap3, withPartitions);
            HashSet hashSet3 = new HashSet();
            boolean assignTasksToClients = assignTasksToClients(withPartitions, hashSet2, subtopologyTopicsInfoMapExcluding, hashMap, partitionGroups, hashMap2, hashSet3);
            HashMap hashMap4 = new HashMap();
            HashMap hashMap5 = new HashMap();
            if (i >= 2) {
                populatePartitionsByHostMaps(hashMap4, hashMap5, partitionGroups, hashMap);
            }
            return new ConsumerPartitionAssignor.GroupAssignment(computeNewAssignment(hashSet3, hashMap, partitionGroups, hashMap4, hashMap5, hashSet, i, i2, checkMetadataVersions, assignTasksToClients));
        } catch (MissingSourceTopicException e) {
            this.log.error("Caught an error in the task assignment. Returning an error assignment.", e);
            return new ConsumerPartitionAssignor.GroupAssignment(errorAssignment(hashMap, AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()));
        } catch (TaskAssignmentException e2) {
            this.log.error("Caught an error in the task assignment. Returning an error assignment.", e2);
            return new ConsumerPartitionAssignor.GroupAssignment(errorAssignment(hashMap, AssignorError.ASSIGNMENT_ERROR.code()));
        }
    }

    private boolean checkMetadataVersions(int i, int i2, int i3) {
        boolean z;
        if (i3 == -1) {
            z = false;
        } else {
            if (i < 3) {
                throw new TaskAssignmentException("Received a future (version probing) subscription (version: " + i3 + ") and an incompatible pre Kafka 2.0 subscription (version: " + i + ") at the same time.");
            }
            z = true;
            this.log.info("Received a future (version probing) subscription (version: {}). Sending assignment back (with supported version {}).", Integer.valueOf(i3), Integer.valueOf(i2));
        }
        if (i < 11) {
            this.log.info("Downgrade metadata to version {}. Latest supported version is {}.", Integer.valueOf(i), 11);
        }
        if (i2 < 11) {
            this.log.info("Downgrade latest supported metadata to version {}. Latest supported version is {}.", Integer.valueOf(i2), 11);
        }
        return z;
    }

    private RepartitionTopics prepareRepartitionTopics(Cluster cluster) {
        RepartitionTopics repartitionTopics = new RepartitionTopics(this.taskManager.topologyMetadata(), this.internalTopicManager, this.copartitionedTopicsEnforcer, cluster, this.logPrefix);
        repartitionTopics.setup();
        if (!repartitionTopics.missingSourceTopicExceptions().isEmpty()) {
            if (!this.taskManager.topologyMetadata().hasNamedTopologies()) {
                throw new MissingSourceTopicException("Missing source topics.");
            }
            this.nonFatalExceptionsToHandle.addAll(repartitionTopics.missingSourceTopicExceptions());
        }
        return repartitionTopics;
    }

    private void populateTasksForMaps(Map<TopicPartition, TaskId> map, Map<TopologyMetadata.Subtopology, Set<TaskId>> map2, Set<String> set, Map<TaskId, Set<TopicPartition>> map3, Cluster cluster) {
        HashSet hashSet = new HashSet();
        for (Map.Entry<TaskId, Set<TopicPartition>> entry : map3.entrySet()) {
            TaskId key = entry.getKey();
            Set<TopicPartition> value = entry.getValue();
            for (TopicPartition topicPartition : value) {
                map.put(topicPartition, key);
                if (hashSet.contains(topicPartition)) {
                    this.log.warn("Partition {} is assigned to more than one tasks: {}", topicPartition, map3);
                }
            }
            hashSet.addAll(value);
            map2.computeIfAbsent(new TopologyMetadata.Subtopology(key.subtopology(), key.topologyName()), subtopology -> {
                return new HashSet();
            }).add(key);
        }
        checkAllPartitions(set, map3, hashSet, cluster);
    }

    private void checkAllPartitions(Set<String> set, Map<TaskId, Set<TopicPartition>> map, Set<TopicPartition> set2, Cluster cluster) {
        for (String str : set) {
            List<PartitionInfo> partitionsForTopic = cluster.partitionsForTopic(str);
            if (partitionsForTopic.isEmpty()) {
                this.log.warn("No partitions found for topic {}", str);
            } else {
                for (PartitionInfo partitionInfo : partitionsForTopic) {
                    TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                    if (!set2.contains(topicPartition)) {
                        this.log.warn("Partition {} is not assigned to any tasks: {} Possible causes of a partition not getting assigned is that another topic defined in the topology has not been created when starting your streams application, resulting in no tasks created for this topology at all.", topicPartition, map);
                    }
                }
            }
        }
    }

    private boolean assignTasksToClients(Cluster cluster, Set<String> set, Map<TopologyMetadata.Subtopology, InternalTopologyBuilder.TopicsInfo> map, Map<UUID, ClientMetadata> map2, Map<TaskId, Set<TopicPartition>> map3, Map<UUID, Map<String, Optional<String>>> map4, Set<TaskId> set2) {
        if (!set2.isEmpty()) {
            throw new TaskAssignmentException("The stateful tasks should not be populated before assigning tasks to clients");
        }
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        populateTasksForMaps(hashMap, hashMap2, set, map3, cluster);
        ChangelogTopics changelogTopics = new ChangelogTopics(this.internalTopicManager, map, hashMap2, this.logPrefix);
        changelogTopics.setup();
        HashMap hashMap3 = new HashMap();
        boolean populateClientStatesMap = populateClientStatesMap(hashMap3, map2, hashMap, changelogTopics);
        this.log.info("{} client nodes and {} consumers participating in this rebalance: \n{}.", new Object[]{Integer.valueOf(hashMap3.size()), hashMap3.values().stream().map((v0) -> {
            return v0.capacity();
        }).reduce((v0, v1) -> {
            return Integer.sum(v0, v1);
        }), hashMap3.entrySet().stream().sorted(Map.Entry.comparingByKey()).map(entry -> {
            return entry.getKey() + ": " + ((ClientState) entry.getValue()).consumers();
        }).collect(Collectors.joining(Utils.NL))});
        Set<TaskId> keySet = map3.keySet();
        set2.addAll(changelogTopics.statefulTaskIds());
        this.log.info("Assigning stateful tasks: {}\nand stateless tasks: {}", set2, keySet.stream().filter(taskId -> {
            return !set2.contains(taskId);
        }).collect(Collectors.toSet()));
        this.log.debug("Assigning tasks and {} standby replicas to client nodes {}", Integer.valueOf(numStandbyReplicas()), hashMap3);
        boolean assign = createTaskAssignor(populateClientStatesMap).assign(hashMap3, keySet, set2, new RackAwareTaskAssignor(cluster, map3, changelogTopics.changelogPartionsForTask(), hashMap2, map4, this.internalTopicManager, this.assignmentConfigs, this.time), this.assignmentConfigs);
        this.log.info("Assigned {} total tasks including {} stateful tasks to {} client nodes.", new Object[]{Integer.valueOf(keySet.size()), Integer.valueOf(set2.size()), Integer.valueOf(hashMap3.size())});
        this.log.info("Assignment of tasks to nodes: {}", hashMap3.entrySet().stream().sorted(Map.Entry.comparingByKey()).map(entry2 -> {
            return entry2.getKey() + "=" + ((ClientState) entry2.getValue()).currentAssignment();
        }).collect(Collectors.joining(Utils.NL)));
        return assign;
    }

    private TaskAssignor createTaskAssignor(boolean z) {
        TaskAssignor taskAssignor = this.taskAssignorSupplier.get();
        if (!(taskAssignor instanceof StickyTaskAssignor) && !z) {
            this.log.info("Failed to fetch end offsets for changelogs, will return previous assignment to clients and trigger another rebalance to retry.");
            return new FallbackPriorTaskAssignor();
        }
        return taskAssignor;
    }

    private boolean populateClientStatesMap(Map<UUID, ClientState> map, Map<UUID, ClientMetadata> map2, Map<TopicPartition, TaskId> map3, ChangelogTopics changelogTopics) {
        Map<TaskId, Long> map4;
        boolean z;
        try {
            map4 = computeEndOffsetSumsByTask(ClientUtils.getEndOffsets(ClientUtils.fetchEndOffsetsResult(changelogTopics.preExistingNonSourceTopicBasedPartitions(), this.adminClient), changelogTopics.preExistingNonSourceTopicBasedPartitions()), ClientUtils.fetchCommittedOffsets(changelogTopics.preExistingSourceTopicBasedPartitions(), this.mainConsumerSupplier.get()), changelogTopics);
            z = true;
        } catch (StreamsException | TimeoutException e) {
            this.log.info("Failed to retrieve all end offsets for changelogs, and hence could not calculate the per-task lag; this is not a fatal error but would cause the assignor to fallback to a naive algorithm", e);
            map4 = (Map) changelogTopics.statefulTaskIds().stream().collect(Collectors.toMap(taskId -> {
                return taskId;
            }, taskId2 -> {
                return -3L;
            }));
            z = false;
        }
        for (Map.Entry<UUID, ClientMetadata> entry : map2.entrySet()) {
            UUID key = entry.getKey();
            ClientState clientState = entry.getValue().state;
            clientState.initializePrevTasks(map3, this.taskManager.topologyMetadata().hasNamedTopologies());
            clientState.computeTaskLags(key, map4);
            map.put(key, clientState);
        }
        return z;
    }

    private Map<TaskId, Long> computeEndOffsetSumsByTask(Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> map, Map<TopicPartition, Long> map2, ChangelogTopics changelogTopics) {
        long offset;
        HashMap hashMap = new HashMap();
        for (TaskId taskId : changelogTopics.statefulTaskIds()) {
            hashMap.put(taskId, 0L);
            Iterator<TopicPartition> it = changelogTopics.preExistingPartitionsFor(taskId).iterator();
            while (true) {
                if (it.hasNext()) {
                    TopicPartition next = it.next();
                    if (map2.containsKey(next)) {
                        offset = map2.get(next).longValue();
                    } else {
                        if (!map.containsKey(next)) {
                            this.log.debug("Fetched offsets did not contain the changelog {} of task {}", next, taskId);
                            throw new IllegalStateException("Could not get end offset for " + next);
                        }
                        offset = map.get(next).offset();
                    }
                    long longValue = ((Long) hashMap.get(taskId)).longValue() + offset;
                    if (longValue < 0) {
                        hashMap.put(taskId, Long.MAX_VALUE);
                        break;
                    }
                    hashMap.put(taskId, Long.valueOf(longValue));
                }
            }
        }
        return hashMap;
    }

    private void populatePartitionsByHostMaps(Map<HostInfo, Set<TopicPartition>> map, Map<HostInfo, Set<TopicPartition>> map2, Map<TaskId, Set<TopicPartition>> map3, Map<UUID, ClientMetadata> map4) {
        for (Map.Entry<UUID, ClientMetadata> entry : map4.entrySet()) {
            HostInfo hostInfo = entry.getValue().hostInfo;
            if (hostInfo != null) {
                HashSet hashSet = new HashSet();
                HashSet hashSet2 = new HashSet();
                ClientState clientState = entry.getValue().state;
                Iterator<TaskId> it = clientState.activeTasks().iterator();
                while (it.hasNext()) {
                    hashSet.addAll(map3.get(it.next()));
                }
                Iterator<TaskId> it2 = clientState.standbyTasks().iterator();
                while (it2.hasNext()) {
                    hashSet2.addAll(map3.get(it2.next()));
                }
                map.put(hostInfo, hashSet);
                map2.put(hostInfo, hashSet2);
            }
        }
    }

    private Map<String, ConsumerPartitionAssignor.Assignment> computeNewAssignment(Set<TaskId> set, Map<UUID, ClientMetadata> map, Map<TaskId, Set<TopicPartition>> map2, Map<HostInfo, Set<TopicPartition>> map3, Map<HostInfo, Set<TopicPartition>> map4, Set<TopicPartition> set2, int i, int i2, boolean z, boolean z2) {
        boolean z3 = z2 || z;
        HashMap hashMap = new HashMap();
        for (Map.Entry<UUID, ClientMetadata> entry : map.entrySet()) {
            UUID key = entry.getKey();
            ClientMetadata value = entry.getValue();
            ClientState clientState = value.state;
            SortedSet sortedSet = value.consumers;
            HashMap hashMap2 = new HashMap();
            Map<String, List<TaskId>> assignTasksToThreads = assignTasksToThreads(clientState.statefulActiveTasks(), true, sortedSet, clientState, hashMap2);
            Map<String, List<TaskId>> assignTasksToThreads2 = assignTasksToThreads(clientState.standbyTasks(), true, sortedSet, clientState, hashMap2);
            for (Map.Entry<String, List<TaskId>> entry2 : assignTasksToThreads(clientState.statelessActiveTasks(), false, sortedSet, clientState, hashMap2).entrySet()) {
                assignTasksToThreads.get(entry2.getKey()).addAll(entry2.getValue());
            }
            boolean z4 = z2 && key.equals(this.taskManager.processId());
            if (addClientAssignments(set, hashMap, value, map2, map3, map4, set2, assignTasksToThreads, assignTasksToThreads2, i, i2, z4) || z4) {
                z3 = true;
                this.log.debug("Requested client {} to schedule a followup rebalance", key);
            }
            this.log.info("Client {} per-consumer assignment:\n\tprev owned active {}\n\tprev owned standby {}\n\tassigned active {}\n\trevoking active {}\n\tassigned standby {}\n", new Object[]{key, value.state.prevOwnedActiveTasksByConsumer(), value.state.prevOwnedStandbyByConsumer(), value.state.assignedActiveTasksByConsumer(), value.state.revokingActiveTasksByConsumer(), value.state.assignedStandbyTasksByConsumer()});
        }
        if (z3) {
            this.assignmentListener.onAssignmentComplete(false);
            this.log.info("Finished unstable assignment of tasks, a followup rebalance will be scheduled.");
        } else {
            this.assignmentListener.onAssignmentComplete(true);
            this.log.info("Finished stable assignment of tasks, no followup rebalances required.");
        }
        return hashMap;
    }

    private boolean addClientAssignments(Set<TaskId> set, Map<String, ConsumerPartitionAssignor.Assignment> map, ClientMetadata clientMetadata, Map<TaskId, Set<TopicPartition>> map2, Map<HostInfo, Set<TopicPartition>> map3, Map<HostInfo, Set<TopicPartition>> map4, Set<TopicPartition> set2, Map<String, List<TaskId>> map5, Map<String, List<TaskId>> map6, int i, int i2, boolean z) {
        boolean z2 = false;
        boolean z3 = z;
        for (String str : clientMetadata.consumers) {
            List<TaskId> list = map5.get(str);
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            Set<TaskId> populateActiveTaskAndPartitionsLists = populateActiveTaskAndPartitionsLists(arrayList, arrayList2, str, clientMetadata.state, list, map2, set2);
            AssignmentInfo assignmentInfo = new AssignmentInfo(i, i2, arrayList2, buildStandbyTaskMap(str, map6.get(str), populateActiveTaskAndPartitionsLists, set, map2, clientMetadata.state), map3, map4, AssignorError.NONE.code());
            if (!populateActiveTaskAndPartitionsLists.isEmpty()) {
                this.log.info("Requesting followup rebalance be scheduled immediately by {} due to tasks changing ownership.", str);
                assignmentInfo.setNextRebalanceTime(0L);
                z2 = true;
                z3 = false;
            } else if (z3) {
                long milliseconds = this.time.milliseconds() + probingRebalanceIntervalMs();
                this.log.info("Requesting followup rebalance be scheduled by {} for {} to probe for caught-up replica tasks.", str, Utils.toLogDateTimeFormat(milliseconds));
                assignmentInfo.setNextRebalanceTime(milliseconds);
                z3 = false;
            }
            map.put(str, new ConsumerPartitionAssignor.Assignment(arrayList, assignmentInfo.encode()));
        }
        return z2;
    }

    private Set<TaskId> populateActiveTaskAndPartitionsLists(List<TopicPartition> list, List<TaskId> list2, String str, ClientState clientState, List<TaskId> list3, Map<TaskId, Set<TopicPartition>> map, Set<TopicPartition> set) {
        ArrayList<AssignedPartition> arrayList = new ArrayList();
        TreeSet treeSet = new TreeSet();
        for (TaskId taskId : list3) {
            clientState.assignActiveToConsumer(taskId, str);
            ArrayList arrayList2 = new ArrayList();
            Iterator<TopicPartition> it = map.get(taskId).iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                TopicPartition next = it.next();
                String previousOwnerForPartition = clientState.previousOwnerForPartition(next);
                if ((previousOwnerForPartition == null || !previousOwnerForPartition.equals(str)) && set.contains(next)) {
                    this.log.info("Removing task {} from {} active assignment until it is safely revoked in followup rebalance", taskId, str);
                    treeSet.add(taskId);
                    clientState.revokeActiveFromConsumer(taskId, str);
                    arrayList2.clear();
                    clientState.unassignActive(taskId);
                    break;
                }
                arrayList2.add(new AssignedPartition(taskId, next));
            }
            arrayList.addAll(arrayList2);
        }
        Collections.sort(arrayList);
        for (AssignedPartition assignedPartition : arrayList) {
            list2.add(assignedPartition.taskId);
            list.add(assignedPartition.partition);
        }
        return treeSet;
    }

    private Map<TaskId, Set<TopicPartition>> buildStandbyTaskMap(String str, Iterable<TaskId> iterable, Iterable<TaskId> iterable2, Set<TaskId> set, Map<TaskId, Set<TopicPartition>> map, ClientState clientState) {
        HashMap hashMap = new HashMap();
        for (TaskId taskId : iterable) {
            clientState.assignStandbyToConsumer(taskId, str);
            hashMap.put(taskId, map.get(taskId));
        }
        for (TaskId taskId2 : iterable2) {
            if (clientState.previouslyOwnedStandby(taskId2) && set.contains(taskId2)) {
                this.log.info("Adding removed stateful active task {} as a standby for {} until it is revoked and can be transitioned to active in a followup rebalance", taskId2, str);
                clientState.assignStandbyToConsumer(taskId2, str);
                clientState.assignStandby(taskId2);
                hashMap.put(taskId2, map.get(taskId2));
            }
        }
        return hashMap;
    }

    static Map<String, List<TaskId>> assignTasksToThreads(Collection<TaskId> collection, boolean z, SortedSet<String> sortedSet, ClientState clientState, Map<String, Integer> map) {
        HashMap hashMap = new HashMap();
        Iterator<String> it = sortedSet.iterator();
        while (it.hasNext()) {
            hashMap.put(it.next(), new ArrayList());
        }
        int floor = (int) Math.floor(map.values().stream().reduce(Integer.valueOf(collection.size()), (v0, v1) -> {
            return Integer.sum(v0, v1);
        }).intValue() / sortedSet.size());
        PriorityQueue priorityQueue = new PriorityQueue(collection);
        LinkedList linkedList = new LinkedList();
        TreeMap treeMap = new TreeMap();
        if (!priorityQueue.isEmpty()) {
            for (String str : sortedSet) {
                List list = (List) hashMap.get(str);
                int intValue = floor - map.getOrDefault(str, 0).intValue();
                if (z) {
                    for (TaskId taskId : clientState.prevTasksByLag(str)) {
                        if (priorityQueue.contains(taskId)) {
                            if (list.size() < intValue) {
                                list.add(taskId);
                                priorityQueue.remove(taskId);
                            } else {
                                treeMap.put(taskId, str);
                            }
                        }
                    }
                }
                if (list.size() < intValue) {
                    linkedList.offer(str);
                }
            }
            while (!linkedList.isEmpty()) {
                TaskId taskId2 = (TaskId) priorityQueue.poll();
                if (taskId2 == null) {
                    throw new TaskAssignmentException("Ran out of unassigned stateful tasks but some members were not at capacity");
                }
                String str2 = (String) linkedList.poll();
                List list2 = (List) hashMap.get(str2);
                list2.add(taskId2);
                if (list2.size() + map.getOrDefault(str2, 0).intValue() < floor) {
                    linkedList.offer(str2);
                }
            }
            if (!priorityQueue.isEmpty()) {
                for (String str3 : sortedSet) {
                    if (((List) hashMap.get(str3)).size() + map.getOrDefault(str3, 0).intValue() == floor) {
                        linkedList.add(str3);
                    }
                }
                for (Map.Entry entry : treeMap.entrySet()) {
                    TaskId taskId3 = (TaskId) entry.getKey();
                    String str4 = (String) entry.getValue();
                    if (linkedList.contains(str4) && priorityQueue.contains(taskId3)) {
                        ((List) hashMap.get(str4)).add(taskId3);
                        priorityQueue.remove(taskId3);
                        linkedList.remove(str4);
                    }
                }
                Iterator it2 = priorityQueue.iterator();
                while (it2.hasNext()) {
                    ((List) hashMap.get((String) linkedList.poll())).add((TaskId) it2.next());
                }
            }
        }
        for (Map.Entry entry2 : hashMap.entrySet()) {
            String str5 = (String) entry2.getKey();
            map.put(str5, Integer.valueOf(map.getOrDefault(str5, 0).intValue() + ((List) entry2.getValue()).size()));
        }
        return hashMap;
    }

    private void validateMetadataVersions(int i, int i2) {
        if (i > this.usedSubscriptionMetadataVersion) {
            this.log.error("Leader sent back an assignment with version {} which was greater than our used version {}", Integer.valueOf(i), Integer.valueOf(this.usedSubscriptionMetadataVersion));
            throw new TaskAssignmentException("Sent a version " + this.usedSubscriptionMetadataVersion + " subscription but got an assignment with higher version " + i + ".");
        }
        if (i2 > 11) {
            this.log.error("Leader sent back assignment with commonly supported version {} that is greater than our actual latest supported version {}", Integer.valueOf(i2), 11);
            throw new TaskAssignmentException("Can't upgrade to metadata version greater than we support");
        }
    }

    protected boolean maybeUpdateSubscriptionVersion(int i, int i2) {
        if (i < 3) {
            this.log.debug("Received an assignment version {} that is less than the earliest version that allows version probing {}. If this is not during a rolling upgrade from version 2.0 or below, this is an error.", Integer.valueOf(i), 3);
            return false;
        }
        if (i2 > this.usedSubscriptionMetadataVersion) {
            this.log.info("Sent a version {} subscription and group's latest commonly supported version is {} (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to {} for next rebalance.", new Object[]{Integer.valueOf(this.usedSubscriptionMetadataVersion), Integer.valueOf(i2), Integer.valueOf(i2)});
            this.usedSubscriptionMetadataVersion = i2;
            return true;
        }
        if (i >= this.usedSubscriptionMetadataVersion) {
            return false;
        }
        this.log.info("Sent a version {} subscription and got version {} assignment back (successful version probing). Downgrade subscription metadata to commonly supported version {} and trigger new rebalance.", new Object[]{Integer.valueOf(this.usedSubscriptionMetadataVersion), Integer.valueOf(i), Integer.valueOf(i2)});
        this.usedSubscriptionMetadataVersion = i2;
        return true;
    }

    public void onAssignment(ConsumerPartitionAssignor.Assignment assignment, ConsumerGroupMetadata consumerGroupMetadata) {
        Map<TaskId, Set<TopicPartition>> activeTasks;
        Map<HostInfo, Set<TopicPartition>> partitionsByHost;
        Map<HostInfo, Set<TopicPartition>> standbyPartitionByHost;
        Map<TopicPartition, PartitionInfo> topicPartitionInfo;
        long nextRebalanceMs;
        ArrayList arrayList = new ArrayList(assignment.partitions());
        arrayList.sort(PARTITION_COMPARATOR);
        AssignmentInfo decode = AssignmentInfo.decode(assignment.userData());
        if (decode.errCode() != AssignorError.NONE.code()) {
            this.assignmentErrorCode.set(decode.errCode());
            return;
        }
        int version = decode.version();
        int commonlySupportedVersion = decode.commonlySupportedVersion();
        validateMetadataVersions(version, commonlySupportedVersion);
        switch (version) {
            case 1:
                validateActiveTaskEncoding(arrayList, decode, this.logPrefix);
                activeTasks = getActiveTasks(arrayList, decode);
                partitionsByHost = Collections.emptyMap();
                standbyPartitionByHost = Collections.emptyMap();
                topicPartitionInfo = Collections.emptyMap();
                nextRebalanceMs = Long.MAX_VALUE;
                break;
            case 2:
            case StreamsAssignmentProtocolVersions.EARLIEST_PROBEABLE_VERSION /* 3 */:
            case 4:
            case StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE /* 5 */:
                validateActiveTaskEncoding(arrayList, decode, this.logPrefix);
                activeTasks = getActiveTasks(arrayList, decode);
                partitionsByHost = decode.partitionsByHost();
                standbyPartitionByHost = Collections.emptyMap();
                topicPartitionInfo = getTopicPartitionInfo(partitionsByHost);
                nextRebalanceMs = Long.MAX_VALUE;
                break;
            case SubscriptionInfoData.TaskId.HIGHEST_SUPPORTED_VERSION /* 6 */:
                validateActiveTaskEncoding(arrayList, decode, this.logPrefix);
                activeTasks = getActiveTasks(arrayList, decode);
                partitionsByHost = decode.partitionsByHost();
                standbyPartitionByHost = decode.standbyPartitionByHost();
                topicPartitionInfo = getTopicPartitionInfo(partitionsByHost);
                nextRebalanceMs = Long.MAX_VALUE;
                break;
            case 7:
            case StateSerdes.TIMESTAMP_SIZE /* 8 */:
            case SubscriptionInfoData.PartitionToOffsetSum.HIGHEST_SUPPORTED_VERSION /* 9 */:
            case StreamsAssignmentProtocolVersions.MIN_NAMED_TOPOLOGY_VERSION /* 10 */:
            case 11:
                validateActiveTaskEncoding(arrayList, decode, this.logPrefix);
                activeTasks = getActiveTasks(arrayList, decode);
                partitionsByHost = decode.partitionsByHost();
                standbyPartitionByHost = decode.standbyPartitionByHost();
                topicPartitionInfo = getTopicPartitionInfo(partitionsByHost);
                nextRebalanceMs = decode.nextRebalanceMs();
                break;
            default:
                throw new IllegalStateException("This code should never be reached. Please file a bug report at https://issues.apache.org/jira/projects/KAFKA/");
        }
        maybeScheduleFollowupRebalance(nextRebalanceMs, version, commonlySupportedVersion, partitionsByHost.keySet());
        this.streamsMetadataState.onChange(partitionsByHost, standbyPartitionByHost, topicPartitionInfo);
        this.taskManager.handleAssignment(activeTasks, decode.standbyTasks());
    }

    private void maybeScheduleFollowupRebalance(long j, int i, int i2, Set<HostInfo> set) {
        if (maybeUpdateSubscriptionVersion(i, i2)) {
            this.log.info("Requested to schedule immediate rebalance due to version probing.");
            this.nextScheduledRebalanceMs.set(0L);
            return;
        }
        if (!verifyHostInfo(set)) {
            this.log.info("Requested to schedule immediate rebalance to update group with new host endpoint = {}.", this.userEndPoint);
            this.nextScheduledRebalanceMs.set(0L);
        } else if (j == 0) {
            this.log.info("Requested to schedule immediate rebalance for new tasks to be safely revoked from current owner.");
            this.nextScheduledRebalanceMs.set(0L);
        } else if (j < Long.MAX_VALUE) {
            this.log.info("Requested to schedule next probing rebalance at {} to try for a more balanced assignment.", Utils.toLogDateTimeFormat(j));
            this.nextScheduledRebalanceMs.set(j);
        } else {
            this.log.info("No followup rebalance was requested, resetting the rebalance schedule.");
            this.nextScheduledRebalanceMs.set(Long.MAX_VALUE);
        }
    }

    private boolean verifyHostInfo(Set<HostInfo> set) {
        if (this.userEndPoint == null || set.isEmpty()) {
            return true;
        }
        return set.contains(HostInfo.buildFromEndpoint(this.userEndPoint));
    }

    protected static Map<TaskId, Set<TopicPartition>> getActiveTasks(List<TopicPartition> list, AssignmentInfo assignmentInfo) {
        HashMap hashMap = new HashMap();
        for (int i = 0; i < list.size(); i++) {
            ((Set) hashMap.computeIfAbsent(assignmentInfo.activeTasks().get(i), taskId -> {
                return new HashSet();
            })).add(list.get(i));
        }
        return hashMap;
    }

    static Map<TopicPartition, PartitionInfo> getTopicPartitionInfo(Map<HostInfo, Set<TopicPartition>> map) {
        HashMap hashMap = new HashMap();
        Iterator<Set<TopicPartition>> it = map.values().iterator();
        while (it.hasNext()) {
            for (TopicPartition topicPartition : it.next()) {
                hashMap.put(topicPartition, new PartitionInfo(topicPartition.topic(), topicPartition.partition(), (Node) null, new Node[0], new Node[0]));
            }
        }
        return hashMap;
    }

    private static void validateActiveTaskEncoding(List<TopicPartition> list, AssignmentInfo assignmentInfo, String str) {
        if (list.size() != assignmentInfo.activeTasks().size()) {
            throw new TaskAssignmentException(String.format("%sNumber of assigned partitions %d is not equal to the number of active taskIds %d, assignmentInfo=%s", str, Integer.valueOf(list.size()), Integer.valueOf(assignmentInfo.activeTasks().size()), assignmentInfo));
        }
    }

    private int updateMinReceivedVersion(int i, int i2) {
        return Math.min(i, i2);
    }

    private int updateMinSupportedVersion(int i, int i2) {
        if (i < i2) {
            this.log.debug("Downgrade the current minimum supported version {} to the smaller seen supported version {}", Integer.valueOf(i2), Integer.valueOf(i));
            return i;
        }
        this.log.debug("Current minimum supported version remains at {}, last seen supported version was {}", Integer.valueOf(i2), Integer.valueOf(i));
        return i2;
    }

    void setInternalTopicManager(InternalTopicManager internalTopicManager) {
        this.internalTopicManager = internalTopicManager;
    }

    ConsumerPartitionAssignor.RebalanceProtocol rebalanceProtocol() {
        return this.rebalanceProtocol;
    }

    protected String userEndPoint() {
        return this.userEndPoint;
    }

    protected TaskManager taskManager() {
        return this.taskManager;
    }

    protected byte uniqueField() {
        return this.uniqueField;
    }

    protected Map<String, String> clientTags() {
        return this.clientTags;
    }

    protected void handleRebalanceStart(Set<String> set) {
        this.taskManager.handleRebalanceStart(set);
    }

    long acceptableRecoveryLag() {
        return this.assignmentConfigs.acceptableRecoveryLag;
    }

    int maxWarmupReplicas() {
        return this.assignmentConfigs.maxWarmupReplicas;
    }

    int numStandbyReplicas() {
        return this.assignmentConfigs.numStandbyReplicas;
    }

    long probingRebalanceIntervalMs() {
        return this.assignmentConfigs.probingRebalanceIntervalMs;
    }
}
