package org.apache.flink.table.runtime.operators.join.lookup;

import java.util.ArrayList;
import java.util.Collection;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.table.connector.exception.RetryLookupException;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.DataStructureConverter;
import org.apache.flink.table.runtime.collector.TableFunctionResultFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/lookup/JoinedRowResultFuture.class */
public abstract class JoinedRowResultFuture<IN> implements ResultFuture<Object> {
    private static final Logger LOG = LoggerFactory.getLogger(JoinedRowResultFuture.class);
    protected final JoinedRowResultFuture<IN>.DelegateResultFuture delegate = new DelegateResultFuture();
    protected final GenericRowData nullRow;
    protected IN input;
    protected ResultFuture<RowData> realOutput;
    protected final AsyncFunction<IN, Object> fetcher;
    private final TableFunctionResultFuture<RowData> fetcherResultFuture;
    private final DataStructureConverter<RowData, Object> resultConverter;
    private final int maxRetries;
    private final Counter failedLookupsCounter;
    private final Histogram lookupRequestTimeHist;
    private int retryNum;
    private long resetTimeMs;

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/lookup/JoinedRowResultFuture$DelegateResultFuture.class */
    protected final class DelegateResultFuture implements ResultFuture<RowData> {
        private Collection<RowData> collection;

        protected DelegateResultFuture() {
        }

        public void reset() {
            this.collection = null;
        }

        public void complete(Collection<RowData> collection) {
            this.collection = collection;
        }

        public void completeExceptionally(Throwable th) {
            JoinedRowResultFuture.this.completeExceptionally(th);
        }

        public Collection<RowData> getCollection() {
            return this.collection;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public JoinedRowResultFuture(TableFunctionResultFuture<RowData> tableFunctionResultFuture, DataStructureConverter<RowData, Object> dataStructureConverter, int i, int i2, AsyncFunction<IN, Object> asyncFunction, Counter counter, Histogram histogram) {
        this.fetcherResultFuture = tableFunctionResultFuture;
        this.resultConverter = dataStructureConverter;
        this.maxRetries = i2;
        this.fetcher = asyncFunction;
        this.failedLookupsCounter = counter;
        this.lookupRequestTimeHist = histogram;
        this.nullRow = new GenericRowData(i);
    }

    public void reset(IN in, ResultFuture<RowData> resultFuture) {
        this.realOutput = resultFuture;
        this.input = in;
        this.retryNum = 0;
        this.resetTimeMs = System.currentTimeMillis();
        this.fetcherResultFuture.setInput(in);
        this.fetcherResultFuture.setResultFuture(this.delegate);
        this.delegate.reset();
    }

    public TableFunctionResultFuture<RowData> getFetcherResultFuture() {
        return this.fetcherResultFuture;
    }

    public void complete(Collection<Object> collection) {
        Collection<Object> arrayList;
        this.lookupRequestTimeHist.update(System.currentTimeMillis() - this.resetTimeMs);
        if (this.resultConverter.isIdentityConversion()) {
            arrayList = collection;
        } else {
            arrayList = new ArrayList(collection.size());
            for (Object obj : collection) {
                if (obj instanceof RowData) {
                    arrayList.add((RowData) obj);
                } else {
                    arrayList.add(this.resultConverter.toInternal(obj));
                }
            }
        }
        try {
            this.fetcherResultFuture.complete(arrayList);
            combineLeftRightRows();
        } catch (Throwable th) {
            completeExceptionally(th);
        }
    }

    protected abstract void combineLeftRightRows();

    public void completeExceptionally(Throwable th) {
        if (!(th instanceof RetryLookupException)) {
            this.realOutput.completeExceptionally(th);
            return;
        }
        if (this.retryNum >= this.maxRetries) {
            this.realOutput.completeExceptionally(new RuntimeException(String.format("Execution of async lookup failed after '%d' retries.", Integer.valueOf(this.retryNum)), th));
            return;
        }
        LOG.error("Async lookup error, retry time = {}.", Integer.valueOf(this.retryNum), th);
        this.failedLookupsCounter.inc();
        this.retryNum++;
        try {
            Thread.sleep(1000 * this.retryNum);
            this.fetcher.asyncInvoke(this.input, this);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void close() throws Exception {
        this.fetcherResultFuture.close();
    }
}
