package org.apache.flink.streaming.runtime.io.checkpointing;

import java.time.Duration;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledFuture;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy;
import org.apache.flink.streaming.runtime.io.InputGateUtil;
import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator;
import org.apache.flink.streaming.runtime.tasks.TimerService;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtil.class */
public class InputProcessorUtil {

    /* renamed from: org.apache.flink.streaming.runtime.io.checkpointing.InputProcessorUtil$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtil$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$streaming$api$CheckpointingMode = new int[CheckpointingMode.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$streaming$api$CheckpointingMode[CheckpointingMode.EXACTLY_ONCE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$api$CheckpointingMode[CheckpointingMode.AT_LEAST_ONCE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    public static CheckpointedInputGate[] createCheckpointedMultipleInputGate(MailboxExecutor mailboxExecutor, List<IndexedInputGate>[] listArr, TaskIOMetricGroup taskIOMetricGroup, CheckpointBarrierHandler checkpointBarrierHandler, StreamConfig streamConfig) {
        registerCheckpointMetrics(taskIOMetricGroup, checkpointBarrierHandler);
        return (CheckpointedInputGate[]) Arrays.stream((InputGate[]) Arrays.stream(listArr).map(InputGateUtil::createInputGate).toArray(i -> {
            return new InputGate[i];
        })).map(inputGate -> {
            return new CheckpointedInputGate(inputGate, checkpointBarrierHandler, mailboxExecutor, streamConfig.isGraphContainingLoops() ? UpstreamRecoveryTracker.NO_OP : UpstreamRecoveryTracker.forInputGate(inputGate));
        }).toArray(i2 -> {
            return new CheckpointedInputGate[i2];
        });
    }

    public static CheckpointBarrierHandler createCheckpointBarrierHandler(CheckpointableTask checkpointableTask, StreamConfig streamConfig, SubtaskCheckpointCoordinator subtaskCheckpointCoordinator, String str, List<IndexedInputGate>[] listArr, List<StreamTaskSourceInput<?>> list, MailboxExecutor mailboxExecutor, TimerService timerService) {
        CheckpointableInput[] checkpointableInputArr = (CheckpointableInput[]) Stream.concat(Arrays.stream(listArr).flatMap((v0) -> {
            return v0.stream();
        }), list.stream()).sorted(Comparator.comparing((v0) -> {
            return v0.getInputGateIndex();
        })).toArray(i -> {
            return new CheckpointableInput[i];
        });
        SystemClock systemClock = SystemClock.getInstance();
        switch (AnonymousClass1.$SwitchMap$org$apache$flink$streaming$api$CheckpointingMode[streamConfig.getCheckpointMode().ordinal()]) {
            case CheckpointConfig.DEFAULT_MAX_CONCURRENT_CHECKPOINTS /* 1 */:
                return createBarrierHandler(checkpointableTask, streamConfig, subtaskCheckpointCoordinator, str, mailboxExecutor, timerService, checkpointableInputArr, systemClock, (int) Arrays.stream(checkpointableInputArr).mapToLong(checkpointableInput -> {
                    return checkpointableInput.getChannelInfos().size();
                }).sum());
            case InternalTimerServiceSerializationProxy.VERSION /* 2 */:
                if (streamConfig.isUnalignedCheckpointsEnabled()) {
                    throw new IllegalStateException("Cannot use unaligned checkpoints with AT_LEAST_ONCE checkpointing mode");
                }
                return new CheckpointBarrierTracker(Arrays.stream(checkpointableInputArr).mapToInt((v0) -> {
                    return v0.getNumberOfInputChannels();
                }).sum(), checkpointableTask, systemClock, ((Boolean) streamConfig.getConfiguration().get(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH)).booleanValue());
            default:
                throw new UnsupportedOperationException("Unrecognized Checkpointing Mode: " + streamConfig.getCheckpointMode());
        }
    }

    private static SingleCheckpointBarrierHandler createBarrierHandler(CheckpointableTask checkpointableTask, StreamConfig streamConfig, SubtaskCheckpointCoordinator subtaskCheckpointCoordinator, String str, MailboxExecutor mailboxExecutor, TimerService timerService, CheckpointableInput[] checkpointableInputArr, Clock clock, int i) {
        boolean booleanValue = ((Boolean) streamConfig.getConfiguration().get(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH)).booleanValue();
        return streamConfig.isUnalignedCheckpointsEnabled() ? SingleCheckpointBarrierHandler.alternating(str, checkpointableTask, subtaskCheckpointCoordinator, clock, i, createRegisterTimerCallback(mailboxExecutor, timerService), booleanValue, checkpointableInputArr) : SingleCheckpointBarrierHandler.aligned(str, checkpointableTask, clock, i, createRegisterTimerCallback(mailboxExecutor, timerService), booleanValue, checkpointableInputArr);
    }

    private static BiFunction<Callable<?>, Duration, CheckpointBarrierHandler.Cancellable> createRegisterTimerCallback(MailboxExecutor mailboxExecutor, TimerService timerService) {
        return (callable, duration) -> {
            ScheduledFuture registerTimer = timerService.registerTimer(timerService.getCurrentProcessingTime() + duration.toMillis(), j -> {
                mailboxExecutor.submit(callable, "Execute checkpoint barrier handler delayed action");
            });
            return () -> {
                registerTimer.cancel(false);
            };
        };
    }

    private static void registerCheckpointMetrics(TaskIOMetricGroup taskIOMetricGroup, CheckpointBarrierHandler checkpointBarrierHandler) {
        checkpointBarrierHandler.getClass();
        taskIOMetricGroup.gauge("checkpointAlignmentTime", checkpointBarrierHandler::getAlignmentDurationNanos);
        checkpointBarrierHandler.getClass();
        taskIOMetricGroup.gauge("checkpointStartDelayNanos", checkpointBarrierHandler::getCheckpointStartDelayNanos);
    }
}
