package org.apache.hudi.common.table.log;

import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BufferedFSInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.SchemeAwareFSDataInputStream;
import org.apache.hudi.common.fs.TimedFSDataInputStream;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieCorruptBlock;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.CorruptedLogFileException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.org.apache.avro.Schema;
import org.apache.hudi.org.apache.hadoop.hbase.security.visibility.VisibilityConstants;
import org.apache.hudi.org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/hudi/common/table/log/HoodieLogFileReader.class */
public class HoodieLogFileReader implements HoodieLogFormat.Reader {
    /** Public buffer-size constant: 16 MiB (16 * 1024 * 1024 bytes). */
    public static final int DEFAULT_BUFFER_SIZE = 16777216;
    /** 1 MiB; the same size as the scratch buffer allocated when scanning for the next magic marker. */
    private static final int BLOCK_SCAN_READ_BUFFER_SIZE = 1048576;
    private static final Logger LOG = LogManager.getLogger(HoodieLogFileReader.class);
    /** Configuration taken from the file system passed to the constructor; handed to block content locations. */
    private final Configuration hadoopConf;
    /** Stream over the log file; every read and seek in this class goes through it. */
    private final FSDataInputStream inputStream;
    /** The log file being read, with its path qualified against the file system. */
    private final HoodieLogFile logFile;
    /** Scratch buffer (6 bytes) that {@code hasNextMagic()} fills and compares with {@code HoodieLogFormat.MAGIC}. */
    private final byte[] magicBuffer;
    /** Reader schema passed to the data blocks created by {@code readBlock()}. */
    private final Schema readerSchema;
    /** Internal schema passed to Avro data blocks; never null (defaults to the empty internal schema). */
    private InternalSchema internalSchema;
    /** Key field name passed to Avro and Parquet data blocks. */
    private final String keyField;
    /** Forwarded to block constructors and to {@code HoodieLogBlock.tryReadContent}. */
    private boolean readBlockLazily;
    /** Reverse-reading cursor; initialised to the file size when reverse reading is enabled. */
    private long reverseLogFilePosition;
    /** Position the next {@code hasPrev()} / {@code moveToPrev()} call starts from. */
    private long lastReverseLogFilePosition;
    /** When false, {@code prev()} and {@code moveToPrev()} throw and {@code hasPrev()} returns false. */
    private boolean reverseReader;
    /** Forwarded to HFile data blocks only. */
    private boolean enableRecordLookups;
    /** Set once {@code close()} has completed, making later calls no-ops. */
    private boolean closed;
    /** JVM shutdown hook that closes this reader; removed again by {@code close()}. */
    private transient Thread shutdownThread;

    /**
     * Creates a forward-only reader: delegates with the reverse-reader flag set to {@code false}.
     *
     * @param fileSystem    file system used to open the log file
     * @param hoodieLogFile log file to read
     * @param schema        reader schema handed to data blocks
     * @param i             buffer size passed when opening the stream
     * @param z             whether blocks are read lazily
     * @throws IOException if the log file cannot be opened
     */
    public HoodieLogFileReader(FileSystem fileSystem, HoodieLogFile hoodieLogFile, Schema schema, int i, boolean z) throws IOException {
        this(fileSystem, hoodieLogFile, schema, i, z, false);
    }

    /**
     * Delegates with record lookups disabled and {@code HoodieRecord.RECORD_KEY_METADATA_FIELD} as the key field.
     *
     * @param fileSystem    file system used to open the log file
     * @param hoodieLogFile log file to read
     * @param schema        reader schema handed to data blocks
     * @param i             buffer size passed when opening the stream
     * @param z             whether blocks are read lazily
     * @param z2            whether reverse reading is enabled
     * @throws IOException if the log file cannot be opened
     */
    public HoodieLogFileReader(FileSystem fileSystem, HoodieLogFile hoodieLogFile, Schema schema, int i, boolean z, boolean z2) throws IOException {
        this(fileSystem, hoodieLogFile, schema, i, z, z2, false, HoodieRecord.RECORD_KEY_METADATA_FIELD);
    }

    /**
     * Delegates with the empty internal schema.
     *
     * @param fileSystem    file system used to open the log file
     * @param hoodieLogFile log file to read
     * @param schema        reader schema handed to data blocks
     * @param i             buffer size passed when opening the stream
     * @param z             whether blocks are read lazily
     * @param z2            whether reverse reading is enabled
     * @param z3            whether record lookups are enabled (forwarded to HFile data blocks)
     * @param str           key field name (forwarded to Avro and Parquet data blocks)
     * @throws IOException if the log file cannot be opened
     */
    public HoodieLogFileReader(FileSystem fileSystem, HoodieLogFile hoodieLogFile, Schema schema, int i, boolean z, boolean z2, boolean z3, String str) throws IOException {
        this(fileSystem, hoodieLogFile, schema, i, z, z2, z3, str, InternalSchema.getEmptyInternalSchema());
    }

    public HoodieLogFileReader(FileSystem fileSystem, HoodieLogFile hoodieLogFile, Schema schema, int i, boolean z, boolean z2, boolean z3, String str, InternalSchema internalSchema) throws IOException {
        this.magicBuffer = new byte[6];
        this.internalSchema = InternalSchema.getEmptyInternalSchema();
        this.closed = false;
        this.shutdownThread = null;
        this.hadoopConf = fileSystem.getConf();
        this.logFile = new HoodieLogFile(FSUtils.makeQualified(fileSystem, hoodieLogFile.getPath()), Long.valueOf(hoodieLogFile.getFileSize()));
        this.inputStream = getFSDataInputStream(fileSystem, this.logFile, i);
        this.readerSchema = schema;
        this.readBlockLazily = z;
        this.reverseReader = z2;
        this.enableRecordLookups = z3;
        this.keyField = str;
        this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema;
        if (this.reverseReader) {
            long fileSize = this.logFile.getFileSize();
            this.lastReverseLogFilePosition = fileSize;
            this.reverseLogFilePosition = fileSize;
        }
        addShutDownHook();
    }

    /**
     * Returns the log file this reader was opened on (with its qualified path).
     */
    @Override // org.apache.hudi.common.table.log.HoodieLogFormat.Reader
    public HoodieLogFile getLogFile() {
        return logFile;
    }

    /**
     * Registers a JVM shutdown hook that closes this reader, logging (not rethrowing) any failure.
     * The hook thread is kept in {@code shutdownThread} so {@code close()} can unregister it.
     */
    private void addShutDownHook() {
        final Runnable closeOnExit = () -> {
            try {
                close();
            } catch (Exception e) {
                LOG.warn("unable to close input stream for log file " + this.logFile, e);
            }
        };
        this.shutdownThread = new Thread(closeOnExit);
        Runtime.getRuntime().addShutdownHook(this.shutdownThread);
    }

    /**
     * Reads the block that starts at the current stream position (just after the magic marker).
     *
     * <p>Read order: block size (long), format version (int), block type (int, non-zero versions only),
     * optional header metadata, content length (long, non-zero versions only), content, optional footer
     * metadata, optional trailing block length (long).
     *
     * @return the parsed block, or a corrupt block when the size check fails or an
     *         {@link EOFException} / {@link CorruptedLogFileException} is raised while reading
     * @throws IOException on other I/O failures
     */
    private HoodieLogBlock readBlock() throws IOException {
        try {
            // Block size is stored as a long but narrowed to int here.
            int readLong = (int) this.inputStream.readLong();
            if (isBlockCorrupted(readLong)) {
                return createCorruptBlock();
            }
            HoodieLogFormat.LogFormatVersion readVersion = readVersion();
            // Null for version 0, which stores no block type.
            HoodieLogBlock.HoodieLogBlockType tryReadBlockType = tryReadBlockType(readVersion);
            Map<HoodieLogBlock.HeaderMetadataType, String> logMetadata = readVersion.hasHeader() ? HoodieLogBlock.getLogMetadata(this.inputStream) : null;
            // Non-zero versions store an explicit content length; version 0 reuses the block size.
            int readLong2 = readVersion.getVersion() != 0 ? (int) this.inputStream.readLong() : readLong;
            // Position where the content starts; recorded in the content location below.
            long pos = this.inputStream.getPos();
            // The lazy flag is only honoured for non-zero versions.
            Option<byte[]> tryReadContent = HoodieLogBlock.tryReadContent(this.inputStream, Integer.valueOf(readLong2), this.readBlockLazily && readVersion.getVersion() != 0);
            Map<HoodieLogBlock.HeaderMetadataType, String> logMetadata2 = readVersion.hasFooter() ? HoodieLogBlock.getLogMetadata(this.inputStream) : null;
            if (readVersion.hasLogBlockLength()) {
                // Trailing block length is consumed and discarded; it only matters for reverse reading.
                this.inputStream.readLong();
            }
            // Last argument is the stream position after the whole block has been consumed.
            HoodieLogBlock.HoodieLogBlockContentLocation hoodieLogBlockContentLocation = new HoodieLogBlock.HoodieLogBlockContentLocation(this.hadoopConf, this.logFile, pos, readLong2, this.inputStream.getPos());
            // NOTE(review): requireNonNull throws NullPointerException for version 0 blocks (null type),
            // so the version == 0 branch of AVRO_DATA_BLOCK below looks unreachable — confirm intent.
            switch ((HoodieLogBlock.HoodieLogBlockType) Objects.requireNonNull(tryReadBlockType)) {
                case AVRO_DATA_BLOCK:
                    return readVersion.getVersion() == 0 ? HoodieAvroDataBlock.getBlock(tryReadContent.get(), this.readerSchema, this.internalSchema) : new HoodieAvroDataBlock(this.inputStream, tryReadContent, this.readBlockLazily, hoodieLogBlockContentLocation, Option.ofNullable(this.readerSchema), logMetadata, logMetadata2, this.keyField, this.internalSchema);
                case HFILE_DATA_BLOCK:
                    ValidationUtils.checkState(readVersion.getVersion() != 0, String.format("HFile block could not be of version (%d)", 0));
                    return new HoodieHFileDataBlock(this.inputStream, tryReadContent, this.readBlockLazily, hoodieLogBlockContentLocation, Option.ofNullable(this.readerSchema), logMetadata, logMetadata2, this.enableRecordLookups, this.logFile.getPath());
                case PARQUET_DATA_BLOCK:
                    ValidationUtils.checkState(readVersion.getVersion() != 0, String.format("Parquet block could not be of version (%d)", 0));
                    return new HoodieParquetDataBlock(this.inputStream, tryReadContent, this.readBlockLazily, hoodieLogBlockContentLocation, Option.ofNullable(this.readerSchema), logMetadata, logMetadata2, this.keyField);
                case DELETE_BLOCK:
                    return new HoodieDeleteBlock(tryReadContent, this.inputStream, this.readBlockLazily, Option.of(hoodieLogBlockContentLocation), logMetadata, logMetadata2);
                case COMMAND_BLOCK:
                    return new HoodieCommandBlock(tryReadContent, this.inputStream, this.readBlockLazily, Option.of(hoodieLogBlockContentLocation), logMetadata, logMetadata2);
                default:
                    throw new HoodieNotSupportedException("Unsupported Block " + tryReadBlockType);
            }
        } catch (EOFException | CorruptedLogFileException e) {
            // A truncated or malformed block is surfaced as a corrupt block instead of failing the read.
            return createCorruptBlock();
        }
    }

    /**
     * Reads the block type ordinal that follows the version field.
     *
     * @param logFormatVersion version just read for the current block
     * @return the block type, or {@code null} for version 0, which stores no type
     * @throws IOException              if the ordinal cannot be read from the stream
     * @throws IllegalArgumentException if the ordinal is negative or not a known block type
     */
    @Nullable
    private HoodieLogBlock.HoodieLogBlockType tryReadBlockType(HoodieLogFormat.LogFormatVersion logFormatVersion) throws IOException {
        if (logFormatVersion.getVersion() == 0) {
            return null;
        }
        int readInt = this.inputStream.readInt();
        // values() copies the array on every call, so fetch it once.
        HoodieLogBlock.HoodieLogBlockType[] knownTypes = HoodieLogBlock.HoodieLogBlockType.values();
        // Reject negative ordinals too; otherwise corrupt data surfaces as ArrayIndexOutOfBoundsException.
        ValidationUtils.checkArgument(readInt >= 0 && readInt < knownTypes.length, "Invalid block byte type found " + readInt);
        return knownTypes[readInt];
    }

    /**
     * Builds a corrupt block covering the bytes from the current position up to the next magic marker
     * (or the end of the scan when none is found).
     *
     * <p>The header's {@code INSTANT_TIME} is set to {@code Integer.MIN_VALUE} when the scan hit end of
     * file or the corrupt content could not be fully read.
     *
     * @return the corrupt block
     * @throws IOException if scanning or seeking fails
     */
    private HoodieLogBlock createCorruptBlock() throws IOException {
        LOG.info("Log " + this.logFile + " has a corrupted block at " + this.inputStream.getPos());
        long corruptStart = this.inputStream.getPos();
        Pair<Long, Boolean> scanResult = scanForNextAvailableBlockOffset();
        long nextBlockOffset = scanResult.getLeft().longValue();
        // The scan moved the stream; rewind so the corrupt bytes can be read from their start.
        this.inputStream.seek(corruptStart);
        LOG.info("Next available block in " + this.logFile + " starts at " + nextBlockOffset);
        int corruptedBlockSize = (int) (nextBlockOffset - corruptStart);
        long contentPosition = this.inputStream.getPos();
        // Typed maps replace the raw HashMap instances, removing unchecked conversions.
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        Map<HoodieLogBlock.HeaderMetadataType, String> footer = new HashMap<>();
        Option<byte[]> corruptedBytes = Option.empty();
        try {
            corruptedBytes = HoodieLogBlock.tryReadContent(this.inputStream, Integer.valueOf(corruptedBlockSize), this.readBlockLazily);
        } catch (EOFException e) {
            // Content ran past end of file: keep the block empty and mark it in the header.
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, String.valueOf(Integer.MIN_VALUE));
        }
        if (scanResult.getRight().booleanValue()) {
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, String.valueOf(Integer.MIN_VALUE));
        }
        HoodieLogBlock.HoodieLogBlockContentLocation location =
                new HoodieLogBlock.HoodieLogBlockContentLocation(this.hadoopConf, this.logFile, contentPosition, corruptedBlockSize, nextBlockOffset);
        return new HoodieCorruptBlock(corruptedBytes, this.inputStream, this.readBlockLazily, Option.of(location), header, footer);
    }

    /**
     * Checks whether the block whose header size was just read is corrupted.
     *
     * <p>A block is considered intact when the size stored in its last 8 bytes (minus the magic length)
     * equals the header size and the bytes that follow are either a magic marker or end of file.
     * The stream position is restored before returning.
     *
     * @param i block size read from the block header
     * @return {@code true} if the block is corrupted
     * @throws IOException if seeking or reading fails for a reason other than end of file
     */
    private boolean isBlockCorrupted(int i) throws IOException {
        if (i < 0) {
            // A negative size can never be valid and would make the seek below land before the block
            // (or at a negative offset), so report corruption without touching the stream.
            LOG.info("Found corrupted block in file " + this.logFile + " with negative block size(" + i + ")");
            return true;
        }
        long blockStart = this.inputStream.getPos();
        long sizeFromFooter;
        try {
            // Seek to the trailing size field rather than to the block end, which may be the file end.
            this.inputStream.seek((blockStart + i) - Long.BYTES);
            // The trailing size includes the magic marker; the header size does not.
            sizeFromFooter = this.inputStream.readLong() - this.magicBuffer.length;
        } catch (EOFException e) {
            LOG.info("Found corrupted block in file " + this.logFile + " with block size(" + i + ") running past EOF");
            this.inputStream.seek(blockStart);
            return true;
        }
        if (i != sizeFromFooter) {
            LOG.info("Found corrupted block in file " + this.logFile + ". Header block size(" + i + ") did not match the footer block size(" + sizeFromFooter + VisibilityConstants.CLOSED_PARAN);
            this.inputStream.seek(blockStart);
            return true;
        }
        try {
            // Either the next magic marker or end of file must follow the block.
            readMagic();
            return false;
        } catch (CorruptedLogFileException e) {
            LOG.info("Found corrupted block in file " + this.logFile + ". No magic hash found right after footer block size entry");
            return true;
        } finally {
            this.inputStream.seek(blockStart);
        }
    }

    /**
     * Scans forward from the current position for the next occurrence of {@code HoodieLogFormat.MAGIC}.
     *
     * @return the offset of the next magic marker paired with {@code false}, or the stream position
     *         after hitting end of file paired with {@code true} when no marker was found
     * @throws IOException if reading or seeking fails for a reason other than end of file
     */
    private Pair<Long, Boolean> scanForNextAvailableBlockOffset() throws IOException {
        final byte[] scanBuffer = new byte[BLOCK_SCAN_READ_BUFFER_SIZE];
        boolean reachedEof = false;
        for (;;) {
            final long windowStart = this.inputStream.getPos();
            // Zero the buffer so stale bytes from a previous window cannot produce a false match
            // after a short read.
            Arrays.fill(scanBuffer, (byte) 0);
            try {
                this.inputStream.readFully(scanBuffer, 0, scanBuffer.length);
            } catch (EOFException e) {
                reachedEof = true;
            }

            final long magicIndex = Bytes.indexOf(scanBuffer, HoodieLogFormat.MAGIC);
            if (magicIndex >= 0) {
                final long nextBlockOffset = windowStart + magicIndex;
                LOG.info(String.format("found next available block on pos: %s", nextBlockOffset));
                return Pair.of(nextBlockOffset, false);
            }

            if (reachedEof) {
                final long positionAtEof = this.inputStream.getPos();
                LOG.info(String.format("Attempted to get next available block, but trigger EOF, now current inputStream pos: %s", positionAtEof));
                return Pair.of(positionAtEof, true);
            }

            // Overlap consecutive windows by the magic length so a marker straddling the boundary is found.
            this.inputStream.seek((windowStart + scanBuffer.length) - HoodieLogFormat.MAGIC.length);
        }
    }

    /**
     * Closes the underlying stream and unregisters the shutdown hook. Calls after a successful close
     * are no-ops.
     *
     * <p>The hook is unregistered even when closing the stream fails, so a failed close does not leave
     * this reader pinned by the runtime. Unregistering is skipped when running on the hook thread, and
     * the "shutdown in progress" rejection is ignored, because
     * {@link Runtime#removeShutdownHook(Thread)} throws {@link IllegalStateException} once the JVM has
     * started shutting down.
     *
     * @throws IOException if closing the underlying stream fails
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.closed) {
            return;
        }
        try {
            this.inputStream.close();
        } finally {
            if (null != this.shutdownThread && Thread.currentThread() != this.shutdownThread) {
                try {
                    Runtime.getRuntime().removeShutdownHook(this.shutdownThread);
                } catch (IllegalStateException ignored) {
                    // JVM is already shutting down; hooks can no longer be removed and need not be.
                }
            }
        }
        this.closed = true;
    }

    /**
     * Consumes the next magic marker from the stream.
     *
     * @return {@code true} if a magic marker was read, {@code false} at end of file
     * @throws HoodieIOException if reading the marker fails with an {@link IOException}
     */
    @Override // java.util.Iterator
    public boolean hasNext() {
        final boolean magicFound;
        try {
            magicFound = readMagic();
        } catch (IOException e) {
            throw new HoodieIOException("IOException when reading logfile " + this.logFile, e);
        }
        return magicFound;
    }

    /**
     * Reads the int version field at the current position and wraps it in a {@code HoodieLogFormatVersion}.
     *
     * @throws IOException if the version cannot be read
     */
    private HoodieLogFormat.LogFormatVersion readVersion() throws IOException {
        final int versionNumber = this.inputStream.readInt();
        return new HoodieLogFormatVersion(versionNumber);
    }

    /**
     * Reads the magic marker at the current position.
     *
     * @return {@code true} if the marker was found, {@code false} if end of file was reached first
     * @throws CorruptedLogFileException if bytes were read but they are not the magic marker
     * @throws IOException               on other read failures
     */
    private boolean readMagic() throws IOException {
        final boolean magicFound;
        try {
            magicFound = hasNextMagic();
        } catch (EOFException e) {
            // End of file: there is simply no further block.
            return false;
        }
        if (!magicFound) {
            throw new CorruptedLogFileException(this.logFile + " could not be read. Did not find the magic bytes at the start of the block");
        }
        return true;
    }

    /**
     * Fills the magic buffer from the stream and compares it with {@code HoodieLogFormat.MAGIC}.
     *
     * @return {@code true} if the bytes read equal the magic marker
     * @throws IOException if the buffer cannot be filled (including {@code EOFException} at end of file)
     */
    private boolean hasNextMagic() throws IOException {
        final byte[] candidate = this.magicBuffer;
        // The buffer is allocated with 6 bytes in the constructor, so this reads the same 6 bytes.
        this.inputStream.readFully(candidate, 0, candidate.length);
        return Arrays.equals(candidate, HoodieLogFormat.MAGIC);
    }

    /**
     * Reads the block at the current position; expected to follow a successful {@code hasNext()}.
     *
     * @return the next block (possibly a corrupt block)
     * @throws HoodieIOException if reading the block fails with an {@link IOException}
     */
    @Override // java.util.Iterator
    public HoodieLogBlock next() {
        final HoodieLogBlock block;
        try {
            block = readBlock();
        } catch (IOException e) {
            throw new HoodieIOException("IOException when reading logblock from log file " + this.logFile, e);
        }
        return block;
    }

    /**
     * Moves the reverse cursor back by one long (8 bytes) and seeks the stream there, so that a
     * following {@code prev()} can read the trailing block length.
     *
     * <p>Every failure is reported as {@code false}, including the case where reverse reading is not
     * enabled. The cursor fields are updated before the seek and are not rolled back if it fails.
     *
     * @return {@code true} if the seek succeeded
     */
    @Override // org.apache.hudi.common.table.log.HoodieLogFormat.Reader
    public boolean hasPrev() {
        try {
            if (!this.reverseReader) {
                throw new HoodieNotSupportedException("Reverse log reader has not been enabled");
            }
            final long blockLengthPosition = this.lastReverseLogFilePosition - Long.BYTES;
            this.reverseLogFilePosition = blockLengthPosition;
            this.lastReverseLogFilePosition = blockLengthPosition;
            this.inputStream.seek(blockLengthPosition);
        } catch (Exception e) {
            return false;
        }
        return true;
    }

    /**
     * Reads the block that ends at the reverse cursor; expected to follow a successful {@code hasPrev()}.
     *
     * <p>Reads the trailing block length at the current position, seeks back by that length, consumes
     * the magic marker and reads the block forward.
     *
     * @return the previous block
     * @throws HoodieNotSupportedException if reverse reading is not enabled
     * @throws CorruptedLogFileException   if the block cannot be located or read; the original failure
     *                                     is attached as a suppressed exception
     * @throws IOException                 if reading the block length or restoring the position fails
     */
    @Override // org.apache.hudi.common.table.log.HoodieLogFormat.Reader
    public HoodieLogBlock prev() throws IOException {
        if (!this.reverseReader) {
            throw new HoodieNotSupportedException("Reverse log reader has not been enabled");
        }
        long blockLength = this.inputStream.readLong();
        long blockEndPos = this.inputStream.getPos();
        try {
            this.inputStream.seek(this.reverseLogFilePosition - blockLength);
            // Consumes the magic marker; a missing marker surfaces as an exception handled below.
            hasNext();
            this.reverseLogFilePosition -= blockLength;
            this.lastReverseLogFilePosition = this.reverseLogFilePosition;
            return next();
        } catch (Exception e) {
            this.inputStream.seek(blockEndPos);
            CorruptedLogFileException corrupted = new CorruptedLogFileException("Found possible corrupted block, cannot read log file in reverse, fallback to forward reading of logfile");
            // Keep the root failure for diagnosis. Only the message-only constructor is used in this
            // file, so the original is attached as a suppressed exception rather than as the cause.
            corrupted.addSuppressed(e);
            throw corrupted;
        }
    }

    /**
     * Skips one block backwards without reading it: reads the long stored at the last reverse
     * position, then seeks back by that amount and updates both cursor fields.
     *
     * @return the new reverse cursor position
     * @throws HoodieNotSupportedException if reverse reading is not enabled
     * @throws IOException                 if seeking or reading fails
     */
    public long moveToPrev() throws IOException {
        if (!this.reverseReader) {
            throw new HoodieNotSupportedException("Reverse log reader has not been enabled");
        }
        this.inputStream.seek(this.lastReverseLogFilePosition);
        final long blockLength = this.inputStream.readLong();
        final long previousBlockStart = this.reverseLogFilePosition - blockLength;
        this.inputStream.seek(previousBlockStart);
        // Cursor fields are only updated once the seek has succeeded.
        this.reverseLogFilePosition = previousBlockStart;
        this.lastReverseLogFilePosition = previousBlockStart;
        return previousBlockStart;
    }

    /**
     * Not supported: this reader is read-only.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // java.util.Iterator
    public void remove() {
        final String reason = "Remove not supported for HoodieLogFileReader";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Opens the log file and, where possible, wraps the raw stream in a buffered, timed stream.
     *
     * <ul>
     *   <li>GCS file systems: the GCS-specific wrapping, inside a {@code SchemeAwareFSDataInputStream}.</li>
     *   <li>Streams wrapping an {@code FSInputStream}: re-wrapped with a {@code BufferedFSInputStream}
     *       of the requested size inside a {@code TimedFSDataInputStream}.</li>
     *   <li>Anything else: the stream returned by {@code FileSystem.open} unchanged.</li>
     * </ul>
     *
     * @param fileSystem    file system to open the file on
     * @param hoodieLogFile log file to open
     * @param i             buffer size in bytes
     * @return the stream to read the log file from
     * @throws IOException if the file cannot be opened
     */
    private static FSDataInputStream getFSDataInputStream(FileSystem fileSystem, HoodieLogFile hoodieLogFile, int i) throws IOException {
        FSDataInputStream open = fileSystem.open(hoodieLogFile.getPath(), i);
        if (FSUtils.isGCSFileSystem(fileSystem)) {
            return new SchemeAwareFSDataInputStream(getFSDataInputStreamForGCS(open, hoodieLogFile, i), true);
        }
        if (open.getWrappedStream() instanceof FSInputStream) {
            // getWrappedStream() is declared to return InputStream, so the checked cast is required
            // for the BufferedFSInputStream constructor.
            return new TimedFSDataInputStream(hoodieLogFile.getPath(),
                    new FSDataInputStream(new BufferedFSInputStream((FSInputStream) open.getWrappedStream(), i)));
        }
        return open;
    }

    /**
     * Buffered, timed wrapping for GCS streams, which may nest the {@code FSInputStream} one level
     * deeper inside another {@code FSDataInputStream}.
     *
     * @param fSDataInputStream stream returned by {@code FileSystem.open}
     * @param hoodieLogFile     log file the stream belongs to
     * @param i                 buffer size in bytes
     * @return a {@code TimedFSDataInputStream} over a buffered view of the innermost
     *         {@code FSInputStream}, or the input stream unchanged if none is found
     */
    private static FSDataInputStream getFSDataInputStreamForGCS(FSDataInputStream fSDataInputStream, HoodieLogFile hoodieLogFile, int i) {
        // getWrappedStream() is declared to return InputStream; the casts below are guarded by the
        // instanceof checks and are required for the code to compile.
        if (fSDataInputStream.getWrappedStream() instanceof FSInputStream) {
            return new TimedFSDataInputStream(hoodieLogFile.getPath(),
                    new FSDataInputStream(new BufferedFSInputStream((FSInputStream) fSDataInputStream.getWrappedStream(), i)));
        }
        if (fSDataInputStream.getWrappedStream() instanceof FSDataInputStream
                && ((FSDataInputStream) fSDataInputStream.getWrappedStream()).getWrappedStream() instanceof FSInputStream) {
            FSInputStream innermost = (FSInputStream) ((FSDataInputStream) fSDataInputStream.getWrappedStream()).getWrappedStream();
            return new TimedFSDataInputStream(hoodieLogFile.getPath(),
                    new FSDataInputStream(new BufferedFSInputStream(innermost, i)));
        }
        return fSDataInputStream;
    }
}
