package com.huawei.cdc.common.metadata.client;

import com.huawei.cdc.common.WriterConstants;
import com.huawei.cdc.common.conf.CommonConfiguration;
import com.huawei.cdc.common.health.constants.HealthConstants;
import com.huawei.cdc.common.metadata.models.ConnectorTaskData;
import com.huawei.cdc.common.metadata.models.ErrorData;
import com.huawei.cdc.common.metadata.models.HeartbeatData;
import com.huawei.cdc.common.metadata.models.MetricData;
import com.huawei.cdc.common.metadata.models.RestData;
import com.huawei.cdc.common.metadata.models.SubmissionMetricsData;
import com.huawei.cdc.common.metadata.util.CommonConstants;
import com.huawei.cdc.common.metadata.util.ErrorDataConstants;
import com.huawei.cdc.common.metadata.util.HeartbeatConstants;
import com.huawei.cdc.common.metadata.util.KerberosUtil;
import com.huawei.cdc.common.metadata.util.MetricsConstants;
import com.huawei.cdc.common.metadata.util.TaskConstants;
import com.huawei.cdc.common.metadata.util.UniqueIdHelper;
import com.huawei.cdc.common.rest.validation.EndpointInputValidator;
import com.huawei.cdc.common.util.CommonUtil;
import java.io.FileNotFoundException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Paths;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.Response;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/cdc/common/metadata/client/ConnectorClient.class */
public class ConnectorClient {
    private String sourceDatastore;
    private String targetDatastore;
    private ExecutorService service;
    private boolean closed;
    private boolean asynchronous;
    private Client client;
    private LinkedBlockingQueue<RestData> restDataQueue;
    private boolean isValidCdlRestServers;
    private String keyTabContent;
    public static final Logger log = LoggerFactory.getLogger(ConnectorClient.class);
    private static final Pattern ADDRESS_PATTERN = Pattern.compile("((?<cname>[^/]+)/)?(?<ip>.+):(?<port>\\d+)");
    private static String keyTabFile = "cdl.keytab";
    private final List<String> cdlConnectServers = new ArrayList();
    private final List<String> cdlRestServers = new ArrayList();
    private boolean clusterSecurity = true;
    private String principal = CommonConstants.EMPTY;
    private String keytabPath = CommonConstants.EMPTY;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/huawei/cdc/common/metadata/client/ConnectorClient$RequestListener.class */
    public class RequestListener extends Thread {
        private RequestListener() {
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (!ConnectorClient.this.closed) {
                try {
                    ConnectorClient.this.makeRestCall((RestData) ConnectorClient.this.restDataQueue.take());
                } catch (InterruptedException e) {
                    ConnectorClient.log.error("Error in heartbeat listener", e);
                    if (ConnectorClient.this.service != null) {
                        ConnectorClient.this.service.shutdown();
                    }
                    ConnectorClient.this.closed = true;
                }
            }
        }
    }

    public static boolean isRestServerConfigured() {
        return StringUtils.isNotBlank(CommonConfiguration.CDL_REST_SERVERS);
    }

    public void initSource(String str, boolean z) {
        this.asynchronous = z;
        initSource(str);
    }

    public void initTarget(String str, boolean z) {
        this.asynchronous = z;
        initTarget(str);
    }

    public void initSource(String str) {
        this.sourceDatastore = str;
        init();
    }

    private void initTarget(String str) {
        this.targetDatastore = str;
        init();
    }

    public void init() {
        setCDLConnectServer();
        setCDLRestServer();
        getSecurityConfig();
        this.client = ClientBuilder.newClient();
        this.client.property("jersey.config.client.connectTimeout", CommonConfiguration.CDL_REST_CLIENT_CONNECT_TIMEOUT_MS);
        this.client.property("jersey.config.client.readTimeout", CommonConfiguration.CDL_REST_CLIENT_REQUEST_TIMEOUT_MS);
        if (this.asynchronous) {
            this.restDataQueue = new LinkedBlockingQueue<>();
            this.service = Executors.newSingleThreadExecutor();
            this.service.execute(new RequestListener());
        }
    }

    private String getCDLRestServerLocation() {
        String str = null;
        if (this.isValidCdlRestServers) {
            try {
                str = CommonConstants.HTTP_PREFIX + getCDLRestServer();
            } catch (Exception e) {
                log.error("CDL Service Error: " + e.getMessage());
            }
        }
        return str;
    }

    private String getCDLRestServer() throws Exception {
        ArrayList arrayList = new ArrayList(this.cdlRestServers);
        boolean z = false;
        String str = null;
        while (arrayList.size() > 0) {
            str = getRandomCdlRestServer(arrayList);
            z = getCdlServiceStatus(str);
            if (z) {
                break;
            }
        }
        if (z) {
            return str;
        }
        throw new Exception("CDL services are DOWN");
    }

    private String getRandomCdlRestServer(List<String> list) {
        String str = list.get(new SecureRandom().nextInt(list.size()));
        list.remove(str);
        return str;
    }

    private boolean getCdlServiceStatus(String str) {
        String str2 = CommonConstants.HTTP_PREFIX + str;
        checkRestClientClosed();
        try {
            Response makeRestCall = makeRestCall("/api/v1/cdl/health/cdl-service", CommonConstants.HTTP_GET, CommonConstants.EMPTY, str2);
            if (makeRestCall == null || makeRestCall.getStatus() != 200) {
                return false;
            }
            return ((String) makeRestCall.readEntity(String.class)).equals(HealthConstants.STATUS_UP);
        } catch (Exception e) {
            log.error("Error fetching CDL service status for " + str + " Message: " + e.getMessage());
            return false;
        }
    }

    private void setCDLRestServer() {
        if (!StringUtils.isNotBlank(CommonConfiguration.CDL_REST_SERVERS)) {
            this.isValidCdlRestServers = false;
            log.error("No valid CDL Rest Servers set. Please configure {} or {}", "cdl.rest.servers");
            return;
        }
        this.isValidCdlRestServers = EndpointInputValidator.validateCDLRestServers(CommonConfiguration.CDL_REST_SERVERS);
        if (this.isValidCdlRestServers) {
            Collections.addAll(this.cdlRestServers, CommonConfiguration.CDL_REST_SERVERS.split(CommonConstants.COMMA));
        } else {
            log.error("Invalid cdl.rest.servers, check if all IPs and the Port are correct.");
        }
    }

    private void setCDLConnectServer() {
        if (StringUtils.isNotBlank(CommonConfiguration.CDL_CONNECT_SERVERS)) {
            Arrays.stream(CommonConfiguration.CDL_CONNECT_SERVERS.split(CommonConstants.COMMA)).forEach(str -> {
                if (ADDRESS_PATTERN.matcher(str).matches()) {
                    this.cdlConnectServers.add(CommonConstants.HTTP_PREFIX + str);
                }
            });
        } else {
            log.error("No valid CDL Connect Servers set. Please configure {} or {}", "cdl.connect.servers");
            throw new ConnectException("cdl.connect.servers not set in CDL config properties");
        }
    }

    public void createSourceHeartbeat(HeartbeatData heartbeatData) {
        heartbeatData.setMethod(CommonConstants.HTTP_POST);
        heartbeatData.setServerName(getCDLRestServerLocation());
        heartbeatData.setPath(HeartbeatConstants.SOURCE_HEARTBEAT_URL);
        heartbeatData.setSourceDatastore(this.sourceDatastore);
        dispatch(heartbeatData);
    }

    public void postError(String str, String str2, String str3, String str4, Exception exc) {
        ErrorData errorData = new ErrorData();
        errorData.setId(UniqueIdHelper.getId());
        errorData.setMessage(exc.getMessage());
        errorData.setStatus(str4);
        errorData.setSeverity(str3);
        errorData.setSource(str);
        errorData.setServerName(getCDLRestServerLocation());
        errorData.setMethod(CommonConstants.HTTP_POST);
        errorData.setPath(ErrorDataConstants.LOG_ERROR_PATH.replace(ErrorDataConstants.SUBMISSION, str2));
        errorData.setStack((String) Arrays.stream(exc.getStackTrace()).map((v0) -> {
            return v0.toString();
        }).collect(Collectors.joining(" ")));
        dispatch(errorData);
    }

    public void createOrReplaceMetrics(String str) {
        dispatch(getMetricsBody(str));
    }

    public void updateMetrics(int i, long j, long j2, String str) {
        dispatch(getMetricsBody(i, j, j2, str));
    }

    private SubmissionMetricsData getMetricsBody(int i, long j, long j2, String str) {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new MetricData(str, MetricsConstants.SOURCE_TABLES_PROCESSED, String.valueOf(i)));
        arrayList.add(new MetricData(str, MetricsConstants.TOTAL_RECORDS_PROCESSED, String.valueOf(j)));
        arrayList.add(new MetricData(str, MetricsConstants.TOTAL_DATA_PROCESSED, String.valueOf(j2)));
        return new SubmissionMetricsData(arrayList, getCDLRestServerLocation(), CommonConstants.HTTP_POST, ErrorDataConstants.METRIC_CREATE_PATH.replace(ErrorDataConstants.SUBMISSION, str));
    }

    private SubmissionMetricsData getMetricsBody(String str) {
        ArrayList arrayList = new ArrayList();
        Iterator<String> it = MetricsConstants.getMetricNames().iterator();
        while (it.hasNext()) {
            arrayList.add(new MetricData(str, it.next(), "0"));
        }
        return new SubmissionMetricsData(arrayList, getCDLRestServerLocation(), CommonConstants.HTTP_POST, ErrorDataConstants.METRIC_CREATE_PATH.replace(ErrorDataConstants.SUBMISSION, str));
    }

    public void createSinkHeartbeat(String str, String str2, Long l, String str3, String str4, String str5, String str6) {
        HeartbeatData heartbeatData = new HeartbeatData();
        heartbeatData.setSubmissionId(str5);
        heartbeatData.setMethod(CommonConstants.HTTP_PUT);
        heartbeatData.setServerName(getCDLRestServerLocation());
        heartbeatData.setPath(HeartbeatConstants.SINK_HEARTBEAT_URL);
        heartbeatData.setLazyUid(str3);
        heartbeatData.setTargetSchema(str);
        heartbeatData.setTargetEntity(str2);
        heartbeatData.setTargetTaskId(str6);
        heartbeatData.setTargetDatastore(this.targetDatastore);
        heartbeatData.setTargetCommitTime(CommonUtil.convertTimestampToSystemDate(l.longValue()));
        heartbeatData.setTargetConnectionId(str4);
        dispatch(heartbeatData);
    }

    public Response getExistingHeartbeat(String str) {
        init();
        HeartbeatData heartbeatData = new HeartbeatData();
        heartbeatData.setMethod(CommonConstants.HTTP_GET);
        heartbeatData.setServerName(getCDLRestServerLocation());
        heartbeatData.setPath(HeartbeatConstants.EXISTING_HEARTBEAT_URL.replace(CommonConstants.IDENTIFIER, str));
        return makeRestCall(heartbeatData.getPath(), heartbeatData.getMethod(), null, heartbeatData.getServerName());
    }

    public void updateFailedIdentifier(String str, String str2, String str3, String str4, String str5) {
        init();
        ConnectorTaskData connectorTaskData = new ConnectorTaskData();
        connectorTaskData.setMethod(CommonConstants.HTTP_POST);
        connectorTaskData.setPath(TaskConstants.FAILED_TASK_PATH);
        connectorTaskData.setServerName(getCDLRestServerLocation());
        connectorTaskData.setConnectorName(str3);
        connectorTaskData.setTaskId(str4);
        connectorTaskData.setType(str5);
        connectorTaskData.setTaskStatus("FAILED");
        connectorTaskData.setLastProcessedRecordIdentifier(str2);
        connectorTaskData.setFailedRecordIdentifier(str);
        dispatch(connectorTaskData);
    }

    public Response getLastProcessedIdentifiers(String str, String str2) {
        init();
        ConnectorTaskData connectorTaskData = new ConnectorTaskData();
        connectorTaskData.setMethod(CommonConstants.HTTP_POST);
        connectorTaskData.setPath(TaskConstants.GET_IDENTIFIER_PATH);
        connectorTaskData.setServerName(getCDLRestServerLocation());
        connectorTaskData.setConnectorName(str);
        connectorTaskData.setTaskId(str2);
        return makeRestCall(connectorTaskData.getPath(), connectorTaskData.getMethod(), connectorTaskData.getJSONString(), connectorTaskData.getServerName());
    }

    private void dispatch(RestData restData) {
        if (!this.asynchronous) {
            makeRestCall(restData);
            return;
        }
        try {
            this.restDataQueue.put(restData);
        } catch (InterruptedException e) {
            log.error("Error encountered during dispatch of message {}", restData.getJSONString(), e);
        }
    }

    private void checkRestClientClosed() {
        if (this.client == null || this.client.isClosed()) {
            this.client = ClientBuilder.newClient();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void makeRestCall(RestData restData) {
        Response response = null;
        checkRestClientClosed();
        try {
            if (restData.getServerName() != null) {
                response = makeRestCall(restData.getPath(), restData.getMethod(), restData.getJSONString(), restData.getServerName());
            } else if (this.isValidCdlRestServers) {
                log.error("CDL services are DOWN, Unable to make rest call with path {}, method {}", restData.getPath(), restData.getMethod());
            } else {
                log.error("Invalid cdl.rest.servers in config properties, Unable to make rest call with path {}, method {}", restData.getPath(), restData.getMethod());
            }
        } catch (RuntimeException e) {
            log.error("Error making rest call with server {}, path {}, method {}", new Object[]{restData.getServerName(), restData.getPath(), restData.getMethod(), e});
        }
        if (response != null && response.getStatus() == 404) {
            log.info("Server not found {}", restData.getServerName());
        }
        if (response == null || response.getStatus() != 500) {
            return;
        }
        log.info("Server error in server {} during request {}", restData.getServerName(), restData);
    }

    public Response makeRestCall(String str, String str2, String str3, String str4) {
        Response response = null;
        setCDLAuthentication(str);
        boolean z = -1;
        switch (str2.hashCode()) {
            case 70454:
                if (str2.equals(CommonConstants.HTTP_GET)) {
                    z = false;
                    break;
                }
                break;
            case 79599:
                if (str2.equals(CommonConstants.HTTP_PUT)) {
                    z = 3;
                    break;
                }
                break;
            case 2461856:
                if (str2.equals(CommonConstants.HTTP_POST)) {
                    z = true;
                    break;
                }
                break;
            case 2012838315:
                if (str2.equals("DELETE")) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                response = doGet(str, str4);
                break;
            case true:
                response = doPost(str, str3, str4);
                break;
            case true:
                response = doDelete(str, str4);
                break;
            case WriterConstants.MAX_RETRIES /* 3 */:
                response = doPut(str, str3, str4);
                break;
            default:
                log.info("Request not sent because method {} is not implemented in this context", str2);
                break;
        }
        return response;
    }

    public void shutdown() {
        this.closed = true;
        if (this.service != null) {
            this.service.shutdown();
        }
        if (this.client != null) {
            this.client.close();
        }
    }

    private Response doPost(String str, String str2, String str3) {
        return this.client.target(str3).path(str).request().post(Entity.json(str2));
    }

    private Response doPut(String str, String str2, String str3) {
        return this.client.target(str3).path(str).request().put(Entity.json(str2));
    }

    private Response doDelete(String str, String str2) {
        return this.client.target(str2).path(str).request().delete();
    }

    private Response doGet(String str, String str2) {
        return this.client.target(str2).path(str).request().get();
    }

    private void getSecurityConfig() {
        try {
            this.clusterSecurity = Boolean.parseBoolean(System.getenv("CLUSTER_SECURITY"));
            if (this.clusterSecurity) {
                this.principal = System.getenv("KEYTAB_SRC_PRINCIPAL");
                this.keytabPath = System.getenv("KEYTAB_DEST") + keyTabFile;
                if (!Files.exists(Paths.get(this.keytabPath, new String[0]), new LinkOption[0])) {
                    log.error("KeyTab file not found.");
                    throw new FileNotFoundException("KeyTab file not found.");
                }
                this.keyTabContent = KerberosUtil.getKeytabContent(this.keytabPath);
            }
        } catch (Exception e) {
            log.error("CLUSTER_SECURITY is not set. Default is True.");
        }
    }

    private void setCDLAuthentication(String str) {
        if (this.clusterSecurity && str.startsWith(CommonConstants.CDL_BASE_URL)) {
            this.client.register(new Authenticator(this.principal, this.keyTabContent));
            log.debug("Authentication set for rest call");
        }
    }
}
