package org.apache.flink.runtime.rest.handler.job.savepoints;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.runtime.dispatcher.TriggerSavepointMode;
import org.apache.flink.runtime.dispatcher.UnknownOperationKeyException;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.TriggerId;
import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
import org.apache.flink.runtime.rest.messages.job.savepoints.stop.StopWithSavepointRequestBody;
import org.apache.flink.runtime.rest.messages.job.savepoints.stop.StopWithSavepointTriggerHeaders;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedThrowable;

/* loaded from: input_file:org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.class */
public class SavepointHandlers {

    @Nullable
    private final String defaultSavepointDir;

    /* loaded from: input_file:org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers$SavepointHandlerBase.class */
    private static abstract class SavepointHandlerBase<B extends RequestBody> extends AbstractRestHandler<RestfulGateway, B, TriggerResponse, SavepointTriggerMessageParameters> {
        SavepointHandlerBase(GatewayRetriever<? extends RestfulGateway> gatewayRetriever, Time time, Map<String, String> map, MessageHeaders<B, TriggerResponse, SavepointTriggerMessageParameters> messageHeaders) {
            super(gatewayRetriever, time, map, messageHeaders);
        }

        protected AsynchronousJobOperationKey createOperationKey(HandlerRequest<B> handlerRequest) {
            return AsynchronousJobOperationKey.of(extractTriggerId(handlerRequest.getRequestBody()).orElseGet(TriggerId::new), (JobID) handlerRequest.getPathParameter(JobIDPathParameter.class));
        }

        protected abstract Optional<TriggerId> extractTriggerId(B b);

        @Override // org.apache.flink.runtime.rest.handler.AbstractRestHandler
        public CompletableFuture<TriggerResponse> handleRequest(@Nonnull HandlerRequest<B> handlerRequest, @Nonnull RestfulGateway restfulGateway) throws RestHandlerException {
            AsynchronousJobOperationKey createOperationKey = createOperationKey(handlerRequest);
            return triggerOperation(handlerRequest, createOperationKey, restfulGateway).handle((acknowledge, th) -> {
                if (th == null) {
                    return new TriggerResponse(createOperationKey.getTriggerId());
                }
                throw new CompletionException((Throwable) SavepointHandlers.createInternalServerError(th, createOperationKey, "triggering"));
            });
        }

        protected abstract CompletableFuture<Acknowledge> triggerOperation(HandlerRequest<B> handlerRequest, AsynchronousJobOperationKey asynchronousJobOperationKey, RestfulGateway restfulGateway) throws RestHandlerException;
    }

    /* loaded from: input_file:org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers$SavepointStatusHandler.class */
    public static class SavepointStatusHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, AsynchronousOperationResult<SavepointInfo>, SavepointStatusMessageParameters> {
        public SavepointStatusHandler(GatewayRetriever<? extends RestfulGateway> gatewayRetriever, Time time, Map<String, String> map) {
            super(gatewayRetriever, time, map, SavepointStatusHeaders.getInstance());
        }

        @Override // org.apache.flink.runtime.rest.handler.AbstractRestHandler
        public CompletableFuture<AsynchronousOperationResult<SavepointInfo>> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody> handlerRequest, @Nonnull RestfulGateway restfulGateway) throws RestHandlerException {
            AsynchronousJobOperationKey operationKey = getOperationKey(handlerRequest);
            return restfulGateway.getTriggeredSavepointStatus(operationKey).handle((operationResult, th) -> {
                if (th != null) {
                    throw new CompletionException((Throwable) maybeCreateNotFoundError(th, operationKey).orElseGet(() -> {
                        return SavepointHandlers.createInternalServerError(th, operationKey, "retrieving status of");
                    }));
                }
                switch (operationResult.getStatus()) {
                    case SUCCESS:
                        return AsynchronousOperationResult.completed(operationResultResponse((String) operationResult.getResult()));
                    case FAILURE:
                        return AsynchronousOperationResult.completed(exceptionalOperationResultResponse(operationResult.getThrowable()));
                    case IN_PROGRESS:
                        return AsynchronousOperationResult.inProgress();
                    default:
                        throw new IllegalStateException("No handler for operation status " + operationResult.getStatus() + ", encountered for key " + operationKey);
                }
            });
        }

        private static Optional<RestHandlerException> maybeCreateNotFoundError(Throwable th, AsynchronousJobOperationKey asynchronousJobOperationKey) {
            return ExceptionUtils.findThrowable(th, UnknownOperationKeyException.class).isPresent() ? Optional.of(new RestHandlerException(String.format("There is no savepoint operation with triggerId=%s for job %s.", asynchronousJobOperationKey.getTriggerId(), asynchronousJobOperationKey.getJobId()), HttpResponseStatus.NOT_FOUND)) : Optional.empty();
        }

        protected AsynchronousJobOperationKey getOperationKey(HandlerRequest<EmptyRequestBody> handlerRequest) {
            return AsynchronousJobOperationKey.of((TriggerId) handlerRequest.getPathParameter(TriggerIdPathParameter.class), (JobID) handlerRequest.getPathParameter(JobIDPathParameter.class));
        }

        protected SavepointInfo exceptionalOperationResultResponse(Throwable th) {
            return new SavepointInfo(null, new SerializedThrowable(th));
        }

        protected SavepointInfo operationResultResponse(String str) {
            return new SavepointInfo(str, null);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers$SavepointTriggerHandler.class */
    public class SavepointTriggerHandler extends SavepointHandlerBase<SavepointTriggerRequestBody> {
        public SavepointTriggerHandler(GatewayRetriever<? extends RestfulGateway> gatewayRetriever, Time time, Map<String, String> map) {
            super(gatewayRetriever, time, map, SavepointTriggerHeaders.getInstance());
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers.SavepointHandlerBase
        public Optional<TriggerId> extractTriggerId(SavepointTriggerRequestBody savepointTriggerRequestBody) {
            return savepointTriggerRequestBody.getTriggerId();
        }

        @Override // org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers.SavepointHandlerBase
        protected CompletableFuture<Acknowledge> triggerOperation(HandlerRequest<SavepointTriggerRequestBody> handlerRequest, AsynchronousJobOperationKey asynchronousJobOperationKey, RestfulGateway restfulGateway) throws RestHandlerException {
            Optional<String> targetDirectory = handlerRequest.getRequestBody().getTargetDirectory();
            if (targetDirectory.isPresent() || SavepointHandlers.this.defaultSavepointDir != null) {
                return restfulGateway.triggerSavepoint(asynchronousJobOperationKey, targetDirectory.orElse(SavepointHandlers.this.defaultSavepointDir), handlerRequest.getRequestBody().getFormatType(), handlerRequest.getRequestBody().isCancelJob() ? TriggerSavepointMode.CANCEL_WITH_SAVEPOINT : TriggerSavepointMode.SAVEPOINT, RpcUtils.INF_TIMEOUT);
            }
            throw new RestHandlerException(String.format("Config key [%s] is not set. Property [%s] must be provided.", CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), SavepointTriggerRequestBody.FIELD_NAME_TARGET_DIRECTORY), HttpResponseStatus.BAD_REQUEST);
        }

        @Override // org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers.SavepointHandlerBase, org.apache.flink.runtime.rest.handler.AbstractRestHandler
        public /* bridge */ /* synthetic */ CompletableFuture<TriggerResponse> handleRequest(@Nonnull HandlerRequest handlerRequest, @Nonnull RestfulGateway restfulGateway) throws RestHandlerException {
            return super.handleRequest(handlerRequest, restfulGateway);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers$StopWithSavepointHandler.class */
    public class StopWithSavepointHandler extends SavepointHandlerBase<StopWithSavepointRequestBody> {
        public StopWithSavepointHandler(GatewayRetriever<? extends RestfulGateway> gatewayRetriever, Time time, Map<String, String> map) {
            super(gatewayRetriever, time, map, StopWithSavepointTriggerHeaders.getInstance());
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers.SavepointHandlerBase
        public Optional<TriggerId> extractTriggerId(StopWithSavepointRequestBody stopWithSavepointRequestBody) {
            return stopWithSavepointRequestBody.getTriggerId();
        }

        @Override // org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers.SavepointHandlerBase
        protected CompletableFuture<Acknowledge> triggerOperation(HandlerRequest<StopWithSavepointRequestBody> handlerRequest, AsynchronousJobOperationKey asynchronousJobOperationKey, RestfulGateway restfulGateway) throws RestHandlerException {
            Optional<String> targetDirectory = handlerRequest.getRequestBody().getTargetDirectory();
            if (targetDirectory.isPresent() || SavepointHandlers.this.defaultSavepointDir != null) {
                return restfulGateway.stopWithSavepoint(asynchronousJobOperationKey, targetDirectory.orElse(SavepointHandlers.this.defaultSavepointDir), handlerRequest.getRequestBody().getFormatType(), handlerRequest.getRequestBody().shouldDrain() ? TriggerSavepointMode.TERMINATE_WITH_SAVEPOINT : TriggerSavepointMode.SUSPEND_WITH_SAVEPOINT, RpcUtils.INF_TIMEOUT);
            }
            throw new RestHandlerException(String.format("Config key [%s] is not set. Property [%s] must be provided.", CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), StopWithSavepointRequestBody.FIELD_NAME_TARGET_DIRECTORY), HttpResponseStatus.BAD_REQUEST);
        }

        @Override // org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers.SavepointHandlerBase, org.apache.flink.runtime.rest.handler.AbstractRestHandler
        public /* bridge */ /* synthetic */ CompletableFuture<TriggerResponse> handleRequest(@Nonnull HandlerRequest handlerRequest, @Nonnull RestfulGateway restfulGateway) throws RestHandlerException {
            return super.handleRequest(handlerRequest, restfulGateway);
        }
    }

    public SavepointHandlers(@Nullable String str) {
        this.defaultSavepointDir = str;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static RestHandlerException createInternalServerError(Throwable th, AsynchronousJobOperationKey asynchronousJobOperationKey, String str) {
        return new RestHandlerException(String.format("Internal server error while %s savepoint operation with triggerId=%s for job %s.", str, asynchronousJobOperationKey.getTriggerId(), asynchronousJobOperationKey.getJobId()), HttpResponseStatus.INTERNAL_SERVER_ERROR, th);
    }
}
