package org.apache.flink.streaming.api.operators;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.WrappingRuntimeException;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/operators/InternalBacklogAwareTimerServiceManagerImpl.class */
public class InternalBacklogAwareTimerServiceManagerImpl<K> extends InternalTimeServiceManagerImpl<K> implements InternalTimeServiceManager<K>, KeyedStateBackend.KeySelectionListener<K> {
    private final Map<String, InternalBacklogAwareTimerServiceImpl<K, ?>> timerServices;
    private boolean backlog;

    InternalBacklogAwareTimerServiceManagerImpl(KeyGroupRange keyGroupRange, KeyContext keyContext, PriorityQueueSetFactory priorityQueueSetFactory, ProcessingTimeService processingTimeService) {
        super(keyGroupRange, keyContext, priorityQueueSetFactory, processingTimeService);
        this.timerServices = new HashMap();
        this.backlog = false;
    }

    @Override // org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl, org.apache.flink.streaming.api.operators.InternalTimeServiceManager
    public <N> InternalTimerService<N> getInternalTimerService(String str, TypeSerializer<K> typeSerializer, TypeSerializer<N> typeSerializer2, Triggerable<K, N> triggerable) {
        InternalBacklogAwareTimerServiceImpl<K, ?> internalBacklogAwareTimerServiceImpl = this.timerServices.get(str);
        if (internalBacklogAwareTimerServiceImpl == null) {
            InternalTimerServiceImpl internalTimerServiceImpl = (InternalTimerServiceImpl) super.getInternalTimerService(str, typeSerializer, typeSerializer2, triggerable);
            internalBacklogAwareTimerServiceImpl = new InternalBacklogAwareTimerServiceImpl<>(internalTimerServiceImpl, new BacklogTimeService(this.processingTimeService, triggerable, internalTimerServiceImpl.getEventTimeTimersQueue()));
            this.timerServices.put(str, internalBacklogAwareTimerServiceImpl);
        }
        return internalBacklogAwareTimerServiceImpl;
    }

    @Override // org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl, org.apache.flink.streaming.api.operators.InternalTimeServiceManager
    public void advanceWatermark(Watermark watermark) throws Exception {
        Iterator<InternalBacklogAwareTimerServiceImpl<K, ?>> it = this.timerServices.values().iterator();
        while (it.hasNext()) {
            it.next().advanceWatermark(watermark.getTimestamp());
        }
    }

    @Override // org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl, org.apache.flink.streaming.api.operators.InternalTimeServiceManager
    public void snapshotToRawKeyedState(KeyedStateCheckpointOutputStream keyedStateCheckpointOutputStream, String str) throws Exception {
        Preconditions.checkState(!this.backlog, "Cannot snapshot state during backlog.");
        super.snapshotToRawKeyedState(keyedStateCheckpointOutputStream, str);
    }

    public static <K> InternalBacklogAwareTimerServiceManagerImpl<K> create(CheckpointableKeyedStateBackend<K> checkpointableKeyedStateBackend, ClassLoader classLoader, KeyContext keyContext, ProcessingTimeService processingTimeService, Iterable<KeyGroupStatePartitionStreamProvider> iterable) throws Exception {
        InternalBacklogAwareTimerServiceManagerImpl<K> internalBacklogAwareTimerServiceManagerImpl = new InternalBacklogAwareTimerServiceManagerImpl<>(checkpointableKeyedStateBackend.getKeyGroupRange(), keyContext, checkpointableKeyedStateBackend, processingTimeService);
        checkpointableKeyedStateBackend.registerKeySelectionListener(internalBacklogAwareTimerServiceManagerImpl);
        return internalBacklogAwareTimerServiceManagerImpl;
    }

    public void keySelected(K k) {
        try {
            Iterator<InternalBacklogAwareTimerServiceImpl<K, ?>> it = this.timerServices.values().iterator();
            while (it.hasNext()) {
                it.next().setCurrentKey(k);
            }
        } catch (Exception e) {
            throw new WrappingRuntimeException(e);
        }
    }

    public void setMaxWatermarkDuringBacklog(long j) {
        Iterator<InternalBacklogAwareTimerServiceImpl<K, ?>> it = this.timerServices.values().iterator();
        while (it.hasNext()) {
            it.next().setMaxWatermarkDuringBacklog(j);
        }
    }

    public void setBacklog(boolean z) {
        try {
            if (this.backlog == z) {
                return;
            }
            Iterator<InternalBacklogAwareTimerServiceImpl<K, ?>> it = this.timerServices.values().iterator();
            while (it.hasNext()) {
                it.next().setBacklog(z);
            }
            this.backlog = z;
        } catch (Exception e) {
            throw new WrappingRuntimeException(e);
        }
    }
}
