package org.apache.flink.table.runtime.operators.join.lookup.cache.all.reload.manager;

import java.util.Collection;
import java.util.concurrent.locks.StampedLock;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.table.connector.config.lookup.LookupConfig;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.join.lookup.cache.all.reload.loader.CacheLoaderFactory;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/lookup/cache/all/reload/manager/BlockingReloadCacheManager.class */
/**
 * A {@link ReloadCacheManager} that guards cache reloads with a {@link StampedLock}.
 *
 * <p>Lookups first attempt a lock-free optimistic read and only fall back to a shared read lock
 * when the optimistic stamp fails validation. A reload (see {@link #run()}) clears and repopulates
 * the cache while holding the exclusive write lock, so lookups that reach the read-lock fallback
 * wait until the reload has released the lock.
 */
public class BlockingReloadCacheManager extends ReloadCacheManager {
    private static final Logger LOG = LoggerFactory.getLogger(BlockingReloadCacheManager.class);

    /** Coordinates lookups (optimistic/read mode) with cache reloads (write mode). */
    private final StampedLock cacheReloadLock;

    /**
     * Creates the manager; all arguments are forwarded unchanged to the superclass constructor.
     *
     * @param lookupConfig lookup configuration (its cache expiry is reported when a reload starts)
     * @param inputFormat input format handed to the superclass
     * @param cacheLoaderFactory loader factory handed to the superclass
     * @param rowType row type handed to the superclass
     */
    public BlockingReloadCacheManager(LookupConfig lookupConfig, InputFormat<RowData, InputSplit> inputFormat, CacheLoaderFactory cacheLoaderFactory, RowType rowType) {
        super(lookupConfig, inputFormat, cacheLoaderFactory, rowType);
        this.cacheReloadLock = new StampedLock();
    }

    /**
     * Looks up rows via the superclass implementation, retrying under the read lock if the lock
     * was write-held or acquired for writing while the optimistic attempt was in flight.
     *
     * @param genericRowData the key passed through to {@code super.readRows}
     * @return the result of the validated optimistic lookup, or of the lookup repeated under the
     *     read lock
     */
    @Override
    public Collection<RowData> readRows(GenericRowData genericRowData) {
        final long optimisticStamp = cacheReloadLock.tryOptimisticRead();
        final Collection<RowData> optimisticResult = super.readRows(genericRowData);
        if (cacheReloadLock.validate(optimisticStamp)) {
            // No write lock was held or taken since the stamp was issued: the result is usable.
            return optimisticResult;
        }

        // The optimistic attempt raced with a writer; repeat the lookup under the shared lock.
        final long readStamp = cacheReloadLock.readLock();
        try {
            return super.readRows(genericRowData);
        } finally {
            cacheReloadLock.unlockRead(readStamp);
        }
    }

    /**
     * Performs one reload cycle: refreshes the load tasks, then clears the cache and runs the load
     * tasks while holding the write lock. Any {@link Throwable} raised along the way is handed to
     * {@code reloadFailCallback} and then counted in {@code loadFailuresCounter}; it is not
     * rethrown.
     */
    @Override
    public void run() {
        try {
            // Task refresh happens before the write lock is taken.
            updateCacheLoadTasks();

            final long writeStamp = cacheReloadLock.writeLock();
            try {
                if (!firstRun) {
                    final long cacheExpireMs = lookupConfig.getCacheExpireMs();
                    LOG.info("Lookup 'ALL' cache ttl {} ms expired. Cache is cleared. Input stream is BLOCKED. Data reloading started.", cacheExpireMs);
                }
                cache.clear();
                runCacheLoadTasks(true);
            } finally {
                // Always release the write lock, whether the reload succeeded or failed.
                cacheReloadLock.unlockWrite(writeStamp);
            }
        } catch (Throwable failure) {
            reloadFailCallback.accept(failure);
            loadFailuresCounter.inc();
        }
    }
}
