package org.apache.hudi.org.apache.hadoop.hbase.io.asyncfs.monitor;

import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hudi.org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hudi.org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hudi.org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hudi.org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hudi.org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
/* loaded from: input_file:org/apache/hudi/org/apache/hadoop/hbase/io/asyncfs/monitor/StreamSlowMonitor.class */
public class StreamSlowMonitor implements ConfigurationObserver {
    private static final Logger LOG = LoggerFactory.getLogger(StreamSlowMonitor.class);
    private static final String WAL_SLOW_DETECT_MIN_COUNT_KEY = "hbase.regionserver.async.wal.min.slow.detect.count";
    private static final int DEFAULT_WAL_SLOW_DETECT_MIN_COUNT = 3;
    private static final String WAL_SLOW_DETECT_DATA_TTL_KEY = "hbase.regionserver.async.wal.slow.detect.data.ttl.ms";
    private static final long DEFAULT_WAL_SLOW_DETECT_DATA_TTL = 600000;
    private static final String DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH_KEY = "hbase.regionserver.async.wal.datanode.slow.check.speed.packet.data.length.min";
    private static final long DEFAULT_DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH = 65536;
    public static final String DATANODE_SLOW_PACKET_PROCESS_TIME_KEY = "hbase.regionserver.async.wal.datanode.slow.packet.process.time.millis";
    private static final long DEFAULT_DATANODE_SLOW_PACKET_PROCESS_TIME = 6000;
    private static final String DATANODE_SLOW_PACKET_FLUSH_MIN_SPEED_KEY = "hbase.regionserver.async.wal.datanode.slow.packet.speed.min.kbs";
    private static final double DEFAULT_DATANODE_SLOW_PACKET_FLUSH_MIN_SPEED = 20.0d;
    private final String name;
    private final LoadingCache<DatanodeInfo, Deque<PacketAckData>> datanodeSlowDataQueue;
    private final ExcludeDatanodeManager excludeDatanodeManager;
    private int minSlowDetectCount;
    private long slowDataTtl;
    private long slowPacketAckMs;
    private double minPacketFlushSpeedKBs;
    private long minLengthForSpeedCheck;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hudi/org/apache/hadoop/hbase/io/asyncfs/monitor/StreamSlowMonitor$PacketAckData.class */
    public static class PacketAckData {
        private final long dataLength;
        private final long processTime;
        private final long timestamp = EnvironmentEdgeManager.currentTime();

        public PacketAckData(long j, long j2) {
            this.dataLength = j;
            this.processTime = j2;
        }

        public long getDataLength() {
            return this.dataLength;
        }

        public long getProcessTime() {
            return this.processTime;
        }

        public long getTimestamp() {
            return this.timestamp;
        }
    }

    public StreamSlowMonitor(Configuration configuration, String str, ExcludeDatanodeManager excludeDatanodeManager) {
        setConf(configuration);
        this.name = str;
        this.excludeDatanodeManager = excludeDatanodeManager;
        this.datanodeSlowDataQueue = CacheBuilder.newBuilder().maximumSize(configuration.getInt(ExcludeDatanodeManager.WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY, 3)).expireAfterWrite(configuration.getLong(ExcludeDatanodeManager.WAL_EXCLUDE_DATANODE_TTL_KEY, 6L), TimeUnit.HOURS).build(new CacheLoader<DatanodeInfo, Deque<PacketAckData>>() { // from class: org.apache.hudi.org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor.1
            @Override // org.apache.hudi.org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader
            public Deque<PacketAckData> load(DatanodeInfo datanodeInfo) throws Exception {
                return new ConcurrentLinkedDeque();
            }
        });
        LOG.info("New stream slow monitor {}", this.name);
    }

    public static StreamSlowMonitor create(Configuration configuration, String str) {
        return new StreamSlowMonitor(configuration, str, new ExcludeDatanodeManager(configuration));
    }

    public void checkProcessTimeAndSpeed(DatanodeInfo datanodeInfo, long j, long j2, long j3, int i) {
        long currentTime = EnvironmentEdgeManager.currentTime();
        if ((j <= this.minLengthForSpeedCheck && j2 > this.slowPacketAckMs) || (j > this.minLengthForSpeedCheck && ((double) j) / ((double) j2) < this.minPacketFlushSpeedKBs)) {
            if ((j3 <= 0 || currentTime - j3 <= this.slowPacketAckMs / 2) && (j3 > 0 || i != 0)) {
                return;
            }
            LOG.info("Slow datanode: {}, data length={}, duration={}ms, unfinishedReplicas={}, lastAckTimestamp={}, monitor name: {}", new Object[]{datanodeInfo, Long.valueOf(j), Long.valueOf(j2), Integer.valueOf(i), Long.valueOf(j3), this.name});
            if (addSlowAckData(datanodeInfo, j, j2)) {
                this.excludeDatanodeManager.tryAddExcludeDN(datanodeInfo, "slow packet ack");
            }
        }
    }

    @Override // org.apache.hudi.org.apache.hadoop.hbase.conf.ConfigurationObserver
    public void onConfigurationChange(Configuration configuration) {
        setConf(configuration);
    }

    private boolean addSlowAckData(DatanodeInfo datanodeInfo, long j, long j2) {
        Deque<PacketAckData> unchecked = this.datanodeSlowDataQueue.getUnchecked(datanodeInfo);
        long currentTime = EnvironmentEdgeManager.currentTime();
        while (!unchecked.isEmpty() && (currentTime - unchecked.getFirst().getTimestamp() > this.slowDataTtl || unchecked.size() >= this.minSlowDetectCount)) {
            unchecked.removeFirst();
        }
        unchecked.addLast(new PacketAckData(j, j2));
        return unchecked.size() >= this.minSlowDetectCount;
    }

    private void setConf(Configuration configuration) {
        this.minSlowDetectCount = configuration.getInt(WAL_SLOW_DETECT_MIN_COUNT_KEY, 3);
        this.slowDataTtl = configuration.getLong(WAL_SLOW_DETECT_DATA_TTL_KEY, 600000L);
        this.slowPacketAckMs = configuration.getLong(DATANODE_SLOW_PACKET_PROCESS_TIME_KEY, DEFAULT_DATANODE_SLOW_PACKET_PROCESS_TIME);
        this.minLengthForSpeedCheck = configuration.getLong(DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH_KEY, 65536L);
        this.minPacketFlushSpeedKBs = configuration.getDouble(DATANODE_SLOW_PACKET_FLUSH_MIN_SPEED_KEY, DEFAULT_DATANODE_SLOW_PACKET_FLUSH_MIN_SPEED);
    }

    public ExcludeDatanodeManager getExcludeDatanodeManager() {
        return this.excludeDatanodeManager;
    }
}
