package org.apache.flink.runtime.state.rescale;

import java.io.OutputStream;
import java.util.Map;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.BackendWritableBroadcastState;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.PartitionableListState;
import org.apache.flink.runtime.state.StateMigrationInfo;

/* loaded from: input_file:org/apache/flink/runtime/state/rescale/OperatorStateMigrator.class */
/**
 * Writes the operator-level state of a {@link DefaultOperatorStateBackend} to hidden state
 * streams obtained from a {@link RuntimeRescaleStreamFactory}.
 *
 * <p>Two kinds of state are handled independently:
 *
 * <ul>
 *   <li>broadcast states, written only by subtask 0 and only when the new parallelism is
 *       greater than the old one;
 *   <li>partitionable list (operator) states, written by every subtask whose index is below
 *       the old parallelism.
 * </ul>
 */
public class OperatorStateMigrator extends StateMigrator<DefaultOperatorStateBackend> {
    private final DefaultOperatorStateBackend backend;
    private final int subtaskId;

    /** True when this subtask's index is at or beyond the old parallelism. */
    private final boolean isNewSubtask;

    /** True when the new parallelism is strictly greater than the old parallelism. */
    private final boolean rescaleIncrease;

    /**
     * Captures the backend and derives the rescale flags from the migration info.
     *
     * @param defaultOperatorStateBackend backend whose registered states are written out
     * @param stateMigrationInfo source of the subtask index and the old/new parallelism
     */
    public OperatorStateMigrator(DefaultOperatorStateBackend defaultOperatorStateBackend, StateMigrationInfo stateMigrationInfo) {
        super(defaultOperatorStateBackend);
        this.backend = defaultOperatorStateBackend;
        this.subtaskId = stateMigrationInfo.getSubtaskId();
        int oldParallelism = stateMigrationInfo.getTaskMigrationInfo().getOldParallelism();
        int newParallelism = stateMigrationInfo.getTaskMigrationInfo().getNewParallelism();
        this.isNewSubtask = this.subtaskId >= oldParallelism;
        this.rescaleIncrease = newParallelism > oldParallelism;
    }

    /**
     * Serializes the registered broadcast and operator states into hidden state streams.
     *
     * <p>Each stream starts with the number of states as an int, followed, for every state, by
     * its name (modified UTF-8 via {@code writeUTF}) and whatever the state's own {@code write}
     * method emits. The target path of each stream is {@code str2} joined with {@code str3},
     * joined with a per-subtask file name produced by {@link MigrationKeyedStateInfo}.
     *
     * <p>Every stream is opened in a try-with-resources statement so that it is closed even when
     * serialization fails part-way through; previously a failure left the stream open.
     *
     * @param str not used by this implementation
     * @param str2 first path component of the output location
     * @param str3 second path component of the output location
     * @param i not used by this implementation
     * @param runtimeRescaleStreamFactory factory that opens the hidden state output streams
     * @throws Exception if opening, writing, flushing or closing a stream fails
     */
    @Override // org.apache.flink.runtime.state.rescale.StateMigrator
    public void migrateStatesToStorage(String str, String str2, String str3, int i, RuntimeRescaleStreamFactory runtimeRescaleStreamFactory) throws Exception {
        // Broadcast state is written by subtask 0 alone, and only on scale-up.
        if (this.rescaleIncrease && 0 == this.subtaskId && !this.backend.getRegisteredBroadcastStates().isEmpty()) {
            try (OutputStream broadcastOut = runtimeRescaleStreamFactory.createHiddenStateOutputStream(joinPaths(joinPaths(str2, str3), MigrationKeyedStateInfo.toBroadcastPathName(this.subtaskId)))) {
                DataOutputViewStreamWrapper broadcastView = new DataOutputViewStreamWrapper(broadcastOut);
                broadcastView.writeInt(this.backend.getRegisteredBroadcastStates().size());
                for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry : this.backend.getRegisteredBroadcastStates().entrySet()) {
                    broadcastView.writeUTF(entry.getKey());
                    // The state serializes itself directly onto the underlying stream.
                    entry.getValue().write(broadcastOut);
                }
                broadcastOut.flush();
            }
        }

        // Subtasks that did not exist before the rescale, or that hold no operator state,
        // have nothing further to write.
        if (this.isNewSubtask || this.backend.getRegisteredOperatorStates().isEmpty()) {
            return;
        }

        try (OutputStream operatorOut = runtimeRescaleStreamFactory.createHiddenStateOutputStream(joinPaths(joinPaths(str2, str3), MigrationKeyedStateInfo.toOperatorStatePathName(this.subtaskId)))) {
            DataOutputViewStreamWrapper operatorView = new DataOutputViewStreamWrapper(operatorOut);
            operatorView.writeInt(this.backend.getRegisteredOperatorStates().size());
            for (Map.Entry<String, PartitionableListState<?>> entry : this.backend.getRegisteredOperatorStates().entrySet()) {
                operatorView.writeUTF(entry.getKey());
                // The state serializes itself directly onto the underlying stream.
                entry.getValue().write(operatorOut);
            }
            operatorOut.flush();
        }
    }
}
