package org.apache.flink.table.runtime.operators.join.lookup.cache.all.reload.manager;

import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.table.connector.config.lookup.LookupConfig;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.join.lookup.cache.all.reload.InputSplitCacheLoadTask;
import org.apache.flink.table.runtime.operators.join.lookup.cache.all.reload.loader.CacheLoaderFactory;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/lookup/cache/all/reload/manager/SnapshotReloadCacheManager.class */
public class SnapshotReloadCacheManager extends ReloadCacheManager {
    private static final Logger LOG = LoggerFactory.getLogger(SnapshotReloadCacheManager.class);
    private final AtomicReference<ConcurrentHashMap<GenericRowData, Collection<RowData>>> cacheRef;
    protected boolean isStopped;

    public SnapshotReloadCacheManager(LookupConfig lookupConfig, InputFormat<RowData, InputSplit> inputFormat, CacheLoaderFactory cacheLoaderFactory, RowType rowType) {
        super(lookupConfig, inputFormat, cacheLoaderFactory, rowType);
        this.cacheRef = new AtomicReference<>();
    }

    @Override // org.apache.flink.table.runtime.operators.join.lookup.cache.all.reload.manager.ReloadCacheManager
    protected InputSplitCacheLoadTask addCacheLoadTask() {
        return addCacheLoadTask(this.cacheRef);
    }

    @Override // org.apache.flink.table.runtime.operators.join.lookup.cache.all.reload.manager.ReloadCacheManager
    public void close() throws Exception {
        super.close();
        this.isStopped = true;
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            updateCacheLoadTasks();
            ConcurrentHashMap<GenericRowData, Collection<RowData>> concurrentHashMap = this.firstRun ? this.cache : new ConcurrentHashMap<>(16, 0.75f, getConcurrencyLevel());
            this.cacheRef.set(concurrentHashMap);
            if (!this.firstRun) {
                LOG.info("Lookup 'ALL' cache ttl {} ms expired. Data reloading started. During that join is performed with a snapshot of cache at the start of the reloading.", Long.valueOf(this.lookupConfig.getCacheExpireMs()));
            }
            runCacheLoadTasks(false);
            if (!this.isStopped) {
                this.cache = concurrentHashMap;
            }
        } catch (Throwable th) {
            this.reloadFailCallback.accept(th);
        }
    }
}
