package com.huawei.cdc.service.job.controller;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.json.JsonSanitizer;
import com.huawei.cdc.common.conf.CommonConfiguration;
import com.huawei.cdc.common.rest.validation.EndpointInputValidator;
import com.huawei.cdc.common.util.CommonUtil;
import com.huawei.cdc.common.util.CrypterUtils;
import com.huawei.cdc.datacomparison.DataComparisonJob;
import com.huawei.cdc.metadata.datacomparison.DcJobCompareDefinitionManagement;
import com.huawei.cdc.metadata.distributelock.DistributeLockManagement;
import com.huawei.cdc.metadata.jobdefinition.JobDefinitionManagement;
import com.huawei.cdc.metadata.jobexecution.JobExecutionManagement;
import com.huawei.cdc.metadata.jobexecutionmetrics.JobExecutionMetricsManagement;
import com.huawei.cdc.metadata.models.ApplicationStatus;
import com.huawei.cdc.metadata.models.CdcConnection;
import com.huawei.cdc.metadata.models.CdcDistributeLockEntity;
import com.huawei.cdc.metadata.models.CdcJobDefinition;
import com.huawei.cdc.metadata.models.CdcJobDetailMetric;
import com.huawei.cdc.metadata.models.CdcJobExecution;
import com.huawei.cdc.metadata.models.DcJobCompareDefinition;
import com.huawei.cdc.service.audit.CDCAuditLogger;
import com.huawei.cdc.service.exception.EntityException;
import com.huawei.cdc.service.exception.ParameterException;
import com.huawei.cdc.service.exception.RestException;
import com.huawei.cdc.service.job.model.DataCompareExecutionResponse;
import com.huawei.cdc.service.job.model.ExecutionResponse;
import com.huawei.cdc.service.job.model.RepairExecutionResponse;
import com.huawei.cdc.service.job.model.ViewJobExecutionsResponse;
import com.huawei.cdc.service.job.service.JobExecutionService;
import com.huawei.cdc.service.job.service.JobMetricService;
import com.huawei.cdc.service.models.ErrorResponse;
import com.huawei.cdc.service.rest.RestClient;
import com.huawei.cdc.service.rest.RestServer;
import com.huawei.cdc.service.util.BodyConstants;
import com.huawei.cdc.service.util.CommonConstants;
import com.huawei.cdc.service.util.DataSourcesConstants;
import com.huawei.cdc.service.util.ErrorConstants;
import com.huawei.cdc.service.util.JobControllerUtils;
import com.huawei.cdc.service.util.JobExecutionUtils;
import com.huawei.cdc.service.util.MetricsConstants;
import com.huawei.cdc.service.util.RequestUtil;
import com.huawei.cdc.service.util.RestConstants;
import com.huawei.cdc.service.util.ValidationConstants;
import com.huawei.cdc.service.validation.EntityConvertor;
import com.huawei.cdc.service.validation.ExecutionEnvValidator;
import com.huawei.cdl.app.launcher.ApplicationHandler;
import com.huawei.cdl.app.launcher.utils.YarnClientUtil;
import java.io.IOException;
import java.sql.SQLException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.persistence.RollbackException;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.Response;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.launcher.SparkAppHandle;
import org.eclipse.persistence.exceptions.EclipseLinkException;
import org.json.JSONArray;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.util.CollectionUtils;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.RequestContextHolder;

@RequestMapping({RestConstants.CDL_BASE_URL})
@RestController
/* loaded from: input_file:com/huawei/cdc/service/job/controller/CDCJobExecutionResource.class */
public class CDCJobExecutionResource {
    /** Class-wide SLF4J logger. */
    public static final Logger log = LoggerFactory.getLogger(CDCJobExecutionResource.class);

    // Path-segment and parameter-name constants. JOB_NAME and SUBMISSION_ID are the
    // names handed to EndpointInputValidator when path variables are validated.
    private static final String JOB_PATH = "job";
    private static final String LAST_SUBMISSION_ID = "lastSubmissionIds";
    private static final String JOB_NAME = "jobName";
    private static final String COMPARE_PAIR_NAME = "comparePairName";
    private static final String SUBMISSION_ID = "submission_id";

    // Operation names (presumably the trailing URL segment of each endpoint — confirm against usages).
    private static final String START = "start";
    private static final String STOP = "stop";
    private static final String RESTART = "restart";
    private static final String PAUSE = "pause";
    private static final String RESUME = "resume";
    private static final String REPAIR = "repair";

    // Job-type value that selects the data-comparison code path.
    private static final String DATA_COMPARE_JOB = "DATA_COMPARE_JOB";

    // Endpoint paths, relative to RestConstants.CDL_BASE_URL (see the class-level @RequestMapping).
    private static final String JOB_PATH_APPENDER = "/job/{jobName}/";
    private static final String JOB_START_PATH = "/job/{jobName}/start";
    private static final String JOB_SUBMISSIONS_PATH = "/job/submissions";
    private static final String JOB_SUBMISSION_PATH = "/job/{jobName}/submissions/{submission_id}";
    private static final String DC_COMPARE_PAIR_DETAILS_PATH = "/job/{jobName}/submissions/{submission_id}/{comparePairName}";
    private static final String DC_REPAIR_PATH = "/job/{jobName}/submissions/{submission_id}/repair";
    private static final String DC_REPAIR_JOB_PATH = "/job/{jobName}/repair";
    private static final String JOB_SUBMISSIONS_STOP_PATH = "/job/{jobName}/submissions/{submission_id}/stop";
    private static final String JOB_SUBMISSION_RESTART_PATH = "/job/{jobName}/submissions/{submission_id}/restart";
    private static final String JOB_SUBMISSIONS_PAUSE_PATH = "/job/{jobName}/submissions/{submission_id}/pause";
    private static final String JOB_SUBMISSIONS_RESUME_PATH = "/job/{jobName}/submissions/{submission_id}/resume";

    // Target name passed to CDCAuditLogger for audit records written by this controller.
    private static final String TARGET = "CDCJobExecutionResource";

    // Collaborators injected by Spring.
    @Autowired
    private JobExecutionMetricsManagement jobExecutionMetricsManagement;

    @Autowired
    private JobMetricService jobMetricService;

    @Autowired
    private JobExecutionService jobExecutionService;

    // Used for the permission check performed at the start of each endpoint.
    @Autowired
    private CDCJobExecutor cdcJobExecutor;

    @Autowired
    private JobExecutionManagement jobExecutionManagement;

    @Autowired
    private JobDefinitionManagement jobDefinitionManagement;

    @Autowired
    private DistributeLockManagement distributeLockManagement;

    /* JADX WARN: Failed to calculate best type for var: r20v3 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r20v3 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r21v1 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r21v1 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.RegisterArg.getSVar()" because the return value of "jadx.core.dex.nodes.InsnNode.getResult()" is null
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.collectRelatedVars(AbstractTypeConstraint.java:31)
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.<init>(AbstractTypeConstraint.java:19)
    	at jadx.core.dex.visitors.typeinference.TypeSearch$1.<init>(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeMoveConstraint(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeConstraint(TypeSearch.java:361)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.collectConstraints(TypeSearch.java:341)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.run(TypeSearch.java:60)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.runMultiVariableSearch(FixTypesVisitor.java:116)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Not initialized variable reg: 20, insn: 0x01fc: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r20 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:52:0x01fc */
    /* JADX WARN: Not initialized variable reg: 21, insn: 0x0201: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r21 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:54:0x0201 */
    /* JADX WARN: Type inference failed for: r11v0, types: [com.huawei.cdc.service.job.controller.CDCJobExecutionResource] */
    /* JADX WARN: Type inference failed for: r20v0, types: [com.huawei.cdc.service.exception.RestException, java.lang.Exception] */
    /* JADX WARN: Type inference failed for: r20v3, types: [com.huawei.cdc.metadata.jobexecution.JobExecutionManagement] */
    /* JADX WARN: Type inference failed for: r21v1, types: [java.lang.Throwable] */
    @PutMapping(value = {JOB_SUBMISSION_RESTART_PATH}, produces = {"application/json"})
    @ResponseBody
    public ResponseEntity<Object> restartSubmissionExecution(@PathVariable("jobName") String str, @PathVariable("submission_id") String str2, @RequestBody(required = false) Map<String, Object> map) {
        ?? r20;
        ?? r21;
        EndpointInputValidator.validateNumeric(SUBMISSION_ID, str2);
        long currentTimeMillis = System.currentTimeMillis();
        HttpServletRequest request = RequestContextHolder.currentRequestAttributes().getRequest();
        ResponseEntity<Object> checkPermissionAndReturnError = this.cdcJobExecutor.checkPermissionAndReturnError(str, currentTimeMillis, request, CDCAuditLogger.AuditConstants.RESTART_CDL_JOB);
        if (checkPermissionAndReturnError != null) {
            return checkPermissionAndReturnError;
        }
        try {
            if (!CollectionUtils.isEmpty(map)) {
                validateRestartParameters(map);
            }
            try {
                JobExecutionManagement jobExecutionManagement = new JobExecutionManagement();
                Throwable th = null;
                CdcJobExecution jobExecution = getJobExecution(str, str2, jobExecutionManagement, currentTimeMillis, request, CDCAuditLogger.AuditConstants.RESTART_CDL_JOB);
                CdcJobDefinition definition = jobExecutionManagement.getDefinition(str);
                if (definition == null) {
                    logAndReturnResponse("Job definition is not present for {}", str, ErrorConstants.JOB_NOT_FOUND_ERROR, CDCAuditLogger.AuditConstants.START_CDL_JOB, currentTimeMillis, request);
                }
                CdcConnection connection = jobExecutionManagement.getConnection(jobExecution.getSourceConnectorId().intValue());
                CdcConnection connection2 = jobExecutionManagement.getConnection(jobExecution.getTargetConnectorId().intValue());
                String connectorName = jobExecutionManagement.getConnectorName(str, str2, true);
                String connectorName2 = jobExecutionManagement.getConnectorName(str, str2, false);
                boolean isKafkaTypeLink = JobControllerUtils.isKafkaTypeLink(connection);
                boolean isKafkaTypeLink2 = JobControllerUtils.isKafkaTypeLink(connection2);
                syncJobExecutionStatus(jobExecution, isKafkaTypeLink, isKafkaTypeLink2, connectorName, connectorName2, jobExecutionManagement);
                validateJobStatusBeforeRestart(jobExecution, str, str2);
                if (jobExecution.getJobType().equals("DATA_COMPARE_JOB")) {
                    DataComparisonJob dataComparisonJob = new DataComparisonJob();
                    JobExecutionUtils.processRestartDataComparejob(definition, jobExecution, connection, connection2, jobExecutionManagement);
                    dataComparisonJob.syncJobExecutionAndComparePairStatus(jobExecution, jobExecutionManagement);
                    ResponseEntity<Object> responseEntity = new ResponseEntity<>(ExecutionResponse.getDataCompareJobExecutionInstance(jobExecution), HttpStatus.CREATED);
                    if (jobExecutionManagement != null) {
                        if (0 != 0) {
                            try {
                                jobExecutionManagement.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            jobExecutionManagement.close();
                        }
                    }
                    return responseEntity;
                }
                if (!isKafkaTypeLink) {
                    restartConnector(connectorName, RestConstants.SOURCE_CONNECTOR, map, connection);
                }
                if (!isKafkaTypeLink2) {
                    restartConnector(connectorName2, RestConstants.SINK_CONNECTOR, map, connection2);
                }
                Map<String, Object> updateJobStatusAfterRestart = updateJobStatusAfterRestart(jobExecution, connection, connectorName, isKafkaTypeLink, connection2, connectorName2, isKafkaTypeLink2);
                Object obj = updateJobStatusAfterRestart.get("sourStatus");
                Object obj2 = updateJobStatusAfterRestart.get("sinkStatus");
                CdcJobExecution cdcJobExecution = (CdcJobExecution) updateJobStatusAfterRestart.get("cdcJobExecution");
                CDCAuditLogger.logSuccess(currentTimeMillis, System.currentTimeMillis(), RequestUtil.getClientIp(request), RequestUtil.getUserName(request), RequestUtil.getInstanceIp(request), "Restart CDL Job " + str, TARGET);
                ResponseEntity<Object> responseEntity2 = new ResponseEntity<>(ExecutionResponse.getInstance(cdcJobExecution, obj, obj2, null), HttpStatus.CREATED);
                if (jobExecutionManagement != null) {
                    if (0 != 0) {
                        try {
                            jobExecutionManagement.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        jobExecutionManagement.close();
                    }
                }
                return responseEntity2;
            } catch (Throwable th4) {
                if (r20 != 0) {
                    if (r21 != 0) {
                        try {
                            r20.close();
                        } catch (Throwable th5) {
                            r21.addSuppressed(th5);
                        }
                    } else {
                        r20.close();
                    }
                }
                throw th4;
            }
        } catch (Exception e) {
            doJobExecutionManagement(null, e);
            log.error("Failed to restart job. ", e);
            CDCAuditLogger.logFailure(currentTimeMillis, System.currentTimeMillis(), RequestUtil.getClientIp(request), RequestUtil.getUserName(request), RequestUtil.getInstanceIp(request), CDCAuditLogger.AuditConstants.RESTART_CDL_JOB, TARGET, e.getMessage());
            return new ResponseEntity<>(new ErrorResponse(e), HttpStatus.INTERNAL_SERVER_ERROR);
        } catch (BadRequestException e2) {
            log.error("Failed to restart job. {}", e2.toString());
            CDCAuditLogger.logFailure(currentTimeMillis, System.currentTimeMillis(), RequestUtil.getClientIp(request), RequestUtil.getUserName(request), RequestUtil.getInstanceIp(request), CDCAuditLogger.AuditConstants.RESTART_CDL_JOB, TARGET, e2.getMessage());
            HashMap hashMap = new HashMap();
            hashMap.put(RestConstants.ERROR_MESSAGE, e2.getMessage());
            return new ResponseEntity<>(hashMap, HttpStatus.BAD_REQUEST);
        } catch (RestException e3) {
            doJobExecutionManagement(null, e3);
            log.error("Failed to restart job. {}", e3.toString());
            CDCAuditLogger.logFailure(currentTimeMillis, System.currentTimeMillis(), RequestUtil.getClientIp(request), RequestUtil.getUserName(request), RequestUtil.getInstanceIp(request), CDCAuditLogger.AuditConstants.RESTART_CDL_JOB, TARGET, e3.getMessage());
            return new ResponseEntity<>(prepareResponse(e3), HttpStatus.BAD_REQUEST);
        }
    }

    /**
     * Marks the execution as started, persists it, reads the connector statuses, lets
     * {@code updateJobStatus} adjust the execution from them, and persists the result again.
     *
     * @param cdcJobExecution execution being restarted
     * @param cdcConnection   source connection
     * @param str             source connector name
     * @param z               whether the source is a Kafka-type link
     * @param cdcConnection2  sink connection
     * @param str2            sink connector name
     * @param z2              whether the sink is a Kafka-type link
     * @return map with keys {@code "sourStatus"}, {@code "sinkStatus"} and {@code "cdcJobExecution"}
     */
    private Map<String, Object> updateJobStatusAfterRestart(CdcJobExecution cdcJobExecution, CdcConnection cdcConnection, String str, boolean z, CdcConnection cdcConnection2, String str2, boolean z2) {
        final String startedStatus = ApplicationStatus.STARTED.getStatus();

        // First persist: record the STARTED state before querying the connectors.
        cdcJobExecution.setStatus(startedStatus);
        CdcJobExecution persisted = this.jobExecutionManagement.updateJobExecution(cdcJobExecution);

        Object sourceStatus = getConnectorStatusesAfterStart(cdcConnection, str);
        Object sinkStatus = getConnectorStatusesAfterStart(cdcConnection2, str2);
        updateJobStatus(persisted, sourceStatus, sinkStatus, z, z2, str, str2);

        // A job that is still STARTED carries no error, so stale error details are cleared.
        boolean stillStarted = startedStatus.equals(persisted.getStatus());
        if (stillStarted) {
            persisted.setErrorMessage((String) null);
            persisted.setErrorCode((String) null);
        }

        // Second persist: store whatever updateJobStatus decided.
        CdcJobExecution finalExecution = this.jobExecutionManagement.updateJobExecution(persisted);

        Map<String, Object> result = new HashMap<>();
        result.put("sourStatus", sourceStatus);
        result.put("sinkStatus", sinkStatus);
        result.put("cdcJobExecution", finalExecution);
        return result;
    }

    /**
     * Builds the error body returned to the client for a {@link RestException}.
     *
     * <p>The body always carries the error code and error message. When the message mentions
     * "No status found" or "Request timed out", a status hint is added as well.
     *
     * @param restException the failure to describe
     * @return mutable map holding the response fields
     */
    private Map<String, Object> prepareResponse(RestException restException) {
        ErrorResponse errorResponse = new ErrorResponse(restException);
        String message = errorResponse.getErrorMessage();

        Map<String, Object> body = new HashMap<>();
        body.put(RestConstants.ERROR_CODE, errorResponse.getErrorCode());
        body.put(RestConstants.ERROR_MESSAGE, message);

        // This runs inside an exception handler, so a message-less exception must not
        // trigger a second failure here.
        boolean statusUnknown = message != null
                && (message.contains("No status found") || message.contains("Request timed out"));
        if (statusUnknown) {
            body.put(RestConstants.STATUS_MESSAGE, "Failed to get job status. Please check job status after sometime.");
        }
        return body;
    }

    /**
     * Rejects a restart when the execution is in a state that cannot be restarted.
     *
     * <p>STARTED and COMPLETED are matched case-insensitively; STOPPED, PAUSED and RUNNING are
     * matched exactly. Any other status passes.
     *
     * @param cdcJobExecution execution whose status is inspected
     * @param str             job name, used in the error message
     * @param str2            submission id, used in the error message
     * @throws BadRequestException when the status is one of the states listed above
     */
    private void validateJobStatusBeforeRestart(CdcJobExecution cdcJobExecution, String str, String str2) {
        final String status = cdcJobExecution.getStatus();
        final String subject = str + " with submission id " + str2;
        final String reason;

        if (status.equalsIgnoreCase(ApplicationStatus.STARTED.getStatus())) {
            reason = " cannot be restarted as the job is started.";
        } else if (status.equalsIgnoreCase(ApplicationStatus.COMPLETED.getStatus())) {
            reason = " cannot be restarted as the job is completed successfully. Please start it first.";
        } else if (ApplicationStatus.STOPPED.getStatus().equals(status)) {
            reason = " cannot be restarted as the job is stopped. Please start it first.";
        } else if (ApplicationStatus.PAUSED.getStatus().equals(status)) {
            reason = " cannot be restarted as the job is paused. Please resume it first.";
        } else if (ApplicationStatus.RUNNING.getStatus().equals(status)) {
            reason = " cannot be restarted as the job is running.";
        } else {
            // Every remaining status may be restarted.
            return;
        }
        throw new BadRequestException(subject + reason);
    }

    /**
     * Validates the connector sections of a restart request body.
     *
     * <p>Only the sections that are present are checked: first the source connector entry,
     * then the sink connector entry.
     *
     * @param map restart request body
     */
    private void validateRestartParameters(Map<String, Object> map) {
        String[] connectorKeys = {RestConstants.SOURCE_CONNECTOR, RestConstants.SINK_CONNECTOR};
        for (String connectorKey : connectorKeys) {
            if (map.containsKey(connectorKey)) {
                validate(map, connectorKey);
            }
        }
    }

    /**
     * Checks that the value stored under {@code str} is a list whose elements are all maps.
     *
     * @param map request body holding the value
     * @param str key to inspect; also reported in the exception
     * @throws ParameterException when the value is not a list or any element is not a map
     */
    private void validate(Map<String, Object> map, String str) {
        Object value = map.get(str);
        if (!(value instanceof List)) {
            throw new ParameterException(ErrorConstants.INVALID_PARAMETER_ERROR, str);
        }
        for (Object element : (List<?>) value) {
            if (!(element instanceof Map)) {
                throw new ParameterException(ErrorConstants.INVALID_PARAMETER_ERROR, str);
            }
        }
    }

    /**
     * Restarts a single connector of a job.
     *
     * <p>Fetches the connector status through {@code JobControllerUtils.getConnectorStatus},
     * turns it into a worker mapping with {@code getWorkerMapping}, and hands everything to
     * {@code restart}.
     *
     * @param str           connector name
     * @param str2          connector role; the caller in this class passes
     *                      {@code RestConstants.SOURCE_CONNECTOR} or {@code RestConstants.SINK_CONNECTOR}
     * @param map           restart request body as received by the endpoint (the body is optional there)
     * @param cdcConnection connection the connector belongs to
     */
    private void restartConnector(String str, String str2, Map<String, Object> map, CdcConnection cdcConnection) {
        // NOTE(review): the meaning of the `false` flag on getConnectorStatus is not visible here — confirm.
        restart(cdcConnection, str, map, getWorkerMapping(JobControllerUtils.getConnectorStatus(str, false)), str2);
    }

    /**
     * Loads the executions of a job definition and converts each one to an
     * {@link ExecutionResponse}.
     *
     * @param str job name
     * @param num second argument forwarded unchanged to
     *            {@code JobExecutionManagement.getAllExecutionsByJobDefID}
     * @return the converted executions; never {@code null}
     * @throws RestException rethrown unchanged when the lookup raises it
     * @throws Exception     wrapping any other failure, including the case where the lookup
     *                       returns {@code null}
     */
    private List<ExecutionResponse> getJobExecutionsByDefID(String str, Integer num) throws Exception {
        try {
            // Assumes the lookup yields CdcJobExecution elements, which is what
            // ExecutionResponse::getInstance is applied to — TODO confirm against JobExecutionManagement.
            List<CdcJobExecution> executions = this.jobExecutionManagement.getAllExecutionsByJobDefID(str, num);
            if (executions == null) {
                // Thrown inside the try on purpose: callers keep receiving it wrapped, as before.
                throw new Exception("Job execution is null.");
            }
            return executions.stream().map(ExecutionResponse::getInstance).collect(Collectors.toList());
        } catch (RestException e) {
            // Pass the exception as the last argument so the stack trace reaches the log.
            log.error("Error during retrieval of job submissions for jobName {}", str, e);
            throw e;
        } catch (Exception e) {
            log.error("Unexpected error during retrieval of job submissions for jobName {}", str, e);
            throw new Exception(e);
        }
    }

    /**
     * Loads the execution identified by {@code str2} and verifies that it belongs to job {@code str}.
     *
     * @param str                    job name
     * @param str2                   submission id; parsed with {@code Integer.parseInt}
     * @param jobExecutionManagement management instance used for the lookups
     * @param j                      request start time, forwarded to {@code logAndReturnResponse}
     * @param httpServletRequest     current request, forwarded to {@code logAndReturnResponse}
     * @param str3                   audit operation name, forwarded to {@code logAndReturnResponse}
     * @return the matching execution
     * @throws BadRequestException when no execution with that id exists for the job, or when
     *                             the definition ids are missing or differ
     */
    private CdcJobExecution getJobExecution(String str, String str2, JobExecutionManagement jobExecutionManagement, long j, HttpServletRequest httpServletRequest, String str3) {
        CdcJobDefinition jobDefinition = jobExecutionManagement.getDefinition(str);
        if (jobDefinition == null) {
            // NOTE(review): execution continues past this call in the visible code; presumably
            // logAndReturnResponse throws — confirm.
            logAndReturnResponse("Job definition is not present for {}", str, ErrorConstants.JOB_NOT_FOUND_ERROR, str3, j, httpServletRequest);
        }

        CdcJobExecution execution = jobExecutionManagement.getJobExecution(Integer.parseInt(str2));
        boolean belongsToJob = execution != null && execution.getName().equals(str);
        if (!belongsToJob) {
            throw new BadRequestException(str + " with submission id " + str2 + " does not exist.");
        }

        boolean sameDefinition = jobDefinition.getId() != null
                && execution.getJobDefinitionId() != null
                && jobDefinition.getId().equals(execution.getJobDefinitionId());
        if (!sameDefinition) {
            throw new BadRequestException("Job definition id for given " + str + " with submission id " + str2 + " does not match.");
        }
        return execution;
    }

    /**
     * Checks that the execution may be paused and, if so, sets its status to PAUSED.
     *
     * <p>STARTED and COMPLETED are matched case-insensitively; PAUSED, FAILED and STOPPED are
     * matched exactly. RUNNING and PARTIAL are the only statuses that get switched to PAUSED;
     * anything else is reported through {@code logAndReturnResponse}.
     *
     * @param str                job name
     * @param str2               submission id
     * @param cdcJobExecution    execution to inspect and update in place
     * @param j                  request start time, forwarded to {@code logAndReturnResponse}
     * @param httpServletRequest current request, forwarded to {@code logAndReturnResponse}
     * @throws BadRequestException when the status is STARTED, COMPLETED, PAUSED, FAILED or STOPPED
     */
    private void validateJobExecutionBeforePause(String str, String str2, CdcJobExecution cdcJobExecution, long j, HttpServletRequest httpServletRequest) {
        final String status = cdcJobExecution.getStatus();
        final String subject = str + " with submission id " + str2;

        if (status.equalsIgnoreCase(ApplicationStatus.STARTED.getStatus())) {
            throw new BadRequestException(subject + " cannot be paused as the job is started. It can be paused only when running or partially running.");
        }
        if (status.equalsIgnoreCase(ApplicationStatus.COMPLETED.getStatus())) {
            throw new BadRequestException(subject + " cannot be paused as the job is completed successfully. Please start it first.");
        }
        if (ApplicationStatus.PAUSED.getStatus().equals(status)) {
            throw new BadRequestException(subject + " is already paused.");
        }
        if (ApplicationStatus.FAILED.getStatus().equals(status)) {
            throw new BadRequestException(subject + " cannot be paused as the job is failed. Please start or restart it first.");
        }
        if (ApplicationStatus.STOPPED.getStatus().equals(status)) {
            throw new BadRequestException(subject + " cannot be paused as the job is stopped. Please start it first.");
        }

        boolean pausable = ApplicationStatus.RUNNING.getStatus().equals(status)
                || ApplicationStatus.PARTIAL.getStatus().equals(status);
        if (!pausable) {
            logAndReturnResponse(str + " with submission id {} is not Running", str2, ErrorConstants.JOB_EXECUTION_NOT_PRESENT, CDCAuditLogger.AuditConstants.PAUSE_CDL_JOB, j, httpServletRequest);
            return;
        }
        cdcJobExecution.setStatus(ApplicationStatus.PAUSED.getStatus());
    }

    /**
     * Records the moment a pause began as the {@code MetricsConstants.PAUSE_START_TIME} metric
     * of the given submission.
     *
     * <p>A dedicated {@link JobExecutionMetricsManagement} is opened for the write and closed
     * afterwards, whether or not the write succeeds.
     *
     * @param z    when {@code true} nothing is recorded
     *             (presumably the "source is a Kafka-type link" flag — confirm against the caller)
     * @param str  job name; not used by this method
     * @param str2 submission id; parsed with {@code Integer.parseInt}
     * @throws Exception when persisting the metric or closing the management instance fails
     */
    private void updatePauseMetricsValueForPause(boolean z, String str, String str2) throws Exception {
        if (z) {
            return;
        }
        // Named differently from the injected field so the two are not confused.
        try (JobExecutionMetricsManagement metricsManagement = new JobExecutionMetricsManagement()) {
            // NOTE(review): persistMetrics' parameter type is not visible here, so the map is
            // left raw as in the decompiled code; parameterize it once the signature is confirmed.
            HashMap metrics = new HashMap();
            metrics.put(MetricsConstants.PAUSE_START_TIME, Long.valueOf(System.currentTimeMillis()));
            metricsManagement.persistMetrics(metrics, Integer.valueOf(Integer.parseInt(str2)));
        }
    }

    /**
     * Pauses a job submission and stores the resulting execution state.
     *
     * <p>Flow: validate the path variables, run the permission check, load and sync the execution,
     * verify that it may be paused, then either pause the data-compare job or pause the source
     * connector (plus the target connector when Hudi is not enabled), record the pause start
     * time, refresh the job status from the connector statuses and persist the execution.
     *
     * <p>The catch clauses are ordered from most to least specific. With {@code catch (Exception)}
     * first, the {@code BadRequestException} and {@code RestException} handlers would be
     * unreachable and those failures would be reported as 500 instead of 400.
     *
     * @param str  job name taken from the path
     * @param str2 submission id taken from the path; validated as numeric
     * @return the response of the permission check when it is non-null; otherwise 200 with the
     *         execution, 400 for a {@code BadRequestException} or {@code RestException}, and 500
     *         for any other exception
     */
    @PutMapping(value = {JOB_SUBMISSIONS_PAUSE_PATH}, produces = {"application/json"})
    @ResponseBody
    public ResponseEntity<Object> pauseSubmissionExecution(@PathVariable("jobName") String str, @PathVariable("submission_id") String str2) {
        HashMap namedInputs = new HashMap();
        namedInputs.put(JOB_NAME, str);
        EndpointInputValidator.validateNonEmptyNoSpecialCharacters(namedInputs);
        EndpointInputValidator.validateNumeric(SUBMISSION_ID, str2);
        long startTime = System.currentTimeMillis();
        HttpServletRequest request = RequestContextHolder.currentRequestAttributes().getRequest();
        ResponseEntity<Object> permissionError = this.cdcJobExecutor.checkPermissionAndReturnError(str, startTime, request, CDCAuditLogger.AuditConstants.PAUSE_CDL_JOB);
        if (permissionError != null) {
            return permissionError;
        }
        try {
            CdcJobExecution jobExecution = getJobExecution(str, str2, this.jobExecutionManagement, startTime, request, CDCAuditLogger.AuditConstants.PAUSE_CDL_JOB);
            CdcJobDefinition definition = this.jobDefinitionManagement.getDefinition(str);
            String sourceConnectorName = this.jobExecutionManagement.getConnectorName(str, str2, true);
            CdcConnection sourceConnection = this.jobExecutionManagement.getConnection(jobExecution.getSourceConnectorId().intValue());
            boolean sourceIsKafkaLink = JobControllerUtils.isKafkaTypeLink(sourceConnection);
            syncJobExecutionStatus(str, jobExecution, this.jobExecutionManagement);
            validateJobExecutionBeforePause(str, str2, jobExecution, startTime, request);
            if (jobExecution.getJobType().equals("DATA_COMPARE_JOB")) {
                DataComparisonJob dataComparisonJob = new DataComparisonJob();
                dataComparisonJob.pauseCdlDataCompare(jobExecution);
                dataComparisonJob.syncJobExecutionAndComparePairStatus(jobExecution, this.jobExecutionManagement, ApplicationStatus.PAUSED.getStatus());
                return new ResponseEntity<>(ExecutionResponse.getDataCompareJobExecutionInstance(jobExecution), HttpStatus.OK);
            }
            // Target-side values stay at their defaults when Hudi is enabled: only the source
            // connector is paused in that case.
            String targetConnectorName = null;
            boolean targetIsKafkaLink = false;
            CdcConnection targetConnection = null;
            if (JobControllerUtils.isHudiEnabled(definition)) {
                pauseConnector(sourceConnection, sourceConnectorName);
            } else {
                targetConnectorName = this.jobExecutionManagement.getConnectorName(str, str2, false);
                targetConnection = this.jobExecutionManagement.getConnection(jobExecution.getTargetConnectorId().intValue());
                targetIsKafkaLink = JobControllerUtils.isKafkaTypeLink(targetConnection);
                pauseConnectors(sourceConnection, targetConnection, sourceConnectorName, targetConnectorName);
            }
            updatePauseMetricsValueForPause(sourceIsKafkaLink, str, str2);
            Object sourceStatuses = getConnectorStatusesAfterPause(sourceConnection, sourceConnectorName);
            if (JobControllerUtils.isHudiEnabled(definition)) {
                updateJobStatusWithAppAsTarget(jobExecution, sourceStatuses, sourceIsKafkaLink, sourceConnectorName);
            } else {
                updateJobStatus(jobExecution, sourceStatuses, getConnectorStatusesAfterPause(targetConnection, targetConnectorName), sourceIsKafkaLink, targetIsKafkaLink, sourceConnectorName, targetConnectorName);
            }
            this.jobExecutionManagement.updateJobExecution(jobExecution);
            CDCAuditLogger.logSuccess(startTime, System.currentTimeMillis(), RequestUtil.getClientIp(request), RequestUtil.getUserName(request), RequestUtil.getInstanceIp(request), "Pause CDL Job " + str, TARGET);
            return new ResponseEntity<>(ExecutionResponse.getInstance(jobExecution), HttpStatus.OK);
        } catch (BadRequestException e) {
            log.error("Failed to pause job. {}", e.toString());
            CDCAuditLogger.logFailure(startTime, System.currentTimeMillis(), RequestUtil.getClientIp(request), RequestUtil.getUserName(request), RequestUtil.getInstanceIp(request), CDCAuditLogger.AuditConstants.PAUSE_CDL_JOB, TARGET, e.getMessage());
            Map<String, Object> errorBody = new HashMap<>();
            errorBody.put(RestConstants.ERROR_MESSAGE, e.getMessage());
            return new ResponseEntity<>(errorBody, HttpStatus.BAD_REQUEST);
        } catch (RestException e) {
            log.error("Failed to pause job. {}", e.toString());
            // The execution is passed as null, so this call does nothing: the helper only acts
            // on a non-null execution. Kept to preserve the existing flow.
            doJobExecutionManagement(null, e);
            CDCAuditLogger.logFailure(startTime, System.currentTimeMillis(), RequestUtil.getClientIp(request), RequestUtil.getUserName(request), RequestUtil.getInstanceIp(request), CDCAuditLogger.AuditConstants.PAUSE_CDL_JOB, TARGET, e.getMessage());
            return new ResponseEntity<>(prepareResponse(e), HttpStatus.BAD_REQUEST);
        } catch (Exception e) {
            log.error("Failed to pause job. ", e);
            doJobExecutionManagement(null, e);
            CDCAuditLogger.logFailure(startTime, System.currentTimeMillis(), RequestUtil.getClientIp(request), RequestUtil.getUserName(request), RequestUtil.getInstanceIp(request), CDCAuditLogger.AuditConstants.PAUSE_CDL_JOB, TARGET, e.getMessage());
            return new ResponseEntity<>(new ErrorResponse(e), HttpStatus.INTERNAL_SERVER_ERROR);
        }
    }

    /**
     * Verifies that a submission can be resumed and, when it can, marks it as running.
     *
     * <p>A {@code BadRequestException} with a state-specific message is thrown when the current
     * status is started, completed, running, partial, failed or stopped. A paused execution has
     * its status set to running. Any other status is handed to {@code logAndReturnResponse}.
     *
     * @param str                job name, used in the messages
     * @param str2               submission id, used in the messages
     * @param cdcJobExecution    execution whose status is checked and possibly updated
     * @param j                  request start time forwarded to {@code logAndReturnResponse}
     * @param httpServletRequest request forwarded to {@code logAndReturnResponse}
     */
    private void validateJobExecutionBeforeResume(String str, String str2, CdcJobExecution cdcJobExecution, long j, HttpServletRequest httpServletRequest) {
        String currentStatus = cdcJobExecution.getStatus();
        String subject = str + " with submission id " + str2;
        // The first two checks are case-insensitive; the remaining ones compare exactly.
        if (currentStatus.equalsIgnoreCase(ApplicationStatus.STARTED.getStatus())) {
            throw new BadRequestException(subject + " cannot be resumed as the job is started. Please pause it first.");
        }
        if (currentStatus.equalsIgnoreCase(ApplicationStatus.COMPLETED.getStatus())) {
            throw new BadRequestException(subject + " cannot be resumed as the job is completed successfully. Please start it first.");
        }
        if (ApplicationStatus.RUNNING.getStatus().equals(currentStatus)) {
            throw new BadRequestException(subject + " cannot be resumed as the job is running.");
        }
        if (ApplicationStatus.PARTIAL.getStatus().equals(currentStatus)) {
            throw new BadRequestException(subject + " cannot be resumed as the job is partially running.");
        }
        if (ApplicationStatus.FAILED.getStatus().equals(currentStatus)) {
            throw new BadRequestException(subject + " cannot be resumed as the job is failed. Please restart it first.");
        }
        if (ApplicationStatus.STOPPED.getStatus().equals(currentStatus)) {
            throw new BadRequestException(subject + " cannot be resumed as the job is stopped. Please start it first.");
        }
        if (!ApplicationStatus.PAUSED.getStatus().equals(currentStatus)) {
            logAndReturnResponse(str + " with submission id {} is not paused", str2, ErrorConstants.JOB_EXECUTION_NOT_PRESENT, CDCAuditLogger.AuditConstants.RESUME_CDL_JOB, j, httpServletRequest);
            return;
        }
        cdcJobExecution.setStatus(ApplicationStatus.RUNNING.getStatus());
    }

    /**
     * Closes the pause interval of a submission: adds the time elapsed since the stored
     * {@code PAUSE_START_TIME} to {@code TOTAL_PAUSE_TIME}, sets {@code EXECUTION_STATUS} to
     * running, deletes the pause-start metric and persists the new values.
     *
     * <p>A missing metric list or a missing pause-start metric still raises
     * {@code NoSuchElementException}, as before, but now with a message that names what is
     * missing instead of the generic "No value present".
     *
     * @param z    when {@code true} nothing is read or written (the resume endpoint passes its
     *             "source connection is a Kafka-type link" flag here)
     * @param str  not read by this method
     * @param str2 submission id; used for the metric lookup and must parse as an {@code int}
     * @throws Exception if reading, deleting or persisting the metrics fails
     */
    private void updatePauseMetricsValueForResume(boolean z, String str, String str2) throws Exception {
        if (z) {
            return;
        }
        List<CdcJobDetailMetric> pauseMetrics = this.jobExecutionMetricsManagement.getJobPauseAttributeDetailById(str2);
        if (pauseMetrics == null) {
            throw new java.util.NoSuchElementException("No pause metrics found for submission id " + str2);
        }
        CdcJobDetailMetric pauseStart = pauseMetrics.stream()
                .filter(metric -> MetricsConstants.PAUSE_START_TIME.equals(metric.getNAME()))
                .findFirst()
                .orElseThrow(() -> new java.util.NoSuchElementException("No " + MetricsConstants.PAUSE_START_TIME + " metric found for submission id " + str2));
        Optional<CdcJobDetailMetric> previousTotal = pauseMetrics.stream()
                .filter(metric -> MetricsConstants.TOTAL_PAUSE_TIME.equals(metric.getNAME()))
                .findFirst();
        long pausedForMillis = System.currentTimeMillis() - Long.parseLong(pauseStart.getValue());
        HashMap updatedMetrics = new HashMap();
        // NOTE(review): the accumulate branch stores a Long while the first-resume branch stores
        // a String. Preserved as-is; confirm that persistMetrics treats both alike.
        if (previousTotal.isPresent()) {
            Duration accumulated = Duration.ofMillis(Long.parseLong(previousTotal.get().getValue())).plus(Duration.ofMillis(pausedForMillis));
            updatedMetrics.put(MetricsConstants.TOTAL_PAUSE_TIME, Long.valueOf(accumulated.toMillis()));
        } else {
            updatedMetrics.put(MetricsConstants.TOTAL_PAUSE_TIME, String.valueOf(pausedForMillis));
        }
        updatedMetrics.put(MetricsConstants.EXECUTION_STATUS, ApplicationStatus.RUNNING.getStatus());
        this.jobExecutionMetricsManagement.deleteJobExecutionMetric(pauseStart);
        this.jobExecutionMetricsManagement.persistMetrics(updatedMetrics, Integer.valueOf(Integer.parseInt(str2)));
    }

    /**
     * Resumes a paused job submission and stores the resulting execution state.
     *
     * <p>Flow: validate the path variables, run the permission check, load and sync the execution,
     * verify that it may be resumed, then either resume the data-compare job or resume the source
     * connector (plus the target connector when Hudi is not enabled), close the pause metrics,
     * refresh the job status from the connector statuses and persist the execution.
     *
     * <p>The catch clauses are ordered from most to least specific. With {@code catch (Exception)}
     * first, the {@code BadRequestException} and {@code RestException} handlers would be
     * unreachable and those failures would be reported as 500 instead of 400.
     *
     * @param str  job name taken from the path
     * @param str2 submission id taken from the path; validated as numeric
     * @return the response of the permission check when it is non-null; otherwise 200 with the
     *         execution, 400 for a {@code BadRequestException} or {@code RestException}, and 500
     *         for any other exception
     */
    @PutMapping(value = {JOB_SUBMISSIONS_RESUME_PATH}, produces = {"application/json"})
    @ResponseBody
    public ResponseEntity<Object> resumeSubmissionExecution(@PathVariable("jobName") String str, @PathVariable("submission_id") String str2) {
        HashMap namedInputs = new HashMap();
        namedInputs.put(JOB_NAME, str);
        EndpointInputValidator.validateNonEmptyNoSpecialCharacters(namedInputs);
        EndpointInputValidator.validateNumeric(SUBMISSION_ID, str2);
        long startTime = System.currentTimeMillis();
        HttpServletRequest request = RequestContextHolder.currentRequestAttributes().getRequest();
        ResponseEntity<Object> permissionError = this.cdcJobExecutor.checkPermissionAndReturnError(str, startTime, request, CDCAuditLogger.AuditConstants.RESUME_CDL_JOB);
        if (permissionError != null) {
            return permissionError;
        }
        try {
            CdcJobExecution jobExecution = getJobExecution(str, str2, this.jobExecutionManagement, startTime, request, CDCAuditLogger.AuditConstants.RESUME_CDL_JOB);
            CdcJobDefinition definition = this.jobDefinitionManagement.getDefinition(str);
            String sourceConnectorName = this.jobExecutionManagement.getConnectorName(str, str2, true);
            CdcConnection sourceConnection = this.jobExecutionManagement.getConnection(jobExecution.getSourceConnectorId().intValue());
            boolean sourceIsKafkaLink = JobControllerUtils.isKafkaTypeLink(sourceConnection);
            syncJobExecutionStatus(str, jobExecution, this.jobExecutionManagement);
            validateJobExecutionBeforeResume(str, str2, jobExecution, startTime, request);
            if (jobExecution.getJobType().equals("DATA_COMPARE_JOB")) {
                DataComparisonJob dataComparisonJob = new DataComparisonJob();
                dataComparisonJob.resumeCdlDataCompare(jobExecution);
                dataComparisonJob.syncJobExecutionAndComparePairStatus(jobExecution, this.jobExecutionManagement);
                return new ResponseEntity<>(ExecutionResponse.getDataCompareJobExecutionInstance(jobExecution), HttpStatus.OK);
            }
            // Target-side values stay at their defaults when Hudi is enabled: only the source
            // connector is resumed in that case.
            String targetConnectorName = null;
            CdcConnection targetConnection = null;
            boolean targetIsKafkaLink = false;
            if (JobControllerUtils.isHudiEnabled(definition)) {
                resumeConnector(sourceConnection, sourceConnectorName);
            } else {
                targetConnectorName = this.jobExecutionManagement.getConnectorName(str, str2, false);
                targetConnection = this.jobExecutionManagement.getConnection(jobExecution.getTargetConnectorId().intValue());
                targetIsKafkaLink = JobControllerUtils.isKafkaTypeLink(targetConnection);
                resumeConnectors(sourceConnection, targetConnection, sourceConnectorName, targetConnectorName);
            }
            updatePauseMetricsValueForResume(sourceIsKafkaLink, sourceConnectorName, str2);
            Object sourceStatuses = getConnectorStatusesAfterResume(sourceConnection, sourceConnectorName);
            if (JobControllerUtils.isHudiEnabled(definition)) {
                updateJobStatusWithAppAsTarget(jobExecution, sourceStatuses, sourceIsKafkaLink, sourceConnectorName);
            } else {
                updateJobStatus(jobExecution, sourceStatuses, getConnectorStatusesAfterResume(targetConnection, targetConnectorName), sourceIsKafkaLink, targetIsKafkaLink, sourceConnectorName, targetConnectorName);
            }
            this.jobExecutionManagement.updateJobExecution(jobExecution);
            CDCAuditLogger.logSuccess(startTime, System.currentTimeMillis(), RequestUtil.getClientIp(request), RequestUtil.getUserName(request), RequestUtil.getInstanceIp(request), "Resume CDL Job " + str, TARGET);
            return new ResponseEntity<>(ExecutionResponse.getInstance(jobExecution), HttpStatus.OK);
        } catch (BadRequestException e) {
            log.error("Failed to resume job. {}", e.toString());
            CDCAuditLogger.logFailure(startTime, System.currentTimeMillis(), RequestUtil.getClientIp(request), RequestUtil.getUserName(request), RequestUtil.getInstanceIp(request), CDCAuditLogger.AuditConstants.RESUME_CDL_JOB, TARGET, e.getMessage());
            Map<String, Object> errorBody = new HashMap<>();
            errorBody.put(RestConstants.ERROR_MESSAGE, e.getMessage());
            return new ResponseEntity<>(errorBody, HttpStatus.BAD_REQUEST);
        } catch (RestException e) {
            log.error("Failed to resume job. {}", e.toString());
            // The execution is passed as null, so this call does nothing: the helper only acts
            // on a non-null execution. Kept to preserve the existing flow.
            doJobExecutionManagement(null, e);
            CDCAuditLogger.logFailure(startTime, System.currentTimeMillis(), RequestUtil.getClientIp(request), RequestUtil.getUserName(request), RequestUtil.getInstanceIp(request), CDCAuditLogger.AuditConstants.RESUME_CDL_JOB, TARGET, e.getMessage());
            return new ResponseEntity<>(prepareResponse(e), HttpStatus.BAD_REQUEST);
        } catch (Exception e) {
            log.error("Failed to resume job. ", e);
            doJobExecutionManagement(null, e);
            CDCAuditLogger.logFailure(startTime, System.currentTimeMillis(), RequestUtil.getClientIp(request), RequestUtil.getUserName(request), RequestUtil.getInstanceIp(request), CDCAuditLogger.AuditConstants.RESUME_CDL_JOB, TARGET, e.getMessage());
            return new ResponseEntity<>(new ErrorResponse(e), HttpStatus.INTERNAL_SERVER_ERROR);
        }
    }

    /**
     * Brings the stored status of an execution up to date.
     *
     * <p>Data-compare jobs are synced through a new {@code DataComparisonJob}; every other job
     * type goes through {@code syncJobStatus(String, CdcJobExecution)}.
     *
     * @param str                    job name
     * @param cdcJobExecution        execution to sync
     * @param jobExecutionManagement manager handed to the data-compare sync
     * @throws Exception if the underlying sync fails
     */
    private void syncJobExecutionStatus(String str, CdcJobExecution cdcJobExecution, JobExecutionManagement jobExecutionManagement) throws Exception {
        boolean isDataCompare = cdcJobExecution.getJobType().equalsIgnoreCase("DATA_COMPARE_JOB");
        if (!isDataCompare) {
            syncJobStatus(str, cdcJobExecution);
            return;
        }
        DataComparisonJob comparisonJob = new DataComparisonJob();
        comparisonJob.syncJobExecutionAndComparePairStatus(cdcJobExecution, jobExecutionManagement);
    }

    /**
     * Brings the stored status of an execution up to date, forwarding the extra flags and names
     * to the five-argument {@code syncJobStatus} overload for non data-compare jobs.
     *
     * <p>Data-compare jobs are synced through a new {@code DataComparisonJob} and ignore the
     * extra arguments.
     *
     * @param cdcJobExecution        execution to sync
     * @param z                      first flag forwarded to {@code syncJobStatus}
     * @param z2                     second flag forwarded to {@code syncJobStatus}
     * @param str                    first name forwarded to {@code syncJobStatus}
     * @param str2                   second name forwarded to {@code syncJobStatus}
     * @param jobExecutionManagement manager handed to the data-compare sync
     * @throws Exception if the underlying sync fails
     */
    private void syncJobExecutionStatus(CdcJobExecution cdcJobExecution, boolean z, boolean z2, String str, String str2, JobExecutionManagement jobExecutionManagement) throws Exception {
        boolean isDataCompare = cdcJobExecution.getJobType().equalsIgnoreCase("DATA_COMPARE_JOB");
        if (!isDataCompare) {
            syncJobStatus(cdcJobExecution, z, z2, str, str2);
            return;
        }
        DataComparisonJob comparisonJob = new DataComparisonJob();
        comparisonJob.syncJobExecutionAndComparePairStatus(cdcJobExecution, jobExecutionManagement);
    }

    /**
     * Resumes the first connector and then the second one.
     *
     * <p>Any failure is logged and rethrown wrapped in a new {@code Exception} that carries the
     * original as its cause. The log format now separates the text from the message with a
     * space, matching {@code pauseConnectors}.
     *
     * @param cdcConnection  connection of the first connector
     * @param cdcConnection2 connection of the second connector
     * @param str            name of the first connector
     * @param str2           name of the second connector
     * @throws Exception wrapping whatever either resume call threw
     */
    private void resumeConnectors(CdcConnection cdcConnection, CdcConnection cdcConnection2, String str, String str2) throws Exception {
        try {
            resumeConnector(cdcConnection, str);
            resumeConnector(cdcConnection2, str2);
            log.info("Resuming Connectors Success");
        } catch (Exception e) {
            log.error("Exception in resuming connectors {}", e.getMessage());
            throw new Exception(e);
        }
    }

    /**
     * Sends the resume request for one connector; Kafka-type links are skipped.
     *
     * @param cdcConnection connection used to decide whether the request is sent
     * @param str           connector name substituted into the resume path
     */
    private void resumeConnector(CdcConnection cdcConnection, String str) {
        if (!JobControllerUtils.isKafkaTypeLink(cdcConnection)) {
            String resumePath = RestConstants.RESUME_CONNECTOR_SUFFIX.replace(RestConstants.CONNECTOR_NAME, str);
            String requestBody = Entity.text(CommonConstants.EMPTY).toString();
            RestClient.doRequest(resumePath, requestBody, RestConstants.HTTP_PUT);
        }
    }

    /**
     * Pauses the first connector and then the second one.
     *
     * <p>Any failure is logged and rethrown wrapped in a new {@code Exception} that carries the
     * original as its cause.
     *
     * @param cdcConnection  connection of the first connector
     * @param cdcConnection2 connection of the second connector
     * @param str            name of the first connector
     * @param str2           name of the second connector
     * @throws Exception wrapping whatever either pause call threw
     */
    private void pauseConnectors(CdcConnection cdcConnection, CdcConnection cdcConnection2, String str, String str2) throws Exception {
        try {
            pauseConnector(cdcConnection, str);
            pauseConnector(cdcConnection2, str2);
            log.info("Pausing Connectors Success");
        } catch (Exception pauseFailure) {
            String reason = pauseFailure.getMessage();
            log.error("Exception in pausing connectors {}", reason);
            throw new Exception(pauseFailure);
        }
    }

    /**
     * Sends the pause request for one connector; Kafka-type links are skipped.
     *
     * @param cdcConnection connection used to decide whether the request is sent
     * @param str           connector name substituted into the pause path
     */
    private void pauseConnector(CdcConnection cdcConnection, String str) {
        if (!JobControllerUtils.isKafkaTypeLink(cdcConnection)) {
            String pausePath = RestConstants.PAUSE_CONNECTOR_SUFFIX.replace(RestConstants.CONNECTOR_NAME, str);
            String requestBody = Entity.text(CommonConstants.EMPTY).toString();
            RestClient.doRequest(pausePath, requestBody, RestConstants.HTTP_PUT);
        }
    }

    /**
     * Marks an execution as failed and persists it, recording the error that caused the failure.
     *
     * <p>Does nothing when {@code cdcJobExecution} is {@code null}. Otherwise the error code is
     * set when the exception is a {@code RestException}, the error message and the failed status
     * are always set, and the execution is saved through a dedicated
     * {@code JobExecutionManagement}. That manager is opened in a try-with-resources statement
     * so it is closed on every path.
     *
     * @param cdcJobExecution execution to update; may be {@code null}
     * @param exc             failure whose message (and code, for {@code RestException}) is stored
     */
    private void doJobExecutionManagement(CdcJobExecution cdcJobExecution, Exception exc) {
        if (cdcJobExecution == null) {
            return;
        }
        try (JobExecutionManagement failureRecorder = new JobExecutionManagement()) {
            if (exc instanceof RestException) {
                cdcJobExecution.setErrorCode(String.valueOf(((RestException) exc).getCode()));
            }
            cdcJobExecution.setErrorMessage(exc.getMessage());
            cdcJobExecution.setStatus(ApplicationStatus.FAILED.getStatus());
            failureRecorder.updateJobExecution(cdcJobExecution);
        }
    }

    /**
     * Restarts either a whole connector or only the tasks listed for it.
     *
     * <p>With no task selection at all ({@code map} null or empty), or with a null/empty task
     * list under {@code str2}, the whole connector is restarted. With a non-empty list the listed
     * tasks are restarted. When {@code map} has entries but none under {@code str2}, nothing
     * happens.
     *
     * @param cdcConnection connection forwarded to the task restart
     * @param str           connector name
     * @param map           task selection, keyed by {@code str2}; may be {@code null}
     * @param map2          worker mapping forwarded to the task restart
     * @param str2          key under which this connector's task list is stored in {@code map}
     */
    private void restart(CdcConnection cdcConnection, String str, Map<String, Object> map, Map<String, RestServer> map2, String str2) {
        if (map != null && !map.isEmpty()) {
            if (!map.containsKey(str2)) {
                return;
            }
            List<Object> selectedTasks = (List) map.get(str2);
            if (selectedTasks != null && !selectedTasks.isEmpty()) {
                restartTasksInTaskList(selectedTasks, str, cdcConnection, map2);
                return;
            }
        }
        restartConnector(str);
    }

    /**
     * Restarts every task in the list, one request per task, and fails on the first bad response.
     *
     * <p>Each list entry is treated as a map that holds the task id and, optionally, a start
     * record identifier. When the identifier is present and the connection type is Oracle, the
     * CDC restart path is called with a request body built from the identifier and the worker
     * mapped to the task id; otherwise the Kafka restart path is called without a body. A response
     * whose status is neither OK nor NO_CONTENT is parsed as JSON and turned into an
     * {@code EntityException}.
     *
     * @param list          task entries to restart
     * @param str           connector name substituted into the restart path
     * @param cdcConnection connection whose type selects the restart path
     * @param map           worker lookup keyed by task id
     */
    private void restartTasksInTaskList(List<Object> list, String str, CdcConnection cdcConnection, Map<String, RestServer> map) {
        for (Object entry : list) {
            Map<?, ?> task = (Map<?, ?>) entry;
            String taskId = String.valueOf(task.get(RestConstants.TASK_ID));
            if (!StringUtils.isNumeric(taskId)) {
                throw new ParameterException(ErrorConstants.INVALID_PARAMETER_ERROR, RestConstants.TASK_ID);
            }
            boolean restartFromRecord = task.containsKey(RestConstants.START_RECORD_IDENTIFIER) && DataSourcesConstants.ORACLE.equalsIgnoreCase(cdcConnection.getType());
            String restartPath;
            Response response;
            if (restartFromRecord) {
                String startRecord = String.valueOf(task.get(RestConstants.START_RECORD_IDENTIFIER));
                if (!StringUtils.isNumeric(startRecord)) {
                    throw new ParameterException(ErrorConstants.INVALID_PARAMETER_ERROR, RestConstants.START_RECORD_IDENTIFIER);
                }
                String requestBody = EntityConvertor.prepareRequestBodyForRestartTask(startRecord);
                restartPath = RestConstants.CDC_RESTART_TASK.replace(RestConstants.CONNECTOR_NAME, str).replace(RestConstants.TASK_NAME, taskId);
                response = RestClient.doRequest(restartPath, requestBody, RestConstants.HTTP_POST, map.get(taskId));
            } else {
                restartPath = RestConstants.KAFKA_RESTART_TASK.replace(RestConstants.CONNECTOR_NAME, str).replace(RestConstants.TASK_NAME, taskId);
                response = RestClient.doRequest(restartPath, RestConstants.HTTP_POST);
            }
            if (response == null) {
                log.error("Failed to get Response from {}", restartPath);
                throw new EntityException(ErrorConstants.KAFKA_CONNECT_INVALID_RESPONSE, "getting status for  " + restartPath);
            }
            // The entity is read and sanitized for every response, before the status is checked.
            String sanitizedBody = JsonSanitizer.sanitize((String) response.readEntity(String.class));
            int status = response.getStatus();
            boolean accepted = status == HttpStatus.OK.value() || status == HttpStatus.NO_CONTENT.value();
            if (accepted) {
                continue;
            }
            try {
                Map errorBody = (Map) CommonConstants.JSON_MAPPER.readValue(sanitizedBody, new TypeReference<Map<String, Object>>() {
                });
                throw new EntityException(ErrorConstants.KAFKA_CONNECT_ERROR, String.valueOf(errorBody.get(RestConstants.KAFKA_ERROR_CODE)), String.valueOf(errorBody.get("message")));
            } catch (IOException parseFailure) {
                log.error("Could not read JSON response {}", sanitizedBody, parseFailure);
                throw new EntityException(ErrorConstants.KAFKA_CONNECT_INVALID_RESPONSE, "getting status of connector " + str);
            }
        }
    }

    /**
     * Builds a lookup from task id to the {@code RestServer} created from that task's worker id.
     *
     * <p>The entries are read from the list stored under {@code RestConstants.TASKS}; each entry
     * is treated as a map with an {@code "id"} and a worker id.
     *
     * @param map connector status payload; may be {@code null} or empty
     * @return the lookup, empty when {@code map} is {@code null} or empty
     */
    private Map<String, RestServer> getWorkerMapping(Map<String, Object> map) {
        Map<String, RestServer> workersByTaskId = new HashMap<>();
        if (map == null || map.isEmpty()) {
            return workersByTaskId;
        }
        for (Object entry : (List<?>) map.get(RestConstants.TASKS)) {
            Map<?, ?> task = (Map<?, ?>) entry;
            String taskId = String.valueOf(task.get("id"));
            String workerId = String.valueOf(task.get(RestConstants.WORKER_ID));
            workersByTaskId.put(taskId, new RestServer(workerId));
        }
        return workersByTaskId;
    }

    /**
     * Verifies that a submission can be stopped and, when it can, marks it as stopped.
     *
     * <p>A completed data-compare job and an execution that is already stopped are both rejected
     * with a {@code BadRequestException}.
     *
     * @param str             job name, used in the messages
     * @param str2            submission id, used in the messages
     * @param cdcJobExecution execution whose status is checked and then set to stopped
     */
    private void validateJobExecutionBeforeStop(String str, String str2, CdcJobExecution cdcJobExecution) {
        boolean isDataCompare = cdcJobExecution.getJobType().equalsIgnoreCase("DATA_COMPARE_JOB");
        if (isDataCompare && cdcJobExecution.getStatus().equalsIgnoreCase(ApplicationStatus.COMPLETED.getStatus())) {
            throw new BadRequestException(str + " with submission id " + str2 + " cannot be stopped as the job is completed successfully. Please start it first.");
        }
        String stoppedStatus = ApplicationStatus.STOPPED.getStatus();
        if (stoppedStatus.equals(cdcJobExecution.getStatus())) {
            throw new BadRequestException(str + " with submission id " + str2 + " is already stopped.");
        }
        cdcJobExecution.setStatus(ApplicationStatus.STOPPED.getStatus());
    }

    /**
     * Stops a job submission and stores the resulting execution state.
     *
     * <p>Flow: validate the path variables, run the permission check, load and sync the execution,
     * verify that it may be stopped and refresh the job metrics. A data-compare job is stopped
     * through {@code DataComparisonJob}. For any other job the metrics are persisted (unless the
     * source is a Kafka-type link), the source connector is stopped, its cache is cleared, the
     * target side is stopped, and the execution is saved as stopped with an end time.
     *
     * @param str  job name taken from the path
     * @param str2 submission id taken from the path; validated as numeric
     * @return the response of the permission check when it is non-null; otherwise 200 with the
     *         execution, 400 for a {@code BadRequestException} or {@code RestException}, and 500
     *         for any other exception
     */
    @PutMapping({JOB_SUBMISSIONS_STOP_PATH})
    @ResponseBody
    public ResponseEntity<Object> stopSubmissionExecution(@PathVariable("jobName") String str, @PathVariable("submission_id") String str2) {
        HashMap namedInputs = new HashMap();
        namedInputs.put(JOB_NAME, str);
        EndpointInputValidator.validateNonEmptyNoSpecialCharacters(namedInputs);
        EndpointInputValidator.validateNumeric(SUBMISSION_ID, str2);
        long startTime = System.currentTimeMillis();
        HttpServletRequest request = RequestContextHolder.currentRequestAttributes().getRequest();
        ResponseEntity<Object> permissionError = this.cdcJobExecutor.checkPermissionAndReturnError(str, startTime, request, CDCAuditLogger.AuditConstants.STOP_CDL_JOB);
        if (permissionError != null) {
            return permissionError;
        }
        try {
            CdcJobExecution execution = getJobExecution(str, str2, this.jobExecutionManagement, startTime, request, CDCAuditLogger.AuditConstants.STOP_CDL_JOB);
            syncJobExecutionStatus(str, execution, this.jobExecutionManagement);
            validateJobExecutionBeforeStop(str, str2, execution);
            this.jobMetricService.refreshJob();
            if (execution.getJobType().equals("DATA_COMPARE_JOB")) {
                DataComparisonJob comparisonJob = new DataComparisonJob();
                comparisonJob.stopCdlDataCompare(execution);
                comparisonJob.syncJobExecutionAndComparePairStatus(execution, this.jobExecutionManagement, ApplicationStatus.STOPPED.getStatus());
                execution.setExecutionEndTime(LocalDateTime.now());
                return new ResponseEntity<>(ExecutionResponse.getInstance(this.jobExecutionManagement.updateJobExecution(execution)), HttpStatus.OK);
            }
            CdcConnection sourceConnection = this.jobMetricService.getConnectionByID(execution.getSourceConnectorId());
            String sourceConnectorName = this.jobMetricService.getConnectorName(str, Integer.parseInt(str2), true);
            boolean sourceIsKafkaLink = JobControllerUtils.isKafkaTypeLink(sourceConnection);
            if (!sourceIsKafkaLink) {
                this.jobMetricService.persistJobMetrics(execution);
            }
            stopConnector(sourceConnection, sourceConnectorName, BodyConstants.LOG_SOURCE);
            clearCache(sourceConnectorName, str2);
            stopTargetConnector(str, execution, str2);
            execution.setStatus(ApplicationStatus.STOPPED.getStatus());
            execution.setExecutionEndTime(LocalDateTime.now());
            this.jobExecutionManagement.updateJobExecution(execution);
            CDCAuditLogger.logSuccess(startTime, System.currentTimeMillis(), RequestUtil.getClientIp(request), RequestUtil.getUserName(request), RequestUtil.getInstanceIp(request), "Stop CDL Job " + str, TARGET);
            return new ResponseEntity<>(ExecutionResponse.getInstance(execution), HttpStatus.OK);
        } catch (BadRequestException badRequest) {
            log.error("Failed to stop job. {}", badRequest.getMessage());
            CDCAuditLogger.logFailure(startTime, System.currentTimeMillis(), RequestUtil.getClientIp(request), RequestUtil.getUserName(request), RequestUtil.getInstanceIp(request), CDCAuditLogger.AuditConstants.STOP_CDL_JOB, TARGET, badRequest.getMessage());
            HashMap errorBody = new HashMap();
            errorBody.put(RestConstants.ERROR_MESSAGE, badRequest.getMessage());
            return new ResponseEntity<>(errorBody, HttpStatus.BAD_REQUEST);
        } catch (RestException restFailure) {
            log.error("Failed to stop job. {}", restFailure.getMessage());
            // The execution is passed as null, so this call does nothing: the helper only acts
            // on a non-null execution.
            doJobExecutionManagement(null, restFailure);
            CDCAuditLogger.logFailure(startTime, System.currentTimeMillis(), RequestUtil.getClientIp(request), RequestUtil.getUserName(request), RequestUtil.getInstanceIp(request), CDCAuditLogger.AuditConstants.STOP_CDL_JOB, TARGET, restFailure.getMessage());
            return new ResponseEntity<>(new ErrorResponse((RestException) restFailure), HttpStatus.BAD_REQUEST);
        } catch (Exception unexpected) {
            log.error("Failed to stop job. ", unexpected);
            doJobExecutionManagement(null, unexpected);
            CDCAuditLogger.logFailure(startTime, System.currentTimeMillis(), RequestUtil.getClientIp(request), RequestUtil.getUserName(request), RequestUtil.getInstanceIp(request), CDCAuditLogger.AuditConstants.STOP_CDL_JOB, TARGET, unexpected.getMessage());
            return new ResponseEntity<>(new ErrorResponse(unexpected), HttpStatus.INTERNAL_SERVER_ERROR);
        }
    }

    /**
     * Stops the target side of a job.
     *
     * <p>When Hudi is not enabled for the job definition, the target connector is stopped. When
     * it is enabled, the application identified by the execution's app id is stopped instead,
     * unless its state is already killed, finished or failed. After the stop request the method
     * waits for the killed state and reports a failure through
     * {@code JobExecutionUtils.handleAppStartStopFailure} if that state is not reached.
     *
     * @param str             job name
     * @param cdcJobExecution execution that supplies the target connector id and the app id
     * @param str2            submission id
     */
    private void stopTargetConnector(String str, CdcJobExecution cdcJobExecution, String str2) {
        boolean hudiEnabled = JobControllerUtils.isHudiEnabled(this.jobExecutionManagement.getDefinition(str));
        if (hudiEnabled) {
            String applicationId = cdcJobExecution.getAppId();
            String currentState = YarnClientUtil.getStatus(applicationId).name();
            boolean alreadyEnded = ApplicationHandler.State.KILLED.name().equals(currentState)
                    || ApplicationHandler.State.FINISHED.name().equals(currentState)
                    || ApplicationHandler.State.FAILED.name().equals(currentState);
            if (alreadyEnded) {
                return;
            }
            YarnClientUtil.stopJob(applicationId);
            waitForAppStatus(applicationId, SparkAppHandle.State.KILLED.name(), STOP);
            String stateAfterStop = YarnClientUtil.getStatus(applicationId).name();
            if (!SparkAppHandle.State.KILLED.name().equals(stateAfterStop)) {
                JobExecutionUtils.handleAppStartStopFailure(applicationId, STOP);
            }
        } else {
            CdcConnection targetConnection = this.jobExecutionManagement.getConnection(cdcJobExecution.getTargetConnectorId().intValue());
            String targetConnectorName = this.jobExecutionManagement.getConnectorName(str, str2, false);
            stopConnector(targetConnection, targetConnectorName, "target");
        }
    }

    /**
     * Sends a DELETE request to the connector's end-cache path.
     *
     * @param str  connector name substituted into the path
     * @param str2 submission id substituted into the path
     */
    private void clearCache(String str, String str2) {
        String cachePath = RestConstants.CONNECTOR_END_CACHE_SUFFIX
                .replace(RestConstants.CONNECTOR_NAME, str)
                .replace(RestConstants.SUB, str2);
        RestClient.doRequest(cachePath, RestConstants.HTTP_DELETE);
    }

    /**
     * Returns the details of one job submission, including the live status of its source and
     * target side.
     *
     * <p>Data-compare jobs are delegated to {@code getDataCompareJobExecutionDetails}. For a
     * stopped execution all three status fields are reported as {@code "NA"}. Otherwise the job
     * status is synced first. The source connector status is then queried, followed by either the
     * application status (Hudi enabled) or the target connector status. A connector whose status
     * cannot be retrieved is reported with a "not running" text instead of failing the request.
     *
     * <p>The catch clauses are ordered from most to least specific. With
     * {@code catch (BadRequestException)} placed after {@code catch (Exception)} the handler
     * would be unreachable and a missing submission would be reported as 500 instead of 400.
     *
     * @param str  job name taken from the path
     * @param str2 submission id taken from the path; validated as numeric
     * @return the response of the permission check when it is non-null; otherwise 200 with the
     *         execution details, 400 for a {@code BadRequestException} or {@code RestException},
     *         and 500 for any other exception
     */
    @GetMapping(value = {JOB_SUBMISSION_PATH}, produces = {"application/json"})
    @ResponseBody
    public ResponseEntity<Object> getJobExecution(@PathVariable("jobName") String str, @PathVariable("submission_id") String str2) {
        HashMap namedInputs = new HashMap();
        namedInputs.put(JOB_NAME, str);
        EndpointInputValidator.validateNonEmptyNoSpecialCharacters(namedInputs);
        EndpointInputValidator.validateNumeric(SUBMISSION_ID, str2);
        ResponseEntity<Object> permissionError = this.cdcJobExecutor.checkPermissionAndReturnError(str, System.currentTimeMillis(), RequestContextHolder.currentRequestAttributes().getRequest(), null);
        if (permissionError != null) {
            return permissionError;
        }
        try {
            CdcJobExecution jobExecution = this.jobExecutionManagement.getJobExecution(Integer.parseInt(str2));
            CdcJobDefinition definition = this.jobExecutionManagement.getDefinition(str);
            if (jobExecution == null || !jobExecution.getName().equals(str)) {
                throw new BadRequestException(str + " with submission id " + str2 + " does not exist.");
            }
            if ("DATA_COMPARE_JOB".equalsIgnoreCase(definition.getJobType())) {
                return getDataCompareJobExecutionDetails(definition, jobExecution, str2);
            }
            Object sourceStatus;
            Object targetStatus = null;
            String applicationStatus = null;
            if (ApplicationStatus.STOPPED.getStatus().equals(jobExecution.getStatus())) {
                sourceStatus = "NA";
                targetStatus = "NA";
                applicationStatus = "NA";
            } else {
                syncJobStatus(str, jobExecution);
                String sourceConnectorName = this.jobExecutionManagement.getConnectorName(str, str2, true);
                try {
                    sourceStatus = JobControllerUtils.getConnectorStatus(sourceConnectorName, true);
                } catch (EntityException e) {
                    String notRunning = "source connector: " + sourceConnectorName + " not running";
                    log.error(notRunning);
                    sourceStatus = notRunning;
                }
                // Reuses the definition loaded above instead of looking it up a second time.
                if (JobControllerUtils.isHudiEnabled(definition)) {
                    String appId = jobExecution.getAppId();
                    if (StringUtils.isEmpty(appId)) {
                        log.error("The application corresponding to job {} does not exist.", str);
                    } else {
                        applicationStatus = YarnClientUtil.getStatus(appId).name();
                    }
                } else {
                    String targetConnectorName = this.jobExecutionManagement.getConnectorName(str, str2, false);
                    try {
                        targetStatus = JobControllerUtils.getConnectorStatus(targetConnectorName, true);
                    } catch (EntityException e) {
                        String notRunning = "sink connector: " + targetConnectorName + " not running";
                        log.error(notRunning);
                        targetStatus = notRunning;
                    }
                }
            }
            return new ResponseEntity<>(ExecutionResponse.getInstance(jobExecution, sourceStatus, targetStatus, applicationStatus), HttpStatus.OK);
        } catch (BadRequestException e) {
            log.error("Error during status retrieval. {}", e.toString());
            Map<String, Object> errorBody = new HashMap<>();
            errorBody.put(RestConstants.ERROR_MESSAGE, e.getMessage());
            return new ResponseEntity<>(errorBody, HttpStatus.BAD_REQUEST);
        } catch (RestException e) {
            log.error("Error during status retrieval. {}", e.toString());
            return new ResponseEntity<>(new ErrorResponse(Integer.valueOf(e.getCode()), e.getMessage(), e.getTrace()), HttpStatus.BAD_REQUEST);
        } catch (Exception e) {
            log.error("Error during status retrieval.", e);
            return new ResponseEntity<>(new ErrorResponse(e), HttpStatus.INTERNAL_SERVER_ERROR);
        }
    }

    /**
     * Handles DELETE requests on {@code JOB_SUBMISSION_PATH} by forwarding the job name,
     * submission id and query parameters to the job executor unchanged.
     *
     * @param str  value of the {@code jobName} path variable
     * @param str2 value of the {@code submission_id} path variable
     * @param map  all query parameters of the request
     * @return the response produced by the job executor
     */
    @DeleteMapping(value = {JOB_SUBMISSION_PATH}, produces = {"application/json"})
    @ResponseBody
    public ResponseEntity<Object> deleteDcJobExecutionPairs(@PathVariable("jobName") String str, @PathVariable("submission_id") String str2, @RequestParam Map<String, String> map) {
        final ResponseEntity<Object> executorResponse = this.cdcJobExecutor.deleteDcJobExecutionPairs(str, str2, map);
        return executorResponse;
    }

    /**
     * Fetches the execution details of a data-compare job by delegating to the job executor.
     *
     * @param cdcJobDefinition definition of the job
     * @param cdcJobExecution  execution whose details are requested
     * @param str              submission id as passed by the callers in this class, forwarded unchanged
     * @return the response produced by the job executor
     */
    private ResponseEntity<Object> getDataCompareJobExecutionDetails(CdcJobDefinition cdcJobDefinition, CdcJobExecution cdcJobExecution, String str) {
        final ResponseEntity<Object> details = this.cdcJobExecutor.getDataCompareJobExecutionDetails(cdcJobDefinition, cdcJobExecution, str);
        return details;
    }

    /**
     * Handles GET requests on {@code DC_COMPARE_PAIR_DETAILS_PATH} by forwarding the three path
     * variables to the job executor unchanged.
     *
     * @param str  value of the {@code jobName} path variable
     * @param str2 value of the {@code comparePairName} path variable
     * @param str3 value of the {@code submission_id} path variable
     * @return the response produced by the job executor
     */
    @GetMapping({DC_COMPARE_PAIR_DETAILS_PATH})
    @ResponseBody
    public ResponseEntity<Object> getDataCompareJobComparePairDetails(@PathVariable("jobName") String str, @PathVariable("comparePairName") String str2, @PathVariable("submission_id") String str3) {
        final ResponseEntity<Object> pairDetails = this.cdcJobExecutor.getDataCompareJobComparePairDetails(str, str2, str3);
        return pairDetails;
    }

    /**
     * Handles PUT requests on {@code DC_REPAIR_JOB_PATH}: starts a data repair for the named job
     * using the out-of-sync records collected from every execution returned by
     * {@code getJobExecutionsByDefID}.
     *
     * <p>Steps, in order: the job name is added to the request body under {@code JOB_NAME}, the
     * repair configuration is validated, the caller's permission is checked, the HDFS link named by
     * the {@code "hdfs.link"} body entry and the job's target connection are resolved, the
     * out-of-sync records of each execution are accumulated, and
     * {@code JobExecutionUtils.handleDataRepair} is invoked once with the accumulated records.
     *
     * @param str value of the {@code jobName} path variable
     * @param map request body with the repair configuration; mutated to include the job name
     * @return 200 with a {@code RepairExecutionResponse} whose status is set to STARTED; the
     *         permission-check response if that check returns one; 400 for a {@code RestException}
     *         or {@code BadRequestException}; 500 for any other exception
     */
    @PutMapping({DC_REPAIR_JOB_PATH})
    @ResponseBody
    public ResponseEntity<Object> repairDataCompareJob(@PathVariable("jobName") String str, @RequestBody Map<String, Object> map) {
        map.put(JOB_NAME, str);
        long currentTimeMillis = System.currentTimeMillis();
        HttpServletRequest request = RequestContextHolder.currentRequestAttributes().getRequest();
        try {
            ExecutionEnvValidator.validateRepairConfigValues(map);
            ResponseEntity<Object> permissionError = this.cdcJobExecutor.checkPermissionAndReturnError(str, currentTimeMillis, request, CDCAuditLogger.AuditConstants.REPAIR_DC_JOB);
            if (permissionError != null) {
                return permissionError;
            }
            // After the loop these hold the values of the LAST execution visited. If the job has no
            // executions they stay null and are passed on to handleDataRepair as-is.
            CdcJobExecution lastExecution = null;
            String lastSubmissionId = null;
            JSONArray outOfSyncRecords = new JSONArray();
            try {
                String hdfsLinkName = (String) map.get("hdfs.link");
                CdcConnection hdfsLink = this.jobDefinitionManagement.getLink(hdfsLinkName);
                if (hdfsLink == null) {
                    logAndReturnResponse("Invalid HDFS link = {}", hdfsLinkName, CommonConstants.EMPTY, ErrorConstants.INVALID_SOURCE_ERROR, CDCAuditLogger.AuditConstants.REPAIR_DC_JOB, currentTimeMillis, request);
                    throw new RuntimeException("Invalid HDFS link = " + hdfsLinkName);
                }
                CdcJobDefinition definition = this.jobDefinitionManagement.getDefinition(str);
                CdcConnection targetConnection = getConnection(definition.getTargetConnectorId().intValue(), this.jobExecutionManagement, definition.getName(), CommonConstants.TYPE_TARGET);
                if (targetConnection == null || !CommonConstants.TRUE.equals(targetConnection.getEnable())) {
                    // NOTE(review): nothing is thrown here explicitly; presumably logAndReturnResponse
                    // aborts by throwing - confirm, otherwise the repair continues with this connection.
                    logAndReturnResponse("Valid target connection not present for job {}", str, CommonConstants.EMPTY, ErrorConstants.INVALID_TARGET_ERROR, CDCAuditLogger.AuditConstants.REPAIR_DC_JOB, currentTimeMillis, request);
                }
                for (ExecutionResponse execution : getJobExecutionsByDefID(str, definition.getId())) {
                    lastSubmissionId = execution.getSubmissionId().toString();
                    lastExecution = getJobExecution(str, lastSubmissionId, this.jobExecutionManagement, currentTimeMillis, request, CDCAuditLogger.AuditConstants.REPAIR_DC_JOB);
                    DataCompareExecutionResponse compareDetails = (DataCompareExecutionResponse) getDataCompareJobExecutionDetails(definition, lastExecution, lastSubmissionId).getBody();
                    JobExecutionUtils.getOutOfSyncRecords(compareDetails.getCompareResult(), outOfSyncRecords);
                }
                JobExecutionUtils.handleDataRepair(str, lastSubmissionId, map, outOfSyncRecords, definition, lastExecution, hdfsLink, targetConnection);
                RepairExecutionResponse repairResponse = RepairExecutionResponse.getInstance(lastExecution);
                repairResponse.setStatus(ApplicationStatus.STARTED.getStatus());
                return new ResponseEntity<>(repairResponse, HttpStatus.OK);
            } catch (RestException e) {
                log.error("Failed to repair job. {}", e.toString());
                doJobExecutionManagement(null, e);
                CDCAuditLogger.logFailure(currentTimeMillis, System.currentTimeMillis(), RequestUtil.getClientIp(request), RequestUtil.getUserName(request), RequestUtil.getInstanceIp(request), CDCAuditLogger.AuditConstants.REPAIR_DC_JOB, TARGET, e.getMessage());
                return new ResponseEntity<>(prepareResponse(e), HttpStatus.BAD_REQUEST);
            } catch (BadRequestException e) {
                // Must precede the generic Exception handler; placed after it this clause can never run.
                log.error("Failed to repair job. {}", e.toString());
                CDCAuditLogger.logFailure(currentTimeMillis, System.currentTimeMillis(), RequestUtil.getClientIp(request), RequestUtil.getUserName(request), RequestUtil.getInstanceIp(request), CDCAuditLogger.AuditConstants.REPAIR_DC_JOB, TARGET, e.getMessage());
                Map<String, String> errorBody = new HashMap<>();
                errorBody.put(RestConstants.ERROR_MESSAGE, e.getMessage());
                return new ResponseEntity<>(errorBody, HttpStatus.BAD_REQUEST);
            } catch (Exception e) {
                log.error("Failed to repair job. ", e);
                doJobExecutionManagement(null, e);
                CDCAuditLogger.logFailure(currentTimeMillis, System.currentTimeMillis(), RequestUtil.getClientIp(request), RequestUtil.getUserName(request), RequestUtil.getInstanceIp(request), CDCAuditLogger.AuditConstants.REPAIR_DC_JOB, TARGET, e.getMessage());
                return new ResponseEntity<>(new ErrorResponse(e), HttpStatus.INTERNAL_SERVER_ERROR);
            }
        } catch (RestException e) {
            // Reached for failures outside the inner try: validation, the permission check, or a
            // RestException raised by one of the inner handlers themselves.
            log.error("Validation failed {}", e.toString());
            CDCAuditLogger.logFailure(currentTimeMillis, System.currentTimeMillis(), RequestUtil.getClientIp(request), RequestUtil.getUserName(request), RequestUtil.getInstanceIp(request), CDCAuditLogger.AuditConstants.REPAIR_DC_JOB, "CdcExecutionEnvResource", e.getMessage());
            return new ResponseEntity<>(new ErrorResponse(Integer.valueOf(e.getCode()), e.getMessage(), e.getTrace()), HttpStatus.BAD_REQUEST);
        }
    }

    /**
     * Handles PUT requests on {@code DC_REPAIR_PATH}: starts a data repair for one submission of
     * the named job using that submission's out-of-sync records.
     *
     * <p>Steps, in order: the job name is added to the request body under {@code JOB_NAME}, the
     * repair configuration and the numeric submission id are validated, the caller's permission is
     * checked, the HDFS link named by the {@code "hdfs.link"} body entry and the job's target
     * connection are resolved, the out-of-sync records of the submission are collected, and
     * {@code JobExecutionUtils.handleDataRepair} is invoked.
     *
     * @param str  value of the {@code jobName} path variable
     * @param str2 value of the {@code submission_id} path variable
     * @param map  request body with the repair configuration; mutated to include the job name
     * @return 200 with a {@code RepairExecutionResponse} whose status is set to STARTED; the
     *         permission-check response if that check returns one; 400 for a {@code RestException}
     *         or {@code BadRequestException}; 500 for any other exception
     */
    @PutMapping({DC_REPAIR_PATH})
    @ResponseBody
    public ResponseEntity<Object> repairDataCompareJobBySubmission(@PathVariable("jobName") String str, @PathVariable("submission_id") String str2, @RequestBody Map<String, Object> map) {
        map.put(JOB_NAME, str);
        long currentTimeMillis = System.currentTimeMillis();
        HttpServletRequest request = RequestContextHolder.currentRequestAttributes().getRequest();
        try {
            ExecutionEnvValidator.validateRepairConfigValues(map);
            EndpointInputValidator.validateNumeric(SUBMISSION_ID, str2);
            ResponseEntity<Object> permissionError = this.cdcJobExecutor.checkPermissionAndReturnError(str, currentTimeMillis, request, CDCAuditLogger.AuditConstants.REPAIR_DC_JOB);
            if (permissionError != null) {
                return permissionError;
            }
            try {
                String hdfsLinkName = (String) map.get("hdfs.link");
                CdcConnection hdfsLink = this.jobDefinitionManagement.getLink(hdfsLinkName);
                if (hdfsLink == null) {
                    logAndReturnResponse("Invalid HDFS link = {}", hdfsLinkName, CommonConstants.EMPTY, ErrorConstants.INVALID_SOURCE_ERROR, CDCAuditLogger.AuditConstants.REPAIR_DC_JOB, currentTimeMillis, request);
                    throw new RuntimeException("Invalid HDFS link = " + hdfsLinkName);
                }
                CdcJobDefinition definition = this.jobDefinitionManagement.getDefinition(str);
                CdcConnection targetConnection = getConnection(definition.getTargetConnectorId().intValue(), this.jobExecutionManagement, definition.getName(), CommonConstants.TYPE_TARGET);
                if (targetConnection == null || !CommonConstants.TRUE.equals(targetConnection.getEnable())) {
                    // NOTE(review): nothing is thrown here explicitly; presumably logAndReturnResponse
                    // aborts by throwing - confirm, otherwise the repair continues with this connection.
                    logAndReturnResponse("Valid target connection not present for job {}", str, CommonConstants.EMPTY, ErrorConstants.INVALID_TARGET_ERROR, CDCAuditLogger.AuditConstants.REPAIR_DC_JOB, currentTimeMillis, request);
                }
                CdcJobExecution jobExecution = getJobExecution(str, str2, this.jobExecutionManagement, currentTimeMillis, request, CDCAuditLogger.AuditConstants.REPAIR_DC_JOB);
                DataCompareExecutionResponse compareDetails = (DataCompareExecutionResponse) getDataCompareJobExecutionDetails(definition, jobExecution, str2).getBody();
                JSONArray outOfSyncRecords = new JSONArray();
                JobExecutionUtils.getOutOfSyncRecords(compareDetails.getCompareResult(), outOfSyncRecords);
                JobExecutionUtils.handleDataRepair(str, str2, map, outOfSyncRecords, definition, jobExecution, hdfsLink, targetConnection);
                RepairExecutionResponse repairResponse = RepairExecutionResponse.getInstance(jobExecution);
                repairResponse.setStatus(ApplicationStatus.STARTED.getStatus());
                return new ResponseEntity<>(repairResponse, HttpStatus.OK);
            } catch (RestException e) {
                log.error("Failed to repair job. {}", e.toString());
                doJobExecutionManagement(null, e);
                CDCAuditLogger.logFailure(currentTimeMillis, System.currentTimeMillis(), RequestUtil.getClientIp(request), RequestUtil.getUserName(request), RequestUtil.getInstanceIp(request), CDCAuditLogger.AuditConstants.REPAIR_DC_JOB, TARGET, e.getMessage());
                return new ResponseEntity<>(prepareResponse(e), HttpStatus.BAD_REQUEST);
            } catch (BadRequestException e) {
                // Must precede the generic Exception handler; placed after it this clause can never run.
                log.error("Failed to repair job. {}", e.toString());
                CDCAuditLogger.logFailure(currentTimeMillis, System.currentTimeMillis(), RequestUtil.getClientIp(request), RequestUtil.getUserName(request), RequestUtil.getInstanceIp(request), CDCAuditLogger.AuditConstants.REPAIR_DC_JOB, TARGET, e.getMessage());
                Map<String, String> errorBody = new HashMap<>();
                errorBody.put(RestConstants.ERROR_MESSAGE, e.getMessage());
                return new ResponseEntity<>(errorBody, HttpStatus.BAD_REQUEST);
            } catch (Exception e) {
                log.error("Failed to repair job. ", e);
                doJobExecutionManagement(null, e);
                CDCAuditLogger.logFailure(currentTimeMillis, System.currentTimeMillis(), RequestUtil.getClientIp(request), RequestUtil.getUserName(request), RequestUtil.getInstanceIp(request), CDCAuditLogger.AuditConstants.REPAIR_DC_JOB, TARGET, e.getMessage());
                return new ResponseEntity<>(new ErrorResponse(e), HttpStatus.INTERNAL_SERVER_ERROR);
            }
        } catch (RestException e) {
            // Reached for failures outside the inner try: validation, the permission check, or a
            // RestException raised by one of the inner handlers themselves.
            log.error("Validation failed {}", e.toString());
            CDCAuditLogger.logFailure(currentTimeMillis, System.currentTimeMillis(), RequestUtil.getClientIp(request), RequestUtil.getUserName(request), RequestUtil.getInstanceIp(request), CDCAuditLogger.AuditConstants.REPAIR_DC_JOB, "CdcExecutionEnvResource", e.getMessage());
            return new ResponseEntity<>(new ErrorResponse(Integer.valueOf(e.getCode()), e.getMessage(), e.getTrace()), HttpStatus.BAD_REQUEST);
        }
    }

    /**
     * Handles GET requests on {@code JOB_SUBMISSIONS_PATH}: lists all executions of the job named
     * by the {@code JOB_NAME} query parameter together with their connector/application names and,
     * for executions that are not STOPPED, their refreshed statuses.
     *
     * <p>The job name is validated and the caller's permission is checked before any execution is
     * read. A dedicated {@code JobExecutionManagement} is opened for the request and closed by
     * try-with-resources on every exit path.
     *
     * @param map query parameters of the request; only {@code JOB_NAME} is read
     * @return 200 with a {@code ViewJobExecutionsResponse}; the permission-check response if that
     *         check returns one; 400 for a {@code BadRequestException} or {@code RestException};
     *         500 for any other exception
     */
    @GetMapping(value = {JOB_SUBMISSIONS_PATH}, produces = {"application/json"})
    @ResponseBody
    public ResponseEntity<Object> getJobExecutions(@RequestParam Map<String, String> map) {
        String str = map.get(JOB_NAME);
        // Left as a raw map: the validator's parameter type is not visible from this method.
        HashMap hashMap = new HashMap();
        hashMap.put(JOB_NAME, str);
        EndpointInputValidator.validateNonEmptyNoSpecialCharacters(hashMap);
        ResponseEntity<Object> permissionError = this.cdcJobExecutor.checkPermissionAndReturnError(str, System.currentTimeMillis(), RequestContextHolder.currentRequestAttributes().getRequest(), null);
        if (permissionError != null) {
            return permissionError;
        }
        try (JobExecutionManagement jobExecutionManagement = new JobExecutionManagement()) {
            List<CdcJobExecution> allExecutions = jobExecutionManagement.getAllExecutions(str);
            List<ExecutionResponse> responses = new ArrayList<>();
            for (CdcJobExecution cdcJobExecution : CommonUtil.safe(allExecutions)) {
                syncJobStatus(str, cdcJobExecution);
                ExecutionResponse executionResponse = ExecutionResponse.getInstance(cdcJobExecution);
                Map<String, Object> statusDetails = new HashMap<>();
                // Evaluated per execution, after syncJobStatus, so nothing is looked up when the
                // job has no executions.
                boolean isHudiEnabled = JobControllerUtils.isHudiEnabled(jobExecutionManagement.getDefinition(str));
                // Statuses are refreshed only for executions that are not STOPPED.
                if (!executionResponse.getStatus().equalsIgnoreCase(ApplicationStatus.STOPPED.getStatus())) {
                    if (isHudiEnabled) {
                        updateJobExecutionStatusWithAppAsTarget(statusDetails, this.jobExecutionMetricsManagement, str, cdcJobExecution.getId());
                        updateConnectorAndAppStatus(statusDetails, executionResponse);
                    } else {
                        this.jobMetricService.refreshJob();
                        statusDetails.putAll(this.jobExecutionService.getJobExecutionStatus(cdcJobExecution));
                        updateConnectorStatus(statusDetails, executionResponse);
                    }
                }
                if (isHudiEnabled) {
                    updateConnectorAndAppNames(jobExecutionManagement, executionResponse, str, cdcJobExecution.getId());
                } else {
                    updateConnectorNames(jobExecutionManagement, executionResponse, str);
                }
                responses.add(executionResponse);
            }
            return new ResponseEntity<>(new ViewJobExecutionsResponse(responses.size(), responses), HttpStatus.OK);
        } catch (BadRequestException e) {
            log.error("Error during job status retrieval. {}", e.getMessage());
            return new ResponseEntity<>(new ErrorResponse((Exception) e), HttpStatus.BAD_REQUEST);
        } catch (RestException e2) {
            log.error("Error during status retrieval. {}", e2.toString());
            return new ResponseEntity<>(new ErrorResponse(Integer.valueOf(e2.getCode()), e2.getMessage(), e2.getTrace()), HttpStatus.BAD_REQUEST);
        } catch (Exception e3) {
            log.error("Error during job status retrieval. ", e3);
            return new ResponseEntity<>(new ErrorResponse(e3), HttpStatus.INTERNAL_SERVER_ERROR);
        }
    }

    /**
     * Handles POST requests on {@code /job/status}: returns the statuses reported by the job
     * execution service for the ids listed under {@code LAST_SUBMISSION_ID} in the request body.
     *
     * @param map request body; only the list stored under {@code LAST_SUBMISSION_ID} is read
     * @return 200 with the service result, or 500 with an {@code ErrorResponse} if anything throws
     */
    @PostMapping({"/job/status"})
    @ResponseBody
    public ResponseEntity<Object> getJobStatusById(@RequestBody Map<String, List<Integer>> map) {
        ResponseEntity<Object> response;
        try {
            // The lookup stays inside the try so a failure on the body itself is reported as a 500 too.
            List<Integer> requestedIds = map.get(LAST_SUBMISSION_ID);
            response = new ResponseEntity<>(this.jobExecutionService.getJobStatus(requestedIds), HttpStatus.OK);
        } catch (Exception e) {
            log.error("Error during get job status by jobIds. ", e);
            response = new ResponseEntity<>(new ErrorResponse(e), HttpStatus.INTERNAL_SERVER_ERROR);
        }
        return response;
    }

    /**
     * Fills {@code map} with the source connector status, the application status and the derived
     * overall execution status of an execution whose target is an application.
     *
     * <p>If both the source connector name and the application id are missing, only
     * {@code SOURCE_CONNECTOR_NAME} and {@code APP_ID} are written (both {@code "NA"}) and no
     * status lookup is attempted. Otherwise the overall status is RUNNING only when both the source
     * and the application report RUNNING; in every other case it is FAILED.
     *
     * @param map                           output map receiving the status entries
     * @param jobExecutionMetricsManagement used to resolve the source connector name
     * @param str                           job name
     * @param num                           id of the job execution; must not be null
     */
    private void updateJobExecutionStatusWithAppAsTarget(Map<String, Object> map, JobExecutionMetricsManagement jobExecutionMetricsManagement, String str, Integer num) {
        String sourceConnectorName = jobExecutionMetricsManagement.getSourceConnectorName(str, String.valueOf(num));
        // Fetched once and reused for both the app id and the source connector id.
        CdcJobExecution execution = this.jobExecutionManagement.getJobExecution(num.intValue());
        String appId = execution.getAppId();
        if (sourceConnectorName == null && appId == null) {
            map.put(BodyConstants.SOURCE_CONNECTOR_NAME, "NA");
            map.put(BodyConstants.APP_ID, "NA");
            return;
        }
        String sourceStatus;
        if (JobControllerUtils.isKafkaTypeLink(this.jobExecutionManagement.getConnection(execution.getSourceConnectorId().intValue()))) {
            // A Kafka-type source link is reported as RUNNING without querying a connector.
            sourceStatus = ApplicationStatus.RUNNING.getStatus();
        } else if (sourceConnectorName != null) {
            sourceStatus = JobControllerUtils.getConnectorTaskStatus(sourceConnectorName, true);
        } else {
            sourceStatus = "NA";
        }
        // NOTE(review): appId can still be null here when only the connector name is present;
        // confirm that YarnClientUtil.getStatus tolerates a null id.
        String appStatus = YarnClientUtil.getStatus(appId).name();
        map.put(BodyConstants.SOURCE_CONNECTOR_STATUS, sourceStatus);
        map.put(BodyConstants.APP_STATUS, appStatus);
        String running = ApplicationStatus.RUNNING.getStatus();
        if (running.equalsIgnoreCase(sourceStatus) && running.equalsIgnoreCase(appStatus)) {
            map.put(MetricsConstants.EXECUTION_STATUS, ApplicationStatus.RUNNING.getStatus());
        } else {
            map.put(MetricsConstants.EXECUTION_STATUS, ApplicationStatus.FAILED.getStatus());
        }
    }

    /**
     * Sets the source connector name and application id on the response. A STOPPED execution gets
     * {@code "NA"} for both; any other status gets the values looked up through
     * {@code jobExecutionManagement}.
     *
     * @param jobExecutionManagement used for the connector-name and job-execution lookups
     * @param executionResponse      response to update; its status and submission id are read
     * @param str                    job name
     * @param num                    id of the job execution whose application id is read
     */
    private void updateConnectorAndAppNames(JobExecutionManagement jobExecutionManagement, ExecutionResponse executionResponse, String str, Integer num) {
        final boolean stopped = ApplicationStatus.STOPPED.getStatus().equals(executionResponse.getStatus());
        if (!stopped) {
            final String submissionId = String.valueOf(executionResponse.getSubmissionId());
            executionResponse.setSourceConnectorName(jobExecutionManagement.getConnectorName(str, submissionId, true));
            final CdcJobExecution execution = jobExecutionManagement.getJobExecution(num.intValue());
            executionResponse.setAppId(execution.getAppId());
            return;
        }
        executionResponse.setSourceConnectorName("NA");
        executionResponse.setAppId("NA");
    }

    /**
     * Copies the execution, source connector and application statuses from {@code map} onto the
     * response. Nothing is copied when the map's source connector name or application status
     * reads {@code "NA"} (compared case-insensitively).
     *
     * @param map               status entries collected for the execution
     * @param executionResponse response to update
     */
    private void updateConnectorAndAppStatus(Map<String, Object> map, ExecutionResponse executionResponse) {
        final boolean notAvailable = "NA".equalsIgnoreCase(String.valueOf(map.get(BodyConstants.SOURCE_CONNECTOR_NAME)))
                || "NA".equalsIgnoreCase(String.valueOf(map.get(BodyConstants.APP_STATUS)));
        if (!notAvailable) {
            final String executionStatus = String.valueOf(map.get(MetricsConstants.EXECUTION_STATUS));
            executionResponse.setStatus(executionStatus);
            final String sourceStatus = String.valueOf(map.get(BodyConstants.SOURCE_CONNECTOR_STATUS));
            executionResponse.setSourceStatus(sourceStatus);
            final String appStatus = String.valueOf(map.get(BodyConstants.APP_STATUS));
            executionResponse.setAppStatus(appStatus);
        }
    }

    /**
     * Copies the execution, source connector and sink connector statuses from {@code map} onto the
     * response. Nothing is copied when the map's source or sink connector name reads {@code "NA"}
     * (compared case-insensitively).
     *
     * @param map               status entries collected for the execution
     * @param executionResponse response to update
     */
    private void updateConnectorStatus(Map<String, Object> map, ExecutionResponse executionResponse) {
        final boolean notAvailable = "NA".equalsIgnoreCase(String.valueOf(map.get(BodyConstants.SOURCE_CONNECTOR_NAME)))
                || "NA".equalsIgnoreCase(String.valueOf(map.get(BodyConstants.SINK_CONNECTOR_NAME)));
        if (!notAvailable) {
            final String executionStatus = String.valueOf(map.get(MetricsConstants.EXECUTION_STATUS));
            executionResponse.setStatus(executionStatus);
            final String sourceStatus = String.valueOf(map.get(BodyConstants.SOURCE_CONNECTOR_STATUS));
            executionResponse.setSourceStatus(sourceStatus);
            final String sinkStatus = String.valueOf(map.get(BodyConstants.SINK_CONNECTOR_STATUS));
            executionResponse.setSinkStatus(sinkStatus);
        }
    }

    /**
     * Sets the source and sink connector names on the response. A STOPPED execution gets
     * {@code "NA"} for both; any other status gets the names looked up through
     * {@code jobExecutionManagement} (flag {@code true} for the source, {@code false} for the sink).
     *
     * @param jobExecutionManagement used for the connector-name lookups
     * @param executionResponse      response to update; its status and submission id are read
     * @param str                    job name
     */
    private void updateConnectorNames(JobExecutionManagement jobExecutionManagement, ExecutionResponse executionResponse, String str) {
        final boolean stopped = ApplicationStatus.STOPPED.getStatus().equals(executionResponse.getStatus());
        if (!stopped) {
            final String sourceName = jobExecutionManagement.getConnectorName(str, String.valueOf(executionResponse.getSubmissionId()), true);
            executionResponse.setSourceConnectorName(sourceName);
            final String sinkName = jobExecutionManagement.getConnectorName(str, String.valueOf(executionResponse.getSubmissionId()), false);
            executionResponse.setSinkConnectorName(sinkName);
            return;
        }
        executionResponse.setSourceConnectorName("NA");
        executionResponse.setSinkConnectorName("NA");
    }

    /**
     * Creates a new {@code DataComparisonJob} and starts the data comparison on it, forwarding all
     * arguments unchanged.
     *
     * @param map             forwarded to {@code startCdlDataCompare}
     * @param list            compare definitions, forwarded to {@code startCdlDataCompare}
     * @param cdcJobExecution job execution, forwarded to {@code startCdlDataCompare}
     * @param z               forwarded to {@code startCdlDataCompare}
     * @param i               forwarded to {@code startCdlDataCompare}
     * @throws SQLException           propagated from the comparison job
     * @throws ClassNotFoundException propagated from the comparison job
     */
    public void startDataComparisonJob(Map<String, Map<String, String>> map, List<DcJobCompareDefinition> list, CdcJobExecution cdcJobExecution, boolean z, int i) throws SQLException, ClassNotFoundException {
        final DataComparisonJob comparisonJob = new DataComparisonJob();
        comparisonJob.startCdlDataCompare(map, list, cdcJobExecution, z, i);
    }

    /**
     * Handles PUT requests on {@code JOB_START_PATH}: creates and starts a new execution of the
     * named job.
     *
     * <p>Flow: the job name is validated and the caller's permission is checked; a distribute lock
     * is created for the job name; a job execution is created in BOOTING state; then either the
     * data-compare flow is run (job type {@code DATA_COMPARE_JOB}) or the source connector and the
     * target (an application when Hudi is enabled, a sink connector otherwise) are started and the
     * resulting statuses are written back to the execution.
     *
     * <p>The {@code JobExecutionManagement} opened for the request is closed by
     * try-with-resources, and the distribute lock is released exactly once, in a single
     * {@code finally}, after the resource has been closed and any error response has been built.
     *
     * @param str value of the {@code jobName} path variable
     * @param map query parameters of the request; forwarded to the data-compare flow only
     * @return 201 with an {@code ExecutionResponse} for a started job; the data-compare flow's
     *         response for data-compare jobs; the permission-check response if that check returns
     *         one; otherwise the response built by the matching exception handler
     */
    @PutMapping(value = {JOB_START_PATH}, produces = {"application/json"})
    @ResponseBody
    public ResponseEntity<Object> startJobExecution(@PathVariable("jobName") String str, @RequestParam Map<String, String> map) {
        // Left as a raw map: the validator's parameter type is not visible from this method.
        HashMap hashMap = new HashMap();
        hashMap.put(JOB_NAME, str);
        EndpointInputValidator.validateNonEmptyNoSpecialCharacters(hashMap);
        long currentTimeMillis = System.currentTimeMillis();
        HttpServletRequest request = RequestContextHolder.currentRequestAttributes().getRequest();
        ResponseEntity<Object> permissionError = this.cdcJobExecutor.checkPermissionAndReturnError(str, currentTimeMillis, request, CDCAuditLogger.AuditConstants.START_CDL_JOB);
        if (permissionError != null) {
            return permissionError;
        }
        try {
            CdcDistributeLockEntity distributeLock = this.distributeLockManagement.createDistributeLock(str, RequestUtil.getUserName(request));
            try {
                // Kept as its own inner try so that a failure inside handleRestException is still
                // routed through the outer BadRequestException/Exception handlers.
                try (JobExecutionManagement jobExecutionManagement = new JobExecutionManagement()) {
                    CdcJobDefinition definition = jobExecutionManagement.getDefinition(str);
                    if (definition == null) {
                        // NOTE(review): presumably logAndReturnResponse aborts by throwing - confirm;
                        // otherwise the dereferences of definition below fail with a NullPointerException.
                        logAndReturnResponse("Job definition is not present for {}", str, ErrorConstants.JOB_NOT_FOUND_ERROR, CDCAuditLogger.AuditConstants.START_CDL_JOB, currentTimeMillis, request);
                    }
                    validateJobExecutionBeforeStart(jobExecutionManagement, str);
                    CdcConnection sourceConnection = getConnection(definition.getSourceConnectorId().intValue(), jobExecutionManagement, definition.getName(), CommonConstants.TYPE_SOURCE);
                    CdcConnection targetConnection = getConnection(definition.getTargetConnectorId().intValue(), jobExecutionManagement, definition.getName(), CommonConstants.TYPE_TARGET);
                    validateSourceAndTargetConnection(sourceConnection, targetConnection, str, currentTimeMillis, request);

                    // Persist the new execution in BOOTING state before anything is started.
                    CdcJobExecution execution = jobExecutionManagement.createJobExecution(createJobExecution(new CdcJobExecution(), definition));
                    execution.setSourceConnectorId(sourceConnection.getId());
                    execution.setStatus(ApplicationStatus.BOOTING.getStatus());
                    execution = jobExecutionManagement.updateJobExecution(execution);

                    if (definition.getJobType().equals("DATA_COMPARE_JOB")) {
                        execution.setTargetConnectorId(targetConnection.getId());
                        return processDataComparejob(definition, execution, map, sourceConnection, targetConnection);
                    }

                    if (!JobControllerUtils.isHudiEnabled(definition)) {
                        execution.setTargetConnectorId(targetConnection.getId());
                    }
                    startConnector(sourceConnection, definition, execution, CommonConstants.TYPE_SOURCE);
                    ApplicationHandler applicationHandler = null;
                    if (JobControllerUtils.isHudiEnabled(definition)) {
                        applicationHandler = JobExecutionUtils.launchApp(targetConnection, definition, execution, null);
                    } else {
                        startConnector(targetConnection, definition, execution, CommonConstants.TYPE_TARGET);
                    }

                    execution = jobExecutionManagement.updateJobExecution(execution);
                    if (ApplicationStatus.STARTED.getStatus().equals(execution.getStatus())) {
                        execution.setStatus(ApplicationStatus.RUNNING.getStatus());
                        execution.setExecutionStartTime(LocalDateTime.now());
                    }
                    execution = jobExecutionManagement.updateJobExecution(execution);

                    String submissionId = String.valueOf(execution.getId());
                    String sourceConnectorName = jobExecutionManagement.getConnectorName(str, submissionId, true);
                    Object sourceStatus = getConnectorStatusesAfterStart(sourceConnection, sourceConnectorName);
                    Object sinkStatus = null;
                    String appStatus = null;
                    if (!JobControllerUtils.isHudiEnabled(definition)) {
                        String sinkConnectorName = jobExecutionManagement.getConnectorName(str, submissionId, false);
                        sinkStatus = getConnectorStatusesAfterStart(targetConnection, sinkConnectorName);
                        updateJobStatus(execution, sourceStatus, sinkStatus, JobControllerUtils.isKafkaTypeLink(sourceConnection), JobControllerUtils.isKafkaTypeLink(targetConnection), sourceConnectorName, sinkConnectorName);
                    } else {
                        if (applicationHandler == null) {
                            String message = "App Handler is null, job name is " + str;
                            log.error(message);
                            throw new RuntimeException(message);
                        }
                        appStatus = applicationHandler.getStatus().name();
                        updateJobStatusWithAppAsTarget(execution, sourceStatus, JobControllerUtils.isKafkaTypeLink(sourceConnection), sourceConnectorName);
                    }

                    execution = jobExecutionManagement.updateJobExecution(execution);
                    CDCAuditLogger.logSuccess(currentTimeMillis, System.currentTimeMillis(), RequestUtil.getClientIp(request), RequestUtil.getUserName(request), RequestUtil.getInstanceIp(request), "Start CDL Job " + str, TARGET);
                    return new ResponseEntity<>(ExecutionResponse.getInstance(execution, sourceStatus, sinkStatus, appStatus), HttpStatus.CREATED);
                } catch (RestException e) {
                    return handleRestException(e, null, currentTimeMillis, request);
                }
            } catch (BadRequestException e2) {
                return handleBadRequestException(e2, currentTimeMillis, request);
            } catch (Exception e3) {
                return handleException(e3, null, currentTimeMillis, request);
            } finally {
                // Single release point: a second delete on a failure path could remove a lock
                // that a concurrent request has acquired in the meantime.
                if (distributeLock != null) {
                    this.distributeLockManagement.deleteDistributeLockByResource(str);
                }
            }
        } catch (RollbackException e4) {
            log.error("Failed to create distribute lock when start job {}, {}", str, e4.toString());
            return handleBadRequestException(new BadRequestException(ErrorConstants.CONCURRENT_START_ERROR), currentTimeMillis, request);
        }
    }

    /**
     * Checks that both endpoints of a job are present and enabled before the job is started.
     *
     * <p>A connection is rejected when it is {@code null} or when its enable flag does not equal
     * {@code CommonConstants.TRUE}. Rejection is delegated to {@code logAndReturnResponse}, which
     * logs, writes an audit record and throws, so the target is only inspected when the source
     * passed.
     *
     * @param cdcConnection source connection, may be {@code null}
     * @param cdcConnection2 target connection, may be {@code null}
     * @param str job name used in the log message
     * @param j timestamp forwarded to the audit logger (presumably the request start time)
     * @param httpServletRequest request used to resolve the audit fields
     */
    private void validateSourceAndTargetConnection(CdcConnection cdcConnection, CdcConnection cdcConnection2, String str, long j, HttpServletRequest httpServletRequest) {
        boolean sourceUsable = cdcConnection != null && CommonConstants.TRUE.equals(cdcConnection.getEnable());
        if (!sourceUsable) {
            logAndReturnResponse("Valid source connection not present for job {}", str, CommonConstants.EMPTY, ErrorConstants.INVALID_SOURCE_ERROR, CDCAuditLogger.AuditConstants.START_CDL_JOB, j, httpServletRequest);
        }

        boolean targetUsable = cdcConnection2 != null && CommonConstants.TRUE.equals(cdcConnection2.getEnable());
        if (!targetUsable) {
            logAndReturnResponse("Valid target connection not present for job {}", str, CommonConstants.EMPTY, ErrorConstants.INVALID_TARGET_ERROR, CDCAuditLogger.AuditConstants.START_CDL_JOB, j, httpServletRequest);
        }
    }

    /**
     * Assembles the configuration for a data-comparison job, starts it and persists the execution.
     *
     * <p>Three property maps are handed to {@code startDataComparisonJob} under the keys
     * {@code envConfig}, {@code sourceConfig} and {@code targetConfig}. The environment map is only
     * populated when the target connection type is KAFKA or HUDI; otherwise it is {@code null}.
     *
     * @param cdcJobDefinition definition of the job being started
     * @param cdcJobExecution execution record; its start time is set to now and it is then updated
     * @param map request parameters; an optional {@code jobCorrelatedID} entry is parsed as an int
     * @param cdcConnection source connection
     * @param cdcConnection2 target connection
     * @return a {@code 201 CREATED} response wrapping the updated execution
     * @throws SQLException declared by the original signature
     * @throws ClassNotFoundException declared by the original signature
     */
    private ResponseEntity<Object> processDataComparejob(CdcJobDefinition cdcJobDefinition, CdcJobExecution cdcJobExecution, Map<String, String> map, CdcConnection cdcConnection, CdcConnection cdcConnection2) throws SQLException, ClassNotFoundException {
        String targetType = cdcConnection2.getType();
        boolean needsHiveEnv = targetType.equals(DataSourcesConstants.KAFKA) || targetType.equals(DataSourcesConstants.HUDI);
        Map<String, String> envConfig = needsHiveEnv
                ? EntityConvertor.getPropertiesFromJSON(JobExecutionUtils.getHiveExecEnv(cdcJobDefinition).getProperties())
                : null;

        List<DcJobCompareDefinition> comparePairs = new DcJobCompareDefinitionManagement().getDcJobCompareDefinitions(cdcJobDefinition.getId(), CommonConstants.EMPTY, CommonConstants.EMPTY);

        Map<String, String> sourceConfig = EntityConvertor.getPropertiesFromJSON(cdcConnection.getProperties(), false);
        Map<String, String> jobSourceProperties = EntityConvertor.getPropertiesFromJSON(cdcJobDefinition.getDecryptedSourceProperties(), true);
        Map<String, String> jobTargetProperties = EntityConvertor.getPropertiesFromJSON(cdcJobDefinition.getDecryptedTargetProperties(), true);

        // Each connection config is enriched with the schema chosen on the job and the link type.
        sourceConfig.put(ValidationConstants.SCHEMA, jobSourceProperties.get(ValidationConstants.SCHEMA));
        sourceConfig.put(BodyConstants.TYPE, cdcConnection.getType());

        Map<String, String> targetConfig = EntityConvertor.getPropertiesFromJSON(cdcConnection2.getProperties(), false);
        targetConfig.put(ValidationConstants.SCHEMA, jobTargetProperties.get(ValidationConstants.SCHEMA));
        targetConfig.put(BodyConstants.TYPE, cdcConnection2.getType());

        int correlatedId = map.containsKey("jobCorrelatedID") ? Integer.parseInt(map.get("jobCorrelatedID")) : 0;

        cdcJobExecution.setExecutionStartTime(LocalDateTime.now());

        Map<String, Map<String, String>> configs = new HashMap<>();
        configs.put("envConfig", envConfig);
        configs.put("sourceConfig", sourceConfig);
        configs.put("targetConfig", targetConfig);
        startDataComparisonJob(configs, comparePairs, cdcJobExecution, cdcJobDefinition.isIncremental(), correlatedId);

        return new ResponseEntity<>(ExecutionResponse.getDataCompareJobExecutionInstance(this.jobExecutionManagement.updateJobExecution(cdcJobExecution)), HttpStatus.CREATED);
    }

    /**
     * Converts an unexpected failure while starting a job into a {@code 500} response.
     *
     * <p>The failure is logged, passed to {@code doJobExecutionManagement} and
     * {@code recordExecutionError}, and written to the audit log before the response is built.
     *
     * @param exc the failure
     * @param cdcJobExecution execution to mark as failed, may be {@code null}
     * @param j timestamp forwarded to the audit logger (presumably the request start time)
     * @param httpServletRequest request used to resolve the audit fields
     * @return response carrying an {@code ErrorResponse} built from {@code exc}
     */
    private ResponseEntity<Object> handleException(Exception exc, CdcJobExecution cdcJobExecution, long j, HttpServletRequest httpServletRequest) {
        log.error("Failed to start job. ", exc);
        doJobExecutionManagement(cdcJobExecution, exc);
        recordExecutionError(cdcJobExecution, exc);

        long auditEnd = System.currentTimeMillis();
        CDCAuditLogger.logFailure(
                j,
                auditEnd,
                RequestUtil.getClientIp(httpServletRequest),
                RequestUtil.getUserName(httpServletRequest),
                RequestUtil.getInstanceIp(httpServletRequest),
                CDCAuditLogger.AuditConstants.START_CDL_JOB,
                TARGET,
                exc.getMessage());

        ErrorResponse payload = new ErrorResponse(exc);
        return new ResponseEntity<>(payload, HttpStatus.INTERNAL_SERVER_ERROR);
    }

    /**
     * Converts a {@code BadRequestException} raised while starting a job into a {@code 400} response.
     *
     * <p>The response body is a single-entry map keyed by {@code RestConstants.ERROR_MESSAGE}; the
     * same message is written to the audit log.
     *
     * @param badRequestException the rejected-request failure
     * @param j timestamp forwarded to the audit logger (presumably the request start time)
     * @param httpServletRequest request used to resolve the audit fields
     * @return the {@code BAD_REQUEST} response
     */
    private ResponseEntity<Object> handleBadRequestException(BadRequestException badRequestException, long j, HttpServletRequest httpServletRequest) {
        log.error("Failed to start job. {}, ", badRequestException.toString());

        Map<String, String> body = new HashMap<>();
        body.put(RestConstants.ERROR_MESSAGE, badRequestException.getMessage());

        long auditEnd = System.currentTimeMillis();
        CDCAuditLogger.logFailure(
                j,
                auditEnd,
                RequestUtil.getClientIp(httpServletRequest),
                RequestUtil.getUserName(httpServletRequest),
                RequestUtil.getInstanceIp(httpServletRequest),
                CDCAuditLogger.AuditConstants.START_CDL_JOB,
                TARGET,
                badRequestException.getMessage());

        return new ResponseEntity<>(body, HttpStatus.BAD_REQUEST);
    }

    /**
     * Converts a {@code RestException} raised while starting a job into a {@code 400} response.
     *
     * <p>Performs the same bookkeeping as the generic exception handler (log, execution
     * management, error recording, audit) but builds the body with {@code prepareResponse}.
     *
     * @param restException the REST-layer failure
     * @param cdcJobExecution execution to mark as failed, may be {@code null}
     * @param j timestamp forwarded to the audit logger (presumably the request start time)
     * @param httpServletRequest request used to resolve the audit fields
     * @return the {@code BAD_REQUEST} response
     */
    private ResponseEntity<Object> handleRestException(RestException restException, CdcJobExecution cdcJobExecution, long j, HttpServletRequest httpServletRequest) {
        log.error("Failed to start job. {}", restException.toString());
        doJobExecutionManagement(cdcJobExecution, restException);
        recordExecutionError(cdcJobExecution, restException);

        long auditEnd = System.currentTimeMillis();
        CDCAuditLogger.logFailure(
                j,
                auditEnd,
                RequestUtil.getClientIp(httpServletRequest),
                RequestUtil.getUserName(httpServletRequest),
                RequestUtil.getInstanceIp(httpServletRequest),
                CDCAuditLogger.AuditConstants.START_CDL_JOB,
                TARGET,
                restException.getMessage());

        return new ResponseEntity<>(prepareResponse(restException), HttpStatus.BAD_REQUEST);
    }

    /**
     * Refuses to start a job that already has an execution in a non-terminal state.
     *
     * <p>The latest running execution (if any) is first re-synchronised with the actual state of
     * its connectors or compare pairs; when that changes its status the execution is persisted.
     * A {@code BadRequestException} is then thrown if the status is STARTED, RUNNING, PARTIAL,
     * PAUSED or FAILED.
     *
     * @param jobExecutionManagement persistence facade used to read and update the execution
     * @param str job name
     * @throws Exception if synchronisation fails or the job may not be started
     */
    private void validateJobExecutionBeforeStart(JobExecutionManagement jobExecutionManagement, String str) throws Exception {
        CdcJobExecution running = jobExecutionManagement.getLatestRunningJobExecution(str);
        if (running == null) {
            return;
        }

        String statusBeforeSync = running.getStatus();
        if (running.getJobType().equalsIgnoreCase("DATA_COMPARE_JOB")) {
            new DataComparisonJob().syncJobExecutionAndComparePairStatus(running, jobExecutionManagement);
        } else {
            syncJobStatus(str, running);
        }
        if (!statusBeforeSync.equals(running.getStatus())) {
            jobExecutionManagement.updateJobExecution(running);
        }

        // Fragments shared by every rejection message below.
        String currentStatus = running.getStatus();
        String prefix = str + " cannot be started as the job is ";
        String submission = " with submission id " + running.getId();

        if (ApplicationStatus.STARTED.getStatus().equals(currentStatus)) {
            throw new BadRequestException(prefix + "already started" + submission + CommonConstants.DOT);
        }
        if (ApplicationStatus.RUNNING.getStatus().equals(currentStatus)) {
            throw new BadRequestException(prefix + "running" + submission + CommonConstants.DOT);
        }
        if (ApplicationStatus.PARTIAL.getStatus().equals(currentStatus)) {
            throw new BadRequestException(prefix + "partially running" + submission + CommonConstants.DOT);
        }
        if (ApplicationStatus.PAUSED.getStatus().equals(currentStatus)) {
            throw new BadRequestException(prefix + "paused" + submission + ". Please stop it first.");
        }
        if (ApplicationStatus.FAILED.getStatus().equals(currentStatus)) {
            throw new BadRequestException(prefix + "failed" + submission + ". Please stop it first.");
        }
    }

    /**
     * Marks an execution as FAILED and stores the failure message on it.
     *
     * <p>The error code is always {@code ErrorConstants.KAFKA_CONNECT_INVALID_RESPONSE}; the error
     * message is taken from {@code exc}. A dedicated {@code JobExecutionManagement} is opened for
     * the update and closed again, including when the update throws.
     *
     * @param cdcJobExecution execution to update; nothing happens when it is {@code null}
     * @param exc failure whose message is recorded
     */
    private void recordExecutionError(CdcJobExecution cdcJobExecution, Exception exc) {
        if (cdcJobExecution == null) {
            return;
        }
        // try-with-resources replaces the hand-expanded close/addSuppressed sequence.
        try (JobExecutionManagement management = new JobExecutionManagement()) {
            cdcJobExecution.setErrorCode(String.valueOf(ErrorConstants.KAFKA_CONNECT_INVALID_RESPONSE));
            cdcJobExecution.setErrorMessage(exc.getMessage());
            cdcJobExecution.setStatus(ApplicationStatus.FAILED.getStatus());
            management.updateJobExecution(cdcJobExecution);
        }
    }

    /**
     * Polls a connector until it reports the PAUSED state and returns its status.
     *
     * <p>For Kafka-type links no connector is queried and a fixed message is returned. Otherwise
     * the status is fetched repeatedly, with {@code JobControllerUtils.incrementRetryCounterAndWait}
     * (limit 5) between attempts. If any step raises an {@code EntityException}, polling stops and
     * a "is pausing" message is returned instead; previously the exception was caught inside the
     * loop, which restarted polling immediately and never returned that message.
     *
     * @param cdcConnection connection the connector belongs to
     * @param str connector name
     * @return the connector status object, or a human-readable message
     */
    private Object getConnectorStatusesAfterPause(CdcConnection cdcConnection, String str) {
        if (JobControllerUtils.isKafkaTypeLink(cdcConnection)) {
            return "Link " + cdcConnection.getName() + " is already paused.";
        }
        int retries = 0;
        try {
            while (true) {
                Object connectorStatus = JobControllerUtils.getConnectorStatus(str, true);
                if (ApplicationStatus.PAUSED.getStatus().equals(JobControllerUtils.getState(connectorStatus))) {
                    return connectorStatus;
                }
                // Raw cast kept: the helper's parameter type is not visible from here.
                retries = JobControllerUtils.incrementRetryCounterAndWait(retries, 5, (Map) connectorStatus);
            }
        } catch (EntityException e) {
            log.error("Connector {} not yet paused", str);
            return "Connector " + str + " is pausing.";
        }
    }

    /**
     * Polls a connector until it no longer reports the PAUSED state and returns its status.
     *
     * <p>For Kafka-type links no connector is queried and a fixed message is returned. Otherwise
     * the status is fetched repeatedly, with {@code JobControllerUtils.incrementRetryCounterAndWait}
     * (limit 5) between attempts. If any step raises an {@code EntityException}, polling stops and
     * a "is starting" message is returned instead; previously the exception was caught inside the
     * loop, which restarted polling immediately and never returned that message.
     *
     * @param cdcConnection connection the connector belongs to
     * @param str connector name
     * @return the connector status object, or a human-readable message
     */
    private Object getConnectorStatusesAfterResume(CdcConnection cdcConnection, String str) {
        if (JobControllerUtils.isKafkaTypeLink(cdcConnection)) {
            return "Link " + cdcConnection.getName() + " is already running.";
        }
        int retries = 0;
        try {
            while (true) {
                Object connectorStatus = JobControllerUtils.getConnectorStatus(str, true);
                if (!ApplicationStatus.PAUSED.getStatus().equals(JobControllerUtils.getState(connectorStatus))) {
                    return connectorStatus;
                }
                // Raw cast kept: the helper's parameter type is not visible from here.
                retries = JobControllerUtils.incrementRetryCounterAndWait(retries, 5, (Map) connectorStatus);
            }
        } catch (EntityException e) {
            log.error("Connector {} not yet ready ", str);
            return "Connector " + str + " is starting.";
        }
    }

    /**
     * Fetches the status of a connector right after it was started, without polling.
     *
     * @param cdcConnection connection the connector belongs to
     * @param str connector name
     * @return a fixed message for Kafka-type links; otherwise the connector status, or an
     *         "is starting" message when the status lookup raises an {@code EntityException}
     */
    private Object getConnectorStatusesAfterStart(CdcConnection cdcConnection, String str) {
        if (JobControllerUtils.isKafkaTypeLink(cdcConnection)) {
            return "Link " + cdcConnection.getName() + " is already running.";
        }
        try {
            return JobControllerUtils.getConnectorStatus(str, false);
        } catch (EntityException e) {
            log.error("Connector {} not ready yet", str);
            return "Connector " + str + " is starting.";
        }
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable, com.huawei.cdc.service.exception.RestException, com.huawei.cdc.service.exception.ParameterException] */
    private void logAndReturnResponse(String str, String str2, int i, String str3, long j, HttpServletRequest httpServletRequest) throws ParameterException {
        log.error(str, str2);
        ?? parameterException = new ParameterException(i, str2);
        CDCAuditLogger.logFailure(j, System.currentTimeMillis(), RequestUtil.getClientIp(httpServletRequest), RequestUtil.getUserName(httpServletRequest), RequestUtil.getInstanceIp(httpServletRequest), str3, TARGET, parameterException.getMessage());
        throw parameterException;
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable, com.huawei.cdc.service.exception.RestException, com.huawei.cdc.service.exception.EntityException] */
    private void logAndReturnResponse(String str, String str2, String str3, int i, String str4, long j, HttpServletRequest httpServletRequest) throws ParameterException {
        log.error(str, str2);
        ?? entityException = new EntityException(i, str3);
        CDCAuditLogger.logFailure(j, System.currentTimeMillis(), RequestUtil.getClientIp(httpServletRequest), RequestUtil.getUserName(httpServletRequest), RequestUtil.getInstanceIp(httpServletRequest), str4, TARGET, entityException.getMessage());
        throw entityException;
    }

    /**
     * Issues a restart request for a connector and raises an error unless it is accepted.
     *
     * <p>HTTP 200 and 204 count as success; any other status is handed to {@code handleError}
     * together with the response body.
     *
     * @param str connector name substituted into the restart path
     */
    private void restartConnector(String str) {
        String restartPath = RestConstants.RESTART_CONNECTOR_SUFFIX.replace(RestConstants.CONNECTOR_NAME, str);
        Response response = RestClient.doRequest(restartPath, RestConstants.HTTP_POST);
        String responseBody = (String) response.readEntity(String.class);

        int statusCode = response.getStatus();
        boolean accepted = statusCode == HttpStatus.OK.value() || statusCode == HttpStatus.NO_CONTENT.value();
        if (!accepted) {
            handleError(responseBody, str);
        }
    }

    /**
     * Translates an error body returned by the connector REST API into an {@code EntityException}.
     *
     * <p>The body is sanitised and parsed as a JSON object. When parsing succeeds the exception
     * carries the values found under {@code RestConstants.KAFKA_ERROR_CODE} and {@code "message"};
     * when it fails a generic invalid-response exception naming the connector is thrown instead.
     * This method always throws.
     *
     * @param str raw response body
     * @param str2 connector name used in the fallback message
     */
    private void handleError(String str, String str2) {
        String sanitize = JsonSanitizer.sanitize(str);
        Map<String, Object> errorBody;
        try {
            errorBody = CommonConstants.JSON_MAPPER.readValue(sanitize, new TypeReference<Map<String, Object>>() {
            });
        } catch (IOException e) {
            log.error("Could not read JSON response {}", sanitize, e);
            throw new EntityException(ErrorConstants.KAFKA_CONNECT_INVALID_RESPONSE, "getting status of connector " + str2);
        }
        String errorCode = String.valueOf(errorBody.get(RestConstants.KAFKA_ERROR_CODE));
        String message = String.valueOf(errorBody.get("message"));
        throw new EntityException(ErrorConstants.KAFKA_CONNECT_ERROR, errorCode, message);
    }

    /**
     * Deletes a connector through the connector REST API.
     *
     * <p>Nothing is done for Kafka-type links. HTTP 204 is success; HTTP 404 is only logged as a
     * warning; every other status is logged as an error and passed to {@code handleError}.
     *
     * @param cdcConnection connection the connector belongs to
     * @param str connector name
     * @param str2 label used in log messages (callers in this file pass a source/target marker)
     */
    public void stopConnector(CdcConnection cdcConnection, String str, String str2) {
        if (JobControllerUtils.isKafkaTypeLink(cdcConnection)) {
            return;
        }
        String stopPath = RestConstants.STOP_CONNECTOR_SUFFIX.replace(RestConstants.CONNECTOR_NAME, str);
        Response response = RestClient.doRequest(stopPath, RestConstants.HTTP_DELETE);
        String responseBody = (String) response.readEntity(String.class);

        int statusCode = response.getStatus();
        if (statusCode == HttpStatus.NO_CONTENT.value()) {
            return;
        }
        if (statusCode == HttpStatus.NOT_FOUND.value()) {
            log.warn("Exception closing " + str2 + " connector {}", str + " not found.");
            return;
        }
        log.error("Exception closing " + str2 + " connector {}", str);
        handleError(responseBody, str);
    }

    /**
     * Re-synchronises the stored status of an execution with its connectors (or application).
     *
     * <p>When the job definition has Hudi enabled the work is delegated to
     * {@code syncAppJobStatus}. Otherwise both source and target connections must be resolvable;
     * if either is missing the method returns without changing anything.
     *
     * @param str job name
     * @param cdcJobExecution execution whose status is synchronised
     * @throws Exception propagated from the delegated synchronisation
     */
    protected void syncJobStatus(String str, CdcJobExecution cdcJobExecution) throws Exception {
        String executionId = String.valueOf(cdcJobExecution.getId());
        CdcConnection source = this.jobExecutionManagement.getConnection(cdcJobExecution.getSourceConnectorId().intValue());

        String sourceConnectorName = null;
        boolean sourceIsKafka = false;
        if (source != null) {
            sourceConnectorName = this.jobExecutionManagement.getConnectorName(str, executionId, true);
            sourceIsKafka = JobControllerUtils.isKafkaTypeLink(source);
        }

        if (JobControllerUtils.isHudiEnabled(this.jobExecutionManagement.getDefinition(str))) {
            syncAppJobStatus(str, cdcJobExecution, sourceIsKafka, sourceConnectorName);
            return;
        }

        Integer targetConnectorId = cdcJobExecution.getTargetConnectorId();
        CdcConnection target = null;
        if (targetConnectorId != null) {
            target = this.jobExecutionManagement.getConnection(targetConnectorId.intValue());
        }

        if (source != null && target != null) {
            boolean targetIsKafka = JobControllerUtils.isKafkaTypeLink(target);
            String targetConnectorName = this.jobExecutionManagement.getConnectorName(str, executionId, false);
            syncJobStatus(cdcJobExecution, sourceIsKafka, targetIsKafka, sourceConnectorName, targetConnectorName);
        }
    }

    /**
     * Synchronises an execution whose target side is an application queried via
     * {@code YarnClientUtil}.
     *
     * <p>When both the source link status and the application status are available, the job status
     * is derived from them. Otherwise the source connector and the application are stopped on a
     * best-effort basis and the job is treated as STOPPED. In both cases the error code and
     * message on the execution are cleared before the status is updated.
     *
     * @param str job name (not read by this method)
     * @param cdcJobExecution execution being synchronised
     * @param z whether the source is a Kafka-type link
     * @param str2 source connector name
     * @throws Exception rethrown from the stop attempts after logging
     */
    private void syncAppJobStatus(String str, CdcJobExecution cdcJobExecution, boolean z, String str2) throws Exception {
        String sourceStatus = getLinkStatus(z, str2);
        String appId = cdcJobExecution.getAppId();
        String appStatus = YarnClientUtil.getStatus(appId).name();

        String jobStatus;
        if (sourceStatus != null && appStatus != null) {
            jobStatus = JobControllerUtils.getJobStatus(sourceStatus, appStatus);
        } else {
            try {
                log.warn("currentSourceStatus is null, appStatus is {}. Try to stop connector {}", appStatus, str2);
                stopConnectorIfPresent(cdcJobExecution, sourceStatus, str2, this.jobExecutionManagement, BodyConstants.LOG_SOURCE);
                stopAppIfPresent(appId);
                jobStatus = ApplicationStatus.STOPPED.getStatus();
            } catch (Exception e) {
                log.error("Exception closing/stopping connector/App while syncing job status.");
                throw e;
            }
        }

        cdcJobExecution.setErrorCode(CommonConstants.EMPTY);
        cdcJobExecution.setErrorMessage(CommonConstants.EMPTY);
        updateJobExecution(jobStatus, cdcJobExecution, this.jobExecutionManagement);
    }

    /**
     * Kills the application with the given id if its status equals
     * {@code ApplicationHandler.State.CONNECTED}.
     *
     * <p>After requesting the stop, the method waits via {@code waitForAppStatus} and reports a
     * failure through {@code JobExecutionUtils.handleAppStartStopFailure} if the application is
     * still not KILLED.
     *
     * <p>NOTE(review): the entry check compares against {@code ApplicationHandler.State} while the
     * final check uses {@code SparkAppHandle.State} names; confirm both describe the status type
     * returned by {@code YarnClientUtil.getStatus}.
     *
     * @param str application id
     */
    private void stopAppIfPresent(String str) {
        if (!ApplicationHandler.State.CONNECTED.equals(YarnClientUtil.getStatus(str))) {
            return;
        }
        log.info("Try to kill app [{}]", str);
        YarnClientUtil.stopJob(str);

        String killedState = SparkAppHandle.State.KILLED.name();
        waitForAppStatus(str, killedState, "STOP");
        if (!killedState.equals(YarnClientUtil.getStatus(str).name())) {
            JobExecutionUtils.handleAppStartStopFailure(str, "STOP");
        }
    }

    /**
     * Synchronises an execution with the state of its source and sink connectors.
     *
     * <p>When both link statuses are available the job status is derived from them. If either is
     * missing, whichever connector still has a status is stopped on a best-effort basis and the
     * job is treated as STOPPED. In both cases the error code and message on the execution are
     * cleared before the status is updated.
     *
     * @param cdcJobExecution execution being synchronised
     * @param z whether the source is a Kafka-type link
     * @param z2 whether the target is a Kafka-type link
     * @param str source connector name
     * @param str2 sink connector name
     * @throws Exception rethrown from the stop attempts after logging
     */
    protected void syncJobStatus(CdcJobExecution cdcJobExecution, boolean z, boolean z2, String str, String str2) throws Exception {
        String sourceStatus = getLinkStatus(z, str);
        String targetStatus = getLinkStatus(z2, str2);

        String jobStatus;
        if (sourceStatus != null && targetStatus != null) {
            jobStatus = JobControllerUtils.getJobStatus(sourceStatus, targetStatus);
        } else {
            try {
                log.warn("currentSourceStatus is {}, currentTargetStatus is {}. Try to stop SourceConnector {} and SinkConnector {}", sourceStatus, targetStatus, str, str2);
                stopConnectorIfPresent(cdcJobExecution, sourceStatus, str, this.jobExecutionManagement, BodyConstants.LOG_SOURCE);
                stopConnectorIfPresent(cdcJobExecution, targetStatus, str2, this.jobExecutionManagement, "target");
                jobStatus = ApplicationStatus.STOPPED.getStatus();
            } catch (Exception e) {
                log.error("Exception closing connectors while syncing job status.");
                throw e;
            }
        }

        cdcJobExecution.setErrorCode(CommonConstants.EMPTY);
        cdcJobExecution.setErrorMessage(CommonConstants.EMPTY);
        updateJobExecution(jobStatus, cdcJobExecution, this.jobExecutionManagement);
    }

    /**
     * Persists a new status on an execution, but only when it actually differs.
     *
     * @param str new status; ignored when {@code null} or equal to the current status
     * @param cdcJobExecution execution to update
     * @param jobExecutionManagement persistence facade used for the update
     */
    private void updateJobExecution(String str, CdcJobExecution cdcJobExecution, JobExecutionManagement jobExecutionManagement) {
        boolean changed = str != null && !str.equals(cdcJobExecution.getStatus());
        if (changed) {
            cdcJobExecution.setStatus(str);
            jobExecutionManagement.updateJobExecution(cdcJobExecution);
        }
    }

    /**
     * Stops a connector that still reports a status; failures are logged and not propagated.
     *
     * @param cdcJobExecution execution providing the source/target connector ids
     * @param str current connector status; {@code null} means there is nothing to stop
     * @param str2 connector name
     * @param jobExecutionManagement facade used to resolve the connection
     * @param str3 side marker; {@code BodyConstants.LOG_SOURCE} selects the source connector id,
     *             anything else the target connector id
     * @throws Exception declared by the original signature
     */
    private void stopConnectorIfPresent(CdcJobExecution cdcJobExecution, String str, String str2, JobExecutionManagement jobExecutionManagement, String str3) throws Exception {
        if (str == null) {
            return;
        }
        try {
            Integer connectorId;
            if (str3.equals(BodyConstants.LOG_SOURCE)) {
                connectorId = cdcJobExecution.getSourceConnectorId();
            } else {
                connectorId = cdcJobExecution.getTargetConnectorId();
            }
            stopConnector(jobExecutionManagement.getConnection(connectorId.intValue()), str2, str3);
        } catch (Exception e) {
            // Best effort: synchronisation continues even if the connector could not be stopped.
            log.error("Stop connector {} failed", str2, e);
        }
    }

    /**
     * Resolves the status of one side of a job.
     *
     * @param z whether the side is a Kafka-type link, which is always reported as RUNNING
     * @param str connector name
     * @return RUNNING for Kafka-type links, the connector task status when the connector is
     *         running, otherwise {@code null}
     */
    private String getLinkStatus(boolean z, String str) {
        if (z) {
            return ApplicationStatus.RUNNING.getStatus();
        }
        return JobControllerUtils.isConnectorRunning(str)
                ? JobControllerUtils.getConnectorTaskStatus(str, true)
                : null;
    }

    /**
     * Derives the job status from the source and target connector states and stores it on the
     * execution (in memory only; nothing is persisted here).
     *
     * <p>Kafka-type sides are treated as RUNNING. Nothing is changed unless both connector states
     * are non-blank. If the source task status is FAILED its error info is recorded; else if the
     * target task status is FAILED its error info is recorded; otherwise the combined job status
     * is set.
     *
     * @param cdcJobExecution execution to update
     * @param obj source connector status object
     * @param obj2 target connector status object
     * @param z whether the source is a Kafka-type link
     * @param z2 whether the target is a Kafka-type link
     * @param str source connector name
     * @param str2 target connector name
     */
    void updateJobStatus(CdcJobExecution cdcJobExecution, Object obj, Object obj2, boolean z, boolean z2, String str, String str2) {
        String sourceState = z ? ApplicationStatus.RUNNING.getStatus() : JobControllerUtils.getState(obj);
        String targetState = z2 ? ApplicationStatus.RUNNING.getStatus() : JobControllerUtils.getState(obj2);
        String sourceTaskStatus = z ? ApplicationStatus.RUNNING.getStatus() : JobControllerUtils.connectorTaskStatus(sourceState, str, false);
        String targetTaskStatus = z2 ? ApplicationStatus.RUNNING.getStatus() : JobControllerUtils.connectorTaskStatus(targetState, str2, false);

        if (!StringUtils.isNotBlank(sourceState) || !StringUtils.isNotBlank(targetState)) {
            return;
        }

        if (ApplicationStatus.FAILED.getStatus().equals(sourceTaskStatus)) {
            updateStatus(cdcJobExecution, str, getErrorInfo(obj));
            return;
        }
        if (ApplicationStatus.FAILED.getStatus().equals(targetTaskStatus)) {
            updateStatus(cdcJobExecution, str2, getErrorInfo(obj2));
            return;
        }
        cdcJobExecution.setStatus(JobControllerUtils.getJobStatus(sourceTaskStatus, targetTaskStatus));
    }

    /**
     * Derives the job status from the source connector state and the application status when the
     * target side is an application (in memory only; nothing is persisted here).
     *
     * <p>Nothing is changed unless both the connector state and the application status are
     * non-blank. A FAILED source task or FAILED application records error info on the execution;
     * otherwise the combined job status is set.
     *
     * @param cdcJobExecution execution to update; also supplies the application id
     * @param obj source connector status object
     * @param z whether the source is a Kafka-type link
     * @param str source connector name
     */
    private void updateJobStatusWithAppAsTarget(CdcJobExecution cdcJobExecution, Object obj, boolean z, String str) {
        String sourceState = z ? ApplicationStatus.RUNNING.getStatus() : JobControllerUtils.getState(obj);
        String appStatus = YarnClientUtil.getStatus(cdcJobExecution.getAppId()).name();
        String sourceTaskStatus = z ? ApplicationStatus.RUNNING.getStatus() : JobControllerUtils.connectorTaskStatus(sourceState, str, false);

        if (!StringUtils.isNotBlank(sourceState) || !StringUtils.isNotBlank(appStatus)) {
            return;
        }

        if (ApplicationStatus.FAILED.getStatus().equals(sourceTaskStatus)) {
            updateStatus(cdcJobExecution, str, getErrorInfo(obj));
            return;
        }
        if (ApplicationStatus.FAILED.getStatus().equals(appStatus)) {
            updateStatus(cdcJobExecution, cdcJobExecution.getAppId(), getErrorInfo(appStatus));
            return;
        }
        cdcJobExecution.setStatus(JobControllerUtils.getJobStatus(sourceTaskStatus, appStatus));
    }

    /**
     * Renders a connector/application status object as error text.
     *
     * @param obj either a ready-made message string or a status object understood by
     *            {@code JobControllerUtils.getInfo}
     * @return the string itself, the string form of the extracted info map, or
     *         {@code CommonConstants.EMPTY} when no info is available
     */
    String getErrorInfo(Object obj) {
        if (obj instanceof String) {
            return (String) obj;
        }
        Map<String, String> details = JobControllerUtils.getInfo(obj);
        if (details == null) {
            return CommonConstants.EMPTY;
        }
        return String.valueOf(details);
    }

    /**
     * Stores an error message on the execution, prefixed with the name of the failing component.
     *
     * <p>Despite its name this method does not touch the status field; it only sets the error
     * message, and only when {@code str2} is not equal to {@code CommonConstants.EMPTY}.
     *
     * @param cdcJobExecution execution to update
     * @param str name of the failing connector or application
     * @param str2 error details
     */
    void updateStatus(CdcJobExecution cdcJobExecution, String str, String str2) {
        if (!CommonConstants.EMPTY.equals(str2)) {
            cdcJobExecution.setErrorMessage(str + CommonConstants.SPACE + CommonConstants.COLON + CommonConstants.NEWLINE + str2 + CommonConstants.NEWLINE);
        }
    }

    /**
     * Builds the connector name {@code <job name>---<connector id>---<execution id>}.
     *
     * @param cdcJobDefinition job definition supplying the name
     * @param cdcJobExecution execution supplying the connector id and execution id
     * @param str side marker; {@code CommonConstants.TYPE_SOURCE} selects the source connector id,
     *            anything else the target connector id
     * @return the connector name
     */
    private String getConnectorName(CdcJobDefinition cdcJobDefinition, CdcJobExecution cdcJobExecution, String str) {
        if (CommonConstants.TYPE_SOURCE.equals(str)) {
            return cdcJobDefinition.getName() + "---" + cdcJobExecution.getSourceConnectorId() + "---" + cdcJobExecution.getId();
        }
        return cdcJobDefinition.getName() + "---" + cdcJobExecution.getTargetConnectorId() + "---" + cdcJobExecution.getId();
    }

    /**
     * Creates a connector for one side of a job through the connector REST API.
     *
     * <p>Nothing is done for Kafka-type links. The request body is prepared from the connection
     * and job properties, encrypted, recorded on the execution and posted. On HTTP 201 an
     * execution in BOOTING state moves to STARTED; any other status is turned into an
     * {@code EntityException} built from the response body.
     *
     * @param cdcConnection connection the connector is created for
     * @param cdcJobDefinition job definition supplying name and properties
     * @param cdcJobExecution execution being started
     * @param str side marker; {@code CommonConstants.TYPE_SOURCE} selects the source properties
     */
    void startConnector(CdcConnection cdcConnection, CdcJobDefinition cdcJobDefinition, CdcJobExecution cdcJobExecution, String str) {
        if (JobControllerUtils.isKafkaTypeLink(cdcConnection)) {
            return;
        }
        String connectorName = getConnectorName(cdcJobDefinition, cdcJobExecution, str);

        // Raw type kept: the parameter type expected by prepareRequestBodyForStart is not visible here.
        HashMap identifiers = new HashMap();
        identifiers.put(CommonConstants.CONNECTION_ID, String.valueOf(cdcConnection.getId()));
        identifiers.put(CommonConstants.JOB_DEFINITION_ID, String.valueOf(cdcJobDefinition.getId()));
        identifiers.put(CommonConstants.JOB_SUBMISSION_ID, String.valueOf(cdcJobExecution.getId()));
        identifiers.put("name", connectorName);

        String requestBody = JobExecutionUtils.addEncryptSensitiveDataEnableProperty(
                new CrypterUtils().encryptData(
                        CommonConstants.TYPE_SOURCE.equals(str)
                                ? EntityConvertor.prepareRequestBodyForStart(cdcConnection.getProperties(), cdcJobDefinition.getDecryptedSourceProperties(), cdcJobDefinition.getProperties(), identifiers)
                                : EntityConvertor.prepareRequestBodyForStart(cdcConnection.getProperties(), cdcJobDefinition.getDecryptedTargetProperties(), cdcJobDefinition.getProperties(), identifiers)));
        JobExecutionUtils.updateExecutionProperties(cdcJobExecution, connectorName, requestBody);

        Response response = RestClient.doRequest(RestConstants.CONNECTOR_SUFFIX, requestBody, RestConstants.HTTP_POST);
        String responseBody = JsonSanitizer.sanitize((String) response.readEntity(String.class));

        if (response.getStatus() == HttpStatus.CREATED.value()) {
            if (ApplicationStatus.BOOTING.getStatus().equals(cdcJobExecution.getStatus())) {
                cdcJobExecution.setStatus(ApplicationStatus.STARTED.getStatus());
            }
            return;
        }

        Map<String, Object> errorBody;
        try {
            errorBody = CommonConstants.JSON_MAPPER.readValue(responseBody, new TypeReference<Map<String, Object>>() {
            });
        } catch (IOException e) {
            log.error("Could not read JSON response {}", responseBody, e);
            throw new EntityException(ErrorConstants.KAFKA_CONNECT_INVALID_RESPONSE, "starting " + connectorName);
        }
        throw new EntityException(ErrorConstants.KAFKA_CONNECT_ERROR, String.valueOf(errorBody.get(RestConstants.KAFKA_ERROR_CODE)), String.valueOf(errorBody.get("message")));
    }

    /**
     * Waits until an application reaches the expected status or the retry budget is used up.
     *
     * <p>The status is re-read after every two-second sleep; previously it was read only once
     * before the loop, so the wait could never end early. The number of sleeps is bounded by
     * {@code CommonConfiguration.SPARK_START_OR_STOP_MAX_RETRY}. If the thread is interrupted,
     * the interrupt flag is restored, the failure handler is invoked and waiting stops.
     *
     * @param str application id
     * @param str2 expected status name
     * @param str3 operation label passed to the failure handler (for example {@code "STOP"})
     */
    private void waitForAppStatus(String str, String str2, String str3) {
        int remainingRetries = Integer.parseInt(CommonConfiguration.SPARK_START_OR_STOP_MAX_RETRY);
        String currentStatus = YarnClientUtil.getStatus(str).name();
        while (remainingRetries-- > 0 && !currentStatus.equals(str2)) {
            try {
                Thread.sleep(2000L);
            } catch (InterruptedException e) {
                // Preserve the interruption for callers instead of silently clearing it.
                Thread.currentThread().interrupt();
                JobExecutionUtils.handleAppStartStopFailure(str, str3);
                return;
            }
            currentStatus = YarnClientUtil.getStatus(str).name();
        }
    }

    /**
     * Initialises a new execution from its job definition.
     *
     * <p>Copies name, description, job type and definition id, and sets the status to NEW.
     *
     * @param cdcJobExecution execution to populate
     * @param cdcJobDefinition definition to copy from; its id must not be {@code null}
     * @return the same {@code cdcJobExecution} instance
     */
    private CdcJobExecution createJobExecution(CdcJobExecution cdcJobExecution, CdcJobDefinition cdcJobDefinition) {
        // Unboxed first so that a missing id fails before the execution is modified.
        int definitionId = cdcJobDefinition.getId().intValue();

        cdcJobExecution.setName(cdcJobDefinition.getName());
        cdcJobExecution.setDescription(cdcJobDefinition.getDescription());
        cdcJobExecution.setJobType(cdcJobDefinition.getJobType());
        cdcJobExecution.setJobDefinitionId(Integer.valueOf(definitionId));
        cdcJobExecution.setStatus(ApplicationStatus.NEW.getStatus());
        return cdcJobExecution;
    }

    /**
     * Loads a connection by id and fails with a side-specific error when it cannot be obtained.
     *
     * <p>A missing connection yields a {@code ParameterException}; a persistence failure yields an
     * {@code EntityException}. Both now use the source or target error code matching
     * {@code str2}; previously a persistence failure always reported the source error code, even
     * for target lookups, and the underlying exception was dropped without being logged.
     *
     * @param i connection id
     * @param jobExecutionManagement facade used for the lookup
     * @param str value passed to the thrown exception (presumably the job or link name)
     * @param str2 side marker; {@code CommonConstants.TYPE_SOURCE} means source, anything else target
     * @return the connection, never {@code null}
     */
    private CdcConnection getConnection(int i, JobExecutionManagement jobExecutionManagement, String str, String str2) {
        boolean sourceSide = CommonConstants.TYPE_SOURCE.equals(str2);
        try {
            CdcConnection connection = jobExecutionManagement.getConnection(i);
            if (connection != null) {
                return connection;
            }
        } catch (EclipseLinkException e) {
            log.error("Failed to load connection with id {}", i, e);
            if (sourceSide) {
                throw new EntityException(ErrorConstants.INVALID_SOURCE_ERROR, str);
            }
            throw new EntityException(ErrorConstants.INVALID_TARGET_ERROR, str);
        }
        if (sourceSide) {
            throw new ParameterException(ErrorConstants.CDL_NO_SOURCE_CONNECTION_LINK, str);
        }
        throw new ParameterException(ErrorConstants.CDL_NO_TARGET_CONNECTION_LINK, str);
    }
}
