package com.huawei.cdc.common.metadata.cache;

import com.huawei.cdc.common.conf.CommonConfiguration;
import com.huawei.cdc.common.metadata.util.CDCJobConnectorParams;
import com.huawei.cdc.common.metadata.util.CacheConstants;
import com.huawei.cdc.common.metadata.util.CommonConstants;
import java.security.SecureRandom;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/cdc/common/metadata/cache/CacheUtil.class */
/**
 * Small key/value cache backed by a single compacted Kafka topic.
 *
 * <p>Values are written with {@link #put(String, String)} through a Kafka producer and read back
 * with {@link #getMultiple(List)}, which replays the topic through a Kafka consumer. The topic
 * name is {@code CommonConfiguration.CACHE_TOPIC + CommonConstants.HYPHEN + <suffix>}.
 *
 * <p>A producer must be initialised with {@link #initProducerOnly()} before calling
 * {@link #put(String, String)}, and a consumer with {@link #initConsumerOnly()} before calling
 * {@link #getMultiple(List)}. Instances hold at most one producer and one consumer; release them
 * with {@link #closeProducer()} and {@link #closeConsumer()}.
 */
public class CacheUtil {
    public static final Logger log = LoggerFactory.getLogger(CacheUtil.class);

    /** Maximum time, in milliseconds, a single consumer poll waits for records. */
    public static final int KAFKA_RECORD_TIMEOUT = 5000;

    /** Default timeout, in milliseconds, for admin operations performed during initialisation. */
    private static final int KAFKA_INIT_TIMEOUT = 300000;

    /** Upper bound for the replication factor of the cache topic. */
    private static final int MAX_REPLICATION_FACTOR = 3;

    private static final String KERBEROS_DOMAIN_ENV = "KERBEROS_DOMAIN_NAME";
    private static final String KERBEROS_DOMAIN_PROPERTY = "kerberos.domain.name";

    private final String topicName;
    private final String bootstrapServers;
    private final String securityProtocol;
    private final SecureRandom randNum = new SecureRandom();
    private Producer<String, String> kafkaProducer;
    private Consumer<String, String> kafkaConsumer;
    private int kafkaInitTimeout = KAFKA_INIT_TIMEOUT;

    /**
     * Creates a cache helper for an explicit Kafka endpoint.
     *
     * @param str  suffix appended (after a hyphen) to the configured cache topic prefix
     * @param str2 Kafka bootstrap servers
     * @param str3 Kafka security protocol
     */
    public CacheUtil(String str, String str2, String str3) {
        this.topicName = CommonConfiguration.CACHE_TOPIC + CommonConstants.HYPHEN + str;
        this.bootstrapServers = str2;
        this.securityProtocol = str3;
    }

    /**
     * Creates a cache helper using the bootstrap servers and security protocol from
     * {@link CommonConfiguration}.
     *
     * @param str suffix appended (after a hyphen) to the configured cache topic prefix
     */
    public CacheUtil(String str) {
        this(str, CommonConfiguration.BOOTSTRAP_SERVERS, CommonConfiguration.SECURITY_PROTOCOL);
    }

    /** Overrides the timeout (milliseconds) used when describing the cluster. */
    void setKafkaInitTimeout(int i) {
        this.kafkaInitTimeout = i;
    }

    /** Injects the producer to use instead of one built by {@link #createProducer()}. */
    void setKafkaProducer(Producer<String, String> producer) {
        this.kafkaProducer = producer;
    }

    /** Injects the consumer to use instead of one built internally. */
    void setKafkaConsumer(Consumer<String, String> consumer) {
        this.kafkaConsumer = consumer;
    }

    /**
     * Ensures the cache topic exists and then creates the producer.
     *
     * @throws Exception if Kafka cannot be contacted or the admin calls time out
     */
    public void initProducerOnly() throws Exception {
        createTopic();
        this.kafkaProducer = createProducer();
    }

    /** Creates the consumer used by {@link #getMultiple(List)}. */
    public void initConsumerOnly() {
        this.kafkaConsumer = createConsumer();
    }

    /**
     * Opens a short-lived admin client and creates the cache topic through it.
     *
     * @throws Exception any failure raised while contacting Kafka; it is logged and rethrown
     */
    private void createTopic() throws Exception {
        Properties properties = baseClientProperties();
        // try-with-resources guarantees the admin client is closed on failure as well.
        try (AdminClient adminClient = AdminClient.create(properties)) {
            createTopic(adminClient);
        } catch (Exception e) {
            restoreInterruptStatus(e);
            log.error("Unable to contact Kafka", e);
            throw e;
        }
    }

    /**
     * Checks whether this cache's topic is present in the cluster's topic listing.
     *
     * @return {@code true} if the topic name is listed, {@code false} otherwise
     * @throws Exception any failure raised while contacting Kafka; it is logged and rethrown
     */
    public boolean doesSubmissionTopicExist() throws Exception {
        Properties properties = baseClientProperties();
        try (AdminClient adminClient = AdminClient.create(properties)) {
            return adminClient.listTopics().names().get().contains(this.topicName);
        } catch (Exception e) {
            restoreInterruptStatus(e);
            log.error("Unable to contact Kafka", e);
            throw e;
        }
    }

    /**
     * Requests deletion of this cache's topic.
     *
     * <p>The deletion result is not awaited or inspected here, so a broker-side failure is not
     * reported to the caller. NOTE(review): this relies on the admin client's {@code close()}
     * letting the pending request complete - confirm against the Kafka client version in use.
     *
     * @throws RuntimeException if the admin client cannot be created or the request cannot be
     *         issued; the failure is logged and rethrown
     */
    public void deleteTopic() {
        Properties properties = baseClientProperties();
        try (AdminClient adminClient = AdminClient.create(properties)) {
            adminClient.deleteTopics(Collections.singletonList(this.topicName));
        } catch (RuntimeException e) {
            log.error("Unable to delete Kafka topics", e);
            throw e;
        }
    }

    /**
     * Creates the cache topic with one partition and {@code cleanup.policy=compact}.
     *
     * <p>The replication factor is the number of cluster nodes, capped at
     * {@value #MAX_REPLICATION_FACTOR}. A topic that already exists is treated as success. Any
     * other {@link ExecutionException} is logged and NOT rethrown, so initialisation continues
     * even if the topic could not be created; timeouts and interruptions still propagate.
     *
     * @param adminClient open admin client to use; it is not closed by this method
     * @throws Exception if describing the cluster fails, or a call times out or is interrupted
     */
    private void createTopic(AdminClient adminClient) throws Exception {
        int nodeCount = adminClient.describeCluster().nodes()
                .get(this.kafkaInitTimeout, TimeUnit.MILLISECONDS)
                .size();
        short replicationFactor = (short) Math.min(nodeCount, MAX_REPLICATION_FACTOR);

        NewTopic newTopic = new NewTopic(this.topicName, 1, replicationFactor);
        newTopic.configs(Collections.singletonMap("cleanup.policy", "compact"));
        try {
            adminClient.createTopics(Collections.singleton(newTopic)).all()
                    .get(KAFKA_INIT_TIMEOUT, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof TopicExistsException) {
                // Another instance created it first; nothing to do.
                return;
            }
            log.error("Error during topic creation", e);
        }
    }

    /**
     * Replays the cache topic and returns the values found in it.
     *
     * <p>The consumer subscribes to the topic and polls until a poll returns no records, or until
     * a batch starts at the same offset as the very first batch (which indicates the consumer
     * has started over). Records with a blank key are skipped. Keys starting with one of the
     * cumulative prefixes ({@code DATA_PREFIX}, {@code RECORDS_PREFIX}, {@code TABLES_PREFIX}) are
     * summed under that prefix; every other key maps to the last value read for it. Blank values
     * are treated as {@code "0"}.
     *
     * <p>Note that {@code list} is only checked for {@code null}; it is not used to filter the
     * result, so the returned map may contain keys that were not requested.
     *
     * @param list requested keys; {@code null} short-circuits the lookup
     * @return {@code null} if {@code list} is {@code null}, otherwise a mutable map of the values
     *         read from the topic
     * @throws IllegalStateException if no consumer has been initialised
     * @throws NumberFormatException if a cumulative metric carries a non-numeric value
     */
    public Map<String, String> getMultiple(List<String> list) {
        if (list == null) {
            return null;
        }
        if (this.kafkaConsumer == null) {
            throw new IllegalStateException(
                    "Kafka consumer is not initialised; call initConsumerOnly() first");
        }

        Map<String, String> values = new HashMap<>();
        this.kafkaConsumer.subscribe(Collections.singletonList(this.topicName));
        try {
            Long firstBatchOffset = null;
            boolean startedOver = false;
            while (!startedOver) {
                ConsumerRecords<String, String> batch =
                        this.kafkaConsumer.poll(Duration.ofMillis(KAFKA_RECORD_TIMEOUT));
                if (batch.count() == 0) {
                    break;
                }
                boolean firstInBatch = true;
                for (ConsumerRecord<String, String> record : batch) {
                    if (firstInBatch) {
                        if (firstBatchOffset == null) {
                            firstBatchOffset = record.offset();
                        } else if (firstBatchOffset.longValue() == record.offset()) {
                            // Same starting offset as the first batch: we are re-reading.
                            startedOver = true;
                            break;
                        }
                        firstInBatch = false;
                    }
                    String key = record.key();
                    if (StringUtils.isNotBlank(key)) {
                        addMetricValues(values, key, record.value());
                    }
                }
            }
        } finally {
            // Always release the subscription, even if polling or value parsing failed.
            this.kafkaConsumer.unsubscribe();
        }
        return values;
    }

    /**
     * Returns the cumulative-metric prefix that {@code key} starts with, or {@code null} if the
     * key is not a cumulative metric.
     */
    private static String cumulativePrefixOf(String key) {
        if (key.startsWith(CacheConstants.DATA_PREFIX)) {
            return CacheConstants.DATA_PREFIX;
        }
        if (key.startsWith(CacheConstants.RECORDS_PREFIX)) {
            return CacheConstants.RECORDS_PREFIX;
        }
        if (key.startsWith(CacheConstants.TABLES_PREFIX)) {
            return CacheConstants.TABLES_PREFIX;
        }
        return null;
    }

    /** Adds two decimal {@code long} values given as strings and returns the sum as a string. */
    private static String sumValues(String str, String str2) {
        return String.valueOf(Long.parseLong(str) + Long.parseLong(str2));
    }

    /**
     * Stores one record in {@code map}: cumulative metrics are added to the running total kept
     * under their prefix, all other keys simply overwrite the previous value.
     */
    private void addMetricValues(Map<String, String> map, String str, String str2) {
        String value = StringUtils.isBlank(str2) ? "0" : str2;
        String prefix = cumulativePrefixOf(str);
        if (prefix == null) {
            map.put(str, value);
        } else {
            map.put(prefix, sumValues(map.getOrDefault(prefix, "0"), value));
        }
    }

    /**
     * Sends a key/value pair to the cache topic.
     *
     * <p>This is best effort: any exception raised while sending (including a missing producer)
     * is logged and not propagated.
     *
     * @param str  record key
     * @param str2 record value
     */
    public void put(String str, String str2) {
        try {
            this.kafkaProducer.send(new ProducerRecord<>(this.topicName, str, str2));
        } catch (Exception e) {
            log.info("Exception during save", e);
        }
    }

    /**
     * Builds the String/String producer used by {@link #put(String, String)}, with retries
     * disabled.
     *
     * @return a new Kafka producer
     */
    protected Producer<String, String> createProducer() {
        Properties properties = baseClientProperties();
        // NOTE(review): the constant's value is defined elsewhere; presumably "0" - confirm.
        properties.put("acks", CDCJobConnectorParams.ID_NOT_PRESENT);
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", StringSerializer.class.getName());
        properties.put("retries", 0);
        return new KafkaProducer<>(properties);
    }

    /**
     * Builds the String/String consumer used by {@link #getMultiple(List)}. Auto-commit is
     * disabled and the offset reset policy is {@code earliest}; the client id gets a random
     * suffix.
     */
    private Consumer<String, String> createConsumer() {
        Properties properties = baseClientProperties();
        properties.put("group.id", "groupId");
        properties.put("client.id", "cId" + this.randNum.nextInt());
        properties.put("auto.offset.reset", "earliest");
        properties.put("enable.auto.commit", "false");
        properties.put("max.poll.records", "10000");
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", StringDeserializer.class.getName());
        return new KafkaConsumer<>(properties);
    }

    /**
     * Builds the connection properties shared by admin, producer and consumer clients: bootstrap
     * servers, security protocol and, when the {@code KERBEROS_DOMAIN_NAME} environment variable
     * is non-blank, the Kerberos domain name.
     */
    private Properties baseClientProperties() {
        Properties properties = new Properties();
        properties.put(CommonConstants.BOOTSTRAP_SERVERS, this.bootstrapServers);
        properties.put(CommonConstants.SECURITY_PROTOCOL, this.securityProtocol);
        String kerberosDomain = System.getenv(KERBEROS_DOMAIN_ENV);
        if (StringUtils.isNotBlank(kerberosDomain)) {
            properties.put(KERBEROS_DOMAIN_PROPERTY, kerberosDomain);
        }
        return properties;
    }

    /** Re-asserts the thread's interrupt flag when {@code e} is an {@link InterruptedException}. */
    private static void restoreInterruptStatus(Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }

    /** Closes the producer if one has been created or injected. */
    public void closeProducer() {
        if (this.kafkaProducer != null) {
            this.kafkaProducer.close();
        }
    }

    /** Closes the consumer if one has been created or injected. */
    public void closeConsumer() {
        if (this.kafkaConsumer != null) {
            this.kafkaConsumer.close();
        }
    }
}
