package org.apache.hudi.org.apache.hadoop.hbase.replication.regionserver;

import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.org.apache.hadoop.hbase.Cell;
import org.apache.hudi.org.apache.hadoop.hbase.CellUtil;
import org.apache.hudi.org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hudi.org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hudi.org.apache.hadoop.hbase.util.Pair;
import org.apache.hudi.org.apache.hadoop.hbase.util.Threads;
import org.apache.hudi.org.apache.hadoop.hbase.wal.WAL;
import org.apache.hudi.org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.rocksdb.SstFileManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Thread that reads WAL entries for a single WAL group, runs them through a
 * {@link WALEntryFilter} and groups the surviving entries into {@link WALEntryBatch}es,
 * which are handed over through a bounded blocking queue ({@link #take()} / {@link #poll(long)}).
 *
 * <p>A batch is closed as soon as one of the following holds: the shared buffer quota is
 * reached, the batch heap size reaches {@code replication.source.size.capacity}, the batch
 * entry count reaches {@code replication.source.nb.capacity}, or the underlying stream moved
 * on to another WAL file.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ReplicationSourceWALReader extends Thread {
    private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class);

    /**
     * Default for {@code replication.source.size.capacity}: 64 MiB. Declared locally so this
     * class does not depend on an unrelated library constant that happens to have this value.
     */
    private static final long DEFAULT_BATCH_SIZE_CAPACITY = 64L * 1024 * 1024;

    private final ReplicationSourceLogQueue logQueue;
    private final FileSystem fs;
    private final Configuration conf;
    private final WALEntryFilter filter;
    private final ReplicationSource source;

    @InterfaceAudience.Private
    final BlockingQueue<WALEntryBatch> entryBatchQueue;
    private final long replicationBatchSizeCapacity;
    private final int replicationBatchCountCapacity;
    private long currentPosition;
    private final long sleepForRetries;
    private final int maxRetriesMultiplier;
    private final boolean eofAutoRecovery;
    // volatile: written through the public setter, read by the run loop of this thread.
    private volatile boolean isReaderRunning = true;
    private final AtomicLong totalBufferUsed;
    private final long totalBufferQuota;
    private final String walGroupId;

    /**
     * Creates the reader; the thread is not started here.
     *
     * @param fileSystem file system used to look up WAL file lengths during EOF recovery
     * @param configuration source of the {@code replication.source.*} settings read below
     * @param replicationSourceLogQueue queue of WAL paths, looked up by WAL group id
     * @param startPosition position handed to the first {@link WALEntryStream}
     * @param wALEntryFilter filter applied to every entry read
     * @param replicationSource owning source; provides peer state, metrics and buffer accounting
     * @param walGroupId id of the WAL group this reader works on
     */
    public ReplicationSourceWALReader(FileSystem fileSystem, Configuration configuration, ReplicationSourceLogQueue replicationSourceLogQueue, long startPosition, WALEntryFilter wALEntryFilter, ReplicationSource replicationSource, String walGroupId) {
        this.logQueue = replicationSourceLogQueue;
        this.currentPosition = startPosition;
        this.fs = fileSystem;
        this.conf = configuration;
        this.filter = wALEntryFilter;
        this.source = replicationSource;
        this.replicationBatchSizeCapacity = configuration.getLong("replication.source.size.capacity", DEFAULT_BATCH_SIZE_CAPACITY);
        this.replicationBatchCountCapacity = configuration.getInt("replication.source.nb.capacity", 25000);
        int batchQueueCapacity = configuration.getInt("replication.source.nb.batches", 1);
        this.totalBufferUsed = replicationSource.getSourceManager().getTotalBufferUsed();
        this.totalBufferQuota = replicationSource.getSourceManager().getTotalBufferLimit();
        this.sleepForRetries = configuration.getLong("replication.source.sleepforretries", 1000L);
        this.maxRetriesMultiplier = configuration.getInt("replication.source.maxretriesmultiplier", 300);
        this.eofAutoRecovery = configuration.getBoolean("replication.source.eof.autorecovery", false);
        this.entryBatchQueue = new LinkedBlockingQueue<>(batchQueueCapacity);
        this.walGroupId = walGroupId;
        LOG.info("peerClusterZnode={}, ReplicationSourceWALReaderThread : {} inited, replicationBatchSizeCapacity={}, replicationBatchCountCapacity={}, replicationBatchQueueCapacity={}",
            replicationSource.getQueueId(), replicationSource.getPeerId(), this.replicationBatchSizeCapacity, this.replicationBatchCountCapacity, batchQueueCapacity);
    }

    /**
     * Main loop. The outer loop opens a fresh {@link WALEntryStream} at {@code currentPosition};
     * the inner loop keeps reusing that stream until the reader is stopped or an exception
     * escapes. The stream is closed by try-with-resources on every exit path.
     */
    @Override
    public void run() {
        int sleepMultiplier = 1;
        while (isReaderRunning()) {
            try (WALEntryStream entryStream = new WALEntryStream(this.logQueue, this.conf, this.currentPosition, this.source.getWALFileLengthProvider(), this.source.getServerWALsBelongTo(), this.source.getSourceMetrics(), this.walGroupId)) {
                while (isReaderRunning()) {
                    if (!this.source.isPeerEnabled()) {
                        Threads.sleep(this.sleepForRetries);
                        continue;
                    }
                    if (!checkQuota()) {
                        continue;
                    }
                    WALEntryBatch batch = readWALEntries(entryStream);
                    this.currentPosition = entryStream.getPosition();
                    if (batch != null) {
                        LOG.trace("Read {} WAL entries eligible for replication", batch.getNbEntries());
                        this.entryBatchQueue.put(batch);
                        sleepMultiplier = 1;
                    } else {
                        handleEmptyWALEntryBatch(entryStream.getCurrentPath());
                        entryStream.reset();
                    }
                }
            } catch (IOException | WALEntryFilterRetryableException e) {
                if (handleEofException(e)) {
                    sleepMultiplier = 1;
                } else {
                    LOG.warn("Failed to read stream of replication entries or replication filter is recovering", e);
                    // Linear back-off, capped at maxRetriesMultiplier * sleepForRetries.
                    if (sleepMultiplier < this.maxRetriesMultiplier) {
                        sleepMultiplier++;
                    }
                    Threads.sleep(this.sleepForRetries * sleepMultiplier);
                }
            } catch (InterruptedException e) {
                LOG.trace("Interrupted while sleeping between WAL reads");
                // Restore the flag; isReaderRunning() then returns false and the loop ends.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Adds {@code entry} to {@code wALEntryBatch} and updates the batch statistics and the
     * shared buffer usage.
     *
     * @return {@code false} if the entry has no edit or an empty edit (nothing is added);
     *         otherwise {@code true} when the batch should be closed because the buffer quota,
     *         the batch size capacity or the batch count capacity has been reached
     */
    public final boolean addEntryToBatch(WALEntryBatch wALEntryBatch, WAL.Entry entry) {
        WALEdit edit = entry.getEdit();
        if (edit == null || edit.isEmpty()) {
            return false;
        }
        long sizeWithBulkLoad = getEntrySizeIncludeBulkLoad(entry);
        long sizeWithoutBulkLoad = getEntrySizeExcludeBulkLoad(entry);
        wALEntryBatch.addEntry(entry, sizeWithBulkLoad);
        updateBatchStats(wALEntryBatch, entry, sizeWithBulkLoad);
        // acquireBufferQuota must stay the first operand: it has to run for every added entry.
        return acquireBufferQuota(sizeWithoutBulkLoad)
            || wALEntryBatch.getHeapSize() >= this.replicationBatchSizeCapacity
            || wALEntryBatch.getNbEntries() >= this.replicationBatchCountCapacity;
    }

    /**
     * @return {@code true} if the stream has no current path or its current path has a
     *         different file name than {@code path}
     */
    public static final boolean switched(WALEntryStream wALEntryStream, Path path) {
        Path streamPath = wALEntryStream.getCurrentPath();
        return streamPath == null || !path.getName().equals(streamPath.getName());
    }

    /**
     * Reads the next batch from the stream.
     *
     * @return {@code null} when nothing was read and the stream did not switch files; an
     *         end-of-file batch for the previous path when the stream switched files before any
     *         entry was read; otherwise the batch that was filled
     */
    protected WALEntryBatch readWALEntries(WALEntryStream wALEntryStream) throws IOException, InterruptedException {
        Path currentPath = wALEntryStream.getCurrentPath();
        if (!wALEntryStream.hasNext()) {
            if (currentPath != null && switched(wALEntryStream, currentPath)) {
                return WALEntryBatch.endOfFile(currentPath);
            }
            return null;
        }
        if (currentPath == null) {
            // No path before hasNext(); re-read it now that hasNext() returned true.
            currentPath = wALEntryStream.getCurrentPath();
        } else if (switched(wALEntryStream, currentPath)) {
            return WALEntryBatch.endOfFile(currentPath);
        }
        WALEntryBatch batch = createBatch(wALEntryStream);
        while (true) {
            WAL.Entry rawEntry = wALEntryStream.next();
            batch.setLastWalPosition(wALEntryStream.getPosition());
            WAL.Entry accepted = filterEntry(rawEntry);
            if (accepted != null && addEntryToBatch(batch, accepted)) {
                break;
            }
            // hasNext() is evaluated before switched(): the file check looks at the stream
            // state after hasNext() has run.
            boolean hasNext = wALEntryStream.hasNext();
            if (switched(wALEntryStream, currentPath)) {
                batch.setEndOfFile(true);
                break;
            }
            if (!hasNext) {
                break;
            }
        }
        return batch;
    }

    /**
     * Called when a read produced no batch. A non-recovered source sleeps before the next
     * attempt; a recovered source stops the reader and enqueues
     * {@link WALEntryBatch#NO_MORE_DATA}.
     *
     * @param path current path of the stream (not used by this implementation)
     */
    private void handleEmptyWALEntryBatch(Path path) throws InterruptedException {
        LOG.trace("Didn't read any new entries from WAL");
        if (this.source.isRecovered()) {
            setReaderRunning(false);
            this.entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA);
        } else {
            Thread.sleep(this.sleepForRetries);
        }
    }

    /**
     * If {@code exc} is (or is caused by) an {@link EOFException}, EOF auto recovery is enabled,
     * and the source is recovered or has more than one queued WAL, removes the head of the
     * queue when it is a zero-length file and resets the read position to 0.
     *
     * @return {@code true} if a zero-length WAL was removed from the queue
     */
    private boolean handleEofException(Exception exc) {
        PriorityBlockingQueue<Path> queue = this.logQueue.getQueue(this.walGroupId);
        boolean eof = exc instanceof EOFException || exc.getCause() instanceof EOFException;
        if (!eof) {
            return false;
        }
        boolean mayRemoveHead = this.source.isRecovered() || queue.size() > 1;
        if (!mayRemoveHead || !this.eofAutoRecovery) {
            return false;
        }
        // Peek once so the length check and the log messages refer to the same path, and so an
        // empty queue (possible for a recovered source) does not cause a NullPointerException.
        Path head = queue.peek();
        if (head == null) {
            return false;
        }
        try {
            if (this.fs.getFileStatus(head).getLen() != 0) {
                return false;
            }
            LOG.warn("Forcing removal of 0 length log in queue: {}", head);
            this.logQueue.remove(this.walGroupId);
            this.currentPosition = 0L;
            return true;
        } catch (IOException e) {
            LOG.warn("Couldn't get file length information about log " + head, e);
            return false;
        }
    }

    /**
     * @return the last WAL path of the batch at the head of the batch queue, or, when no batch
     *         is queued, the head of this group's WAL queue (may be {@code null})
     */
    public Path getCurrentPath() {
        WALEntryBatch head = this.entryBatchQueue.peek();
        if (head != null) {
            return head.getLastWalPath();
        }
        return this.logQueue.getQueue(this.walGroupId).peek();
    }

    /**
     * @return {@code true} if the shared buffer usage is within the quota; otherwise logs a
     *         warning, sleeps for {@code sleepForRetries} and returns {@code false}
     */
    private boolean checkQuota() {
        if (this.totalBufferUsed.get() <= this.totalBufferQuota) {
            return true;
        }
        LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B", this.source.getPeerId(), this.totalBufferUsed.get(), this.totalBufferQuota);
        Threads.sleep(this.sleepForRetries);
        return false;
    }

    /** Creates an empty batch sized for the count capacity and bound to the stream's current path. */
    public final WALEntryBatch createBatch(WALEntryStream wALEntryStream) {
        return new WALEntryBatch(this.replicationBatchCountCapacity, wALEntryStream.getCurrentPath());
    }

    /**
     * Applies the configured filter and counts entries that were filtered out entirely.
     *
     * @return the filter's result, possibly {@code null}
     */
    public final WAL.Entry filterEntry(WAL.Entry entry) {
        WAL.Entry filtered = this.filter.filter(entry);
        if (entry != null && filtered == null) {
            this.source.getSourceMetrics().incrLogEditsFiltered();
        }
        return filtered;
    }

    /** Retrieves the next batch, blocking until one is available. */
    public WALEntryBatch take() throws InterruptedException {
        return this.entryBatchQueue.take();
    }

    /**
     * Retrieves the next batch, waiting at most {@code timeoutMillis} milliseconds.
     *
     * @return the batch, or {@code null} if the timeout elapsed first
     */
    public WALEntryBatch poll(long timeoutMillis) throws InterruptedException {
        return this.entryBatchQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Entry size as computed by {@link #getEntrySizeExcludeBulkLoad} plus referenced bulk-load store file sizes. */
    private long getEntrySizeIncludeBulkLoad(WAL.Entry entry) {
        return getEntrySizeExcludeBulkLoad(entry) + sizeOfStoreFilesIncludeBulkLoad(entry.getEdit());
    }

    /** @return heap size of the entry's edit plus the estimated serialized size of its key */
    public static long getEntrySizeExcludeBulkLoad(WAL.Entry entry) {
        return entry.getEdit().heapSize() + entry.getKey().estimatedSerializedSizeOf();
    }

    /** Adds the entry's size, distinct row key count and HFile count to the batch counters. */
    private void updateBatchStats(WALEntryBatch wALEntryBatch, WAL.Entry entry, long entrySize) {
        WALEdit edit = entry.getEdit();
        wALEntryBatch.incrementHeapSize(entrySize);
        Pair<Integer, Integer> rowKeysAndHFiles = countDistinctRowKeysAndHFiles(edit);
        wALEntryBatch.incrementNbRowKeys(rowKeysAndHFiles.getFirst());
        wALEntryBatch.incrementNbHFiles(rowKeysAndHFiles.getSecond());
    }

    /**
     * Counts row changes between consecutive cells (starting at 1) and the number of store
     * files named by bulk-load cells.
     *
     * <p>Precondition: the edit has at least one cell; {@link #addEntryToBatch} rejects empty
     * edits before this is reached.
     *
     * @return pair of (row key count, HFile count)
     */
    private Pair<Integer, Integer> countDistinctRowKeysAndHFiles(WALEdit wALEdit) {
        List<Cell> cells = wALEdit.getCells();
        int distinctRowKeys = 1;
        int totalHFileEntries = 0;
        Cell previous = cells.get(0);
        int cellCount = wALEdit.size();
        for (int idx = 0; idx < cellCount; idx++) {
            Cell current = cells.get(idx);
            if (CellUtil.matchingQualifier(current, WALEdit.BULK_LOAD)) {
                try {
                    for (WALProtos.StoreDescriptor store : WALEdit.getBulkLoadDescriptor(current).getStoresList()) {
                        totalHFileEntries += store.getStoreFileList().size();
                    }
                } catch (IOException e) {
                    LOG.error("Failed to deserialize bulk load entry from wal edit. Then its hfiles count will not be added into metric.", e);
                }
            }
            if (!CellUtil.matchingRows(current, previous)) {
                distinctRowKeys++;
            }
            previous = current;
        }
        return new Pair<>(distinctRowKeys, totalHFileEntries);
    }

    /**
     * Sums the store file sizes recorded in the edit's bulk-load cells. The sum is kept as a
     * {@code long}: the per-store sizes are {@code long} values and an {@code int} total would
     * wrap once the referenced files exceed 2 GiB.
     *
     * @return total size in bytes; cells whose descriptor cannot be parsed contribute nothing
     */
    private long sizeOfStoreFilesIncludeBulkLoad(WALEdit wALEdit) {
        List<Cell> cells = wALEdit.getCells();
        long totalStoreFilesSize = 0L;
        int cellCount = wALEdit.size();
        for (int idx = 0; idx < cellCount; idx++) {
            Cell current = cells.get(idx);
            if (CellUtil.matchingQualifier(current, WALEdit.BULK_LOAD)) {
                try {
                    for (WALProtos.StoreDescriptor store : WALEdit.getBulkLoadDescriptor(current).getStoresList()) {
                        totalStoreFilesSize += store.getStoreFileSizeBytes();
                    }
                } catch (IOException e) {
                    LOG.error("Failed to deserialize bulk load entry from wal edit. Size of HFiles part of cell will not be considered in replication request size calculation.", e);
                }
            }
        }
        return totalStoreFilesSize;
    }

    /**
     * Adds {@code size} to the shared buffer usage and publishes the new total to the global
     * metrics.
     *
     * @return {@code true} if the new total has reached the quota
     */
    private boolean acquireBufferQuota(long size) {
        long newBufferUsed = this.totalBufferUsed.addAndGet(size);
        this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
        return newBufferUsed >= this.totalBufferQuota;
    }

    /** @return {@code true} while the running flag is set and this thread is not interrupted */
    public boolean isReaderRunning() {
        return this.isReaderRunning && !isInterrupted();
    }

    /** Sets the running flag checked by {@link #isReaderRunning()}. */
    public void setReaderRunning(boolean readerRunning) {
        this.isReaderRunning = readerRunning;
    }
}
