package org.apache.hadoop.fs.s3a;

import com.amazonaws.AmazonClientException;
import com.amazonaws.event.ProgressEvent;
import com.amazonaws.event.ProgressEventType;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.UploadPartRequest;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.s3a.S3ADataBlocks;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spark-project.guava.base.Preconditions;
import org.spark-project.guava.util.concurrent.Futures;
import org.spark-project.guava.util.concurrent.ListenableFuture;
import org.spark-project.guava.util.concurrent.ListeningExecutorService;
import org.spark-project.guava.util.concurrent.MoreExecutors;

/* JADX INFO: Access modifiers changed from: package-private */
@InterfaceAudience.Private
@InterfaceStability.Unstable
/* loaded from: input_file:org/apache/hadoop/fs/s3a/S3ABlockOutputStream.class */
public class S3ABlockOutputStream extends OutputStream {
    /** Logger shared by this class and its nested classes. */
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) S3ABlockOutputStream.class);
    /** Owning filesystem; used here to upload parts and to bump statistics / write-operation counters. */
    private final S3AFileSystem fs;
    /** Destination key; within this class it is used when building upload diagnostics and error messages. */
    private final String key;
    /** Size handed to the block factory for every new block (narrowed to int from the constructor's long argument). */
    private final int blockSize;
    /** Listener passed on to each {@link BlockUploadProgress} as its downstream listener. */
    private final ProgressListener progressListener;
    /** Executor (wrapped as a listening executor in the constructor) on which uploads are submitted. */
    private final ListeningExecutorService executorService;
    /** Factory used by {@link #createBlockIfNeeded()} to create data blocks; closed in {@link #close()}. */
    private final S3ADataBlocks.BlockFactory blockFactory;
    /** Multipart upload state; null until the first block is uploaded via {@link #uploadCurrentBlock()}. */
    private MultiPartUpload multiPartUpload;
    /** Block currently receiving writes, or null when there is none; accessed under this object's monitor. */
    private S3ADataBlocks.DataBlock activeBlock;
    /** Statistics sink updated on queue/start/complete/failure of block uploads; closed in {@link #close()}. */
    private final S3AInstrumentation.OutputStreamStatistics statistics;
    /** Helper that builds and executes the put / multipart requests and receives the final success/failure callback. */
    private final S3AFileSystem.WriteOperationHelper writeOperationHelper;
    /**
     * Retry policy consulted by {@code MultiPartUpload.shouldRetry} when completing or aborting a multipart upload;
     * built with {@code retryUpToMaximumCountWithProportionalSleep(5, 2000, MILLISECONDS)}.
     */
    private final RetryPolicy retryPolicy = RetryPolicies.retryUpToMaximumCountWithProportionalSleep(5, 2000, TimeUnit.MILLISECONDS);
    /** Reusable one-byte buffer backing {@link #write(int)}. */
    private final byte[] singleCharWrite = new byte[1];
    /** Set to true by the first {@link #close()} call; {@link #checkOpen()} rejects operations once it is set. */
    private final AtomicBoolean closed = new AtomicBoolean(false);
    /** Number of blocks created so far; incremented in {@link #createBlockIfNeeded()} and passed to the block factory. */
    private long blockCount = 0;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/fs/s3a/S3ABlockOutputStream$BlockUploadProgress.class */
    public final class BlockUploadProgress implements ProgressListener {
        private final S3ADataBlocks.DataBlock block;
        private final ProgressListener nextListener;
        private final long transferQueueTime;
        private long transferStartTime;

        private BlockUploadProgress(S3ADataBlocks.DataBlock dataBlock, ProgressListener progressListener, long j) {
            this.block = dataBlock;
            this.transferQueueTime = j;
            this.nextListener = progressListener;
        }

        @Override // com.amazonaws.event.ProgressListener
        public void progressChanged(ProgressEvent progressEvent) {
            ProgressEventType eventType = progressEvent.getEventType();
            long bytesTransferred = progressEvent.getBytesTransferred();
            int dataSize = this.block.dataSize();
            switch (eventType) {
                case REQUEST_BYTE_TRANSFER_EVENT:
                    S3ABlockOutputStream.this.statistics.bytesTransferred(bytesTransferred);
                    break;
                case TRANSFER_PART_STARTED_EVENT:
                    this.transferStartTime = S3ABlockOutputStream.this.now();
                    S3ABlockOutputStream.this.statistics.blockUploadStarted(this.transferStartTime - this.transferQueueTime, dataSize);
                    S3ABlockOutputStream.this.incrementWriteOperations();
                    break;
                case TRANSFER_PART_COMPLETED_EVENT:
                    S3ABlockOutputStream.this.statistics.blockUploadCompleted(S3ABlockOutputStream.this.now() - this.transferStartTime, dataSize);
                    break;
                case TRANSFER_PART_FAILED_EVENT:
                    S3ABlockOutputStream.this.statistics.blockUploadFailed(S3ABlockOutputStream.this.now() - this.transferStartTime, dataSize);
                    S3ABlockOutputStream.LOG.warn("Transfer failure of block {}", this.block);
                    break;
            }
            if (this.nextListener != null) {
                this.nextListener.progressChanged(progressEvent);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/fs/s3a/S3ABlockOutputStream$MultiPartUpload.class */
    /**
     * State and operations of one multipart upload belonging to the enclosing stream.
     *
     * <p>The upload is initiated in the constructor. Blocks are queued with
     * {@link #uploadBlockAsync}, awaited with {@link #waitForAllPartUploads()} and
     * committed with {@link #complete(List)}; {@link #abort()} cancels the upload.
     * Completion and abort are retried according to the enclosing stream's retry policy.
     */
    public class MultiPartUpload {
        /** Upload ID returned when the multipart upload was initiated. */
        private final String uploadId;
        /** One future per queued part, in submission order (part number = index + 1). */
        private final List<ListenableFuture<PartETag>> partETagsFutures = new ArrayList<>(2);

        /**
         * Initiates the multipart upload through the write operation helper.
         *
         * @throws IOException if the upload cannot be initiated
         */
        MultiPartUpload() throws IOException {
            this.uploadId = writeOperationHelper.initiateMultiPartUpload();
            LOG.debug("Initiated multi-part upload for {} with id '{}'", writeOperationHelper, this.uploadId);
        }

        /**
         * Queues the upload of one block as the next part of this multipart upload.
         *
         * <p>The block's upload data and the block itself are closed by the upload task,
         * whether the part upload succeeds or fails.
         *
         * @param dataBlock block to upload
         * @throws IOException if the block cannot be prepared for upload
         */
        public void uploadBlockAsync(final S3ADataBlocks.DataBlock dataBlock) throws IOException {
            LOG.debug("Queueing upload of {}", dataBlock);
            final int size = dataBlock.dataSize();
            final S3ADataBlocks.BlockUploadData uploadData = dataBlock.startUpload();
            final int partNumber = this.partETagsFutures.size() + 1;
            final UploadPartRequest request = writeOperationHelper.newUploadPartRequest(
                    this.uploadId, partNumber, size, uploadData.getUploadStream(), uploadData.getFile());
            request.setGeneralProgressListener(new BlockUploadProgress(dataBlock, progressListener, now()));
            statistics.blockUploadQueued(dataBlock.dataSize());
            final ListenableFuture<PartETag> partETagFuture = executorService.submit(new Callable<PartETag>() {
                @Override // java.util.concurrent.Callable
                public PartETag call() throws Exception {
                    LOG.debug("Uploading part {} for id '{}'", partNumber, MultiPartUpload.this.uploadId);
                    try {
                        PartETag partETag = fs.uploadPart(request).getPartETag();
                        LOG.debug("Completed upload of {} to part {}", dataBlock, partETag.getETag());
                        LOG.debug("Stream statistics of {}", statistics);
                        return partETag;
                    } finally {
                        // Release the upload source and the block on success and on failure alike.
                        S3AUtils.closeAll(LOG, uploadData, dataBlock);
                    }
                }
            });
            this.partETagsFutures.add(partETagFuture);
        }

        /**
         * Blocks until every queued part upload has finished.
         *
         * <p>If any part fails, or the waiting thread is interrupted, the outstanding
         * uploads are cancelled and the multipart upload is aborted before the
         * exception is raised; this method never returns null.
         *
         * @return the part ETags, in part order
         * @throws java.io.InterruptedIOException if the thread is interrupted while waiting;
         *         the thread's interrupt flag is set again before throwing
         * @throws IOException if any part upload failed
         */
        public List<PartETag> waitForAllPartUploads() throws IOException {
            LOG.debug("Waiting for {} uploads to complete", this.partETagsFutures.size());
            try {
                return Futures.allAsList(this.partETagsFutures).get();
            } catch (InterruptedException e) {
                LOG.warn("Interrupted partUpload", e);
                cancelAllPartUploads();
                // get() cleared the interrupt flag, so the abort request can still be issued;
                // the flag is restored afterwards for the caller.
                abort();
                Thread.currentThread().interrupt();
                throw (IOException) new java.io.InterruptedIOException(
                        "Interrupted while waiting for multi-part upload with id '" + this.uploadId + "' to " + key)
                        .initCause(e);
            } catch (ExecutionException e) {
                LOG.debug("While waiting for upload completion", e);
                cancelAllPartUploads();
                abort();
                throw S3AUtils.extractException(
                        "Multi-part upload with id '" + this.uploadId + "' to " + key, key, e);
            }
        }

        /** Cancels every queued part upload, interrupting those already running. */
        private void cancelAllPartUploads() {
            LOG.debug("Cancelling futures");
            for (ListenableFuture<PartETag> future : this.partETagsFutures) {
                future.cancel(true);
            }
        }

        /**
         * Completes the multipart upload, retrying on {@link AmazonClientException}
         * for as long as the retry policy allows.
         *
         * @param list ETags of all uploaded parts; must not be null
         * @return the result of the completion request
         * @throws IOException the translated last failure once retries are exhausted
         */
        public CompleteMultipartUploadResult complete(List<PartETag> list) throws IOException {
            final String operation = String.format(
                    "Completing multi-part upload for key '%s', id '%s' with %s partitions ",
                    key, this.uploadId, list.size());
            int retryCount = 0;
            AmazonClientException lastException;
            do {
                try {
                    LOG.debug(operation);
                    return writeOperationHelper.completeMultipartUpload(this.uploadId, list);
                } catch (AmazonClientException e) {
                    lastException = e;
                    statistics.exceptionInMultipartComplete();
                }
            } while (shouldRetry(operation, lastException, retryCount++));
            // Only reached when the operation failed and no further retry is permitted.
            throw S3AUtils.translateException(operation, key, lastException);
        }

        /**
         * Aborts the multipart upload, retrying on {@link AmazonClientException}
         * for as long as the retry policy allows. Failure to abort is logged, not thrown.
         */
        public void abort() {
            fs.incrementStatistic(Statistic.OBJECT_MULTIPART_UPLOAD_ABORTED);
            final String operation = String.format(
                    "Aborting multi-part upload for '%s', id '%s", writeOperationHelper, this.uploadId);
            int retryCount = 0;
            AmazonClientException lastException;
            do {
                try {
                    LOG.debug(operation);
                    writeOperationHelper.abortMultipartUpload(this.uploadId);
                    return;
                } catch (AmazonClientException e) {
                    lastException = e;
                    statistics.exceptionInMultipartAbort();
                }
            } while (shouldRetry(operation, lastException, retryCount++));
            // Only reached when the abort failed and no further retry is permitted.
            LOG.warn("Unable to abort multipart upload, you may need to purge  uploaded parts", lastException);
        }

        /**
         * Asks the retry policy whether a failed operation should be retried and,
         * if so, sleeps for the delay the policy requests.
         *
         * @param str                   description of the operation, for logging
         * @param amazonClientException the failure that occurred
         * @param i                     number of retries already attempted
         * @return true if the caller should retry; false if not, if interrupted
         *         while sleeping, or if the policy itself failed
         */
        private boolean shouldRetry(String str, AmazonClientException amazonClientException, int i) {
            try {
                RetryPolicy.RetryAction action = retryPolicy.shouldRetry(amazonClientException, i, 0, true);
                boolean retry = action == RetryPolicy.RetryAction.RETRY;
                if (retry) {
                    fs.incrementStatistic(Statistic.IGNORED_ERRORS);
                    LOG.info("Retrying {} after exception ", str, amazonClientException);
                    Thread.sleep(action.delayMillis);
                }
                return retry;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            } catch (Exception e) {
                // Best effort: a failing retry policy means "do not retry", but leave a trace.
                LOG.debug("Retry policy evaluation failed for {}", str, e);
                return false;
            }
        }
    }

    /* loaded from: input_file:org/apache/hadoop/fs/s3a/S3ABlockOutputStream$ProgressableListener.class */
    /**
     * Adapter that turns AWS progress events into calls on a Hadoop
     * {@link Progressable}; every event triggers one {@code progress()} call.
     */
    private static class ProgressableListener implements ProgressListener {
        /** Callback to notify; may be null, in which case events are ignored. */
        private final Progressable progress;

        /**
         * @param callback progress callback to notify, or null for none
         */
        public ProgressableListener(Progressable callback) {
            this.progress = callback;
        }

        /**
         * Notifies the wrapped callback, if there is one. The event itself is not inspected.
         *
         * @param progressEvent event delivered by the AWS SDK
         */
        @Override // com.amazonaws.event.ProgressListener
        public void progressChanged(ProgressEvent progressEvent) {
            final Progressable callback = this.progress;
            if (callback == null) {
                return;
            }
            callback.progress();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a block output stream and allocates its first block.
     *
     * @param s3AFileSystem          owning filesystem
     * @param str                    destination key
     * @param executorService        executor on which uploads are submitted
     * @param progressable           progress callback; used directly if it also implements
     *                               {@link ProgressListener}, otherwise wrapped (may be null)
     * @param j                      block size in bytes; must be between 5242880 and
     *                               {@link Integer#MAX_VALUE} inclusive
     * @param blockFactory           factory for data blocks
     * @param outputStreamStatistics statistics sink for this stream
     * @param writeOperationHelper   helper that builds and executes the upload requests
     * @throws IllegalArgumentException if the block size is out of range
     * @throws IOException              if the first block cannot be created
     */
    public S3ABlockOutputStream(S3AFileSystem s3AFileSystem, String str, ExecutorService executorService, Progressable progressable, long j, S3ADataBlocks.BlockFactory blockFactory, S3AInstrumentation.OutputStreamStatistics outputStreamStatistics, S3AFileSystem.WriteOperationHelper writeOperationHelper) throws IOException {
        // Validate before narrowing to int. Guava message templates only understand %s.
        Preconditions.checkArgument(j >= 5242880, "Block size is too small: %s", j);
        // A larger value would be silently truncated by the int cast below.
        Preconditions.checkArgument(j <= Integer.MAX_VALUE, "Block size is too large: %s", j);
        this.fs = s3AFileSystem;
        this.key = str;
        this.blockFactory = blockFactory;
        this.blockSize = (int) j;
        this.statistics = outputStreamStatistics;
        this.writeOperationHelper = writeOperationHelper;
        this.executorService = MoreExecutors.listeningDecorator(executorService);
        this.multiPartUpload = null;
        this.progressListener = progressable instanceof ProgressListener ? (ProgressListener) progressable : new ProgressableListener(progressable);
        // Always start with a block so that closing an unwritten stream still has one to put.
        createBlockIfNeeded();
        LOG.debug("Initialized S3ABlockOutputStream for {} output to {}", writeOperationHelper, this.activeBlock);
    }

    /**
     * Returns the active block, creating a new one through the block factory when
     * there is none. Each creation increments the block counter; an error is logged
     * once the counter reaches 10000.
     *
     * @return the active block, never null on normal return
     * @throws IOException if the factory fails to create a block
     */
    private synchronized S3ADataBlocks.DataBlock createBlockIfNeeded() throws IOException {
        final S3ADataBlocks.DataBlock existing = this.activeBlock;
        if (existing != null) {
            return existing;
        }
        this.blockCount++;
        final boolean atOrOverPartLimit = this.blockCount >= 10000;
        if (atOrOverPartLimit) {
            LOG.error("Number of partitions in stream exceeds limit for S3: 10000 write may fail.");
        }
        final S3ADataBlocks.DataBlock created = this.blockFactory.create(this.blockCount, this.blockSize, this.statistics);
        this.activeBlock = created;
        return created;
    }

    /**
     * Reads the active block while holding this stream's monitor.
     *
     * @return the active block, or null if there is none
     */
    private S3ADataBlocks.DataBlock getActiveBlock() {
        synchronized (this) {
            return this.activeBlock;
        }
    }

    /**
     * Tells whether an active block currently exists; evaluated under this stream's monitor.
     *
     * @return true if the active block is non-null
     */
    private synchronized boolean hasActiveBlock() {
        final S3ADataBlocks.DataBlock current = this.activeBlock;
        return !(current == null);
    }

    /**
     * Drops the reference to the active block, logging at debug level when there
     * was one. The check and the reset happen under this stream's monitor, like
     * every other access to the active block.
     */
    private synchronized void clearActiveBlock() {
        if (this.activeBlock != null) {
            LOG.debug("Clearing active block");
        }
        this.activeBlock = null;
    }

    /**
     * Verifies that the stream has not been closed.
     *
     * @throws IOException if {@code close()} has already been called
     */
    void checkOpen() throws IOException {
        final boolean stillOpen = !this.closed.get();
        if (stillOpen) {
            return;
        }
        throw new IOException("Filesystem " + this.writeOperationHelper + " closed");
    }

    /**
     * Flushes the active block, if there is one. Nothing is uploaded by this call.
     *
     * @throws IOException if the stream is closed or the block fails to flush
     */
    @Override // java.io.OutputStream, java.io.Flushable
    public synchronized void flush() throws IOException {
        checkOpen();
        final S3ADataBlocks.DataBlock current = getActiveBlock();
        if (current == null) {
            return;
        }
        current.flush();
    }

    /**
     * Writes a single byte by staging it in the reusable one-byte buffer and
     * delegating to {@link #write(byte[], int, int)}.
     *
     * @param i the byte to write (only the low eight bits are used)
     * @throws IOException on any failure of the delegated write
     */
    @Override // java.io.OutputStream
    public synchronized void write(int i) throws IOException {
        final byte[] oneByte = this.singleCharWrite;
        oneByte[0] = (byte) i;
        write(oneByte, 0, 1);
    }

    /**
     * Writes a range of bytes into the active block, uploading the block when it
     * fills up. If the block accepts only part of the data, the block is uploaded
     * and the remainder is written through a recursive call (which creates a
     * fresh block).
     *
     * @param bArr source buffer
     * @param i    offset of the first byte to write
     * @param i2   number of bytes to write
     * @throws IOException if the stream is closed or a block write/upload fails
     */
    @Override // java.io.OutputStream
    public synchronized void write(byte[] bArr, int i, int i2) throws IOException {
        S3ADataBlocks.validateWriteArgs(bArr, i, i2);
        checkOpen();
        if (i2 != 0) {
            final S3ADataBlocks.DataBlock block = createBlockIfNeeded();
            final int accepted = block.write(bArr, i, i2);
            final int spaceLeft = block.remainingCapacity();
            if (accepted >= i2) {
                // Everything fitted; ship the block only if it is now exactly full.
                if (spaceLeft == 0) {
                    uploadCurrentBlock();
                }
                return;
            }
            LOG.debug("writing more data than block has capacity -triggering upload");
            uploadCurrentBlock();
            write(bArr, i + accepted, i2 - accepted);
        }
    }

    /**
     * Queues the active block as the next part of the multipart upload, starting
     * the multipart upload first if this is the first block to be uploaded. The
     * active block reference is cleared afterwards, even if queueing fails.
     *
     * @throws IllegalStateException if there is no active block
     * @throws IOException           if the multipart upload cannot be initiated or
     *                               the block cannot be queued
     */
    private synchronized void uploadCurrentBlock() throws IOException {
        Preconditions.checkState(hasActiveBlock(), "No active block");
        LOG.debug("Writing block # {}", Long.valueOf(this.blockCount));
        MultiPartUpload upload = this.multiPartUpload;
        if (upload == null) {
            LOG.debug("Initiating Multipart upload");
            upload = new MultiPartUpload();
            this.multiPartUpload = upload;
        }
        final S3ADataBlocks.DataBlock blockToSend = getActiveBlock();
        try {
            upload.uploadBlockAsync(blockToSend);
        } finally {
            clearActiveBlock();
        }
    }

    /**
     * Closes the stream, uploading any outstanding data. Calls after the first are ignored.
     *
     * <p>If no multipart upload was started, the active block (if any) is uploaded
     * as a single object. Otherwise the last block is queued if it has data, all
     * part uploads are awaited and the multipart upload is completed. The active
     * block, the block factory and the statistics are always closed, exactly once;
     * the write operation helper is told about failure (on {@link IOException})
     * or, after cleanup, about success.
     *
     * @throws IOException if any upload or the completion fails
     */
    @Override // java.io.OutputStream, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.closed.getAndSet(true)) {
            LOG.debug("Ignoring close() as stream is already closed");
            return;
        }
        final S3ADataBlocks.DataBlock block = getActiveBlock();
        final boolean hasBlock = hasActiveBlock();
        LOG.debug("{}: Closing block #{}: current block= {}", this, this.blockCount, hasBlock ? block : "(none)");
        try {
            if (this.multiPartUpload == null) {
                if (hasBlock) {
                    // Nothing has been uploaded yet: put the single block up as one object.
                    putObject();
                }
            } else {
                if (hasBlock && block.hasData()) {
                    // Send the last part.
                    uploadCurrentBlock();
                }
                final List<PartETag> partETags = this.multiPartUpload.waitForAllPartUploads();
                if (partETags == null) {
                    // Defensive: never hand a null part list to complete().
                    throw new IOException("Multi-part upload to " + this.key
                            + " did not finish: no part list is available");
                }
                this.multiPartUpload.complete(partETags);
            }
            LOG.debug("Upload complete for {}", this.writeOperationHelper);
        } catch (IOException e) {
            this.writeOperationHelper.writeFailed(e);
            throw e;
        } finally {
            // Runs exactly once on every path, successful or not.
            S3AUtils.closeAll(LOG, block, this.blockFactory);
            LOG.debug("Statistics: {}", this.statistics);
            S3AUtils.closeAll(LOG, this.statistics);
            clearActiveBlock();
        }
        // Reported only after cleanup, so a failure here cannot trigger writeFailed() or a second cleanup.
        this.writeOperationHelper.writeSuccessful();
    }

    /**
     * Uploads the active block as a single object and waits for the upload to finish.
     *
     * <p>The upload runs on the executor; the block and its upload data are closed
     * by the upload task on success and on failure. The active block reference is
     * cleared as soon as the task has been submitted.
     *
     * @throws java.io.InterruptedIOException if the thread is interrupted while waiting;
     *         the pending upload is cancelled and the interrupt flag is set again
     * @throws IOException if the upload fails
     */
    private void putObject() throws IOException {
        LOG.debug("Executing regular upload for {}", this.writeOperationHelper);
        final S3ADataBlocks.DataBlock block = getActiveBlock();
        final int size = block.dataSize();
        final S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
        final PutObjectRequest putObjectRequest = uploadData.hasFile()
                ? this.writeOperationHelper.newPutRequest(uploadData.getFile())
                : this.writeOperationHelper.newPutRequest(uploadData.getUploadStream(), size);
        putObjectRequest.setGeneralProgressListener(new BlockUploadProgress(block, this.progressListener, now()));
        this.statistics.blockUploadQueued(size);
        final ListenableFuture<PutObjectResult> putObjectResult = this.executorService.submit(new Callable<PutObjectResult>() {
            @Override // java.util.concurrent.Callable
            public PutObjectResult call() throws Exception {
                try {
                    return S3ABlockOutputStream.this.writeOperationHelper.putObject(putObjectRequest);
                } finally {
                    // Release the upload source and the block on success and on failure alike.
                    S3AUtils.closeAll(S3ABlockOutputStream.LOG, uploadData, block);
                }
            }
        });
        clearActiveBlock();
        try {
            putObjectResult.get();
        } catch (InterruptedException e) {
            LOG.warn("Interrupted object upload", e);
            // The upload did not complete: stop it and report failure instead of returning as if it had.
            putObjectResult.cancel(true);
            Thread.currentThread().interrupt();
            throw (IOException) new java.io.InterruptedIOException(
                    "Interrupted while waiting for regular upload to " + this.key).initCause(e);
        } catch (ExecutionException e) {
            throw S3AUtils.extractException("regular upload", this.key, e);
        }
    }

    /**
     * Describes the stream as
     * {@code S3ABlockOutputStream{<helper>, blockSize=<n>[, activeBlock=<block>]}};
     * the active-block part appears only when a block is present.
     *
     * @return a human-readable description of this stream
     */
    @Override
    public String toString() {
        String text = "S3ABlockOutputStream{" + this.writeOperationHelper.toString();
        text = text + ", blockSize=" + this.blockSize;
        final S3ADataBlocks.DataBlock current = this.activeBlock;
        if (current != null) {
            text = text + ", activeBlock=" + current;
        }
        return text + '}';
    }

    /* JADX INFO: Access modifiers changed from: private */
    /** Forwards to the owning filesystem's {@code incrementWriteOperations()}. */
    public void incrementWriteOperations() {
        final S3AFileSystem owner = this.fs;
        owner.incrementWriteOperations();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Current time as reported by {@link System#currentTimeMillis()}.
     *
     * @return milliseconds since the epoch
     */
    public long now() {
        final long currentMillis = System.currentTimeMillis();
        return currentMillis;
    }

    /**
     * Exposes the statistics object this stream reports to.
     *
     * @return the statistics instance supplied at construction time
     */
    S3AInstrumentation.OutputStreamStatistics getStatistics() {
        final S3AInstrumentation.OutputStreamStatistics streamStatistics = this.statistics;
        return streamStatistics;
    }
}
