package org.apache.kafka.connect.runtime.rest;

import com.fasterxml.jackson.core.type.TypeReference;
import com.huawei.cdc.common.rest.validation.EndpointInputValidator;
import com.huawei.cdc.common.util.ConnectorStatusUtil;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Produces({"application/json"})
@Path("/api/v1/cdl")
@Consumes({"application/json"})
/* loaded from: input_file:org/apache/kafka/connect/runtime/rest/CDCResource.class */
public class CDCResource {
    private static final Logger log = LoggerFactory.getLogger(CDCResource.class);
    private static final String BOOTSTRAP_EMPTY = "BOOTSTRAP EMPTY";
    private static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
    private static final String GROUP_ID_CONFIG_DLQ_READER = "connect-cluster-dlq-reader";
    private static final String CONNECT_ERROR_EXCEPTION_MESSAGE = "__connect.errors.exception.message";
    private static final String CONNECT_ERROR_TOPIC = "__connect.errors.topic";
    private static final String CONNECT_ERROR_TOPIC_PARTITION = "__connect.errors.partition";
    private static final String CONNECT_ERROR_TOPIC_OFFSET = "__connect.errors.offset";
    private static final byte MAGIC_BYTE = 0;
    private static final String CONNECTOR_UNAVAILABLE = "CONNECTOR UNAVAILABLE";
    private final Herder herder;
    private final WorkerConfig config;

    public CDCResource(Herder herder, WorkerConfig workerConfig) throws IOException {
        this.herder = herder;
        this.config = workerConfig;
    }

    @GET
    @Path("/connectors/{connector_name}/tasks/{task_id}/logs")
    public Response getLogsForATask(@PathParam("connector_name") String str, @PathParam("task_id") String str2, @QueryParam("start_date") String str3, @QueryParam("end_date") String str4, @Context HttpHeaders httpHeaders, @QueryParam("worker_id") String str5) {
        HashMap hashMap = new HashMap();
        hashMap.put("connector_name", str);
        EndpointInputValidator.validateNonEmptyNoSpecialCharacters(hashMap);
        if (!StringUtils.isNumeric(str2)) {
            return buildResponse(EndpointInputValidator.getNumericViolationMessage("task_id", str2));
        }
        try {
            ConnectorStateInfo.TaskState taskStatus = this.herder.taskStatus(new ConnectorTaskId(str, Integer.parseInt(str2)));
            if (str5 == null || str5.isEmpty()) {
                str5 = taskStatus.workerId();
            }
            RestClient.HttpResponse httpRequest = RestClient.httpRequest((str3 == null || str3.isEmpty() || str4 == null || str4.isEmpty()) ? "http://" + str5 + "/connectors/" + str + "/tasks/" + str2 + "/viewlogs" : "http://" + str5 + "/connectors/" + str + "/tasks/" + str2 + "/viewlogs?start_date=" + str3 + "&end_date=" + str4, "GET", httpHeaders, (Object) null, new TypeReference<Object>() { // from class: org.apache.kafka.connect.runtime.rest.CDCResource.1
            }, this.config);
            Object obj = new Object();
            if (httpRequest.status() == 200) {
                obj = httpRequest.body();
            }
            return Response.status(Response.Status.OK).entity(obj).build();
        } catch (NotFoundException e) {
            return Response.status(Response.Status.NOT_FOUND).entity("Logs Not found for Connector: " + str + ", Task: " + str2 + ", Reason: Task not found").build();
        }
    }

    private Response buildResponse(String str) {
        return Response.status(Response.Status.BAD_REQUEST).entity(str).build();
    }

    @GET
    @Path("/connectors/{connector_name}/dlq_events")
    public Response getErrorQueueEvents(@PathParam("connector_name") String str, @QueryParam("page_size") int i, @QueryParam("page_number") int i2) {
        Response performBasicValidations = performBasicValidations(str);
        if (i < 0) {
            throw new BadRequestException("page_size must not be negative");
        }
        if (i2 < 0) {
            throw new BadRequestException("page_number must not be negative");
        }
        if (performBasicValidations != null) {
            return performBasicValidations;
        }
        HashMap hashMap = new HashMap();
        List<String> list = this.config.getList(BOOTSTRAP_SERVERS_CONFIG);
        final String str2 = (String) this.herder.connectorInfo(str).config().get("errors.deadletterqueue.topic.name");
        Properties properties = new Properties();
        initConsumerConfigs(properties, list, i);
        final long j = 1 * i * (i2 - 1);
        try {
            final KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
            final HashMap<TopicPartition, Long> hashMap2 = new HashMap<TopicPartition, Long>() { // from class: org.apache.kafka.connect.runtime.rest.CDCResource.2
                {
                    put(new TopicPartition(str2, CDCResource.MAGIC_BYTE), Long.valueOf(j));
                }
            };
            kafkaConsumer.subscribe(Collections.singletonList(str2), new ConsumerRebalanceListener() { // from class: org.apache.kafka.connect.runtime.rest.CDCResource.3
                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                }

                public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                    for (TopicPartition topicPartition : collection) {
                        Long l = (Long) hashMap2.get(topicPartition);
                        if (l != null) {
                            kafkaConsumer.seek(topicPartition, l.longValue());
                        }
                    }
                }
            });
            kafkaConsumer.poll(Duration.ofMillis(1000L)).forEach(consumerRecord -> {
                if (MAGIC_BYTE == consumerRecord.value() || ((String) consumerRecord.value()).isEmpty()) {
                    return;
                }
                HashMap hashMap3 = new HashMap();
                int schemaId = getSchemaId((String) consumerRecord.value());
                HashMap hashMap4 = new HashMap();
                if (schemaId != -1) {
                    hashMap4.put("schemaId", schemaId + "");
                }
                hashMap4.put("payload", consumerRecord.value());
                hashMap3.put("value", hashMap4);
                consumerRecord.headers().forEach(header -> {
                    if (header.key().equals(CONNECT_ERROR_EXCEPTION_MESSAGE)) {
                        hashMap3.put("error_message", new String(header.value()));
                    }
                    if (header.key().equals(CONNECT_ERROR_TOPIC)) {
                        hashMap3.put("topic", new String(header.value()));
                    }
                    if (header.key().equals(CONNECT_ERROR_TOPIC_PARTITION)) {
                        hashMap3.put("topic_partition", new String(header.value()));
                    }
                    if (header.key().equals(CONNECT_ERROR_TOPIC_OFFSET)) {
                        hashMap3.put("topic_offset", new String(header.value()));
                    }
                });
                hashMap.put(Integer.valueOf(hashMap.size() + 1), hashMap3);
            });
            kafkaConsumer.close();
            return Response.status(Response.Status.OK).entity(hashMap).build();
        } catch (Exception e) {
            log.error("Failed to replay dlq messages: {}", e.getMessage());
            return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(getResponseData(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage())).build();
        }
    }

    private Response performBasicValidations(String str) {
        HashMap hashMap = new HashMap();
        hashMap.put("connector_name", str);
        EndpointInputValidator.validateNonEmptyNoSpecialCharacters(hashMap);
        ConnectorInfo connectorInfo = this.herder.connectorInfo(str);
        if (connectorInfo == null) {
            return getResponse(CONNECTOR_UNAVAILABLE);
        }
        if (this.config.getList(BOOTSTRAP_SERVERS_CONFIG).isEmpty()) {
            return getResponse(BOOTSTRAP_EMPTY);
        }
        if (StringUtils.isBlank((String) connectorInfo.config().get("errors.deadletterqueue.topic.name"))) {
            return Response.status(Response.Status.NOT_FOUND).entity(getResponseData(Response.Status.NOT_FOUND, "Dead Letter Queue is not configured for the connector: " + str)).build();
        }
        return null;
    }

    private void initConsumerConfigs(Properties properties, List<String> list, int i) {
        properties.put(BOOTSTRAP_SERVERS_CONFIG, list.stream().collect(Collectors.joining(",")));
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", StringDeserializer.class);
        properties.put("group.id", GROUP_ID_CONFIG_DLQ_READER);
        properties.put("enable.auto.commit", false);
        properties.put("max.poll.records", Integer.valueOf(i));
        String str = System.getenv("KERBEROS_DOMAIN_NAME");
        if (StringUtils.isNotBlank(str)) {
            properties.put("kerberos.domain.name", str);
        }
    }

    @POST
    @Path("/connectors/{connector_name}/dlq_replay")
    @Consumes({"application/json"})
    public Response replay(@PathParam("connector_name") String str, Map<String, List<Map<String, String>>> map) {
        HashMap hashMap = new HashMap();
        hashMap.put("connector_name", str);
        EndpointInputValidator.validateNonEmptyNoSpecialCharacters(hashMap);
        ConnectorInfo connectorInfo = this.herder.connectorInfo(str);
        if (connectorInfo == null) {
            return getResponse(CONNECTOR_UNAVAILABLE);
        }
        List list = this.config.getList(BOOTSTRAP_SERVERS_CONFIG);
        if (list.isEmpty()) {
            return getResponse(BOOTSTRAP_EMPTY);
        }
        String str2 = (String) connectorInfo.config().get("errors.deadletterqueue.replay.topic.name");
        if (StringUtils.isBlank(str2)) {
            return Response.status(Response.Status.NOT_FOUND).entity(getResponseData(Response.Status.NOT_FOUND, "Dead Letter Queue is not configured for the connector: " + str)).build();
        }
        Properties properties = new Properties();
        properties.put(BOOTSTRAP_SERVERS_CONFIG, list.stream().collect(Collectors.joining(",")));
        properties.put("key.serializer", StringSerializer.class);
        properties.put("value.serializer", StringSerializer.class);
        String str3 = System.getenv("KERBEROS_DOMAIN_NAME");
        if (StringUtils.isNotBlank(str3)) {
            properties.put("kerberos.domain.name", str3);
        }
        KafkaProducer kafkaProducer = new KafkaProducer(properties);
        AtomicInteger atomicInteger = new AtomicInteger(MAGIC_BYTE);
        map.values().forEach(list2 -> {
            list2.forEach(map2 -> {
                kafkaProducer.send(new ProducerRecord(str2, (Object) null, (String) map2.values().toArray()[MAGIC_BYTE]));
                atomicInteger.incrementAndGet();
            });
        });
        kafkaProducer.close();
        return Response.status(Response.Status.OK).entity(Integer.valueOf(atomicInteger.get())).build();
    }

    private Map<String, String> getResponseData(Response.Status status, String str) {
        HashMap hashMap = new HashMap();
        hashMap.put("error_code", String.valueOf(status.getStatusCode()));
        hashMap.put("message", str);
        return hashMap;
    }

    private int getSchemaId(String str) {
        ByteBuffer wrap = ByteBuffer.wrap(str.getBytes());
        int i = -1;
        if (wrap.get() == 0) {
            i = wrap.getInt();
        }
        return i;
    }

    private Response getResponse(String str) {
        return str.equalsIgnoreCase(CONNECTOR_UNAVAILABLE) ? Response.status(Response.Status.NOT_FOUND).entity(getResponseData(Response.Status.NOT_FOUND, "Not Such Connector is present, Please check connector name")).build() : Response.status(Response.Status.NOT_FOUND).entity(getResponseData(Response.Status.NOT_FOUND, "Bootstrap Server config is Empty")).build();
    }

    @POST
    @Path("/connectors/{connector_name}/cdlstop")
    @Consumes({"application/json"})
    public Response connectorStop(@PathParam("connector_name") String str, Map<String, String> map) {
        HashMap hashMap = new HashMap();
        hashMap.put("connector_name", str);
        EndpointInputValidator.validateNonEmptyNoSpecialCharacters(hashMap);
        ConnectorStatusUtil.removeData(str);
        return Response.status(Response.Status.OK).build();
    }

    @POST
    @Path("/connectors/{connector_name}/cdlstart")
    @Consumes({"application/json"})
    public Response connectorStart(@PathParam("connector_name") String str, Map<String, String> map) {
        HashMap hashMap = new HashMap();
        hashMap.put("connector_name", str);
        EndpointInputValidator.validateNonEmptyNoSpecialCharacters(hashMap);
        ConnectorStatusUtil.putData(str, true);
        return Response.status(Response.Status.OK).build();
    }

    @Path("/connectors/{connector_name}/auto_schema/status")
    @PUT
    @Consumes({"application/json"})
    public Response updateSchemaAutoCreationStatus(@PathParam("connector_name") String str, Map<String, String> map) {
        HashMap hashMap = new HashMap();
        hashMap.put("connector_name", str);
        EndpointInputValidator.validateNonEmptyNoSpecialCharacters(hashMap);
        ConnectorStatusUtil.SCHEMA_CREATION_STATUS.put(str, map.get("status"));
        return Response.status(Response.Status.OK).build();
    }
}
