package org.apache.hadoop.fs.swift.snative;

import java.io.EOFException;
import java.io.IOException;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.swift.exceptions.SwiftConnectionClosedException;
import org.apache.hadoop.fs.swift.exceptions.SwiftException;
import org.apache.hadoop.fs.swift.http.HttpBodyContent;
import org.apache.hadoop.fs.swift.http.HttpInputStreamWithRelease;
import org.apache.hadoop.fs.swift.util.SwiftUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/fs/swift/snative/SwiftNativeInputStream.class */
/**
 * Seekable input stream that reads the object stored at a {@link Path} through a
 * {@link SwiftNativeFileSystemStore}.
 *
 * <p>Data is pulled from an {@link HttpInputStreamWithRelease}. The stream tracks
 * the absolute position in the object ({@code pos}) and the number of bytes
 * consumed from the HTTP stream currently in use ({@code rangeOffset}). When a
 * read raises an {@link IOException}, the current HTTP stream is closed and a new
 * one is requested starting at {@code pos}. Forward seeks that stay below
 * {@code bufferSize} (measured from the start of the current HTTP stream) are
 * served by reading and discarding bytes; every other seek closes the current
 * stream and requests a new one at the target position.
 */
class SwiftNativeInputStream extends FSInputStream {
    private static final Logger LOG = LoggerFactory.getLogger(SwiftNativeInputStream.class);

    /** Added to the start position when a new range is requested; also the forward-seek threshold. */
    private final long bufferSize;

    /** Store used to open the object and to request ranges of it. */
    private final SwiftNativeFileSystemStore nativeStore;

    /** Byte-read counters to update; may be {@code null}, in which case nothing is recorded. */
    private final FileSystem.Statistics statistics;

    /** HTTP stream currently being read; {@code null} whenever this stream is closed. */
    private HttpInputStreamWithRelease httpStream;

    /** Path of the object being read. */
    private final Path path;

    /** Absolute position of the next byte to be read. */
    private long pos = 0;

    /** Content length reported by the most recent range request; -1 until one has been made. */
    private long contentLength = -1;

    /** Explanation reported by {@link #verifyOpen()} and {@link #toString()} while no HTTP stream is held. */
    private String reasonClosed = "unopened";

    /** Bytes consumed from the current HTTP stream since it was (re)opened. */
    private long rangeOffset = 0;

    /**
     * Opens the object at {@code path} for reading.
     *
     * @param swiftNativeFileSystemStore store that supplies the object data
     * @param statistics counters to update on successful reads; may be {@code null}
     * @param path path of the object to read
     * @param j buffer size; must be greater than zero
     * @throws IllegalArgumentException if {@code j} is zero or negative
     * @throws IOException if the store fails to open the object
     */
    public SwiftNativeInputStream(SwiftNativeFileSystemStore swiftNativeFileSystemStore, FileSystem.Statistics statistics, Path path, long j) throws IOException {
        this.nativeStore = swiftNativeFileSystemStore;
        this.statistics = statistics;
        this.path = path;
        if (!(j > 0)) {
            throw new IllegalArgumentException("Invalid buffer size");
        }
        this.bufferSize = j;
        // The initial stream is opened without a range, unlike later refills.
        this.httpStream = swiftNativeFileSystemStore.getObject(path).getInputStream();
    }

    /**
     * Advances both the absolute position and the offset within the current
     * HTTP stream by the same amount.
     *
     * @param count number of bytes just consumed
     */
    private synchronized void incPos(int count) {
        pos += count;
        rangeOffset += count;
        SwiftUtils.trace(LOG, "Inc: pos=%d bufferOffset=%d", pos, rangeOffset);
    }

    /**
     * Records that a new HTTP stream starting at {@code newPos} is now in use.
     *
     * @param newPos absolute position at which the new stream starts
     * @param length content length reported for the new stream
     */
    private synchronized void updateStartOfBufferPosition(long newPos, long length) {
        pos = newPos;
        rangeOffset = 0L;
        contentLength = length;
        SwiftUtils.trace(LOG, "Move: pos=%d; bufferOffset=%d; contentLength=%d", pos, rangeOffset, length);
    }

    /**
     * Reads a single byte, retrying once on a freshly opened HTTP stream if the
     * first attempt raises an {@link IOException}.
     *
     * @return the value returned by the HTTP stream, or -1 if that stream
     *         returned -1 or the reopen reached the end of the file
     * @throws SwiftConnectionClosedException if no HTTP stream is open on entry
     * @throws IOException if reopening or the retried read fails
     */
    public synchronized int read() throws IOException {
        verifyOpen();
        int value;
        try {
            value = httpStream.read();
        } catch (IOException e) {
            String message = "IOException while reading " + path + ": " + e + ", attempting to reopen.";
            LOG.debug(message, e);
            value = reopenBuffer() ? httpStream.read() : -1;
        }
        if (value == -1) {
            return -1;
        }
        incPos(1);
        if (statistics != null) {
            statistics.incrementBytesRead(1L);
        }
        return value;
    }

    /**
     * Reads up to {@code i2} bytes into {@code bArr} starting at index {@code i},
     * retrying once on a freshly opened HTTP stream if the first attempt raises
     * an {@link IOException}.
     *
     * @param bArr destination buffer
     * @param i index in {@code bArr} at which to store the first byte
     * @param i2 maximum number of bytes to read; 0 returns 0 immediately
     * @return the count returned by the HTTP stream, or -1 if the reopen
     *         reached the end of the file
     * @throws IOException if reopening or the retried read fails
     */
    public synchronized int read(byte[] bArr, int i, int i2) throws IOException {
        SwiftUtils.debug(LOG, "read(buffer, %d, %d)", i, i2);
        SwiftUtils.validateReadArgs(bArr, i, i2);
        if (i2 == 0) {
            return 0;
        }
        int bytesRead;
        try {
            // NOTE(review): unlike read(), the open check sits inside the try, so an
            // IOException it raises is handled by the catch below and leads to a
            // reopen attempt instead of propagating -- confirm this is intended.
            verifyOpen();
            bytesRead = httpStream.read(bArr, i, i2);
        } catch (IOException e) {
            LOG.info("Received IOException while reading '" + path + "', attempting to reopen: " + e);
            LOG.debug("IOE on read()" + e, e);
            bytesRead = reopenBuffer() ? httpStream.read(bArr, i, i2) : -1;
        }
        if (bytesRead > 0) {
            incPos(bytesRead);
            if (statistics != null) {
                statistics.incrementBytesRead(bytesRead);
            }
        }
        return bytesRead;
    }

    /**
     * Closes the current HTTP stream and requests a new one at the current position.
     *
     * @return {@code true} if a new stream was opened; {@code false} if the
     *         request raised an {@link EOFException}, in which case the stream
     *         stays closed with the reason "End of file"
     * @throws IOException if closing fails or the request fails for another reason
     */
    private boolean reopenBuffer() throws IOException {
        innerClose("reopening buffer to trigger refresh");
        try {
            fillBuffer(pos);
            return true;
        } catch (EOFException e) {
            reasonClosed = "End of file";
            return false;
        }
    }

    /**
     * Closes the underlying HTTP stream, if one is open.
     *
     * @throws IOException if closing the HTTP stream fails
     */
    public synchronized void close() throws IOException {
        innerClose("closed");
    }

    /**
     * Closes the HTTP stream if one is held and records why. The reference is
     * cleared even when the close itself throws.
     *
     * @param reason text stored as the reason the stream is closed
     * @throws IOException if closing the HTTP stream fails
     */
    private void innerClose(String reason) throws IOException {
        try {
            if (httpStream == null) {
                // Nothing to close; the recorded reason is left untouched.
                return;
            }
            reasonClosed = reason;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Closing HTTP input stream : " + reason);
            }
            httpStream.close();
        } finally {
            httpStream = null;
        }
    }

    /**
     * Checks that an HTTP stream is currently held.
     *
     * @throws SwiftConnectionClosedException carrying the recorded close reason
     *         when no HTTP stream is held
     */
    private void verifyOpen() throws SwiftConnectionClosedException {
        if (httpStream != null) {
            return;
        }
        throw new SwiftConnectionClosedException(reasonClosed);
    }

    /**
     * Describes the position, buffer size and either the HTTP stream or the
     * reason there is none.
     *
     * @return a human-readable summary of this stream
     */
    public synchronized String toString() {
        String streamState;
        if (httpStream == null) {
            streamState = " no input stream: " + reasonClosed;
        } else {
            streamState = httpStream.toString();
        }
        return "SwiftNativeInputStream position=" + pos + " buffer size = " + bufferSize + " " + streamState;
    }

    /**
     * Logs an error if an HTTP stream is still held at finalization; the
     * stream itself is not closed here.
     */
    protected void finalize() throws Throwable {
        if (httpStream == null) {
            return;
        }
        LOG.error("Input stream is leaking handles by not being closed() properly: " + httpStream.toString());
    }

    /**
     * Reads and discards bytes one at a time, advancing the position after each.
     *
     * @param bytes number of bytes to discard
     * @return number of bytes discarded; 0 when no HTTP stream is held
     * @throws SwiftException if the HTTP stream returns a negative value first
     * @throws IOException if a read fails
     */
    private int chompBytes(long bytes) throws IOException {
        if (httpStream == null) {
            return 0;
        }
        int chomped = 0;
        for (long done = 0; done < bytes; done++) {
            if (httpStream.read() < 0) {
                throw new SwiftException("Received error code while chomping input");
            }
            chomped++;
            incPos(1);
        }
        return chomped;
    }

    /**
     * Moves the read position to {@code j}.
     *
     * <p>A seek to the current position does nothing. A forward seek that keeps
     * the offset within the current HTTP stream below the buffer size is first
     * attempted by discarding bytes. In every other case, or if discarding does
     * not reach the target, the current stream is closed and a new one is
     * requested at {@code j}.
     *
     * @param j absolute target position; must not be negative
     * @throws EOFException if {@code j} is negative
     * @throws IOException if closing the current stream or requesting the new one fails
     */
    public synchronized void seek(long j) throws IOException {
        if (j < 0) {
            throw new EOFException("Cannot seek to a negative offset");
        }
        final long offset = j - pos;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Seek to " + j + "; current pos =" + pos + "; offset=" + offset);
        }
        if (offset == 0) {
            LOG.debug("seek is no-op");
            return;
        }
        if (offset < 0) {
            LOG.debug("seek is backwards");
        } else if (rangeOffset + offset < bufferSize) {
            if (chompTo(j, offset)) {
                return;
            }
        } else if (LOG.isDebugEnabled()) {
            LOG.debug("Seek is beyond buffer size of " + bufferSize);
        }
        innerClose("seeking to " + j);
        fillBuffer(j);
    }

    /**
     * Tries to reach {@code targetPos} by discarding {@code offset} bytes from
     * the current HTTP stream. An {@link IOException} while discarding is only
     * logged; success is judged by the resulting position.
     *
     * @param targetPos absolute position to reach
     * @param offset number of bytes to discard
     * @return {@code true} if the position now equals {@code targetPos}
     */
    private boolean chompTo(long targetPos, long offset) {
        SwiftUtils.debug(LOG, "seek is within current stream; pos= %d ; targetPos=%d; offset= %d ; bufferOffset=%d", pos, targetPos, offset, rangeOffset);
        try {
            LOG.debug("chomping ");
            chompBytes(offset);
        } catch (IOException e) {
            // Deliberately not rethrown: the caller falls back to a fresh request.
            LOG.debug("while chomping ", e);
        }
        if (targetPos == pos) {
            LOG.trace("chomping successful");
            return true;
        }
        LOG.trace("chomping failed");
        return false;
    }

    /**
     * Requests a new HTTP stream from the store starting at {@code targetPos}
     * and resets the position bookkeeping to match.
     *
     * @param targetPos absolute position at which the new stream should start
     * @throws IOException if the store request fails
     */
    private void fillBuffer(long targetPos) throws IOException {
        // NOTE(review): this sum is both logged as a byte count and passed as the
        // third argument of getObject -- confirm whether the store expects a length
        // or an end offset there.
        final long requested = targetPos + bufferSize;
        SwiftUtils.debug(LOG, "Fetching %d bytes starting at %d", requested, targetPos);
        HttpBodyContent body = nativeStore.getObject(path, targetPos, requested);
        httpStream = body.getInputStream();
        updateStartOfBufferPosition(targetPos, body.getContentLength());
    }

    /**
     * Returns the absolute position of the next byte to be read.
     *
     * @return the current position
     */
    public synchronized long getPos() throws IOException {
        return pos;
    }

    /**
     * Always reports that no alternative source was selected.
     *
     * @param j target position (ignored)
     * @return {@code false}
     */
    public boolean seekToNewSource(long j) throws IOException {
        return false;
    }
}
