package org.apache.hadoop.fs.obs.input;

import com.google.common.base.Preconditions;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.obs.OBSCommonUtils;
import org.apache.hadoop.fs.obs.OBSConstants;
import org.apache.hadoop.fs.obs.OBSFileStatus;
import org.apache.hadoop.fs.obs.OBSFileSystem;
import org.apache.hadoop.fs.obs.input.OBSMemArtsPartnerInputStream;
import org.apache.hadoop.fs.obs.memartscc.MemArtsCCClient;
import org.apache.hadoop.fs.obs.memartscc.MemArtsCCInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Input stream over a single object that reads through one of two delegate streams.
 *
 * <p>The stream is a small state machine (see {@link State}):
 * <ul>
 *   <li>{@link State#MREAD}: data is read from a {@link MemArtsCCInputStream}; if that read fails
 *       with an {@link IOException} the read is retried on the partner stream and the state
 *       moves to {@link State#OREAD};</li>
 *   <li>{@link State#OREAD}: data is read from an {@link OBSMemArtsPartnerInputStream}; when that
 *       stream signals {@code OReadToMReadTransitionException} the state moves back to
 *       {@link State#MREAD}.</li>
 * </ul>
 *
 * <p>In addition, the last {@code bufSize} bytes of the object are cached in memory the first time
 * {@link #seek(long)} targets that region, and subsequent reads inside the region are served from
 * the cache.
 *
 * <p>All public read/seek/close operations are {@code synchronized} on this instance.
 */
public class OBSMemArtsCCInputStream extends FSInputStream implements CanSetReadahead, ByteBufferReadable {
    private static final Logger LOG = LoggerFactory.getLogger(OBSMemArtsCCInputStream.class);

    /** Delegate used while in {@link State#OREAD}; set to {@code null} by {@link #close()}. */
    OBSMemArtsPartnerInputStream partnerInputStream;

    /** Current read state; decides which delegate serves the next read. */
    State state;

    private final FileSystem.Statistics statistics;
    private final MemArtsCCClient memArtsCCClient;
    private final String bucket;
    private final String key;
    private final long contentLength;
    private final String uri;
    private OBSFileSystem fs;
    private volatile boolean closed = false;

    /** Position of the next byte to be returned to the caller. */
    private long nextReadPos = 0;

    /** Delegate used while in {@link State#MREAD}; set to {@code null} by {@link #close()}. */
    private MemArtsCCInputStream ccStream;

    /** Size in bytes of the cached tail region. */
    private int bufSize;

    /** Cached tail of the object, or {@code null} while it has not been loaded. */
    private byte[] tailBuf;

    /** Statistics for reads served by the partner stream. */
    private Counter oCounter;

    /** Statistics for reads served by the MemArts stream. */
    private Counter mCounter;

    /**
     * Accumulates the number of read calls, their total duration and the total bytes delivered.
     *
     * <p>{@link #increase(long, long)} is a plain read-modify-write and therefore not atomic; within
     * this file it is only invoked from read paths that already hold the stream's monitor.
     */
    public static class Counter {
        private volatile long num;
        private volatile long totalTime;
        private volatile long totalLen;

        private Counter() {
        }

        /**
         * Records one read call.
         *
         * @param j  time spent in the call (the caller in this file passes nanoseconds)
         * @param j2 number of bytes delivered by the call
         */
        public void increase(long j, long j2) {
            this.totalTime += j;
            this.totalLen += j2;
            this.num++;
        }

        @Override
        public String toString() {
            return String.format("[count=%s,time=%s,size=%s]", Long.valueOf(this.num), Long.valueOf(this.totalTime), Long.valueOf(this.totalLen));
        }
    }

    /** Read states of the stream; {@link #toString()} returns the label passed to the constructor. */
    public enum State {
        NEW("NEW"),
        OREAD("OREAD"),
        MREAD("MREAD"),
        CLOSED("CLOSED");

        private final String state;

        State(String str) {
            this.state = str;
        }

        @Override // java.lang.Enum
        public String toString() {
            return this.state;
        }
    }

    /**
     * Creates the stream and both delegates; the initial state is {@link State#MREAD}.
     *
     * @param str            bucket name, must not be empty
     * @param str2           object key, must not be empty
     * @param j              content length of the object, must not be negative
     * @param statistics     file system statistics to update; may be {@code null}
     * @param j2             value forwarded to the partner stream constructor
     *                       (NOTE(review): presumably a read-ahead range - confirm)
     * @param j3             value forwarded to the {@link MemArtsCCInputStream} constructor
     *                       (NOTE(review): presumably a prefetch/read-ahead range - confirm)
     * @param oBSFileSystem  owning file system; supplies the clients and configuration
     * @param oBSFileStatus  status of the file, forwarded to the {@link MemArtsCCInputStream}
     */
    public OBSMemArtsCCInputStream(String str, String str2, long j, FileSystem.Statistics statistics, long j2, long j3, OBSFileSystem oBSFileSystem, OBSFileStatus oBSFileStatus) {
        Preconditions.checkArgument(OBSCommonUtils.isStringNotEmpty(str), "No Bucket");
        Preconditions.checkArgument(OBSCommonUtils.isStringNotEmpty(str2), "No Key");
        Preconditions.checkArgument(j >= 0, "Negative content length");
        this.bucket = str;
        this.key = str2;
        this.contentLength = j;
        this.memArtsCCClient = oBSFileSystem.getMemArtsCCClient();
        this.statistics = statistics;
        this.uri = "obs://" + this.bucket + "/" + this.key;
        this.fs = oBSFileSystem;
        this.partnerInputStream = new OBSMemArtsPartnerInputStream(str, str2, j, oBSFileSystem.getObsClient(), statistics, j2, oBSFileSystem, this);
        this.state = State.MREAD;
        this.bufSize = oBSFileSystem.getConf().getInt(OBSConstants.MEMARTSCC_BUFFER_SIZE, 8192);
        int directBufferSize = oBSFileSystem.getConf().getInt(OBSConstants.MEMARTSCC_DIRECTBUFFER_SIZE, 1048576);
        this.ccStream = new MemArtsCCInputStream(this.memArtsCCClient, str2, oBSFileStatus, j3, this.bufSize, directBufferSize);
        this.oCounter = new Counter();
        this.mCounter = new Counter();
        LOG.debug("create OBSMemArtsCCInputStream[{}] for file {}", hashCode(), str2);
    }

    /**
     * Returns the number of bytes between the current position and the end of the object,
     * capped at {@link Integer#MAX_VALUE}.
     *
     * @throws IOException if the file system or this stream is closed
     */
    public synchronized int available() throws IOException {
        this.fs.checkOpen();
        checkStreamOpen();
        long remaining = this.contentLength - this.nextReadPos;
        return remaining > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) remaining;
    }

    /**
     * Reads up to {@code i2} bytes into {@code bArr} starting at offset {@code i}.
     *
     * <p>Reads that start inside the cached tail region are served from memory; both delegates are
     * then repositioned so they stay aligned with the logical position.
     *
     * @param bArr destination buffer
     * @param i    offset in {@code bArr} at which to store data
     * @param i2   maximum number of bytes to read
     * @return number of bytes read, {@code 0} when {@code i2} is zero, or {@code -1} at end of object
     * @throws IOException if the file system or this stream is closed, or the underlying read fails
     */
    public synchronized int read(byte[] bArr, int i, int i2) throws IOException {
        LOG.debug("read(buf,off,len), offset[{}], len[{}] ", i, i2);
        this.fs.checkOpen();
        checkStreamOpen();
        long startMillis = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        validatePositionedReadArgs(this.nextReadPos, bArr, i, i2);
        if (i2 == 0) {
            return 0;
        }
        if (this.contentLength == 0 || this.nextReadPos >= this.contentLength) {
            return -1;
        }

        int tailIndex = posInTailBuf(this.nextReadPos);
        if (tailIndex != -1 && this.tailBuf != null) {
            // Serve from the cached tail, never past the end of the object or the caller's buffer.
            int toCopy = Math.min((int) (this.contentLength - this.nextReadPos), Math.min(i2, bArr.length - i));
            System.arraycopy(this.tailBuf, tailIndex, bArr, i, toCopy);
            this.nextReadPos += toCopy;
            this.partnerInputStream.seek(this.nextReadPos);
            this.ccStream.seek(this.nextReadPos);
            incrementBytesRead(toCopy);
            return toCopy;
        }

        int bytesRead = readInState(bArr, i, i2, false);
        if (bytesRead > 0) {
            this.nextReadPos += bytesRead;
        }
        incrementBytesRead(bytesRead);
        LOG.debug("Read-3args uri:{}, contentLength:{}, destLen:{}, readLen:{}, position:{}, thread:{}, timeUsedMilliSec:{}", new Object[]{this.uri, this.contentLength, i2, bytesRead, bytesRead >= 0 ? this.nextReadPos - 1 : this.nextReadPos, threadId, System.currentTimeMillis() - startMillis});
        return bytesRead;
    }

    /**
     * Performs one timed read on {@code in} and records it in {@code counter}.
     *
     * @param in         delegate stream to read from
     * @param buf        destination buffer (unused for single-byte reads)
     * @param off        offset in {@code buf}
     * @param len        maximum number of bytes to read
     * @param singleByte {@code true} to call {@code in.read()}, {@code false} for the array read
     * @param counter    statistics accumulator for this delegate
     * @param source     state label used in the debug log
     * @return the raw result of the delegate read (byte value or byte count, {@code -1} at EOF)
     */
    private int readInputStream(InputStream in, byte[] buf, int off, int len, boolean singleByte, Counter counter, State source) throws IOException {
        long startPos = this.nextReadPos;
        long startNanos = System.nanoTime();
        int result = singleByte ? in.read() : in.read(buf, off, len);
        long elapsedNanos = System.nanoTime() - startNanos;

        // Only count bytes that were actually delivered: an EOF result (-1) must neither be
        // subtracted from the total nor be counted as one byte for single-byte reads.
        int delivered;
        if (result < 0) {
            delivered = 0;
        } else {
            delivered = singleByte ? 1 : result;
        }
        counter.increase(elapsedNanos, delivered);
        LOG.debug("{} {}({},{},{})", new Object[]{hashCode(), source, startPos, delivered, elapsedNanos});
        return result;
    }

    /**
     * Dispatches a read to the delegate selected by the current {@link #state}.
     *
     * <p>The switch relies on intentional fall-through: {@code NEW} opens the partner stream and
     * continues as {@code OREAD}; {@code OREAD} continues as {@code MREAD} when the partner stream
     * signals the transition.
     *
     * @return the delegate's read result, or {@code -1} if the MemArts path ended with an
     *         {@link EOFException}
     */
    private int readInState(byte[] buf, int off, int len, boolean singleByte) throws IOException {
        switch (this.state) {
            case NEW:
                this.partnerInputStream.reopen("open first connection", this.nextReadPos, len);
                stateTransitionToORead();
                // fall through: the freshly opened partner stream serves this read
            case OREAD:
                try {
                    this.partnerInputStream.seek(this.nextReadPos);
                    return readInputStream(this.partnerInputStream, buf, off, len, singleByte, this.oCounter, State.OREAD);
                } catch (OBSMemArtsPartnerInputStream.OReadToMReadTransitionException e) {
                    if (this.state != State.OREAD) {
                        throw new IllegalStateException("state must be oread");
                    }
                    stateTransitionToMRead();
                }
                // fall through: the partner stream asked to switch back to the MemArts stream
            case MREAD:
                try {
                    return tryToReadFromCCStream(buf, off, len, singleByte);
                } catch (EOFException eof) {
                    return -1;
                } catch (IOException e) {
                    LOG.error("tryToReadFromCCStream offset[{}] len[{}] of [{}] failed, due to exception[{}]", new Object[]{off, len, this.uri, e});
                    throw e;
                }
            default:
                throw new IllegalStateException("unreachable code");
        }
    }

    /**
     * Reads a single byte.
     *
     * @return the byte as an unsigned value in {@code 0..255}, or {@code -1} at end of object
     * @throws IOException if the file system or this stream is closed, or the underlying read fails
     */
    public synchronized int read() throws IOException {
        this.fs.checkOpen();
        checkStreamOpen();
        long startMillis = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        if (this.contentLength == 0 || this.nextReadPos >= this.contentLength) {
            return -1;
        }

        int tailIndex = posInTailBuf(this.nextReadPos);
        if (tailIndex != -1 && this.tailBuf != null) {
            int value = this.tailBuf[tailIndex] & 255;
            this.nextReadPos++;
            this.partnerInputStream.seek(this.nextReadPos);
            this.ccStream.seek(this.nextReadPos);
            incrementBytesRead(1L);
            return value;
        }

        int value = readInState(null, 0, 0, true);
        if (value >= 0) {
            this.nextReadPos++;
            incrementBytesRead(1L);
        }
        LOG.debug("read-0arg uri:{}, contentLength:{}, position:{}, readValue:{}, thread:{}, timeUsedMilliSec:{}", new Object[]{this.uri, this.contentLength, value >= 0 ? this.nextReadPos - 1 : this.nextReadPos, value, threadId, System.currentTimeMillis() - startMillis});
        return value;
    }

    /**
     * Reads into {@code byteBuffer} through a temporary heap array of {@code remaining()} bytes.
     *
     * @return number of bytes transferred, {@code 0} if the buffer has no space, or {@code -1} at
     *         end of object
     */
    public synchronized int read(ByteBuffer byteBuffer) throws IOException {
        int capacity = byteBuffer.remaining();
        if (capacity == 0) {
            return 0;
        }
        byte[] scratch = new byte[capacity];
        int bytesRead = read(scratch, 0, capacity);
        if (bytesRead != -1) {
            byteBuffer.put(scratch, 0, bytesRead);
        }
        return bytesRead;
    }

    /**
     * Maps an absolute position to an index in the tail buffer.
     *
     * @param pos absolute position in the object
     * @return index into {@link #tailBuf}, or {@code -1} if {@code pos} is outside the tail region
     */
    private int posInTailBuf(long pos) {
        long rawTailStart = this.contentLength - this.bufSize;
        if (pos < 0 || pos < rawTailStart) {
            return -1;
        }
        // Objects shorter than the buffer are cached from offset 0.
        long index = pos - Math.max(0L, rawTailStart);
        if (index >= 0 && index < this.bufSize) {
            return (int) index;
        }
        // Seeking exactly to the end of the object is legal and lands one past the tail region;
        // only positions that are genuinely unexpected are worth a warning.
        if (pos != this.contentLength) {
            LOG.warn("nextReadPos is in invalid state, pos = {}, contentLength  = {}, bufSize = {}", new Object[]{pos, this.contentLength, this.bufSize});
        }
        return -1;
    }

    /**
     * Moves the read position to {@code j}.
     *
     * <p>The first seek into the tail region loads that region into memory before positioning.
     *
     * @param j target position, {@code 0 <= j <= contentLength}
     * @throws EOFException if {@code j} is negative or beyond the end of the object
     * @throws IOException  if the file system or this stream is closed, or a delegate fails
     */
    public synchronized void seek(long j) throws IOException {
        LOG.debug("seek(targetPos), targetPos [{}] ", j);
        this.fs.checkOpen();
        checkStreamOpen();
        if (j < 0) {
            throw new EOFException("Cannot seek to a negative offset " + j);
        }
        if (j > this.contentLength) {
            throw new EOFException("Cannot seek after EOF");
        }
        if (this.contentLength <= 0) {
            return;
        }
        if (posInTailBuf(j) != -1 && this.tailBuf == null) {
            loadTailBuf();
        }
        this.nextReadPos = j;
        this.partnerInputStream.seek(j);
        this.ccStream.seek(j);
    }

    /**
     * Reads the tail region of the object into memory and installs it as {@link #tailBuf}.
     *
     * <p>The buffer is installed only when every expected byte was read; a short read leaves
     * {@link #tailBuf} unset so that later reads go to the delegates instead of returning
     * unfilled (zero) bytes. The caller is responsible for repositioning afterwards.
     */
    private void loadTailBuf() throws IOException {
        long tailStart = Math.max(0L, this.contentLength - this.bufSize);
        int expected = (int) (this.contentLength - tailStart);
        byte[] buf = new byte[this.bufSize];

        this.nextReadPos = tailStart;
        this.partnerInputStream.seek(tailStart);
        this.ccStream.seek(tailStart);

        int filled = 0;
        while (filled < expected) {
            // tailBuf is still null here, so this read goes to the delegates.
            int n = read(buf, filled, expected - filled);
            if (n <= 0) {
                break;
            }
            filled += n;
        }

        if (filled == expected) {
            this.tailBuf = buf;
        } else {
            LOG.warn("tail buffer of {} not cached, read {} of {} bytes", new Object[]{this.uri, filled, expected});
        }
    }

    /**
     * Returns the current read position (never negative).
     *
     * @throws IOException if the file system or this stream is closed
     */
    public synchronized long getPos() throws IOException {
        this.fs.checkOpen();
        checkStreamOpen();
        return Math.max(0L, this.nextReadPos);
    }

    /** Always returns {@code false}; the argument is ignored. */
    public boolean seekToNewSource(long j) throws IOException {
        return false;
    }

    /**
     * Forwards the read-ahead setting to the MemArts stream.
     *
     * @throws IOException if the file system or this stream is closed
     */
    public synchronized void setReadahead(Long l) throws IOException {
        this.fs.checkOpen();
        checkStreamOpen();
        this.ccStream.setReadahead(l);
    }

    /**
     * Closes both delegate streams and marks this stream closed; further calls only log the
     * read summary.
     *
     * <p>Once closing has started, both delegates are always released and the stream is always
     * marked closed, even if one of the delegates fails to close. If both fail, the exception
     * from the MemArts stream is the one propagated.
     *
     * @throws IOException if the file system is already closed or a delegate fails to close
     */
    public synchronized void close() throws IOException {
        if (!this.closed) {
            this.fs.checkOpen();
            try {
                super.close();
                if (this.partnerInputStream != null) {
                    this.partnerInputStream.close();
                }
            } finally {
                this.partnerInputStream = null;
                try {
                    if (this.ccStream != null) {
                        this.ccStream.close();
                    }
                } finally {
                    this.ccStream = null;
                    this.closed = true;
                }
            }
        }
        LOG.debug("{} SUMMARY: OREAD{}, MREAD{}", new Object[]{hashCode(), this.oCounter, this.mCounter});
    }

    /** Throws if {@link #close()} has already completed on this stream. */
    private void checkStreamOpen() throws IOException {
        if (this.closed) {
            throw new IOException(this.uri + ": Stream is closed!");
        }
    }

    /** Adds {@code count} to the file system statistics when statistics exist and {@code count > 0}. */
    private void incrementBytesRead(long count) {
        if (this.statistics != null && count > 0) {
            this.statistics.incrementBytesRead(count);
        }
    }

    /**
     * Reads from the MemArts stream; on any {@link IOException} retries the same read on the
     * partner stream and, if that succeeds, switches the state to {@link State#OREAD}.
     *
     * @throws IllegalStateException if the fallback is attempted outside {@link State#MREAD} or the
     *                               partner stream unexpectedly signals a transition back
     */
    private int tryToReadFromCCStream(byte[] buf, int off, int len, boolean singleByte) throws IOException {
        try {
            return readInputStream(this.ccStream, buf, off, len, singleByte, this.mCounter, State.MREAD);
        } catch (IOException e) {
            LOG.debug("{} mread escape, caused by {}", hashCode(), e.getMessage());
            if (getState() != State.MREAD) {
                throw new IllegalStateException("cachemiss reopen must in state mread");
            }
            this.partnerInputStream.seek(this.nextReadPos);
            try {
                int result = readInputStream(this.partnerInputStream, buf, off, len, singleByte, this.oCounter, State.OREAD);
                stateTransitionToORead();
                return result;
            } catch (OBSMemArtsPartnerInputStream.OReadToMReadTransitionException transition) {
                LOG.error("catch unexpected reopen signal, {}", transition.getMessage());
                throw new IllegalStateException("catch unexpected reopen signal", transition);
            }
        }
    }

    /** Returns the current read state. */
    public State getState() {
        return this.state;
    }

    /** Switches {@code OREAD -> MREAD} and positions the MemArts stream at the current offset. */
    private void stateTransitionToMRead() throws IOException {
        if (this.state != State.OREAD) {
            throw new IllegalStateException("cannot transit state from " + this.state.toString() + " to mread");
        }
        this.state = State.MREAD;
        this.ccStream.seek(this.nextReadPos);
    }

    /** Switches {@code MREAD -> OREAD} or {@code NEW -> OREAD}; any other source state is rejected. */
    private void stateTransitionToORead() {
        if (this.state != State.MREAD && this.state != State.NEW) {
            throw new IllegalStateException("cannot transit state from " + this.state.toString() + " to oread");
        }
        this.state = State.OREAD;
    }
}
