package org.apache.hudi.org.apache.hadoop.hbase.client;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hudi.org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hudi.org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hudi.org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hudi.org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hudi.org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hudi.org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
import org.apache.hudi.org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hudi.org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hudi.org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hudi.org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hudi.org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hudi.org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hudi.org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hudi.org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hudi.org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hudi.org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hudi.org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hudi.org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hudi.org.apache.hbase.thirdparty.io.netty.util.Timeout;
import org.apache.hudi.org.apache.hbase.thirdparty.io.netty.util.Timer;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
@InterfaceAudience.Private
/* loaded from: input_file:org/apache/hudi/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.class */
public class AsyncScanSingleRegionRpcRetryingCaller {
    private static final Logger LOG;
    private final Timer retryTimer;
    private final Scan scan;
    private final ScanMetrics scanMetrics;
    private final long scannerId;
    private final ScanResultCache resultCache;
    private final AdvancedScanResultConsumer consumer;
    private final ClientProtos.ClientService.Interface stub;
    private final HRegionLocation loc;
    private final boolean regionServerRemote;
    private final int priority;
    private final long scannerLeaseTimeoutPeriodNs;
    private final long pauseNs;
    private final long pauseForCQTBENs;
    private final int maxAttempts;
    private final long scanTimeoutNs;
    private final long rpcTimeoutNs;
    private final int startLogErrorsCnt;
    private final Runnable completeWhenNoMoreResultsInRegion;
    private final CompletableFuture<Boolean> future;
    private final HBaseRpcController controller;
    private byte[] nextStartRowWhenError;
    private boolean includeNextStartRowWhenError;
    private long nextCallStartNs;
    private int tries;
    private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions;
    private long nextCallSeq = -1;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hudi/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller$ScanControllerImpl.class */
    public final class ScanControllerImpl implements AdvancedScanResultConsumer.ScanController {
        private final Optional<Cursor> cursor;
        private ScanResumerImpl resumer;
        private ScanControllerState state = ScanControllerState.INITIALIZED;
        private final Thread callerThread = Thread.currentThread();

        public ScanControllerImpl(Optional<Cursor> optional) {
            this.cursor = optional;
        }

        private void preCheck() {
            Preconditions.checkState(Thread.currentThread() == this.callerThread, "The current thread is %s, expected thread is %s, you should not call this method outside onNext or onHeartbeat", Thread.currentThread(), this.callerThread);
            Preconditions.checkState(this.state.equals(ScanControllerState.INITIALIZED), "Invalid Stopper state %s", this.state);
        }

        @Override // org.apache.hudi.org.apache.hadoop.hbase.client.AdvancedScanResultConsumer.ScanController
        public AdvancedScanResultConsumer.ScanResumer suspend() {
            preCheck();
            this.state = ScanControllerState.SUSPENDED;
            ScanResumerImpl scanResumerImpl = new ScanResumerImpl();
            this.resumer = scanResumerImpl;
            return scanResumerImpl;
        }

        @Override // org.apache.hudi.org.apache.hadoop.hbase.client.AdvancedScanResultConsumer.ScanController
        public void terminate() {
            preCheck();
            this.state = ScanControllerState.TERMINATED;
        }

        ScanControllerState destroy() {
            ScanControllerState scanControllerState = this.state;
            this.state = ScanControllerState.DESTROYED;
            return scanControllerState;
        }

        @Override // org.apache.hudi.org.apache.hadoop.hbase.client.AdvancedScanResultConsumer.ScanController
        public Optional<Cursor> cursor() {
            return this.cursor;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hudi/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller$ScanControllerState.class */
    public enum ScanControllerState {
        INITIALIZED,
        SUSPENDED,
        TERMINATED,
        DESTROYED
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hudi/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller$ScanResumerImpl.class */
    public final class ScanResumerImpl implements AdvancedScanResultConsumer.ScanResumer {
        private ScanResumerState state;
        private ClientProtos.ScanResponse resp;
        private int numberOfCompleteRows;
        private Timeout leaseRenewer;

        private ScanResumerImpl() {
            this.state = ScanResumerState.INITIALIZED;
        }

        @Override // org.apache.hudi.org.apache.hadoop.hbase.client.AdvancedScanResultConsumer.ScanResumer
        public void resume() {
            synchronized (this) {
                if (this.state == ScanResumerState.INITIALIZED) {
                    this.state = ScanResumerState.RESUMED;
                    return;
                }
                if (this.state == ScanResumerState.RESUMED) {
                    return;
                }
                this.state = ScanResumerState.RESUMED;
                if (this.leaseRenewer != null) {
                    this.leaseRenewer.cancel();
                }
                AsyncScanSingleRegionRpcRetryingCaller.this.completeOrNext(this.resp, this.numberOfCompleteRows);
            }
        }

        private void scheduleRenewLeaseTask() {
            this.leaseRenewer = AsyncScanSingleRegionRpcRetryingCaller.this.retryTimer.newTimeout(timeout -> {
                tryRenewLease();
            }, AsyncScanSingleRegionRpcRetryingCaller.this.scannerLeaseTimeoutPeriodNs / 2, TimeUnit.NANOSECONDS);
        }

        private synchronized void tryRenewLease() {
            if (this.state == ScanResumerState.RESUMED) {
                return;
            }
            AsyncScanSingleRegionRpcRetryingCaller.this.renewLease();
            scheduleRenewLeaseTask();
        }

        synchronized boolean prepare(ClientProtos.ScanResponse scanResponse, int i) {
            if (this.state == ScanResumerState.RESUMED) {
                return false;
            }
            this.state = ScanResumerState.SUSPENDED;
            this.resp = scanResponse;
            this.numberOfCompleteRows = i;
            if (!scanResponse.getMoreResultsInRegion()) {
                return true;
            }
            scheduleRenewLeaseTask();
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hudi/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller$ScanResumerState.class */
    public enum ScanResumerState {
        INITIALIZED,
        SUSPENDED,
        RESUMED
    }

    public AsyncScanSingleRegionRpcRetryingCaller(Timer timer, AsyncConnectionImpl asyncConnectionImpl, Scan scan, ScanMetrics scanMetrics, long j, ScanResultCache scanResultCache, AdvancedScanResultConsumer advancedScanResultConsumer, ClientProtos.ClientService.Interface r13, HRegionLocation hRegionLocation, boolean z, int i, long j2, long j3, long j4, int i2, long j5, long j6, int i3) {
        this.retryTimer = timer;
        this.scan = scan;
        this.scanMetrics = scanMetrics;
        this.scannerId = j;
        this.resultCache = scanResultCache;
        this.consumer = advancedScanResultConsumer;
        this.stub = r13;
        this.loc = hRegionLocation;
        this.regionServerRemote = z;
        this.scannerLeaseTimeoutPeriodNs = j2;
        this.pauseNs = j3;
        this.pauseForCQTBENs = j4;
        this.maxAttempts = i2;
        this.scanTimeoutNs = j5;
        this.rpcTimeoutNs = j6;
        this.startLogErrorsCnt = i3;
        if (scan.isReversed()) {
            this.completeWhenNoMoreResultsInRegion = this::completeReversedWhenNoMoreResultsInRegion;
        } else {
            this.completeWhenNoMoreResultsInRegion = this::completeWhenNoMoreResultsInRegion;
        }
        this.future = new CompletableFuture<>();
        this.priority = i;
        this.controller = asyncConnectionImpl.rpcControllerFactory.newController();
        this.controller.setPriority(i);
        this.exceptions = new ArrayList();
    }

    private long elapsedMs() {
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.nextCallStartNs);
    }

    private long remainingTimeNs() {
        return this.scanTimeoutNs - (System.nanoTime() - this.nextCallStartNs);
    }

    private void closeScanner() {
        ConnectionUtils.incRPCCallsMetrics(this.scanMetrics, this.regionServerRemote);
        ConnectionUtils.resetController(this.controller, this.rpcTimeoutNs, 200);
        this.stub.scan(this.controller, RequestConverter.buildScanRequest(this.scannerId, 0, true, false), scanResponse -> {
            if (this.controller.failed()) {
                LOG.warn("Call to " + this.loc.getServerName() + " for closing scanner id = " + this.scannerId + " for " + this.loc.getRegion().getEncodedName() + " of " + this.loc.getRegion().getTable() + " failed, ignore, probably already closed", this.controller.getFailed());
            }
        });
    }

    private void completeExceptionally(boolean z) {
        this.resultCache.clear();
        if (z) {
            closeScanner();
        }
        this.future.completeExceptionally(new RetriesExhaustedException(this.tries - 1, this.exceptions));
    }

    private void completeNoMoreResults() {
        this.future.complete(false);
    }

    private void completeWithNextStartRow(byte[] bArr, boolean z) {
        this.scan.withStartRow(bArr, z);
        this.future.complete(true);
    }

    private void completeWhenError(boolean z) {
        ConnectionUtils.incRPCRetriesMetrics(this.scanMetrics, z);
        this.resultCache.clear();
        if (z) {
            closeScanner();
        }
        if (this.nextStartRowWhenError != null) {
            this.scan.withStartRow(this.nextStartRowWhenError, this.includeNextStartRowWhenError);
        }
        this.future.complete(true);
    }

    private void onError(Throwable th) {
        long pauseTime;
        Throwable translateException = ConnectionUtils.translateException(th);
        if (this.tries > this.startLogErrorsCnt) {
            LOG.warn("Call to " + this.loc.getServerName() + " for scanner id = " + this.scannerId + " for " + this.loc.getRegion().getEncodedName() + " of " + this.loc.getRegion().getTable() + " failed, , tries = " + this.tries + ", maxAttempts = " + this.maxAttempts + ", timeout = " + TimeUnit.NANOSECONDS.toMillis(this.scanTimeoutNs) + " ms, time elapsed = " + elapsedMs() + " ms", translateException);
        }
        boolean z = (translateException instanceof UnknownScannerException) || (translateException instanceof NotServingRegionException) || (translateException instanceof RegionServerStoppedException) || (translateException instanceof ScannerResetException);
        this.exceptions.add(new RetriesExhaustedException.ThrowableWithExtraContext(translateException, EnvironmentEdgeManager.currentTime(), ""));
        if (this.tries >= this.maxAttempts) {
            completeExceptionally(!z);
            return;
        }
        long j = translateException instanceof CallQueueTooBigException ? this.pauseForCQTBENs : this.pauseNs;
        if (this.scanTimeoutNs > 0) {
            long remainingTimeNs = remainingTimeNs() - ConnectionUtils.SLEEP_DELTA_NS;
            if (remainingTimeNs <= 0) {
                completeExceptionally(!z);
                return;
            }
            pauseTime = Math.min(remainingTimeNs, ConnectionUtils.getPauseTime(j, this.tries - 1));
        } else {
            pauseTime = ConnectionUtils.getPauseTime(j, this.tries - 1);
        }
        if (z) {
            completeWhenError(false);
            return;
        }
        if (translateException instanceof OutOfOrderScannerNextException) {
            completeWhenError(true);
        } else if (translateException instanceof DoNotRetryIOException) {
            completeExceptionally(true);
        } else {
            this.tries++;
            this.retryTimer.newTimeout(timeout -> {
                call();
            }, pauseTime, TimeUnit.NANOSECONDS);
        }
    }

    private void updateNextStartRowWhenError(Result result) {
        this.nextStartRowWhenError = result.getRow();
        this.includeNextStartRowWhenError = result.mayHaveMoreCellsInRow();
    }

    private void completeWhenNoMoreResultsInRegion() {
        if (ConnectionUtils.noMoreResultsForScan(this.scan, this.loc.getRegion())) {
            completeNoMoreResults();
        } else {
            completeWithNextStartRow(this.loc.getRegion().getEndKey(), true);
        }
    }

    private void completeReversedWhenNoMoreResultsInRegion() {
        if (ConnectionUtils.noMoreResultsForReverseScan(this.scan, this.loc.getRegion())) {
            completeNoMoreResults();
        } else {
            completeWithNextStartRow(this.loc.getRegion().getStartKey(), false);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void completeOrNext(ClientProtos.ScanResponse scanResponse, int i) {
        if (scanResponse.hasMoreResults() && !scanResponse.getMoreResults()) {
            completeNoMoreResults();
            return;
        }
        if (this.scan.getLimit() > 0) {
            int limit = this.scan.getLimit() - i;
            if (!$assertionsDisabled && limit <= 0) {
                throw new AssertionError();
            }
            this.scan.setLimit(limit);
        }
        if (scanResponse.getMoreResultsInRegion()) {
            next();
        } else {
            this.completeWhenNoMoreResultsInRegion.run();
        }
    }

    private void onComplete(HBaseRpcController hBaseRpcController, ClientProtos.ScanResponse scanResponse) {
        ScanControllerImpl scanControllerImpl;
        if (hBaseRpcController.failed()) {
            onError(hBaseRpcController.getFailed());
            return;
        }
        ConnectionUtils.updateServerSideMetrics(this.scanMetrics, scanResponse);
        boolean z = scanResponse.hasHeartbeatMessage() && scanResponse.getHeartbeatMessage();
        int numberOfCompleteRows = this.resultCache.numberOfCompleteRows();
        try {
            Result[] results = ResponseConverter.getResults(hBaseRpcController.cellScanner(), scanResponse);
            ConnectionUtils.updateResultsMetrics(this.scanMetrics, results, z);
            Result[] addAndGet = this.resultCache.addAndGet((Result[]) Optional.ofNullable(results).orElse(ScanResultCache.EMPTY_RESULT_ARRAY), z);
            if (addAndGet.length > 0) {
                scanControllerImpl = new ScanControllerImpl(scanResponse.hasCursor() ? Optional.of(ProtobufUtil.toCursor(scanResponse.getCursor())) : Optional.empty());
                updateNextStartRowWhenError(addAndGet[addAndGet.length - 1]);
                this.consumer.onNext(addAndGet, scanControllerImpl);
            } else {
                Optional empty = Optional.empty();
                if (scanResponse.hasCursor()) {
                    empty = Optional.of(ProtobufUtil.toCursor(scanResponse.getCursor()));
                } else if (this.scan.isNeedCursorResult() && results.length > 0) {
                    empty = Optional.of(new Cursor(results[results.length - 1].getRow()));
                }
                scanControllerImpl = new ScanControllerImpl(empty);
                if (z || empty.isPresent()) {
                    this.consumer.onHeartbeat(scanControllerImpl);
                }
            }
            ScanControllerState destroy = scanControllerImpl.destroy();
            if (destroy == ScanControllerState.TERMINATED) {
                if (scanResponse.getMoreResultsInRegion()) {
                    closeScanner();
                }
                completeNoMoreResults();
            } else {
                int numberOfCompleteRows2 = this.resultCache.numberOfCompleteRows() - numberOfCompleteRows;
                if (destroy == ScanControllerState.SUSPENDED && scanControllerImpl.resumer.prepare(scanResponse, numberOfCompleteRows2)) {
                    return;
                }
                completeOrNext(scanResponse, numberOfCompleteRows2);
            }
        } catch (IOException e) {
            LOG.warn("decode scan response failed", e);
            completeWhenError(true);
        }
    }

    private void call() {
        long j;
        if (this.scanTimeoutNs > 0) {
            long nanoTime = this.scanTimeoutNs - (System.nanoTime() - this.nextCallStartNs);
            if (nanoTime <= 0) {
                completeExceptionally(true);
                return;
            }
            j = nanoTime;
        } else {
            j = 0;
        }
        ConnectionUtils.incRPCCallsMetrics(this.scanMetrics, this.regionServerRemote);
        if (this.tries > 1) {
            ConnectionUtils.incRPCRetriesMetrics(this.scanMetrics, this.regionServerRemote);
        }
        ConnectionUtils.resetController(this.controller, j, this.priority);
        ClientProtos.ScanRequest buildScanRequest = RequestConverter.buildScanRequest(this.scannerId, this.scan.getCaching(), false, this.nextCallSeq, this.scan.isScanMetricsEnabled(), false, this.scan.getLimit());
        Context current = Context.current();
        this.stub.scan(this.controller, buildScanRequest, scanResponse -> {
            Scope makeCurrent = current.makeCurrent();
            Throwable th = null;
            try {
                try {
                    onComplete(this.controller, scanResponse);
                    if (makeCurrent != null) {
                        if (0 == 0) {
                            makeCurrent.close();
                            return;
                        }
                        try {
                            makeCurrent.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } catch (Throwable th4) {
                if (makeCurrent != null) {
                    if (th != null) {
                        try {
                            makeCurrent.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        makeCurrent.close();
                    }
                }
                throw th4;
            }
        });
    }

    private void next() {
        this.nextCallSeq++;
        this.tries = 1;
        this.exceptions.clear();
        this.nextCallStartNs = System.nanoTime();
        call();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void renewLease() {
        ConnectionUtils.incRPCCallsMetrics(this.scanMetrics, this.regionServerRemote);
        this.nextCallSeq++;
        ConnectionUtils.resetController(this.controller, this.rpcTimeoutNs, this.priority);
        this.stub.scan(this.controller, RequestConverter.buildScanRequest(this.scannerId, 0, false, this.nextCallSeq, false, true, -1), scanResponse -> {
        });
    }

    public CompletableFuture<Boolean> start(HBaseRpcController hBaseRpcController, ClientProtos.ScanResponse scanResponse) {
        onComplete(hBaseRpcController, scanResponse);
        return this.future;
    }

    static {
        $assertionsDisabled = !AsyncScanSingleRegionRpcRetryingCaller.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(AsyncScanSingleRegionRpcRetryingCaller.class);
    }
}
