package org.apache.hadoop.hdfs.server.blockmanagement;

import com.google.common.base.Preconditions;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirective;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.CacheManager;
import org.apache.hadoop.hdfs.server.namenode.CachePool;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.GSet;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Background thread that rescans the cache directives and the cached-block
 * map, scheduling blocks to be cached or uncached on DataNodes.
 *
 * <p>A rescan runs whenever {@code intervalMs} milliseconds have elapsed since
 * the previous one started, or earlier when a caller has requested one through
 * {@link #setNeedsRescan()}. Each rescan is performed while holding the
 * namesystem write lock.
 *
 * <p>The scan counters and the {@code shutdown} flag are guarded by the
 * {@link ReentrantLock} supplied to the constructor (the "CRM lock").
 */
@InterfaceAudience.LimitedPrivate({"HDFS"})
public class CacheReplicationMonitor extends Thread implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(CacheReplicationMonitor.class);

    /** Source of randomness used when picking DataNodes to cache or uncache on. */
    private static final Random random = new Random();

    private final FSNamesystem namesystem;
    private final BlockManager blockManager;
    private final CacheManager cacheManager;

    /** The cached-block map obtained from the CacheManager. */
    private final GSet<CachedBlock, CachedBlock> cachedBlocks;

    /** Maximum time between the starts of two consecutive rescans, in milliseconds. */
    private final long intervalMs;

    /** The CRM lock; guards the scan counters and the shutdown flag. */
    private final ReentrantLock lock;

    /** Signalled to wake the monitor thread up before its interval has elapsed. */
    private final Condition doRescan;

    /** Signalled every time a rescan completes, and on shutdown. */
    private final Condition scanFinished;

    /** Number of the most recently completed scan. */
    private long completedScanCount = 0;

    /** Number of the scan currently in progress, or -1 when no scan is running. */
    private long curScanCount = -1;

    /** Scan number that must complete before pending requests are satisfied. */
    private long neededScanCount = 0;

    /** Set once by {@link #close()}; tells the monitor thread to exit. */
    private boolean shutdown = false;

    /**
     * Flipped before every rescan. Blocks visited by a directive get stamped
     * with the current value, so blocks carrying the old value were not
     * referenced by any directive during this scan.
     */
    private boolean mark = false;

    /** Number of directives visited by the current (or last) scan. */
    private int scannedDirectives;

    /** Number of cached-block entries visited by the current (or last) scan. */
    private long scannedBlocks;

    /**
     * Creates the monitor. The thread is not started by the constructor.
     *
     * @param namesystem   namesystem whose write lock is taken for each rescan
     * @param cacheManager source of cache pools, directives and the cached-block map
     * @param intervalMs   maximum time between rescans, in milliseconds
     * @param lock         the CRM lock shared with callers of
     *                     {@link #waitForRescanIfNeeded()} and {@link #setNeedsRescan()}
     */
    public CacheReplicationMonitor(FSNamesystem namesystem, CacheManager cacheManager, long intervalMs,
            ReentrantLock lock) {
        this.namesystem = namesystem;
        this.blockManager = namesystem.getBlockManager();
        this.cacheManager = cacheManager;
        this.cachedBlocks = cacheManager.getCachedBlocks();
        this.intervalMs = intervalMs;
        this.lock = lock;
        this.doRescan = this.lock.newCondition();
        this.scanFinished = this.lock.newCondition();
    }

    /**
     * Main loop: wait until a rescan is due (or the monitor is shut down), run
     * the rescan, publish its completion, repeat.
     *
     * <p>An {@link InterruptedException} ends the thread quietly; any other
     * throwable is logged and passed to {@link ExitUtil#terminate(int, Throwable)}.
     */
    @Override
    public void run() {
        long startTimeMs = 0;
        Thread.currentThread().setName("CacheReplicationMonitor(" + System.identityHashCode(this) + ")");
        LOG.info("Starting CacheReplicationMonitor with interval " + intervalMs + " milliseconds");
        try {
            long curTimeMs = Time.monotonicNow();
            while (true) {
                lock.lock();
                try {
                    // Re-evaluate every condition after each wakeup: await() may return
                    // because of a signal, a timeout or spuriously.
                    while (true) {
                        if (shutdown) {
                            LOG.debug("Shutting down CacheReplicationMonitor");
                            return;
                        }
                        if (completedScanCount < neededScanCount) {
                            LOG.debug("Rescanning because of pending operations");
                            break;
                        }
                        long delta = (startTimeMs + intervalMs) - curTimeMs;
                        if (delta <= 0) {
                            LOG.debug("Rescanning after {} milliseconds", curTimeMs - startTimeMs);
                            break;
                        }
                        doRescan.await(delta, TimeUnit.MILLISECONDS);
                        curTimeMs = Time.monotonicNow();
                    }
                } finally {
                    // Always release, including on return and on InterruptedException.
                    lock.unlock();
                }

                startTimeMs = curTimeMs;
                mark = !mark;
                rescan();
                curTimeMs = Time.monotonicNow();

                // Publish completion and wake up anyone blocked in waitForRescanIfNeeded().
                lock.lock();
                try {
                    completedScanCount = curScanCount;
                    curScanCount = -1;
                    scanFinished.signalAll();
                } finally {
                    lock.unlock();
                }
                LOG.debug("Scanned {} directive(s) and {} block(s) in {} millisecond(s).",
                        scannedDirectives, scannedBlocks, curTimeMs - startTimeMs);
            }
        } catch (InterruptedException e) {
            LOG.info("Shutting down CacheReplicationMonitor.");
        } catch (Throwable t) {
            LOG.error("Thread exiting", t);
            ExitUtil.terminate(1, t);
        }
    }

    /**
     * Blocks until the scan requested through {@link #setNeedsRescan()} has
     * completed, or the monitor is shut down. Returns immediately when no scan
     * is pending.
     *
     * <p>The caller must hold the CRM lock and must not hold the FSN write
     * lock (the rescan itself needs it). If the calling thread is interrupted
     * while waiting, the method logs a warning, restores the interrupt status
     * and returns early.
     *
     * @throws IllegalArgumentException if either locking precondition is violated
     */
    public void waitForRescanIfNeeded() {
        Preconditions.checkArgument(!namesystem.hasWriteLock(),
                "Must not hold the FSN write lock when waiting for a rescan.");
        Preconditions.checkArgument(lock.isHeldByCurrentThread(),
                "Must hold the CRM lock when waiting for a rescan.");
        if (neededScanCount <= completedScanCount) {
            return;
        }
        // Only kick the monitor thread when it is idle; a scan already in flight
        // will be followed by another one because neededScanCount is ahead.
        if (curScanCount < 0) {
            doRescan.signal();
        }
        while (!shutdown && completedScanCount < neededScanCount) {
            try {
                scanFinished.await();
            } catch (InterruptedException e) {
                LOG.warn("Interrupted while waiting for CacheReplicationMonitor rescan", e);
                // Preserve the interrupt for the caller instead of swallowing it.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Records that a rescan is needed. If a scan is in progress, the scan
     * after it is requested (the running one may already have passed the
     * changed state); otherwise the next scan is requested.
     *
     * <p>The caller must hold the CRM lock.
     *
     * @throws IllegalArgumentException if the CRM lock is not held
     */
    public void setNeedsRescan() {
        Preconditions.checkArgument(lock.isHeldByCurrentThread(),
                "Must hold the CRM lock when setting the needsRescan bit.");
        if (curScanCount >= 0) {
            neededScanCount = curScanCount + 1;
        } else {
            neededScanCount = completedScanCount + 1;
        }
    }

    /**
     * Requests shutdown of the monitor thread and wakes up both the monitor
     * and any threads waiting for a scan to finish. Calling it again after
     * shutdown has been requested is a no-op.
     *
     * <p>The caller must hold the FSN write lock.
     *
     * @throws IllegalArgumentException if the FSN write lock is not held
     */
    @Override
    public void close() throws IOException {
        Preconditions.checkArgument(namesystem.hasWriteLock());
        lock.lock();
        try {
            if (shutdown) {
                return;
            }
            shutdown = true;
            doRescan.signalAll();
            scanFinished.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Performs one full rescan under the namesystem write lock.
     *
     * @throws InterruptedException if the monitor was shut down before the scan started
     */
    private void rescan() throws InterruptedException {
        scannedDirectives = 0;
        scannedBlocks = 0;
        namesystem.writeLock();
        try {
            // The CRM lock is only needed to check shutdown and claim the scan number.
            lock.lock();
            try {
                if (shutdown) {
                    throw new InterruptedException("CacheReplicationMonitor was shut down.");
                }
                curScanCount = completedScanCount + 1;
            } finally {
                lock.unlock();
            }

            resetStatistics();
            rescanCacheDirectives();
            rescanCachedBlockMap();
            blockManager.getDatanodeManager().resetLastCachingDirectiveSentTime();
        } finally {
            namesystem.writeUnlock();
        }
    }

    /** Resets the statistics of every cache pool and every cache directive. */
    private void resetStatistics() {
        for (CachePool pool : cacheManager.getCachePools()) {
            pool.resetStatistics();
        }
        for (CacheDirective directive : cacheManager.getCacheDirectives()) {
            directive.resetStatistics();
        }
    }

    /**
     * Visits every cache directive and rescans the file it names, or the files
     * that are direct children of the directory it names. Expired directives
     * and unresolvable paths are skipped.
     */
    private void rescanCacheDirectives() {
        FSDirectory fsDir = namesystem.getFSDirectory();
        final long now = new Date().getTime();
        for (CacheDirective directive : cacheManager.getCacheDirectives()) {
            scannedDirectives++;
            // An expiry time <= 0 is treated as "does not expire".
            if (directive.getExpiryTime() > 0 && directive.getExpiryTime() <= now) {
                LOG.debug("Directive {}: the directive expired at {} (now = {})",
                        directive.getId(), directive.getExpiryTime(), now);
                continue;
            }
            String path = directive.getPath();
            INode node;
            try {
                node = fsDir.getINode(path);
            } catch (UnresolvedLinkException e) {
                LOG.debug("Directive {}: got UnresolvedLinkException while resolving path {}",
                        directive.getId(), path);
                continue;
            }
            if (node == null) {
                LOG.debug("Directive {}: No inode found at {}", directive.getId(), path);
            } else if (node.isDirectory()) {
                // Only the directory's immediate file children are considered.
                for (INode child : node.asDirectory().getChildrenList(Snapshot.CURRENT_STATE_ID)) {
                    if (child.isFile()) {
                        rescanFile(directive, child.asFile());
                    }
                }
            } else if (node.isFile()) {
                rescanFile(directive, node.asFile());
            } else {
                LOG.debug("Directive {}: ignoring non-directory, non-file inode {} ",
                        directive.getId(), node);
            }
        }
    }

    /**
     * Applies a directive to a file: updates the directive's needed/cached
     * statistics and makes sure every COMPLETE block of the file has an entry
     * in the cached-block map stamped with the current mark.
     *
     * @param directive the directive being applied
     * @param file      the file to cache
     */
    private void rescanFile(CacheDirective directive, INodeFile file) {
        BlockInfoContiguous[] blockInfos = file.getBlocks();

        // Account for the file before checking the pool limit, so that the
        // statistics reflect what the directive asks for.
        directive.addFilesNeeded(1);
        long neededTotal = file.computeFileSizeNotIncludingLastUcBlock() * directive.getReplication();
        directive.addBytesNeeded(neededTotal);

        CachePool pool = directive.getPool();
        if (pool.getBytesNeeded() > pool.getLimit()) {
            LOG.debug("Directive {}: not scanning file {} because bytesNeeded for pool {} is {}, but the pool's limit is {}",
                    directive.getId(), file.getFullPathName(), pool.getPoolName(),
                    pool.getBytesNeeded(), pool.getLimit());
            return;
        }

        long cachedTotal = 0;
        for (BlockInfoContiguous blockInfo : blockInfos) {
            if (!blockInfo.getBlockUCState().equals(HdfsServerConstants.BlockUCState.COMPLETE)) {
                LOG.trace("Directive {}: can't cache block {} because it is in state {}, not COMPLETE.",
                        directive.getId(), blockInfo, blockInfo.getBlockUCState());
                continue;
            }
            CachedBlock candidate = new CachedBlock(blockInfo.getBlockId(), directive.getReplication(), mark);
            CachedBlock existing = cachedBlocks.get(candidate);
            if (existing == null) {
                cachedBlocks.put(candidate);
                existing = candidate;
            } else {
                // Count at most `replication` cached copies towards this directive.
                List<DatanodeDescriptor> cachedOn =
                        existing.getDatanodes(DatanodeDescriptor.CachedBlocksList.Type.CACHED);
                cachedTotal += Math.min(cachedOn.size(), (int) directive.getReplication())
                        * blockInfo.getNumBytes();
                // First visit in this scan (stale mark) resets the replication; later
                // visits by other directives may only raise it.
                if (mark != existing.getMark() || existing.getReplication() < directive.getReplication()) {
                    existing.setReplicationAndMark(directive.getReplication(), mark);
                }
            }
            LOG.trace("Directive {}: setting replication for block {} to {}",
                    directive.getId(), blockInfo, existing.getReplication());
        }

        directive.addBytesCached(cachedTotal);
        if (cachedTotal == neededTotal) {
            directive.addFilesCached(1);
        }
        LOG.debug("Directive {}: caching {}: {}/{} bytes",
                directive.getId(), file.getFullPathName(), cachedTotal, neededTotal);
    }

    /**
     * Explains why a cached-block entry should not be cached, or returns
     * {@code null} if it should be.
     *
     * <p>Side effect: an entry whose mark differs from the current one (i.e.
     * no directive touched it during this scan) has its replication set to 0
     * and its mark updated.
     *
     * @param cblock    the cached-block entry
     * @param blockInfo the block as known to the BlockManager, may be {@code null}
     * @return a human-readable reason, or {@code null} when the block should be cached
     */
    private String findReasonForNotCaching(CachedBlock cblock, BlockInfoContiguous blockInfo) {
        if (blockInfo == null) {
            return "not tracked by the BlockManager";
        }
        if (!blockInfo.isComplete()) {
            return "not complete";
        }
        if (cblock.getReplication() == 0) {
            return "not needed by any directives";
        }
        if (cblock.getMark() == mark) {
            return null;
        }
        cblock.setReplicationAndMark((short) 0, mark);
        return "no longer needed by any directives";
    }

    /**
     * Reconciles the cached-block map with what the directives asked for.
     *
     * <p>First drops PENDING_CACHED entries that no longer fit in a DataNode's
     * remaining cache. Then, for every cached-block entry, prunes obsolete
     * pending work, schedules new cache/uncache operations to reach the needed
     * replication, and removes entries that are no longer needed at all.
     */
    private void rescanCachedBlockMap() {
        // Pass 1: per DataNode, remove pending-cached blocks that would overflow its cache.
        for (DatanodeDescriptor dn : blockManager.getDatanodeManager().getDatanodes()) {
            long remaining = dn.getCacheRemaining();
            for (Iterator<CachedBlock> it = dn.getPendingCached().iterator(); it.hasNext();) {
                CachedBlock cblock = it.next();
                BlockInfoContiguous blockInfo = blockManager.getStoredBlock(new Block(cblock.getBlockId()));
                if (blockInfo == null) {
                    LOG.debug("Block {}: cannot be found in block manager and hence skipped from calculation for node {}.",
                            cblock.getBlockId(), dn.getDatanodeUuid());
                    continue;
                }
                if (blockInfo.getNumBytes() > remaining) {
                    LOG.debug("Block {}: removing from PENDING_CACHED for node {} because it cannot fit in remaining cache size {}.",
                            cblock.getBlockId(), dn.getDatanodeUuid(), remaining);
                    it.remove();
                } else {
                    remaining -= blockInfo.getNumBytes();
                }
            }
        }

        // Pass 2: per cached block, converge on the needed number of cached replicas.
        for (Iterator<CachedBlock> cbIter = cachedBlocks.iterator(); cbIter.hasNext();) {
            scannedBlocks++;
            CachedBlock cblock = cbIter.next();
            List<DatanodeDescriptor> pendingCached =
                    cblock.getDatanodes(DatanodeDescriptor.CachedBlocksList.Type.PENDING_CACHED);
            List<DatanodeDescriptor> cached =
                    cblock.getDatanodes(DatanodeDescriptor.CachedBlocksList.Type.CACHED);
            List<DatanodeDescriptor> pendingUncached =
                    cblock.getDatanodes(DatanodeDescriptor.CachedBlocksList.Type.PENDING_UNCACHED);

            // Drop PENDING_UNCACHED entries for nodes that no longer have the block cached.
            for (Iterator<DatanodeDescriptor> it = pendingUncached.iterator(); it.hasNext();) {
                DatanodeDescriptor dn = it.next();
                if (!cblock.isInList(dn.getCached())) {
                    LOG.trace("Block {}: removing from PENDING_UNCACHED for node {} because the DataNode uncached it.",
                            cblock.getBlockId(), dn.getDatanodeUuid());
                    dn.getPendingUncached().remove(cblock);
                    it.remove();
                }
            }

            BlockInfoContiguous blockInfo = blockManager.getStoredBlock(new Block(cblock.getBlockId()));
            String reason = findReasonForNotCaching(cblock, blockInfo);
            int neededCached = 0;
            if (reason != null) {
                LOG.trace("Block {}: can't cache block because it is {}", cblock.getBlockId(), reason);
            } else {
                neededCached = cblock.getReplication();
            }

            int numCached = cached.size();
            if (numCached >= neededCached) {
                // Enough replicas already: cancel all pending cache operations.
                for (Iterator<DatanodeDescriptor> it = pendingCached.iterator(); it.hasNext();) {
                    DatanodeDescriptor dn = it.next();
                    dn.getPendingCached().remove(cblock);
                    it.remove();
                    LOG.trace("Block {}: removing from PENDING_CACHED for node {} because we already have {} cached replicas and we only need {}",
                            cblock.getBlockId(), dn.getDatanodeUuid(), numCached, neededCached);
                }
            }
            if (numCached < neededCached) {
                // Too few replicas: cancel all pending uncache operations.
                for (Iterator<DatanodeDescriptor> it = pendingUncached.iterator(); it.hasNext();) {
                    DatanodeDescriptor dn = it.next();
                    dn.getPendingUncached().remove(cblock);
                    it.remove();
                    LOG.trace("Block {}: removing from PENDING_UNCACHED for node {} because we only have {} cached replicas and we need {}",
                            cblock.getBlockId(), dn.getDatanodeUuid(), numCached, neededCached);
                }
            }

            int neededUncached = numCached - (pendingUncached.size() + neededCached);
            if (neededUncached > 0) {
                addNewPendingUncached(neededUncached, cblock, cached, pendingUncached);
            } else {
                int additionalCachedNeeded = neededCached - (numCached + pendingCached.size());
                if (additionalCachedNeeded > 0) {
                    addNewPendingCached(additionalCachedNeeded, cblock, cached, pendingCached);
                }
            }

            if (neededCached == 0 && pendingUncached.isEmpty() && pendingCached.isEmpty()) {
                LOG.trace("Block {}: removing from cachedBlocks, since neededCached == 0, and pendingUncached and pendingCached are empty.",
                        cblock.getBlockId());
                cbIter.remove();
            }
        }
    }

    /**
     * Schedules {@code neededUncached} replicas of a block for uncaching,
     * choosing randomly among the nodes that have it cached and are not
     * already scheduled.
     *
     * @param neededUncached  number of replicas to uncache
     * @param cachedBlock     the block to uncache
     * @param cached          nodes currently caching the block
     * @param pendingUncached nodes already scheduled to uncache it; chosen nodes are appended
     */
    private void addNewPendingUncached(int neededUncached, CachedBlock cachedBlock,
            List<DatanodeDescriptor> cached, List<DatanodeDescriptor> pendingUncached) {
        List<DatanodeDescriptor> candidates = new ArrayList<>();
        for (DatanodeDescriptor dn : cached) {
            if (!pendingUncached.contains(dn)) {
                candidates.add(dn);
            }
        }
        for (int remaining = neededUncached; remaining > 0; remaining--) {
            if (candidates.isEmpty()) {
                LOG.warn("Logic error: we're trying to uncache more replicas than actually exist for " + cachedBlock);
                return;
            }
            DatanodeDescriptor dn = candidates.remove(random.nextInt(candidates.size()));
            pendingUncached.add(dn);
            boolean added = dn.getPendingUncached().add(cachedBlock);
            assert added;
        }
    }

    /**
     * Schedules up to {@code neededCached} additional replicas of a block for
     * caching. Candidates are DataNodes that store the block, are not
     * decommissioned or decommissioning, do not hold a corrupt replica, are
     * not already caching or scheduled to cache it, and have enough cache
     * space once their own pending work is taken into account.
     *
     * @param neededCached  number of additional cached replicas wanted
     * @param cachedBlock   the block to cache
     * @param cached        nodes currently caching the block
     * @param pendingCached nodes already scheduled to cache it; chosen nodes are appended
     */
    private void addNewPendingCached(int neededCached, CachedBlock cachedBlock,
            List<DatanodeDescriptor> cached, List<DatanodeDescriptor> pendingCached) {
        BlockInfoContiguous blockInfo = blockManager.getStoredBlock(new Block(cachedBlock.getBlockId()));
        if (blockInfo == null) {
            LOG.debug("Block {}: can't add new cached replicas, because there is no record of this block on the NameNode.",
                    cachedBlock.getBlockId());
            return;
        }
        if (!blockInfo.isComplete()) {
            LOG.debug("Block {}: can't cache this block, because it is not yet complete.",
                    cachedBlock.getBlockId());
            return;
        }

        List<DatanodeDescriptor> possibilities = new LinkedList<>();
        int numReplicas = blockInfo.getCapacity();
        Collection<DatanodeDescriptor> corrupt = blockManager.getCorruptReplicas(blockInfo);
        int outOfCapacity = 0;
        for (int i = 0; i < numReplicas; i++) {
            DatanodeDescriptor datanode = blockInfo.getDatanode(i);
            if (datanode == null) {
                continue;
            }
            if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
                continue;
            }
            if (corrupt != null && corrupt.contains(datanode)) {
                continue;
            }
            if (pendingCached.contains(datanode) || cached.contains(datanode)) {
                continue;
            }

            // Net effect of the node's queued work on its free cache: blocks waiting
            // to be cached will consume space, blocks waiting to be uncached will free it.
            long pendingBytes = 0;
            for (Iterator<CachedBlock> it = datanode.getPendingCached().iterator(); it.hasNext();) {
                BlockInfoContiguous info = blockManager.getStoredBlock(new Block(it.next().getBlockId()));
                if (info != null) {
                    pendingBytes -= info.getNumBytes();
                }
            }
            for (Iterator<CachedBlock> it = datanode.getPendingUncached().iterator(); it.hasNext();) {
                BlockInfoContiguous info = blockManager.getStoredBlock(new Block(it.next().getBlockId()));
                if (info != null) {
                    pendingBytes += info.getNumBytes();
                }
            }
            long pendingCapacity = pendingBytes + datanode.getCacheRemaining();
            if (pendingCapacity < blockInfo.getNumBytes()) {
                LOG.trace("Block {}: DataNode {} is not a valid possibility because the block has size {}, but the DataNode only has {} bytes of cache remaining ({} pending bytes, {} already cached.)",
                        blockInfo.getBlockId(), datanode.getDatanodeUuid(), blockInfo.getNumBytes(),
                        pendingCapacity, pendingBytes, datanode.getCacheRemaining());
                outOfCapacity++;
                continue;
            }
            possibilities.add(datanode);
        }

        List<DatanodeDescriptor> chosen = chooseDatanodesForCaching(possibilities, neededCached,
                blockManager.getDatanodeManager().getStaleInterval());
        for (DatanodeDescriptor datanode : chosen) {
            LOG.trace("Block {}: added to PENDING_CACHED on DataNode {}",
                    blockInfo.getBlockId(), datanode.getDatanodeUuid());
            pendingCached.add(datanode);
            boolean added = datanode.getPendingCached().add(cachedBlock);
            assert added;
        }

        if (neededCached > chosen.size()) {
            LOG.debug("Block {}: we only have {} of {} cached replicas. {} DataNodes have insufficient cache capacity.",
                    blockInfo.getBlockId(),
                    (cachedBlock.getReplication() - neededCached) + chosen.size(),
                    cachedBlock.getReplication(), outOfCapacity);
        }
    }

    /**
     * Picks up to {@code neededCached} distinct DataNodes from the candidates,
     * preferring non-stale nodes and falling back to stale ones only when the
     * non-stale ones are exhausted. The input list is not modified.
     *
     * @param possibilities candidate DataNodes
     * @param neededCached  number of nodes wanted
     * @param staleInterval staleness threshold passed to {@code DatanodeDescriptor.isStale}
     * @return the chosen nodes, possibly fewer than requested
     */
    private static List<DatanodeDescriptor> chooseDatanodesForCaching(List<DatanodeDescriptor> possibilities,
            int neededCached, long staleInterval) {
        // Work on a copy so that the caller's list stays untouched.
        List<DatanodeDescriptor> targets = new ArrayList<>(possibilities);
        List<DatanodeDescriptor> chosen = new LinkedList<>();
        List<DatanodeDescriptor> stale = new LinkedList<>();

        for (Iterator<DatanodeDescriptor> it = targets.iterator(); it.hasNext();) {
            DatanodeDescriptor dn = it.next();
            if (dn.isStale(staleInterval)) {
                it.remove();
                stale.add(dn);
            }
        }

        while (chosen.size() < neededCached) {
            if (targets.isEmpty()) {
                if (stale.isEmpty()) {
                    break;
                }
                // Out of fresh nodes: continue drawing from the stale ones.
                targets = stale;
            }
            DatanodeDescriptor target = chooseRandomDatanodeByRemainingCapacity(targets);
            chosen.add(target);
            targets.remove(target);
        }
        return chosen;
    }

    /**
     * Picks one DataNode at random, weighted by each node's share of the
     * total remaining cache percentage. Every node gets a weight of at least
     * 1, so nodes reporting 0% remaining can still be selected.
     *
     * @param targets candidate DataNodes; must not be empty
     * @return the selected DataNode
     */
    private static DatanodeDescriptor chooseRandomDatanodeByRemainingCapacity(List<DatanodeDescriptor> targets) {
        float total = 0.0f;
        for (DatanodeDescriptor dn : targets) {
            total += dn.getCacheRemainingPercent();
        }
        // Lay the nodes out on a number line; each owns the interval ending at its key.
        TreeMap<Integer, DatanodeDescriptor> lottery = new TreeMap<>();
        int offset = 0;
        for (DatanodeDescriptor dn : targets) {
            int weight = Math.max(1, (int) ((dn.getCacheRemainingPercent() / total) * 1000000.0f));
            offset += weight;
            lottery.put(offset, dn);
        }
        // nextInt(offset) is in [0, offset), so a strictly higher key always exists.
        return lottery.higherEntry(random.nextInt(offset)).getValue();
    }
}
