package org.apache.flink.table.runtime.operators.join.lookup.cache.all;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.StampedLock;
import java.util.stream.Collectors;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.table.connector.config.lookup.CacheReloadPolicy;
import org.apache.flink.table.connector.config.lookup.LookupConfig;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedFunction;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.runtime.generated.Projection;
import org.apache.flink.table.runtime.operators.join.lookup.cache.LookupCache;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/lookup/cache/all/LookupALLCache.class */
public class LookupALLCache extends LookupCache {
    private static final Logger LOG = LoggerFactory.getLogger(LookupALLCache.class);
    private static final long serialVersionUID = 1;
    private final InputFormat<RowData, InputSplit> initialInputFormat;
    private final TypeInformation<RowData> cacheEntriesTypeInfo;
    private final TypeInformation<RowData> cacheKeysTypeInfo;
    private final GeneratedProjection generatedProjection;
    private final GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedCalc;
    private final StampedLock cacheReloadLock;
    private ConcurrentHashMap<GenericRowData, Collection<RowData>> cache;
    private transient Collection<InputSplitCacheLoader> cacheLoaders;
    private transient ScheduledExecutorService scheduledReloadCacheService;
    private transient ExecutorService cacheLoadersService;
    private transient RuntimeContext runtimeContext;
    private transient Configuration parameters;

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/lookup/cache/all/LookupALLCache$ReloadCacheTask.class */
    private class ReloadCacheTask implements Runnable {
        private List<InputSplitCacheLoader> cacheLoaders;
        private boolean firstRun = true;

        public ReloadCacheTask(InputSplit[] inputSplitArr) {
            this.cacheLoaders = (List) Arrays.stream(inputSplitArr).map(inputSplit -> {
                InputSplitCacheLoader createCacheLoader = LookupALLCache.this.createCacheLoader();
                createCacheLoader.setInputSplit(inputSplit);
                return createCacheLoader;
            }).collect(Collectors.toList());
        }

        @Override // java.lang.Runnable
        public void run() {
            InputSplitCacheLoader inputSplitCacheLoader;
            try {
                if (!this.firstRun) {
                    InputSplit[] createInputSplits = LookupALLCache.this.initialInputFormat.createInputSplits(1);
                    if (createInputSplits.length == 0) {
                        throw new IllegalStateException("InputFormat must provide at least one input split to load data into the lookup 'ALL' cache.");
                    }
                    for (int i = 0; i < createInputSplits.length; i++) {
                        if (i >= this.cacheLoaders.size()) {
                            inputSplitCacheLoader = LookupALLCache.this.createCacheLoader();
                            this.cacheLoaders.add(inputSplitCacheLoader);
                        } else {
                            inputSplitCacheLoader = this.cacheLoaders.get(i);
                        }
                        inputSplitCacheLoader.setInputSplit(createInputSplits[i]);
                    }
                    if (this.cacheLoaders.size() > createInputSplits.length) {
                        this.cacheLoaders = this.cacheLoaders.subList(0, createInputSplits.length);
                    }
                    LookupALLCache.LOG.info("Created {} InputSplits from which data will be loaded in parallel into the lookup 'ALL' cache.", Integer.valueOf(this.cacheLoaders.size()));
                }
                long writeLock = LookupALLCache.this.cacheReloadLock.writeLock();
                try {
                    try {
                        long currentTimeMillis = System.currentTimeMillis();
                        LookupALLCache.this.cache.clear();
                        if (this.firstRun) {
                            LookupALLCache.LOG.info("Initial loading of data into the lookup 'ALL' cache. Input stream blocked");
                            this.firstRun = false;
                        } else {
                            LookupALLCache.LOG.info("Lookup 'ALL' cache ttl {} ms expired. Cache cleared. Input stream blocked. Data reloading started.", Long.valueOf(LookupALLCache.this.lookupConfig.getCacheExpireMs()));
                        }
                        if (this.cacheLoaders.size() > 1) {
                            int availableProcessors = Runtime.getRuntime().availableProcessors();
                            LookupALLCache.this.cacheLoadersService = Executors.newFixedThreadPool(this.cacheLoaders.size() < availableProcessors ? this.cacheLoaders.size() - 1 : availableProcessors - 1);
                            for (int i2 = 1; i2 < this.cacheLoaders.size(); i2++) {
                                LookupALLCache.this.cacheLoadersService.submit(this.cacheLoaders.get(i2));
                            }
                        }
                        this.cacheLoaders.get(0).run();
                        if (LookupALLCache.this.cacheLoadersService != null) {
                            LookupALLCache.this.cacheLoadersService.shutdown();
                            if (!LookupALLCache.this.cacheLoadersService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
                                throw new IllegalStateException("Timeout for cache reload elapsed. This should never happen. If you see this, report a bug, please.");
                            }
                        }
                        LookupALLCache.LOG.info("Lookup 'ALL' cache loading finished. Time elapsed - {} ms. Number of records - {}. Input stream unlocked.", Long.valueOf(System.currentTimeMillis() - currentTimeMillis), Integer.valueOf(LookupALLCache.this.cache.size()));
                        LookupALLCache.this.cacheReloadLock.unlockWrite(writeLock);
                    } catch (Exception e) {
                        LookupALLCache.LOG.error("Failed to reload lookup 'ALL' cache.", e);
                        throw new RuntimeException("Failed to reload lookup 'ALL' cache.", e);
                    }
                } catch (Throwable th) {
                    LookupALLCache.this.cacheReloadLock.unlockWrite(writeLock);
                    throw th;
                }
            } catch (Exception e2) {
                LookupALLCache.LOG.error("Failed to create InputSplits for lookup 'ALL' cache.", e2);
                throw new RuntimeException("Failed to create InputSplits for lookup 'ALL' cache.", e2);
            }
        }
    }

    public LookupALLCache(LookupConfig lookupConfig, InputFormat<RowData, ?> inputFormat, TypeInformation<RowData> typeInformation, GeneratedProjection generatedProjection, TypeInformation<RowData> typeInformation2, GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedFunction) {
        super(lookupConfig);
        this.cacheReloadLock = new StampedLock();
        Preconditions.checkArgument(lookupConfig.getCacheReloadPolicy() == CacheReloadPolicy.BLOCKING, "Currently lookup 'ALL' cache supports only 'blocking' reload policy.");
        this.initialInputFormat = inputFormat;
        this.cacheEntriesTypeInfo = typeInformation;
        this.generatedProjection = generatedProjection;
        this.cacheKeysTypeInfo = typeInformation2;
        this.generatedCalc = generatedFunction;
    }

    @Override // org.apache.flink.table.runtime.operators.join.lookup.cache.LookupCache
    public void open(Configuration configuration, RuntimeContext runtimeContext) throws Exception {
        this.parameters = configuration;
        this.runtimeContext = runtimeContext;
        this.initialInputFormat.configure(configuration);
        InputSplit[] createInputSplits = this.initialInputFormat.createInputSplits(1);
        if (createInputSplits.length == 0) {
            throw new IllegalStateException("InputFormat must provide at least one input split to load data into the lookup 'ALL' cache.");
        }
        this.cache = new ConcurrentHashMap<>(16, 0.75f, createInputSplits.length);
        this.cacheLoaders = new ArrayList(createInputSplits.length);
        ReloadCacheTask reloadCacheTask = new ReloadCacheTask(createInputSplits);
        reloadCacheTask.run();
        if (this.lookupConfig.getCacheExpireMs() > 0) {
            this.scheduledReloadCacheService = Executors.newSingleThreadScheduledExecutor();
            this.scheduledReloadCacheService.scheduleWithFixedDelay(reloadCacheTask, this.lookupConfig.getCacheExpireMs(), this.lookupConfig.getCacheExpireMs(), TimeUnit.MILLISECONDS);
        }
    }

    @Override // org.apache.flink.table.runtime.operators.join.lookup.cache.LookupCache
    public Collection<RowData> getCachedRows(GenericRowData genericRowData) {
        long tryOptimisticRead = this.cacheReloadLock.tryOptimisticRead();
        Collection<RowData> orDefault = this.cache.getOrDefault(genericRowData, Collections.emptySet());
        if (!this.cacheReloadLock.validate(tryOptimisticRead)) {
            long readLock = this.cacheReloadLock.readLock();
            try {
                orDefault = this.cache.getOrDefault(genericRowData, Collections.emptySet());
                this.cacheReloadLock.unlockRead(readLock);
            } catch (Throwable th) {
                this.cacheReloadLock.unlockRead(readLock);
                throw th;
            }
        }
        return orDefault;
    }

    @Override // org.apache.flink.table.runtime.operators.join.lookup.cache.LookupCache
    public void putCollectedRows(GenericRowData genericRowData, Collection<RowData> collection) {
        throw new UnsupportedOperationException("'ALL' cache doesn't support public 'put' operation from the outside.");
    }

    @Override // org.apache.flink.table.runtime.operators.join.lookup.cache.LookupCache
    public void close() throws Exception {
        this.cacheLoaders.forEach((v0) -> {
            v0.stopRunning();
        });
        if (this.scheduledReloadCacheService != null) {
            this.scheduledReloadCacheService.shutdown();
            if (!this.scheduledReloadCacheService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
                throw new IllegalStateException("Timeout for closing reload cache thread. This should never happen. If you see this, report a bug, please.");
            }
        }
        if (this.cacheLoadersService != null) {
            this.cacheLoadersService.shutdown();
        }
        if (this.cache != null) {
            this.cache.clear();
        }
        for (InputSplitCacheLoader inputSplitCacheLoader : this.cacheLoaders) {
            RichInputFormat inputFormat = inputSplitCacheLoader.getInputFormat();
            inputFormat.close();
            if (inputFormat instanceof RichInputFormat) {
                inputFormat.closeInputFormat();
            }
            FunctionUtils.closeFunction(inputSplitCacheLoader.getCalc());
        }
    }

    @Override // org.apache.flink.table.runtime.operators.join.lookup.cache.LookupCache
    public long size() {
        return this.cache.size();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public InputSplitCacheLoader createCacheLoader() {
        try {
            InputFormat<RowData, InputSplit> clone = this.cacheLoaders.isEmpty() ? this.initialInputFormat : InstantiationUtil.clone(this.initialInputFormat);
            clone.configure(this.parameters);
            if (clone instanceof RichInputFormat) {
                ((RichInputFormat) clone).openInputFormat();
            }
            Projection newInstance = this.generatedProjection.newInstance(Thread.currentThread().getContextClassLoader());
            TypeSerializer createSerializer = this.cacheEntriesTypeInfo.createSerializer(this.runtimeContext.getExecutionConfig());
            TypeSerializer createSerializer2 = this.cacheKeysTypeInfo.createSerializer(this.runtimeContext.getExecutionConfig());
            Function function = null;
            if (this.generatedCalc != null && this.lookupConfig.isCacheAfterCalc()) {
                function = (FlatMapFunction) this.generatedCalc.newInstance(this.runtimeContext.getUserCodeClassLoader());
                FunctionUtils.setFunctionRuntimeContext(function, this.runtimeContext);
                FunctionUtils.openFunction(function, this.parameters);
            }
            InputSplitCacheLoader inputSplitCacheLoader = new InputSplitCacheLoader(this.cache, clone, newInstance, createSerializer, createSerializer2, function);
            this.cacheLoaders.add(inputSplitCacheLoader);
            return inputSplitCacheLoader;
        } catch (Exception e) {
            throw new RuntimeException("Failed to create data loader into the 'ALL' lookup cache.", e);
        }
    }
}
