package org.apache.flink.table.runtime.operators.join.lookup.cache.all;

import java.time.Duration;
import java.time.LocalTime;
import java.time.OffsetTime;
import java.time.temporal.ChronoUnit;
import java.time.temporal.Temporal;
import java.util.Collection;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.connector.config.lookup.LookupConfig;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.join.lookup.cache.LookupCache;
import org.apache.flink.table.runtime.operators.join.lookup.cache.all.reload.loader.CacheLoaderFactory;
import org.apache.flink.table.runtime.operators.join.lookup.cache.all.reload.manager.ReloadCacheManager;
import org.apache.flink.table.types.logical.RowType;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/lookup/cache/all/LookupALLCache.class */
public class LookupALLCache extends LookupCache {
    private static final long serialVersionUID = 4;
    private final ReloadCacheManager reloadCacheManager;
    private transient ScheduledExecutorService scheduledReloadCacheService;
    private volatile transient Throwable reloadFailCause;

    public LookupALLCache(LookupConfig lookupConfig, InputFormat<RowData, ?> inputFormat, CacheLoaderFactory cacheLoaderFactory, RowType rowType) {
        super(lookupConfig);
        this.reloadCacheManager = ReloadCacheManager.forReloadPolicy(lookupConfig, inputFormat, cacheLoaderFactory, rowType);
    }

    public void open(Configuration configuration) throws Exception {
        this.reloadCacheManager.setRuntimeContext(getRuntimeContext());
        this.reloadCacheManager.open(configuration);
        this.reloadCacheManager.setReloadFailCallback(th -> {
            if (this.reloadFailCause == null) {
                this.reloadFailCause = th;
            } else {
                this.reloadFailCause.addSuppressed(th);
            }
        });
        this.reloadCacheManager.run();
        if (this.lookupConfig.getCacheExpireMs() > 0 || this.lookupConfig.getCacheReloadStartTime() != null) {
            if (this.scheduledReloadCacheService == null) {
                this.scheduledReloadCacheService = Executors.newSingleThreadScheduledExecutor();
            }
            long j = 0;
            long millis = TimeUnit.DAYS.toMillis(1L);
            if (this.lookupConfig.getCacheExpireMs() > 0) {
                j = this.lookupConfig.getCacheExpireMs();
                millis = this.lookupConfig.getCacheExpireMs();
            }
            if (this.lookupConfig.getCacheReloadStartTime() == null) {
                this.scheduledReloadCacheService.scheduleWithFixedDelay(this.reloadCacheManager, j, millis, TimeUnit.MILLISECONDS);
                return;
            }
            Temporal cacheReloadStartTime = this.lookupConfig.getCacheReloadStartTime();
            Duration between = Duration.between(cacheReloadStartTime instanceof LocalTime ? LocalTime.now() : OffsetTime.now(((OffsetTime) cacheReloadStartTime).getOffset()), cacheReloadStartTime);
            if (between.isNegative()) {
                between = between.plus(1L, ChronoUnit.DAYS);
            }
            this.scheduledReloadCacheService.scheduleAtFixedRate(this.reloadCacheManager, between.toMillis(), millis, TimeUnit.MILLISECONDS);
        }
    }

    @Override // org.apache.flink.table.runtime.operators.join.lookup.cache.LookupCache
    public Collection<RowData> getCachedRows(GenericRowData genericRowData) {
        if (this.reloadFailCause != null) {
            throw new RuntimeException("Failed to reload lookup 'ALL' cache.", this.reloadFailCause);
        }
        return this.reloadCacheManager.readRows(genericRowData);
    }

    @Override // org.apache.flink.table.runtime.operators.join.lookup.cache.LookupCache
    public void putCollectedRows(GenericRowData genericRowData, Collection<RowData> collection) {
        throw new UnsupportedOperationException("'ALL' cache doesn't support public 'put' operation from the outside.");
    }

    public void close() throws Exception {
        this.reloadCacheManager.stopRunning();
        if (this.scheduledReloadCacheService != null) {
            this.scheduledReloadCacheService.shutdown();
            if (!this.scheduledReloadCacheService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
                throw new IllegalStateException("Timeout for closing reload cache thread. This should never happen. If you see this, report a bug, please.");
            }
        }
        this.reloadCacheManager.close();
    }

    @Override // org.apache.flink.table.runtime.operators.join.lookup.cache.LookupCache
    public long size() {
        return this.reloadCacheManager.getCache().size();
    }

    @VisibleForTesting
    public void setScheduledReloadCacheService(ScheduledExecutorService scheduledExecutorService) {
        this.scheduledReloadCacheService = scheduledExecutorService;
    }
}
