package org.apache.flink.runtime.blocklist;

import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.runtime.blocklist.BlocklistHandler;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/blocklist/DefaultBlocklistHandler.class */
/**
 * Default {@link BlocklistHandler} implementation.
 *
 * <p>Blocked nodes are stored in a {@link BlocklistTracker}. Changes are propagated to the
 * registered {@link BlocklistListener}s and to the {@link BlocklistContext}. A self-rescheduling
 * task submitted to the component main thread executor periodically removes blocked nodes that the
 * tracker reports as timed out.
 *
 * <p>Every method except {@link #close()} asserts that it runs in the component main thread.
 */
public class DefaultBlocklistHandler implements BlocklistHandler, AutoCloseable {
    private final Logger log;

    /** Maps a task manager's {@link ResourceID} to the id of the node it is located on. */
    private final Function<ResourceID, String> taskManagerNodeIdRetriever;

    private final BlocklistTracker blocklistTracker;
    private final BlocklistContext blocklistContext;
    private final Set<BlocklistListener> blocklistListeners = new HashSet<>();

    /** Delay between two consecutive timeout checks. */
    private final Duration timeoutCheckInterval;

    /** Handle of the most recently scheduled timeout check; cancelled in {@link #close()}. */
    private volatile ScheduledFuture<?> timeoutCheckFuture;

    private final ComponentMainThreadExecutor mainThreadExecutor;

    /** Factory that creates {@link DefaultBlocklistHandler} instances. */
    public static class Factory implements BlocklistHandler.Factory {
        private final Duration timeoutCheckInterval;

        /**
         * @param duration interval between timeout checks of the created handlers; must not be
         *     null
         */
        public Factory(Duration duration) {
            this.timeoutCheckInterval = Preconditions.checkNotNull(duration);
        }

        /**
         * Creates a {@link DefaultBlocklistHandler} backed by a new {@link
         * DefaultBlocklistTracker}.
         *
         * @param blocklistContext context used to block and unblock resources
         * @param function maps a task manager's resource id to its node id
         * @param componentMainThreadExecutor executor on which the timeout check is scheduled
         * @param logger logger used by the created handler
         * @return the newly created handler
         */
        @Override // org.apache.flink.runtime.blocklist.BlocklistHandler.Factory
        public BlocklistHandler create(
                BlocklistContext blocklistContext,
                Function<ResourceID, String> function,
                ComponentMainThreadExecutor componentMainThreadExecutor,
                Logger logger) {
            logger.info("DefaultBlocklistHandler was created as BlocklistHandler.");
            return new DefaultBlocklistHandler(
                    new DefaultBlocklistTracker(),
                    blocklistContext,
                    function,
                    this.timeoutCheckInterval,
                    componentMainThreadExecutor,
                    logger);
        }
    }

    /**
     * Creates the handler and immediately schedules the first timeout check.
     *
     * @param blocklistTracker storage of the blocked nodes
     * @param blocklistContext context used to block and unblock resources
     * @param function maps a task manager's resource id to its node id
     * @param duration interval between two timeout checks
     * @param componentMainThreadExecutor executor on which the timeout check is scheduled
     * @param logger logger used for all messages of this handler
     * @throws NullPointerException if any argument is null
     */
    DefaultBlocklistHandler(
            BlocklistTracker blocklistTracker,
            BlocklistContext blocklistContext,
            Function<ResourceID, String> function,
            Duration duration,
            ComponentMainThreadExecutor componentMainThreadExecutor,
            Logger logger) {
        this.blocklistTracker = Preconditions.checkNotNull(blocklistTracker);
        this.blocklistContext = Preconditions.checkNotNull(blocklistContext);
        this.taskManagerNodeIdRetriever = Preconditions.checkNotNull(function);
        this.timeoutCheckInterval = Preconditions.checkNotNull(duration);
        this.mainThreadExecutor = Preconditions.checkNotNull(componentMainThreadExecutor);
        this.log = Preconditions.checkNotNull(logger);
        scheduleTimeoutCheck();
    }

    /**
     * Schedules one timeout check after {@link #timeoutCheckInterval}. The scheduled task
     * re-invokes this method once it has run, which keeps the check periodic.
     */
    private void scheduleTimeoutCheck() {
        this.timeoutCheckFuture =
                this.mainThreadExecutor.schedule(
                        () -> {
                            removeTimeoutNodes();
                            scheduleTimeoutCheck();
                        },
                        this.timeoutCheckInterval.toMillis(),
                        TimeUnit.MILLISECONDS);
    }

    /**
     * Removes the nodes the tracker reports as timed out at the current wall-clock time and, if
     * any were removed, asks the context to unblock them.
     */
    private void removeTimeoutNodes() {
        assertRunningInMainThread();
        Collection<BlockedNode> expiredNodes =
                this.blocklistTracker.removeTimeoutNodes(System.currentTimeMillis());
        if (expiredNodes.isEmpty()) {
            return;
        }

        // Query the remaining nodes once and reuse the result for both count and details.
        Collection<BlockedNode> remainingNodes = this.blocklistTracker.getAllBlockedNodes();
        if (this.log.isDebugEnabled()) {
            this.log.debug(
                    "Remove {} timeout blocked nodes, details {}. Total {} blocked nodes currently, details: {}.",
                    expiredNodes.size(),
                    expiredNodes,
                    remainingNodes.size(),
                    remainingNodes);
        } else {
            this.log.info(
                    "Remove {} timeout blocked nodes. Total {} blocked nodes currently.",
                    expiredNodes.size(),
                    remainingNodes.size());
        }
        this.blocklistContext.unblockResources(expiredNodes);
    }

    private void assertRunningInMainThread() {
        this.mainThreadExecutor.assertRunningInMainThread();
    }

    /**
     * Adds the given nodes to the blocklist.
     *
     * <p>If the tracker reports newly added or merged nodes, all of them are announced to every
     * registered listener. Newly added nodes are additionally passed to {@link
     * BlocklistContext#blockResources}.
     *
     * @param collection nodes to block; an empty collection is only logged
     * @param z flag forwarded unchanged to the listeners and to the blocklist context; its meaning
     *     is defined by those APIs (NOTE(review): not visible from this class - confirm)
     */
    @Override // org.apache.flink.runtime.blocklist.BlocklistHandler
    public void addNewBlockedNodes(Collection<BlockedNode> collection, boolean z) {
        assertRunningInMainThread();
        this.log.info(
                "Add new blocked nodes {}.",
                collection.stream().map(BlockedNode::toString).collect(Collectors.joining(", ")));
        if (collection.isEmpty()) {
            return;
        }

        BlockedNodeAdditionResult additionResult =
                this.blocklistTracker.addNewBlockedNodes(collection);
        Collection<BlockedNode> newlyAddedNodes = additionResult.getNewlyAddedNodes();
        Collection<BlockedNode> changedNodes =
                Stream.concat(newlyAddedNodes.stream(), additionResult.getMergedNodes().stream())
                        .collect(Collectors.toList());

        // changedNodes is a superset of newlyAddedNodes, so nothing is left to do if it is empty.
        if (changedNodes.isEmpty()) {
            return;
        }

        boolean hasNewlyAddedNodes = !newlyAddedNodes.isEmpty();
        if (hasNewlyAddedNodes) {
            Collection<BlockedNode> allBlockedNodes = this.blocklistTracker.getAllBlockedNodes();
            if (this.log.isDebugEnabled()) {
                this.log.debug(
                        "Newly added {} blocked nodes, details: {}. Total {} blocked nodes currently, details: {}.",
                        newlyAddedNodes.size(),
                        newlyAddedNodes,
                        allBlockedNodes.size(),
                        allBlockedNodes);
            } else {
                this.log.info(
                        "Newly added {} blocked nodes. Total {} blocked nodes currently.",
                        newlyAddedNodes.size(),
                        allBlockedNodes.size());
            }
        }

        // Listeners hear about new and merged nodes alike; only new nodes are blocked.
        this.blocklistListeners.forEach(
                listener -> listener.notifyNewBlockedNodes(changedNodes, z));
        if (hasNewlyAddedNodes) {
            this.blocklistContext.blockResources(newlyAddedNodes, z);
        }
    }

    /**
     * Checks whether the node of the given task manager is blocked.
     *
     * @param resourceID resource id of the task manager
     * @return the tracker's answer for the node id the retriever returns for {@code resourceID}
     * @throws NullPointerException if the retriever returns no node id for {@code resourceID}
     */
    @Override // org.apache.flink.runtime.blocklist.BlocklistHandler
    public boolean isBlockedTaskManager(ResourceID resourceID) {
        assertRunningInMainThread();
        String nodeId =
                Preconditions.checkNotNull(this.taskManagerNodeIdRetriever.apply(resourceID));
        return this.blocklistTracker.isBlockedNode(nodeId);
    }

    /**
     * @return the ids of all blocked nodes as reported by the tracker
     */
    @Override // org.apache.flink.runtime.blocklist.BlocklistHandler
    public Set<String> getAllBlockedNodeIds() {
        assertRunningInMainThread();
        return this.blocklistTracker.getAllBlockedNodeIds();
    }

    /**
     * Registers a listener. A listener that was not registered before is immediately notified
     * about all currently blocked nodes, if there are any. Registering the same listener again is
     * a no-op.
     *
     * @param blocklistListener listener to register; must not be null
     */
    @Override // org.apache.flink.runtime.blocklist.BlocklistHandler
    public void registerBlocklistListener(BlocklistListener blocklistListener) {
        assertRunningInMainThread();
        Preconditions.checkNotNull(blocklistListener);
        // Set#add returns false when the listener is already registered.
        if (!this.blocklistListeners.add(blocklistListener)) {
            return;
        }

        Collection<BlockedNode> allBlockedNodes = this.blocklistTracker.getAllBlockedNodes();
        if (!allBlockedNodes.isEmpty()) {
            blocklistListener.notifyNewBlockedNodes(allBlockedNodes, false);
        }
    }

    /**
     * Removes a previously registered listener; unknown listeners are ignored.
     *
     * @param blocklistListener listener to remove; must not be null
     */
    @Override // org.apache.flink.runtime.blocklist.BlocklistHandler
    public void deregisterBlocklistListener(BlocklistListener blocklistListener) {
        assertRunningInMainThread();
        Preconditions.checkNotNull(blocklistListener);
        this.blocklistListeners.remove(blocklistListener);
    }

    /** Cancels the pending timeout check without interrupting a check that is already running. */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        // Read the volatile field once so the null check and the cancel see the same future.
        ScheduledFuture<?> pendingCheck = this.timeoutCheckFuture;
        if (pendingCheck != null) {
            pendingCheck.cancel(false);
        }
    }
}
