package org.apache.hudi.utilities.sources.helpers;

import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.utilities.config.DFSPathSelectorConfig;
import org.apache.hudi.utilities.config.S3SourceConfig;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;

/* loaded from: input_file:org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.class */
public class CloudObjectsSelector {
    public static final String S3_PREFIX = "s3://";
    public static final String SQS_ATTR_APPROX_MESSAGES = "ApproximateNumberOfMessages";
    static final String SQS_MODEL_MESSAGE = "Message";
    static final String SQS_MODEL_EVENT_RECORDS = "Records";
    static final String SQS_MODEL_EVENT_NAME = "eventName";
    static final String S3_MODEL_EVENT_TIME = "eventTime";
    static final String S3_FILE_SIZE = "fileSize";
    static final String S3_FILE_PATH = "filePath";
    public final String queueUrl;
    public final int longPollWait;
    public final int maxMessagePerBatch;
    public final int maxMessagesPerRequest;
    public final int visibilityTimeout;
    public final TypedProperties props;
    public final String fsName;
    private final String regionName;
    public static final List<String> ALLOWED_S3_EVENT_PREFIX = Collections.singletonList("ObjectCreated");
    public static volatile Logger log = LoggerFactory.getLogger(CloudObjectsSelector.class);

    /* loaded from: input_file:org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector$Config.class */
    public static class Config {

        @Deprecated
        public static final String S3_SOURCE_QUEUE_URL = S3SourceConfig.S3_SOURCE_QUEUE_URL.key();

        @Deprecated
        public static final String S3_SOURCE_QUEUE_REGION = S3SourceConfig.S3_SOURCE_QUEUE_REGION.key();

        @Deprecated
        public static final String S3_SOURCE_QUEUE_FS = S3SourceConfig.S3_SOURCE_QUEUE_FS.key();

        @Deprecated
        public static final String S3_QUEUE_LONG_POLL_WAIT = S3SourceConfig.S3_QUEUE_LONG_POLL_WAIT.key();

        @Deprecated
        public static final String S3_SOURCE_QUEUE_MAX_MESSAGES_PER_BATCH = S3SourceConfig.S3_SOURCE_QUEUE_MAX_MESSAGES_PER_BATCH.key();

        @Deprecated
        public static final String S3_SOURCE_QUEUE_MAX_MESSAGES_PER_REQUEST = S3SourceConfig.S3_SOURCE_QUEUE_MAX_MESSAGES_PER_REQUEST.key();

        @Deprecated
        public static final String S3_SOURCE_QUEUE_VISIBILITY_TIMEOUT = S3SourceConfig.S3_SOURCE_QUEUE_VISIBILITY_TIMEOUT.key();

        @Deprecated
        public static final String SOURCE_INPUT_SELECTOR = DFSPathSelectorConfig.SOURCE_INPUT_SELECTOR.key();
    }

    public CloudObjectsSelector(TypedProperties typedProperties) {
        ConfigUtils.checkRequiredConfigProperties(typedProperties, Arrays.asList(S3SourceConfig.S3_SOURCE_QUEUE_URL, S3SourceConfig.S3_SOURCE_QUEUE_REGION));
        this.props = typedProperties;
        this.queueUrl = ConfigUtils.getStringWithAltKeys(typedProperties, S3SourceConfig.S3_SOURCE_QUEUE_URL);
        this.regionName = ConfigUtils.getStringWithAltKeys(typedProperties, S3SourceConfig.S3_SOURCE_QUEUE_REGION);
        this.fsName = ConfigUtils.getStringWithAltKeys(typedProperties, S3SourceConfig.S3_SOURCE_QUEUE_FS, true);
        this.longPollWait = ConfigUtils.getIntWithAltKeys(typedProperties, S3SourceConfig.S3_QUEUE_LONG_POLL_WAIT);
        this.maxMessagePerBatch = ConfigUtils.getIntWithAltKeys(typedProperties, S3SourceConfig.S3_SOURCE_QUEUE_MAX_MESSAGES_PER_BATCH);
        this.maxMessagesPerRequest = ConfigUtils.getIntWithAltKeys(typedProperties, S3SourceConfig.S3_SOURCE_QUEUE_MAX_MESSAGES_PER_REQUEST);
        this.visibilityTimeout = ConfigUtils.getIntWithAltKeys(typedProperties, S3SourceConfig.S3_SOURCE_QUEUE_VISIBILITY_TIMEOUT);
    }

    protected Map<String, String> getSqsQueueAttributes(SqsClient sqsClient, String str) {
        return sqsClient.getQueueAttributes((GetQueueAttributesRequest) GetQueueAttributesRequest.builder().queueUrl(str).attributeNames(new QueueAttributeName[]{QueueAttributeName.fromValue(SQS_ATTR_APPROX_MESSAGES)}).build()).attributesAsStrings();
    }

    protected Map<String, Object> getFileAttributesFromRecord(JSONObject jSONObject) throws UnsupportedEncodingException {
        HashMap hashMap = new HashMap();
        long time = Date.from(Instant.from(DateTimeFormatter.ISO_INSTANT.parse(jSONObject.getString(S3_MODEL_EVENT_TIME)))).getTime();
        JSONObject jSONObject2 = jSONObject.getJSONObject("s3").getJSONObject("object");
        String str = this.fsName + "://" + URLDecoder.decode(jSONObject.getJSONObject("s3").getJSONObject("bucket").getString(CloudObjectsSelectorCommon.GCS_OBJECT_KEY), "UTF-8") + "/" + URLDecoder.decode(jSONObject2.getString("key"), "UTF-8");
        hashMap.put(S3_MODEL_EVENT_TIME, Long.valueOf(time));
        hashMap.put(S3_FILE_SIZE, Long.valueOf(jSONObject2.getLong(CloudObjectsSelectorCommon.GCS_OBJECT_SIZE)));
        hashMap.put(S3_FILE_PATH, str);
        return hashMap;
    }

    public SqsClient createAmazonSqsClient() {
        return (SqsClient) SqsClient.builder().region(Region.of(this.regionName)).build();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public List<Message> getMessagesToProcess(SqsClient sqsClient, String str, int i, int i2, int i3, int i4) {
        ArrayList arrayList = new ArrayList();
        ReceiveMessageRequest receiveMessageRequest = (ReceiveMessageRequest) ReceiveMessageRequest.builder().queueUrl(str).waitTimeSeconds(Integer.valueOf(i)).visibilityTimeout(Integer.valueOf(i2)).maxNumberOfMessages(Integer.valueOf(i4)).build();
        long parseLong = Long.parseLong(getSqsQueueAttributes(sqsClient, str).get(SQS_ATTR_APPROX_MESSAGES));
        log.info("Approximately " + parseLong + " messages available in queue.");
        long min = Math.min(parseLong, i3);
        for (int i5 = 0; i5 < ((int) Math.ceil(min / i4)); i5++) {
            List messages = sqsClient.receiveMessage(receiveMessageRequest).messages();
            log.debug("Number of messages: " + messages.size());
            arrayList.addAll(messages);
            if (messages.isEmpty()) {
                break;
            }
        }
        return arrayList;
    }

    protected List<List<Message>> createListPartitions(List<Message> list, int i) {
        ArrayList arrayList = new ArrayList();
        if (list.size() == 0 || i < 1) {
            return arrayList;
        }
        int i2 = 0;
        while (true) {
            int i3 = i2;
            if (i3 >= list.size()) {
                return arrayList;
            }
            int min = Math.min(i3 + i, list.size());
            if (i3 > min) {
                throw new IndexOutOfBoundsException("Index " + i3 + " is out of the list range <0," + (list.size() - 1) + ">");
            }
            arrayList.add(new ArrayList(list.subList(i3, min)));
            i2 = i3 + i;
        }
    }

    protected void deleteBatchOfMessages(SqsClient sqsClient, String str, List<Message> list) {
        if (list.isEmpty()) {
            return;
        }
        DeleteMessageBatchRequest.Builder queueUrl = DeleteMessageBatchRequest.builder().queueUrl(str);
        ArrayList arrayList = new ArrayList();
        for (Message message : list) {
            arrayList.add((DeleteMessageBatchRequestEntry) DeleteMessageBatchRequestEntry.builder().id(message.messageId()).receiptHandle(message.receiptHandle()).build());
        }
        queueUrl.entries(arrayList);
        List list2 = (List) sqsClient.deleteMessageBatch((DeleteMessageBatchRequest) queueUrl.build()).failed().stream().map((v0) -> {
            return v0.id();
        }).collect(Collectors.toList());
        if (list2.isEmpty()) {
            log.info("Successfully deleted " + arrayList.size() + " messages from queue.");
        } else {
            log.warn("Failed to delete " + list2.size() + " messages out of " + arrayList.size() + " from queue.");
        }
    }

    public void deleteProcessedMessages(SqsClient sqsClient, String str, List<Message> list) {
        if (list.isEmpty()) {
            return;
        }
        Iterator<List<Message>> it = createListPartitions(list, 10).iterator();
        while (it.hasNext()) {
            deleteBatchOfMessages(sqsClient, str, it.next());
        }
    }
}
