package io.prestosql.server;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.airlift.concurrent.BoundedExecutor;
import io.airlift.concurrent.MoreFutures;
import io.airlift.jaxrs.AsyncResponseHandler;
import io.airlift.stats.TimeStat;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.hetu.core.transport.execution.buffer.SerializedPage;
import io.prestosql.PrestoMediaTypes;
import io.prestosql.execution.TaskId;
import io.prestosql.execution.TaskInfo;
import io.prestosql.execution.TaskManager;
import io.prestosql.execution.TaskState;
import io.prestosql.execution.TaskStatus;
import io.prestosql.execution.buffer.BufferResult;
import io.prestosql.execution.buffer.OutputBuffers;
import io.prestosql.metadata.SessionPropertyManager;
import io.prestosql.operator.TaskStats;
import io.prestosql.protocol.SmileHeader;
import io.prestosql.server.security.SecurityRequireNonNull;
import io.prestosql.spi.Page;
import java.net.URI;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.CompletionCallback;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.GenericEntity;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import org.joda.time.DateTime;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

@Path("/v1/task")
/* loaded from: input_file:io/prestosql/server/TaskResource.class */
/**
 * JAX-RS resource rooted at {@code /v1/task} that exposes the {@link TaskManager}
 * over HTTP: listing, creating/updating, polling and cancelling tasks, and
 * fetching, acknowledging and aborting task output buffers.
 *
 * <p>The polling endpoints ({@link #getTaskInfo}, {@link #getTaskStatus},
 * {@link #getResults}) are asynchronous: they bind a future to the suspended
 * {@link AsyncResponse} and complete it on {@code responseExecutor}, with timeouts
 * scheduled on {@code timeoutExecutor}.
 */
public class TaskResource {
    /** Extra time added on top of the randomized wait before the async response itself times out. */
    private static final Duration ADDITIONAL_WAIT_TIME = new Duration(5.0d, TimeUnit.SECONDS);

    /** Base wait time used by {@link #getResults}, which takes no max-wait header. */
    private static final Duration DEFAULT_MAX_WAIT_TIME = new Duration(2.0d, TimeUnit.SECONDS);

    private final TaskManager taskManager;
    private final SessionPropertyManager sessionPropertyManager;
    private final Executor responseExecutor;
    private final ScheduledExecutorService timeoutExecutor;
    private final TimeStat readFromOutputBufferTime = new TimeStat();
    private final TimeStat resultsRequestTime = new TimeStat();

    /**
     * Creates the resource.
     *
     * @param taskManager manager all task operations are delegated to
     * @param sessionPropertyManager used to rebuild a session from an update request
     * @param boundedExecutor executor on which async responses are completed
     * @param scheduledExecutorService executor on which future timeouts are scheduled
     * @throws NullPointerException if any argument is {@code null}
     */
    @Inject
    public TaskResource(TaskManager taskManager, SessionPropertyManager sessionPropertyManager, @ForAsyncHttp BoundedExecutor boundedExecutor, @ForAsyncHttp ScheduledExecutorService scheduledExecutorService) {
        this.taskManager = Objects.requireNonNull(taskManager, "taskManager is null");
        this.sessionPropertyManager = Objects.requireNonNull(sessionPropertyManager, "sessionPropertyManager is null");
        this.responseExecutor = Objects.requireNonNull(boundedExecutor, "responseExecutor is null");
        this.timeoutExecutor = Objects.requireNonNull(scheduledExecutorService, "timeoutExecutor is null");
    }

    /**
     * Returns info for every task known to the task manager.
     *
     * @param uriInfo request URI; when it carries a {@code summarize} query parameter
     *                each entry is replaced by its summarized form
     * @return the (possibly summarized) task infos
     */
    @GET
    @Produces({"application/json", SmileHeader.APPLICATION_JACKSON_SMILE})
    public List<TaskInfo> getAllTaskInfo(@Context UriInfo uriInfo) {
        List<TaskInfo> allTaskInfo = this.taskManager.getAllTaskInfo();
        if (shouldSummarize(uriInfo)) {
            allTaskInfo = ImmutableList.copyOf(Iterables.transform(allTaskInfo, TaskInfo::summarize));
        }
        return allTaskInfo;
    }

    /**
     * Creates the task or applies an update to it.
     *
     * @param taskId id taken from the request path
     * @param taskUpdateRequest request body describing the update
     * @param uriInfo request URI; a {@code summarize} query parameter summarizes the result
     * @return 200 with the resulting task info (an aborted placeholder when the task
     *         manager returns {@code null}), or 400 if anything throws
     */
    @Path("{taskId}")
    @Consumes({"application/json", SmileHeader.APPLICATION_JACKSON_SMILE})
    @POST
    @Produces({"application/json", SmileHeader.APPLICATION_JACKSON_SMILE})
    public Response createOrUpdateTask(@PathParam("taskId") TaskId taskId, TaskUpdateRequest taskUpdateRequest, @Context UriInfo uriInfo) {
        try {
            Objects.requireNonNull(taskUpdateRequest, "taskUpdateRequest is null");
            TaskInfo updateTask = this.taskManager.updateTask(
                    taskUpdateRequest.getSession().toSession(this.sessionPropertyManager, taskUpdateRequest.getExtraCredentials()),
                    taskId,
                    taskUpdateRequest.getFragment(),
                    taskUpdateRequest.getSources(),
                    taskUpdateRequest.getOutputIds(),
                    taskUpdateRequest.getTotalPartitions(),
                    taskUpdateRequest.getConsumerId(),
                    taskUpdateRequest.getTaskInstanceId());
            if (updateTask == null) {
                return Response.ok().entity(createAbortedTaskInfo(taskId, uriInfo.getAbsolutePath(), taskUpdateRequest.getTaskInstanceId())).build();
            }
            if (shouldSummarize(uriInfo)) {
                updateTask = updateTask.summarize();
            }
            return Response.ok().entity(updateTask).build();
        } catch (Exception e) {
            // NOTE(review): every failure, including ones raised inside the task manager,
            // is reported as a bare 400 and the cause is dropped. Behavior kept as-is;
            // confirm whether the catch was meant to cover only the null check.
            return Response.status(Response.Status.BAD_REQUEST).build();
        }
    }

    /**
     * Returns the task info, optionally long-polling until the state changes.
     *
     * <p>When either the current-state or max-wait header is missing the current info
     * is returned immediately. Otherwise the response completes when the task manager's
     * future does, or with the current info once the randomized wait elapses.
     *
     * @param taskId id taken from the request path
     * @param taskState caller's view of the current state ({@code X-Presto-Current-State})
     * @param duration maximum wait ({@code X-Presto-Max-Wait})
     * @param str task instance id ({@code X-Presto-Task-Instance-Id}); may be {@code null}
     * @param uriInfo request URI; a {@code summarize} query parameter summarizes the result
     * @param asyncResponse suspended response to resume
     */
    @GET
    @Produces({"application/json", SmileHeader.APPLICATION_JACKSON_SMILE})
    @Path("{taskId}")
    public void getTaskInfo(@PathParam("taskId") TaskId taskId, @HeaderParam("X-Presto-Current-State") TaskState taskState, @HeaderParam("X-Presto-Max-Wait") Duration duration, @HeaderParam("X-Presto-Task-Instance-Id") String str, @Context UriInfo uriInfo, @Suspended AsyncResponse asyncResponse) {
        SecurityRequireNonNull.requireNonNull(taskId, "taskId is null");
        if (taskState == null || duration == null) {
            TaskInfo current = tryGetTaskInfo(uriInfo, taskId, str);
            if (shouldSummarize(uriInfo)) {
                current = current.summarize();
            }
            asyncResponse.resume(current);
            return;
        }
        ListenableFuture<TaskInfo> taskInfo = this.taskManager.getTaskInfo(taskId, taskState, str);
        if (taskInfo == null) {
            asyncResponse.resume(createAbortedTaskInfo(taskId, uriInfo.getAbsolutePath(), str));
            return;
        }
        Duration waitTime = randomizeWaitTime(duration);
        // NOTE(review): the timeout fallback passes null as the instance id, unlike
        // getTaskStatus which passes the request's id. Kept as in the original; confirm intent.
        ListenableFuture<TaskInfo> result = MoreFutures.addTimeout(
                taskInfo,
                () -> tryGetTaskInfo(uriInfo, taskId, null),
                waitTime,
                this.timeoutExecutor);
        if (shouldSummarize(uriInfo)) {
            result = Futures.transform(result, TaskInfo::summarize, MoreExecutors.directExecutor());
        }
        AsyncResponseHandler.bindAsyncResponse(asyncResponse, result, this.responseExecutor)
                .withTimeout(responseTimeout(waitTime));
    }

    /** Returns the task manager's info for the task, or an aborted placeholder when it returns {@code null}. */
    private TaskInfo tryGetTaskInfo(UriInfo uriInfo, TaskId taskId, String str) {
        TaskInfo taskInfo = this.taskManager.getTaskInfo(taskId, str);
        if (taskInfo == null) {
            taskInfo = createAbortedTaskInfo(taskId, uriInfo.getAbsolutePath(), str);
        }
        return taskInfo;
    }

    /**
     * Returns the task status, optionally long-polling until the state changes.
     *
     * <p>Mirrors {@link #getTaskInfo} but for {@link TaskStatus} and without summarization.
     *
     * @param taskId id taken from the request path
     * @param taskState caller's view of the current state ({@code X-Presto-Current-State})
     * @param duration maximum wait ({@code X-Presto-Max-Wait})
     * @param str task instance id ({@code X-Presto-Task-Instance-Id}); may be {@code null}
     * @param uriInfo request URI, used to build the aborted placeholder
     * @param asyncResponse suspended response to resume
     */
    @GET
    @Produces({"application/json", SmileHeader.APPLICATION_JACKSON_SMILE})
    @Path("{taskId}/status")
    public void getTaskStatus(@PathParam("taskId") TaskId taskId, @HeaderParam("X-Presto-Current-State") TaskState taskState, @HeaderParam("X-Presto-Max-Wait") Duration duration, @HeaderParam("X-Presto-Task-Instance-Id") String str, @Context UriInfo uriInfo, @Suspended AsyncResponse asyncResponse) {
        SecurityRequireNonNull.requireNonNull(taskId, "taskId is null");
        if (taskState == null || duration == null) {
            asyncResponse.resume(tryGetTaskStatus(uriInfo, taskId, str));
            return;
        }
        ListenableFuture<TaskStatus> taskStatus = this.taskManager.getTaskStatus(taskId, taskState, str);
        if (taskStatus == null) {
            asyncResponse.resume(createAbortedTaskStatus(taskId, uriInfo.getAbsolutePath(), str));
            return;
        }
        Duration waitTime = randomizeWaitTime(duration);
        ListenableFuture<TaskStatus> result = MoreFutures.addTimeout(
                taskStatus,
                () -> tryGetTaskStatus(uriInfo, taskId, str),
                waitTime,
                this.timeoutExecutor);
        AsyncResponseHandler.bindAsyncResponse(asyncResponse, result, this.responseExecutor)
                .withTimeout(responseTimeout(waitTime));
    }

    /** Returns the task manager's status for the task, or an aborted placeholder when it returns {@code null}. */
    private TaskStatus tryGetTaskStatus(UriInfo uriInfo, TaskId taskId, String str) {
        TaskStatus taskStatus = this.taskManager.getTaskStatus(taskId, str);
        if (taskStatus == null) {
            taskStatus = createAbortedTaskStatus(taskId, uriInfo.getAbsolutePath(), str);
        }
        return taskStatus;
    }

    /**
     * Cancels the task, moving it to the requested state.
     *
     * @param taskId id taken from the request path
     * @param str task instance id ({@code X-Presto-Task-Instance-Id}); may be {@code null}
     * @param str2 name of the target {@link TaskState}; {@code ABORTED} when absent
     * @param uriInfo request URI; a {@code summarize} query parameter summarizes the result
     * @return the resulting task info, or an aborted placeholder when the manager returns {@code null}
     * @throws IllegalArgumentException if {@code str2} is not the name of a {@link TaskState}
     */
    @Produces({"application/json", SmileHeader.APPLICATION_JACKSON_SMILE})
    @Path("{taskId}")
    @DELETE
    public TaskInfo deleteTask(@PathParam("taskId") TaskId taskId, @HeaderParam("X-Presto-Task-Instance-Id") String str, @QueryParam("targetState") String str2, @Context UriInfo uriInfo) {
        SecurityRequireNonNull.requireNonNull(taskId, "taskId is null");
        TaskState targetState = str2 == null ? TaskState.ABORTED : TaskState.valueOf(str2);
        TaskInfo cancelTask = this.taskManager.cancelTask(taskId, targetState, str);
        if (cancelTask == null) {
            cancelTask = createAbortedTaskInfo(taskId, uriInfo.getAbsolutePath(), str);
        }
        if (shouldSummarize(uriInfo)) {
            cancelTask = cancelTask.summarize();
        }
        return cancelTask;
    }

    /**
     * Fetches pages from a task output buffer starting at the given token.
     *
     * <p>If the task manager returns no future, a 204 marked buffer-complete is sent
     * immediately. Otherwise the response completes with the buffer result, with an
     * empty result once the randomized wait elapses, or with a 204 (not complete) if
     * the async response itself times out. Both timing stats are updated.
     *
     * @param taskId id taken from the request path
     * @param outputBufferId buffer id taken from the request path
     * @param j page sequence token taken from the request path
     * @param dataSize maximum response size ({@code X-Presto-Max-Size})
     * @param str task instance id ({@code X-Presto-Task-Instance-Id}); may be {@code null}
     * @param asyncResponse suspended response to resume
     */
    @GET
    @Produces({PrestoMediaTypes.PRESTO_PAGES})
    @Path("{taskId}/results/{bufferId}/{token}")
    public void getResults(@PathParam("taskId") TaskId taskId, @PathParam("bufferId") OutputBuffers.OutputBufferId outputBufferId, @PathParam("token") long j, @HeaderParam("X-Presto-Max-Size") DataSize dataSize, @HeaderParam("X-Presto-Task-Instance-Id") String str, @Suspended AsyncResponse asyncResponse) {
        SecurityRequireNonNull.requireNonNull(taskId, "taskId is null");
        SecurityRequireNonNull.requireNonNull(outputBufferId, "bufferId is null");
        long nanoTime = System.nanoTime();
        ListenableFuture<BufferResult> taskResults = this.taskManager.getTaskResults(taskId, outputBufferId, j, dataSize, str);
        if (taskResults == null) {
            asyncResponse.resume(Response.status(Response.Status.NO_CONTENT)
                    .header("X-Presto-Task-Instance-Id", str)
                    .header("X-Presto-Page-Sequence-Id", j)
                    .header("X-Presto-Page-End-Sequence-Id", j)
                    .header("X-Presto-Buffer-Complete", true)
                    .build());
            return;
        }
        // Fall back to the manager's instance id when the request did not carry one.
        String taskInstanceId = str == null ? this.taskManager.getTaskInstanceId(taskId) : str;
        Duration waitTime = randomizeWaitTime(DEFAULT_MAX_WAIT_TIME);
        ListenableFuture<BufferResult> bufferResult = MoreFutures.addTimeout(
                taskResults,
                () -> BufferResult.emptyResults(taskInstanceId, j, false),
                waitTime,
                this.timeoutExecutor);
        ListenableFuture<Response> response = Futures.transform(bufferResult, TaskResource::toResponse, MoreExecutors.directExecutor());

        Response timeoutResponse = Response.status(Response.Status.NO_CONTENT)
                .header("X-Presto-Task-Instance-Id", taskInstanceId)
                .header("X-Presto-Page-Sequence-Id", j)
                .header("X-Presto-Page-End-Sequence-Id", j)
                .header("X-Presto-Buffer-Complete", false)
                .build();
        AsyncResponseHandler.bindAsyncResponse(asyncResponse, response, this.responseExecutor)
                .withTimeout(responseTimeout(waitTime), timeoutResponse);

        response.addListener(() -> this.readFromOutputBufferTime.add(Duration.nanosSince(nanoTime)), MoreExecutors.directExecutor());
        // register(Object) needs an explicit functional-interface target for the lambda.
        asyncResponse.register((CompletionCallback) throwable -> this.resultsRequestTime.add(Duration.nanosSince(nanoTime)));
    }

    /**
     * Converts a buffer result into the HTTP response for {@link #getResults}:
     * 204 with no entity when there are no pages, otherwise 200 with the pages,
     * plus the instance-id, token and buffer-complete headers in both cases.
     */
    private static Response toResponse(BufferResult bufferResult) {
        List<SerializedPage> serializedPages = bufferResult.getSerializedPages();
        Response.Status status;
        GenericEntity<List<SerializedPage>> entity = null;
        if (serializedPages.isEmpty()) {
            status = Response.Status.NO_CONTENT;
        } else {
            // The declared generic type is List<Page>, as in the original code,
            // even though the entity holds SerializedPage instances.
            entity = new GenericEntity<>(serializedPages, new TypeToken<List<Page>>() { }.getType());
            status = Response.Status.OK;
        }
        return Response.status(status)
                .entity(entity)
                .header("X-Presto-Task-Instance-Id", bufferResult.getTaskInstanceId())
                .header("X-Presto-Page-Sequence-Id", bufferResult.getToken())
                .header("X-Presto-Page-End-Sequence-Id", bufferResult.getNextToken())
                .header("X-Presto-Buffer-Complete", bufferResult.isBufferComplete())
                .build();
    }

    /**
     * Acknowledges receipt of buffer results up to the given token.
     *
     * @param str task instance id ({@code X-Presto-Task-Instance-Id}); may be {@code null}
     * @param taskId id taken from the request path
     * @param outputBufferId buffer id taken from the request path
     * @param j page sequence token taken from the request path
     */
    @GET
    @Path("{taskId}/results/{bufferId}/{token}/acknowledge")
    public void acknowledgeResults(@HeaderParam("X-Presto-Task-Instance-Id") String str, @PathParam("taskId") TaskId taskId, @PathParam("bufferId") OutputBuffers.OutputBufferId outputBufferId, @PathParam("token") long j) {
        SecurityRequireNonNull.requireNonNull(taskId, "taskId is null");
        SecurityRequireNonNull.requireNonNull(outputBufferId, "bufferId is null");
        this.taskManager.acknowledgeTaskResults(taskId, outputBufferId, j, str);
    }

    /**
     * Aborts the given output buffer of the task.
     *
     * @param str task instance id ({@code X-Presto-Task-Instance-Id}); may be {@code null}
     * @param taskId id taken from the request path
     * @param outputBufferId buffer id taken from the request path
     * @param uriInfo request URI (unused by this method)
     */
    @Produces({"application/json"})
    @Path("{taskId}/results/{bufferId}")
    @DELETE
    public void abortResults(@HeaderParam("X-Presto-Task-Instance-Id") String str, @PathParam("taskId") TaskId taskId, @PathParam("bufferId") OutputBuffers.OutputBufferId outputBufferId, @Context UriInfo uriInfo) {
        SecurityRequireNonNull.requireNonNull(taskId, "taskId is null");
        SecurityRequireNonNull.requireNonNull(outputBufferId, "bufferId is null");
        this.taskManager.abortTaskResults(taskId, outputBufferId, str);
    }

    /** Time from the start of a results request until its response future completes. */
    @Managed
    @Nested
    public TimeStat getReadFromOutputBufferTime() {
        return this.readFromOutputBufferTime;
    }

    /** Time from the start of a results request until the async response's completion callback fires. */
    @Managed
    @Nested
    public TimeStat getResultsRequestTime() {
        return this.resultsRequestTime;
    }

    /** Returns whether the request carries a {@code summarize} query parameter (its value is ignored). */
    private static boolean shouldSummarize(UriInfo uriInfo) {
        return uriInfo.getQueryParameters().containsKey("summarize");
    }

    /**
     * Picks a random wait in the range [duration/2, duration), at millisecond resolution.
     *
     * <p>Durations shorter than 2 ms are returned unchanged: their half rounds down to
     * zero, and {@link ThreadLocalRandom#nextLong(long)} rejects a non-positive bound.
     */
    private static Duration randomizeWaitTime(Duration duration) {
        long halfMillis = duration.toMillis() / 2;
        if (halfMillis <= 0) {
            return duration;
        }
        return new Duration(halfMillis + ThreadLocalRandom.current().nextLong(halfMillis), TimeUnit.MILLISECONDS);
    }

    /** Returns the async-response timeout: the given wait plus {@link #ADDITIONAL_WAIT_TIME}. */
    private static Duration responseTimeout(Duration waitTime) {
        return new Duration(waitTime.toMillis() + ADDITIONAL_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS);
    }

    /** Builds an initial status for the task and fails it into {@code ABORTED} with no failures attached. */
    private TaskStatus createAbortedTaskStatus(TaskId taskId, URI uri, String str) {
        return TaskStatus.failWith(TaskStatus.initialTaskStatus(taskId, uri, "", str), TaskState.ABORTED, ImmutableList.of());
    }

    /** Builds an initial task info for the task and replaces its status with an aborted one. */
    private TaskInfo createAbortedTaskInfo(TaskId taskId, URI uri, String str) {
        return TaskInfo.createInitialTask(taskId, uri, "", ImmutableList.of(), new TaskStats(DateTime.now(), null)).withTaskStatus(createAbortedTaskStatus(taskId, uri, str));
    }
}
