package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;
import org.apache.flink.util.Preconditions;

/**
 * A {@link CheckpointBarrierBehaviourController} that blocks each input channel once a barrier
 * has been received on it and resumes the blocked channels when the last barrier has been
 * processed or the pending checkpoint is aborted.
 *
 * <p>The controller keeps one "blocked" flag per channel; the set of known channels is fixed at
 * construction time from the channel infos reported by the given inputs.
 */
@Internal
public class AlignedController implements CheckpointBarrierBehaviourController {
    /** Inputs indexed by gate index (see the {@code getGateIdx()} lookups below). */
    private final CheckpointableInput[] inputs;

    /** Per-channel flag: {@code true} while this controller has blocked the channel. */
    private final Map<InputChannelInfo, Boolean> blockedChannels;

    /**
     * Creates a controller for the given inputs and registers every channel they report as
     * initially unblocked.
     *
     * @param checkpointableInputArr the inputs whose channels are tracked; the array is looked up
     *     by {@link InputChannelInfo#getGateIdx()} when blocking/resuming a channel
     */
    public AlignedController(CheckpointableInput... checkpointableInputArr) {
        this.inputs = checkpointableInputArr;
        this.blockedChannels =
                Arrays.stream(checkpointableInputArr)
                        .flatMap(input -> input.getChannelInfos().stream())
                        .collect(Collectors.toMap(Function.identity(), channel -> false));
    }

    /**
     * Marks the channel as blocked and blocks consumption on the owning input.
     *
     * @param inputChannelInfo the channel on which the barrier arrived
     * @param checkpointBarrier the received barrier (not inspected here)
     * @throws IllegalStateException if the channel was not registered at construction time, or if
     *     it is already marked as blocked (a repeated barrier for the same checkpoint)
     */
    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierBehaviourController
    public void barrierReceived(InputChannelInfo inputChannelInfo, CheckpointBarrier checkpointBarrier) {
        // Look the flag up before mutating so that an unknown channel fails with a clear message
        // instead of a NullPointerException on unboxing, and does not leave a stray map entry.
        final Boolean alreadyBlocked = this.blockedChannels.get(inputChannelInfo);
        Preconditions.checkState(
                alreadyBlocked != null,
                "Barrier received on unknown input channel %s",
                inputChannelInfo);
        // Template overload: the message is only formatted when the check actually fails.
        Preconditions.checkState(
                !alreadyBlocked,
                "Stream corrupt: Repeated barrier for same checkpoint on input %s",
                inputChannelInfo);
        this.blockedChannels.put(inputChannelInfo, true);
        this.inputs[inputChannelInfo.getGateIdx()].blockConsumption(inputChannelInfo);
    }

    /**
     * No work is done for the first barrier.
     *
     * @return always {@code false}
     */
    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierBehaviourController
    public boolean preProcessFirstBarrier(InputChannelInfo inputChannelInfo, CheckpointBarrier checkpointBarrier) {
        return false;
    }

    /**
     * Resumes every channel currently marked as blocked.
     *
     * @return always {@code true}
     * @throws IOException if resuming consumption on an input fails
     */
    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierBehaviourController
    public boolean postProcessLastBarrier(InputChannelInfo inputChannelInfo, CheckpointBarrier checkpointBarrier) throws IOException {
        resumeConsumption();
        return true;
    }

    /**
     * Resumes every channel currently marked as blocked; neither argument is inspected.
     *
     * @param j presumably the id of the aborted checkpoint (unused here)
     * @param checkpointException the abort cause (unused here)
     * @throws IOException if resuming consumption on an input fails
     */
    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierBehaviourController
    public void abortPendingCheckpoint(long j, CheckpointException checkpointException) throws IOException {
        resumeConsumption();
    }

    /**
     * Resumes consumption on the given channel right away; the blocked flags are not touched.
     *
     * @throws IOException if resuming consumption on the input fails
     */
    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierBehaviourController
    public void obsoleteBarrierReceived(InputChannelInfo inputChannelInfo, CheckpointBarrier checkpointBarrier) throws IOException {
        resumeConsumption(inputChannelInfo);
    }

    /**
     * Resumes consumption on every channel whose flag is set and clears each flag afterwards.
     * If resuming a channel throws, that channel's flag (and those of entries not yet visited)
     * is left as it was.
     */
    private void resumeConsumption() throws IOException {
        for (Map.Entry<InputChannelInfo, Boolean> channelState : this.blockedChannels.entrySet()) {
            if (channelState.getValue()) {
                resumeConsumption(channelState.getKey());
            }
            channelState.setValue(false);
        }
    }

    /** Resumes consumption of a single channel on the input found at the channel's gate index. */
    private void resumeConsumption(InputChannelInfo inputChannelInfo) throws IOException {
        this.inputs[inputChannelInfo.getGateIdx()].resumeConsumption(inputChannelInfo);
    }
}
