package com.huawei.cdc.connect.drs.processor;

import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
import com.huawei.cdc.common.kafka.TopicCreation;
import com.huawei.cdc.common.metadata.cache.CacheUtil;
import com.huawei.cdc.common.metadata.client.ConnectorClient;
import com.huawei.cdc.common.metadata.models.HeartbeatData;
import com.huawei.cdc.connect.drs.config.ConnectorConfig;
import com.huawei.cdc.connect.drs.consumer.KafkaConsumerUtil;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import javafx.util.Pair;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/cdc/connect/drs/processor/TaskProcessor.class */
/**
 * Coordinates the one-off initialisation that tasks of a DRS connector share.
 *
 * <p>On the first {@link #init} call the processor stores the connector configuration, sets up
 * the optional metrics cache and the {@link ConnectorClient}, and runs {@link PreProcess} on a
 * single-thread executor. Pre-processing parses the topic/table mapping and, when the caller's
 * source is {@code "CONNECTOR"}, creates or alters the Kafka topics. Later {@code init} calls
 * only register the calling task's id.
 *
 * <p>Task registration and initialisation are guarded by {@code taskLock}; callers that need the
 * pre-processing result should call {@link #waitForPreProcessing()} first.
 */
public class TaskProcessor {
    private static final Logger log = LoggerFactory.getLogger(TaskProcessor.class);

    /** Parsed mappings keyed by the raw mapping string, shared by every processor in the JVM. */
    private static final ConcurrentHashMap<String, Pair<Set<String>, Map<String, String>>> TOPIC_TABLE_MAPPING_TO_PARSED_PAIR_MAP = new ConcurrentHashMap<>();

    private ConnectorConfig config;
    private String connectorName;
    private ExecutorService service;
    private boolean isRestServerConfigured;
    private ConnectorClient connectorClient;
    String source;
    private CacheUtil cacheUtil;
    private Pair<Set<String>, Map<String, String>> topicTableMappingPair;
    private ConcurrentHashMap<String, Integer> topicPartitionMap;
    private final Object taskLock = new Object();
    private Set<String> taskIds = new HashSet<>();
    TopicCreation topicCreation = new TopicCreation();
    /** Released once {@link PreProcess} has finished, whether it succeeded or failed. */
    private final CountDownLatch latch = new CountDownLatch(1);
    private final AtomicBoolean live = new AtomicBoolean(false);

    /**
     * Background job that parses the topic/table mapping and, for the {@code "CONNECTOR"} source,
     * builds the topic-partition map and creates or alters the topics.
     */
    private class PreProcess implements Callable<Void> {
        private PreProcess() {
        }

        /**
         * Runs the pre-processing steps.
         *
         * @return always {@code null}
         * @throws ConnectException if topic creation fails
         */
        @Override
        public Void call() {
            try {
                topicTableMappingPair = processTopicTables(config.getTopicTableMapping());
                if ("CONNECTOR".equals(source)) {
                    prepareTopicPartitionMap(topicTableMappingPair.getKey());
                    try {
                        topicCreation.createOrAlterTopic(topicPartitionMap);
                    } catch (Exception e) {
                        log.error("Error during Topic Creation ", e);
                        throw new ConnectException(e);
                    }
                }
            } finally {
                // Release waiters even on failure; otherwise waitForPreProcessing() would block forever.
                latch.countDown();
            }
            return null;
        }
    }

    /**
     * Registers the caller and, if this processor is not live yet, initialises it.
     *
     * <p>When {@code str} is {@code "TASK"} the task id from {@code connectorConfig} is recorded
     * before the liveness check, so every task is tracked even if it does not trigger
     * initialisation. If initialisation throws, the processor is marked not live again so that a
     * later call can retry instead of silently reusing a half-initialised instance.
     *
     * @param connectorConfig configuration of the calling connector or task
     * @param str             source of the call; the code distinguishes {@code "TASK"} and
     *                        {@code "CONNECTOR"}
     * @throws IllegalArgumentException if pre-processing failed with an IllegalArgumentException
     * @throws ConnectException         if cache initialisation or pre-processing failed otherwise
     */
    public void init(ConnectorConfig connectorConfig, String str) {
        boolean calledByTask = "TASK".equals(str);
        synchronized (this.taskLock) {
            if (calledByTask) {
                this.taskIds.add(connectorConfig.getIntTaskId());
            }
            if (this.live.get()) {
                return;
            }
            if (calledByTask) {
                log.debug("Tasks {} associated with TaskProcessor {}", getAssociatedTasks(), hashCode());
            }
            this.live.set(true);
            boolean initialised = false;
            try {
                this.config = connectorConfig;
                this.connectorName = this.config.getConnectorName();
                this.source = str;
                initCache();
                setRestServer();
                initConnectorClient();
                runPreProcess();
                initialised = true;
            } finally {
                if (!initialised) {
                    // A failed start must not leave the processor flagged as live.
                    this.live.set(false);
                }
            }
        }
    }

    /**
     * Submits {@link PreProcess} to a fresh single-thread executor and waits for it to finish.
     * The executor is always shut down afterwards.
     */
    private void runPreProcess() {
        this.service = Executors.newSingleThreadExecutor();
        try {
            this.service.submit(new PreProcess()).get();
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            stop();
            log.info("Exception occurred while pre-processing");
        } catch (ExecutionException e) {
            stop();
            Throwable cause = e.getCause();
            if (cause instanceof IllegalArgumentException) {
                throw new IllegalArgumentException(cause);
            }
            log.error("Error while topic creation.Close Schema connection ", e);
            throw new ConnectException(e);
        } finally {
            this.service.shutdown();
        }
    }

    /** Creates the connector client and registers the source when a REST server is configured. */
    private void initConnectorClient() {
        this.connectorClient = new ConnectorClient();
        if (this.isRestServerConfigured) {
            this.connectorClient.initSource("DRS_MYSQL", true);
        }
    }

    /** Caches whether {@link ConnectorClient} reports a configured REST server. */
    private void setRestServer() {
        this.isRestServerConfigured = ConnectorClient.isRestServerConfigured();
    }

    /**
     * Returns the registered task ids joined by commas, or {@code "No"} when the id set has been
     * discarded by {@link #invalidate()}.
     */
    private String getAssociatedTasks() {
        synchronized (this.taskLock) {
            if (this.taskIds == null) {
                return "No";
            }
            return String.join(",", this.taskIds);
        }
    }

    /**
     * Marks the processor as not live, forgets all task ids, shuts the executor down and asks
     * {@code TaskProcessorFactory} to relieve the processor registered under the connector name.
     */
    public void reset() {
        this.live.set(false);
        synchronized (this.taskLock) {
            if (this.taskIds != null) {
                this.taskIds.clear();
            }
        }
        stop();
        TaskProcessorFactory.getInstance().relieveTaskProcessor(this.connectorName);
    }

    /** Shuts down the pre-processing executor if one was created. */
    private void stop() {
        if (this.service != null) {
            this.service.shutdown();
        }
        log.info("Schema connection closed");
    }

    /**
     * Blocks until pre-processing has finished. If the waiting thread is interrupted, the
     * interrupt flag is restored and the executor is shut down.
     */
    public void waitForPreProcessing() {
        try {
            this.latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.info("Error while waiting on latch");
            stop();
        }
    }

    /**
     * Unregisters a task; when no task remains the processor is {@link #reset() reset}.
     *
     * @param str id of the task to detach
     */
    public void detach(String str) {
        synchronized (this.taskLock) {
            if (this.taskIds == null) {
                return;
            }
            this.taskIds.remove(str);
            log.debug("Tasks {} associated with TaskProcessor {}", getAssociatedTasks(), hashCode());
            if (this.taskIds.isEmpty()) {
                log.debug("Task set empty, reset called from task {}", str);
                reset();
            }
        }
    }

    /**
     * Deletes the cache topic unless the job execution id is {@code "-1"}. Does nothing when the
     * processor holds no configuration or no cache (never initialised, or already invalidated).
     */
    public void deleteCdlCache() {
        if (this.config == null || this.cacheUtil == null) {
            return;
        }
        if ("-1".equals(this.config.getJobExecutionId())) {
            return;
        }
        this.cacheUtil.deleteTopic();
    }

    /** Drops the references held by this processor and closes the cache producer, if any. */
    public void invalidate() {
        this.topicCreation = null;
        this.config = null;
        this.taskIds = null;
        if (this.cacheUtil != null) {
            this.cacheUtil.closeProducer();
            this.cacheUtil = null;
        }
    }

    /**
     * Creates the cache and initialises its producer, unless a cache already exists or the job
     * execution id is {@code "-1"}.
     *
     * @throws ConnectException if the producer cannot be initialised
     */
    private void initCache() {
        if (this.cacheUtil != null || "-1".equals(this.config.getJobExecutionId())) {
            return;
        }
        this.cacheUtil = new CacheUtil(this.config.getJobExecutionId());
        try {
            this.cacheUtil.initProducerOnly();
        } catch (Exception e) {
            log.error("Error initializing Kafka", e);
            throw new ConnectException(e);
        }
    }

    /**
     * Writes three counters to the cache under the keys {@code cdc-records-<str>},
     * {@code cdc-tables-<str>} and {@code cdc-data-<str>}.
     *
     * @param str  suffix appended to each cache key
     * @param i    value stored under the {@code cdc-tables-} key
     * @param j    value stored under the {@code cdc-records-} key
     * @param j2   value stored under the {@code cdc-data-} key
     * @param str2 job execution id; {@code "-1"} disables the update
     */
    public void updateMetrics(String str, int i, long j, long j2, String str2) {
        if ("-1".equals(str2)) {
            return;
        }
        if (this.cacheUtil == null) {
            // No cache exists when init saw job id "-1" or after invalidate(); nothing to update.
            log.debug("Metrics cache unavailable, skipping metrics update for {}", str);
            return;
        }
        this.cacheUtil.put("cdc-records-" + str, String.valueOf(j));
        this.cacheUtil.put("cdc-tables-" + str, String.valueOf(i));
        this.cacheUtil.put("cdc-data-" + str, String.valueOf(j2));
    }

    /**
     * Forwards a heartbeat to the connector client.
     *
     * @param heartbeatData heartbeat payload to post
     */
    public void postHeartbeat(HeartbeatData heartbeatData) {
        this.connectorClient.createSourceHeartbeat(heartbeatData);
    }

    /**
     * Reports an error through the connector client when a REST server is configured and the
     * job execution id is not {@code "-1"}.
     *
     * @param str  passed through to {@code ConnectorClient.postError} as its third argument
     * @param str2 passed through to {@code ConnectorClient.postError} as its fourth argument
     * @param exc  the exception being reported
     * @param str3 job execution id; {@code "-1"} suppresses the report
     */
    public void postError(String str, String str2, Exception exc, String str3) {
        if (!this.isRestServerConfigured || "-1".equals(str3)) {
            return;
        }
        // NOTE(review): the source is registered as "DRS_MYSQL" in initConnectorClient but errors
        // are posted as "pgsql" - confirm which identifier the REST server expects.
        this.connectorClient.postError("pgsql", str3, str, str2, exc);
    }

    /**
     * Builds the topic-partition map for the given topics via {@link KafkaConsumerUtil}.
     *
     * @param set topic names to look up
     */
    public void prepareTopicPartitionMap(Set<String> set) {
        this.topicPartitionMap = new KafkaConsumerUtil(this.config, set).getTopicPartitionMap();
    }

    /**
     * Parses a JSON array of {@code {"topicName": ..., "tableName": ...}} entries.
     *
     * <p>Table names are upper-cased and may be a comma-separated list. Entries with a missing or
     * blank topic/table, with a topic that is already mapped, or naming a table that is already
     * mapped are skipped with a log message. Successfully parsed mappings are cached by their raw
     * string, so callers with the same mapping string receive the same (mutable) pair instance.
     *
     * @param str raw value of the topic/table mapping configuration; {@code null} or unparsable
     *            JSON yields an empty mapping
     * @return pair of (mapped topic names, table name to topic name)
     * @throws ConfigException if the JSON is valid but does not have the expected structure
     */
    public static Pair<Set<String>, Map<String, String>> processTopicTables(String str) {
        Map<String, String> tableToTopic = new HashMap<>();
        Set<String> topics = new HashSet<>();
        Set<String> mappedTables = new HashSet<>();
        if (str == null) {
            // ConcurrentHashMap rejects null keys and the parser rejects null input.
            log.info("Topic Config is Empty. Discarding...");
            return new Pair<>(topics, tableToTopic);
        }
        Pair<Set<String>, Map<String, String>> cached = TOPIC_TABLE_MAPPING_TO_PARSED_PAIR_MAP.get(str);
        if (cached != null) {
            return cached;
        }

        JsonElement root = null;
        try {
            root = JsonParser.parseString(str);
        } catch (JsonSyntaxException e) {
            log.error("Error in parsing Topic Configuration: {}. Discarding topic table mapping", str, e);
        }
        if (root == null) {
            log.info("Topic Config is Empty. Discarding...");
            return new Pair<>(topics, tableToTopic);
        }

        try {
            JsonArray entries = root.getAsJsonArray();
            if (entries.size() == 0) {
                log.info("Topic Config is Empty. Discarding...");
                return new Pair<>(topics, tableToTopic);
            }
            for (int index = 0; index < entries.size(); index++) {
                JsonObject entry = entries.get(index).getAsJsonObject();
                String topic = readString(entry, "topicName");
                String rawTables = readString(entry, "tableName");
                String tables = rawTables == null ? null : rawTables.toUpperCase(Locale.ENGLISH);
                if (StringUtils.isBlank(topic) || StringUtils.isBlank(tables)) {
                    log.info("Skipping Entry at index {} in Topic Table mapping. Topic or table value is empty", index);
                } else if (topics.contains(topic)) {
                    log.info("Skipping Entry at index {} in Topic Table mapping. Topic already mapped", index);
                } else {
                    addValidTopicTables(index, tables, mappedTables, topic, topics, tableToTopic);
                }
            }
        } catch (IllegalStateException | UnsupportedOperationException e) {
            log.error("Value supplied for param \"topic.table.mapping\" is incorrect: {}", e.getMessage());
            // ConfigException(name, value, message) builds the message itself; it does not expand "{}".
            ConfigException configException = new ConfigException("topic.table.mapping", str, e.getMessage());
            configException.initCause(e);
            throw configException;
        }

        Pair<Set<String>, Map<String, String>> parsed = new Pair<>(topics, tableToTopic);
        TOPIC_TABLE_MAPPING_TO_PARSED_PAIR_MAP.put(str, parsed);
        return parsed;
    }

    /** Returns the string value of {@code key}, or {@code null} when it is absent or JSON null. */
    private static String readString(JsonObject entry, String key) {
        JsonElement value = entry.get(key);
        if (value == null || value.isJsonNull()) {
            return null;
        }
        return value.getAsString();
    }

    /**
     * Maps every not-yet-mapped table of a comma-separated list to the given topic. The topic is
     * recorded as mapped only if at least one of its tables was accepted.
     *
     * @param i    index of the entry in the mapping array, used for logging only
     * @param str  comma-separated table names
     * @param set  tables mapped so far; updated in place
     * @param str2 topic name
     * @param set2 topics mapped so far; updated in place
     * @param map  table-to-topic mapping; updated in place
     */
    private static void addValidTopicTables(int i, String str, Set<String> set, String str2, Set<String> set2, Map<String, String> map) {
        for (String table : str.split(",")) {
            if (set.contains(table)) {
                log.info("Skipping table entry {} for topic {} at index {} in Topic Table Mapping. Table already mapped", table, str2, i);
                continue;
            }
            map.put(table, str2);
            set.add(table);
            set2.add(str2);
        }
    }

    /**
     * Returns the parsed topic/table mapping, or {@code null} if pre-processing has not produced
     * one (not run yet, or failed).
     */
    public Pair<Set<String>, Map<String, String>> getTopicTableMappingPair() {
        return this.topicTableMappingPair;
    }
}
