package org.apache.hadoop.hbase.replication.regionserver;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
@InterfaceStability.Evolving
/* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceLogQueue.class */
public class ReplicationSourceLogQueue {
    private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
    private Map<String, PriorityBlockingQueue<Path>> queues = new ConcurrentHashMap();
    private MetricsSource metrics;
    private Configuration conf;
    private int queueSizePerGroup;
    private int logQueueWarnThreshold;
    private ReplicationSource source;

    public ReplicationSourceLogQueue(Configuration configuration, MetricsSource metricsSource, ReplicationSource replicationSource) {
        this.conf = configuration;
        this.metrics = metricsSource;
        this.source = replicationSource;
        this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
        this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
    }

    public boolean enqueueLog(Path path, String str) {
        boolean z = false;
        PriorityBlockingQueue<Path> priorityBlockingQueue = this.queues.get(str);
        if (priorityBlockingQueue == null) {
            priorityBlockingQueue = new PriorityBlockingQueue<>(this.queueSizePerGroup, new AbstractFSWALProvider.WALStartTimeComparator());
            priorityBlockingQueue.put(path);
            this.queues.put(str, priorityBlockingQueue);
        } else {
            z = true;
            priorityBlockingQueue.put(path);
        }
        this.metrics.incrSizeOfLogQueue();
        this.metrics.setOldestWalAge(getOldestWalAge());
        int size = priorityBlockingQueue.size();
        if (size > this.logQueueWarnThreshold) {
            LOG.warn("{} WAL group {} queue size: {} exceeds value of replication.source.log.queue.warn {}", new Object[]{this.source.logPeerId(), str, Integer.valueOf(size), Integer.valueOf(this.logQueueWarnThreshold)});
        }
        return z;
    }

    public int getQueueSize(String str) {
        PriorityBlockingQueue<Path> priorityBlockingQueue = this.queues.get(str);
        if (priorityBlockingQueue == null) {
            return 0;
        }
        return priorityBlockingQueue.size();
    }

    public int getNumQueues() {
        return this.queues.size();
    }

    public Map<String, PriorityBlockingQueue<Path>> getQueues() {
        return this.queues;
    }

    public PriorityBlockingQueue<Path> getQueue(String str) {
        return this.queues.get(str);
    }

    public void remove(String str) {
        PriorityBlockingQueue<Path> queue = getQueue(str);
        if (queue == null || queue.isEmpty()) {
            return;
        }
        queue.remove();
        this.metrics.decrSizeOfLogQueue();
        this.metrics.setOldestWalAge(getOldestWalAge());
    }

    public void clear(String str) {
        PriorityBlockingQueue<Path> queue = getQueue(str);
        while (!queue.isEmpty()) {
            queue.remove();
            this.metrics.decrSizeOfLogQueue();
        }
        this.metrics.setOldestWalAge(getOldestWalAge());
    }

    long getOldestWalAge() {
        long currentTime = EnvironmentEdgeManager.currentTime();
        long oldestWalTimestamp = getOldestWalTimestamp();
        if (oldestWalTimestamp == Long.MAX_VALUE) {
            oldestWalTimestamp = currentTime;
        }
        return currentTime - oldestWalTimestamp;
    }

    private long getOldestWalTimestamp() {
        long j = Long.MAX_VALUE;
        Iterator<Map.Entry<String, PriorityBlockingQueue<Path>>> it = this.queues.entrySet().iterator();
        while (it.hasNext()) {
            Path peek = it.next().getValue().peek();
            if (peek != null) {
                j = Math.min(j, AbstractFSWALProvider.WALStartTimeComparator.getTS(peek));
            }
        }
        return j;
    }

    public MetricsSource getMetrics() {
        return this.metrics;
    }
}
