package org.apache.flink.streaming.runtime.tasks;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.runtime.rescale.RuntimeRescaleEvent;
import org.apache.flink.runtime.rescale.RuntimeRescaleException;
import org.apache.flink.runtime.rescale.RuntimeRescaleMetaData;
import org.apache.flink.runtime.rescale.options.RescaledTaskMigrationInfo;
import org.apache.flink.runtime.state.rescale.RuntimeRescaleStorageWorkerView;
import org.apache.flink.runtime.taskmanager.InputGateWithMetrics;
import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/DefaultSubtaskRuntimeRescaleEventCoordinator.class */
public class DefaultSubtaskRuntimeRescaleEventCoordinator implements SubtaskRuntimeRescaleEventCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultSubtaskRuntimeRescaleEventCoordinator.class);
    private final RuntimeRescaleStorageWorkerView runtimeRescaleStorage;
    private volatile String taskName;
    private final Environment env;
    private final MailboxProcessor mailboxProcessor;
    private final BiConsumer<List<ResultPartitionWriter>, List<ResultPartitionWriter>> resultPartitionsUpdater;
    private final BiConsumer<List<IndexedInputGate>, List<IndexedInputGate>> inputGatesUpdater;
    private CompletableFuture<Void> migrateStatesForStartedByRuntimeRescaleSubtaskFuture;
    private volatile StreamTaskRuntimeRescaleInfo streamTaskRuntimeRescaleInfo = NoChangedStreamTaskRuntimeRescaleInfo.getInstance();
    private long lastRuntimeRescaleEventId = -1;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/DefaultSubtaskRuntimeRescaleEventCoordinator$OldAndNewInputGates.class */
    public static class OldAndNewInputGates {
        private final List<IndexedInputGate> oldInputGates;
        private final List<IndexedInputGate> newInputGates;

        public OldAndNewInputGates(List<IndexedInputGate> list, List<IndexedInputGate> list2) {
            this.oldInputGates = list;
            this.newInputGates = list2;
        }

        public List<IndexedInputGate> getOldInputGates() {
            return Collections.unmodifiableList(this.oldInputGates);
        }

        public List<IndexedInputGate> getNewInputGates() {
            return Collections.unmodifiableList(this.newInputGates);
        }

        public boolean isEmpty() {
            Preconditions.checkState(this.oldInputGates.size() == this.newInputGates.size());
            return this.oldInputGates.isEmpty();
        }

        public static OldAndNewInputGates empty() {
            return new OldAndNewInputGates(Collections.emptyList(), Collections.emptyList());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/DefaultSubtaskRuntimeRescaleEventCoordinator$OldAndNewPartitionsWriters.class */
    public static class OldAndNewPartitionsWriters {
        private final List<ResultPartitionWriter> oldPartitionsWriters;
        private final List<ResultPartitionWriter> newPartitionsWriters;

        public OldAndNewPartitionsWriters(List<ResultPartitionWriter> list, List<ResultPartitionWriter> list2) {
            this.oldPartitionsWriters = list;
            this.newPartitionsWriters = list2;
        }

        public List<ResultPartitionWriter> getOldPartitionsWriters() {
            return Collections.unmodifiableList(this.oldPartitionsWriters);
        }

        public List<ResultPartitionWriter> getNewPartitionsWriters() {
            return Collections.unmodifiableList(this.newPartitionsWriters);
        }

        public boolean isEmpty() {
            Preconditions.checkState(this.oldPartitionsWriters.size() == this.newPartitionsWriters.size());
            return this.oldPartitionsWriters.isEmpty();
        }

        public static OldAndNewPartitionsWriters empty() {
            return new OldAndNewPartitionsWriters(Collections.emptyList(), Collections.emptyList());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DefaultSubtaskRuntimeRescaleEventCoordinator(RuntimeRescaleStorageWorkerView runtimeRescaleStorageWorkerView, String str, Environment environment, MailboxProcessor mailboxProcessor, BiConsumer<List<ResultPartitionWriter>, List<ResultPartitionWriter>> biConsumer, BiConsumer<List<IndexedInputGate>, List<IndexedInputGate>> biConsumer2) {
        this.runtimeRescaleStorage = (RuntimeRescaleStorageWorkerView) Preconditions.checkNotNull(runtimeRescaleStorageWorkerView);
        this.taskName = (String) Preconditions.checkNotNull(str);
        this.env = (Environment) Preconditions.checkNotNull(environment);
        this.mailboxProcessor = (MailboxProcessor) Preconditions.checkNotNull(mailboxProcessor);
        this.resultPartitionsUpdater = biConsumer;
        this.inputGatesUpdater = (BiConsumer) Preconditions.checkNotNull(biConsumer2);
    }

    @Override // org.apache.flink.streaming.runtime.tasks.SubtaskRuntimeRescaleEventCoordinator
    public <OUT> void rescaleSubtask(RuntimeRescaleMetaData runtimeRescaleMetaData, OperatorChain<?, ?> operatorChain, @Nullable StreamInputProcessor streamInputProcessor, RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate, Supplier<Boolean> supplier) throws Exception {
        Preconditions.checkNotNull(runtimeRescaleMetaData);
        if (this.lastRuntimeRescaleEventId >= runtimeRescaleMetaData.getRuntimeRescaleId()) {
            LOG.info("Out of order runtime rescale event (aborted previously?): {} >= {}", Long.valueOf(this.lastRuntimeRescaleEventId), Long.valueOf(runtimeRescaleMetaData.getRuntimeRescaleId()));
            return;
        }
        this.lastRuntimeRescaleEventId = runtimeRescaleMetaData.getRuntimeRescaleId();
        StreamTaskRuntimeRescaleInfo streamTaskRuntimeRescaleInfo = this.streamTaskRuntimeRescaleInfo;
        OldAndNewPartitionsWriters empty = OldAndNewPartitionsWriters.empty();
        OldAndNewInputGates empty2 = OldAndNewInputGates.empty();
        if (streamTaskRuntimeRescaleInfo.isAffected()) {
            Preconditions.checkState(streamTaskRuntimeRescaleInfo instanceof ModifiedStreamTaskRuntimeRescaleInfo);
            ModifiedStreamTaskRuntimeRescaleInfo modifiedStreamTaskRuntimeRescaleInfo = (ModifiedStreamTaskRuntimeRescaleInfo) streamTaskRuntimeRescaleInfo;
            List<ResultPartitionDeploymentDescriptor> producedPartitionsByRescale = modifiedStreamTaskRuntimeRescaleInfo.getProducedPartitionsByRescale();
            if (producedPartitionsByRescale != null) {
                empty = getPartitionsWritersByRescale(producedPartitionsByRescale, this.env.getAllWriters());
            }
            List<InputGateDeploymentDescriptor> inputGatesByRescale = modifiedStreamTaskRuntimeRescaleInfo.getInputGatesByRescale();
            if (inputGatesByRescale != null) {
                empty2 = getInputGatesByRescale(inputGatesByRescale, this.env.getAllInputGates());
            }
        }
        LOG.debug("Task {} broadcastEvent at {}, triggerTime {}, passed time {}", new Object[]{this.taskName, Long.valueOf(System.currentTimeMillis()), Long.valueOf(runtimeRescaleMetaData.getTimestamp()), Long.valueOf(System.currentTimeMillis() - runtimeRescaleMetaData.getTimestamp())});
        operatorChain.broadcastEvent(new RuntimeRescaleEvent(runtimeRescaleMetaData.getRuntimeRescaleId(), runtimeRescaleMetaData.getTimestamp()), false);
        OldAndNewInputGates oldAndNewInputGates = empty2;
        OldAndNewPartitionsWriters oldAndNewPartitionsWriters = empty;
        for (ResultPartitionWriter resultPartitionWriter : oldAndNewPartitionsWriters.getOldPartitionsWriters()) {
            this.env.getTaskEventDispatcher().unregisterPartition(resultPartitionWriter.getPartitionId());
            try {
                resultPartitionWriter.finish();
                resultPartitionWriter.close();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        if (!empty.isEmpty()) {
            this.resultPartitionsUpdater.accept(empty.getOldPartitionsWriters(), empty.getNewPartitionsWriters());
            recordWriterDelegate.modifyPartitionStrategy(RescalePartitioner::new);
        }
        if (!oldAndNewInputGates.isEmpty()) {
            this.mailboxProcessor.getMainMailboxExecutor().execute(() -> {
                updateInputs(oldAndNewInputGates);
            }, "Update inputs after runtime rescale");
        }
        if (this.streamTaskRuntimeRescaleInfo.isAffected() && oldAndNewInputGates.isEmpty() && oldAndNewPartitionsWriters.isEmpty()) {
            this.mailboxProcessor.suspend();
        }
        this.streamTaskRuntimeRescaleInfo = NoChangedStreamTaskRuntimeRescaleInfo.getInstance();
    }

    private void updateInputs(OldAndNewInputGates oldAndNewInputGates) {
        Iterator<IndexedInputGate> it = oldAndNewInputGates.getOldInputGates().iterator();
        while (it.hasNext()) {
            try {
                it.next().close();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        if (oldAndNewInputGates.isEmpty()) {
            return;
        }
        this.inputGatesUpdater.accept(oldAndNewInputGates.getOldInputGates(), oldAndNewInputGates.getNewInputGates());
    }

    @Override // org.apache.flink.streaming.runtime.tasks.SubtaskRuntimeRescaleEventCoordinator
    public void migrateOperatorStatesModifiedSubtask(RuntimeRescaleMetaData runtimeRescaleMetaData, OperatorChain<?, ?> operatorChain, Supplier<Boolean> supplier) throws Exception {
        if (this.streamTaskRuntimeRescaleInfo.isAffected()) {
            Preconditions.checkState(this.streamTaskRuntimeRescaleInfo instanceof ModifiedStreamTaskRuntimeRescaleInfo);
            RescaledTaskMigrationInfo taskMigrationInfo = ((ModifiedStreamTaskRuntimeRescaleInfo) this.streamTaskRuntimeRescaleInfo).getTaskMigrationInfo();
            if (taskMigrationInfo != null) {
                migrateOperatorStates(runtimeRescaleMetaData, taskMigrationInfo, operatorChain, supplier);
            }
        }
    }

    @Override // org.apache.flink.streaming.runtime.tasks.SubtaskRuntimeRescaleEventCoordinator
    public void migrateOperatorStatesNewSubtask(OperatorChain<?, ?> operatorChain, Supplier<Boolean> supplier) throws Exception {
        Preconditions.checkState(this.streamTaskRuntimeRescaleInfo instanceof NewStreamTaskRuntimeRescaleInfo);
        NewStreamTaskRuntimeRescaleInfo newStreamTaskRuntimeRescaleInfo = (NewStreamTaskRuntimeRescaleInfo) this.streamTaskRuntimeRescaleInfo;
        RuntimeRescaleMetaData rescaleMetaData = newStreamTaskRuntimeRescaleInfo.getRescaleMetaData();
        RescaledTaskMigrationInfo taskMigrationInfo = newStreamTaskRuntimeRescaleInfo.getTaskMigrationInfo();
        this.migrateStatesForStartedByRuntimeRescaleSubtaskFuture.join();
        migrateOperatorStates(rescaleMetaData, taskMigrationInfo, operatorChain, supplier);
        unsetStartedByRuntimeRescale();
        this.env.acknowledgeRuntimeRescaleEvent(rescaleMetaData.getRuntimeRescaleId());
    }

    @Override // org.apache.flink.streaming.runtime.tasks.SubtaskRuntimeRescaleEventCoordinator
    public void abortRuntimeRescaleOnAligning(long j, RuntimeRescaleException runtimeRescaleException) {
        LOG.debug("Aborting runtime rescale event {} for task {}", Long.valueOf(j), this.taskName);
        this.lastRuntimeRescaleEventId = Math.max(this.lastRuntimeRescaleEventId, j);
        this.env.declineRuntimeRescaleEvent(j, runtimeRescaleException);
    }

    @Override // org.apache.flink.streaming.runtime.tasks.SubtaskRuntimeRescaleEventCoordinator
    public void setTaskRuntimeRescaleInfoForModifiedSubtask(@Nullable List<InputGateDeploymentDescriptor> list, @Nullable List<ResultPartitionDeploymentDescriptor> list2, @Nullable RescaledTaskMigrationInfo rescaledTaskMigrationInfo) {
        this.streamTaskRuntimeRescaleInfo = new ModifiedStreamTaskRuntimeRescaleInfo(list, list2, rescaledTaskMigrationInfo);
        if (list2 != null) {
            Map map = (Map) Arrays.stream(this.env.getAllWriters()).map(resultPartitionWriter -> {
                return Tuple2.of(resultPartitionWriter.getPartitionId().getPartitionId(), resultPartitionWriter);
            }).collect(Collectors.toMap(tuple2 -> {
                return (IntermediateResultPartitionID) tuple2.f0;
            }, tuple22 -> {
                return (ResultPartitionWriter) tuple22.f1;
            }));
            Iterator<ResultPartitionDeploymentDescriptor> it = list2.iterator();
            while (it.hasNext()) {
                ((ResultPartitionWriter) map.get(it.next().getPartitionId())).markAsWillBeRemovedByRuntimeRescale();
            }
        }
    }

    @Override // org.apache.flink.streaming.runtime.tasks.SubtaskRuntimeRescaleEventCoordinator
    public void markStartedByRuntimeRescale(CompletableFuture<Void> completableFuture, RuntimeRescaleMetaData runtimeRescaleMetaData, RescaledTaskMigrationInfo rescaledTaskMigrationInfo) {
        this.streamTaskRuntimeRescaleInfo = new NewStreamTaskRuntimeRescaleInfo(runtimeRescaleMetaData, rescaledTaskMigrationInfo);
        this.migrateStatesForStartedByRuntimeRescaleSubtaskFuture = completableFuture;
    }

    @Override // org.apache.flink.streaming.runtime.tasks.SubtaskRuntimeRescaleEventCoordinator
    public boolean shouldBeRescaled() {
        return this.streamTaskRuntimeRescaleInfo.isRescaled();
    }

    private void migrateOperatorStates(@Nonnull RuntimeRescaleMetaData runtimeRescaleMetaData, @Nonnull RescaledTaskMigrationInfo rescaledTaskMigrationInfo, OperatorChain<?, ?> operatorChain, Supplier<Boolean> supplier) throws Exception {
        operatorChain.migrateStates(runtimeRescaleMetaData, rescaledTaskMigrationInfo, this.runtimeRescaleStorage.resolveRuntimeRescaleStorageLocation(runtimeRescaleMetaData.getRuntimeRescaleId()), supplier);
    }

    private void unsetStartedByRuntimeRescale() {
        this.streamTaskRuntimeRescaleInfo = NoChangedStreamTaskRuntimeRescaleInfo.getInstance();
    }

    private OldAndNewPartitionsWriters getPartitionsWritersByRescale(List<ResultPartitionDeploymentDescriptor> list, ResultPartitionWriter[] resultPartitionWriterArr) {
        Preconditions.checkArgument(list.size() <= resultPartitionWriterArr.length);
        Map map = (Map) IntStream.range(0, resultPartitionWriterArr.length).mapToObj(i -> {
            return Tuple2.of(resultPartitionWriterArr[i].getPartitionId().getPartitionId(), Tuple2.of(Integer.valueOf(i), resultPartitionWriterArr[i]));
        }).collect(Collectors.toMap(tuple2 -> {
            return (IntermediateResultPartitionID) tuple2.f0;
        }, tuple22 -> {
            return (Tuple2) tuple22.f1;
        }));
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor : list) {
            Tuple2 tuple23 = (Tuple2) map.get(resultPartitionDeploymentDescriptor.getPartitionId());
            ResultPartitionWriter resultPartitionWriter = (ResultPartitionWriter) tuple23.f1;
            ResultPartitionWriter createResultPartitionWriter = this.env.getShuffleEnvironment().createResultPartitionWriter(this.env.getTaskShuffleContext(), resultPartitionDeploymentDescriptor, ((Integer) tuple23.f0).intValue());
            resultPartitionWriterArr[((Integer) tuple23.f0).intValue()] = createResultPartitionWriter;
            arrayList.add(resultPartitionWriter);
            arrayList2.add(createResultPartitionWriter);
        }
        return new OldAndNewPartitionsWriters(arrayList, arrayList2);
    }

    private OldAndNewInputGates getInputGatesByRescale(List<InputGateDeploymentDescriptor> list, IndexedInputGate[] indexedInputGateArr) {
        Preconditions.checkArgument(list.size() <= indexedInputGateArr.length);
        Map map = (Map) Arrays.stream(indexedInputGateArr).map(indexedInputGate -> {
            return Tuple2.of(indexedInputGate.getDatasetID(), indexedInputGate);
        }).collect(Collectors.toMap(tuple2 -> {
            return (IntermediateDataSetID) tuple2.f0;
        }, tuple22 -> {
            return (IndexedInputGate) tuple22.f1;
        }));
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (InputGateDeploymentDescriptor inputGateDeploymentDescriptor : list) {
            IndexedInputGate indexedInputGate2 = (IndexedInputGate) map.get(inputGateDeploymentDescriptor.getConsumedResultId());
            int gateIndex = indexedInputGate2.getGateIndex();
            InputGateWithMetrics inputGateWithMetrics = new InputGateWithMetrics(this.env.getShuffleEnvironment().createInputGate(this.env.getTaskShuffleContext(), this.env.getContainingTask(), inputGateDeploymentDescriptor, gateIndex), this.env.getMetricGroup().getIOMetricGroup().getNumBytesInCounter());
            indexedInputGateArr[gateIndex] = inputGateWithMetrics;
            arrayList.add(indexedInputGate2);
            arrayList2.add(inputGateWithMetrics);
        }
        return new OldAndNewInputGates(arrayList, arrayList2);
    }

    public void setTaskName(String str) {
        this.taskName = str;
    }
}
