package org.apache.flink.runtime.state.heap;

import java.io.Closeable;
import java.util.Iterator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.StateEntry;
import org.apache.flink.runtime.state.heap.SpillableStateTable;
import org.apache.flink.runtime.state.heap.space.SpaceAllocator;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/heap/SpillableStateTableImpl.class */
/**
 * {@link SpillableStateTable} implementation that can swap the state map of an individual key
 * group between a {@link CopyOnWriteStateMap} and a {@link SpillStateMap}.
 *
 * <p>Swapping is done by creating a fresh map of the target kind, copying every entry of the
 * current map into it, and then installing the new map for the key group.
 *
 * @param <K> type of the keys stored in the state maps
 * @param <N> type of the namespaces stored in the state maps
 * @param <S> type of the state values stored in the state maps
 */
public class SpillableStateTableImpl<K, N, S> extends SpillableStateTable<K, N, S> implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(SpillableStateTableImpl.class);

    /**
     * Creates a table by delegating to the seven-argument constructor with {@code 1000} as the
     * last argument.
     *
     * <p>NOTE(review): the decompiler reports that this constructor was originally
     * package-private; it is kept public here so existing callers keep compiling.
     */
    public SpillableStateTableImpl(InternalKeyContext<K> internalKeyContext, RegisteredKeyValueStateBackendMetaInfo<N, S> registeredKeyValueStateBackendMetaInfo, TypeSerializer<K> typeSerializer, SpaceAllocator spaceAllocator, SpillAndLoadManager spillAndLoadManager, SpillableStateTable.SpillStateType spillStateType) {
        // NOTE(review): the meaning of 1000 is defined by the SpillableStateTable constructor,
        // which is not visible here - confirm before turning it into a named constant.
        this(internalKeyContext, registeredKeyValueStateBackendMetaInfo, typeSerializer, spaceAllocator, spillAndLoadManager, spillStateType, 1000);
    }

    /**
     * Creates a table, forwarding every argument unchanged to the {@link SpillableStateTable}
     * constructor.
     */
    SpillableStateTableImpl(InternalKeyContext<K> internalKeyContext, RegisteredKeyValueStateBackendMetaInfo<N, S> registeredKeyValueStateBackendMetaInfo, TypeSerializer<K> typeSerializer, SpaceAllocator spaceAllocator, SpillAndLoadManager spillAndLoadManager, SpillableStateTable.SpillStateType spillStateType, int i) {
        super(internalKeyContext, registeredKeyValueStateBackendMetaInfo, typeSerializer, spaceAllocator, spillAndLoadManager, spillStateType, i);
    }

    /**
     * Creates the state map used for a key group; delegates to {@code createHashStateMap()}.
     *
     * @return a newly created state map
     */
    @Override // org.apache.flink.runtime.state.heap.StateTable
    protected StateMap<K, N, S> createStateMap() {
        return createHashStateMap();
    }

    /**
     * Replaces the {@link CopyOnWriteStateMap} of the given key group with a newly created
     * {@link SpillStateMap} holding the same entries.
     *
     * <p>If copying or installing fails, the new spill map is closed quietly, the failure is
     * logged and the exception is rethrown.
     *
     * @param i index of the key group to spill
     * @throws IllegalStateException if the key group's current map is not a
     *     {@link CopyOnWriteStateMap}
     */
    @Override // org.apache.flink.runtime.state.heap.SpillableStateTable
    public void spillState(int i) {
        StateMap<K, N, S> heapMap = getMapForKeyGroup(i);
        Preconditions.checkState(heapMap instanceof CopyOnWriteStateMap, "Only CopyOnWriteStateMap can be spilled");
        SpillStateMap<K, N, S> spillMap = createSpillStateMap();
        try {
            transferState(heapMap, spillMap);
            setMapForKeyGroup(i, spillMap);
        } catch (Exception e) {
            LOG.error("Spill state in key group {} failed", i, e);
            // The half-filled spill map will never be installed, so release it here.
            IOUtils.closeQuietly(spillMap);
            throw e;
        }
    }

    /**
     * Replaces the {@link SpillStateMap} of the given key group with a newly created
     * {@link CopyOnWriteStateMap} holding the same entries, then closes the old spill map.
     *
     * <p>A failure while copying or installing is logged and rethrown. A failure while closing
     * the old spill map is also logged and rethrown; at that point the new map has already been
     * installed for the key group.
     *
     * @param i index of the key group to load
     * @throws IllegalStateException if the key group's current map is not a {@link SpillStateMap}
     */
    @Override // org.apache.flink.runtime.state.heap.SpillableStateTable
    public void loadState(int i) {
        StateMap<K, N, S> spilledMap = getMapForKeyGroup(i);
        Preconditions.checkState(spilledMap instanceof SpillStateMap, "Only SpillStateMap can be loaded");
        CopyOnWriteStateMap<K, N, S> heapMap = createHashStateMap();
        try {
            transferState(spilledMap, heapMap);
            setMapForKeyGroup(i, heapMap);
        } catch (Exception e) {
            LOG.error("Load state in key group {} failed", i, e);
            throw e;
        }
        // Closing is kept outside the try block above so that a close failure is reported once,
        // as a close failure, and not additionally as a failed load.
        try {
            ((SpillStateMap<?, ?, ?>) spilledMap).close();
        } catch (Exception e) {
            LOG.error("Failed to close state map for key group {} after load", i, e);
            throw e;
        }
    }

    /**
     * Computes the fraction of key-group state maps that are currently {@link SpillStateMap}s.
     *
     * @return the number of spilled maps divided by the total number of maps, in the range
     *     {@code [0, 1]}; {@code 0} when there are no maps at all
     */
    @Override // org.apache.flink.runtime.state.heap.SpillableStateTable
    public float getSpilledRatio() {
        int total = this.keyGroupedStateMaps.length;
        if (total == 0) {
            return 0.0f;
        }
        int spilled = 0;
        for (StateMap<K, N, S> stateMap : this.keyGroupedStateMaps) {
            if (stateMap instanceof SpillStateMap) {
                spilled++;
            }
        }
        // Divide in floating point; int / int would truncate every partial ratio to 0.
        return (float) spilled / total;
    }

    /**
     * Copies every entry of {@code stateMap} into {@code stateMap2} via {@code put}.
     *
     * @param stateMap the map to read entries from
     * @param stateMap2 the map to write entries into
     */
    private void transferState(StateMap<K, N, S> stateMap, StateMap<K, N, S> stateMap2) {
        Iterator<StateEntry<K, N, S>> entries = stateMap.iterator();
        while (entries.hasNext()) {
            StateEntry<K, N, S> entry = entries.next();
            stateMap2.put(entry.getKey(), entry.getNamespace(), entry.getState());
        }
    }
}
