package org.apache.hudi.org.apache.hadoop.hbase.replication.regionserver;

import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.OptionalLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.org.apache.commons.lang3.StringUtils;
import org.apache.hudi.org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader;
import org.apache.hudi.org.apache.hadoop.hbase.regionserver.wal.WALHeaderEOFException;
import org.apache.hudi.org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hudi.org.apache.hadoop.hbase.util.Pair;
import org.apache.hudi.org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hudi.org.apache.hadoop.hbase.wal.WAL;
import org.apache.hudi.org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hudi.org.apache.hadoop.hbase.wal.WALTailingReader;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
@InterfaceAudience.Private
@InterfaceStability.Evolving
/* loaded from: input_file:org/apache/hudi/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.class */
public class WALEntryStream implements Closeable {
    private static final Logger LOG;
    private WALTailingReader reader;
    private WALTailingReader.State state;
    private Path currentPath;
    private WAL.Entry currentEntry;
    private long currentPositionOfEntry;
    private long currentPositionOfReader = 0;
    private final ReplicationSourceLogQueue logQueue;
    private final String walGroupId;
    private final FileSystem fs;
    private final Configuration conf;
    private final WALFileLengthProvider walFileLengthProvider;
    private final MetricsSource metrics;
    private boolean eofAutoRecovery;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/hudi/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream$HasNext.class */
    public enum HasNext {
        YES,
        RETRY,
        RETRY_IMMEDIATELY,
        NO
    }

    public WALEntryStream(ReplicationSourceLogQueue replicationSourceLogQueue, FileSystem fileSystem, Configuration configuration, long j, WALFileLengthProvider wALFileLengthProvider, MetricsSource metricsSource, String str) {
        this.currentPositionOfEntry = 0L;
        this.logQueue = replicationSourceLogQueue;
        this.fs = fileSystem;
        this.conf = configuration;
        this.currentPositionOfEntry = j;
        this.walFileLengthProvider = wALFileLengthProvider;
        this.metrics = metricsSource;
        this.walGroupId = str;
        this.eofAutoRecovery = configuration.getBoolean("replication.source.eof.autorecovery", false);
    }

    public HasNext hasNext() {
        return this.currentEntry == null ? tryAdvanceEntry() : HasNext.YES;
    }

    public WAL.Entry peek() {
        return this.currentEntry;
    }

    public WAL.Entry next() {
        if (this.currentEntry == null) {
            throw new IllegalStateException("Call hasNext first");
        }
        WAL.Entry peek = peek();
        this.currentPositionOfEntry = this.currentPositionOfReader;
        this.currentEntry = null;
        this.state = null;
        return peek;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        closeReader();
    }

    public long getPosition() {
        return this.currentPositionOfEntry;
    }

    public Path getCurrentPath() {
        return this.currentPath;
    }

    private String getCurrentPathStat() {
        StringBuilder sb = new StringBuilder();
        if (this.currentPath != null) {
            sb.append("currently replicating from: ").append(this.currentPath).append(" at position: ").append(this.currentPositionOfEntry).append(StringUtils.LF);
        } else {
            sb.append("no replication ongoing, waiting for new log");
        }
        return sb.toString();
    }

    private void setCurrentPath(Path path) {
        this.currentPath = path;
    }

    private void resetReader() throws IOException {
        if (this.currentPositionOfEntry > 0) {
            this.reader.resetTo(this.currentPositionOfEntry, this.state.resetCompression());
        } else {
            this.reader.resetTo(-1L, true);
        }
    }

    @SuppressWarnings(value = {"DCN_NULLPOINTER_EXCEPTION"}, justification = "HDFS-4380")
    private HasNext prepareReader() {
        if (this.reader != null) {
            if (this.state == null || this.state == WALTailingReader.State.NORMAL) {
                return HasNext.YES;
            }
            LOG.debug("Reset reader {} to pos {}, reset compression={}", new Object[]{this.currentPath, Long.valueOf(this.currentPositionOfEntry), Boolean.valueOf(this.state.resetCompression())});
            try {
                resetReader();
                return HasNext.YES;
            } catch (FileNotFoundException e) {
                throw new UncheckedIOException(e);
            } catch (IOException e2) {
                LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", new Object[]{this.currentPath, Long.valueOf(this.currentPositionOfEntry), Boolean.valueOf(this.state.resetCompression()), e2});
                return HasNext.RETRY;
            }
        }
        Path peek = this.logQueue.getQueue(this.walGroupId).peek();
        if (peek == null) {
            LOG.debug("No more WAL files in queue");
            setCurrentPath(null);
            return HasNext.NO;
        }
        setCurrentPath(peek);
        boolean isPresent = this.walFileLengthProvider.getLogFileSizeIfBeingWritten(peek, true).isPresent();
        LOG.debug("Creating new reader {}, startPosition={}, beingWritten={}", new Object[]{peek, Long.valueOf(this.currentPositionOfEntry), Boolean.valueOf(isPresent)});
        try {
            this.reader = WALFactory.createTailingReader(this.fs, peek, this.conf, this.currentPositionOfEntry > 0 ? this.currentPositionOfEntry : -1L);
            return HasNext.YES;
        } catch (WALHeaderEOFException e3) {
            if (!this.eofAutoRecovery) {
                return HasNext.RETRY;
            }
            LOG.warn("EOF while trying to open WAL reader for path: {}, startPosition={}", new Object[]{peek, Long.valueOf(this.currentPositionOfEntry), e3});
            if (isPresent) {
                return HasNext.RETRY;
            }
            dequeueCurrentLog();
            return HasNext.RETRY_IMMEDIATELY;
        } catch (LeaseNotRecoveredException e4) {
            LOG.warn("Try to recover the WAL lease " + peek, e4);
            AbstractFSWALProvider.recoverLease(this.conf, peek);
            return HasNext.RETRY;
        } catch (IOException | NullPointerException e5) {
            LOG.warn("Failed to open WAL reader for path: {}", peek, e5);
            return HasNext.RETRY;
        }
    }

    private HasNext lastAttempt() {
        LOG.debug("Reset reader {} for the last time to pos {}, reset compression={}", new Object[]{this.currentPath, Long.valueOf(this.currentPositionOfEntry), Boolean.valueOf(this.state.resetCompression())});
        try {
            resetReader();
            Pair<WALTailingReader.State, Boolean> readNextEntryAndRecordReaderPosition = readNextEntryAndRecordReaderPosition();
            this.state = readNextEntryAndRecordReaderPosition.getFirst();
            if (!$assertionsDisabled && readNextEntryAndRecordReaderPosition.getSecond().booleanValue()) {
                throw new AssertionError();
            }
            if (!this.state.eof()) {
                return this.state == WALTailingReader.State.NORMAL ? HasNext.YES : HasNext.RETRY;
            }
            if (checkAllBytesParsed()) {
                dequeueCurrentLog();
                return HasNext.RETRY_IMMEDIATELY;
            }
            this.currentPositionOfEntry = 0L;
            this.currentPositionOfReader = 0L;
            this.state = WALTailingReader.State.ERROR_AND_RESET_COMPRESSION;
            return HasNext.RETRY;
        } catch (IOException e) {
            LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", new Object[]{this.currentPath, Long.valueOf(this.currentPositionOfEntry), Boolean.valueOf(this.state.resetCompression()), e});
            return HasNext.RETRY;
        }
    }

    private HasNext tryAdvanceEntry() {
        HasNext prepareReader = prepareReader();
        if (prepareReader != HasNext.YES) {
            return prepareReader;
        }
        Pair<WALTailingReader.State, Boolean> readNextEntryAndRecordReaderPosition = readNextEntryAndRecordReaderPosition();
        this.state = readNextEntryAndRecordReaderPosition.getFirst();
        boolean booleanValue = readNextEntryAndRecordReaderPosition.getSecond().booleanValue();
        LOG.trace("Reading WAL {}; result={}, currently open for write={}", new Object[]{this.currentPath, this.state, Boolean.valueOf(booleanValue)});
        switch (this.state) {
            case NORMAL:
                return HasNext.YES;
            case EOF_WITH_TRAILER:
                if (booleanValue && this.logQueue.getQueue(this.walGroupId).size() <= 1) {
                    LOG.warn("We have reached the trailer while reading the file '{}' which is currently beingWritten, but it is the last file in log queue {}. This should not happen typically, try to read again so we will not miss anything", this.currentPath, this.walGroupId);
                    return HasNext.RETRY;
                }
                if (!$assertionsDisabled && booleanValue && this.logQueue.getQueue(this.walGroupId).size() <= 1) {
                    throw new AssertionError();
                }
                dequeueCurrentLog();
                return HasNext.RETRY_IMMEDIATELY;
            case EOF_AND_RESET:
            case EOF_AND_RESET_COMPRESSION:
                return booleanValue ? HasNext.RETRY : lastAttempt();
            case ERROR_AND_RESET:
            case ERROR_AND_RESET_COMPRESSION:
                return HasNext.RETRY;
            default:
                throw new IllegalArgumentException("Unknown read next result: " + this.state);
        }
    }

    private FileStatus getCurrentPathFileStatus() throws IOException {
        try {
            return this.fs.getFileStatus(this.currentPath);
        } catch (FileNotFoundException e) {
            Path findArchivedLog = AbstractFSWALProvider.findArchivedLog(this.currentPath, this.conf);
            if (findArchivedLog != null) {
                return this.fs.getFileStatus(findArchivedLog);
            }
            throw e;
        }
    }

    private boolean checkAllBytesParsed() {
        long currentTrailerSize = currentTrailerSize();
        FileStatus fileStatus = null;
        try {
            fileStatus = getCurrentPathFileStatus();
        } catch (IOException e) {
            Logger logger = LOG;
            Object[] objArr = new Object[4];
            objArr[0] = this.currentPath;
            objArr[1] = currentTrailerSize < 0 ? "was not" : "was";
            objArr[2] = getCurrentPathStat();
            objArr[3] = e;
            logger.warn("Couldn't get file length information about log {}, it {} closed cleanly {}", objArr);
            this.metrics.incrUnknownFileLengthForClosedWAL();
        }
        if (fileStatus != null) {
            if (currentTrailerSize < 0) {
                if (this.currentPositionOfReader < fileStatus.getLen()) {
                    long len = fileStatus.getLen() - this.currentPositionOfReader;
                    LOG.warn("Reached the end of WAL {}. It was not closed cleanly, so we did not parse {} bytes of data.", this.currentPath, Long.valueOf(len));
                    this.metrics.incrUncleanlyClosedWALs();
                    this.metrics.incrBytesSkippedInUncleanlyClosedWALs(len);
                }
            } else if (this.currentPositionOfReader + currentTrailerSize < fileStatus.getLen()) {
                LOG.warn("Processing end of WAL {} at position {}, which is too far away from reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}", new Object[]{this.currentPath, Long.valueOf(this.currentPositionOfReader), Long.valueOf(fileStatus.getLen()), getCurrentPathStat()});
                this.metrics.incrRestartedWALReading();
                this.metrics.incrRepeatedFileBytes(this.currentPositionOfReader);
                return false;
            }
        }
        LOG.debug("Reached the end of {} and length of the file is {}", this.currentPath, fileStatus == null ? "N/A" : Long.valueOf(fileStatus.getLen()));
        this.metrics.incrCompletedWAL();
        return true;
    }

    private void dequeueCurrentLog() {
        LOG.debug("EOF, closing {}", this.currentPath);
        closeReader();
        this.logQueue.remove(this.walGroupId);
        setCurrentPath(null);
        this.currentPositionOfEntry = 0L;
        this.state = null;
    }

    private Pair<WALTailingReader.State, Boolean> readNextEntryAndRecordReaderPosition() {
        OptionalLong empty = this.logQueue.getQueueSize(this.walGroupId) > 1 ? OptionalLong.empty() : this.walFileLengthProvider.getLogFileSizeIfBeingWritten(this.currentPath, true);
        WALTailingReader.Result next = this.reader.next(empty.orElse(-1L));
        long entryEndPos = next.getEntryEndPos();
        WAL.Entry entry = next.getEntry();
        if (next.getState() == WALTailingReader.State.NORMAL) {
            LOG.trace("reading entry: {} ", entry);
            this.metrics.incrLogEditsRead();
            this.metrics.incrLogReadInBytes(entryEndPos - this.currentPositionOfEntry);
            this.currentEntry = next.getEntry();
            this.currentPositionOfReader = entryEndPos;
        } else {
            LOG.trace("reading entry failed with: {}", next.getState());
            this.currentEntry = null;
            try {
                this.currentPositionOfReader = this.reader.getPosition();
            } catch (IOException e) {
                LOG.warn("failed to get current position of reader", e);
                if (next.getState().resetCompression()) {
                    return Pair.newPair(WALTailingReader.State.ERROR_AND_RESET_COMPRESSION, Boolean.valueOf(empty.isPresent()));
                }
            }
        }
        return Pair.newPair(next.getState(), Boolean.valueOf(empty.isPresent()));
    }

    private void closeReader() {
        if (this.reader != null) {
            this.reader.close();
            this.reader = null;
        }
    }

    private long currentTrailerSize() {
        long j = -1;
        if (this.reader instanceof AbstractProtobufWALReader) {
            j = ((AbstractProtobufWALReader) this.reader).trailerSize();
        }
        return j;
    }

    static {
        $assertionsDisabled = !WALEntryStream.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(WALEntryStream.class);
    }
}
