package org.apache.flink.table.runtime.functions.table.lookup.reload;

import java.io.Serializable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.ThreadSafeSimpleCounter;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/runtime/functions/table/lookup/reload/LookupSetLoader.class */
/**
 * Base class that drives (re)loading of a lookup set on a dedicated single-threaded executor.
 *
 * <p>Subclasses supply the actual loading logic through {@link #updateSet()}. This class owns the
 * executor, a latch that is released once the first reload attempt has finished, a lock held for
 * the duration of each reload, and the load / load-failure counters.
 *
 * <p>Expected call order, as far as this class shows: {@link #open(FunctionContext)}, then
 * {@link #initializeMetrics(MetricGroup)}, then any number of {@link #reloadAsync()} calls, and
 * finally {@link #close()}.
 *
 * @param <T> type of the set returned by {@link ReloadManager#getFillingSet()}
 * @param <M> metric group type handed to {@link ReloadManager#initializeMetrics}
 */
public abstract class LookupSetLoader<T, M extends MetricGroup> implements AutoCloseable, Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(LookupSetLoader.class);

    /** How long {@link #close()} waits for the reload thread to terminate after interrupting it. */
    protected static final long TIMEOUT_AFTER_INTERRUPT_MS = 10000;

    protected final ReloadManager<T, M> reloadManager;
    protected transient FunctionContext functionContext;

    /** Released after the first reload attempt completes (successfully or not). */
    private transient CountDownLatch firstLoadLatch;

    private transient ExecutorService reloadExecutor;
    protected transient Counter loadCounter;
    protected transient Counter loadFailuresCounter;

    /** Set on {@link #close()} and after a failed reload; subsequent reloads become no-ops. */
    protected volatile boolean isStopped;

    private final ReentrantLock reloadLock = new ReentrantLock();

    /** Duration of the most recent {@link #updateSet()} call in milliseconds; -1 until one completes. */
    protected AtomicLong latestLoadTimeMs = new AtomicLong(-1);

    /**
     * @param reloadManager manager that this loader delegates set access, metrics registration
     *     and lifecycle callbacks to
     */
    public LookupSetLoader(ReloadManager<T, M> reloadManager) {
        this.reloadManager = reloadManager;
    }

    /**
     * Performs one load of the lookup set.
     *
     * @return {@code true} if the load completed; {@code false} is reported by {@link #reload()}
     *     as an interrupted reload
     * @throws Exception on load failure; the loader is then marked as stopped
     */
    protected abstract boolean updateSet() throws Exception;

    /**
     * Validates the function context and creates the latch and the reload executor.
     *
     * @param functionContext context providing the user code class loader, runtime context and
     *     configuration; none of them may be {@code null}
     * @throws NullPointerException if the context or one of the required parts is {@code null}
     * @throws Exception declared for subclasses that override this method
     */
    public void open(FunctionContext functionContext) throws Exception {
        Preconditions.checkNotNull(functionContext, "Function context must not be null");
        final ClassLoader userCodeClassLoader = functionContext.getUserCodeClassLoader();
        Preconditions.checkNotNull(userCodeClassLoader, "User code classloader must be initialized before opening lookup set loader");
        Preconditions.checkNotNull(functionContext.getRuntimeContext(), "Runtime context must be initialized before opening full lookup set loader");
        Preconditions.checkNotNull(functionContext.getConfiguration(), "Configuration must be initialized before opening full lookup set loader");
        this.firstLoadLatch = new CountDownLatch(1);
        this.functionContext = functionContext;
        this.reloadExecutor = Executors.newSingleThreadExecutor(runnable -> {
            Thread reloadThread = Executors.defaultThreadFactory().newThread(runnable);
            reloadThread.setName("lookup-set-loader-executor");
            // Reloads run user code, so the worker thread needs the user code class loader.
            reloadThread.setContextClassLoader(userCodeClassLoader);
            return reloadThread;
        });
    }

    /**
     * Lazily creates the load counters, opens the reload manager and registers the metrics on it.
     *
     * @param m metric group passed through to the reload manager
     */
    public void initializeMetrics(M m) {
        if (this.loadCounter == null) {
            this.loadCounter = new ThreadSafeSimpleCounter();
        }
        if (this.loadFailuresCounter == null) {
            this.loadFailuresCounter = new ThreadSafeSimpleCounter();
        }
        this.reloadManager.open();
        this.reloadManager.initializeMetrics(m, this.loadCounter, this.loadFailuresCounter, this.latestLoadTimeMs);
    }

    /** @return the set exposed by the reload manager's {@code getFillingSet()} */
    public T getFillingSet() {
        return this.reloadManager.getFillingSet();
    }

    /**
     * Blocks until the first reload attempt has finished.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public void awaitFirstLoad() throws InterruptedException {
        this.firstLoadLatch.await();
    }

    /**
     * Schedules a reload on the reload executor.
     *
     * @return future that completes when the reload finishes, exceptionally if it failed
     */
    public CompletableFuture<Void> reloadAsync() {
        return CompletableFuture.runAsync(this::reload, this.reloadExecutor);
    }

    /**
     * Runs a single reload under {@code reloadLock}, updating the counters and the last load time.
     * Does nothing once the loader is stopped. On failure the loader is stopped and the cause is
     * rethrown wrapped in a {@link RuntimeException}.
     */
    private void reload() {
        if (this.isStopped) {
            return;
        }
        // Resolved before taking the lock: nothing may run between lock() and the try block,
        // otherwise an exception there would leave the lock held forever.
        final String setName = this.reloadManager.getSetName();
        this.reloadLock.lock();
        try {
            LOG.info("Lookup {} loading triggered.", setName);
            long startMs = System.currentTimeMillis();
            boolean updated = updateSet();
            long elapsedMs = System.currentTimeMillis() - startMs;
            this.latestLoadTimeMs.set(elapsedMs);
            if (updated) {
                this.loadCounter.inc();
                LOG.info("Lookup {} loading finished. Time elapsed - {} ms. Number of records - {}.", setName, elapsedMs, this.reloadManager.numLoadedRows());
            } else {
                LOG.info("Active lookup {} reload has been interrupted.", setName);
            }
        } catch (Exception e) {
            this.loadFailuresCounter.inc();
            this.isStopped = true;
            throw new RuntimeException("Failed to reload lookup " + setName, e);
        } finally {
            try {
                this.reloadManager.postLoad();
            } finally {
                // Must run even if postLoad() throws, or later reloads and
                // awaitFirstLoad() callers would block indefinitely.
                this.reloadLock.unlock();
                this.firstLoadLatch.countDown();
            }
        }
    }

    /**
     * Stops the loader: interrupts the reload thread, waits up to
     * {@link #TIMEOUT_AFTER_INTERRUPT_MS} ms for it to terminate and closes the reload manager.
     *
     * <p>The reload manager is closed even when the executor fails to terminate in time, so its
     * resources are not leaked; the executor failure is still reported to the caller.
     *
     * @throws TimeoutException if the reload thread did not terminate within the timeout
     * @throws Exception if waiting was interrupted or closing the reload manager failed
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.isStopped = true;
        Exception failure = null;
        try {
            if (this.reloadExecutor != null) {
                this.reloadExecutor.shutdownNow();
                if (!this.reloadExecutor.awaitTermination(TIMEOUT_AFTER_INTERRUPT_MS, TimeUnit.MILLISECONDS)) {
                    throw new TimeoutException("Lookup reload thread was not terminated after closing in timeout of " + TIMEOUT_AFTER_INTERRUPT_MS + " ms.");
                }
            }
        } catch (Exception e) {
            failure = e;
        }
        try {
            this.reloadManager.close();
        } catch (Exception e) {
            if (failure == null) {
                failure = e;
            } else {
                // Keep the executor failure as the primary error, but do not lose this one.
                failure.addSuppressed(e);
            }
        }
        if (failure != null) {
            throw failure;
        }
    }
}
