package org.apache.hadoop.fs.obs.commit.staging;

import com.google.common.base.Preconditions;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.obs.OBSFileSystem;
import org.apache.hadoop.fs.obs.commit.AbstractOBSCommitter;
import org.apache.hadoop.fs.obs.commit.CommitUtils;
import org.apache.hadoop.fs.obs.commit.CommitUtilsWithMR;
import org.apache.hadoop.fs.obs.commit.OBSExtendUtils;
import org.apache.hadoop.fs.s3a.commit.DurationInfo;
import org.apache.hadoop.fs.s3a.commit.Tasks;
import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.fs.s3a.commit.staging.ConflictResolution;
import org.apache.hadoop.fs.s3a.commit.staging.Paths;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/fs/obs/commit/staging/StagingCommitter.class */
/**
 * Committer registered under the name {@value #NAME}.
 *
 * <p>On task commit it scans the task attempt's working directory, uploads every
 * visible file to the destination store as a <em>pending</em> commit, and saves the
 * resulting {@link PendingSet} under the task attempt path of a wrapped
 * {@link FileOutputCommitter}. The wrapped committer is then used to commit or abort
 * that bookkeeping data.
 *
 * <p>Thread-safety: {@link #getConflictResolutionMode} caches its result in a plain
 * field without synchronization.
 */
public class StagingCommitter extends AbstractOBSCommitter {
    private static final Logger LOG = LoggerFactory.getLogger(StagingCommitter.class);

    /** Name returned by {@link #getName()}. */
    public static final String NAME = "staging";

    /** Directory name used for pending job/task attempt data. */
    private static final String TEMPORARY_DIR_NAME = "_temporary";

    /** Configuration key holding the upload UUID of the job. */
    private static final String STAGING_UUID_KEY = "fs.s3a.committer.staging.uuid";

    /** Configuration key controlling whether cleanup aborts pending uploads. */
    private static final String ABORT_PENDING_UPLOADS_KEY = "fs.s3a.committer.staging.abort.pending.uploads";

    /** Output path as reported by the superclass right after its constructor ran. */
    private final Path constructorOutputPath;

    /** Part size handed to {@code uploadFileToPendingCommit}; read from {@code fs.s3a.multipart.size}. */
    private final long uploadPartSize;

    /** Upload UUID resolved by {@link #getUploadUUID(Configuration, JobID)}. */
    private final String uuid;

    /** Whether {@link #getFinalKey} adds the UUID to destination file names. */
    private final boolean uniqueFilenames;

    /** Committer that manages the directory holding the saved pending-commit data. */
    private final FileOutputCommitter wrappedCommitter;

    /** Lazily resolved conflict mode; see {@link #getConflictResolutionMode}. */
    private ConflictResolution conflictResolution;

    /** Store key of the final output path. */
    private String obsKeyPrefix;

    /** Directory passed to the wrapped committer as its output path. */
    private Path commitsDirectory;

    /**
     * Creates the committer for a task attempt.
     *
     * <p>The statement order matters: the UUID must be known before the work path and
     * the wrapped committer are built, and the output path is re-applied afterwards.
     *
     * @param path destination path handed to the superclass
     * @param taskAttemptContext context of the task attempt
     * @throws IOException if the work path, wrapped committer or filesystem cannot be set up
     */
    public StagingCommitter(Path path, TaskAttemptContext taskAttemptContext) throws IOException {
        super(path, taskAttemptContext);
        this.constructorOutputPath = Preconditions.checkNotNull(getOutputPath(), "output path");

        Configuration conf = getConf();
        // Default part size is 104857600 bytes (100 MiB).
        this.uploadPartSize = conf.getLongBytes("fs.s3a.multipart.size", 104857600L);
        this.uuid = getUploadUUID(conf, taskAttemptContext.getJobID());
        this.uniqueFilenames = conf.getBoolean("fs.s3a.committer.staging.unique-filenames", true);

        setWorkPath(buildWorkPath(taskAttemptContext, this.uuid));
        this.wrappedCommitter = createWrappedCommitter(taskAttemptContext, conf);
        setOutputPath(this.constructorOutputPath);

        Path finalOutputPath = getOutputPath();
        Preconditions.checkNotNull(finalOutputPath, "Output path cannot be null");
        OBSFileSystem destFs = CommitUtils.getOBSFileSystem(finalOutputPath, taskAttemptContext.getConfiguration());
        this.obsKeyPrefix = OBSExtendUtils.pathToKey(destFs, finalOutputPath);
        LOG.debug("{}: final output path is {}", getRole(), finalOutputPath);

        // Evaluated unconditionally so the conflict mode is resolved and cached here.
        ConflictResolution mode = getConflictResolutionMode(getJobContext(), destFs.getConf());
        LOG.debug("Conflict resolution mode: {}", mode);
    }

    /**
     * @return the committer name, {@link #NAME}
     */
    @Override
    public String getName() {
        return NAME;
    }

    /**
     * Builds the wrapped {@link FileOutputCommitter} and records its output directory
     * in {@link #commitsDirectory}.
     *
     * @param jobContext job (or task attempt) context
     * @param configuration configuration used to locate the commits directory
     * @return the wrapped committer
     * @throws IOException if the directory lookup or committer creation fails
     */
    protected FileOutputCommitter createWrappedCommitter(JobContext jobContext, Configuration configuration) throws IOException {
        initFileOutputCommitterOptions(jobContext);
        this.commitsDirectory = Paths.getMultipartUploadCommitsDirectory(configuration, this.uuid);
        return new FileOutputCommitter(this.commitsDirectory, jobContext);
    }

    /**
     * Sets {@code mapreduce.fileoutputcommitter.algorithm.version} to 1 in the job
     * configuration before the wrapped committer is created.
     *
     * @param jobContext context whose configuration is modified
     */
    protected void initFileOutputCommitterOptions(JobContext jobContext) {
        jobContext.getConfiguration().setInt("mapreduce.fileoutputcommitter.algorithm.version", 1);
    }

    /**
     * @return a description containing the superclass description, the conflict
     *         resolution mode and, when set, the wrapped committer
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("StagingCommitter{")
                .append(super.toString())
                .append(", conflictResolution=")
                .append(this.conflictResolution);
        if (this.wrappedCommitter != null) {
            text.append(", wrappedCommitter=").append(this.wrappedCommitter);
        }
        return text.append('}').toString();
    }

    /**
     * Resolves the upload UUID. Lookup order: {@code fs.s3a.committer.staging.uuid},
     * then {@code spark.sql.sources.writeJobUUID}, then {@code spark.app.id}, then
     * the supplied fallback.
     *
     * @param configuration configuration to read
     * @param str fallback value used when none of the keys is set
     * @return the resolved UUID string
     */
    public static String getUploadUUID(Configuration configuration, String str) {
        String sparkAppId = configuration.getTrimmed("spark.app.id", str);
        String sparkWriteUuid = configuration.getTrimmed("spark.sql.sources.writeJobUUID", sparkAppId);
        return configuration.getTrimmed(STAGING_UUID_KEY, sparkWriteUuid);
    }

    /**
     * Resolves the upload UUID using the job ID's string form as the fallback.
     *
     * @param configuration configuration to read
     * @param jobID job ID supplying the fallback value
     * @return the resolved UUID string
     */
    public static String getUploadUUID(Configuration configuration, JobID jobID) {
        return getUploadUUID(configuration, jobID.toString());
    }

    /**
     * Builds the working path of a task attempt.
     *
     * @return the working path, or {@code null} when the context is not a task attempt context
     */
    private static Path buildWorkPath(JobContext jobContext, String str) throws IOException {
        if (!(jobContext instanceof TaskAttemptContext)) {
            return null;
        }
        return taskAttemptWorkingPath((TaskAttemptContext) jobContext, str);
    }

    /**
     * @return whether destination file names get the UUID added
     */
    public Boolean useUniqueFilenames() {
        return this.uniqueFilenames;
    }

    /**
     * @param jobContext job context
     * @return the filesystem of the wrapped committer's job attempt path
     * @throws IOException if the filesystem cannot be obtained
     */
    public FileSystem getJobAttemptFileSystem(JobContext jobContext) throws IOException {
        Path jobAttemptPath = getJobAttemptPath(jobContext);
        return jobAttemptPath.getFileSystem(jobContext.getConfiguration());
    }

    /**
     * Computes {@code <path>/_temporary/<appAttemptId>} for the context's application attempt.
     *
     * @param jobContext job context supplying the application attempt ID
     * @param path base output path; must not be null
     * @return the job attempt path
     */
    public static Path getJobAttemptPath(JobContext jobContext, Path path) {
        return getJobAttemptPath(CommitUtilsWithMR.getAppAttemptId(jobContext), path);
    }

    /** Computes {@code <path>/_temporary/<i>}. */
    private static Path getJobAttemptPath(int i, Path path) {
        return new Path(getPendingJobAttemptsPath(path), String.valueOf(i));
    }

    /**
     * Computes the job attempt path under {@link #commitsDirectory}.
     *
     * @param i application attempt ID
     * @return {@code <commitsDirectory>/_temporary/<i>}
     */
    @Override
    protected Path getJobAttemptPath(int i) {
        return getJobAttemptPath(i, this.commitsDirectory);
    }

    /** Computes {@code <jobAttemptPath>/_temporary}. */
    private static Path getPendingTaskAttemptsPath(JobContext jobContext, Path path) {
        return new Path(getJobAttemptPath(jobContext, path), TEMPORARY_DIR_NAME);
    }

    /**
     * Computes the path of a task attempt beneath the given base path.
     *
     * @param taskAttemptContext task attempt context supplying the attempt ID
     * @param path base output path; must not be null
     * @return {@code <path>/_temporary/<appAttemptId>/_temporary/<taskAttemptId>}
     */
    public static Path getTaskAttemptPath(TaskAttemptContext taskAttemptContext, Path path) {
        String attemptId = String.valueOf(taskAttemptContext.getTaskAttemptID());
        return new Path(getPendingTaskAttemptsPath(taskAttemptContext, path), attemptId);
    }

    /** Computes {@code <path>/_temporary}; rejects a null path. */
    private static Path getPendingJobAttemptsPath(Path path) {
        Preconditions.checkNotNull(path, "Null 'out' path");
        return new Path(path, TEMPORARY_DIR_NAME);
    }

    /**
     * @param taskAttemptContext task attempt context
     * @return the committed task path for the context's application attempt
     */
    public Path getCommittedTaskPath(TaskAttemptContext taskAttemptContext) {
        int appAttemptId = CommitUtilsWithMR.getAppAttemptId(taskAttemptContext);
        return getCommittedTaskPath(appAttemptId, taskAttemptContext);
    }

    /** Rejects a context that is null or lacks a task attempt ID, task ID or job ID. */
    private static void validateContext(TaskAttemptContext taskAttemptContext) {
        Preconditions.checkNotNull(taskAttemptContext, "null context");
        Preconditions.checkNotNull(taskAttemptContext.getTaskAttemptID(), "null task attempt ID");
        Preconditions.checkNotNull(taskAttemptContext.getTaskAttemptID().getTaskID(), "null task ID");
        Preconditions.checkNotNull(taskAttemptContext.getTaskAttemptID().getJobID(), "null job ID");
    }

    /**
     * @param i application attempt ID
     * @param taskAttemptContext task attempt context; validated before use
     * @return {@code <jobAttemptPath(i)>/<taskId>}
     */
    protected Path getCommittedTaskPath(int i, TaskAttemptContext taskAttemptContext) {
        validateContext(taskAttemptContext);
        String taskId = String.valueOf(taskAttemptContext.getTaskAttemptID().getTaskID());
        return new Path(getJobAttemptPath(i), taskId);
    }

    /**
     * Not supported by this committer.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Path getTempTaskAttemptPath(TaskAttemptContext taskAttemptContext) {
        throw new UnsupportedOperationException("Unimplemented");
    }

    /**
     * Lists the files under the task attempt path, applying
     * {@code OBSExtendUtils.HIDDEN_FILE_FILTER}.
     *
     * @param taskAttemptContext task attempt context
     * @return the files to commit
     * @throws IOException if the listing fails
     */
    protected List<LocatedFileStatus> getTaskOutput(TaskAttemptContext taskAttemptContext) throws IOException {
        Path attemptPath = getTaskAttemptPath(taskAttemptContext);
        // Guava's Preconditions substitutes %s, not {}.
        Preconditions.checkNotNull(attemptPath, "No attemptPath path in %s", this);
        LOG.debug("Scanning {} for files to commit", attemptPath);
        FileSystem attemptFs = getTaskAttemptFilesystem(taskAttemptContext);
        return OBSExtendUtils.listAndFilter(attemptFs, attemptPath, true, OBSExtendUtils.HIDDEN_FILE_FILTER);
    }

    /**
     * Builds the destination key for a file relative to the task attempt path.
     *
     * @param str relative path of the file
     * @param jobContext job context
     * @return {@code <prefix>/<relative>}, with the UUID added to the relative part
     *         when unique file names are enabled
     */
    protected String getFinalKey(String str, JobContext jobContext) {
        String prefix = getOBSPrefix(jobContext);
        if (this.uniqueFilenames) {
            return prefix + "/" + Paths.addUUID(str, this.uuid);
        }
        return prefix + "/" + str;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * @param str relative path of the file
     * @param jobContext job context
     * @return the qualified destination path for {@link #getFinalKey}
     * @throws IOException if the destination filesystem cannot be obtained
     */
    public final Path getFinalPath(String str, JobContext jobContext) throws IOException {
        String key = getFinalKey(str, jobContext);
        return OBSExtendUtils.keyToQualifiedPath(getDestOBSFS(), key);
    }

    /**
     * @return the work path of this committer
     */
    @Override
    public Path getBaseTaskAttemptPath(TaskAttemptContext taskAttemptContext) {
        return getWorkPath();
    }

    /**
     * @return the job attempt path of the wrapped committer
     */
    @Override
    public Path getJobAttemptPath(JobContext jobContext) {
        return this.wrappedCommitter.getJobAttemptPath(jobContext);
    }

    /**
     * Stores the upload UUID in the job configuration and sets up the wrapped committer.
     *
     * @param jobContext job context
     * @throws IOException if the wrapped committer's setup fails
     */
    public void setupJob(JobContext jobContext) throws IOException {
        LOG.debug("{}, Setting up job {}", getRole(), CommitUtilsWithMR.jobIdString(jobContext));
        jobContext.getConfiguration().set(STAGING_UUID_KEY, this.uuid);
        this.wrappedCommitter.setupJob(jobContext);
    }

    /**
     * @return the pending uploads of the job; listing failures are not suppressed
     */
    @Override
    protected List<SinglePendingCommit> listPendingUploadsToCommit(JobContext jobContext) throws IOException {
        return listPendingUploads(jobContext, false);
    }

    /**
     * @return the pending uploads of the job; listing failures are passed to
     *         {@code maybeIgnore} with suppression requested
     */
    protected List<SinglePendingCommit> listPendingUploadsToAbort(JobContext jobContext) throws IOException {
        return listPendingUploads(jobContext, true);
    }

    /**
     * Loads the pending commits saved under the wrapped committer's job attempt path.
     *
     * @param jobContext job context
     * @param z suppress-exceptions flag handed to {@code loadPendingsetFiles} and {@code maybeIgnore}
     * @return the pending commits, or an empty list when an {@link IOException} was
     *         caught and {@code maybeIgnore} did not rethrow it
     * @throws IOException if {@code maybeIgnore} rethrows the failure
     */
    protected List<SinglePendingCommit> listPendingUploads(JobContext jobContext, boolean z) throws IOException {
        try {
            Path jobAttemptPath = this.wrappedCommitter.getJobAttemptPath(jobContext);
            FileSystem attemptFs = jobAttemptPath.getFileSystem(jobContext.getConfiguration());
            return loadPendingsetFiles(jobContext, z, attemptFs,
                    OBSExtendUtils.listAndFilter(attemptFs, jobAttemptPath, false, OBSExtendUtils.HIDDEN_FILE_FILTER));
        } catch (IOException e) {
            maybeIgnore(z, "Listing pending uploads", e);
        }
        // Reached only when an IOException was caught and not rethrown.
        return new ArrayList<>(0);
    }

    /**
     * Deletes the work path, if one is set. A failure to obtain its filesystem is
     * logged and not propagated.
     */
    @Override
    public void cleanupStagingDirs() {
        Path workPath = getWorkPath();
        if (workPath == null) {
            return;
        }
        LOG.debug("Cleaning up work path {}", workPath);
        try {
            OBSExtendUtils.deleteQuietly(workPath.getFileSystem(getConf()), workPath, true);
        } catch (IOException e) {
            LOG.info("Cleaning up work path: {} failed", workPath, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Cleans up the wrapped committer and the destination paths, then delegates to
     * the superclass cleanup.
     *
     * @param jobContext job context
     * @param z when {@code true}, an {@link IOException} from the first two steps is
     *          logged instead of rethrown
     * @throws IOException if a step fails and {@code z} is {@code false}, or if the
     *         superclass cleanup fails
     */
    @Override
    public void cleanup(JobContext jobContext, boolean z) throws IOException {
        try {
            this.wrappedCommitter.cleanupJob(jobContext);
            deleteDestinationPaths(jobContext);
        } catch (IOException e) {
            if (!z) {
                throw e;
            }
            LOG.info("clean up failed", e);
        }
        super.cleanup(jobContext, z);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Aborts pending uploads through the superclass unless
     * {@code fs.s3a.committer.staging.abort.pending.uploads} is set to {@code false}.
     *
     * @param z suppress-exceptions flag handed to the superclass
     * @throws IOException if the superclass abort fails
     */
    @Override
    public void abortPendingUploadsInCleanup(boolean z) throws IOException {
        if (!getConf().getBoolean(ABORT_PENDING_UPLOADS_KEY, true)) {
            LOG.info("Not cleanup up pending uploads to {} as {} is false ", getOutputPath(), ABORT_PENDING_UPLOADS_KEY);
            return;
        }
        super.abortPendingUploadsInCleanup(z);
    }

    /**
     * No-op in this class.
     */
    @Override
    protected void preCommitJob(JobContext jobContext, List<SinglePendingCommit> list) throws IOException {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Aborts all pending uploads of the job, then runs the superclass abort exactly once.
     *
     * <p>A {@link FileNotFoundException} is logged and ignored. Any other
     * {@link IOException} is passed to {@code maybeIgnore} and forces the superclass
     * abort to run with exception suppression enabled.
     *
     * @param jobContext job context
     * @param z suppress-exceptions flag
     * @throws IOException if a failure is not suppressed
     */
    @Override
    public void abortJobInternal(JobContext jobContext, boolean z) throws IOException {
        boolean failed = false;
        try (DurationInfo ignored = new DurationInfo(LOG, "%s: aborting job in state %s ",
                getRole(), CommitUtilsWithMR.jobIdString(jobContext))) {
            abortPendingUploads(jobContext, listPendingUploadsToAbort(jobContext), z);
        } catch (FileNotFoundException e) {
            LOG.debug("No job directory to read uploads from");
        } catch (IOException e) {
            failed = true;
            maybeIgnore(z, "aborting job", e);
        } finally {
            super.abortJobInternal(jobContext, failed || z);
        }
    }

    /**
     * Deletes the wrapped committer's job attempt path, the {@code _temporary}
     * directory under the output path, and the task working path.
     *
     * @param jobContext job context
     * @throws IOException if a filesystem cannot be obtained or a delete propagates a failure
     */
    protected void deleteDestinationPaths(JobContext jobContext) throws IOException {
        OBSExtendUtils.deleteWithWarning(getJobAttemptFileSystem(jobContext), getJobAttemptPath(jobContext), true);
        Path destTemporary = new Path(getOutputPath(), TEMPORARY_DIR_NAME);
        OBSExtendUtils.deleteWithWarning(getDestFS(), destTemporary, true);
        deleteTaskWorkingPathQuietly(jobContext);
    }

    /**
     * Creates the task attempt directory and sets up the wrapped committer's task.
     *
     * @param taskAttemptContext task attempt context
     * @throws IOException if directory creation or the wrapped setup fails
     */
    @Override
    public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
        Path attemptPath = getTaskAttemptPath(taskAttemptContext);
        try (DurationInfo ignored = new DurationInfo(LOG, "%s: setup task attempt path %s ",
                getRole(), attemptPath)) {
            attemptPath.getFileSystem(getConf()).mkdirs(attemptPath);
            this.wrappedCommitter.setupTask(taskAttemptContext);
        }
    }

    /**
     * Reports whether the task attempt path contains at least one entry.
     *
     * @param taskAttemptContext task attempt context
     * @return {@code true} if the listing of the attempt path is non-empty
     * @throws FileNotFoundException rethrown from the listing after being logged
     * @throws IOException if the listing fails
     */
    public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
        try (DurationInfo ignored = new DurationInfo(LOG, "%s: needsTaskCommit() Task %s",
                getRole(), taskAttemptContext.getTaskAttemptID())) {
            Path attemptPath = getTaskAttemptPath(taskAttemptContext);
            FileStatus[] entries = getTaskAttemptFilesystem(taskAttemptContext).listStatus(attemptPath);
            LOG.debug("{} files to commit under {}", entries.length, attemptPath);
            return entries.length > 0;
        } catch (FileNotFoundException e) {
            LOG.info("No files to commit");
            throw e;
        }
    }

    /**
     * Commits the task by uploading its output as pending commits.
     *
     * @param taskAttemptContext task attempt context
     * @throws IOException if the commit fails; the failure is logged and rethrown
     */
    public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
        try (DurationInfo ignored = new DurationInfo(LOG, "%s: commit task %s",
                getRole(), taskAttemptContext.getTaskAttemptID())) {
            int uploaded = commitTaskInternal(taskAttemptContext, getTaskOutput(taskAttemptContext));
            LOG.info("{}: upload file count: {}", getRole(), uploaded);
        } catch (IOException e) {
            LOG.error("{}: commit of task {} failed", getRole(), taskAttemptContext.getTaskAttemptID(), e);
            throw e;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Uploads each listed file as a pending commit, saves the set of pending commits
     * under the wrapped committer's task attempt path, commits the wrapped task and
     * deletes the local attempt directory.
     *
     * <p>If an upload or the save fails, every pending commit created so far is
     * aborted (abort failures suppressed) and the task attempt path is deleted
     * before the failure propagates.
     *
     * @param taskAttemptContext task attempt context
     * @param list files to upload
     * @return the number of pending commits created
     * @throws IOException if uploading, saving, the wrapped commit or the final delete fails
     */
    public int commitTaskInternal(TaskAttemptContext taskAttemptContext, List<? extends FileStatus> list) throws IOException {
        LOG.info("{}: commitTaskInternal", getRole());
        Configuration conf = taskAttemptContext.getConfiguration();
        Path attemptPath = getTaskAttemptPath(taskAttemptContext);
        FileSystem attemptFs = getTaskAttemptFilesystem(taskAttemptContext);
        LOG.info("{}: attempt path is {}", getRole(), attemptPath);

        // The pending-commit data is saved at the wrapped committer's task attempt location.
        Path commitsAttemptPath = this.wrappedCommitter.getTaskAttemptPath(taskAttemptContext);
        FileSystem commitsFs = commitsAttemptPath.getFileSystem(conf);
        int fileCount = list.size();

        // Written to by the upload lambda, which runs on the pool from buildThreadPool.
        ConcurrentLinkedQueue<SinglePendingCommit> commits = new ConcurrentLinkedQueue<>();
        LOG.info("{}: uploading from staging directory to S3", getRole());
        LOG.info("{}: Saving pending data information to {}", getRole(), commitsAttemptPath);

        if (list.isEmpty()) {
            LOG.warn("{}: No files to commit", getRole());
        } else {
            boolean threw = true;
            taskAttemptContext.progress();
            PendingSet pendingSet = new PendingSet(fileCount);
            try {
                Tasks.foreach(list).stopOnFailure().executeWith(buildThreadPool(taskAttemptContext)).run(fileStatus -> {
                    Path sourcePath = fileStatus.getPath();
                    File localFile = new File(sourcePath.toUri().getPath());
                    String relativePath = Paths.getRelativePath(attemptPath, sourcePath);
                    String partition = Paths.getPartition(relativePath);
                    Path destPath = OBSExtendUtils.keyToQualifiedPath(getDestOBSFS(), getFinalKey(relativePath, taskAttemptContext));
                    SinglePendingCommit pending = getCommitOperations()
                            .uploadFileToPendingCommit(localFile, destPath, partition, this.uploadPartSize);
                    LOG.info("{}: adding pending commit {}", getRole(), pending);
                    commits.add(pending);
                });
                for (SinglePendingCommit uploaded : commits) {
                    pendingSet.add(uploaded);
                }
                LOG.info("Saving {} pending commit(s)) to file {}", pendingSet.size(), commitsAttemptPath);
                pendingSet.save(commitsFs, commitsAttemptPath, false);
                threw = false;
            } finally {
                if (threw) {
                    LOG.error("{}: Exception during commit process, aborting {} commit(s)", getRole(), commits.size());
                    Tasks.foreach(commits).suppressExceptions().run(toAbort -> {
                        getCommitOperations().abortSingleCommit(toAbort);
                    });
                    deleteTaskAttemptPathQuietly(taskAttemptContext);
                }
            }
            // Only reached when the uploads and the save succeeded.
            Paths.clearTempFolderInfo(taskAttemptContext.getTaskAttemptID());
        }

        LOG.info("Committing wrapped task");
        this.wrappedCommitter.commitTask(taskAttemptContext);
        LOG.info("Cleaning up attempt dir {}", attemptPath);
        attemptFs.delete(attemptPath, true);
        return commits.size();
    }

    /**
     * Aborts the task: deletes the task attempt path and the task working path, then
     * aborts the wrapped committer's task.
     *
     * @param taskAttemptContext task attempt context
     * @throws IOException if the abort fails; the failure is logged and rethrown
     */
    public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
        try (DurationInfo ignored = new DurationInfo(LOG, "Abort task %s",
                taskAttemptContext.getTaskAttemptID())) {
            deleteTaskAttemptPathQuietly(taskAttemptContext);
            deleteTaskWorkingPathQuietly(taskAttemptContext);
            this.wrappedCommitter.abortTask(taskAttemptContext);
        } catch (IOException e) {
            LOG.error("{}: exception when aborting task {}", getRole(), taskAttemptContext.getTaskAttemptID(), e);
            throw e;
        }
    }

    /** Computes the task attempt path beneath the local temp directory for the given UUID. */
    private static Path taskAttemptWorkingPath(TaskAttemptContext taskAttemptContext, String str) throws IOException {
        Path localTempDir = Paths.getLocalTaskAttemptTempDir(
                taskAttemptContext.getConfiguration(), str, taskAttemptContext.getTaskAttemptID());
        return getTaskAttemptPath(taskAttemptContext, localTempDir);
    }

    /**
     * Deletes the task working path derived from the context, if there is one.
     * An {@link IOException} is not propagated; it is logged when the path had
     * already been computed.
     *
     * @param jobContext job or task attempt context
     */
    protected void deleteTaskWorkingPathQuietly(JobContext jobContext) {
        Path workingPath = null;
        try {
            workingPath = buildWorkPath(jobContext, getUUID());
            if (workingPath != null) {
                OBSExtendUtils.deleteQuietly(workingPath.getFileSystem(getConf()), workingPath, true);
            }
        } catch (IOException e) {
            if (workingPath != null) {
                LOG.info("delete task working path:{} failed", workingPath, e);
            }
        }
    }

    /** Returns the store key of the output path; the context argument is not used. */
    private String getOBSPrefix(JobContext jobContext) {
        return this.obsKeyPrefix;
    }

    /**
     * @return the upload UUID of this committer
     */
    protected String getUUID() {
        return this.uuid;
    }

    /**
     * Returns the conflict resolution mode, resolving it from the configuration on
     * first use and caching it afterwards. Later calls ignore their arguments.
     *
     * @param jobContext job context
     * @param configuration configuration consulted on first use
     * @return the conflict resolution mode
     */
    public final ConflictResolution getConflictResolutionMode(JobContext jobContext, Configuration configuration) {
        if (this.conflictResolution == null) {
            String option = getConfictModeOption(jobContext, configuration);
            this.conflictResolution = ConflictResolution.valueOf(option);
        }
        return this.conflictResolution;
    }

    /**
     * Reads {@code fs.s3a.committer.staging.conflict-mode} (default {@code "fail"})
     * and upper-cases it using {@link Locale#ENGLISH}.
     *
     * @param jobContext job context
     * @param configuration configuration to read
     * @return the upper-cased option value
     */
    public static String getConfictModeOption(JobContext jobContext, Configuration configuration) {
        String option = CommitUtilsWithMR.getConfigurationOption(
                jobContext, configuration, "fs.s3a.committer.staging.conflict-mode", "fail");
        return option.toUpperCase(Locale.ENGLISH);
    }
}
