package org.apache.flink.runtime.state.ttl;

import java.util.Collection;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.state.TtlCatcherWithNamespace;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StateEntry;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.util.FlinkRuntimeException;

/* loaded from: input_file:org/apache/flink/runtime/state/ttl/TtlIncrementalCleanup.class */
class TtlIncrementalCleanup<K, N, S> {

    @Nonnegative
    private final int cleanupSize;
    private AbstractTtlState<K, N, ?, S, ?> ttlState;
    private InternalKvState.StateIncrementalVisitor<K, N, S> stateIterator;
    private TtlCatcherWithNamespace<K, N, S> ttlCatcher;

    /* JADX INFO: Access modifiers changed from: package-private */
    public TtlIncrementalCleanup(@Nonnegative int i) {
        this.cleanupSize = i;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stateAccessed() {
        initIteratorIfNot();
        try {
            runCleanup();
        } catch (Throwable th) {
            throw new FlinkRuntimeException("Failed to incrementally clean up state with TTL", th);
        }
    }

    private void initIteratorIfNot() {
        if (this.stateIterator == null || this.stateIterator.invalidated() || !this.stateIterator.hasNext()) {
            this.stateIterator = ((InternalKvState) this.ttlState.original).getStateIncrementalVisitor(this.cleanupSize);
        }
    }

    private void runCleanup() {
        int i = 0;
        while (true) {
            int i2 = i;
            if (i2 >= this.cleanupSize || !this.stateIterator.hasNext()) {
                return;
            }
            Collection<StateEntry<K, N, S>> nextEntries = this.stateIterator.nextEntries();
            for (StateEntry<K, N, S> stateEntry : nextEntries) {
                Tuple2<S, S> unexpiredAndExpiredOrNull = this.ttlState.getUnexpiredAndExpiredOrNull(stateEntry.getState());
                if (unexpiredAndExpiredOrNull.f0 == null && unexpiredAndExpiredOrNull.f1 == null) {
                    this.stateIterator.remove(stateEntry);
                } else if (unexpiredAndExpiredOrNull.f0 == null) {
                    if (this.ttlCatcher != null) {
                        this.ttlCatcher.collect(stateEntry.getKey(), stateEntry.getNamespace(), unexpiredAndExpiredOrNull.f1, this.ttlState.timeProvider.currentTimestamp());
                    }
                    this.stateIterator.remove(stateEntry);
                } else if (unexpiredAndExpiredOrNull.f1 != null) {
                    if (this.ttlCatcher != null) {
                        this.ttlCatcher.collect(stateEntry.getKey(), stateEntry.getNamespace(), unexpiredAndExpiredOrNull.f1, this.ttlState.timeProvider.currentTimestamp());
                    }
                    this.stateIterator.update(stateEntry, unexpiredAndExpiredOrNull.f0);
                }
            }
            i = i2 + nextEntries.size();
        }
    }

    public void setTtlState(@Nonnull AbstractTtlState<K, N, ?, S, ?> abstractTtlState) {
        this.ttlState = abstractTtlState;
    }

    public void setTtlCatcher(@Nonnull TtlCatcherWithNamespace<K, N, S> ttlCatcherWithNamespace) {
        this.ttlCatcher = ttlCatcherWithNamespace;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int getCleanupSize() {
        return this.cleanupSize;
    }
}
