package org.apache.flink.table.runtime.functions.table.lookup.reload.inputformat;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.runtime.functions.table.lookup.reload.LookupSetLoader;
import org.apache.flink.table.runtime.functions.table.lookup.reload.ReloadManager;
import org.apache.flink.table.runtime.functions.table.lookup.reload.loader.TableRowsLoaderFactory;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/runtime/functions/table/lookup/reload/inputformat/InputFormatLookupSetLoader.class */
/**
 * {@link LookupSetLoader} that fills the lookup set by reading every {@link InputSplit} produced
 * by an {@link InputFormat}.
 *
 * <p>On each reload the input format is asked for its splits and one {@link InputSplitLoadTask}
 * is created per split, each working on its own clone of the input format. The first task runs on
 * the calling thread; the remaining tasks run on a temporary thread pool that is shut down before
 * {@link #updateSet()} returns.
 *
 * @param <T> type parameter forwarded to {@link ReloadManager} and {@link TableRowsLoaderFactory}
 * @param <M> metric group type forwarded to {@link ReloadManager}
 */
public class InputFormatLookupSetLoader<T, M extends MetricGroup> extends LookupSetLoader<T, M> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(InputFormatLookupSetLoader.class);

    /** How long to wait for the split-loading threads to terminate after {@code shutdownNow()}. */
    private static final long TIMEOUT_AFTER_INTERRUPT_MS = 10000L;

    private final TableRowsLoaderFactory<T> tableRowsLoaderFactory;
    private final InputFormat<RowData, InputSplit> initialInputFormat;
    private final RowType lookupTableRowType;

    /**
     * Creates a loader backed by the given input format.
     *
     * @param reloadManager reload manager handed to the parent loader
     * @param tableRowsLoaderFactory factory for the per-split rows loaders
     * @param inputFormat input format that is cloned for every split on each reload
     * @param rowType row type of the lookup table, passed to every load task
     */
    // The wildcard split type is narrowed to InputSplit; every split type is an InputSplit, so
    // the cast only fixes the generic view used inside this class.
    @SuppressWarnings("unchecked")
    public InputFormatLookupSetLoader(ReloadManager<T, M> reloadManager, TableRowsLoaderFactory<T> tableRowsLoaderFactory, InputFormat<RowData, ?> inputFormat, RowType rowType) {
        super(reloadManager);
        this.initialInputFormat = (InputFormat<RowData, InputSplit>) inputFormat;
        this.tableRowsLoaderFactory = tableRowsLoaderFactory;
        this.lookupTableRowType = rowType;
    }

    /**
     * Opens the parent loader, remembers the function context and configures the template input
     * format with the context's configuration.
     *
     * @param functionContext runtime context of the lookup function
     * @throws Exception if the parent loader fails to open
     */
    @Override // org.apache.flink.table.runtime.functions.table.lookup.reload.LookupSetLoader
    public void open(FunctionContext functionContext) throws Exception {
        super.open(functionContext);
        // NOTE(review): functionContext is presumably a field declared by LookupSetLoader.
        this.functionContext = functionContext;
        this.initialInputFormat.configure(functionContext.getConfiguration());
    }

    /**
     * Reads all input splits into the set that is being filled for this reload.
     *
     * <p>The calling thread's interrupt flag is always cleared before this method returns or
     * throws; an interruption is reported through the return value instead.
     *
     * @return {@code true} if loading finished without the calling thread being interrupted,
     *     {@code false} if an interruption was observed
     * @throws Exception if split creation, task creation or any load task fails
     */
    @Override // org.apache.flink.table.runtime.functions.table.lookup.reload.LookupSetLoader
    protected boolean updateSet() throws Exception {
        InputSplit[] inputSplits = createInputSplits();
        int concurrencyLevel = getConcurrencyLevel(inputSplits.length);
        // preLoad must run before the tasks are created: createLoadTask() asks the reload
        // manager for the set being filled.
        this.reloadManager.preLoad(concurrencyLevel);
        Deque<InputSplitLoadTask> loadTasks =
                Arrays.stream(inputSplits)
                        .map(this::createLoadTask)
                        .collect(Collectors.toCollection(ArrayDeque::new));

        ExecutorService executorService = null;
        boolean interrupted;
        try {
            InputSplitLoadTask firstTask = loadTasks.pop();
            CompletableFuture<Void> otherTasksFuture = null;
            // A pool is only useful (and only legal: newFixedThreadPool rejects 0 threads) when
            // more than one thread may work in parallel.
            if (!loadTasks.isEmpty() && concurrencyLevel > 1) {
                final ExecutorService pool = Executors.newFixedThreadPool(concurrencyLevel - 1);
                executorService = pool;
                otherTasksFuture =
                        CompletableFuture.allOf(
                                loadTasks.stream()
                                        .map(task -> CompletableFuture.runAsync(task, pool))
                                        .toArray(CompletableFuture[]::new));
            }
            firstTask.run();
            if (otherTasksFuture != null) {
                otherTasksFuture.get();
            } else {
                // Single-threaded fallback (e.g. one available processor but several splits):
                // load the remaining splits on this thread, stopping early once interrupted.
                while (!loadTasks.isEmpty() && !Thread.currentThread().isInterrupted()) {
                    loadTasks.pop().run();
                }
            }
        } catch (InterruptedException e) {
            // Restore the flag so the finally block observes the interruption.
            Thread.currentThread().interrupt();
        } finally {
            // Read and clear the interrupt flag so that awaiting pool termination is not aborted
            // by an interruption that has already been accounted for.
            interrupted = Thread.interrupted();
            shutdownExecutor(executorService);
        }
        return !interrupted;
    }

    /**
     * Stops the given pool and waits a bounded time for its threads to finish.
     *
     * @param executorService pool to stop, or {@code null} if none was started
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private void shutdownExecutor(ExecutorService executorService) throws InterruptedException {
        if (executorService == null) {
            return;
        }
        executorService.shutdownNow();
        if (!executorService.awaitTermination(TIMEOUT_AFTER_INTERRUPT_MS, TimeUnit.MILLISECONDS)) {
            LOG.error("ExecutorService with InputSplit loading tasks was not terminated in timeout of {} ms.", TIMEOUT_AFTER_INTERRUPT_MS);
        }
    }

    /**
     * Builds the load task for one split, using a freshly cloned and configured input format.
     *
     * @param inputSplit split the task will read
     * @return task that loads the split
     * @throws RuntimeException if cloning the input format or creating the rows loader fails
     */
    private InputSplitLoadTask createLoadTask(InputSplit inputSplit) {
        try {
            // Each task gets its own clone so tasks never share one input format instance.
            InputFormat<RowData, InputSplit> inputFormat = InstantiationUtil.clone(this.initialInputFormat);
            inputFormat.configure(this.functionContext.getConfiguration());
            return new InputSplitLoadTask(this.tableRowsLoaderFactory.createTableRowsLoader(this.reloadManager.getFillingSetOnReload(), this.functionContext), this.lookupTableRowType, inputFormat, inputSplit);
        } catch (Exception e) {
            throw new RuntimeException("Failed to create InputFormatLoadTask", e);
        }
    }

    /**
     * Asks the template input format for its splits, requesting a minimum of one.
     *
     * @return the created splits, never empty
     * @throws IOException if the input format fails to create splits
     * @throws IllegalStateException if the input format returns no splits
     */
    private InputSplit[] createInputSplits() throws IOException {
        InputSplit[] inputSplits = this.initialInputFormat.createInputSplits(1);
        // Guarded because rendering every split to a string may be expensive.
        if (LOG.isDebugEnabled()) {
            LOG.debug("InputFormat created {} InputSplits: {}", inputSplits.length, Arrays.deepToString(inputSplits));
        }
        Preconditions.checkState(inputSplits.length >= 1, "InputFormat must provide at least one input split to load data into Lookup " + this.reloadManager.getSetName());
        return inputSplits;
    }

    /**
     * Computes how many threads may load splits in parallel.
     *
     * @param i number of input splits
     * @return the smaller of the split count and the number of available processors
     */
    private int getConcurrencyLevel(int i) {
        return Math.min(i, Runtime.getRuntime().availableProcessors());
    }
}
