package org.apache.flume.source.s3;

import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
import java.util.Locale;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
import org.apache.flume.client.avro.ReliableEventReader;
import org.apache.flume.serialization.DecodeErrorPolicy;
import org.apache.flume.serialization.EventDeserializer;
import org.apache.flume.serialization.EventDeserializerFactory;
import org.apache.flume.serialization.PositionTracker;
import org.apache.flume.serialization.ResettableGenericInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
@InterfaceStability.Evolving
/* loaded from: input_file:org/apache/flume/source/s3/S3ObjectEventReader.class */
public class S3ObjectEventReader implements ReliableEventReader {
    private static final Logger logger = LoggerFactory.getLogger(S3ObjectEventReader.class);

    /** Header key under which {@code setPositionIntoEvent} stores the position marker ("F", "M", "E" or "FE"). */
    private static final String EVENTPOSTION = "eventPostion";

    /** Header key under which {@code readEvents} stores the hash code of the current object's key. */
    private static final String HASHCODE = "hashCode";

    /** Bucket passed to the S3 listing request and to the stream creator. */
    private final String bucketName;

    /** Key prefix passed to the S3 listing request. */
    private final String directory;

    /** When true, {@code readEvents} adds the object key to every event under {@link #baseNameHeader}. */
    private final boolean annotateBaseName;

    /** Header name used when {@link #annotateBaseName} is enabled. */
    private final String baseNameHeader;

    /** Deserializer type handed to {@code EventDeserializerFactory.getInstance}. */
    private final String deserializerType;

    /** Configuration context handed to {@code EventDeserializerFactory.getInstance}. */
    private final Context deserializerContext;

    /** Charset used when constructing the input stream for an object. */
    private final Charset inputCharset;

    /** Decode error policy used when constructing the input stream for an object. */
    private final DecodeErrorPolicy decodeErrorPolicy;

    /** Object currently being read; absent when no object is open. */
    private Optional<S3ObjectInfo> currentFile;

    /** Object that produced the most recently returned batch of events. */
    private Optional<S3ObjectInfo> lastFileRead;

    /** False between a successful {@code readEvents} and the following {@code commit}. */
    private boolean committed;

    /** Counter exposed through {@code getListFilesCount}; set to 0 by the constructor. */
    private int listFilesCount;

    /** Listing page currently being iterated; null means a fresh listing must be requested. */
    private ObjectListing objListing;

    /** Iterator over the summaries of {@link #objListing}; null means it must be re-created. */
    private ListIterator<S3ObjectSummary> objIter;

    /** Client used for listing the bucket and for opening object streams. */
    private AmazonS3Client s3Client;

    /** Store that remembers finished keys and supplies position trackers. */
    private MetadataBackingStore backingStore;

    /* loaded from: input_file:org/apache/flume/source/s3/S3ObjectEventReader$Builder.class */
    /**
     * Fluent builder for {@link S3ObjectEventReader}.
     *
     * <p>The deserializer type, input charset and decode error policy start out with the
     * defaults declared in {@code S3SourceConfigurationConstants}; the deserializer context
     * starts out as an empty {@link Context}. All other values are unset until a setter is called.
     */
    public static class Builder {
        private String bucketName;
        private String directory;
        private boolean annotateBaseName;
        private String baseNameHeader;
        private String deserializerType = S3SourceConfigurationConstants.DEFAULT_DESERIALIZER;
        private Context deserializerContext = new Context();
        private String inputCharset = S3SourceConfigurationConstants.DEFAULT_INPUT_CHARSET;
        private DecodeErrorPolicy decodeErrorPolicy = DecodeErrorPolicy.valueOf(S3SourceConfigurationConstants.DEFAULT_DECODE_ERROR_POLICY.toUpperCase(Locale.ENGLISH));
        private AmazonS3Client s3Client;
        private MetadataBackingStore backingStore;

        /**
         * Sets the S3 client the reader will use.
         *
         * @param amazonS3Client client instance; the reader's constructor rejects null
         * @return this builder
         */
        public Builder setS3Client(AmazonS3Client amazonS3Client) {
            s3Client = amazonS3Client;
            return this;
        }

        /**
         * Sets the bucket name.
         *
         * @param str bucket name
         * @return this builder
         */
        public Builder bucket(String str) {
            bucketName = str;
            return this;
        }

        /**
         * Sets the directory, which the reader uses as the key prefix of its listing request.
         *
         * @param str key prefix
         * @return this builder
         */
        public Builder directory(String str) {
            directory = str;
            return this;
        }

        /**
         * Enables or disables adding the object key as a header on every event.
         *
         * @param bool flag value; unboxed here, so null causes a {@link NullPointerException}
         * @return this builder
         */
        public Builder annotateBaseName(Boolean bool) {
            annotateBaseName = bool;
            return this;
        }

        /**
         * Sets the header name used when base-name annotation is enabled.
         *
         * @param str header name
         * @return this builder
         */
        public Builder baseNameHeader(String str) {
            baseNameHeader = str;
            return this;
        }

        /**
         * Overrides the deserializer type.
         *
         * @param str deserializer type; the reader's constructor rejects null
         * @return this builder
         */
        public Builder deserializerType(String str) {
            deserializerType = str;
            return this;
        }

        /**
         * Overrides the deserializer configuration context.
         *
         * @param context deserializer context; the reader's constructor rejects null
         * @return this builder
         */
        public Builder deserializerContext(Context context) {
            deserializerContext = context;
            return this;
        }

        /**
         * Overrides the input charset name.
         *
         * @param str charset name; the reader's constructor rejects null and resolves it
         *            with {@code Charset.forName}
         * @return this builder
         */
        public Builder inputCharset(String str) {
            inputCharset = str;
            return this;
        }

        /**
         * Overrides the decode error policy.
         *
         * @param decodeErrorPolicy policy; the reader's constructor rejects null
         * @return this builder
         */
        public Builder decodeErrorPolicy(DecodeErrorPolicy decodeErrorPolicy) {
            this.decodeErrorPolicy = decodeErrorPolicy;
            return this;
        }

        /**
         * Sets the metadata backing store.
         *
         * @param metadataBackingStore store instance; the reader's constructor rejects null
         * @return this builder
         */
        public Builder backingStore(MetadataBackingStore metadataBackingStore) {
            backingStore = metadataBackingStore;
            return this;
        }

        /**
         * Creates the reader from the values collected so far.
         *
         * @return a new {@link S3ObjectEventReader}
         * @throws IOException declared by the reader's constructor
         */
        public S3ObjectEventReader build() throws IOException {
            return new S3ObjectEventReader(
                    bucketName,
                    directory,
                    annotateBaseName,
                    baseNameHeader,
                    deserializerType,
                    deserializerContext,
                    inputCharset,
                    decodeErrorPolicy,
                    s3Client,
                    backingStore);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flume/source/s3/S3ObjectEventReader$S3ObjectInfo.class */
    /**
     * Immutable holder that ties an object key to the deserializer reading it and to a
     * length value supplied at construction time.
     */
    public static class S3ObjectInfo {
        private final String key;
        private final EventDeserializer deserializer;
        private final long length;

        /**
         * @param str               object key
         * @param eventDeserializer deserializer that reads the object's content
         * @param j                 length value later returned by {@link #getLength()}
         */
        public S3ObjectInfo(String str, EventDeserializer eventDeserializer, long j) {
            key = str;
            deserializer = eventDeserializer;
            length = j;
        }

        /** @return the object key given to the constructor */
        public String getKey() {
            return key;
        }

        /** @return the deserializer given to the constructor */
        public EventDeserializer getDeserializer() {
            return deserializer;
        }

        /** @return the length value given to the constructor */
        public long getLength() {
            return length;
        }
    }

    /**
     * Creates a reader; instances are obtained through {@link Builder#build()}.
     *
     * <p>The reader starts with no current object, no last-read object, the committed flag
     * set and the listing counter at zero.
     *
     * @param bucketName          bucket to list and read from
     * @param directory           key prefix used for the listing request
     * @param annotateBaseName    whether events get the object key as a header
     * @param baseNameHeader      header name used when {@code annotateBaseName} is true
     * @param deserializerType    deserializer type; must not be null
     * @param deserializerContext deserializer configuration; must not be null
     * @param inputCharset        charset name resolved via {@link Charset#forName(String)}; must not be null
     * @param decodeErrorPolicy   decode error policy; must not be null
     * @param s3Client            S3 client; must not be null
     * @param backingStore        metadata backing store; must not be null
     * @throws NullPointerException if any argument documented as non-null is null
     * @throws IOException          declared for callers; no statement visible here throws it
     */
    private S3ObjectEventReader(String bucketName, String directory, boolean annotateBaseName, String baseNameHeader, String deserializerType, Context deserializerContext, String inputCharset, DecodeErrorPolicy decodeErrorPolicy, AmazonS3Client s3Client, MetadataBackingStore backingStore) throws IOException {
        this.currentFile = Optional.absent();
        this.lastFileRead = Optional.absent();
        this.committed = true;
        this.listFilesCount = 0;
        Preconditions.checkNotNull(deserializerType);
        Preconditions.checkNotNull(deserializerContext);
        Preconditions.checkNotNull(inputCharset);
        Preconditions.checkNotNull(s3Client);
        Preconditions.checkNotNull(backingStore);
        if (logger.isDebugEnabled()) {
            // The "directory={}" placeholder previously received the bucket name; log the directory itself.
            logger.debug("Initializing {} with directory={}, metaDataStoreType={}, deserializer={}", new Object[]{S3ObjectEventReader.class.getSimpleName(), directory, backingStore.getName(), deserializerType});
        }
        this.bucketName = bucketName;
        this.directory = directory;
        this.annotateBaseName = annotateBaseName;
        this.baseNameHeader = baseNameHeader;
        this.deserializerType = deserializerType;
        this.deserializerContext = deserializerContext;
        this.inputCharset = Charset.forName(inputCharset);
        this.decodeErrorPolicy = (DecodeErrorPolicy) Preconditions.checkNotNull(decodeErrorPolicy);
        this.s3Client = s3Client;
        this.backingStore = backingStore;
    }

    /**
     * Exposes the listing counter to tests.
     *
     * <p>NOTE(review): the constructor sets this counter to 0 and no other statement in this
     * class appears to modify it — confirm whether it is still meant to be incremented.
     *
     * @return the current value of the listing counter
     */
    @VisibleForTesting
    int getListFilesCount() {
        return listFilesCount;
    }

    /**
     * Reports the key of the object that produced the most recently returned events.
     *
     * @return the object key, or {@code null} when no object has been recorded yet
     */
    public String getLastFileRead() {
        return lastFileRead.isPresent() ? lastFileRead.get().getKey() : null;
    }

    /**
     * Reads a single event by requesting a batch of size one from {@link #readEvents(int)}.
     *
     * @return the event read, or {@code null} when the batch came back empty
     * @throws IOException if reading the batch fails
     */
    public Event readEvent() throws IOException {
        List<Event> batch = readEvents(1);
        return batch.isEmpty() ? null : batch.get(0);
    }

    /**
     * Reads up to {@code i} events from the current object, moving on to following objects
     * as needed.
     *
     * <p>If the previous batch was never committed, the current deserializer is reset first
     * so the same data is read again. Otherwise, when no object is open, the next one is
     * fetched from the listing.
     *
     * <p>Whenever a read yields no events the current object is retired and the next one is
     * opened. This repeats until an object yields events or no further object is available,
     * so an empty object no longer produces an empty batch with a commit left outstanding.
     *
     * <p>Every returned event carries the hash code of the object key under the
     * {@code HASHCODE} header and, when base-name annotation is enabled, the object key
     * under the configured base-name header.
     *
     * @param i maximum number of events to read
     * @return the events read; an empty list when no object is available
     * @throws IllegalStateException if a commit is outstanding but no object is open
     * @throws IOException           if the deserializer or the retirement of an object fails
     */
    public List<Event> readEvents(int i) throws IOException {
        if (this.committed) {
            if (!this.currentFile.isPresent()) {
                this.currentFile = getNextFile();
            }
            if (!this.currentFile.isPresent()) {
                return Collections.emptyList();
            }
        } else {
            if (!this.currentFile.isPresent()) {
                throw new IllegalStateException("File should exist when commit is outstanding.");
            }
            logger.info("Last read was never committed - resetting mark position.");
            this.currentFile.get().getDeserializer().reset();
        }

        List<Event> events = readFromCurrentFile(i);
        // The last read may have ended exactly at an object boundary, or the object may be
        // empty: keep rolling until an object produces events or the listing is exhausted.
        while (events.isEmpty()) {
            retireCurrentFile();
            this.currentFile = getNextFile();
            if (!this.currentFile.isPresent()) {
                return Collections.emptyList();
            }
            events = readFromCurrentFile(i);
        }

        String key = this.currentFile.get().getKey();
        String keyHash = Integer.toString(key.hashCode());
        for (Event event : events) {
            event.getHeaders().put(HASHCODE, keyHash);
            if (this.annotateBaseName) {
                event.getHeaders().put(this.baseNameHeader, key);
            }
        }
        this.committed = false;
        this.lastFileRead = this.currentFile;
        return events;
    }

    /** Reads one batch from the current object's deserializer and tags it with position markers. */
    private List<Event> readFromCurrentFile(int maxEvents) throws IOException {
        EventDeserializer deserializer = this.currentFile.get().getDeserializer();
        long startPosition = deserializer.getPosition();
        List<Event> events = deserializer.readEvents(maxEvents);
        setPositionIntoEvent(events, startPosition, deserializer.getPosition());
        return events;
    }

    /**
     * Closes the deserializer of the currently open object, if any, and forgets that object.
     *
     * @throws IOException if closing the deserializer fails
     */
    public void close() throws IOException {
        if (!currentFile.isPresent()) {
            return;
        }
        currentFile.get().getDeserializer().close();
        currentFile = Optional.absent();
    }

    /**
     * Marks the current deserializer position and sets the committed flag.
     *
     * <p>Does nothing when there is no outstanding read or no object is open.
     *
     * @throws IOException if marking the deserializer position fails
     */
    public void commit() throws IOException {
        boolean readOutstanding = !committed && currentFile.isPresent();
        if (readOutstanding) {
            currentFile.get().getDeserializer().mark();
            committed = true;
        }
    }

    /**
     * Finishes the current object: closes its deserializer and adds its key to the backing
     * store, which {@code getNextFile} consults to skip keys.
     *
     * @throws IllegalStateException if no object is currently open
     * @throws IOException           if closing the deserializer fails
     */
    private void retireCurrentFile() throws IOException {
        Preconditions.checkState(currentFile.isPresent());
        S3ObjectInfo finished = currentFile.get();
        String finishedKey = finished.getKey();
        // Close before recording, so a failed close leaves the key unrecorded.
        finished.getDeserializer().close();
        backingStore.add(finishedKey);
        logger.info("retireCurrentFile: file is {}.", finishedKey);
    }

    /**
     * Finds and opens the next object whose key is not yet in the backing store.
     *
     * <p>A listing is requested for the configured bucket and prefix with "/" as delimiter
     * when none is in progress (presumably this limits the summaries to keys directly
     * under the prefix — confirm against the S3 listing documentation). Keys already known
     * to the backing store are skipped. When a page is exhausted and the listing is
     * truncated, the next page is fetched; otherwise the listing is discarded so the next
     * call starts a fresh one.
     *
     * <p>The search is iterative: skipping many processed keys or walking many listing
     * pages no longer deepens the call stack, which previously could end in a
     * {@link StackOverflowError}.
     *
     * @return the opened object, or absent when the listing holds no further unprocessed
     *         key or when {@code openFile} could not open the selected object
     */
    private Optional<S3ObjectInfo> getNextFile() {
        while (true) {
            if (this.objListing == null) {
                logger.info("Getting new listing");
                this.objListing = this.s3Client.listObjects(new ListObjectsRequest().withBucketName(this.bucketName).withPrefix(this.directory).withDelimiter("/"));
                this.objIter = null;
            }
            if (this.objIter == null) {
                this.objIter = this.objListing.getObjectSummaries().listIterator();
            }
            while (this.objIter.hasNext()) {
                S3ObjectSummary candidate = this.objIter.next();
                if (!this.backingStore.contains(candidate.getKey())) {
                    return openFile(candidate);
                }
            }
            if (!this.objListing.isTruncated()) {
                // End of the listing: drop it so the next call requests a fresh one.
                this.objListing = null;
                return Optional.absent();
            }
            this.objListing = this.s3Client.listNextBatchOfObjects(this.objListing);
            this.objIter = null;
        }
    }

    /**
     * Opens the object described by the given summary and wraps it in an {@link S3ObjectInfo}.
     *
     * <p>A position tracker for the key is requested from the backing store. If the tracker
     * targets a different key, it is closed, the backing store's tracker is reset and a new
     * tracker is requested. The object content is then exposed through a resettable input
     * stream (16384 is passed as the numeric argument after the tracker) and handed to the
     * configured deserializer.
     *
     * @param s3ObjectSummary summary of the object to open; its key and size are used
     * @return the opened object, or absent when an {@link IOException} (including
     *         {@link FileNotFoundException}) occurred; the exception is logged
     * @throws IllegalStateException if the tracker still targets another key after the reset
     */
    private Optional<S3ObjectInfo> openFile(S3ObjectSummary s3ObjectSummary) {
        final String objectKey = s3ObjectSummary.getKey();
        try {
            PositionTracker tracker = backingStore.getPositionTracker(objectKey);
            boolean trackerIsForOtherKey = !tracker.getTarget().equals(objectKey);
            if (trackerIsForOtherKey) {
                tracker.close();
                logger.debug("Deleting Meta file");
                backingStore.resetPositionTracker();
                tracker = backingStore.getPositionTracker(objectKey);
            }
            Preconditions.checkState(tracker.getTarget().equals(objectKey), "Tracker target %s does not equal expected filename %s", new Object[]{tracker.getTarget(), objectKey});

            S3StreamCreator streamCreator = new S3StreamCreator(s3Client, bucketName, objectKey);
            ResettableGenericInputStream input = new ResettableGenericInputStream(streamCreator, tracker, 16384, inputCharset, decodeErrorPolicy, s3ObjectSummary.getSize());
            EventDeserializer deserializer = EventDeserializerFactory.getInstance(deserializerType, deserializerContext, input);
            S3ObjectInfo opened = new S3ObjectInfo(objectKey, deserializer, s3ObjectSummary.getSize());
            return Optional.of(opened);
        } catch (FileNotFoundException notFound) {
            logger.warn("Could not find file: " + objectKey, notFound);
            return Optional.absent();
        } catch (IOException ioFailure) {
            logger.error("Exception opening file: " + objectKey, ioFailure);
            return Optional.absent();
        }
    }

    /**
     * Tags each event of a batch with a position marker under the {@code EVENTPOSTION} header.
     *
     * <ul>
     *   <li>"FE" — the batch is a single event that starts at position 0 and ends at or
     *       beyond the current object's length;</li>
     *   <li>"F" — first event of a batch that starts at position 0;</li>
     *   <li>"E" — last event of a batch that ends at or beyond the object's length;</li>
     *   <li>"M" — every other event.</li>
     * </ul>
     *
     * <p>When both "F" and "E" apply to the same event of a multi-step tagging, "E" is
     * written last and therefore wins. Nothing is tagged when the batch is null or empty,
     * or when either position is -1.
     *
     * @param events        batch to tag
     * @param startPosition deserializer position before the batch was read
     * @param endPosition   deserializer position after the batch was read
     */
    private void setPositionIntoEvent(List<Event> events, long startPosition, long endPosition) {
        boolean nothingToTag = events == null || events.isEmpty();
        if (nothingToTag || startPosition == -1 || endPosition == -1) {
            return;
        }

        final long objectLength = currentFile.get().getLength();
        final boolean startsAtBeginning = startPosition == 0;
        final boolean reachesEnd = endPosition >= objectLength;
        final int lastIndex = events.size() - 1;

        if (lastIndex == 0 && startsAtBeginning && reachesEnd) {
            events.get(0).getHeaders().put(EVENTPOSTION, "FE");
            return;
        }

        for (Event event : events) {
            event.getHeaders().put(EVENTPOSTION, "M");
        }
        if (startsAtBeginning) {
            events.get(0).getHeaders().put(EVENTPOSTION, "F");
        }
        if (reachesEnd) {
            events.get(lastIndex).getHeaders().put(EVENTPOSTION, "E");
        }
    }
}
