package org.apache.hudi.utilities.sources.helpers;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.config.DFSPathSelectorConfig;
import org.json.JSONException;
import org.json.JSONObject;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.Message;

/* loaded from: input_file:org/apache/hudi/utilities/sources/helpers/S3EventsMetaSelector.class */
public class S3EventsMetaSelector extends CloudObjectsSelector {
    private static final String S3_EVENT_RESPONSE_ELEMENTS = "responseElements";

    public S3EventsMetaSelector(TypedProperties typedProperties) {
        super(typedProperties);
    }

    public static S3EventsMetaSelector createSourceSelector(TypedProperties typedProperties) {
        String stringWithAltKeys = ConfigUtils.getStringWithAltKeys(typedProperties, DFSPathSelectorConfig.SOURCE_INPUT_SELECTOR, S3EventsMetaSelector.class.getName());
        try {
            S3EventsMetaSelector s3EventsMetaSelector = (S3EventsMetaSelector) ReflectionUtils.loadClass(stringWithAltKeys, new Class[]{TypedProperties.class}, new Object[]{typedProperties});
            log.info("Using path selector " + s3EventsMetaSelector.getClass().getName());
            return s3EventsMetaSelector;
        } catch (Exception e) {
            throw new HoodieException("Could not load source selector class " + stringWithAltKeys, e);
        }
    }

    protected List<Map<String, Object>> getValidEvents(SqsClient sqsClient, List<Message> list) throws IOException {
        return processAndDeleteInvalidMessages(list, getMessagesToProcess(sqsClient, this.queueUrl, this.longPollWait, this.visibilityTimeout, this.maxMessagePerBatch, this.maxMessagesPerRequest));
    }

    private List<Map<String, Object>> processAndDeleteInvalidMessages(List<Message> list, List<Message> list2) throws IOException {
        ArrayList arrayList = new ArrayList();
        for (Message message : list2) {
            JSONObject jSONObject = new JSONObject(message.body());
            ObjectMapper objectMapper = new ObjectMapper();
            Map map = jSONObject.has("Message") ? (Map) objectMapper.readValue(jSONObject.getString("Message"), Map.class) : (Map) objectMapper.readValue(jSONObject.toString(), Map.class);
            if (map.containsKey("Records")) {
                for (Map map2 : (List) map.get("Records")) {
                    map2.remove(S3_EVENT_RESPONSE_ELEMENTS);
                    String str = (String) map2.get("eventName");
                    Stream<String> stream = ALLOWED_S3_EVENT_PREFIX.stream();
                    str.getClass();
                    if (stream.anyMatch(str::startsWith)) {
                        arrayList.add(map2);
                    } else {
                        log.debug(String.format("This S3 event %s is not allowed, so ignoring it.", str));
                    }
                }
            } else {
                log.debug(String.format("Message is not expected format or it's s3:TestEvent. Message: %s", message));
            }
            list.add(message);
        }
        return arrayList;
    }

    public Pair<List<String>, String> getNextEventsFromQueue(SqsClient sqsClient, Option<String> option, List<Message> list) {
        list.clear();
        log.info("Reading messages....");
        try {
            log.info("Start Checkpoint : " + option);
            List<Map<String, Object>> validEvents = getValidEvents(sqsClient, list);
            log.info("Number of valid events: " + validEvents.size());
            ArrayList arrayList = new ArrayList();
            long orElse = validEvents.stream().mapToLong(map -> {
                return Date.from(Instant.from(DateTimeFormatter.ISO_INSTANT.parse((String) map.get("eventTime")))).getTime();
            }).max().orElse(((Long) option.map(Long::parseLong).orElse(0L)).longValue());
            Iterator<Map<String, Object>> it = validEvents.iterator();
            while (it.hasNext()) {
                arrayList.add(new ObjectMapper().writeValueAsString(it.next()).replace("%3D", "=").replace("%24", "$").replace("%A3", "£").replace("%23", "#").replace("%26", "&").replace("%3F", "?").replace("%7E", "~").replace("%25", "%").replace("%2B", "+"));
            }
            return new ImmutablePair(arrayList, orElse == 0 ? (String) option.orElse((Object) null) : String.valueOf(orElse));
        } catch (IOException | JSONException e) {
            throw new HoodieException("Unable to read from SQS: ", e);
        }
    }
}
