package org.apache.flink.table.runtime.operators.join.lookup.cache.all.reload.manager;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.shaded.guava30.com.google.common.base.Joiner;
import org.apache.flink.table.connector.config.lookup.CacheReloadPolicy;
import org.apache.flink.table.connector.config.lookup.LookupConfig;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.join.lookup.cache.LookupCache;
import org.apache.flink.table.runtime.operators.join.lookup.cache.all.reload.InputSplitCacheLoadTask;
import org.apache.flink.table.runtime.operators.join.lookup.cache.all.reload.loader.CacheLoaderFactory;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/lookup/cache/all/reload/manager/ReloadCacheManager.class */
/**
 * Base class for managers that fill the lookup 'ALL' cache by reading every {@link InputSplit}
 * produced by an {@link InputFormat}.
 *
 * <p>One {@link InputSplitCacheLoadTask} is kept per input split. The first task runs on the
 * calling thread, the remaining ones on a short-lived thread pool. The {@link Runnable#run()}
 * method is not implemented here; concrete subclasses are obtained through
 * {@link #forReloadPolicy(LookupConfig, InputFormat, CacheLoaderFactory, RowType)}.
 */
public abstract class ReloadCacheManager extends AbstractRichFunction implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(ReloadCacheManager.class);

    /** Lookup configuration; its reload policy selects the concrete manager. */
    protected final LookupConfig lookupConfig;

    /** Input format given at construction; used directly by the first task and cloned for the others. */
    private final InputFormat<RowData, InputSplit> initialInputFormat;

    /** Creates the loader each task uses to write rows into the cache. */
    private final CacheLoaderFactory cacheLoaderFactory;

    /** Row type of the lookup table, handed to every load task. */
    private final RowType lookupTableRowType;

    /** The cache itself: lookup key to matching rows. Created in {@link #open(Configuration)}. */
    protected volatile transient ConcurrentHashMap<GenericRowData, Collection<RowData>> cache;

    /** Callback installed through {@link #setReloadFailCallback(Consumer)}; not invoked by this class. */
    protected transient Consumer<Throwable> reloadFailCallback;

    /** One load task per input split. Created in {@link #open(Configuration)}. */
    private transient List<InputSplitCacheLoadTask> cacheLoadTasks;

    /** Configuration received in {@link #open(Configuration)}, reused for every cloned input format. */
    private transient Configuration parameters;

    /** {@code true} until the first call of {@link #runCacheLoadTasks(boolean)}. */
    protected boolean firstRun = true;

    /** Counts completed loads. */
    private transient Counter loadCount;

    /** Duration of the most recently completed load, in milliseconds. */
    private volatile transient long latestLoadTimeMs;

    /**
     * Stores the collaborators; no work is done until {@link #open(Configuration)}.
     *
     * @param lookupConfig lookup configuration
     * @param inputFormat input format that supplies the lookup table data
     * @param cacheLoaderFactory factory for the per-task cache loaders
     * @param rowType row type of the lookup table
     */
    public ReloadCacheManager(LookupConfig lookupConfig, InputFormat<RowData, InputSplit> inputFormat, CacheLoaderFactory cacheLoaderFactory, RowType rowType) {
        this.lookupConfig = lookupConfig;
        this.initialInputFormat = inputFormat;
        this.cacheLoaderFactory = cacheLoaderFactory;
        this.lookupTableRowType = rowType;
    }

    /**
     * Installs the callback stored in {@link #reloadFailCallback}.
     *
     * @param consumer callback receiving a {@link Throwable}
     */
    public void setReloadFailCallback(Consumer<Throwable> consumer) {
        this.reloadFailCallback = consumer;
    }

    /**
     * Looks up the cached rows for a key.
     *
     * @param genericRowData lookup key
     * @return the cached rows, or an empty set when the key is not present
     */
    public Collection<RowData> readRows(GenericRowData genericRowData) {
        return this.cache.getOrDefault(genericRowData, Collections.emptySet());
    }

    /**
     * Configures the input format, creates the cache and one load task per input split, and
     * registers the cache metrics.
     *
     * @param configuration configuration passed to the input format and the cache loaders
     * @throws IllegalStateException if the input format yields no input splits
     * @throws Exception if creating the input splits fails
     */
    @Override
    public void open(Configuration configuration) throws Exception {
        this.parameters = configuration;
        this.initialInputFormat.configure(configuration);
        InputSplit[] splits = this.initialInputFormat.createInputSplits(1);
        if (splits.length == 0) {
            throw new IllegalStateException("InputFormat must provide at least one input split to load data into the lookup 'ALL' cache.");
        }
        this.cache = new ConcurrentHashMap<>(16, 0.75f, getConcurrencyLevel(splits.length));
        this.cacheLoadTasks = new ArrayList<>(splits.length);
        for (InputSplit split : splits) {
            addCacheLoadTask().setInputSplit(split);
        }

        OperatorMetricGroup metricGroup = getRuntimeContext().getMetricGroup();
        this.loadCount = metricGroup.counter(LookupCache.LOAD_COUNT_METRIC);
        // Read through the field on every poll: 'cache' is volatile and may be reassigned, so
        // binding the gauge to the map instance that exists right now could report a stale map.
        metricGroup.gauge(LookupCache.NUM_RECORDS_METRIC, () -> this.cache.size());
        metricGroup.gauge(LookupCache.LATEST_LOAD_TIME_METRIC, () -> this.latestLoadTimeMs);
    }

    /**
     * @return the current cache map, or {@code null} before {@link #open(Configuration)}
     */
    public ConcurrentHashMap<GenericRowData, Collection<RowData>> getCache() {
        return this.cache;
    }

    /** Asks every load task to stop. Does nothing if the manager was never opened. */
    public void stopRunning() {
        if (this.cacheLoadTasks == null) {
            return;
        }
        this.cacheLoadTasks.forEach(InputSplitCacheLoadTask::stopRunning);
    }

    /**
     * Closes every load task and clears the cache. Safe to call when
     * {@link #open(Configuration)} never ran.
     *
     * @throws Exception the first failure raised while closing a task; later failures are
     *     attached to it as suppressed exceptions
     */
    @Override
    public void close() throws Exception {
        Exception firstFailure = null;
        if (this.cacheLoadTasks != null) {
            for (InputSplitCacheLoadTask task : this.cacheLoadTasks) {
                try {
                    task.close();
                } catch (Exception e) {
                    // Keep going so that one broken task does not prevent the rest from closing.
                    if (firstFailure == null) {
                        firstFailure = e;
                    } else {
                        firstFailure.addSuppressed(e);
                    }
                }
            }
        }
        if (this.cache != null) {
            this.cache.clear();
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * @return the concurrency level for the current number of load tasks
     */
    public int getConcurrencyLevel() {
        return getConcurrencyLevel(this.cacheLoadTasks.size());
    }

    /**
     * Caps the requested parallelism by the number of available processors.
     *
     * @param taskCount number of load tasks (input splits)
     * @return the smaller of {@code taskCount} and the available processor count
     */
    protected int getConcurrencyLevel(int taskCount) {
        return Math.min(taskCount, Runtime.getRuntime().availableProcessors());
    }

    /**
     * Re-creates the input splits and re-assigns them to the load tasks, adding tasks when there
     * are more splits than before and closing and removing surplus tasks when there are fewer.
     * Does nothing while {@link #firstRun} is still {@code true}, because
     * {@link #open(Configuration)} has already assigned the initial splits.
     *
     * @throws RuntimeException wrapping any failure, including the case of zero input splits
     */
    public void updateCacheLoadTasks() {
        if (this.firstRun) {
            return;
        }
        try {
            InputSplit[] splits = this.initialInputFormat.createInputSplits(1);
            if (splits.length == 0) {
                throw new IllegalStateException("InputFormat must provide at least one input split to load data into the lookup 'ALL' cache.");
            }
            for (int idx = 0; idx < splits.length; idx++) {
                InputSplitCacheLoadTask task = idx < this.cacheLoadTasks.size() ? this.cacheLoadTasks.get(idx) : addCacheLoadTask();
                task.setInputSplit(splits[idx]);
            }
            // Drop the tasks that no longer have a split to read.
            if (this.cacheLoadTasks.size() > splits.length) {
                ListIterator<InputSplitCacheLoadTask> surplus = this.cacheLoadTasks.listIterator(splits.length);
                while (surplus.hasNext()) {
                    surplus.next().close();
                    surplus.remove();
                }
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("InputFormat created {} InputSplits: {}", this.cacheLoadTasks.size(), Arrays.deepToString(splits));
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to create InputSplits for lookup 'ALL' cache.", e);
        }
    }

    /**
     * Runs all load tasks and waits for them to finish. The first task runs on the calling
     * thread; the others run on a temporary thread pool that is always shut down, also on
     * failure. Afterwards the load counter and the latest load time are updated.
     *
     * @param announceUnlock when {@code true}, the completion log line states that the input
     *     stream was unlocked; the initial load always states it
     * @throws Exception if any load task fails or waiting for it is interrupted
     */
    public void runCacheLoadTasks(boolean announceUnlock) throws Exception {
        final long startMs = System.currentTimeMillis();
        // Capture the flag before clearing it; the completion message below still needs it.
        final boolean initialLoad = this.firstRun;
        if (initialLoad) {
            LOG.info("Initial loading of data into the lookup 'ALL' cache. Input stream is BLOCKED.");
            this.firstRun = false;
        }

        final int taskCount = this.cacheLoadTasks.size();
        ExecutorService executorService = null;
        try {
            List<Future<?>> pending = Collections.emptyList();
            if (taskCount > 1) {
                pending = new ArrayList<>(taskCount - 1);
                // The calling thread takes one task, hence "- 1". Never ask for an empty pool:
                // with a single available processor the concurrency level is 1.
                executorService = Executors.newFixedThreadPool(Math.max(1, getConcurrencyLevel() - 1));
                for (int idx = 1; idx < taskCount; idx++) {
                    pending.add(executorService.submit(this.cacheLoadTasks.get(idx)));
                }
            }
            this.cacheLoadTasks.get(0).run();
            for (Future<?> future : pending) {
                future.get();
            }
        } finally {
            if (executorService != null) {
                executorService.shutdownNow();
            }
        }

        this.loadCount.inc();
        String unlockedNote = (initialLoad || announceUnlock) ? "Input stream UNLOCKED." : "";
        this.latestLoadTimeMs = System.currentTimeMillis() - startMs;
        LOG.info("Lookup 'ALL' cache loading finished. Time elapsed - {} ms. Number of records - {}. {}", this.latestLoadTimeMs, this.cache.size(), unlockedNote);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Cache content: \n{\n\t{}\n}", Joiner.on(",\n\t").withKeyValueSeparator(" = ").join(this.cache));
        }
    }

    /**
     * Adds a load task whose loader targets the current {@link #cache} instance.
     *
     * @return the newly registered task
     */
    protected InputSplitCacheLoadTask addCacheLoadTask() {
        return addCacheLoadTask(new AtomicReference<>(this.cache));
    }

    /**
     * Creates a load task and appends it to the task list. The very first task uses the
     * original input format; every later task gets its own clone of it.
     *
     * @param atomicReference reference to the map the task's loader is created for
     * @return the newly registered task
     * @throws RuntimeException wrapping any failure to clone the input format or create the loader
     */
    public InputSplitCacheLoadTask addCacheLoadTask(AtomicReference<ConcurrentHashMap<GenericRowData, Collection<RowData>>> atomicReference) {
        try {
            InputFormat<RowData, InputSplit> format = this.cacheLoadTasks.isEmpty() ? this.initialInputFormat : InstantiationUtil.clone(this.initialInputFormat);
            format.configure(this.parameters);
            InputSplitCacheLoadTask task = new InputSplitCacheLoadTask(format, this.cacheLoaderFactory.createCacheLoader(atomicReference, this.parameters, getRuntimeContext()), this.lookupTableRowType);
            this.cacheLoadTasks.add(task);
            return task;
        } catch (Exception e) {
            throw new RuntimeException("Failed to create data loader into the 'ALL' lookup cache.", e);
        }
    }

    /**
     * Creates the manager implementation matching the configured {@link CacheReloadPolicy}.
     *
     * @param lookupConfig lookup configuration providing the reload policy
     * @param inputFormat input format that supplies the lookup table data
     * @param cacheLoaderFactory factory for the per-task cache loaders
     * @param rowType row type of the lookup table
     * @return a manager for the configured policy
     * @throws UnsupportedOperationException if the policy is not one of the known values
     */
    public static ReloadCacheManager forReloadPolicy(LookupConfig lookupConfig, InputFormat<RowData, InputSplit> inputFormat, CacheLoaderFactory cacheLoaderFactory, RowType rowType) {
        switch (lookupConfig.getCacheReloadPolicy()) {
            case BLOCKING:
                return new BlockingReloadCacheManager(lookupConfig, inputFormat, cacheLoaderFactory, rowType);
            case NON_BLOCKING:
                return new NonBlockingReloadCacheManager(lookupConfig, inputFormat, cacheLoaderFactory, rowType);
            case SNAPSHOT:
                return new SnapshotReloadCacheManager(lookupConfig, inputFormat, cacheLoaderFactory, rowType);
            default:
                throw new UnsupportedOperationException("Unknown lookup 'ALL' cache reload policy " + lookupConfig.getCacheReloadPolicy());
        }
    }
}
