package org.apache.flink.runtime.state.rescale;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.StateSnapshot;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.apache.flink.runtime.state.heap.HeapSnapshotResources;
import org.apache.flink.runtime.state.heap.StateUID;

/* loaded from: input_file:org/apache/flink/runtime/state/rescale/HeapRescalingStateMigrator.class */
/**
 * {@link KeyedStateMigrator} for a {@link HeapKeyedStateBackend}: takes a snapshot of the
 * backend's states and writes the key groups named by the migration info to a single output
 * stream obtained from a {@link RuntimeRescaleStreamFactory}.
 *
 * <p>Layout written to the stream, in order:
 *
 * <pre>
 * int    number of states
 * per state:
 *   UTF    state name
 *   short  state type code
 *   int    number of key groups
 *   per key group:
 *     int  key group id
 *     ...  bytes written by the state's StateKeyGroupWriter for that key group
 * </pre>
 *
 * @param <K> key type of the heap backend
 */
public class HeapRescalingStateMigrator<K> extends KeyedStateMigrator<HeapKeyedStateBackend<K>> {
    public HeapRescalingStateMigrator(MigrationKeyedStateInfo migrationKeyedStateInfo, HeapKeyedStateBackend<K> heapKeyedStateBackend) {
        super(migrationKeyedStateInfo, heapKeyedStateBackend);
    }

    /**
     * Writes all states of the backend to storage, unless the migration info equals
     * {@link MigrationKeyedStateInfo#EMPTY_MIGRATION_INFO}, in which case nothing is written.
     *
     * <p>Only {@code str2} and {@code runtimeRescaleStreamFactory} are used by this
     * implementation; {@code str}, {@code str3} and {@code i} are ignored.
     *
     * @param str2 parent path under which the state file is created
     * @param runtimeRescaleStreamFactory factory that opens the output stream
     * @throws Exception if taking the snapshot or writing to the stream fails
     */
    @Override
    public void migrateStatesToStorage(String str, String str2, String str3, int i, RuntimeRescaleStreamFactory runtimeRescaleStreamFactory) throws Exception {
        if (this.migrationKeyedStateInfo.equals(MigrationKeyedStateInfo.EMPTY_MIGRATION_INFO)) {
            return;
        }
        migrateAllStatesToStorage(str2, runtimeRescaleStreamFactory);
    }

    /**
     * Prepares snapshot resources on the backend, then serializes every state snapshot into one
     * stream located at {@code parentPath} joined with the migration info's path name.
     *
     * @param parentPath parent path for the output file
     * @param streamFactory factory that opens the output stream
     * @throws Exception if preparing the snapshot or writing to the stream fails
     */
    private void migrateAllStatesToStorage(String parentPath, RuntimeRescaleStreamFactory streamFactory) throws Exception {
        // Raw cast kept from the original; the generic signatures of these project types are not
        // visible from this file. NOTE(review): 0L is presumably a placeholder checkpoint id - confirm.
        HeapSnapshotResources snapshotResources = (HeapSnapshotResources) ((HeapKeyedStateBackend) this.backend).getCheckpointStrategy().syncPrepareResources(0L);
        try {
            String targetPath = joinPaths(parentPath, this.migrationKeyedStateInfo.toPathName());
            try (OutputStream stateStream = streamFactory.createStateOutputStream(targetPath)) {
                DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(stateStream);
                Map<StateUID, StateSnapshot> snapshotsByState = snapshotResources.getCowStateStableSnapshots();
                out.writeInt(snapshotsByState.size());
                for (Map.Entry<StateUID, StateSnapshot> stateEntry : snapshotsByState.entrySet()) {
                    migrateOneStateToStorage(out, stateEntry.getKey(), stateEntry.getValue(), this.migrationKeyedStateInfo);
                }
                // Flush explicitly before the implicit close, as the original did.
                stateStream.flush();
            }
        } finally {
            // The snapshot resources were previously never released. release() is the cleanup hook
            // of Flink's SnapshotResources contract; call it whether or not the write succeeded.
            snapshotResources.release();
        }
    }

    /**
     * Writes one state: its name, its type code, the number of key groups in the migration
     * range, and then each key group of that range.
     *
     * @param out destination stream
     * @param stateUID identifies the state (name and type)
     * @param stateSnapshot snapshot that supplies the key-group writer
     * @param migrationKeyedStateInfo supplies the key-group range to write
     * @throws IOException if writing fails
     */
    private void migrateOneStateToStorage(DataOutputViewStreamWrapper out, StateUID stateUID, StateSnapshot stateSnapshot, MigrationKeyedStateInfo migrationKeyedStateInfo) throws IOException {
        out.writeUTF(stateUID.getStateName());
        out.writeShort(stateUID.getStateType().getCode());
        StateSnapshot.StateKeyGroupWriter keyGroupWriter = stateSnapshot.getKeyGroupWriter();
        KeyGroupRange keyGroupRange = migrationKeyedStateInfo.getKeyGroupRange();
        // Read the count once: it is both the header value and the loop bound.
        int keyGroupCount = keyGroupRange.getNumberOfKeyGroups();
        out.writeInt(keyGroupCount);
        for (int idx = 0; idx < keyGroupCount; idx++) {
            migrateOneKeyGroupToStorage(out, keyGroupWriter, keyGroupRange.getKeyGroupId(idx));
        }
    }

    /**
     * Writes a single key group: its id followed by whatever the writer emits for it.
     *
     * @param out destination stream
     * @param keyGroupWriter writer for the state being serialized
     * @param keyGroupId id of the key group to write
     * @throws IOException if writing fails
     */
    private static void migrateOneKeyGroupToStorage(DataOutputViewStreamWrapper out, StateSnapshot.StateKeyGroupWriter keyGroupWriter, int keyGroupId) throws IOException {
        out.writeInt(keyGroupId);
        keyGroupWriter.writeStateInKeyGroup(out, keyGroupId);
    }
}
