package org.apache.kafka.connect.runtime.rest;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.huawei.cdc.common.SourceTaskInfoCache;
import com.huawei.cdc.common.conf.CommonConfiguration;
import com.huawei.cdc.common.metadata.client.ConnectorClient;
import com.huawei.cdc.common.rest.validation.EndpointInputValidator;
import com.huawei.cdc.common.task.RestartTaskResponse;
import com.huawei.cdc.common.util.RestartTaskUtils;
import java.io.IOException;
import java.nio.file.Paths;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.common.MetricsConstants;
import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException;
import org.apache.kafka.connect.runtime.distributed.RequestTargetException;
import org.apache.kafka.connect.runtime.logutil.LogParser;
import org.apache.kafka.connect.runtime.model.metrics.SinkTaskAndTableMetrics;
import org.apache.kafka.connect.runtime.model.metrics.SourceTaskAndTableMetrics;
import org.apache.kafka.connect.runtime.model.task.IdentifierInfo;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.FutureCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Produces({"application/json"})
@Path("/connectors")
@Consumes({"application/json"})
/* loaded from: input_file:org/apache/kafka/connect/runtime/rest/ConnectorsResourceExtension.class */
public class ConnectorsResourceExtension {
    public static final long REQUEST_TIMEOUT_MS = 90000;
    private static final Logger log = LoggerFactory.getLogger(ConnectorsResourceExtension.class);
    private final Herder herder;
    private final WorkerConfig config;
    private final JobExecutionStats jobExecutionStats;
    private final CDCMetrics cdcMetrics;
    private final RestartTaskInfo restartTaskInfo;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/connect/runtime/rest/ConnectorsResourceExtension$IdentityTranslator.class */
    public static class IdentityTranslator<T> implements Translator<T, T> {
        private IdentityTranslator() {
        }

        @Override // org.apache.kafka.connect.runtime.rest.ConnectorsResourceExtension.Translator
        public T translate(RestClient.HttpResponse<T> httpResponse) {
            return (T) httpResponse.body();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/connect/runtime/rest/ConnectorsResourceExtension$Translator.class */
    public interface Translator<T, U> {
        T translate(RestClient.HttpResponse<U> httpResponse);
    }

    public ConnectorsResourceExtension(Herder herder, WorkerConfig workerConfig) throws IOException {
        this.herder = herder;
        this.config = workerConfig;
        this.cdcMetrics = new CDCMetrics(herder);
        this.restartTaskInfo = new RestartTaskInfo(herder);
        this.jobExecutionStats = new JobExecutionStats(herder);
    }

    @POST
    @Path("/{connector}/tasks/{taskId}/restart_task")
    public String restartTask(@PathParam("connector") String str, @PathParam("taskId") Integer num, @Context HttpHeaders httpHeaders, @QueryParam("forward") Boolean bool, byte[] bArr) throws Exception {
        EndpointInputValidator.validateNonEmptyNoSpecialCharacters(MetricsConstants.MONITOR_CES_DIMENSION_STREAM_ID, str);
        return this.restartTaskInfo.restartTask(str, num, httpHeaders, bArr);
    }

    @POST
    @Path("/{connector}/tasks/{taskId}/cache_details")
    public Map<String, String> getCache(@PathParam("connector") String str, @PathParam("taskId") Integer num, @Context HttpHeaders httpHeaders, @QueryParam("forward") Boolean bool) {
        EndpointInputValidator.validateNonEmptyNoSpecialCharacters(MetricsConstants.MONITOR_CES_DIMENSION_STREAM_ID, str);
        HashMap hashMap = new HashMap();
        String identifier = SourceTaskInfoCache.getIdentifier(str, String.valueOf(num));
        String successfulIdentifier = SourceTaskInfoCache.getSuccessfulIdentifier(str, String.valueOf(num));
        RestartTaskUtils.updateDatastoreType(identifier, hashMap);
        hashMap.put("failed_identifier", identifier);
        hashMap.put("last_identifier", successfulIdentifier);
        return hashMap;
    }

    @POST
    @Path("/{connector}/tasks/{taskId}/restart_internal_task")
    public String restartInternalTask(@PathParam("connector") String str, @PathParam("taskId") Integer num, @Context HttpHeaders httpHeaders, @QueryParam("forward") Boolean bool, byte[] bArr) throws Exception {
        String str2;
        EndpointInputValidator.validateNonEmptyNoSpecialCharacters(MetricsConstants.MONITOR_CES_DIMENSION_STREAM_ID, str);
        Map map = null;
        if (bArr.length > 0) {
            map = (Map) new ObjectMapper().readValue(bArr, new TypeReference<Map<String, Object>>() { // from class: org.apache.kafka.connect.runtime.rest.ConnectorsResourceExtension.1
            });
            if (map.get("type").toString().equalsIgnoreCase("oracle")) {
                SourceTaskInfoCache.putRestartIdentifier(str, String.valueOf(num), String.valueOf(map.get("identifierJson")));
            }
        }
        FutureCallback futureCallback = new FutureCallback();
        this.herder.restartTask(new ConnectorTaskId(str, num.intValue()), futureCallback);
        completeOrForwardRequest(futureCallback, "/connectors/" + str + "/tasks/" + num + "/restart", "POST", httpHeaders, null, bool);
        if (map != null && map.get("type").toString().equalsIgnoreCase("oracle")) {
            String restartStatusIdentifier = SourceTaskInfoCache.getRestartStatusIdentifier(str, String.valueOf(num), String.valueOf(map.get("identifierJson")));
            while (true) {
                str2 = restartStatusIdentifier;
                if (!"NEW".equals(str2)) {
                    break;
                }
                restartStatusIdentifier = SourceTaskInfoCache.getRestartStatusIdentifier(str, String.valueOf(num), String.valueOf(map.get("identifierJson")));
            }
            if ("FAILED".equals(str2)) {
                return RestartTaskUtils.jsonConverter("status", "FAILED");
            }
            if ("STARTED".equals(str2)) {
                return RestartTaskUtils.jsonConverter("status", "STARTED");
            }
        }
        return RestartTaskUtils.jsonConverter("status", "STARTED");
    }

    @POST
    @Path("/{connector}/tasks/{taskId}/restartByTime")
    public String restartByTime(@PathParam("connector") String str, @PathParam("taskId") Integer num, @Context HttpHeaders httpHeaders, @QueryParam("forward") Boolean bool, byte[] bArr) throws Exception {
        String str2;
        EndpointInputValidator.validateNonEmptyNoSpecialCharacters(MetricsConstants.MONITOR_CES_DIMENSION_STREAM_ID, str);
        Map map = (Map) new ObjectMapper().readValue(bArr, new TypeReference<Map<String, Object>>() { // from class: org.apache.kafka.connect.runtime.rest.ConnectorsResourceExtension.2
        });
        String valueOf = String.valueOf(map.get("start.time"));
        if (valueOf.equals("null") || StringUtils.isBlank(valueOf)) {
            throw new IllegalArgumentException("start.time can not be blank");
        }
        if (!valueOf.matches("^\\d{2}-\\d{2}-\\d{4} \\d{2}\\.\\d{2}\\.\\d{2} [A|P]M$")) {
            throw new IllegalArgumentException("Format of start.time is wrong. Correct format is DD-MM-YYYY HH.MI.SS AM");
        }
        Object obj = map.get("end.time");
        String valueOf2 = obj == null ? "" : String.valueOf(obj);
        if (StringUtils.isNotBlank(valueOf2) && !valueOf2.matches("^\\d{2}-\\d{2}-\\d{4} \\d{2}\\.\\d{2}\\.\\d{2} [A|P]M$")) {
            throw new IllegalArgumentException("Format of end.time is wrong. Correct format is DD-MM-YYYY HH.MI.SS AM");
        }
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("DD-MM-YYYY hh.MM.SS aa");
        if (StringUtils.isNotBlank(valueOf2)) {
            if (simpleDateFormat.parse(valueOf2).before(simpleDateFormat.parse(valueOf))) {
                throw new IllegalArgumentException("Error in restarting Task : " + num + " .Invalid input !! end time must come after start time");
            }
        }
        SourceTaskInfoCache.putRestartTime(str, String.valueOf(num), valueOf, valueOf2);
        FutureCallback futureCallback = new FutureCallback();
        this.herder.restartTask(new ConnectorTaskId(str, num.intValue()), futureCallback);
        completeOrForwardRequest(futureCallback, "/connectors/" + str + "/tasks/" + num + "/restart", "POST", httpHeaders, null, bool);
        String restartStatus = SourceTaskInfoCache.getRestartStatus(str, String.valueOf(num), valueOf, valueOf2);
        while (true) {
            str2 = restartStatus;
            if (!"NEW".equals(str2)) {
                break;
            }
            restartStatus = SourceTaskInfoCache.getRestartStatus(str, String.valueOf(num), valueOf, valueOf2);
        }
        return "FAILED".equals(str2) ? "Task " + num + " of connector " + str + " has failed to restart at start time " + valueOf + ". Please check task status for more details." : "Task " + num + " of connector " + str + " has successfully restarted at start time " + valueOf + ".";
    }

    @Path("/{connector}/submissions/{executionId}/delete_cache")
    @DELETE
    public void deleteCache(@PathParam("executionId") String str) {
        this.jobExecutionStats.deleteCache(str);
    }

    @GET
    @Path("/{connector}/submissions/{executionId}")
    public Map<String, Long> getConnectorMetrics(@PathParam("connector") String str, @PathParam("executionId") String str2) {
        EndpointInputValidator.validateNonEmptyNoSpecialCharacters(MetricsConstants.MONITOR_CES_DIMENSION_STREAM_ID, str);
        EndpointInputValidator.validateNumeric("executionId", str2);
        return this.jobExecutionStats.getConnectorMetrics(str2);
    }

    @GET
    @Path("/{connector}/tasks/{taskId}/task_status")
    public Object taskStatus(@PathParam("connector") String str, @Context HttpHeaders httpHeaders, @PathParam("taskId") String str2) {
        EndpointInputValidator.validateNonEmptyNoSpecialCharacters(MetricsConstants.MONITOR_CES_DIMENSION_STREAM_ID, str);
        validateNumeric(str2);
        return this.cdcMetrics.getSourceTaskStatus(str, Integer.valueOf(str2), httpHeaders, this.config);
    }

    @GET
    @Path("/{connector}/tasks/{taskId}/task_internal_status")
    public Object taskStatusInternal(@PathParam("connector") String str, @Context HttpHeaders httpHeaders, @PathParam("taskId") Integer num) {
        EndpointInputValidator.validateNonEmptyNoSpecialCharacters(MetricsConstants.MONITOR_CES_DIMENSION_STREAM_ID, str);
        ConnectorStateInfo.TaskState taskStatus = this.herder.taskStatus(new ConnectorTaskId(str, num.intValue()));
        String str2 = (String) this.herder.connectorInfo(str).config().get("schema");
        String tables = SourceTaskInfoCache.getTables(str, String.valueOf(num));
        String str3 = null;
        String str4 = null;
        List<IdentifierInfo> list = null;
        List<IdentifierInfo> list2 = null;
        if (taskStatus.state().equalsIgnoreCase("FAILED")) {
            RestartTaskResponse restartTaskResponse = (RestartTaskResponse) new ConnectorClient().getLastProcessedIdentifiers(str, String.valueOf(num)).readEntity(RestartTaskResponse.class);
            if (restartTaskResponse.getId() != null) {
                str3 = restartTaskResponse.getCreateDate();
                str4 = restartTaskResponse.getUpdateDate();
                list = CDCMetrics.getIdentifierList(restartTaskResponse.getLastProcessedRecordIdentifier());
                list2 = CDCMetrics.getIdentifierList(restartTaskResponse.getFailedRecordIdentifier());
            }
        } else {
            String[] split = SourceTaskInfoCache.getIdentifier(str, String.valueOf(num)).split("~");
            String[] split2 = SourceTaskInfoCache.getSuccessfulIdentifier(str, String.valueOf(num)).split("~");
            str3 = split[1];
            str4 = split2[1];
            list = CDCMetrics.getIdentifierList(split[0]);
            list2 = CDCMetrics.getIdentifierList(split2[0]);
        }
        return new KafkaTaskStatusInfo(taskStatus.workerId(), Integer.valueOf(taskStatus.id()), taskStatus.state(), str3, str4, tables, str2, taskStatus.trace(), list, list2);
    }

    private void validateNumeric(String str) {
        if (!StringUtils.isNumeric(str)) {
            throw new ConnectRestException(Response.Status.BAD_REQUEST, EndpointInputValidator.getNumericViolationMessage("taskId", str));
        }
    }

    @GET
    @Path("/{connector_name}/tasks/{task_id}/viewlogs")
    public List<String> viewLogs(@PathParam("connector_name") String str, @PathParam("task_id") String str2, @Context HttpHeaders httpHeaders, @QueryParam("start_date") String str3, @QueryParam("end_date") String str4) {
        EndpointInputValidator.validateNonEmptyNoSpecialCharacters("connector_name", str);
        validateNumeric(str2);
        HashMap hashMap = new HashMap();
        hashMap.put("logPath", Paths.get(CommonConfiguration.CONNECTOR_LOG_PATH, new String[0]).getParent().getFileName().toString());
        hashMap.put(MetricsConstants.MONITOR_CES_DIMENSION_STREAM_ID, str);
        hashMap.put("taskId", str2);
        hashMap.put("startDate", str3);
        hashMap.put("endDate", str4);
        LogParser.INSTANCE.registerFilters(hashMap);
        List<String> parseLogs = LogParser.INSTANCE.parseLogs(hashMap);
        LogParser.INSTANCE.resetFilters();
        return parseLogs;
    }

    @GET
    @Path("/{connector}/tasks/{taskId}/source_task_count")
    public Object getSourceConnectorTaskCount(@PathParam("connector") String str, @PathParam("taskId") String str2) throws Exception {
        EndpointInputValidator.validateNonEmptyNoSpecialCharacters(MetricsConstants.MONITOR_CES_DIMENSION_STREAM_ID, str);
        validateNumeric(str2);
        return this.cdcMetrics.getSourceTaskMetrics(str, str2);
    }

    @GET
    @Path("/{connector}/tasks/{taskId}/sink_task_count")
    public Object getSinkConnectorTaskCount(@PathParam("connector") String str, @PathParam("taskId") String str2) throws Exception {
        EndpointInputValidator.validateNonEmptyNoSpecialCharacters(MetricsConstants.MONITOR_CES_DIMENSION_STREAM_ID, str);
        validateNumeric(str2);
        return this.cdcMetrics.getSinkTaskMetrics(str, str2);
    }

    @GET
    @Path("/worker/metrics")
    public Object getWorkerRebalanceMetrics(@QueryParam("metricType") String str) throws Exception {
        if (str == null || str.equalsIgnoreCase("rebalance")) {
            return this.cdcMetrics.getWorkerRebalanceMetrics(str);
        }
        throw new ConnectRestException(Response.Status.BAD_REQUEST, "Currently only rebalance metricType is supported");
    }

    @GET
    @Path("/source/{connector}/tasks/{taskId}/metrics")
    public SourceTaskAndTableMetrics getSourceMetrics(@PathParam("connector") String str, @PathParam("taskId") String str2, @Context HttpHeaders httpHeaders) throws Exception {
        EndpointInputValidator.validateNonEmptyNoSpecialCharacters(MetricsConstants.MONITOR_CES_DIMENSION_STREAM_ID, str);
        validateNumeric(str2);
        return new SourceTaskAndTableMetrics(this.cdcMetrics.getSourceTaskMetrics(str, str2), this.cdcMetrics.getSourceTaskStatus(str, Integer.valueOf(str2), httpHeaders, this.config));
    }

    @GET
    @Path("/sink/{connector}/tasks/{taskId}/metrics")
    public SinkTaskAndTableMetrics getSinkMetrics(@PathParam("connector") String str, @PathParam("taskId") String str2) throws Exception {
        EndpointInputValidator.validateNonEmptyNoSpecialCharacters(MetricsConstants.MONITOR_CES_DIMENSION_STREAM_ID, str);
        validateNumeric(str2);
        return new SinkTaskAndTableMetrics(this.cdcMetrics.getSinkTaskMetrics(str, str2));
    }

    private <T, U> T completeOrForwardRequest(FutureCallback<T> futureCallback, String str, String str2, HttpHeaders httpHeaders, Object obj, TypeReference<U> typeReference, Translator<T, U> translator, Boolean bool) throws Exception {
        try {
            return (T) futureCallback.get(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request interrupted");
        } catch (ExecutionException e2) {
            RequestTargetException requestTargetException = (Exception) e2.getCause();
            if (!(requestTargetException instanceof RequestTargetException)) {
                if (requestTargetException instanceof RebalanceNeededException) {
                    throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(), "Cannot complete request momentarily due to stale configuration (typically caused by a concurrent config change)");
                }
                throw requestTargetException;
            }
            if (bool != null && !bool.booleanValue()) {
                throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(), "Cannot complete request because of a conflicting operation (e.g. worker rebalance)");
            }
            String uri = UriBuilder.fromUri(requestTargetException.forwardUrl()).path(str).queryParam("forward", new Object[]{Boolean.valueOf(bool == null)}).build(new Object[0]).toString();
            log.debug("Forwarding request {} {} {}", new Object[]{uri, str2, obj});
            return translator.translate(RestClient.httpRequest(uri, str2, httpHeaders, obj, typeReference, this.config));
        } catch (TimeoutException e3) {
            throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request timed out");
        }
    }

    private <T> T completeOrForwardRequest(FutureCallback<T> futureCallback, String str, String str2, HttpHeaders httpHeaders, Object obj, Boolean bool) throws Exception {
        return (T) completeOrForwardRequest(futureCallback, str, str2, httpHeaders, obj, null, new IdentityTranslator(), bool);
    }
}
