package org.apache.flume.source.s3;

import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3Client;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.huawei.hadoop.security.crypter.CrypterUtil;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.serialization.DecodeErrorPolicy;
import org.apache.flume.source.AbstractSource;
import org.apache.flume.source.s3.S3ObjectEventReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/source/s3/OBSSource.class */
public class OBSSource extends AbstractSource implements Configurable, EventDrivenSource {
    private static final Logger logger = LoggerFactory.getLogger(OBSSource.class);
    private static final int POLL_DELAY_MS = 1000;
    public static final String FILENAME_HEADER = "fileHeader";
    public static final boolean DEFAULT_FILE_HEADER = false;
    public static final String BASENAME_HEADER_KEY = "basenameHeaderKey";
    public static final String DEFAULT_BASENAME_HEADER_KEY = "basename";
    private String bucketName;
    private String directory;
    private boolean basenameHeader;
    private String basenameHeaderKey;
    private int batchSize;
    private MetadataBackingStore backingStore;
    private String deserializerType;
    private Context deserializerContext;
    private String inputCharset;
    private DecodeErrorPolicy decodeErrorPolicy;
    private ScheduledExecutorService executor;
    private int maxBackoff;
    private SourceCounter sourceCounter;
    private S3ObjectEventReader reader;
    private AmazonS3Client s3Client;
    private volatile boolean hasFatalError = false;
    private boolean backoff = true;
    private boolean hitChannelException = false;

    /* loaded from: input_file:org/apache/flume/source/s3/OBSSource$S3SourceRunnable.class */
    private class S3SourceRunnable implements Runnable {
        private S3ObjectEventReader reader;
        private SourceCounter sourceCounter;

        public S3SourceRunnable(S3ObjectEventReader s3ObjectEventReader, SourceCounter sourceCounter) {
            this.reader = s3ObjectEventReader;
            this.sourceCounter = sourceCounter;
        }

        @Override // java.lang.Runnable
        public void run() {
            int i = 250;
            while (!Thread.interrupted()) {
                try {
                    List<Event> readEvents = this.reader.readEvents(OBSSource.this.batchSize);
                    if (readEvents.isEmpty()) {
                        break;
                    }
                    this.sourceCounter.addToEventReceivedCount(readEvents.size());
                    this.sourceCounter.incrementAppendBatchReceivedCount();
                    try {
                        OBSSource.this.getChannelProcessor().processEventBatch(readEvents);
                        this.reader.commit();
                        i = 250;
                        this.sourceCounter.addToEventAcceptedCount(readEvents.size());
                        this.sourceCounter.incrementAppendBatchAcceptedCount();
                    } catch (ChannelException e) {
                        OBSSource.logger.warn("The channel is full, and cannot write data now. The source will try again after " + String.valueOf(i) + " milliseconds");
                        OBSSource.this.hitChannelException = true;
                        if (OBSSource.this.backoff) {
                            TimeUnit.MILLISECONDS.sleep(i);
                            int i2 = i << 1;
                            i = i2 >= OBSSource.this.maxBackoff ? OBSSource.this.maxBackoff : i2;
                        }
                    }
                } catch (Throwable th) {
                    OBSSource.logger.error("FATAL: " + OBSSource.this.toString() + ": Uncaught exception in S3Source thread. Restart or reconfigure Flume to continue processing.", th);
                    OBSSource.this.hasFatalError = true;
                    Throwables.propagate(th);
                    return;
                }
            }
        }
    }

    public synchronized void configure(Context context) {
        this.bucketName = context.getString(S3SourceConfigurationConstants.BUCKET_NAME);
        this.directory = context.getString(S3SourceConfigurationConstants.PREFIX);
        Preconditions.checkState(this.bucketName != null, "Configuration must specify a bucket Name");
        String string = context.getString(S3SourceConfigurationConstants.ACCESS_KEY);
        Preconditions.checkState(this.bucketName != null, "Configuration must specify an access key Id");
        String string2 = context.getString(S3SourceConfigurationConstants.SECRET_KEY);
        Preconditions.checkState(this.bucketName != null, "Configuration must specify a secret key");
        this.basenameHeader = context.getBoolean("basenameHeader", false).booleanValue();
        this.basenameHeaderKey = context.getString(BASENAME_HEADER_KEY, DEFAULT_BASENAME_HEADER_KEY);
        this.batchSize = context.getInteger(S3SourceConfigurationConstants.BATCH_SIZE, 1000).intValue();
        this.backingStore = MetadataBackingStoreFactory.get(context.getString(S3SourceConfigurationConstants.BACKING_STORE, S3SourceConfigurationConstants.DEFAULT_BACKING_STORE), this.bucketName, context);
        this.deserializerType = context.getString(S3SourceConfigurationConstants.DESERIALIZER, S3SourceConfigurationConstants.DEFAULT_DESERIALIZER);
        this.deserializerContext = new Context(context.getSubProperties("deserializer."));
        this.inputCharset = context.getString(S3SourceConfigurationConstants.INPUT_CHARSET, S3SourceConfigurationConstants.DEFAULT_INPUT_CHARSET);
        this.decodeErrorPolicy = DecodeErrorPolicy.valueOf(context.getString(S3SourceConfigurationConstants.DECODE_ERROR_POLICY, S3SourceConfigurationConstants.DEFAULT_DECODE_ERROR_POLICY).toUpperCase(Locale.ENGLISH));
        this.maxBackoff = context.getInteger(S3SourceConfigurationConstants.MAX_BACKOFF, S3SourceConfigurationConstants.DEFAULT_MAX_BACKOFF).intValue();
        if (this.sourceCounter == null) {
            this.sourceCounter = new SourceCounter(getName());
        }
        this.s3Client = new AmazonS3Client(new BasicAWSCredentials(string, CrypterUtil.decrypt(string2)));
        String string3 = context.getString(S3SourceConfigurationConstants.END_POINT);
        if (string3 != null) {
            this.s3Client.setEndpoint(string3);
        }
    }

    public synchronized void start() {
        logger.info("S3Source source starting with bucket - {}", this.bucketName);
        this.executor = Executors.newSingleThreadScheduledExecutor();
        try {
            this.reader = new S3ObjectEventReader.Builder().deserializerType(this.deserializerType).deserializerContext(this.deserializerContext).inputCharset(this.inputCharset).decodeErrorPolicy(this.decodeErrorPolicy).setS3Client(this.s3Client).backingStore(this.backingStore).bucket(this.bucketName).directory(this.directory).annotateBaseName(Boolean.valueOf(this.basenameHeader)).baseNameHeader(this.basenameHeaderKey).build();
            this.executor.scheduleWithFixedDelay(new S3SourceRunnable(this.reader, this.sourceCounter), 0L, 1000L, TimeUnit.MILLISECONDS);
            super.start();
            logger.info("S3Source source started");
            this.sourceCounter.start();
        } catch (IOException e) {
            throw new FlumeException("Error instantiating s3 event parser", e);
        }
    }

    public synchronized void stop() {
        this.executor.shutdown();
        try {
            this.executor.awaitTermination(10L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            logger.info("Interrupted while awaiting termination", e);
        }
        this.executor.shutdownNow();
        super.stop();
        this.sourceCounter.stop();
        logger.info("S3 source {} stopped. Metrics: {}", getName(), this.sourceCounter);
    }

    public String toString() {
        return "S3 Source  " + getName() + ": { bucket Name: " + this.bucketName + " }";
    }

    @VisibleForTesting
    protected boolean hasFatalError() {
        return this.hasFatalError;
    }

    @VisibleForTesting
    public void setS3Client(AmazonS3Client amazonS3Client) {
        this.s3Client = amazonS3Client;
    }

    @VisibleForTesting
    protected void setBackOff(boolean z) {
        this.backoff = z;
    }

    @VisibleForTesting
    protected boolean hitChannelException() {
        return this.hitChannelException;
    }

    @VisibleForTesting
    protected SourceCounter getSourceCounter() {
        return this.sourceCounter;
    }
}
