package org.apache.flink.runtime.checkpoint.channel;

import java.io.IOException;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.state.AbstractChannelStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/channel/SequentialChannelStateReaderImpl.class */
public class SequentialChannelStateReaderImpl implements SequentialChannelStateReader {
    private final TaskStateSnapshot taskStateSnapshot;
    private final ChannelStateSerializer serializer = new ChannelStateSerializerImpl();
    private final ChannelStateChunkReader chunkReader = new ChannelStateChunkReader(this.serializer);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/channel/SequentialChannelStateReaderImpl$RescaledOffset.class */
    public static class RescaledOffset<Info> {
        final Long offset;
        final Info channelInfo;
        final int oldSubtaskIndex;

        RescaledOffset(Long l, Info info, int i) {
            this.offset = l;
            this.channelInfo = info;
            this.oldSubtaskIndex = i;
        }
    }

    public SequentialChannelStateReaderImpl(TaskStateSnapshot taskStateSnapshot) {
        this.taskStateSnapshot = taskStateSnapshot;
    }

    @Override // org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader
    public void readInputData(InputGate[] inputGateArr) throws IOException, InterruptedException {
        InputChannelRecoveredStateHandler inputChannelRecoveredStateHandler = new InputChannelRecoveredStateHandler(inputGateArr, this.taskStateSnapshot.getInputRescalingDescriptor());
        try {
            read(inputChannelRecoveredStateHandler, groupByDelegate(streamSubtaskStates(), (v0) -> {
                return v0.getInputChannelState();
            }));
            inputChannelRecoveredStateHandler.close();
        } catch (Throwable th) {
            try {
                inputChannelRecoveredStateHandler.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Override // org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader
    public void readInputDataWithoutState(InputGate[] inputGateArr) throws IOException, InterruptedException {
        InputChannelRecoveredStateHandler inputChannelRecoveredStateHandler = new InputChannelRecoveredStateHandler(inputGateArr, this.taskStateSnapshot.getInputRescalingDescriptor());
        try {
            read(inputChannelRecoveredStateHandler, groupByDelegate(streamSubtaskStates(), operatorSubtaskState -> {
                return StateObjectCollection.empty();
            }));
            inputChannelRecoveredStateHandler.close();
        } catch (Throwable th) {
            try {
                inputChannelRecoveredStateHandler.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Override // org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader
    public void readOutputData(ResultPartitionWriter[] resultPartitionWriterArr, boolean z) throws IOException, InterruptedException {
        ResultSubpartitionRecoveredStateHandler resultSubpartitionRecoveredStateHandler = new ResultSubpartitionRecoveredStateHandler(resultPartitionWriterArr, z, this.taskStateSnapshot.getOutputRescalingDescriptor());
        try {
            read(resultSubpartitionRecoveredStateHandler, groupByDelegate(streamSubtaskStates(), (v0) -> {
                return v0.getResultSubpartitionState();
            }));
            resultSubpartitionRecoveredStateHandler.close();
        } catch (Throwable th) {
            try {
                resultSubpartitionRecoveredStateHandler.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Override // org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader
    public void readOutputDataWithoutState(ResultPartitionWriter[] resultPartitionWriterArr, boolean z) throws IOException, InterruptedException {
        ResultSubpartitionRecoveredStateHandler resultSubpartitionRecoveredStateHandler = new ResultSubpartitionRecoveredStateHandler(resultPartitionWriterArr, z, this.taskStateSnapshot.getOutputRescalingDescriptor());
        try {
            read(resultSubpartitionRecoveredStateHandler, groupByDelegate(streamSubtaskStates(), operatorSubtaskState -> {
                return StateObjectCollection.empty();
            }));
            resultSubpartitionRecoveredStateHandler.close();
        } catch (Throwable th) {
            try {
                resultSubpartitionRecoveredStateHandler.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private <Info, Context, Handle extends AbstractChannelStateHandle<Info>> void read(RecoveredChannelStateHandler<Info, Context> recoveredChannelStateHandler, Map<StreamStateHandle, List<Handle>> map) throws IOException, InterruptedException {
        for (Map.Entry<StreamStateHandle, List<Handle>> entry : map.entrySet()) {
            readSequentially(entry.getKey(), entry.getValue(), recoveredChannelStateHandler);
        }
    }

    private <Info, Context, Handle extends AbstractChannelStateHandle<Info>> void readSequentially(StreamStateHandle streamStateHandle, List<Handle> list, RecoveredChannelStateHandler<Info, Context> recoveredChannelStateHandler) throws IOException, InterruptedException {
        FSDataInputStream openInputStream = streamStateHandle.openInputStream();
        try {
            this.serializer.readHeader(openInputStream);
            for (RescaledOffset rescaledOffset : extractOffsetsSorted(list)) {
                this.chunkReader.readChunk(openInputStream, rescaledOffset.offset.longValue(), recoveredChannelStateHandler, rescaledOffset.channelInfo, rescaledOffset.oldSubtaskIndex);
            }
            if (openInputStream != null) {
                openInputStream.close();
            }
        } catch (Throwable th) {
            if (openInputStream != null) {
                try {
                    openInputStream.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private Stream<OperatorSubtaskState> streamSubtaskStates() {
        return this.taskStateSnapshot.getSubtaskStateMappings().stream().map((v0) -> {
            return v0.getValue();
        });
    }

    private static <Info, Handle extends AbstractChannelStateHandle<Info>> Map<StreamStateHandle, List<Handle>> groupByDelegate(Stream<OperatorSubtaskState> stream, Function<OperatorSubtaskState, StateObjectCollection<Handle>> function) {
        return (Map) stream.map(function).flatMap((v0) -> {
            return v0.stream();
        }).peek(validate()).collect(Collectors.groupingBy((v0) -> {
            return v0.getDelegate();
        }));
    }

    private static <Info, Handle extends AbstractChannelStateHandle<Info>> Consumer<Handle> validate() {
        HashSet hashSet = new HashSet();
        return abstractChannelStateHandle -> {
            if (!hashSet.add(new Tuple2(abstractChannelStateHandle.getInfo(), Integer.valueOf(abstractChannelStateHandle.getSubtaskIndex())))) {
                throw new IllegalStateException("Duplicate channel info: " + abstractChannelStateHandle);
            }
        };
    }

    private static <Info, Handle extends AbstractChannelStateHandle<Info>> List<RescaledOffset<Info>> extractOffsetsSorted(List<Handle> list) {
        return (List) list.stream().flatMap(SequentialChannelStateReaderImpl::extractOffsets).sorted(Comparator.comparingLong(rescaledOffset -> {
            return rescaledOffset.offset.longValue();
        })).collect(Collectors.toList());
    }

    private static <Info, Handle extends AbstractChannelStateHandle<Info>> Stream<RescaledOffset<Info>> extractOffsets(Handle handle) {
        return (Stream<RescaledOffset<Info>>) handle.getOffsets().stream().map(l -> {
            return new RescaledOffset(l, handle.getInfo(), handle.getSubtaskIndex());
        });
    }

    @Override // org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader, java.lang.AutoCloseable
    public void close() throws Exception {
    }
}
