package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskIdFormatException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StateUpdater;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/TaskManager.class */
public class TaskManager {
    /** Logger obtained in the constructor from a {@link LogContext} built with the log prefix. */
    private final Logger log;
    private final Time time;
    /** Registry of the tasks this manager iterates over, adds to and removes from. */
    private final TasksRegistry tasks;
    private final UUID processId;
    /** Prefix handed to the {@link LogContext} and to the topology-metadata subscription calls. */
    private final String logPrefix;
    private final Admin adminClient;
    private final StateDirectory stateDirectory;
    /** Processing mode read from the topology metadata at construction time. */
    private final StreamsConfigUtils.ProcessingMode processingMode;
    private final ChangelogReader changelogReader;
    private final TopologyMetadata topologyMetadata;
    private final TaskExecutor taskExecutor;
    /** Main consumer; not assigned by the constructor but injected later through {@code setMainConsumer}. */
    private Consumer<byte[], byte[]> mainConsumer;
    private DeleteRecordsResult deleteRecordsResult;
    /** Set to {@code true} by {@code handleRebalanceStart} and reset to {@code false} by {@code handleRebalanceComplete}. */
    private boolean rebalanceInProgress = false;
    private final Set<TaskId> lockedTaskDirectories = new HashSet<>();
    private final ActiveTaskCreator activeTaskCreator;
    private final StandbyTaskCreator standbyTaskCreator;
    /** May be {@code null}; several methods branch on that to choose between two task-handling paths. */
    private final StateUpdater stateUpdater;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a task manager from its collaborators.
     *
     * <p>Besides storing the arguments, the constructor reads the processing mode from the topology
     * metadata, builds the logger from a {@link LogContext} that uses {@code str} as prefix, and
     * creates a {@link TaskExecutor} bound to the given registry and to this manager.
     *
     * @param time                clock stored in {@code time}
     * @param changelogReader     changelog reader stored in {@code changelogReader}
     * @param uuid                identifier returned by {@link #processId()}
     * @param str                 log prefix
     * @param activeTaskCreator   creator used for active tasks
     * @param standbyTaskCreator  creator used for standby tasks
     * @param tasksRegistry       registry stored in {@code tasks}
     * @param topologyMetadata    topology metadata
     * @param admin               admin client stored in {@code adminClient}
     * @param stateDirectory      state directory
     * @param stateUpdater        state updater; other methods of this class check it against {@code null}
     */
    public TaskManager(Time time, ChangelogReader changelogReader, UUID uuid, String str, ActiveTaskCreator activeTaskCreator, StandbyTaskCreator standbyTaskCreator, TasksRegistry tasksRegistry, TopologyMetadata topologyMetadata, Admin admin, StateDirectory stateDirectory, StateUpdater stateUpdater) {
        // Plain collaborators.
        this.time = time;
        this.processId = uuid;
        this.logPrefix = str;
        this.adminClient = admin;
        this.stateDirectory = stateDirectory;
        this.changelogReader = changelogReader;
        this.activeTaskCreator = activeTaskCreator;
        this.standbyTaskCreator = standbyTaskCreator;
        this.stateUpdater = stateUpdater;
        this.tasks = tasksRegistry;

        // Values derived from the topology metadata.
        this.topologyMetadata = topologyMetadata;
        this.processingMode = topologyMetadata.processingMode();

        // Logging and the executor share one LogContext.
        final LogContext context = new LogContext(str);
        this.log = context.logger(getClass());
        this.taskExecutor = new TaskExecutor(tasksRegistry, this, topologyMetadata.taskExecutionMetadata(), context);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Injects the main consumer used by this manager.
     *
     * @param consumer the consumer to store in {@code mainConsumer}
     */
    public void setMainConsumer(Consumer<byte[], byte[]> consumer) {
        mainConsumer = consumer;
    }

    /**
     * @return the total producer blocked time as reported by the active task creator
     */
    public double totalProducerBlockedTime() {
        return activeTaskCreator.totalProducerBlockedTime();
    }

    /**
     * @return the process id this manager was constructed with
     */
    public UUID processId() {
        return processId;
    }

    /**
     * @return the topology metadata this manager was constructed with
     */
    public TopologyMetadata topologyMetadata() {
        return topologyMetadata;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the group metadata of the main consumer
     */
    public ConsumerGroupMetadata consumerGroupMetadata() {
        return mainConsumer.groupMetadata();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Synchronously commits the given offsets through the main consumer.
     *
     * @param map offsets to commit, keyed by partition
     */
    public void consumerCommitSync(Map<TopicPartition, OffsetAndMetadata> map) {
        mainConsumer.commitSync(map);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Looks up the producer of a task via the active task creator.
     *
     * @param taskId id of the task whose producer is requested
     * @return the producer the active task creator returns for {@code taskId}
     */
    public StreamsProducer streamsProducerForTask(TaskId taskId) {
        return activeTaskCreator.streamsProducerForTask(taskId);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the thread producer as returned by the active task creator
     */
    public StreamsProducer threadProducer() {
        return activeTaskCreator.threadProducer();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return {@code true} between a call to {@code handleRebalanceStart} and the following
     *         {@code handleRebalanceComplete}, {@code false} otherwise
     */
    public boolean rebalanceInProgress() {
        return rebalanceInProgress;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Marks the beginning of a rebalance.
     *
     * <p>Forwards the subscribed topics to the topology metadata, tries to lock all non-empty task
     * directories, and finally raises the rebalance-in-progress flag.
     *
     * @param set the subscribed topic names
     */
    public void handleRebalanceStart(Set<String> set) {
        topologyMetadata.addSubscribedTopicsFromMetadata(set, logPrefix);
        tryToLockAllNonEmptyTaskDirectories();
        rebalanceInProgress = true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Marks the end of a rebalance.
     *
     * <p>Without a state updater the whole consumer assignment is paused. With a state updater only
     * those assigned partitions are paused that are not input partitions of a task currently held
     * in the task registry. Afterwards the locks on unassigned task directories are released and
     * the rebalance-in-progress flag is cleared.
     */
    public void handleRebalanceComplete() {
        if (stateUpdater != null) {
            // Partitions of tasks held in the registry must stay unpaused.
            final Set<TopicPartition> ownedPartitions = new HashSet<>();
            for (final Task ownedTask : tasks.allTasks()) {
                ownedPartitions.addAll(ownedTask.inputPartitions());
            }
            final Set<TopicPartition> partitionsToPause = new HashSet<>(mainConsumer.assignment());
            partitionsToPause.removeAll(ownedPartitions);
            mainConsumer.pause(partitionsToPause);
        } else {
            mainConsumer.pause(mainConsumer.assignment());
        }

        releaseLockedUnassignedTaskDirectories();
        rebalanceInProgress = false;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Closes the given corrupted tasks dirty and revives them.
     *
     * <p>Corrupted standby tasks are closed and revived first. Then all tasks in state
     * {@code RUNNING} or {@code RESTORING} that are not listed in {@code set} are committed.
     * If that commit hits a {@link TimeoutException}, every active task that is not a corrupted
     * active task is closed dirty and revived without marking its changelogs as corrupted. If the
     * commit reports further corrupted tasks, they join the corrupted active tasks. Finally the
     * corrupted active tasks are closed dirty and revived.
     *
     * @param set ids of the tasks reported as corrupted
     * @return {@code true} if at least one active task was treated as corrupted
     */
    public boolean handleCorruption(Set<TaskId> set) {
        final Set<Task> corruptedActive = new HashSet<>();
        final Set<Task> corruptedStandby = new HashSet<>();
        for (final TaskId corruptedId : set) {
            final Task corrupted = tasks.task(corruptedId);
            (corrupted.isActive() ? corruptedActive : corruptedStandby).add(corrupted);
        }

        closeDirtyAndRevive(corruptedStandby, true);

        try {
            final Set<Task> tasksToCommit = allTasks().values().stream()
                .filter(candidate ->
                    (candidate.state() == Task.State.RUNNING || candidate.state() == Task.State.RESTORING)
                        && !set.contains(candidate.id()))
                .collect(Collectors.toSet());
            commitTasksAndMaybeUpdateCommittableOffsets(tasksToCommit, new HashMap<>());
        } catch (final TimeoutException timeout) {
            log.info("Hit TimeoutException when committing all non-corrupted tasks, these will be closed and revived");
            final Collection<Task> uncorruptedActive = new HashSet<>(tasks.activeTasks());
            uncorruptedActive.removeAll(corruptedActive);
            // These tasks merely timed out, so their changelogs are not marked as corrupted.
            closeDirtyAndRevive(uncorruptedActive, false);
        } catch (final TaskCorruptedException additional) {
            log.info("Some additional tasks were found corrupted while trying to commit, these will be added to the tasks to clean and revive: {}", additional.corruptedTasks());
            corruptedActive.addAll(tasks.tasks(additional.corruptedTasks()));
        }

        closeDirtyAndRevive(corruptedActive, true);
        return !corruptedActive.isEmpty();
    }

    /**
     * Closes each given task dirty (unless it is already closed) and revives it.
     *
     * <p>For a task that is not yet {@code CLOSED}: optionally marks its changelogs as corrupted
     * (only when {@code z} is set and no state updater is in use), attempts {@code prepareCommit},
     * attempts {@code suspend} followed by {@code postCommit(true)} when {@code z} is set, and then
     * calls {@code closeDirty}. Runtime exceptions from the prepare and suspend steps are logged
     * and otherwise ignored. For active tasks, the input partitions that are still part of the
     * consumer assignment are registered for offset reset. With a state updater, the task is moved
     * from the registry to the pending-to-init tasks around the {@code revive} call.
     *
     * @param collection tasks to close dirty and revive
     * @param z          whether the tasks are considered corrupted
     */
    private void closeDirtyAndRevive(Collection<Task> collection, boolean z) {
        final boolean usesStateUpdater = stateUpdater != null;
        for (final Task victim : collection) {
            if (victim.state() != Task.State.CLOSED) {
                final Set<TopicPartition> corruptedChangelogs = victim.changelogPartitions();
                if (z && !usesStateUpdater) {
                    victim.markChangelogAsCorrupted(corruptedChangelogs);
                }

                try {
                    victim.prepareCommit();
                } catch (final RuntimeException flushFailure) {
                    log.error("Error flushing cache for corrupted task {} ", victim.id(), flushFailure);
                }

                try {
                    victim.suspend();
                    if (z) {
                        victim.postCommit(true);
                    }
                } catch (final RuntimeException suspendFailure) {
                    log.error("Error suspending corrupted task {} ", victim.id(), suspendFailure);
                }

                victim.closeDirty();
            }

            if (victim.isActive()) {
                final Set<TopicPartition> currentAssignment = mainConsumer.assignment();
                final Set<TopicPartition> taskInputPartitions = victim.inputPartitions();
                final Set<TopicPartition> assignedToReset =
                    Utils.intersection(HashSet::new, currentAssignment, taskInputPartitions);
                if (!assignedToReset.equals(taskInputPartitions)) {
                    log.warn("Expected the current consumer assignment {} to contain the input partitions {}. Will proceed to recover.", currentAssignment, taskInputPartitions);
                }
                victim.addPartitionsForOffsetReset(assignedToReset);
            }

            if (usesStateUpdater) {
                tasks.removeTask(victim);
            }
            victim.revive();
            if (usesStateUpdater) {
                tasks.addPendingTasksToInit(Collections.singleton(victim));
            }
        }
    }

    /**
     * Reconciles the currently owned tasks with a new assignment.
     *
     * <p>Steps, in order: log the assignment; register the assigned active partitions with the
     * topology metadata; replace the pending tasks-to-create with those assigned tasks whose named
     * topology is not known yet (these are removed from the working copies); classify existing
     * tasks via the path with or without state updater; close and recycle tasks, throwing if any
     * of them failed; and finally create the tasks that remain in the working copies.
     *
     * @param map  assigned active tasks and their input partitions; not modified
     * @param map2 assigned standby tasks and their input partitions; not modified
     */
    public void handleAssignment(Map<TaskId, Set<TopicPartition>> map, Map<TaskId, Set<TopicPartition>> map2) {
        log.info("Handle new assignment with:\n\tNew active tasks: {}\n\tNew standby tasks: {}\n\tExisting active tasks: {}\n\tExisting standby tasks: {}", map.keySet(), map2.keySet(), activeTaskIds(), standbyTaskIds());

        topologyMetadata.addSubscribedTopicsFromAssignment(
            map.values().stream().flatMap(Set::stream).collect(Collectors.toSet()),
            logPrefix
        );

        // Work on copies: entries are removed as existing tasks are matched against the assignment.
        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(map);
        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(map2);
        final Map<Task, Set<TopicPartition>> tasksToRecycle = new HashMap<>();
        final Set<Task> tasksToCloseClean = new TreeSet<>(Comparator.comparing(Task::id));

        tasks.clearPendingTasksToCreate();
        tasks.addPendingActiveTasksToCreate(pendingTasksToCreate(activeTasksToCreate));
        tasks.addPendingStandbyTasksToCreate(pendingTasksToCreate(standbyTasksToCreate));

        if (stateUpdater == null) {
            handleTasksWithoutStateUpdater(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
        } else {
            handleTasksWithStateUpdater(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
        }

        maybeThrowTaskExceptions(closeAndRecycleTasks(tasksToRecycle, tasksToCloseClean));
        createNewTasks(activeTasksToCreate, standbyTasksToCreate);
    }

    /**
     * Throws a single exception that summarizes the given per-task exceptions; does nothing when
     * the map is empty.
     *
     * <p>Priority of what is thrown:
     * <ol>
     *   <li>the last fatal error: a {@link StreamsException} that is neither a task-migrated nor a
     *       task-corrupted exception (its task id is set), or any other exception wrapped into a
     *       {@link StreamsException} carrying the task id;</li>
     *   <li>otherwise the last {@link TaskMigratedException};</li>
     *   <li>otherwise a {@link TaskCorruptedException} aggregating all corrupted task ids.</li>
     * </ol>
     *
     * @param map exceptions keyed by the id of the task that raised them
     */
    private void maybeThrowTaskExceptions(Map<TaskId, RuntimeException> map) {
        if (map.isEmpty()) {
            return;
        }
        log.error("Get exceptions for the following tasks: {}", map);

        final Set<TaskId> corruptedTaskIds = new HashSet<>();
        StreamsException lastFatal = null;
        TaskMigratedException lastTaskMigrated = null;

        for (final Map.Entry<TaskId, RuntimeException> entry : map.entrySet()) {
            final TaskId taskId = entry.getKey();
            final RuntimeException exception = entry.getValue();

            if (exception instanceof StreamsException) {
                if (exception instanceof TaskMigratedException) {
                    lastTaskMigrated = (TaskMigratedException) exception;
                } else if (exception instanceof TaskCorruptedException) {
                    log.warn("Encounter corrupted task {}, will group it with other corrupted tasks and handle together", taskId, exception);
                    corruptedTaskIds.add(taskId);
                } else {
                    ((StreamsException) exception).setTaskId(taskId);
                    lastFatal = (StreamsException) exception;
                }
            } else if (exception instanceof KafkaException) {
                lastFatal = new StreamsException(exception, taskId);
            } else {
                lastFatal = new StreamsException("Encounter unexpected fatal error for task " + taskId, exception, taskId);
            }
        }

        if (lastFatal != null) {
            throw lastFatal;
        }
        if (lastTaskMigrated != null) {
            throw lastTaskMigrated;
        }
        throw new TaskCorruptedException(corruptedTaskIds);
    }

    /**
     * Creates the given active and standby tasks and registers them.
     *
     * <p>With a state updater the new tasks are queued as pending-to-init; without one they are
     * added to the registry directly as active and standby tasks.
     *
     * @param map  active tasks to create, with their input partitions
     * @param map2 standby tasks to create, with their input partitions
     */
    private void createNewTasks(Map<TaskId, Set<TopicPartition>> map, Map<TaskId, Set<TopicPartition>> map2) {
        final Collection<Task> newActiveTasks = activeTaskCreator.createTasks(mainConsumer, map);
        final Collection<Task> newStandbyTasks = standbyTaskCreator.createTasks(map2);

        if (stateUpdater != null) {
            tasks.addPendingTasksToInit(newActiveTasks);
            tasks.addPendingTasksToInit(newStandbyTasks);
            return;
        }
        tasks.addActiveTasks(newActiveTasks);
        tasks.addStandbyTasks(newStandbyTasks);
    }

    /**
     * Classifies every task in the registry against the new assignment (path without state updater).
     *
     * <ul>
     *   <li>Task keeps its type: input partitions are updated if needed and the task is resumed.</li>
     *   <li>Task switches between active and standby: it is put into {@code map3} for recycling.</li>
     *   <li>Task is not assigned any more: it is added to {@code set} for a clean close.</li>
     * </ul>
     * Matched task ids are removed from {@code map} / {@code map2}, so only tasks that still have
     * to be created remain in them.
     *
     * @param map  assigned active tasks still to be handled; matched entries are removed
     * @param map2 assigned standby tasks still to be handled; matched entries are removed
     * @param map3 receives tasks to recycle together with their new input partitions
     * @param set  receives tasks to close cleanly
     */
    private void handleTasksWithoutStateUpdater(Map<TaskId, Set<TopicPartition>> map, Map<TaskId, Set<TopicPartition>> map2, Map<Task, Set<TopicPartition>> map3, Set<Task> set) {
        for (final Task existing : tasks.allTasks()) {
            final TaskId existingId = existing.id();

            if (map.containsKey(existingId)) {
                final Set<TopicPartition> activePartitions = map.get(existingId);
                if (!existing.isActive()) {
                    // standby -> active
                    map3.put(existing, activePartitions);
                } else {
                    if (tasks.updateActiveTaskInputPartitions(existing, activePartitions)) {
                        existing.updateInputPartitions(activePartitions, topologyMetadata.nodeToSourceTopics(existing.id()));
                    }
                    existing.resume();
                }
                map.remove(existingId);
                continue;
            }

            if (map2.containsKey(existingId)) {
                final Set<TopicPartition> standbyPartitions = map2.get(existingId);
                if (existing.isActive()) {
                    // active -> standby
                    map3.put(existing, standbyPartitions);
                } else {
                    updateInputPartitionsOfStandbyTaskIfTheyChanged(existing, standbyPartitions);
                    existing.resume();
                }
                map2.remove(existingId);
                continue;
            }

            set.add(existing);
        }
    }

    /**
     * Updates the input partitions of a task unless they already equal the given set.
     *
     * @param task the task to update
     * @param set  the input partitions the task should have
     */
    private void updateInputPartitionsOfStandbyTaskIfTheyChanged(Task task, Set<TopicPartition> set) {
        if (!task.inputPartitions().equals(set)) {
            task.updateInputPartitions(set, topologyMetadata.nodeToSourceTopics(task.id()));
        }
    }

    /**
     * Classifies existing tasks against the new assignment (path with state updater).
     *
     * <p>Runs three phases in order: tasks pending initialization, tasks held in the registry,
     * and tasks held by the state updater.
     *
     * @param map  assigned active tasks still to be handled; matched entries are removed
     * @param map2 assigned standby tasks still to be handled; matched entries are removed
     * @param map3 receives tasks to recycle together with their new input partitions
     * @param set  receives tasks to close cleanly
     */
    private void handleTasksWithStateUpdater(Map<TaskId, Set<TopicPartition>> map, Map<TaskId, Set<TopicPartition>> map2, Map<Task, Set<TopicPartition>> map3, Set<Task> set) {
        handleTasksPendingInitialization();
        handleRunningAndSuspendedTasks(map, map2, map3, set);
        handleTasksInStateUpdater(map, map2);
    }

    /**
     * Drains all tasks that are still pending initialization and suspends and cleanly closes
     * each of them.
     */
    private void handleTasksPendingInitialization() {
        for (final Task pending : tasks.drainPendingTasksToInit()) {
            pending.suspend();
            pending.closeClean();
        }
    }

    /**
     * Classifies the tasks held in the registry against the new assignment.
     *
     * <p>Re-assigned active tasks are handled in place, tasks that became standby are queued for
     * recycling, and tasks that are no longer assigned are queued for a clean close. Matched task
     * ids are removed from {@code map} / {@code map2}.
     *
     * @param map  assigned active tasks still to be handled; matched entries are removed
     * @param map2 assigned standby tasks still to be handled; matched entries are removed
     * @param map3 receives tasks to recycle together with their new input partitions
     * @param set  receives tasks to close cleanly
     * @throws IllegalStateException if the registry contains a standby task
     */
    private void handleRunningAndSuspendedTasks(Map<TaskId, Set<TopicPartition>> map, Map<TaskId, Set<TopicPartition>> map2, Map<Task, Set<TopicPartition>> map3, Set<Task> set) {
        for (final Task owned : tasks.allTasks()) {
            if (!owned.isActive()) {
                throw new IllegalStateException("Standby tasks should only be managed by the state updater, but standby task " + owned.id() + " is managed by the stream thread");
            }

            final TaskId ownedId = owned.id();
            if (map.containsKey(ownedId)) {
                handleReAssignedActiveTask(owned, map.get(ownedId));
                map.remove(ownedId);
                continue;
            }
            if (map2.containsKey(ownedId)) {
                map3.put(owned, map2.get(ownedId));
                map2.remove(ownedId);
                continue;
            }
            set.add(owned);
        }
    }

    /**
     * Handles an active task that stays assigned as active.
     *
     * <p>Updates the input partitions when the registry reports a change. A task in state
     * {@code SUSPENDED} is additionally resumed and handed from the registry to the state updater.
     *
     * @param task the re-assigned active task
     * @param set  the input partitions assigned to the task
     */
    private void handleReAssignedActiveTask(Task task, Set<TopicPartition> set) {
        final boolean partitionsChanged = tasks.updateActiveTaskInputPartitions(task, set);
        if (partitionsChanged) {
            task.updateInputPartitions(set, topologyMetadata.nodeToSourceTopics(task.id()));
        }

        if (task.state() != Task.State.SUSPENDED) {
            return;
        }
        task.resume();
        moveTaskFromTasksRegistryToStateUpdater(task);
    }

    /**
     * Removes the task from the registry and then adds it to the state updater.
     *
     * @param task the task to hand over
     */
    private void moveTaskFromTasksRegistryToStateUpdater(Task task) {
        tasks.removeTask(task);
        stateUpdater.add(task);
    }

    /**
     * Classifies the tasks held by the state updater against the new assignment.
     *
     * <ul>
     *   <li>Assigned as active: an active task gets its input partitions checked, a standby task
     *       is removed from the state updater and marked for recycling.</li>
     *   <li>Assigned as standby: an active task is removed from the state updater and marked for
     *       recycling; a standby task is left untouched.</li>
     *   <li>Not assigned: the task is removed from the state updater and marked for a clean close.</li>
     * </ul>
     * Matched task ids are removed from {@code map} / {@code map2}.
     *
     * @param map  assigned active tasks still to be handled; matched entries are removed
     * @param map2 assigned standby tasks still to be handled; matched entries are removed
     */
    private void handleTasksInStateUpdater(Map<TaskId, Set<TopicPartition>> map, Map<TaskId, Set<TopicPartition>> map2) {
        for (final Task updating : stateUpdater.getTasks()) {
            final TaskId updatingId = updating.id();

            if (map.containsKey(updatingId)) {
                final Set<TopicPartition> activePartitions = map.get(updatingId);
                if (updating.isActive()) {
                    updateInputPartitionsOrRemoveTaskFromTasksToSuspend(updating, activePartitions);
                } else {
                    removeTaskToRecycleFromStateUpdater(updatingId, activePartitions);
                }
                map.remove(updatingId);
                continue;
            }

            if (map2.containsKey(updatingId)) {
                if (updating.isActive()) {
                    removeTaskToRecycleFromStateUpdater(updatingId, map2.get(updatingId));
                }
                map2.remove(updatingId);
                continue;
            }

            removeUnusedTaskFromStateUpdater(updatingId);
        }
    }

    /**
     * Handles an active task in the state updater that stays assigned as active.
     *
     * <p>If the input partitions changed, the task is removed from the state updater and recorded
     * as pending an input-partition update. Otherwise the task is dropped from the pending
     * tasks-to-suspend.
     *
     * @param task the active task held by the state updater
     * @param set  the input partitions assigned to the task
     */
    private void updateInputPartitionsOrRemoveTaskFromTasksToSuspend(Task task, Set<TopicPartition> set) {
        final TaskId taskId = task.id();
        if (!task.inputPartitions().equals(set)) {
            stateUpdater.remove(taskId);
            tasks.addPendingTaskToUpdateInputPartitions(taskId, set);
            return;
        }
        tasks.removePendingActiveTaskToSuspend(taskId);
    }

    /**
     * Removes the task from the state updater and records it as pending recycling.
     *
     * @param taskId id of the task to recycle
     * @param set    input partitions recorded with the pending recycle
     */
    private void removeTaskToRecycleFromStateUpdater(TaskId taskId, Set<TopicPartition> set) {
        stateUpdater.remove(taskId);
        tasks.addPendingTaskToRecycle(taskId, set);
    }

    /**
     * Removes the task from the state updater and records it as pending a clean close.
     *
     * @param taskId id of the task that is no longer assigned
     */
    private void removeUnusedTaskFromStateUpdater(TaskId taskId) {
        stateUpdater.remove(taskId);
        tasks.addPendingTaskToCloseClean(taskId);
    }

    /**
     * Splits off the tasks that cannot be created yet because their named topology is unknown.
     *
     * <p>An entry is moved when its task id has a non-null topology name that is not contained in
     * the topology metadata's view of named topologies. Moved entries are removed from {@code map}.
     *
     * @param map tasks to create; entries with an unrecognized topology name are removed
     * @return the removed entries
     */
    private Map<TaskId, Set<TopicPartition>> pendingTasksToCreate(Map<TaskId, Set<TopicPartition>> map) {
        final Map<TaskId, Set<TopicPartition>> pending = new HashMap<>();
        final Iterator<Map.Entry<TaskId, Set<TopicPartition>>> entries = map.entrySet().iterator();
        while (entries.hasNext()) {
            final Map.Entry<TaskId, Set<TopicPartition>> entry = entries.next();
            final TaskId taskId = entry.getKey();
            final boolean unknownTopology = taskId.topologyName() != null
                && !topologyMetadata.namedTopologiesView().contains(taskId.topologyName());
            if (!unknownTopology) {
                continue;
            }
            log.info("Cannot create the assigned task {} since it's topology name cannot be recognized, will put it aside as pending for now and create later when topology metadata gets refreshed", taskId);
            pending.put(taskId, entry.getValue());
            entries.remove();
        }
        return pending;
    }

    /**
     * Closes the tasks in {@code set} cleanly and recycles the tasks in {@code map}, collecting
     * failures instead of throwing.
     *
     * <p>Phases:
     * <ol>
     *   <li>Checkpoint: {@code prepareCommit} is called on every task. A task that still reports
     *       offsets to commit is scheduled for a dirty close; a standby task with nothing to
     *       commit is suspended and post-committed.</li>
     *   <li>Clean close of the tasks in {@code set} that were not scheduled for a dirty close.</li>
     *   <li>Recycling of the tasks in {@code map} that were not scheduled for a dirty close
     *       (active becomes standby and vice versa).</li>
     *   <li>Dirty close of every task that failed in one of the earlier phases.</li>
     * </ol>
     *
     * @param map tasks to recycle with their new input partitions; tasks that must be closed
     *            dirty are removed from it
     * @param set tasks to close cleanly; tasks that must be closed dirty are removed from it
     * @return the first exception recorded per task id, in encounter order; empty if nothing failed
     */
    private Map<TaskId, RuntimeException> closeAndRecycleTasks(Map<Task, Set<TopicPartition>> map, Set<Task> set) {
        final Map<TaskId, RuntimeException> taskCloseExceptions = new LinkedHashMap<>();
        final Set<Task> tasksToCloseDirty = new TreeSet<>(Comparator.comparing(Task::id));

        // Phase 1: checkpoint every task that is about to be closed or recycled.
        final List<Task> tasksToCheckpoint = new ArrayList<>(set);
        tasksToCheckpoint.addAll(map.keySet());
        for (final Task task : tasksToCheckpoint) {
            try {
                final Map<TopicPartition, OffsetAndMetadata> offsets = task.prepareCommit();
                if (!offsets.isEmpty()) {
                    log.error("Task {} should have been committed when it was suspended, but it reports non-empty offsets {} to commit; this means it failed during last commit and hence should be closed dirty", task.id(), offsets);
                    tasksToCloseDirty.add(task);
                } else if (!task.isActive()) {
                    task.suspend();
                    task.postCommit(true);
                }
            } catch (final RuntimeException e) {
                log.error(String.format("Failed to checkpoint task %s. Attempting to close remaining tasks before re-throwing:", task.id()), e);
                taskCloseExceptions.putIfAbsent(task.id(), e);
                tasksToCloseDirty.add(task);
            }
        }

        // Phase 2: clean close.
        set.removeAll(tasksToCloseDirty);
        for (final Task task : set) {
            try {
                closeTaskClean(task);
            } catch (final RuntimeException e) {
                log.error(String.format("Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:", task.id()), e);
                if (task.state() != Task.State.CLOSED) {
                    tasksToCloseDirty.add(task);
                }
                taskCloseExceptions.putIfAbsent(task.id(), e);
            }
        }

        // Phase 3: recycle.
        map.keySet().removeAll(tasksToCloseDirty);
        for (final Map.Entry<Task, Set<TopicPartition>> entry : map.entrySet()) {
            final Task oldTask = entry.getKey();
            final Set<TopicPartition> inputPartitions = entry.getValue();
            try {
                if (oldTask.isActive()) {
                    tasks.replaceActiveWithStandby(convertActiveToStandby((StreamTask) oldTask, inputPartitions));
                } else {
                    tasks.replaceStandbyWithActive(convertStandbyToActive((StandbyTask) oldTask, inputPartitions));
                }
            } catch (final RuntimeException e) {
                log.error(String.format("Failed to recycle task %s cleanly. Attempting to close remaining tasks before re-throwing:", oldTask.id()), e);
                taskCloseExceptions.putIfAbsent(oldTask.id(), e);
                tasksToCloseDirty.add(oldTask);
            }
        }

        // Phase 4: dirty close of everything that failed above.
        for (final Task task : tasksToCloseDirty) {
            closeTaskDirty(task, true);
        }

        return taskCloseExceptions;
    }

    /**
     * Creates a standby task from an active task and then lets the active task creator close and
     * remove the task producer of the active task if needed.
     *
     * @param streamTask the active task to convert
     * @param set        input partitions for the new standby task
     * @return the newly created standby task
     */
    private StandbyTask convertActiveToStandby(StreamTask streamTask, Set<TopicPartition> set) {
        final StandbyTask standby = standbyTaskCreator.createStandbyTaskFromActive(streamTask, set);
        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(streamTask.id());
        return standby;
    }

    /**
     * Creates an active task from a standby task, wired to the main consumer.
     *
     * @param standbyTask the standby task to convert
     * @param set         input partitions for the new active task
     * @return the newly created active task
     */
    private StreamTask convertStandbyToActive(StandbyTask standbyTask, Set<TopicPartition> set) {
        return activeTaskCreator.createActiveTaskFromStandby(standbyTask, set, mainConsumer);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Tries to initialize all tasks and to complete restoration of all active tasks.
     *
     * <p>Every task in the registry is initialized if needed; a timeout or lock failure leaves the
     * task for a later retry. Only when all tasks initialized, each active task whose changelog
     * partitions are all completed has its restoration completed. When everything succeeded, the
     * whole consumer assignment is resumed and the changelog reader is switched to updating
     * standbys.
     *
     * @param j        timestamp passed to {@code maybeInitTaskTimeoutOrThrow} on timeouts
     * @param consumer callback handed to {@code completeRestoration}
     * @return {@code true} if all tasks are initialized and all active tasks completed restoration
     */
    public boolean tryToCompleteRestoration(long j, java.util.function.Consumer<Set<TopicPartition>> consumer) {
        boolean allRunning = true;
        changelogReader.enforceRestoreActive();

        final List<Task> activeTasks = new ArrayList<>();
        for (final Task candidate : tasks.allTasks()) {
            try {
                candidate.initializeIfNeeded();
                candidate.clearTaskTimeout();
            } catch (final TimeoutException timeout) {
                candidate.maybeInitTaskTimeoutOrThrow(j, timeout);
                allRunning = false;
            } catch (final LockException lockFailure) {
                log.debug("Could not initialize task {} since: {}; will retry", candidate.id(), lockFailure.getMessage());
                allRunning = false;
            }

            if (candidate.isActive()) {
                activeTasks.add(candidate);
            }
        }

        if (allRunning && !activeTasks.isEmpty()) {
            final Set<TopicPartition> restoredChangelogs = changelogReader.completedChangelogs();
            for (final Task active : activeTasks) {
                if (!restoredChangelogs.containsAll(active.changelogPartitions())) {
                    // Still restoring.
                    allRunning = false;
                    continue;
                }
                try {
                    active.completeRestoration(consumer);
                    active.clearTaskTimeout();
                } catch (final TimeoutException timeout) {
                    active.maybeInitTaskTimeoutOrThrow(j, timeout);
                    log.debug(String.format("Could not complete restoration for %s due to the following exception; will retry", active.id()), timeout);
                    allRunning = false;
                }
            }
        }

        if (allRunning) {
            mainConsumer.resume(mainConsumer.assignment());
            changelogReader.transitToUpdateStandby();
        }
        return allRunning;
    }

    /**
     * Exchanges tasks with the state updater and reports whether restoration work is finished.
     *
     * <p>Pending tasks are first initialized and handed to the state updater. Then failed tasks,
     * removed tasks, and restored active tasks are taken back from the state updater, each only
     * if the state updater reports some.
     *
     * @param j        timestamp forwarded to the handling of restored tasks
     * @param consumer callback forwarded to the handling of restored tasks
     * @return {@code true} if the state updater restores no active tasks and no tasks are pending
     *         recycling or initialization
     */
    public boolean checkStateUpdater(long j, java.util.function.Consumer<Set<TopicPartition>> consumer) {
        addTasksToStateUpdater();

        if (stateUpdater.hasExceptionsAndFailedTasks()) {
            handleExceptionsFromStateUpdater();
        }
        if (stateUpdater.hasRemovedTasks()) {
            handleRemovedTasksFromStateUpdater();
        }
        if (stateUpdater.restoresActiveTasks()) {
            handleRestoredTasksFromStateUpdater(j, consumer);
        }

        return !stateUpdater.restoresActiveTasks()
            && !tasks.hasPendingTasksToRecycle()
            && !tasks.hasPendingTasksToInit();
    }

    /**
     * Recycles a task that came back from the state updater: suspends it, converts it to the
     * opposite type, initializes the new task and adds it to the state updater.
     *
     * <p>On a runtime exception the failure is logged and recorded under the old task's id, and
     * the old task as well as the new task (if already created) are scheduled for a dirty close
     * unless they are already {@code CLOSED}.
     *
     * @param task the task to recycle
     * @param set  input partitions for the converted task
     * @param set2 receives tasks that must be closed dirty
     * @param map  receives the first exception per task id
     */
    private void recycleTaskFromStateUpdater(Task task, Set<TopicPartition> set, Set<Task> set2, Map<TaskId, RuntimeException> map) {
        Task converted = null;
        try {
            task.suspend();
            if (task.isActive()) {
                converted = convertActiveToStandby((StreamTask) task, set);
            } else {
                converted = convertStandbyToActive((StandbyTask) task, set);
            }
            converted.initializeIfNeeded();
            stateUpdater.add(converted);
        } catch (final RuntimeException failure) {
            final TaskId oldTaskId = task.id();
            log.error(String.format("Failed to recycle task %s cleanly. Attempting to close remaining tasks before re-throwing:", oldTaskId), failure);

            if (task.state() != Task.State.CLOSED) {
                set2.add(task);
            }
            final boolean convertedStillOpen = converted != null && converted.state() != Task.State.CLOSED;
            if (convertedStillOpen) {
                set2.add(converted);
            }
            map.putIfAbsent(oldTaskId, failure);
        }
    }

    /**
     * Suspends and cleanly closes a task; for an active task the task producer is afterwards
     * closed and removed if needed.
     *
     * <p>A runtime exception is logged and recorded under the task's id, and the task is scheduled
     * for a dirty close unless it already reached state {@code CLOSED}.
     *
     * @param task the task to close
     * @param set  receives the task if it must be closed dirty
     * @param map  receives the first exception per task id
     */
    private void closeTaskClean(Task task, Set<Task> set, Map<TaskId, RuntimeException> map) {
        try {
            task.suspend();
            task.closeClean();
            if (task.isActive()) {
                activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
            }
        } catch (final RuntimeException failure) {
            log.error(String.format("Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:", task.id()), failure);
            final boolean stillOpen = task.state() != Task.State.CLOSED;
            if (stillOpen) {
                set.add(task);
            }
            map.putIfAbsent(task.id(), failure);
        }
    }

    /**
     * Completes restoration of a task, adds it to the registry, resumes its input partitions on
     * the main consumer and clears its task timeout.
     *
     * <p>A {@link TimeoutException} is passed to {@code maybeInitTaskTimeoutOrThrow} and logged at
     * debug level; steps after the one that timed out are skipped.
     *
     * @param task     the restored task
     * @param j        timestamp passed to {@code maybeInitTaskTimeoutOrThrow} on a timeout
     * @param consumer callback handed to {@code completeRestoration}
     */
    private void transitRestoredTaskToRunning(Task task, long j, java.util.function.Consumer<Set<TopicPartition>> consumer) {
        try {
            task.completeRestoration(consumer);
            tasks.addTask(task);
            mainConsumer.resume(task.inputPartitions());
            task.clearTaskTimeout();
        } catch (final TimeoutException timeout) {
            task.maybeInitTaskTimeoutOrThrow(j, timeout);
            log.debug(String.format("Could not complete restoration for %s due to the following exception; will retry", task.id()), timeout);
        }
    }

    /**
     * Initializes all tasks pending initialization and hands them to the state updater.
     *
     * <p>A task whose initialization fails with a {@link LockException} is put back into the
     * pending-to-init tasks so that it is retried on the next call. Any other runtime exception
     * puts the task into the registry and is recorded; recorded exceptions are thrown together
     * once all pending tasks were processed.
     */
    private void addTasksToStateUpdater() {
        final Map<TaskId, RuntimeException> taskExceptions = new LinkedHashMap<>();
        for (final Task task : tasks.drainPendingTasksToInit()) {
            try {
                task.initializeIfNeeded();
                stateUpdater.add(task);
            } catch (final LockException lockException) {
                // Must be caught before RuntimeException: LockException is a RuntimeException itself,
                // so the general handler below would otherwise shadow this retry path.
                log.info("Encountered lock exception. Reattempting locking the state in the next iteration.", lockException);
                tasks.addPendingTasksToInit(Collections.singleton(task));
            } catch (final RuntimeException e) {
                // Hand the task to the registry so that the failure handling can reach it.
                tasks.addTask(task);
                taskExceptions.put(task.id(), e);
            }
        }

        maybeThrowTaskExceptions(taskExceptions);
    }

    /**
     * Drains the failures reported by the state updater, adds each failed task to the registry and
     * throws a summarizing exception if any failure was reported.
     */
    public void handleExceptionsFromStateUpdater() {
        final Map<TaskId, RuntimeException> taskExceptions = new LinkedHashMap<>();

        for (final StateUpdater.ExceptionAndTasks failure : stateUpdater.drainExceptionsAndFailedTasks()) {
            final RuntimeException cause = failure.exception();
            for (final Task failedTask : failure.getTasks()) {
                tasks.addTask(failedTask);
                taskExceptions.put(failedTask.id(), cause);
            }
        }

        maybeThrowTaskExceptions(taskExceptions);
    }

    /**
     * Processes the tasks that the state updater reports as removed.
     *
     * <p>For each removed task the first matching pending action is applied, checked in this order:
     * recycle, close clean, close dirty, update input partitions (the task is then handed back to
     * the state updater), suspend (the task is then added to the registry). Tasks that must be
     * closed dirty are closed after all removed tasks were processed; recorded exceptions are
     * thrown at the very end.
     *
     * @throws IllegalStateException if a removed task has no pending action
     */
    private void handleRemovedTasksFromStateUpdater() {
        final Map<TaskId, RuntimeException> taskExceptions = new LinkedHashMap<>();
        final Set<Task> tasksToCloseDirty = new TreeSet<>(Comparator.comparing(Task::id));

        for (final Task task : stateUpdater.drainRemovedTasks()) {
            final TaskId taskId = task.id();

            final Set<TopicPartition> recyclePartitions = tasks.removePendingTaskToRecycle(taskId);
            if (recyclePartitions != null) {
                recycleTaskFromStateUpdater(task, recyclePartitions, tasksToCloseDirty, taskExceptions);
                continue;
            }
            if (tasks.removePendingTaskToCloseClean(taskId)) {
                closeTaskClean(task, tasksToCloseDirty, taskExceptions);
                continue;
            }
            if (tasks.removePendingTaskToCloseDirty(taskId)) {
                tasksToCloseDirty.add(task);
                continue;
            }

            final Set<TopicPartition> updatedPartitions = tasks.removePendingTaskToUpdateInputPartitions(taskId);
            if (updatedPartitions != null) {
                task.updateInputPartitions(updatedPartitions, topologyMetadata.nodeToSourceTopics(taskId));
                stateUpdater.add(task);
                continue;
            }

            if (!tasks.removePendingActiveTaskToSuspend(taskId)) {
                throw new IllegalStateException("Got a removed task " + taskId + " from the state updater that is not for recycle, closing, or updating input partitions; this should not happen");
            }
            task.suspend();
            tasks.addTask(task);
        }

        for (final Task task : tasksToCloseDirty) {
            closeTaskDirty(task, false);
        }

        maybeThrowTaskExceptions(taskExceptions);
    }

    /**
     * Processes the active tasks that the state updater reports as restored.
     *
     * <p>For each restored task the first matching pending action is applied, checked in this
     * order: recycle, close clean, close dirty, update input partitions (the task then transits
     * to running), suspend (the task is then added to the registry). A task without a pending
     * action transits to running. Tasks that must be closed dirty are closed after all restored
     * tasks were processed; recorded exceptions are thrown at the very end.
     *
     * @param j        timestamp forwarded to {@code transitRestoredTaskToRunning}
     * @param consumer callback forwarded to {@code transitRestoredTaskToRunning}
     */
    private void handleRestoredTasksFromStateUpdater(long j, java.util.function.Consumer<Set<TopicPartition>> consumer) {
        final Map<TaskId, RuntimeException> taskExceptions = new LinkedHashMap<>();
        final Set<Task> tasksToCloseDirty = new TreeSet<>(Comparator.comparing(Task::id));

        // Duration.ZERO: take whatever is restored right now.
        for (final StreamTask task : stateUpdater.drainRestoredActiveTasks(Duration.ZERO)) {
            final TaskId taskId = task.id();

            final Set<TopicPartition> recyclePartitions = tasks.removePendingTaskToRecycle(taskId);
            if (recyclePartitions != null) {
                recycleTaskFromStateUpdater(task, recyclePartitions, tasksToCloseDirty, taskExceptions);
                continue;
            }
            if (tasks.removePendingTaskToCloseClean(taskId)) {
                closeTaskClean(task, tasksToCloseDirty, taskExceptions);
                continue;
            }
            if (tasks.removePendingTaskToCloseDirty(taskId)) {
                tasksToCloseDirty.add(task);
                continue;
            }

            final Set<TopicPartition> updatedPartitions = tasks.removePendingTaskToUpdateInputPartitions(taskId);
            if (updatedPartitions != null) {
                task.updateInputPartitions(updatedPartitions, topologyMetadata.nodeToSourceTopics(taskId));
                transitRestoredTaskToRunning(task, j, consumer);
                continue;
            }

            if (tasks.removePendingActiveTaskToSuspend(taskId)) {
                task.suspend();
                tasks.addTask(task);
            } else {
                transitRestoredTaskToRunning(task, j, consumer);
            }
        }

        for (final Task task : tasksToCloseDirty) {
            closeTaskDirty(task, false);
        }

        maybeThrowTaskExceptions(taskExceptions);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Handles a set of partitions being revoked from this instance.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Every active task whose input partitions are all contained in the revoked set is
     *       marked as revoked; other active tasks that report {@code commitNeeded()} are
     *       remembered as additional commit candidates.</li>
     *   <li>Revoked partitions not matched by a registry task are offered to
     *       {@code addRevokedTasksInStateUpdaterToPendingTasksToSuspend}; anything still left
     *       over is only logged at debug level.</li>
     *   <li>Offsets of the revoked tasks are prepared; only if at least one of them produced
     *       offsets are the additional candidates prepared and committed together with them.</li>
     *   <li>The offsets are committed. On {@link TaskCorruptedException} the corrupted tasks,
     *       and on {@link TimeoutException} all tasks in the commit map, are closed dirty and
     *       revived. Any other {@link RuntimeException} is remembered as the first exception
     *       and the tasks in the commit map are treated as dirty.</li>
     *   <li>Non-dirty tasks are post-committed and all revoked tasks are suspended.</li>
     * </ol>
     *
     * @param collection the revoked partitions; the collection itself is not modified
     * @throws RuntimeException the first exception recorded during commit, post-commit or suspend
     */
    public void handleRevocation(Collection<TopicPartition> collection) {
        final Set<TopicPartition> remainingRevokedPartitions = new HashSet<>(collection);
        final Set<Task> revokedActiveTasks = new HashSet<>();
        final Set<Task> commitNeededActiveTasks = new HashSet<>();
        final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask = new HashMap<>();
        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);

        for (final Task task : activeTaskIterable()) {
            if (remainingRevokedPartitions.containsAll(task.inputPartitions())) {
                revokedActiveTasks.add(task);
                remainingRevokedPartitions.removeAll(task.inputPartitions());
            } else if (task.commitNeeded()) {
                commitNeededActiveTasks.add(task);
            }
        }
        addRevokedTasksInStateUpdaterToPendingTasksToSuspend(remainingRevokedPartitions);
        if (!remainingRevokedPartitions.isEmpty()) {
            this.log.debug("The following revoked partitions {} are missing from the current task partitions. It could potentially be due to race condition of consumer detecting the heartbeat failure, or the tasks have been cleaned up by the handleAssignment callback.", remainingRevokedPartitions);
        }

        prepareCommitAndAddOffsetsToMap(revokedActiveTasks, consumedOffsetsPerTask);
        // The non-revoked tasks are only committed when a revoked task has something to commit.
        final boolean shouldCommitAdditionalTasks = !consumedOffsetsPerTask.isEmpty();
        if (shouldCommitAdditionalTasks) {
            prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask);
        }

        final Set<Task> dirtyTasks = new HashSet<>();
        try {
            this.taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
        } catch (TaskCorruptedException e) {
            // Must be caught before RuntimeException: it is a subtype and would otherwise be
            // handled by the generic branch, which neither closes nor revives the corrupted tasks.
            this.log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}", e.corruptedTasks());
            dirtyTasks.addAll(this.tasks.tasks(e.corruptedTasks()));
            closeDirtyAndRevive(dirtyTasks, true);
        } catch (TimeoutException e) {
            this.log.warn("Timed out while trying to commit all tasks during revocation, these will be cleaned and revived");
            dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
            closeDirtyAndRevive(dirtyTasks, false);
        } catch (RuntimeException e) {
            this.log.error("Exception caught while committing those revoked tasks " + revokedActiveTasks, e);
            firstException.compareAndSet(null, e);
            dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
        }

        for (final Task task : revokedActiveTasks) {
            if (!dirtyTasks.contains(task)) {
                try {
                    task.postCommit(true);
                } catch (RuntimeException e) {
                    this.log.error("Exception caught while post-committing task " + task.id(), e);
                    maybeSetFirstException(false, maybeWrapTaskException(e, task.id()), firstException);
                }
            }
        }
        if (shouldCommitAdditionalTasks) {
            for (final Task task : commitNeededActiveTasks) {
                if (!dirtyTasks.contains(task)) {
                    try {
                        task.postCommit(false);
                    } catch (RuntimeException e) {
                        this.log.error("Exception caught while post-committing task " + task.id(), e);
                        maybeSetFirstException(false, maybeWrapTaskException(e, task.id()), firstException);
                    }
                }
            }
        }

        // Revoked tasks are suspended regardless of whether their commit succeeded.
        for (final Task task : revokedActiveTasks) {
            try {
                task.suspend();
            } catch (RuntimeException e) {
                this.log.error("Caught the following exception while trying to suspend revoked task " + task.id(), e);
                maybeSetFirstException(false, maybeWrapTaskException(e, task.id()), firstException);
            }
        }

        if (firstException.get() != null) {
            throw firstException.get();
        }
    }

    /**
     * Registers every active task held by the state updater whose input partitions are all in
     * {@code set} as pending to be suspended, and removes those partitions from {@code set}.
     * Does nothing when no state updater is configured.
     *
     * @param set the still-unmatched revoked partitions; modified in place
     */
    private void addRevokedTasksInStateUpdaterToPendingTasksToSuspend(Set<TopicPartition> set) {
        if (this.stateUpdater == null) {
            return;
        }
        for (final Task restoringTask : this.stateUpdater.getTasks()) {
            if (!restoringTask.isActive() || !set.containsAll(restoringTask.inputPartitions())) {
                continue;
            }
            this.tasks.addPendingActiveTaskToSuspend(restoringTask.id());
            set.removeAll(restoringTask.inputPartitions());
        }
    }

    /* JADX WARN: Type inference failed for: r10v0, types: [java.lang.Throwable, org.apache.kafka.streams.errors.StreamsException] */
    /**
     * Calls {@code prepareCommit()} on every task in {@code set} and records the returned
     * offsets in {@code map}, skipping tasks that return an empty offset map.
     *
     * @param set the tasks to prepare for commit
     * @param map receives one entry per task that has offsets to commit; modified in place
     * @throws StreamsException if preparing a task fails; an existing {@link StreamsException}
     *         is tagged with the task id and rethrown, any other exception is wrapped together
     *         with the task id
     */
    private void prepareCommitAndAddOffsetsToMap(Set<Task> set, Map<Task, Map<TopicPartition, OffsetAndMetadata>> map) {
        for (final Task task : set) {
            try {
                final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
                if (!committableOffsets.isEmpty()) {
                    map.put(task, committableOffsets);
                }
            } catch (StreamsException e) {
                // The more specific type has to come first; behind `catch (Exception)` this
                // branch is unreachable and the exception would be wrapped a second time.
                e.setTaskId(task.id());
                throw e;
            } catch (Exception e) {
                throw new StreamsException(e, task.id());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Closes all active tasks dirty, both those in the task registry and those held by the state
     * updater, and re-initializes the thread producer when the processing mode is
     * {@code EXACTLY_ONCE_V2}.
     */
    public void handleLostAll() {
        this.log.debug("Closing lost active tasks as zombies.");

        closeRunningTasksDirty();
        removeLostActiveTasksFromStateUpdater();

        final boolean exactlyOnceV2 = this.processingMode == StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
        if (exactlyOnceV2) {
            this.activeTaskCreator.reInitializeThreadProducer();
        }
    }

    /**
     * Closes every active task in the task registry dirty and removes it from the registry.
     * Standby tasks are left untouched.
     */
    private void closeRunningTasksDirty() {
        for (final Task candidate : this.tasks.allTasks()) {
            if (!candidate.isActive()) {
                continue;
            }
            closeTaskDirty(candidate, true);
        }
    }

    /**
     * Marks every active task held by the state updater as pending a dirty close and asks the
     * state updater to remove it. Does nothing when no state updater is configured.
     */
    private void removeLostActiveTasksFromStateUpdater() {
        if (this.stateUpdater == null) {
            return;
        }
        for (final Task restoringTask : this.stateUpdater.getTasks()) {
            if (!restoringTask.isActive()) {
                continue;
            }
            this.tasks.addPendingTaskToCloseDirty(restoringTask.id());
            this.stateUpdater.remove(restoringTask.id());
        }
    }

    /**
     * Forwards a resume signal to the state updater; a no-op when no state updater is configured.
     */
    public void signalResume() {
        if (this.stateUpdater == null) {
            return;
        }
        this.stateUpdater.signalResume();
    }

    /**
     * Computes the sum of changelog offsets per task.
     *
     * <p>For every known task that is neither {@code CREATED} nor {@code CLOSED}, the sum is
     * taken from the task's own changelog offsets; tasks with no changelog offsets are skipped.
     * For all remaining ids (locked task directories plus known tasks in {@code CREATED} or
     * {@code CLOSED} state) the sum is read from the checkpoint file, if that file exists.
     * An {@link IOException} while reading a checkpoint is logged and the task is omitted.
     *
     * @return a new map from task id to offset sum
     */
    public Map<TaskId, Long> getTaskOffsetSums() {
        final Map<TaskId, Long> offsetSums = new HashMap<>();
        final Map<TaskId, Task> knownTasks = allTasks();
        // Ids still in this set after the first loop are resolved from checkpoint files.
        final Set<TaskId> idsToReadFromCheckpoint = Utils.union(HashSet::new, this.lockedTaskDirectories, knownTasks.keySet());

        for (final Task task : knownTasks.values()) {
            final boolean createdOrClosed = task.state() == Task.State.CREATED || task.state() == Task.State.CLOSED;
            if (createdOrClosed) {
                continue;
            }
            final Map<TopicPartition, Long> changelogOffsets = task.changelogOffsets();
            if (!changelogOffsets.isEmpty()) {
                offsetSums.put(task.id(), sumOfChangelogOffsets(task.id(), changelogOffsets));
            } else {
                this.log.debug("Skipping to encode apparently stateless (or non-logged) offset sum for task {}", task.id());
            }
            idsToReadFromCheckpoint.remove(task.id());
        }

        for (final TaskId id : idsToReadFromCheckpoint) {
            final File checkpointFile = this.stateDirectory.checkpointFileFor(id);
            try {
                if (checkpointFile.exists()) {
                    final Map<TopicPartition, Long> checkpointed = new OffsetCheckpoint(checkpointFile).read();
                    offsetSums.put(id, sumOfChangelogOffsets(id, checkpointed));
                }
            } catch (IOException e) {
                this.log.warn(String.format("Exception caught while trying to read checkpoint for task %s:", id), e);
            }
        }
        return offsetSums;
    }

    /**
     * Rebuilds {@code lockedTaskDirectories}: clears it, then attempts to lock every non-empty
     * task directory reported by the state directory and records the ids for which the lock
     * succeeded. A debug message is logged for locked ids that are not among the known tasks.
     */
    private void tryToLockAllNonEmptyTaskDirectories() {
        this.lockedTaskDirectories.clear();
        final Map<TaskId, Task> knownTasks = allTasks();

        for (final StateDirectory.TaskDirectory candidate : this.stateDirectory.listNonEmptyTaskDirectories()) {
            final File dir = candidate.file();
            try {
                final TaskId id = StateManagerUtil.parseTaskDirectoryName(dir.getName(), candidate.namedTopology());
                if (!this.stateDirectory.lock(id)) {
                    continue;
                }
                this.lockedTaskDirectories.add(id);
                if (!knownTasks.containsKey(id)) {
                    this.log.debug("Temporarily locked unassigned task {} for the upcoming rebalance", id);
                }
            } catch (TaskIdFormatException ignored) {
                // Deliberately ignored: a directory whose name does not parse as a task id is skipped.
            }
        }
    }

    /**
     * Unlocks and forgets every locked task directory whose id is contained in {@code set}.
     *
     * @param set ids of the tasks whose directory locks should be released
     */
    private void releaseLockedDirectoriesForTasks(Set<TaskId> set) {
        for (final Iterator<TaskId> lockedIds = this.lockedTaskDirectories.iterator(); lockedIds.hasNext(); ) {
            final TaskId lockedId = lockedIds.next();
            if (!set.contains(lockedId)) {
                continue;
            }
            this.stateDirectory.unlock(lockedId);
            lockedIds.remove();
        }
    }

    /**
     * Unlocks and forgets every locked task directory whose id is not contained in the task
     * registry.
     */
    private void releaseLockedUnassignedTaskDirectories() {
        for (final Iterator<TaskId> lockedIds = this.lockedTaskDirectories.iterator(); lockedIds.hasNext(); ) {
            final TaskId lockedId = lockedIds.next();
            if (this.tasks.contains(lockedId)) {
                continue;
            }
            this.stateDirectory.unlock(lockedId);
            lockedIds.remove();
        }
    }

    /**
     * Sums the changelog offsets of one task.
     *
     * @param taskId the task the offsets belong to; used for logging and error reporting only
     * @param map    changelog partition to offset
     * @return {@code -2} as soon as any offset equals {@code -2}; {@code Long.MAX_VALUE} if the
     *         running sum overflows; otherwise the sum of all offsets other than {@code -4}
     * @throws StreamsException if an offset is negative and neither {@code -2} nor {@code -4}
     */
    private long sumOfChangelogOffsets(TaskId taskId, Map<TopicPartition, Long> map) {
        long total = 0L;
        for (final Map.Entry<TopicPartition, Long> changelog : map.entrySet()) {
            final long offset = changelog.getValue();
            // NOTE(review): -2 looks like a "latest offset" sentinel that is forwarded as-is — confirm against Task.
            if (offset == -2L) {
                return -2L;
            }
            // NOTE(review): -4 looks like an "offset unknown" sentinel that is excluded from the sum — confirm.
            if (offset == -4L) {
                continue;
            }
            if (offset < 0L) {
                throw new StreamsException(new IllegalStateException("Expected not to get a sentinel offset, but got: " + changelog), taskId);
            }
            total += offset;
            // A negative running sum of non-negative addends means the long overflowed.
            if (total < 0L) {
                this.log.warn("Sum of changelog offsets for task {} overflowed, pinning to Long.MAX_VALUE", taskId);
                return Long.MAX_VALUE;
            }
        }
        return total;
    }

    /**
     * Closes a task dirty on a best-effort basis.
     *
     * <p>{@code prepareCommit()} and {@code suspend()} are attempted first and their failures are
     * only logged. After {@code closeDirty()}, the task is optionally removed from the task
     * registry and, for active tasks, its task producer is closed and removed. Failures of these
     * two clean-up steps are logged and swallowed independently, so a failing registry removal
     * no longer prevents the producer from being closed and a failing producer close no longer
     * escapes from a dirty close.
     *
     * @param task the task to close
     * @param z    whether the task should also be removed from the task registry
     */
    private void closeTaskDirty(Task task, boolean z) {
        try {
            // Result is not used here; failures are tolerated on the dirty path.
            task.prepareCommit();
        } catch (RuntimeException e) {
            this.log.error("Error flushing caches of dirty task {} ", task.id(), e);
        }
        try {
            task.suspend();
        } catch (RuntimeException e) {
            this.log.error("Error suspending dirty task {}: {}", task.id(), e.getMessage());
        }
        task.closeDirty();
        if (z) {
            try {
                this.tasks.removeTask(task);
            } catch (RuntimeException e) {
                this.log.error("Error removing dirty task {}: {}", task.id(), e.getMessage());
            }
        }
        try {
            if (task.isActive()) {
                this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
            }
        } catch (RuntimeException e) {
            this.log.error("Error closing dirty task {}: {}", task.id(), e.getMessage());
        }
    }

    /**
     * Closes a task cleanly, removes it from the task registry and, if it is an active task,
     * closes and removes its task producer. Exceptions from any step propagate to the caller.
     *
     * @param task the task to close
     */
    private void closeTaskClean(Task task) {
        task.closeClean();
        this.tasks.removeTask(task);
        if (!task.isActive()) {
            return;
        }
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Shuts the task manager down.
     *
     * <p>The state updater is shut down first, then all tasks are closed, the thread producer
     * is closed, the task registry is cleared and the remaining locked task directories are
     * released. Each step runs even if an earlier one failed.
     *
     * @param z {@code true} for a clean shutdown, in which case the first exception of any step
     *          is rethrown at the end; {@code false} for an unclean shutdown, in which case
     *          exceptions are logged at warn level and swallowed
     * @throws RuntimeException the first exception recorded during a clean shutdown
     */
    public void shutdown(boolean z) {
        shutdownStateUpdater();

        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
        final Set<Task> activeTasks = new TreeSet<>(Comparator.comparing(Task::id));
        activeTasks.addAll(this.tasks.activeTasks());

        executeAndMaybeSwallow(
            z,
            () -> closeAndCleanUpTasks(activeTasks, standbyTaskIterable(), z),
            e -> firstException.compareAndSet(null, e),
            // This message used to be a copy of the directory-unlocking one further down.
            e -> this.log.warn("Ignoring an exception while closing and cleaning up tasks.", e)
        );
        executeAndMaybeSwallow(
            z,
            this.activeTaskCreator::closeThreadProducerIfNeeded,
            e -> firstException.compareAndSet(null, e),
            e -> this.log.warn("Ignoring an exception while closing thread producer.", e)
        );

        this.tasks.clear();

        // Runs after the registry is cleared, so every still-locked directory counts as unassigned.
        executeAndMaybeSwallow(
            z,
            this::releaseLockedUnassignedTaskDirectories,
            e -> firstException.compareAndSet(null, e),
            e -> this.log.warn("Ignoring an exception while unlocking remaining task directories.", e)
        );

        final RuntimeException fatal = firstException.get();
        if (fatal != null) {
            throw fatal;
        }
        this.log.info("Shutdown complete");
    }

    /**
     * Shuts down the state updater, if one is configured, and takes over its tasks: failed
     * tasks are closed dirty, restored and removed tasks are added to the task registry.
     */
    private void shutdownStateUpdater() {
        if (this.stateUpdater == null) {
            return;
        }
        final Duration shutdownTimeout = Duration.ofMillis(Long.MAX_VALUE);
        this.stateUpdater.shutdown(shutdownTimeout);
        closeFailedTasksFromStateUpdater();
        addRestoredTasksToTaskRegistry();
        addRemovedTasksToTaskRegistry();
    }

    /**
     * Drains the failed tasks from the state updater and closes each of them dirty.
     *
     * <p>Failures of {@code prepareCommit()}, {@code suspend()} and of closing the task producer
     * are logged and swallowed; {@code closeDirty()} is called unguarded.
     */
    private void closeFailedTasksFromStateUpdater() {
        final Set<Task> failedTasks = this.stateUpdater.drainExceptionsAndFailedTasks().stream()
            .flatMap(exceptionAndTasks -> exceptionAndTasks.getTasks().stream())
            .collect(Collectors.toSet());

        for (final Task failedTask : failedTasks) {
            try {
                failedTask.prepareCommit();
            } catch (RuntimeException swallowed) {
                this.log.error("Error flushing caches of dirty task {} ", failedTask.id(), swallowed);
            }

            try {
                failedTask.suspend();
            } catch (RuntimeException swallowed) {
                this.log.error("Error suspending dirty task {}: {}", failedTask.id(), swallowed.getMessage());
            }

            failedTask.closeDirty();

            try {
                if (failedTask.isActive()) {
                    this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(failedTask.id());
                }
            } catch (RuntimeException swallowed) {
                this.log.error("Error closing dirty task {}: {}", failedTask.id(), swallowed.getMessage());
            }
        }
    }

    /**
     * Drains the restored active tasks from the state updater without waiting and adds them to
     * the task registry as active tasks.
     */
    private void addRestoredTasksToTaskRegistry() {
        final Set<Task> restoredTasks = this.stateUpdater.drainRestoredActiveTasks(Duration.ZERO).stream()
            .map(restored -> (Task) restored)
            .collect(Collectors.toSet());
        this.tasks.addActiveTasks(restoredTasks);
    }

    /**
     * Drains the removed tasks from the state updater and adds them to the task registry,
     * active tasks via {@code addActiveTasks} and the rest via {@code addStandbyTasks}.
     *
     * <p>The active tasks are moved out of the drained set in place, so that what remains in
     * that set is exactly the non-active tasks.
     */
    private void addRemovedTasksToTaskRegistry() {
        final Set<Task> removedTasks = this.stateUpdater.drainRemovedTasks();
        final Set<Task> removedActiveTasks = new HashSet<>();

        for (final Iterator<Task> remaining = removedTasks.iterator(); remaining.hasNext(); ) {
            final Task removed = remaining.next();
            if (!removed.isActive()) {
                continue;
            }
            remaining.remove();
            removedActiveTasks.add(removed);
        }

        this.tasks.addActiveTasks(removedActiveTasks);
        this.tasks.addStandbyTasks(removedTasks);
    }

    /**
     * Closes the given active and standby tasks, cleanly where possible, and closes dirty every
     * task that the clean attempt reports back.
     *
     * @param collection  the active tasks to close
     * @param collection2 the standby tasks to close
     * @param z           whether a clean close should be attempted
     * @throws RuntimeException the first exception recorded by the clean-close attempts
     */
    void closeAndCleanUpTasks(Collection<Task> collection, Collection<Task> collection2, boolean z) {
        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);

        final Set<Task> tasksToCloseDirty = new HashSet<>();
        tasksToCloseDirty.addAll(tryCloseCleanActiveTasks(collection, z, firstException));
        tasksToCloseDirty.addAll(tryCloseCleanStandbyTasks(collection2, z, firstException));

        for (final Task dirtyTask : tasksToCloseDirty) {
            closeTaskDirty(dirtyTask, true);
        }

        final RuntimeException failure = firstException.get();
        if (failure != null) {
            throw failure;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Attempts to close the given active tasks cleanly and returns those that must be closed
     * dirty instead.
     *
     * <p>When {@code z} is {@code false} no clean close is attempted and all active tasks of the
     * registry are returned. Otherwise every task is prepared for commit, the collected offsets
     * are committed, the tasks are post-committed, and the tasks still eligible are suspended
     * and closed cleanly. A task that fails any step is moved to the returned dirty set.
     *
     * <p>Under {@code EXACTLY_ONCE_V2} the commit is skipped, and every prepared task is closed
     * dirty, as soon as a single task failed to prepare.
     *
     * @param collection      the active tasks to close
     * @param z               whether a clean close should be attempted
     * @param atomicReference receives the first exception encountered, if it is still unset
     * @return the tasks that still have to be closed dirty
     */
    private Collection<Task> tryCloseCleanActiveTasks(Collection<Task> collection, boolean z, AtomicReference<RuntimeException> atomicReference) {
        if (!z) {
            return activeTaskIterable();
        }

        final Comparator<Task> byId = Comparator.comparing(Task::id);
        final Set<Task> tasksToCommit = new TreeSet<>(byId);
        final Set<Task> tasksToCloseDirty = new TreeSet<>(byId);
        final Set<Task> tasksToCloseClean = new TreeSet<>(byId);
        final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask = new HashMap<>();

        for (final Task task : collection) {
            try {
                final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
                tasksToCommit.add(task);
                if (!committableOffsets.isEmpty()) {
                    consumedOffsetsPerTask.put(task, committableOffsets);
                }
                tasksToCloseClean.add(task);
            } catch (TaskMigratedException e) {
                // Specific types must precede RuntimeException, otherwise these branches are
                // unreachable. A migrated task is closed dirty without recording an exception.
                tasksToCloseDirty.add(task);
            } catch (StreamsException e) {
                e.setTaskId(task.id());
                atomicReference.compareAndSet(null, e);
                tasksToCloseDirty.add(task);
            } catch (RuntimeException e) {
                atomicReference.compareAndSet(null, new StreamsException(e, task.id()));
                tasksToCloseDirty.add(task);
            }
        }

        if (this.processingMode != StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2 || tasksToCloseDirty.isEmpty()) {
            try {
                this.taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
            } catch (RuntimeException e) {
                this.log.error("Exception caught while committing tasks " + consumedOffsetsPerTask.keySet(), e);
                maybeSetFirstException(false, e, atomicReference);
                if (e instanceof TaskCorruptedException) {
                    // Only the tasks reported as corrupted are downgraded to a dirty close.
                    final Set<TaskId> corruptedTaskIds = ((TaskCorruptedException) e).corruptedTasks();
                    final Set<Task> corruptedTasks = tasksToCommit.stream()
                        .filter(task -> corruptedTaskIds.contains(task.id()))
                        .collect(Collectors.toSet());
                    tasksToCloseClean.removeAll(corruptedTasks);
                    tasksToCloseDirty.addAll(corruptedTasks);
                } else {
                    tasksToCloseClean.removeAll(tasksToCommit);
                    tasksToCloseDirty.addAll(tasksToCommit);
                }
            }
            // NOTE(review): this iterates all active tasks of the registry, not just the tasks
            // passed in or prepared above — confirm that this is intended.
            for (final Task task : activeTaskIterable()) {
                try {
                    task.postCommit(true);
                } catch (RuntimeException e) {
                    this.log.error("Exception caught while post-committing task " + task.id(), e);
                    maybeSetFirstException(false, maybeWrapTaskException(e, task.id()), atomicReference);
                    tasksToCloseDirty.add(task);
                    tasksToCloseClean.remove(task);
                }
            }
        } else {
            tasksToCloseClean.removeAll(tasksToCommit);
            tasksToCloseDirty.addAll(tasksToCommit);
        }

        for (final Task task : tasksToCloseClean) {
            try {
                task.suspend();
                closeTaskClean(task);
            } catch (RuntimeException e) {
                this.log.error("Exception caught while clean-closing active task {}: {}", task.id(), e.getMessage());
                // A task that already reached CLOSED does not need a dirty close any more.
                if (task.state() != Task.State.CLOSED) {
                    tasksToCloseDirty.add(task);
                }
                maybeSetFirstException(true, maybeWrapTaskException(e, task.id()), atomicReference);
            }
        }
        return tasksToCloseDirty;
    }

    /**
     * Attempts to close the given standby tasks cleanly and returns those that must be closed
     * dirty instead.
     *
     * <p>When {@code z} is {@code false} no clean close is attempted and all standby tasks of the
     * registry are returned.
     *
     * @param collection      the standby tasks to close
     * @param z               whether a clean close should be attempted
     * @param atomicReference receives the first recorded exception, if it is still unset
     * @return the tasks that still have to be closed dirty
     */
    private Collection<Task> tryCloseCleanStandbyTasks(Collection<Task> collection, boolean z, AtomicReference<RuntimeException> atomicReference) {
        if (!z) {
            return standbyTaskIterable();
        }

        final Set<Task> tasksToCloseDirty = new HashSet<>();
        for (final Task standby : collection) {
            try {
                standby.prepareCommit();
                standby.postCommit(true);
                standby.suspend();
                closeTaskClean(standby);
            } catch (RuntimeException failure) {
                this.log.error("Exception caught while clean-closing standby task {}: {}", standby.id(), failure.getMessage());
                final boolean alreadyClosed = standby.state() == Task.State.CLOSED;
                if (!alreadyClosed) {
                    tasksToCloseDirty.add(standby);
                }
                maybeSetFirstException(true, maybeWrapTaskException(failure, standby.id()), atomicReference);
            }
        }
        return tasksToCloseDirty;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the ids of all active tasks in the task registry, as a new set
     */
    public Set<TaskId> activeTaskIds() {
        final Stream<TaskId> ids = activeTaskStream().map(Task::id);
        return ids.collect(Collectors.toSet());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the ids of all non-active tasks in the task registry, as a new set
     */
    public Set<TaskId> standbyTaskIds() {
        final Stream<TaskId> ids = standbyTaskStream().map(Task::id);
        return ids.collect(Collectors.toSet());
    }

    /**
     * Returns all tasks known to this manager, keyed by task id.
     *
     * <p>Without a state updater this is the registry's own map. With a state updater, a new map
     * is built from the state updater's tasks and the registry's tasks are then put on top, so
     * a registry entry replaces a state-updater entry with the same id.
     *
     * @return task id to task
     */
    Map<TaskId, Task> allTasks() {
        if (this.stateUpdater != null) {
            final Map<TaskId, Task> combined = this.stateUpdater.getTasks().stream()
                .collect(Collectors.toMap(Task::id, task -> task));
            combined.putAll(this.tasks.allTasksPerId());
            return combined;
        }
        return this.tasks.allTasksPerId();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the task registry's tasks keyed by task id; tasks held by the state updater are
     *         not included
     */
    public Map<TaskId, Task> allOwnedTasks() {
        final Map<TaskId, Task> ownedTasks = tasks.allTasksPerId();
        return ownedTasks;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return an unmodifiable set of all tasks: the registry's tasks plus, when a state updater
     *         is configured, the tasks it currently holds
     */
    public Set<Task> readOnlyAllTasks() {
        if (this.stateUpdater != null) {
            final Set<Task> combined = new HashSet<>(this.stateUpdater.getTasks());
            combined.addAll(this.tasks.allTasks());
            return Collections.unmodifiableSet(combined);
        }
        return Collections.unmodifiableSet(this.tasks.allTasks());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return an unmodifiable map, keyed by task id, of the registry tasks whose topology is not
     *         reported as paused by the topology metadata
     */
    public Map<TaskId, Task> notPausedTasks() {
        final Map<TaskId, Task> notPaused = this.tasks.allTasks().stream()
            .filter(task -> !this.topologyMetadata.isPaused(task.id().topologyName()))
            .collect(Collectors.toMap(Task::id, task -> task));
        return Collections.unmodifiableMap(notPaused);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return a new map, keyed by task id, of all active tasks in the task registry
     */
    public Map<TaskId, Task> activeTaskMap() {
        final Stream<Task> activeTasks = activeTaskStream();
        return activeTasks.collect(Collectors.toMap(Task::id, task -> task));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return a new list holding all active tasks in the task registry
     */
    public List<Task> activeTaskIterable() {
        final Stream<Task> activeTasks = activeTaskStream();
        return activeTasks.collect(Collectors.toList());
    }

    /**
     * @return a stream over the registry tasks for which {@code isActive()} is {@code true}
     */
    private Stream<Task> activeTaskStream() {
        final Stream<Task> registryTasks = this.tasks.allTasks().stream();
        return registryTasks.filter(Task::isActive);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return a new map, keyed by task id, of all non-active tasks in the task registry
     */
    public Map<TaskId, Task> standbyTaskMap() {
        final Stream<Task> standbyTasks = standbyTaskStream();
        return standbyTasks.collect(Collectors.toMap(Task::id, task -> task));
    }

    /**
     * @return a new list holding all non-active tasks in the task registry
     */
    private List<Task> standbyTaskIterable() {
        final Stream<Task> standbyTasks = standbyTaskStream();
        return standbyTasks.collect(Collectors.toList());
    }

    /**
     * @return a stream over the registry tasks for which {@code isActive()} is {@code false}
     */
    private Stream<Task> standbyTaskStream() {
        final Stream<Task> registryTasks = this.tasks.allTasks().stream();
        return registryTasks.filter(task -> !task.isActive());
    }

    /**
     * Commits all tasks in the task registry.
     *
     * @return the result of {@link #commit(Collection)} for all registry tasks
     */
    int commitAll() {
        final Collection<Task> everyTask = this.tasks.allTasks();
        return commit(everyTask);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Hands the polled records of each partition to the active task registered for that
     * partition.
     *
     * @param consumerRecords the polled records
     * @throws NullPointerException if no active task is registered for one of the partitions
     */
    public void addRecordsToTasks(ConsumerRecords<byte[], byte[]> consumerRecords) {
        for (final TopicPartition partition : consumerRecords.partitions()) {
            final Task owner = this.tasks.activeTasksForInputPartition(partition);
            if (owner != null) {
                owner.addRecords(partition, consumerRecords.records(partition));
                continue;
            }
            this.log.error("Unable to locate active task for received-record partition {}. Current tasks: {}", partition, toString(">"));
            throw new NullPointerException("Task was unexpectedly missing for partition " + partition);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Commits the given tasks.
     *
     * <p>If the commit times out, {@code maybeInitTaskTimeoutOrThrow} is invoked on every task
     * that had been added to the offsets map by then, and {@code 0} is returned unless one of
     * those calls throws.
     *
     * @param collection the tasks to commit
     * @return the value returned by {@code commitTasksAndMaybeUpdateCommittableOffsets}, or
     *         {@code 0} after a handled timeout
     */
    public int commit(Collection<Task> collection) {
        final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask = new HashMap<>();
        try {
            return commitTasksAndMaybeUpdateCommittableOffsets(collection, consumedOffsetsPerTask);
        } catch (TimeoutException timeout) {
            for (final Task task : consumedOffsetsPerTask.keySet()) {
                task.maybeInitTaskTimeoutOrThrow(this.time.milliseconds(), timeout);
            }
            return 0;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Commits all active tasks if at least one of them both requested a commit and needs one.
     *
     * @return {@code -1} while a rebalance is in progress, {@code 0} if no task qualifies,
     *         otherwise the result of committing all active tasks
     */
    public int maybeCommitActiveTasksPerUserRequested() {
        if (this.rebalanceInProgress) {
            return -1;
        }
        final boolean commitRequested = activeTaskIterable().stream()
            .anyMatch(task -> task.commitRequested() && task.commitNeeded());
        if (!commitRequested) {
            return 0;
        }
        return commit(activeTaskIterable());
    }

    /**
     * Delegates the commit to the task executor unless a rebalance is in progress.
     *
     * @param collection the tasks to commit
     * @param map        passed to the task executor as the per-task offsets map
     * @return {@code -1} while a rebalance is in progress, otherwise the task executor's result
     */
    private int commitTasksAndMaybeUpdateCommittableOffsets(Collection<Task> collection, Map<Task, Map<TopicPartition, OffsetAndMetadata>> map) {
        return this.rebalanceInProgress
            ? -1
            : this.taskExecutor.commitTasksAndMaybeUpdateCommittableOffsets(collection, map);
    }

    /**
     * Forwards an end offset to every active {@code StreamTask} that has the given partition
     * among its input partitions.
     *
     * @param topicPartition the partition the offset belongs to
     * @param l              the offset value passed on to {@code updateEndOffsets}
     */
    public void updateTaskEndMetadata(TopicPartition topicPartition, Long l) {
        for (final Task active : this.tasks.activeTasks()) {
            if (!(active instanceof StreamTask)) {
                continue;
            }
            if (!active.inputPartitions().contains(topicPartition)) {
                continue;
            }
            ((StreamTask) active).updateEndOffsets(topicPartition, l);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Applies pending topology updates, using {@code createPendingTasks} and
     * {@code maybeCloseTasksFromRemovedTopologies} as callbacks, unsubscribes the main consumer
     * if the topology is empty afterwards, and finally notifies the topology version listeners.
     */
    public void handleTopologyUpdates() {
        this.topologyMetadata.executeTopologyUpdatesAndBumpThreadVersion(
            this::createPendingTasks,
            this::maybeCloseTasksFromRemovedTopologies
        );

        final boolean nothingLeftToRun = this.topologyMetadata.isEmpty();
        if (nothingLeftToRun) {
            this.log.info("Proactively unsubscribing from all topics due to empty topology");
            this.mainConsumer.unsubscribe();
        }

        this.topologyMetadata.maybeNotifyTopologyVersionListeners();
    }

    /**
     * Closes all registry tasks whose topology name is not in {@code set} and releases the
     * directory locks held for them. Any exception is logged and swallowed.
     *
     * @param set the names of the topologies that still exist
     */
    void maybeCloseTasksFromRemovedTopologies(Set<String> set) {
        try {
            final Set<Task> activeTasksToRemove = new HashSet<>();
            final Set<Task> standbyTasksToRemove = new HashSet<>();
            for (final Task task : this.tasks.allTasks()) {
                if (set.contains(task.id().topologyName())) {
                    continue;
                }
                final Set<Task> target = task.isActive() ? activeTasksToRemove : standbyTasksToRemove;
                target.add(task);
            }

            // Snapshot taken before closing, so the ids are available for the unlock afterwards.
            final Set<Task> allTasksToRemove = Utils.union(HashSet::new, activeTasksToRemove, standbyTasksToRemove);
            closeAndCleanUpTasks(activeTasksToRemove, standbyTasksToRemove, true);

            final Set<TaskId> removedIds = allTasksToRemove.stream().map(Task::id).collect(Collectors.toSet());
            releaseLockedDirectoriesForTasks(removedIds);
        } catch (Exception e) {
            this.log.error("Caught the following exception while closing tasks from a removed topology:", e);
        }
    }

    /**
     * Drains the pending active and standby tasks for the given topologies from the task
     * registry (active first, then standby) and passes both to {@code createNewTasks}.
     *
     * @param set the topology names to drain pending tasks for
     */
    void createPendingTasks(Set<String> set) {
        createNewTasks(
            this.tasks.drainPendingActiveTasksForTopologies(set),
            this.tasks.drainPendingStandbyTasksForTopologies(set)
        );
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Delegates processing to the task executor.
     *
     * @param i    forwarded unchanged to the task executor
     * @param time forwarded unchanged to the task executor
     * @return the task executor's result
     */
    public int process(int i, Time time) {
        final int processed = taskExecutor.process(i, time);
        return processed;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Calls {@code recordProcessTimeRatioAndBufferSize} on every active task in the registry.
     *
     * @param j  forwarded unchanged as the first argument
     * @param j2 forwarded unchanged as the second argument
     */
    public void recordTaskProcessRatio(long j, long j2) {
        for (final Task active : activeTaskIterable()) {
            active.recordProcessTimeRatioAndBufferSize(j, j2);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Delegates punctuation to the task executor.
     *
     * @return the task executor's result
     */
    public int punctuate() {
        final int punctuated = taskExecutor.punctuate();
        return punctuated;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sends a delete-records request for the purgeable offsets of all active tasks, unless a
     * previous request is still in flight or there is nothing to purge.
     *
     * <p>A failed previous request is only logged at debug level before a new one is sent.
     */
    public void maybePurgeCommittedRecords() {
        final boolean previousRequestPending = this.deleteRecordsResult != null && !this.deleteRecordsResult.all().isDone();
        if (previousRequestPending) {
            return;
        }

        if (this.deleteRecordsResult != null && this.deleteRecordsResult.all().isCompletedExceptionally()) {
            this.log.debug("Previous delete-records request has failed: {}. Try sending the new request now", this.deleteRecordsResult.lowWatermarks());
        }

        final Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
        for (final Task active : activeTaskIterable()) {
            active.purgeableOffsets().forEach(
                (partition, offset) -> recordsToDelete.put(partition, RecordsToDelete.beforeOffset(offset))
            );
        }

        if (!recordsToDelete.isEmpty()) {
            this.deleteRecordsResult = this.adminClient.deleteRecords(recordsToDelete);
            this.log.trace("Sent delete-records request: {}", recordsToDelete);
        }
    }

    /**
     * Returns the same description as {@link #toString(String)} with an empty indent.
     *
     * @return a textual description of this task manager and its registry tasks
     */
    @Override
    public String toString() {
        return toString("");
    }

    /**
     * Builds a textual description of this task manager: a {@code TaskManager} header, two
     * section headers prefixed with {@code str}, and one entry per registry task giving its id,
     * state, simple class name and whether it is active or standby.
     *
     * @param str the prefix put in front of the section headers and of every task entry
     * @return the description
     */
    public String toString(String str) {
        final StringBuilder description = new StringBuilder("TaskManager\n");
        description.append(str).append("\tMetadataState:\n");
        description.append(str).append("\tTasks:\n");
        for (final Task task : this.tasks.allTasks()) {
            final String role = task.isActive() ? "active" : "standby";
            description.append(str)
                .append("\t\t")
                .append(task.id())
                .append(" ")
                .append(task.state())
                .append(" ")
                .append(task.getClass().getSimpleName())
                .append('(')
                .append(role)
                .append(')');
        }
        return description.toString();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the producer metrics as reported by the active task creator
     */
    public Map<MetricName, Metric> producerMetrics() {
        final Map<MetricName, Metric> metrics = activeTaskCreator.producerMetrics();
        return metrics;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the producer client ids as reported by the active task creator
     */
    public Set<String> producerClientIds() {
        final Set<String> clientIds = activeTaskCreator.producerClientIds();
        return clientIds;
    }

    /**
     * @return an unmodifiable view of the ids of the currently locked task directories
     */
    Set<TaskId> lockedTaskDirectories() {
        final Set<TaskId> readOnlyView = Collections.unmodifiableSet(this.lockedTaskDirectories);
        return readOnlyView;
    }

    /**
     * Stores {@code runtimeException} in {@code atomicReference} if no exception has been stored
     * yet, optionally ignoring {@link TaskMigratedException}.
     *
     * @param z                whether a {@link TaskMigratedException} should be ignored
     * @param runtimeException the exception to record
     * @param atomicReference  the holder for the first exception; only set while it is {@code null}
     */
    private void maybeSetFirstException(boolean z, RuntimeException runtimeException, AtomicReference<RuntimeException> atomicReference) {
        final boolean ignorable = z && (runtimeException instanceof TaskMigratedException);
        if (!ignorable) {
            atomicReference.compareAndSet(null, runtimeException);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Associates an exception with a task id.
     *
     * @param runtimeException the exception to associate
     * @param taskId           the id of the task the exception belongs to
     * @return the same instance with its task id set if it already is a {@link StreamsException},
     *         otherwise a new {@link StreamsException} wrapping it together with the task id
     */
    private StreamsException maybeWrapTaskException(RuntimeException runtimeException, TaskId taskId) {
        if (runtimeException instanceof StreamsException) {
            final StreamsException existing = (StreamsException) runtimeException;
            existing.setTaskId(taskId);
            return existing;
        }
        return new StreamsException(runtimeException, taskId);
    }

    /**
     * Runs {@code runnable} and routes a thrown {@link RuntimeException} to one of two handlers.
     *
     * @param z         selects the handler: {@code consumer} when {@code true},
     *                  {@code consumer2} when {@code false}
     * @param runnable  the action to run
     * @param consumer  receives the exception when {@code z} is {@code true}
     * @param consumer2 receives the exception when {@code z} is {@code false}
     */
    public static void executeAndMaybeSwallow(boolean z, Runnable runnable, java.util.function.Consumer<RuntimeException> consumer, java.util.function.Consumer<RuntimeException> consumer2) {
        try {
            runnable.run();
        } catch (RuntimeException failure) {
            final java.util.function.Consumer<RuntimeException> handler = z ? consumer : consumer2;
            handler.accept(failure);
        }
    }

    /**
     * Runs {@code runnable}; a thrown {@link RuntimeException} is rethrown when {@code z} is
     * {@code true} and logged at debug level otherwise.
     *
     * <p>The swallowed exception is passed to the logger as the trailing argument, so that its
     * details are not lost.
     *
     * @param z        {@code true} to rethrow failures, {@code false} to log and swallow them
     * @param runnable the action to run
     * @param str      a name for the action, used in the log message
     * @param logger   the logger that receives swallowed failures
     */
    public static void executeAndMaybeSwallow(boolean z, Runnable runnable, String str, Logger logger) {
        executeAndMaybeSwallow(z, runnable, (java.util.function.Consumer<RuntimeException>) rethrown -> {
            throw rethrown;
        }, (java.util.function.Consumer<RuntimeException>) swallowed -> {
            logger.debug("Ignoring error in unclean {}", str, swallowed);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return {@code true} if at least one active task in the registry reports
     *         {@code needsInitializationOrRestoration()}
     */
    public boolean needsInitializationOrRestoration() {
        return activeTaskIterable().stream().anyMatch(Task::needsInitializationOrRestoration);
    }

    /**
     * Adds a task directly to the task registry.
     *
     * @param task the task to add
     */
    void addTask(Task task) {
        tasks.addTask(task);
    }

    /**
     * @return the task registry backing this task manager
     */
    TasksRegistry tasks() {
        final TasksRegistry registry = this.tasks;
        return registry;
    }
}
