package org.apache.flink.runtime.rescaling;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.dispatcher.TriggerSavepointMode;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/rescaling/JobTransition.class */
/**
 * Drives the replacement of one job ({@code from}) by another ({@code to}) as a small state machine.
 *
 * <p>{@link #start()} repeatedly picks a {@link Decision} for the current {@link State}, performs it,
 * records the outcome and updates the state until a terminal decision has been executed.
 */
public class JobTransition {
    /** Logger bound to the runtime class of this instance (see the constructor). */
    protected final Logger log;
    /** Job graph of the job that is running when the transition starts. */
    protected JobGraph from;
    /** Job graph of the job that should replace {@link #from}. */
    protected JobGraph to;
    /** Current position in the state machine; starts as {@code DEFAULT_RUNNING}. */
    protected State state;
    /** Savepoint location returned by the last successful stop-with-savepoint, or null if none yet. */
    private String latestSavepoint;
    /** Number of failed actions so far; compared against {@link #maxAttempts} when deciding. */
    private int attempt;
    /** Failure budget; the four-argument constructor sets it to 3. */
    private int maxAttempts;
    /** Set by the terminal actions (cancel / finish); ends the loop in {@link #start()}. */
    private boolean reachedTerminalState;
    /** Dispatcher all job operations are issued against. */
    private final Dispatcher dispatcher;
    /** Executor on which every dispatcher call made by this class is scheduled. */
    private final ComponentMainThreadExecutor dispatcherExecutor;
    /** Decisions taken so far, in order; index-aligned with {@link #actionList}. */
    private List<Decision> decisionList;
    /** Result of each performed decision, in order; index-aligned with {@link #decisionList}. */
    private List<ActionResult> actionList;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.runtime.rescaling.JobTransition$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/runtime/rescaling/JobTransition$1.class */
    /**
     * Ordinal-indexed lookup tables that translate {@link State}, {@link Decision} and
     * {@link JobStatus} constants into the small integers used as {@code case} labels by the
     * switch statements of the enclosing class. An entry that is never assigned stays 0 and
     * therefore falls into the {@code default} branch of those switches.
     *
     * <p>NOTE(review): this looks like a compiler-generated switch map recovered by a decompiler;
     * confirm before editing it by hand, since the case labels elsewhere depend on these numbers.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$runtime$rescaling$JobTransition$State;
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$runtime$rescaling$JobTransition$Decision;
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$api$common$JobStatus = new int[JobStatus.values().length];

        static {
            // JobStatus -> case label (only the statuses listed here get a non-zero label).
            try {
                $SwitchMap$org$apache$flink$api$common$JobStatus[JobStatus.RUNNING.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$api$common$JobStatus[JobStatus.RESTARTING.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$api$common$JobStatus[JobStatus.CREATED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$flink$api$common$JobStatus[JobStatus.FINISHED.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$flink$api$common$JobStatus[JobStatus.FAILED.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$flink$api$common$JobStatus[JobStatus.FAILING.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$flink$api$common$JobStatus[JobStatus.INITIALIZING.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            // Decision -> case label.
            $SwitchMap$org$apache$flink$runtime$rescaling$JobTransition$Decision = new int[Decision.values().length];
            try {
                $SwitchMap$org$apache$flink$runtime$rescaling$JobTransition$Decision[Decision.STOP_DEFAULT.ordinal()] = 1;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$rescaling$JobTransition$Decision[Decision.RUN_REDEPLOYED.ordinal()] = 2;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$rescaling$JobTransition$Decision[Decision.WATCH_REDEPLOYED.ordinal()] = 3;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$rescaling$JobTransition$Decision[Decision.ROLLBACK.ordinal()] = 4;
            } catch (NoSuchFieldError e11) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$rescaling$JobTransition$Decision[Decision.CANCEL_TRANSITION.ordinal()] = 5;
            } catch (NoSuchFieldError e12) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$rescaling$JobTransition$Decision[Decision.FINISH_TRANSITION.ordinal()] = 6;
            } catch (NoSuchFieldError e13) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$rescaling$JobTransition$Decision[Decision.SLEEP.ordinal()] = 7;
            } catch (NoSuchFieldError e14) {
            }
            // State -> case label. Note that the labels do not follow declaration order:
            // REDEPLOYED_FAILED is 4 and REDEPLOYED_RUNNING is 5.
            $SwitchMap$org$apache$flink$runtime$rescaling$JobTransition$State = new int[State.values().length];
            try {
                $SwitchMap$org$apache$flink$runtime$rescaling$JobTransition$State[State.DEFAULT_RUNNING.ordinal()] = 1;
            } catch (NoSuchFieldError e15) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$rescaling$JobTransition$State[State.DEFAULT_STOPPED.ordinal()] = 2;
            } catch (NoSuchFieldError e16) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$rescaling$JobTransition$State[State.REDEPLOYED_STARTED.ordinal()] = 3;
            } catch (NoSuchFieldError e17) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$rescaling$JobTransition$State[State.REDEPLOYED_FAILED.ordinal()] = 4;
            } catch (NoSuchFieldError e18) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$rescaling$JobTransition$State[State.REDEPLOYED_RUNNING.ordinal()] = 5;
            } catch (NoSuchFieldError e19) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$rescaling$JobTransition$State[State.REDEPLOYED_FINISHED.ordinal()] = 6;
            } catch (NoSuchFieldError e20) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$rescaling$JobTransition$State[State.DEFAULT_RESTORED.ordinal()] = 7;
            } catch (NoSuchFieldError e21) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/rescaling/JobTransition$ActionResult.class */
    /**
     * Outcome of one performed {@link Decision}: a success flag, a human-readable message and an
     * optional exception. The fields are read directly by the enclosing class.
     */
    public static class ActionResult {
        /** Whether the action achieved what it set out to do. */
        boolean successful;
        /** Human-readable description of the outcome. */
        String message;
        /** Exception recorded while performing the action; may be null. */
        Throwable exception;

        private ActionResult(boolean z, String str, Throwable th) {
            this.exception = th;
            this.message = str;
            this.successful = z;
        }

        /** Same as the three-argument constructor; the last parameter is ignored. */
        /* synthetic */ ActionResult(boolean z, String str, Throwable th, AnonymousClass1 anonymousClass1) {
            this(z, str, th);
        }

        /** @return the success flag of the action */
        public boolean isSuccessful() {
            return this.successful;
        }

        /** @return the outcome description */
        public String getMessage() {
            return this.message;
        }

        /** @return the recorded exception, or null if there was none */
        public Throwable getException() {
            return this.exception;
        }

        /** Renders all three fields in a single-line, brace-delimited form. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("ActionResult{successful=");
            text.append(this.successful);
            text.append(", message='").append(this.message);
            text.append("', exception=").append(this.exception);
            text.append('}');
            return text.toString();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rescaling/JobTransition$Decision.class */
    /** Actions the state machine can take; {@code performAction} maps each one to a method. */
    public enum Decision {
        /** Stop the {@code from} job with a savepoint. */
        STOP_DEFAULT,
        /** Submit the {@code to} job restored from the latest savepoint. */
        RUN_REDEPLOYED,
        /** Poll the status of the {@code to} job until it leaves its start-up phase or a time limit passes. */
        WATCH_REDEPLOYED,
        /** Make sure the {@code to} job is stopped and resubmit the {@code from} job. */
        ROLLBACK,
        /** Terminal: give up on the transition. */
        CANCEL_TRANSITION,
        /** Terminal: end the transition. */
        FINISH_TRANSITION,
        /** Wait before deciding again; chosen for states without a dedicated decision. */
        SLEEP
    }

    /* loaded from: input_file:org/apache/flink/runtime/rescaling/JobTransition$State.class */
    /** Positions of the transition state machine. */
    public enum State {
        /** Initial state assigned by the constructor. */
        DEFAULT_RUNNING,
        /** Entered after a successful {@code STOP_DEFAULT}. */
        DEFAULT_STOPPED,
        /** Entered after a successful {@code RUN_REDEPLOYED}. */
        REDEPLOYED_STARTED,
        /** Entered after a successful {@code WATCH_REDEPLOYED}; counted as a successful transition. */
        REDEPLOYED_RUNNING,
        /** Entered after an unsuccessful {@code WATCH_REDEPLOYED}; leads to {@code ROLLBACK}. */
        REDEPLOYED_FAILED,
        /** Counted as a successful transition by {@code isTransitionSuccessful()}. */
        REDEPLOYED_FINISHED,
        /** Entered after a successful {@code ROLLBACK}. */
        DEFAULT_RESTORED
    }

    /**
     * Creates a transition that starts in {@code DEFAULT_RUNNING} with a failure budget of 3.
     *
     * @param jobGraph job graph of the job to be replaced
     * @param jobGraph2 job graph of the replacement job
     * @param dispatcher dispatcher the job operations are issued against
     * @param componentMainThreadExecutor executor on which the dispatcher calls are scheduled
     */
    public JobTransition(JobGraph jobGraph, JobGraph jobGraph2, Dispatcher dispatcher, ComponentMainThreadExecutor componentMainThreadExecutor) {
        this.log = LoggerFactory.getLogger(getClass());
        this.attempt = 0;
        this.maxAttempts = 3;
        this.reachedTerminalState = false;
        // Diamond instead of raw ArrayList: same runtime object, no unchecked-assignment warning.
        this.decisionList = new ArrayList<>();
        this.actionList = new ArrayList<>();
        this.from = jobGraph;
        this.to = jobGraph2;
        this.state = State.DEFAULT_RUNNING;
        this.dispatcher = dispatcher;
        this.dispatcherExecutor = componentMainThreadExecutor;
    }

    /**
     * Same as the four-argument constructor, but with a caller-supplied failure budget.
     *
     * @param jobGraph job graph of the job to be replaced
     * @param jobGraph2 job graph of the replacement job
     * @param dispatcher dispatcher the job operations are issued against
     * @param i maximum number of failed actions before the fallback decisions are chosen
     * @param componentMainThreadExecutor executor on which the dispatcher calls are scheduled
     */
    public JobTransition(JobGraph jobGraph, JobGraph jobGraph2, Dispatcher dispatcher, int i, ComponentMainThreadExecutor componentMainThreadExecutor) {
        this(jobGraph, jobGraph2, dispatcher, componentMainThreadExecutor);
        // Overrides the default budget assigned by the delegated constructor.
        this.maxAttempts = i;
    }

    /**
     * Chooses the next action for the given state.
     *
     * <p>Switches on the enum directly instead of going through an ordinal lookup table and an
     * unrelated integer constant, so the mapping is readable and cannot drift.
     *
     * @param state state to decide for; must not be null
     * @return the decision to perform next
     */
    Decision decideNextAction(State state) {
        switch (state) {
            case DEFAULT_RUNNING:
                // Keep trying to stop the old job until the failure budget is used up.
                return decideByAttempt(Decision.STOP_DEFAULT, Decision.CANCEL_TRANSITION);
            case DEFAULT_STOPPED:
                // Old job is down: start the new one, or restore the old one once out of attempts.
                return decideByAttempt(Decision.RUN_REDEPLOYED, Decision.ROLLBACK);
            case REDEPLOYED_STARTED:
                return Decision.WATCH_REDEPLOYED;
            case REDEPLOYED_FAILED:
                return Decision.ROLLBACK;
            case REDEPLOYED_RUNNING:
            case REDEPLOYED_FINISHED:
            case DEFAULT_RESTORED:
                return Decision.FINISH_TRANSITION;
            default:
                return Decision.SLEEP;
        }
    }

    /**
     * Executes the given decision and reports its outcome.
     *
     * <p>Switches on the enum directly instead of going through an ordinal lookup table and an
     * unrelated integer constant.
     *
     * @param decision decision to carry out; must not be null
     * @return the result of the action
     */
    ActionResult performAction(Decision decision) {
        switch (decision) {
            case STOP_DEFAULT:
                return stopDefaultJob();
            case RUN_REDEPLOYED:
                return runRedeployedJob();
            case WATCH_REDEPLOYED:
                return watchRedeployed();
            case ROLLBACK:
                return rollback();
            case CANCEL_TRANSITION:
                return cancelTransition();
            case FINISH_TRANSITION:
                return finishTransition();
            case SLEEP:
            default:
                return sleep(5000L);
        }
    }

    /**
     * Runs the state machine on the calling thread until a terminal action has been performed.
     *
     * <p>Every iteration decides, acts, records the decision and its result, logs the result and
     * then updates the state from it.
     *
     * @return whether the transition ended in a successful state, see {@link #isTransitionSuccessful()}
     */
    public boolean start() {
        while (!isReachedTerminalState()) {
            Decision decision = decideNextAction(this.state);
            ActionResult result = performAction(decision);
            this.decisionList.add(decision);
            this.actionList.add(result);
            // Parameterized logging: same text, formatted only when INFO is enabled.
            this.log.info("Job transformation ActionResult: {}", result);
            handleResults(decision, result);
        }
        return isTransitionSuccessful();
    }

    /**
     * Updates the state machine from the outcome of a performed decision.
     *
     * <p>Switches on the enum directly instead of going through an ordinal lookup table and an
     * unrelated integer constant.
     *
     * @param decision the decision that was performed
     * @param actionResult the outcome of that decision
     */
    private void handleResults(Decision decision, ActionResult actionResult) {
        switch (decision) {
            case STOP_DEFAULT:
                mutateState(actionResult, State.DEFAULT_STOPPED);
                break;
            case RUN_REDEPLOYED:
                mutateState(actionResult, State.REDEPLOYED_STARTED);
                break;
            case WATCH_REDEPLOYED:
                // Watching is the only action whose failure moves to a dedicated state.
                mutateToFail(actionResult, State.REDEPLOYED_RUNNING, State.REDEPLOYED_FAILED);
                break;
            case ROLLBACK:
                mutateState(actionResult, State.DEFAULT_RESTORED);
                break;
            default:
                // CANCEL_TRANSITION, FINISH_TRANSITION and SLEEP never change the state.
                break;
        }
    }

    /**
     * Picks between a regular decision and its fallback based on the failure budget.
     *
     * @param decision returned while fewer than {@code maxAttempts} failures have been counted
     * @param decision2 returned once the budget is used up
     * @return one of the two arguments
     */
    private Decision decideByAttempt(Decision decision, Decision decision2) {
        if (this.attempt >= this.maxAttempts) {
            return decision2;
        }
        return decision;
    }

    /**
     * Applies an action outcome when failure has its own target state.
     *
     * <p>On success the machine moves to {@code state}; otherwise the failure counter is incremented
     * and the machine moves to {@code state2}. A recorded exception is reported through the class
     * logger rather than printed to standard error.
     *
     * @param actionResult outcome of the performed action
     * @param state state to enter on success
     * @param state2 state to enter on failure
     */
    private void mutateToFail(ActionResult actionResult, State state, State state2) {
        if (actionResult.successful) {
            this.state = state;
        } else {
            this.attempt++;
            this.state = state2;
        }
        if (actionResult.exception != null) {
            // An exception may be attached to successful results too, hence the separate check.
            this.log.warn("Transition action reported an exception: {}", actionResult.message, actionResult.exception);
        }
    }

    /**
     * Applies an action outcome when failure leaves the state unchanged.
     *
     * <p>On success the machine moves to {@code state}; otherwise the failure counter is incremented
     * and the calling thread pauses for five seconds before the next decision. A recorded exception
     * is reported through the class logger rather than printed to standard error.
     *
     * @param actionResult outcome of the performed action
     * @param state state to enter on success
     */
    private void mutateState(ActionResult actionResult, State state) {
        if (actionResult.successful) {
            this.state = state;
        } else {
            this.attempt++;
            // Back off before the same state is decided on again.
            silentSleep(5000L);
        }
        if (actionResult.exception != null) {
            // An exception may be attached to successful results too, hence the separate check.
            this.log.warn("Transition action reported an exception: {}", actionResult.message, actionResult.exception);
        }
    }

    /**
     * Renders the transition as a header line with both job ids, one line per recorded
     * decision/result pair, and a closing line with the current state and whether it is terminal.
     */
    @Override
    public String toString() {
        StringBuilder report = new StringBuilder("JobTransition(");
        report.append(this.from.getJobID());
        report.append(" -> ");
        report.append(this.to.getJobID());
        report.append(") log: \n");

        int index = 0;
        while (index < this.decisionList.size()) {
            ActionResult result = this.actionList.get(index);
            String line = this.decisionList.get(index).name() + " | " + result.successful + " | " + result.message;
            if (result.exception != null) {
                line = line + " | " + result.exception.getMessage();
            }
            report.append(line).append("\n");
            index++;
        }

        String terminalText;
        if (this.reachedTerminalState) {
            terminalText = "terminal";
        } else {
            terminalText = "not terminal";
        }
        report.append("Current state is ");
        report.append(this.state.name());
        report.append(" and it is ");
        report.append(terminalText);
        report.append("\n");
        return report.toString();
    }

    /**
     * Tells whether a terminal action (cancel or finish) has already been performed.
     *
     * @return true once the loop in {@link #start()} should stop
     */
    public boolean isReachedTerminalState() {
        return this.reachedTerminalState;
    }

    /**
     * Tells whether the current state means the replacement job took over.
     *
     * <p>Only {@code REDEPLOYED_RUNNING} and {@code REDEPLOYED_FINISHED} count as success; every
     * other state, including a restored original job, is reported as not successful. The previous
     * trailing conditional evaluated to {@code false} on both branches and has been removed.
     *
     * @return true if the state is one of the two successful redeployed states
     */
    public boolean isTransitionSuccessful() {
        return this.state == State.REDEPLOYED_RUNNING || this.state == State.REDEPLOYED_FINISHED;
    }

    /**
     * Stops the {@code from} job with a savepoint and waits for its termination.
     *
     * <p>The savepoint request is given ten minutes and the termination future sixty seconds. The
     * savepoint location is remembered only after both steps succeeded.
     *
     * @return a successful result carrying the savepoint location, or a failed result with the
     *     exception that interrupted the procedure
     */
    private ActionResult stopDefaultJob() {
        Time savepointTimeout = Time.of(10L, TimeUnit.MINUTES);
        try {
            JobID jobId = this.from.getJobID();
            String savepointPath = stopWithSavepointOnMainThreadExecutor(jobId, savepointTimeout).get();
            this.dispatcher.getJobTerminationFuture(jobId).get(60L, TimeUnit.SECONDS);
            this.log.info("Job {} was successfully terminated", jobId);
            this.latestSavepoint = savepointPath;
            return new ActionResult(true, String.format("Job %1$s was stopped with savepoint saved in %2$s", jobId, savepointPath), null);
        } catch (InterruptedException | CancellationException e) {
            // Interrupts used to fall into the generic branch below, which made the "interrupted"
            // wording unreachable for them; now handled like in the sibling actions.
            // NOTE(review): the interrupt status is not restored, matching silentSleep(); the retry
            // loop in start() has no interrupt handling.
            return new ActionResult(false, "Process of stopping job with savepoint was interrupted or cancelled.", e);
        } catch (Exception e) {
            // Covers ExecutionException, TimeoutException and unchecked failures.
            return new ActionResult(false, "Exception during stopping with savepoint.", e);
        }
    }

    /**
     * Submits the {@code to} job restored from the latest savepoint and waits, for at most sixty
     * seconds, until it has left the {@code INITIALIZING} status.
     *
     * <p>Changes compared to the previous version: the job status is matched on the enum itself, and
     * the 100 ms pause between polls now happens on the calling thread instead of inside the future
     * callback, which may run on the executor thread that completed the status request.
     * Unchecked exceptions are still converted into a failed result; {@link Error}s now propagate.
     *
     * @return a successful result once the job is past initialization and not failing, otherwise a
     *     failed result describing what went wrong
     */
    private ActionResult runRedeployedJob() {
        final long initializationLimitMillis = 60000L;
        Time timeout = Time.of(60L, TimeUnit.SECONDS);
        try {
            String savepointPath = this.latestSavepoint;
            if (savepointPath == null) {
                throw new IllegalStateException("Rescaled job cannot be run if savepoint is null!");
            }
            this.to.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, false));
            runJobOnMainThreadExecutor(this.to, timeout).get();

            ActionResult result = null;
            long startedAt = System.currentTimeMillis();
            while (result == null && System.currentTimeMillis() - startedAt < initializationLimitMillis) {
                result = requestJobStatusOnMainThreadExecutor(this.to.getJobID(), timeout).handle((jobStatus, failure) -> {
                    if (failure != null) {
                        return new ActionResult(false, "Could not submit rescaled job.", failure.getCause());
                    }
                    switch (jobStatus) {
                        case FAILED:
                        case FAILING:
                            return new ActionResult(false, String.format("Failed to start rescaled job with id %s", this.to.getJobID()), null);
                        case INITIALIZING:
                            // Not decided yet: poll again.
                            return null;
                        default:
                            return new ActionResult(true, String.format("Rescaled job with id %s submitted successfully", this.to.getJobID()), null);
                    }
                }).get();
                if (result == null) {
                    silentSleep(100L);
                }
            }
            if (result != null) {
                return result;
            }
            // Still initializing after the time limit: give up on this submission.
            cancelJobOnMainThreadExecutor(this.to.getJobID(), timeout);
            return new ActionResult(false, "Rescaled job stuck at initialization", null);
        } catch (InterruptedException | CancellationException e) {
            return new ActionResult(false, "Process of submitting rescaled job from savepoint was interrupted or cancelled.", e);
        } catch (ExecutionException e) {
            return new ActionResult(false, "Exception during submitting rescaled job from savepoint.", e.getCause());
        } catch (RuntimeException e) {
            // Includes the IllegalStateException raised above for a missing savepoint.
            return new ActionResult(false, e.getMessage(), e);
        }
    }

    /**
     * Polls the status of the {@code to} job for at most thirty seconds, pausing three seconds
     * between polls, until it is {@code RUNNING}, {@code FINISHED} or {@code FAILED}.
     *
     * <p>Changes compared to the previous version:
     * <ul>
     *   <li>An observed {@code FAILED} status is reported as an unsuccessful result. It used to be
     *       reported as successful, which made the caller move to {@code REDEPLOYED_RUNNING} and
     *       finish the transition as a success although the job had failed.</li>
     *   <li>The pause between polls happens on the calling thread instead of inside the future
     *       callback, which may run on the executor thread that completed the status request.</li>
     *   <li>The job status is matched on the enum itself.</li>
     * </ul>
     *
     * @return a successful result if the job was seen running or finished; otherwise a failed result
     *     (job failed, or no decisive status within the time limit)
     */
    private ActionResult watchRedeployed() {
        final long watchLimitMillis = 30000L;
        final long pollPauseMillis = 3000L;
        Time statusTimeout = Time.of(10L, TimeUnit.SECONDS);
        JobID redeployedJobId = this.to.getJobID();
        long startedAt = System.currentTimeMillis();
        State observed = State.REDEPLOYED_STARTED;
        Throwable firstFailure = null;

        while (observed == State.REDEPLOYED_STARTED && System.currentTimeMillis() - startedAt < watchLimitMillis) {
            try {
                JobStatus status = requestJobStatusOnMainThreadExecutor(redeployedJobId, statusTimeout).get();
                switch (status) {
                    case RUNNING:
                        observed = State.REDEPLOYED_RUNNING;
                        break;
                    case FINISHED:
                        observed = State.REDEPLOYED_FINISHED;
                        break;
                    case FAILED:
                        observed = State.REDEPLOYED_FAILED;
                        break;
                    default:
                        // Any other status is not decisive yet.
                        break;
                }
            } catch (InterruptedException | CancellationException e) {
                // Treated like an undecided poll; retried after the pause below.
            } catch (ExecutionException e) {
                firstFailure = ExceptionUtils.firstOrSuppressed(e.getCause(), firstFailure);
            }
            if (observed == State.REDEPLOYED_STARTED) {
                silentSleep(pollPauseMillis);
            }
        }

        if (observed == State.REDEPLOYED_FAILED) {
            return new ActionResult(false, String.format("Rescaled job %1$s failed after redeployment. It's %2$s now.", this.to.getJobID(), observed), firstFailure);
        }
        if (observed != State.REDEPLOYED_STARTED) {
            return new ActionResult(true, String.format("Rescaled job %1$s successfully changed it state. It's %2$s now.", this.to.getJobID(), observed), firstFailure);
        }
        try {
            return new ActionResult(false, String.format("Rescaled job did not start running after %1$s ms. Current status is %2$s.", 30000L, requestJobStatusOnMainThreadExecutor(redeployedJobId, statusTimeout).get()), firstFailure);
        } catch (Exception e) {
            return new ActionResult(false, String.format("Rescaled job did not start running after %1$s ms.Can't retrieve current status.", 30000L), firstFailure);
        }
    }

    /**
     * Makes sure the {@code to} job is no longer active and then resubmits the {@code from} job with
     * the savepoint restore settings of the {@code to} job.
     *
     * <p>For at most sixty seconds the status of the {@code to} job is polled. While it is
     * {@code RUNNING}, {@code RESTARTING} or {@code CREATED} a cancellation is requested; any other
     * status, or a job-not-found failure, counts as stopped.
     *
     * <p>Fixes compared to the previous version: the status callback dereferenced the failure when
     * there was none (every normal, non-{@code FAILED} status) and dereferenced the status when the
     * request had failed, so the cancellation branch could never run. The job-not-found check now
     * searches the whole cause chain, and the loop pauses after requesting a cancellation instead of
     * immediately polling and cancelling again.
     *
     * @return a successful result if the {@code from} job was resubmitted, otherwise a failed result
     */
    private ActionResult rollback() {
        final long stopLimitMillis = 60000L;
        final long pollPauseMillis = 3000L;
        Time statusTimeout = Time.of(10L, TimeUnit.SECONDS);
        Time submitTimeout = Time.of(60L, TimeUnit.SECONDS);
        Time cancelTimeout = Time.of(120L, TimeUnit.SECONDS);
        JobID redeployedJobId = this.to.getJobID();
        Throwable firstFailure = null;
        long startedAt = System.currentTimeMillis();
        boolean stopped = false;

        while (!stopped && System.currentTimeMillis() - startedAt < stopLimitMillis) {
            try {
                JobStatus status = requestJobStatusOnMainThreadExecutor(redeployedJobId, statusTimeout).get();
                switch (status) {
                    case RUNNING:
                    case RESTARTING:
                    case CREATED:
                        cancelJobOnMainThreadExecutor(redeployedJobId, cancelTimeout);
                        // Give the cancellation time to take effect before polling again.
                        silentSleep(pollPauseMillis);
                        break;
                    default:
                        stopped = true;
                        break;
                }
            } catch (InterruptedException | CancellationException e) {
                silentSleep(pollPauseMillis);
            } catch (ExecutionException e) {
                if (ExceptionUtils.findThrowable(e, FlinkJobNotFoundException.class).isPresent()) {
                    // The dispatcher does not know the job, so there is nothing left to stop.
                    stopped = true;
                } else {
                    firstFailure = ExceptionUtils.firstOrSuppressed(e.getCause(), firstFailure);
                    silentSleep(pollPauseMillis);
                }
            }
        }

        if (!stopped) {
            try {
                return new ActionResult(false, String.format("Rescaled job failed to stop after %1$s ms. Current status is %2$s.", 60000L, requestJobStatusOnMainThreadExecutor(redeployedJobId, statusTimeout).get()), firstFailure);
            } catch (Exception e) {
                return new ActionResult(false, String.format("Rescaled job failed to stop after %1$s ms. Can't retrieve current status.", 60000L), firstFailure);
            }
        }

        try {
            this.from.setSavepointRestoreSettings(this.to.getSavepointRestoreSettings());
            runJobOnMainThreadExecutor(this.from, submitTimeout).get();
            return new ActionResult(true, String.format("Restored job with id %s submitted successfully", this.from.getJobID()), firstFailure);
        } catch (InterruptedException | CancellationException e) {
            return new ActionResult(false, "Process of restoring job with savepoint was interrupted or cancelled.", ExceptionUtils.firstOrSuppressed(e, firstFailure));
        } catch (ExecutionException e) {
            return new ActionResult(false, "Exception during submitting restored job from savepoint.", ExceptionUtils.firstOrSuppressed(e.getCause(), firstFailure));
        }
    }

    /**
     * Terminal action: marks the transition as ended because it is being abandoned.
     *
     * @return a result whose flag is true, meaning the cancel action itself was carried out
     */
    private ActionResult cancelTransition() {
        ActionResult outcome = new ActionResult(true, "Job transition(rescaling) cancelled because of errors.", null);
        this.reachedTerminalState = true;
        return outcome;
    }

    /**
     * Terminal action: marks the transition as ended.
     *
     * <p>NOTE(review): this action is also chosen for {@code DEFAULT_RESTORED}, i.e. after a
     * rollback, where {@code isTransitionSuccessful()} returns false; the message below still says
     * "SUCCESSFUL" in that case. Confirm whether that wording is intended.
     *
     * @return a result whose flag is true, meaning the finish action itself was carried out
     */
    private ActionResult finishTransition() {
        ActionResult outcome = new ActionResult(true, "Job transition(rescaling) was SUCCESSFUL!", null);
        this.reachedTerminalState = true;
        return outcome;
    }

    /**
     * Pauses the current thread for the given number of milliseconds; an interrupt ends the pause
     * early and is otherwise ignored.
     *
     * <p>NOTE(review): the interrupt status is not restored. The polling loops of this class rely on
     * this method to pace their retries, so changing that needs a look at all callers.
     *
     * @param millis pause length in milliseconds
     */
    private void silentSleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ignored) {
            // Waking up early is acceptable here; callers re-check their own conditions.
        }
    }

    /**
     * Action form of {@code silentSleep}: pauses and reports how long the pause was asked to be.
     *
     * @param millis pause length in milliseconds
     * @return a successful result stating the requested pause length
     */
    private ActionResult sleep(long millis) {
        silentSleep(millis);
        String report = String.format("Slept for %s milliseconds.", millis);
        return new ActionResult(true, report, null);
    }

    /**
     * Compares the slot requirements of both job graphs as computed by {@code RedeploymentUtils}.
     *
     * @return true if the {@code to} job needs no more slots than the {@code from} job; equal
     *     requirements therefore also count as a scale-down
     */
    public boolean isScaleDown() {
        int targetSlots = RedeploymentUtils.calculateSlotRequirements(this.to).intValue();
        int currentSlots = RedeploymentUtils.calculateSlotRequirements(this.from).intValue();
        return targetSlots <= currentSlots;
    }

    /**
     * Exposes the recorded history for tests.
     *
     * @return a pair of the live decision list and the live result list (no copies are made)
     */
    @VisibleForTesting
    protected Tuple2<List<Decision>, List<ActionResult>> getDecisionActionList() {
        Tuple2<List<Decision>, List<ActionResult>> history = new Tuple2<>(this.decisionList, this.actionList);
        return history;
    }

    /**
     * Exposes the recorded history for tests as a map from decision to result.
     *
     * <p>The previous body referenced undefined variables left behind by decompilation and did not
     * compile. It also used the two-argument {@code toMap}, which throws on duplicate keys, so any
     * decision that had been retried made the call fail. A decision performed more than once now
     * maps to its most recent result.
     *
     * @return a new map with one entry per distinct decision performed so far
     */
    @VisibleForTesting
    protected Map<Decision, ActionResult> getDecisionActionMap() {
        List<Decision> decisions = this.decisionList;
        List<ActionResult> results = this.actionList;
        return IntStream.range(0, decisions.size())
                .boxed()
                .collect(Collectors.toMap(decisions::get, results::get, (earlier, later) -> later));
    }

    /**
     * Issues a terminate-with-savepoint request for the given job on the dispatcher executor.
     *
     * <p>The dispatcher's own future is chained with {@code thenCompose} instead of being awaited
     * with a blocking {@code get()} inside the executor task, so the executor thread is released
     * as soon as the request has been issued. As a side effect a failure is no longer wrapped in an
     * additional {@code ExecutionException} layer.
     *
     * @param jobID job to stop
     * @param time timeout handed to the dispatcher
     * @return future completing with the savepoint location, or exceptionally if the request fails
     */
    private CompletableFuture<String> stopWithSavepointOnMainThreadExecutor(JobID jobID, Time time) {
        return CompletableFuture
                .supplyAsync(
                        () -> this.dispatcher.stopWithSavepointAndGetLocation(jobID, null, SavepointFormatType.DEFAULT, TriggerSavepointMode.TERMINATE_WITH_SAVEPOINT, time),
                        this.dispatcherExecutor)
                .thenCompose(savepointFuture -> savepointFuture);
    }

    /**
     * Submits the given job graph through the dispatcher on the dispatcher executor.
     *
     * <p>The future returned by the dispatcher used to be dropped, so the result completed as soon
     * as the call had been made and a rejected submission went unnoticed. The returned future now
     * follows the dispatcher's future.
     *
     * <p>NOTE(review): callers that wait on this future now also see submission failures, e.g.
     * {@code rollback()} will report a failed resubmission instead of success.
     *
     * @param jobGraph job graph to submit
     * @param time timeout handed to the dispatcher
     * @return future completing when the submission was acknowledged, or exceptionally if it failed
     */
    private CompletableFuture<Void> runJobOnMainThreadExecutor(JobGraph jobGraph, Time time) {
        return CompletableFuture
                .supplyAsync(() -> this.dispatcher.submitJob(jobGraph, time), this.dispatcherExecutor)
                .thenCompose(submission -> submission)
                .thenAccept(acknowledged -> { });
    }

    /**
     * Requests the status of the given job from the dispatcher on the dispatcher executor.
     *
     * <p>The dispatcher's own future is chained with {@code thenCompose} instead of being awaited
     * with a blocking {@code get()} inside the executor task, so the executor thread is released
     * as soon as the request has been issued. As a side effect a failure is no longer wrapped in an
     * additional {@code ExecutionException} layer.
     *
     * @param jobID job whose status is requested
     * @param time timeout handed to the dispatcher
     * @return future completing with the job status, or exceptionally if the request fails
     */
    private CompletableFuture<JobStatus> requestJobStatusOnMainThreadExecutor(JobID jobID, Time time) {
        return CompletableFuture
                .supplyAsync(() -> this.dispatcher.requestJobStatus(jobID, time), this.dispatcherExecutor)
                .thenCompose(statusFuture -> statusFuture);
    }

    /**
     * Requests cancellation of the given job through the dispatcher on the dispatcher executor.
     *
     * <p>The future returned by the dispatcher used to be dropped; the returned future now follows
     * it, so a caller that chooses to wait can observe whether the cancellation was accepted.
     * Callers in this class currently ignore the returned future.
     *
     * @param jobID job to cancel
     * @param time timeout handed to the dispatcher
     * @return future completing when the cancellation was acknowledged, or exceptionally if it failed
     */
    private CompletableFuture<Void> cancelJobOnMainThreadExecutor(JobID jobID, Time time) {
        return CompletableFuture
                .supplyAsync(() -> this.dispatcher.cancelJob(jobID, time), this.dispatcherExecutor)
                .thenCompose(cancellation -> cancellation)
                .thenAccept(acknowledged -> { });
    }
}
