package org.apache.hadoop.fs.obs.input;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import com.obs.services.ObsClient;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.obs.OBSConstants;
import org.apache.hadoop.fs.obs.OBSFileSystem;
import org.apache.hadoop.fs.obs.input.ReadAheadBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/fs/obs/input/OBSExtendInputStream.class */
public class OBSExtendInputStream extends FSInputStream implements CanSetReadahead, ByteBufferReadable {
    public static final Logger LOG = LoggerFactory.getLogger(OBSExtendInputStream.class);
    private OBSFileSystem fs;
    private final ObsClient client;
    private FileSystem.Statistics statistics;
    private final String bucketName;
    private final String key;
    private long contentLength;
    private boolean closed;
    private int maxReadAhead;
    private long readaheadSize;
    private long pos;
    private long nextPos;
    private long lastBufferStart;
    private byte[] buffer;
    private long bufferRemaining;
    private ExecutorService readAheadExecutorService;
    private Queue<ReadAheadBuffer> readAheadBufferQueue = new ArrayDeque();

    public OBSExtendInputStream(OBSFileSystem oBSFileSystem, Configuration configuration, ExecutorService executorService, String str, String str2, Long l, FileSystem.Statistics statistics) {
        LOG.info("use OBSExtendInputStream");
        this.fs = oBSFileSystem;
        this.client = this.fs.getObsClient();
        this.statistics = statistics;
        this.bucketName = str;
        this.key = str2;
        this.contentLength = l.longValue();
        this.readaheadSize = configuration.getLong(OBSConstants.READAHEAD_RANGE, OBSConstants.DEFAULT_READAHEAD_RANGE);
        this.maxReadAhead = configuration.getInt(OBSConstants.READAHEAD_MAX_NUM, 4);
        this.readAheadExecutorService = MoreExecutors.listeningDecorator(executorService);
        this.nextPos = 0L;
        this.lastBufferStart = -1L;
        this.pos = 0L;
        this.bufferRemaining = 0L;
        this.closed = false;
    }

    private void validateAndResetReopen(long j) throws EOFException {
        if (j < 0) {
            throw new EOFException("Cannot seek at negative position:" + j);
        }
        if (j > this.contentLength) {
            throw new EOFException("Cannot seek after EOF, contentLength:" + this.contentLength + " position:" + j);
        }
        if (this.buffer != null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Aborting old stream to open at pos " + j);
            }
            this.buffer = null;
        }
    }

    private boolean isRandom(long j) {
        boolean z = true;
        if (j == this.nextPos) {
            z = false;
        } else {
            while (this.readAheadBufferQueue.size() != 0 && this.readAheadBufferQueue.element().getByteStart() != j) {
                this.readAheadBufferQueue.poll();
            }
        }
        return z;
    }

    private void getFromBuffer() throws IOException {
        ReadAheadBuffer poll = this.readAheadBufferQueue.poll();
        poll.lock();
        try {
            poll.await(ReadAheadBuffer.STATUS.INIT);
            if (poll.getStatus() == ReadAheadBuffer.STATUS.ERROR) {
                this.buffer = null;
            } else {
                this.buffer = poll.getBuffer();
            }
        } catch (InterruptedException e) {
            LOG.warn("interrupted when wait a read buffer");
        } finally {
            poll.unlock();
        }
        if (this.buffer == null) {
            throw new IOException("Null IO stream");
        }
    }

    private synchronized void reopen(long j) throws IOException {
        validateAndResetReopen(j);
        long j2 = j + this.readaheadSize > this.contentLength ? this.contentLength - j : this.readaheadSize;
        boolean isRandom = isRandom(j);
        this.nextPos = j + j2;
        int size = this.readAheadBufferQueue.size();
        if (size == 0) {
            this.lastBufferStart = j - j2;
        } else {
            this.lastBufferStart = ((ReadAheadBuffer[]) this.readAheadBufferQueue.toArray(new ReadAheadBuffer[size]))[size - 1].getByteStart();
        }
        int i = this.maxReadAhead - size;
        for (int i2 = 0; i2 < i && i2 < (size + 1) * 2 && this.lastBufferStart + (j2 * (i2 + 1)) < this.contentLength; i2++) {
            long j3 = this.lastBufferStart + (j2 * (i2 + 1));
            long j4 = (j3 + j2) - 1;
            if (j4 >= this.contentLength) {
                j4 = this.contentLength - 1;
            }
            ReadAheadBuffer readAheadBuffer = new ReadAheadBuffer(j3, j4);
            if (readAheadBuffer.getBuffer().length == 0) {
                readAheadBuffer.setStatus(ReadAheadBuffer.STATUS.SUCCESS);
            } else {
                this.readAheadExecutorService.execute(new ReadAheadTask(this.bucketName, this.key, this.client, readAheadBuffer));
            }
            this.readAheadBufferQueue.add(readAheadBuffer);
            if (isRandom) {
                break;
            }
        }
        getFromBuffer();
        this.pos = j;
        this.bufferRemaining = j2;
    }

    public synchronized int read() throws IOException {
        checkNotClosed();
        if (this.bufferRemaining <= 0 && this.pos < this.contentLength) {
            reopen(this.pos);
        }
        int i = -1;
        if (this.bufferRemaining != 0) {
            i = this.buffer[this.buffer.length - ((int) this.bufferRemaining)] & 255;
        }
        if (i >= 0) {
            this.pos++;
            this.bufferRemaining--;
        }
        incrementBytesRead(i);
        return i;
    }

    private void checkNotClosed() throws IOException {
        if (this.closed) {
            throw new IOException("Stream is closed!");
        }
    }

    private void validateReadArgs(byte[] bArr, int i, int i2) {
        if (bArr == null) {
            throw new NullPointerException();
        }
        if (i < 0 || i2 < 0 || i2 > bArr.length - i) {
            throw new IndexOutOfBoundsException();
        }
    }

    private void incrementBytesRead(long j) {
        if (this.statistics == null || j <= 0) {
            return;
        }
        this.statistics.incrementBytesRead(j);
    }

    public synchronized int read(byte[] bArr, int i, int i2) throws IOException {
        checkNotClosed();
        validateReadArgs(bArr, i, i2);
        if (i2 == 0) {
            return 0;
        }
        int i3 = 0;
        while (this.pos < this.contentLength && i3 < i2) {
            if (this.bufferRemaining == 0) {
                reopen(this.pos);
            }
            int i4 = 0;
            for (int length = this.buffer.length - ((int) this.bufferRemaining); length < this.buffer.length; length++) {
                bArr[i + i3] = this.buffer[length];
                i4++;
                i3++;
                if (i + i3 >= i2) {
                    break;
                }
            }
            if (i4 > 0) {
                this.pos += i4;
                this.bufferRemaining -= i4;
            } else if (this.bufferRemaining != 0) {
                throw new IOException("Failed to read from stream. Remaining:" + this.bufferRemaining);
            }
        }
        incrementBytesRead(i3);
        if (i3 != 0 || i2 <= 0) {
            return i3;
        }
        return -1;
    }

    public synchronized void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.buffer = null;
    }

    public synchronized int available() throws IOException {
        checkNotClosed();
        long j = this.contentLength - this.pos;
        if (j > 2147483647L) {
            return Integer.MAX_VALUE;
        }
        return (int) j;
    }

    public synchronized void seek(long j) throws IOException {
        checkNotClosed();
        if (j < 0) {
            throw new EOFException("Cannot seek to a negative offset " + j);
        }
        if (this.contentLength > 0 && this.pos != j) {
            if (j <= this.pos || j >= this.pos + this.bufferRemaining) {
                this.pos = j;
                this.bufferRemaining = 0L;
            } else {
                long j2 = j - this.pos;
                this.pos = j;
                this.bufferRemaining -= j2;
            }
        }
    }

    public synchronized long getPos() throws IOException {
        checkNotClosed();
        return this.pos;
    }

    public boolean seekToNewSource(long j) throws IOException {
        checkNotClosed();
        return false;
    }

    public int read(ByteBuffer byteBuffer) throws IOException {
        int remaining = byteBuffer.remaining();
        if (remaining == 0) {
            return 0;
        }
        byte[] bArr = new byte[remaining];
        int read = read(bArr, 0, remaining);
        if (read != -1) {
            byteBuffer.put(bArr, 0, read);
        }
        return read;
    }

    public synchronized void setReadahead(Long l) throws IOException {
        checkNotClosed();
        if (l == null) {
            this.readaheadSize = OBSConstants.DEFAULT_READAHEAD_RANGE;
        } else {
            Preconditions.checkArgument(l.longValue() >= 0, "Negative readahead value");
            this.readaheadSize = l.longValue();
        }
    }
}
