package com.huawei.flume.sinks.elasticsearch.client;

import com.google.gson.Gson;
import com.huawei.flume.sinks.elasticsearch.ESSinkConstants;
import com.huawei.flume.sinks.elasticsearch.ElasticSearchEventSerializer;
import com.huawei.flume.sinks.elasticsearch.IndexNameBuilder;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.entity.StringEntity;
import org.apache.http.message.BasicHeader;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.common.bytes.BytesReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/flume/sinks/elasticsearch/client/ESRestClient.class */
public class ESRestClient implements ESClient {
    private static final String HTTP_METHOD_POST = "POST";
    private static final String HTTP_METHOD_GET = "GET";
    private static final String BULK_ENDPOINT = "_bulk";
    private static final String INDEX_OPERATION_NAME = "index";
    private static final String INDEX_PARAM = "_index";
    private static final String TYPE_PARAM = "_type";
    private static final String CLUSTER_HEALTH_BULK = "/_cluster/health";
    private static final String CALLBACK_CONNECT_TIMEOUT = "callbackConnectTimeout";
    private static final String CALLBACK_SOCKET_TIMEOUT = "callbackSocketTimeout";
    private static final String BUILDER_MAX_TIMEOUT = "builderMaxTimeout";
    private static final int CALLBACK_CONNECT_TIMEOUT_DEFAULT = 5000;
    private static final int CALLBACK_SOCKET_TIMEOUT_DEFAULT = 60000;
    private static final int BUILDER_MAX_TIMEOUT_DEFAULT = 60000;
    private static final int RETRY_MIN_TIMES = 12;
    private RestClient client;
    private ElasticSearchEventSerializer serializer;
    private StringBuilder bulkBuilder;
    private int callbackConnectTimeout;
    private int callbackSocketTimeout;
    private int builderMaxTimeout;
    private String requestScheme;
    private static final Logger logger = LoggerFactory.getLogger(ESRestClient.class);
    private static final Header[] defaultHeaders = {new BasicHeader("Accept", "application/json"), new BasicHeader("Content-type", "application/json")};
    private static final Map<String, String> params = Collections.singletonMap("pretty", "true");

    public ESRestClient(String[] strArr, ElasticSearchEventSerializer elasticSearchEventSerializer, Context context, Boolean bool) throws IOException {
        initRestParam(context);
        this.serializer = elasticSearchEventSerializer;
        this.bulkBuilder = new StringBuilder();
        if (bool.booleanValue()) {
            this.requestScheme = "https";
            System.setProperty(ESSinkConstants.ES_SECURITY_MODULE, "true");
        } else {
            this.requestScheme = "http";
            System.setProperty(ESSinkConstants.ES_SECURITY_MODULE, "false");
        }
        openClient(strArr);
    }

    private void initRestParam(Context context) {
        if (StringUtils.isNotBlank(context.getString(CALLBACK_CONNECT_TIMEOUT))) {
            this.callbackConnectTimeout = Integer.parseInt(context.getString(CALLBACK_CONNECT_TIMEOUT));
        } else {
            this.callbackConnectTimeout = CALLBACK_CONNECT_TIMEOUT_DEFAULT;
            logger.debug("Missing Param: callbackConnectTimeout, use defalut value: 5000");
        }
        if (StringUtils.isNotBlank(context.getString(CALLBACK_SOCKET_TIMEOUT))) {
            this.callbackSocketTimeout = Integer.parseInt(context.getString(CALLBACK_SOCKET_TIMEOUT));
        } else {
            this.callbackSocketTimeout = 60000;
            logger.debug("Missing Param: callbackSocketTimeout, use defalut value: 60000");
        }
        if (StringUtils.isNotBlank(context.getString(BUILDER_MAX_TIMEOUT))) {
            this.builderMaxTimeout = Integer.parseInt(context.getString(BUILDER_MAX_TIMEOUT));
        } else {
            this.builderMaxTimeout = 60000;
            logger.debug("Missing Param: builderMaxTimeout, use defalut value: 60000");
        }
    }

    @Override // com.huawei.flume.sinks.elasticsearch.client.ESClient
    public void close() {
        if (this.client != null) {
            try {
                this.client.close();
            } catch (IOException e) {
                logger.warn("Close ESRestClient failed.");
            }
        }
        this.client = null;
    }

    @Override // com.huawei.flume.sinks.elasticsearch.client.ESClient
    public void addEvent(Event event, IndexNameBuilder indexNameBuilder, String str) throws Exception {
        BytesReference bytes = BytesReference.bytes(this.serializer.getContentBuilder(event));
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        hashMap2.put(INDEX_PARAM, indexNameBuilder.getIndexName(event));
        hashMap2.put(TYPE_PARAM, str);
        hashMap.put(INDEX_OPERATION_NAME, hashMap2);
        Gson gson = new Gson();
        synchronized (this.bulkBuilder) {
            this.bulkBuilder.append(gson.toJson(hashMap));
            this.bulkBuilder.append("\n");
            this.bulkBuilder.append(bytes.toBytesRef().utf8ToString());
            this.bulkBuilder.append("\n");
        }
    }

    @Override // com.huawei.flume.sinks.elasticsearch.client.ESClient
    public void execute() throws Exception {
        StringEntity stringEntity;
        synchronized (this.bulkBuilder) {
            stringEntity = new StringEntity(this.bulkBuilder.toString(), ESSinkConstants.CHARSET);
            this.bulkBuilder = new StringBuilder();
        }
        Request request = new Request(HTTP_METHOD_POST, BULK_ENDPOINT);
        request.addParameter("pretty", params.get("pretty"));
        request.setEntity(stringEntity);
        Response performRequest = this.client.performRequest(request);
        int statusCode = performRequest.getStatusLine().getStatusCode();
        logger.info("Status code from elasticsearch: " + statusCode);
        if (performRequest.getEntity() != null) {
            logger.debug("Status message from elasticsearch: " + EntityUtils.toString(performRequest.getEntity(), ESSinkConstants.CHARSET));
        }
        if (statusCode != 200) {
            if (performRequest.getEntity() == null) {
                throw new EventDeliveryException("Elasticsearch status code was: " + statusCode);
            }
            throw new EventDeliveryException(EntityUtils.toString(performRequest.getEntity(), ESSinkConstants.CHARSET));
        }
    }

    public void configure(Context context) {
    }

    private void openClient(String[] strArr) throws IOException {
        String[] split;
        logger.info("Using ElasticSearch hostnames: {} ", Arrays.toString(strArr));
        int max = Math.max(RETRY_MIN_TIMES, strArr.length * 2);
        for (int i = 0; i < max; i++) {
            String str = strArr[i % strArr.length];
            if (str.contains("[")) {
                split = str.trim().split("]:");
                split[0] = split[0] + "]";
            } else {
                split = str.trim().split(":");
            }
            logger.info("ipPort[0]=" + split[0] + ", ipPort[1]=" + split[1]);
            RestClientBuilder maxRetryTimeoutMillis = RestClient.builder(new HttpHost[]{new HttpHost(split[0], Integer.parseInt(split[1]), this.requestScheme)}).setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() { // from class: com.huawei.flume.sinks.elasticsearch.client.ESRestClient.1
                public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
                    return builder.setConnectTimeout(ESRestClient.this.callbackConnectTimeout).setSocketTimeout(ESRestClient.this.callbackSocketTimeout);
                }
            }).setMaxRetryTimeoutMillis(this.builderMaxTimeout);
            maxRetryTimeoutMillis.setDefaultHeaders(defaultHeaders);
            try {
                this.client = maxRetryTimeoutMillis.build();
                try {
                } catch (IOException e) {
                    logger.warn("Error to connect to " + str + ", try another host.", e);
                    close();
                    try {
                        Thread.sleep(30000L);
                    } catch (InterruptedException e2) {
                        logger.warn("Sleep was interrupted. ", e2);
                    }
                }
            } catch (Exception e3) {
                logger.warn("Connect to es failed, and the error is {}  and the rebuild times are {}/{}. ", new Object[]{e3, Integer.valueOf(i), Integer.valueOf(max)});
                close();
                try {
                    Thread.sleep(30000L);
                } catch (InterruptedException e4) {
                    logger.warn("Sleep was interrupted. ", e4);
                }
            }
            if (this.client.performRequest(new Request(HTTP_METHOD_GET, CLUSTER_HEALTH_BULK)).getStatusLine().getStatusCode() == 200) {
                logger.info("Connect has been established with {}", str);
                return;
            } else {
                logger.warn("Failed to send request to {}, try another host.", str);
                close();
            }
        }
        throw new IOException("Failed to get connection with hosts : " + Arrays.toString(strArr));
    }
}
