package org.apache.flink.runtime.state.rescale;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.rescale.migration.RescalingKeyedStateMigration;
import org.apache.flink.runtime.rescale.options.RescaledTaskMigrationInfo;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.InternalKeyContextImpl;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.StateMigrationInfo;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
import org.apache.flink.runtime.state.rescale.StateMigratableKeyedStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/state/rescale/StateMigratableKeyedStateBackend.class */
public abstract class StateMigratableKeyedStateBackend<K, SB extends StateMigratableKeyedStateBackend<K, SB>> extends AbstractKeyedStateBackend<K> implements StateRescalable<SB> {

    @Nullable
    private final RescalingKeyedStateMigration<K, SB> rescalingKeyedStateMigration;
    private final int numberOfTransferingThreads;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/runtime/state/rescale/StateMigratableKeyedStateBackend$KeyGroupsToFetchAndPost.class */
    public static class KeyGroupsToFetchAndPost {
        private final List<MigrationKeyedStateInfo> keyGroupsToFetch;
        private final MigrationKeyedStateInfo keyGroupsToPost;

        public KeyGroupsToFetchAndPost(List<MigrationKeyedStateInfo> list, MigrationKeyedStateInfo migrationKeyedStateInfo) {
            this.keyGroupsToFetch = list;
            this.keyGroupsToPost = migrationKeyedStateInfo;
        }

        public List<MigrationKeyedStateInfo> getKeyGroupsToFetch() {
            return Collections.unmodifiableList(this.keyGroupsToFetch);
        }

        public MigrationKeyedStateInfo getKeyGroupsToPost() {
            return this.keyGroupsToPost;
        }
    }

    public StateMigratableKeyedStateBackend(TaskKvStateRegistry taskKvStateRegistry, TypeSerializer<K> typeSerializer, ClassLoader classLoader, ExecutionConfig executionConfig, TtlTimeProvider ttlTimeProvider, LatencyTrackingStateConfig latencyTrackingStateConfig, CloseableRegistry closeableRegistry, StreamCompressionDecorator streamCompressionDecorator, InternalKeyContext<K> internalKeyContext, boolean z, int i) {
        super(taskKvStateRegistry, typeSerializer, classLoader, executionConfig, ttlTimeProvider, latencyTrackingStateConfig, closeableRegistry, streamCompressionDecorator, internalKeyContext, z);
        this.numberOfTransferingThreads = i;
        this.rescalingKeyedStateMigration = createRescalingKeyedStateMigration();
    }

    @Override // org.apache.flink.runtime.state.rescale.StateRescalable
    public MigrationStateObject<SB> getStateObjectToMigrate(StateMigrationInfo stateMigrationInfo) {
        KeyGroupRange kgrAfterRescale = stateMigrationInfo.getTaskMigrationInfo().getSubtaskInfoById(stateMigrationInfo.getSubtaskId()).getKgrAfterRescale();
        KeyGroupsToFetchAndPost computeKeyGroupsForMigration = computeKeyGroupsForMigration(stateMigrationInfo);
        return new MigrationStateObject<>(getStateMigrators(computeKeyGroupsForMigration.getKeyGroupsToPost()), getStateExtractors(computeKeyGroupsForMigration.getKeyGroupsToFetch()), kgrAfterRescale);
    }

    protected abstract RescalingKeyedStateMigration<K, SB> createRescalingKeyedStateMigration();

    private StateMigrator<SB> getStateMigrators(MigrationKeyedStateInfo migrationKeyedStateInfo) {
        Preconditions.checkNotNull(this.rescalingKeyedStateMigration);
        return this.rescalingKeyedStateMigration.getStateMigrators(migrationKeyedStateInfo, this, this.numberOfTransferingThreads);
    }

    private StateExtractor<SB> getStateExtractors(List<MigrationKeyedStateInfo> list) {
        Preconditions.checkNotNull(this.rescalingKeyedStateMigration);
        return this.rescalingKeyedStateMigration.getStateExtractors(list, this, this.numberOfTransferingThreads);
    }

    public static KeyGroupsToFetchAndPost computeKeyGroupsForMigration(StateMigrationInfo stateMigrationInfo) {
        RescaledTaskMigrationInfo taskMigrationInfo = stateMigrationInfo.getTaskMigrationInfo();
        return taskMigrationInfo.getNewParallelism() > taskMigrationInfo.getOldParallelism() ? createKeyGroupsToFetchAndPostForUpscaling(stateMigrationInfo) : createKeyGroupsToFetchAndPostForDownscaling(stateMigrationInfo);
    }

    private static KeyGroupsToFetchAndPost createKeyGroupsToFetchAndPostForUpscaling(StateMigrationInfo stateMigrationInfo) {
        List emptyList;
        MigrationKeyedStateInfo migrationKeyedStateInfo;
        int subtaskId = stateMigrationInfo.getSubtaskId();
        RescaledTaskMigrationInfo taskMigrationInfo = stateMigrationInfo.getTaskMigrationInfo();
        if (subtaskId >= taskMigrationInfo.getOldParallelism()) {
            migrationKeyedStateInfo = MigrationKeyedStateInfo.EMPTY_MIGRATION_INFO;
            KeyGroupRange kgrAfterRescale = taskMigrationInfo.getSubtaskInfoById(subtaskId).getKgrAfterRescale();
            emptyList = (List) taskMigrationInfo.getSubtaskInfos().stream().filter(rescaledSubtaskMigrationInfo -> {
                return rescaledSubtaskMigrationInfo.getKgrBeforeRescale().hasIntersection(kgrAfterRescale);
            }).map(rescaledSubtaskMigrationInfo2 -> {
                return new MigrationKeyedStateInfo(rescaledSubtaskMigrationInfo2.getIdx(), rescaledSubtaskMigrationInfo2.getKgrBeforeRescale());
            }).filter(migrationKeyedStateInfo2 -> {
                return !migrationKeyedStateInfo2.getKeyGroupRange().equals(kgrAfterRescale);
            }).collect(Collectors.toList());
        } else {
            KeyGroupRange kgrBeforeRescale = taskMigrationInfo.getSubtaskInfoById(subtaskId).getKgrBeforeRescale();
            KeyGroupRange kgrAfterRescale2 = taskMigrationInfo.getSubtaskInfoById(subtaskId).getKgrAfterRescale();
            emptyList = subtaskId == 0 ? Collections.emptyList() : (List) taskMigrationInfo.getSubtaskInfos().stream().filter(rescaledSubtaskMigrationInfo3 -> {
                return rescaledSubtaskMigrationInfo3.getKgrBeforeRescale().hasIntersection(kgrAfterRescale2);
            }).map(rescaledSubtaskMigrationInfo4 -> {
                return new MigrationKeyedStateInfo(rescaledSubtaskMigrationInfo4.getIdx(), rescaledSubtaskMigrationInfo4.getKgrBeforeRescale());
            }).filter(migrationKeyedStateInfo3 -> {
                return !migrationKeyedStateInfo3.getKeyGroupRange().equals(kgrBeforeRescale);
            }).collect(Collectors.toList());
            migrationKeyedStateInfo = new MigrationKeyedStateInfo(subtaskId, kgrBeforeRescale);
        }
        return new KeyGroupsToFetchAndPost(emptyList, migrationKeyedStateInfo);
    }

    private static KeyGroupsToFetchAndPost createKeyGroupsToFetchAndPostForDownscaling(StateMigrationInfo stateMigrationInfo) {
        List list;
        MigrationKeyedStateInfo migrationKeyedStateInfo;
        int subtaskId = stateMigrationInfo.getSubtaskId();
        RescaledTaskMigrationInfo taskMigrationInfo = stateMigrationInfo.getTaskMigrationInfo();
        if (taskMigrationInfo.getSubtaskInfos().get(subtaskId).isRemovedState()) {
            list = Collections.emptyList();
            migrationKeyedStateInfo = new MigrationKeyedStateInfo(subtaskId, taskMigrationInfo.getSubtaskInfoById(subtaskId).getKgrBeforeRescale());
        } else {
            KeyGroupRange kgrBeforeRescale = taskMigrationInfo.getSubtaskInfoById(subtaskId).getKgrBeforeRescale();
            KeyGroupRange kgrAfterRescale = taskMigrationInfo.getSubtaskInfoById(subtaskId).getKgrAfterRescale();
            list = (List) taskMigrationInfo.getSubtaskInfos().stream().filter(rescaledSubtaskMigrationInfo -> {
                return rescaledSubtaskMigrationInfo.getKgrBeforeRescale().hasIntersection(kgrAfterRescale);
            }).map(rescaledSubtaskMigrationInfo2 -> {
                return new MigrationKeyedStateInfo(rescaledSubtaskMigrationInfo2.getIdx(), rescaledSubtaskMigrationInfo2.getKgrBeforeRescale());
            }).filter(migrationKeyedStateInfo2 -> {
                return !migrationKeyedStateInfo2.getKeyGroupRange().equals(kgrBeforeRescale);
            }).collect(Collectors.toList());
            migrationKeyedStateInfo = (subtaskId == 0 || kgrAfterRescale.contains(kgrBeforeRescale)) ? MigrationKeyedStateInfo.EMPTY_MIGRATION_INFO : new MigrationKeyedStateInfo(subtaskId, kgrBeforeRescale);
        }
        return new KeyGroupsToFetchAndPost(list, migrationKeyedStateInfo);
    }

    public void updateKeyGroupRange(KeyGroupRange keyGroupRange) {
        if (this.keyGroupRange.equals(keyGroupRange)) {
            return;
        }
        updateKeyGroupRangeImpl(keyGroupRange);
        this.keyGroupRange = keyGroupRange;
        Preconditions.checkState(this.keyContext instanceof InternalKeyContextImpl, "Key context must be InternalKeyContextImpl instance.");
        ((InternalKeyContextImpl) this.keyContext).setKeyGroupRange(keyGroupRange);
    }

    protected abstract void updateKeyGroupRangeImpl(KeyGroupRange keyGroupRange);
}
