package com.huawei.datasight.smallfs.server;

import com.google.common.annotations.VisibleForTesting;
import com.huawei.datasight.smallfs.SmallFSException;
import com.huawei.datasight.smallfs.SmallFSUnsupportException;
import com.huawei.datasight.smallfs.conf.SmallFSConfigKeys;
import com.huawei.datasight.smallfs.io.BytesWritable;
import com.huawei.datasight.smallfs.meta.FileIndexMeta;
import com.huawei.datasight.smallfs.server.FGCAuditLogger;
import com.huawei.datasight.smallfs.server.ha.FGCCheckpointException;
import com.huawei.datasight.smallfs.server.ha.FGCCheckpointService;
import com.huawei.datasight.smallfs.server.ha.FGCEditLog;
import com.huawei.datasight.smallfs.server.ha.FGCEditLogException;
import com.huawei.datasight.smallfs.server.ha.FGCEditLogRemoteReader;
import com.huawei.datasight.smallfs.server.ha.FGCEditLogger;
import com.huawei.datasight.smallfs.server.ha.FGCExtensionFilter;
import com.huawei.datasight.smallfs.server.ha.FGCOperation;
import com.huawei.datasight.smallfs.server.ha.FGCServiceState;
import com.huawei.datasight.smallfs.server.ha.FGCSessionDetail;
import com.huawei.datasight.smallfs.tools.FSHelper;
import com.huawei.datasight.smallfs.tools.MapReduceUtil;
import com.huawei.datasight.smallfs.tools.PathHelper;
import com.huawei.datasight.smallfs.utils.ConfigUtil;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Stack;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StopWatch;
import org.apache.hadoop.yarn.api.records.ApplicationId;

/* loaded from: input_file:com/huawei/datasight/smallfs/server/FileMergedService.class */
public class FileMergedService extends Service {
    /** Logger shared by this service and its nested MapReduce helper classes. */
    private static final Log LOG = LogFactory.getLog(FileMergedService.class);
    /** Service configuration; also the base configuration of the merge job that gets submitted. */
    private Configuration conf;
    /**
     * Configuration key under which the path of the merge input sequence file is stored; the same
     * string is used as that file's name below {@code inputPath()}.
     */
    private static final String MERGE_INPUT = "_merge_input_";
    /** Separator placed between owner and group when both are packed into one string. */
    private static final String OWNER_GROUP_SPLIT = "@@";
    // NOTE(review): presumably the seconds-to-milliseconds factor; not referenced by name in the
    // visible code (MergeContext multiplies by a literal) - confirm.
    private static final int MILLIS_UNIT = 1000;
    // NOTE(review): presumably the replication of the merge input sequence file; preAppStart
    // passes the same literal value - confirm.
    private static final short SEQFILE_REPLICATION = 10;
    // NOTE(review): not referenced in the visible part of the file - confirm usage.
    private static final String JOBID = "merge_job_id";
    /** HDFS client used for all file operations of the service. */
    private DistributedFileSystem fs;
    /** Typed accessor for the SmallFS settings held in {@link #conf}. */
    private ConfigUtil cu;
    /** Namespace handed to the checkpoint service after a merge job. */
    private FGCNameSpace ns;
    /** Id of the most recently submitted merge job; {@code null} when none is outstanding. */
    private String currentJobID = null;

    /* loaded from: input_file:com/huawei/datasight/smallfs/server/FileMergedService$ExceedMaxMergeFileCountException.class */
    /**
     * Thrown by {@link MergeContext#addMergeFile} once the number of files already scheduled for
     * merging has reached the configured maximum.
     */
    public static class ExceedMaxMergeFileCountException extends Exception {
        private static final long serialVersionUID = -6540855722889474612L;
    }

    /* loaded from: input_file:com/huawei/datasight/smallfs/server/FileMergedService$FileDistributedDeleteMapper.class */
    public static class FileDistributedDeleteMapper extends Mapper<Text, BytesWritable, Object, Object> {
        private static final Log MAP_LOG = LogFactory.getLog(FileDistributedDeleteMapper.class);
        private FileSystem fs;

        protected void setup(Mapper<Text, BytesWritable, Object, Object>.Context context) throws IOException, InterruptedException {
            super.setup(context);
            this.fs = FileSystem.get(context.getConfiguration());
        }

        protected void map(Text text, BytesWritable bytesWritable, Mapper<Text, BytesWritable, Object, Object>.Context context) throws IOException, InterruptedException {
            Path path = new Path(text.toString());
            try {
                FileStatus fileStatus = this.fs.getFileStatus(path);
                if (fileStatus != null && fileStatus.getModificationTime() < context.getConfiguration().getLong(MapReduceUtil.JOB_START_TIME, 0L) && this.fs.delete(path, false)) {
                    MAP_LOG.info("Deleted old small file: " + text.toString());
                }
            } catch (FileNotFoundException e) {
                MAP_LOG.info("File not found in hdfs: " + path.toString());
            }
        }

        protected /* bridge */ /* synthetic */ void map(Object obj, Object obj2, Mapper.Context context) throws IOException, InterruptedException {
            map((Text) obj, (BytesWritable) obj2, (Mapper<Text, BytesWritable, Object, Object>.Context) context);
        }
    }

    /* loaded from: input_file:com/huawei/datasight/smallfs/server/FileMergedService$FileDitributedDeleteInputFormat.class */
    public static class FileDitributedDeleteInputFormat extends InputFormat<Text, BytesWritable> {
        public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException {
            Configuration configuration = jobContext.getConfiguration();
            int i = 50000;
            try {
                i = configuration.getInt("smallfs.file.delete.concurrency", 50000);
                if (i < 1) {
                    i = 1;
                }
            } catch (NumberFormatException e) {
                FileMergedService.LOG.error("Error setting smallfs.file.delete.concurrency resetting to default value");
            }
            Path path = new Path(configuration.get(FileMergedService.MERGE_INPUT));
            Text text = new Text();
            BytesWritable bytesWritable = new BytesWritable();
            ArrayList arrayList = new ArrayList();
            long j = 0;
            int i2 = 0;
            SequenceFile.Reader reader = new SequenceFile.Reader(jobContext.getConfiguration(), new SequenceFile.Reader.Option[]{SequenceFile.Reader.file(path)});
            Throwable th = null;
            while (reader.next(text, bytesWritable)) {
                try {
                    try {
                        i2++;
                        if (i2 >= i) {
                            long position = reader.getPosition();
                            i2 = 0;
                            arrayList.add(new FileSplit(path, j, position - j, (String[]) null));
                            if (FileMergedService.LOG.isDebugEnabled()) {
                                FileMergedService.LOG.debug("Split file: " + path.toUri().getPath() + ", start: " + j + ", end: " + position);
                            }
                            j = reader.getPosition();
                        }
                    } finally {
                    }
                } catch (Throwable th2) {
                    if (reader != null) {
                        if (th != null) {
                            try {
                                reader.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        } else {
                            reader.close();
                        }
                    }
                    throw th2;
                }
            }
            if (i2 > 0) {
                arrayList.add(new FileSplit(path, j, reader.getPosition() - j, (String[]) null));
            }
            if (reader != null) {
                if (0 != 0) {
                    try {
                        reader.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    reader.close();
                }
            }
            return arrayList;
        }

        public RecordReader<Text, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            return new SequenceFileRecordReader();
        }
    }

    /* loaded from: input_file:com/huawei/datasight/smallfs/server/FileMergedService$FileMergeInputFormat.class */
    public static class FileMergeInputFormat extends InputFormat<Text, FilesList> {
        public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException {
            Path path = new Path(jobContext.getConfiguration().get(FileMergedService.MERGE_INPUT));
            Text text = new Text();
            FilesList filesList = new FilesList();
            ArrayList arrayList = new ArrayList();
            long j = 0;
            SequenceFile.Reader reader = new SequenceFile.Reader(jobContext.getConfiguration(), new SequenceFile.Reader.Option[]{SequenceFile.Reader.file(path)});
            Throwable th = null;
            while (reader.next(text, filesList)) {
                try {
                    try {
                        long position = reader.getPosition();
                        arrayList.add(new FileSplit(path, j, position - j, (String[]) null));
                        if (FileMergedService.LOG.isDebugEnabled()) {
                            FileMergedService.LOG.debug("Split file: " + path.toUri().getPath() + ", start: " + j + ", end: " + position);
                        }
                        j = position;
                    } finally {
                    }
                } catch (Throwable th2) {
                    if (reader != null) {
                        if (th != null) {
                            try {
                                reader.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        } else {
                            reader.close();
                        }
                    }
                    throw th2;
                }
            }
            if (reader != null) {
                if (0 != 0) {
                    try {
                        reader.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    reader.close();
                }
            }
            return arrayList;
        }

        public RecordReader<Text, FilesList> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            return new SequenceFileRecordReader();
        }
    }

    /* loaded from: input_file:com/huawei/datasight/smallfs/server/FileMergedService$FileMergeMapper.class */
    /**
     * Mapper that concatenates the small files of one {@link FilesList} record into a single big
     * file placed in the {@code .sfs} directory next to the first small file, and records the
     * position of every merged file through {@link FGCEditLogger}. The mapper emits no output
     * records.
     */
    public static class FileMergeMapper extends Mapper<Text, FilesList, Object, Object> {
        private static final int BUFFER_SIZE = 8192;

        /**
         * Merges the files listed in {@code filesList} into a newly created big file and then
         * assigns the big file the owner and group carried by the record.
         *
         * @param text record key; not used
         * @param filesList small files to merge plus their packed owner and group
         * @param context task context supplying configuration, job id and task attempt id
         * @throws SmallFSUnsupportException if the default file system is not HDFS
         * @throws RuntimeException wrapping any checked failure raised while writing the big file
         */
        @Override
        protected void map(Text text, FilesList filesList, Mapper<Text, FilesList, Object, Object>.Context context) throws IOException, InterruptedException {
            Configuration configuration = context.getConfiguration();
            URI defaultUri = FileSystem.getDefaultUri(configuration);
            // Constant-first comparison also rejects a URI without a scheme instead of failing with an NPE.
            if (!"hdfs".equals(defaultUri.getScheme())) {
                FileMergedService.LOG.error("SmallFS is dependent on hdfs, but only find " + defaultUri);
                throw new SmallFSUnsupportException("SmallFS is dependent on hdfs, but only find " + defaultUri);
            }
            DistributedFileSystem distributedFileSystem = (DistributedFileSystem) FileSystem.get(configuration);
            String jobID = context.getJobID().toString();
            configureLocalEditLogDir(context, configuration);
            FGCEditLogger.getInstance(configuration, distributedFileSystem).clear(jobID);
            FileMergedService.LOG.info("Clear subdirectory and files without extension jobID in local editlog directory");
            Path bigFileParent = new Path(PathHelper.parent(filesList.smallFiles[0]), ".sfs");
            FileMergedService.LOG.info("Big file parent path: " + bigFileParent.toUri().getPath());
            if (!distributedFileSystem.isDirectory(bigFileParent)) {
                distributedFileSystem.mkdirs(bigFileParent, FSHelper.secureDirPermission());
            }
            Path bigFile = new Path(bigFileParent, FSHelper.generateNewBigFileName(context));
            FileMergedService.LOG.info("Big file path: " + bigFile.toUri().getPath());
            FSDataOutputStream bigFileOut = null;
            try {
                bigFileOut = distributedFileSystem.create(bigFile, true);
                runMerge(filesList, distributedFileSystem, bigFile.toUri().getPath(), bigFileOut, context);
                bigFileOut.hsync();
                FGCEditLogger.getInstance(configuration, distributedFileSystem).sync();
                // The stream is closed before the ownership change, as the original ordering did.
                IOUtils.closeStream(bigFileOut);
                String[] ownerAndGroup = filesList.ownerGroup.split(FileMergedService.OWNER_GROUP_SPLIT);
                distributedFileSystem.setOwner(bigFile, ownerAndGroup[0], ownerAndGroup[1]);
            } catch (RuntimeException e) {
                FileMergedService.LOG.error("Error in job", e);
                throw e;
            } catch (Exception e) {
                FileMergedService.LOG.error("Error in job", e);
                throw new RuntimeException(e);
            } finally {
                // Closes the stream on every failure path; repeating it after a successful close is harmless.
                IOUtils.closeStream(bigFileOut);
            }
        }

        /**
         * For a non-local job, looks for this application's cache directory below each configured
         * {@code yarn.nodemanager.local-dirs} entry and, when one exists, stores a per-attempt
         * sub-directory of it under {@code fgc.editlogs.local.dir}.
         */
        private static void configureLocalEditLogDir(Mapper<Text, FilesList, Object, Object>.Context context, Configuration configuration) {
            String attemptJobId = context.getTaskAttemptID().getJobID().toString();
            if (attemptJobId.contains("local")) {
                return;
            }
            // The application id is rebuilt from the second and third "_"-separated parts of the job id.
            String[] idParts = attemptJobId.split("_");
            ApplicationId applicationId = ApplicationId.newInstance(Long.parseLong(idParts[1]), Integer.parseInt(idParts[2]));
            String editLogDir = null;
            for (String localDir : configuration.get("yarn.nodemanager.local-dirs").split(",")) {
                String appCacheDir = localDir + "/usercache/" + context.getUser() + "/appcache/" + applicationId.toString();
                if (new File(appCacheDir).exists()) {
                    editLogDir = appCacheDir + "/" + context.getTaskAttemptID();
                    break;
                }
            }
            FileMergedService.LOG.info("Local dir " + editLogDir);
            if (editLogDir != null) {
                configuration.set("fgc.editlogs.local.dir", editLogDir);
            }
        }

        /**
         * Appends every existing, closed small file to {@code fSDataOutputStream} and logs a
         * CREATE edit with the file's offset and length inside the big file. Files that are
         * missing or still open are skipped.
         *
         * @param filesList record holding the small file paths
         * @param distributedFileSystem file system the small files are read from
         * @param str path of the big file, as recorded in the index metadata
         * @param fSDataOutputStream open stream of the big file
         * @param context task context used for progress reporting and edit log naming
         */
        private void runMerge(FilesList filesList, DistributedFileSystem distributedFileSystem, String str, FSDataOutputStream fSDataOutputStream, Mapper<Text, FilesList, Object, Object>.Context context) throws IOException, FGCEditLogException {
            byte[] buffer = new byte[BUFFER_SIZE];
            long offset = 0;
            String jobID = context.getJobID().toString();
            FGCEditLogger fGCEditLogger = FGCEditLogger.getInstance(context.getConfiguration(), distributedFileSystem);
            for (String smallFile : filesList.smallFiles) {
                Path path = new Path(smallFile);
                if (!distributedFileSystem.exists(path) || !distributedFileSystem.isFileClosed(path)) {
                    continue;
                }
                // try-with-resources releases the input stream even when a read or write fails.
                try (FSDataInputStream in = distributedFileSystem.open(path, BUFFER_SIZE)) {
                    int read;
                    while ((read = in.read(buffer)) >= 0) {
                        fSDataOutputStream.write(buffer, 0, read);
                    }
                }
                long pos = fSDataOutputStream.getPos();
                getLogFileName(context, fGCEditLogger);
                fGCEditLogger.log(smallFile, new FileIndexMeta(str, pos - offset, offset), FGCOperation.CREATE, jobID);
                context.progress();
                FileMergedService.LOG.info("Merge " + smallFile + " to big file " + str + " start " + offset + " length " + (pos - offset));
                offset = pos;
            }
        }

        /**
         * Sets the edit logger's file name to the job start time from the configuration, a dot,
         * and the task attempt id.
         */
        private void getLogFileName(Mapper<Text, FilesList, Object, Object>.Context context, FGCEditLogger fGCEditLogger) {
            fGCEditLogger.setFileName(context.getConfiguration().get(MapReduceUtil.JOB_START_TIME) + "." + context.getTaskAttemptID().toString());
        }
    }

    /* loaded from: input_file:com/huawei/datasight/smallfs/server/FileMergedService$FilesList.class */
    /**
     * Writable record describing one big file to build: the paths of the small files to merge
     * and the packed owner/group string they share.
     */
    public static class FilesList implements Writable {
        private String ownerGroup;
        private String[] smallFiles;

        /** Creates an empty record; both fields stay unset until {@link #readFields} fills them. */
        public FilesList() {
        }

        /**
         * @param strArr small file paths; the array is copied
         * @param str packed owner and group
         */
        public FilesList(String[] strArr, String str) {
            this.smallFiles = strArr.clone();
            this.ownerGroup = str;
        }

        /**
         * @return a copy of the small file paths
         */
        public String[] getSmallFiles() {
            return this.smallFiles.clone();
        }

        /** Writes the owner/group string, the number of paths, and then each path. */
        public void write(DataOutput dataOutput) throws IOException {
            Text.writeString(dataOutput, ownerGroup);
            final int fileCount = smallFiles.length;
            dataOutput.writeInt(fileCount);
            for (int idx = 0; idx < fileCount; idx++) {
                Text.writeString(dataOutput, smallFiles[idx]);
            }
        }

        /** Reads the fields back in the order {@link #write} emitted them. */
        public void readFields(DataInput dataInput) throws IOException {
            ownerGroup = Text.readString(dataInput);
            smallFiles = new String[dataInput.readInt()];
            int idx = 0;
            while (idx < smallFiles.length) {
                smallFiles[idx] = Text.readString(dataInput);
                idx++;
            }
        }
    }

    /* loaded from: input_file:com/huawei/datasight/smallfs/server/FileMergedService$MergeContext.class */
    /**
     * Accumulates merge candidates per owner/group and turns each group into a scheduled big
     * file once the accumulated size reaches the configured big file size. Scheduled big files
     * are emitted with {@link #write}; {@link #clear} drops them together with any files that
     * are still pending in a group.
     */
    public static class MergeContext {
        private final long sFileSizeThreshold;
        private final long mergeTimeThreshold;
        private final long curTime;
        private final long bigFileSize;
        private final long maxMergeFileCount;
        /** Pending (not yet scheduled) file paths per owner/group. */
        private final Map<String, List<String>> ownerGroupToFile = new HashMap<>();
        /** Total length of the pending files per owner/group. */
        private final Map<String, Long> ownerGroupToFilesSize = new HashMap<>();
        /** Scheduled big files, keyed by generated name. */
        private final Map<String, FilesList> bigFileMetaMap = new HashMap<>();
        /** Number of files placed into scheduled big files so far; not reset by {@link #clear}. */
        private long count = 0;
        /** Whether at least one big file has been scheduled; not reset by {@link #clear}. */
        private boolean hasMerge = false;

        /**
         * @param configUtil source of the size, age and count limits
         * @param configuration supplies the job start time used as "now" for the age check
         */
        public MergeContext(ConfigUtil configUtil, Configuration configuration) {
            this.sFileSizeThreshold = configUtil.getSFileSizeThreshold();
            // Long factor keeps the multiplication in 64-bit arithmetic whatever the getter returns.
            this.mergeTimeThreshold = configUtil.getMergeTimeThreshold() * 1000L;
            this.curTime = Long.parseLong(configuration.get(MapReduceUtil.JOB_START_TIME, "0"));
            this.bigFileSize = configUtil.getMergedFileSize();
            this.maxMergeFileCount = configUtil.getMaxMergeFileCount();
        }

        /**
         * Offers a file as a merge candidate.
         *
         * @param fileStatus status of the candidate file
         * @return {@code false} when the file is too large or too recently modified, {@code true}
         *         when it was added to its owner/group
         * @throws ExceedMaxMergeFileCountException if the scheduled file count already reached the maximum
         */
        public boolean addMergeFile(FileStatus fileStatus) throws IOException, ExceedMaxMergeFileCountException {
            if (this.count >= this.maxMergeFileCount) {
                throw new ExceedMaxMergeFileCountException();
            }
            if (!isAccepted(fileStatus)) {
                return false;
            }
            String ownGroup = getOwnGroup(fileStatus);
            Long pendingSize = this.ownerGroupToFilesSize.get(ownGroup);
            if (pendingSize == null) {
                this.ownerGroupToFile.put(ownGroup, new ArrayList<String>());
                pendingSize = 0L;
            }
            long sizeWithFile = pendingSize + fileStatus.getLen();
            if (sizeWithFile < this.bigFileSize) {
                addFileToList(fileStatus, ownGroup, pendingSize);
            } else if (sizeWithFile == this.bigFileSize) {
                // The file fills the group exactly: include it, then schedule the group.
                addFileToList(fileStatus, ownGroup, pendingSize);
                flushGroup(ownGroup);
            } else {
                // The file would overflow the group: schedule what is pending and start afresh.
                flushGroup(ownGroup);
                addFileToList(fileStatus, ownGroup, 0L);
            }
            return true;
        }

        /**
         * Schedules the pending files of a group as one big file and resets the group. The merge
         * flag is raised only when a big file was actually scheduled, so an oversized file that
         * arrives on an empty group does not report a merge by itself.
         */
        private void flushGroup(String ownGroup) {
            List<String> pending = this.ownerGroupToFile.get(ownGroup);
            if (!pending.isEmpty()) {
                this.bigFileMetaMap.put(FSHelper.generateNewFileName(), new FilesList(pending.toArray(new String[0]), ownGroup));
                this.count += pending.size();
                this.hasMerge = true;
            }
            pending.clear();
            this.ownerGroupToFilesSize.put(ownGroup, 0L);
        }

        /** Appends the file to its group and stores the group's new pending size. */
        private void addFileToList(FileStatus fileStatus, String ownGroup, long sizeBefore) {
            this.ownerGroupToFile.get(ownGroup).add(fileStatus.getPath().toUri().getPath());
            this.ownerGroupToFilesSize.put(ownGroup, Long.valueOf(sizeBefore + fileStatus.getLen()));
        }

        /** A file qualifies when it is no larger than the size threshold and older than the time threshold. */
        private boolean isAccepted(FileStatus fileStatus) {
            return fileStatus.getLen() <= this.sFileSizeThreshold
                    && this.curTime - fileStatus.getModificationTime() > this.mergeTimeThreshold;
        }

        private String getOwnGroup(FileStatus fileStatus) {
            return fileStatus.getOwner() + FileMergedService.OWNER_GROUP_SPLIT + fileStatus.getGroup();
        }

        /**
         * @return number of files placed into scheduled big files so far
         */
        public long getCount() {
            return this.count;
        }

        /**
         * @return {@code true} once at least one big file has been scheduled
         */
        public boolean hasMerge() {
            return this.hasMerge;
        }

        /**
         * Appends every scheduled big file to the writer, with a sync call after each record.
         *
         * @param writer sequence file writer receiving the records
         */
        public void write(SequenceFile.Writer writer) throws IOException {
            for (Map.Entry<String, FilesList> entry : this.bigFileMetaMap.entrySet()) {
                String key = entry.getKey();
                FilesList files = entry.getValue();
                writer.append(new Text(key), files);
                writer.sync();
                if (FileMergedService.LOG.isDebugEnabled()) {
                    FileMergedService.LOG.debug("Schedule merge file " + key + ", contains " + files.getSmallFiles().length + "files");
                }
            }
        }

        /** Drops scheduled big files and pending files; the counters are kept. */
        public void clear() {
            this.bigFileMetaMap.clear();
            this.ownerGroupToFile.clear();
            this.ownerGroupToFilesSize.clear();
        }
    }

    /**
     * @return the HDFS client this service operates on; exposed for tests
     */
    @VisibleForTesting
    public DistributedFileSystem getFs() {
        return fs;
    }

    /**
     * Replaces the HDFS client this service operates on; exposed for tests.
     *
     * @param distributedFileSystem client to use from now on
     */
    @VisibleForTesting
    public void setFs(DistributedFileSystem distributedFileSystem) {
        fs = distributedFileSystem;
    }

    /**
     * Creates the service in the {@code INITIAL} state.
     *
     * @param configuration service configuration, also wrapped in a {@link ConfigUtil}
     * @param distributedFileSystem HDFS client used for all file operations
     * @param fGCNameSpace namespace handed to the checkpoint service
     */
    public FileMergedService(Configuration configuration, DistributedFileSystem distributedFileSystem, FGCNameSpace fGCNameSpace) throws IOException {
        setServiceState(ServiceState.INITIAL);
        conf = configuration;
        cu = new ConfigUtil(configuration);
        ns = fGCNameSpace;
        fs = distributedFileSystem;
    }

    /**
     * Executes one merge cycle: cleans up leftovers, builds the merge input, runs the merge job
     * when there is something to merge, and then performs the post-processing. The cycle stops
     * early, without finalizing, when nothing needs merging or when the controller is present
     * but not active.
     *
     * @throws SmallFSException wrapping any failure of the cycle
     */
    @Override // com.huawei.datasight.smallfs.server.Service
    public void run() throws SmallFSException {
        final String serviceName = FileMergedService.class.getSimpleName();
        FGCAuditLogger.logAuditEvent(FGCAuditLogger.LogLevel.INFO, FGCAuditLogger.AuditConstants.FGC_PROCESS, FGCAuditLogger.AuditConstants.SERVICE_MERGE, getUgi().getUserName(), null, serviceName, FGCAuditLogger.AuditConstants.INIT);
        setRunApp(false);
        try {
            clear();
            cleanMarks();
            final String jobStartTime = String.valueOf(System.currentTimeMillis());
            preAppStart(jobStartTime);
            if (!isRunApp()) {
                LOG.info("No file merged...");
                setServiceState(ServiceState.SUCCESS);
                FGCAuditLogger.logAuditEvent(FGCAuditLogger.LogLevel.INFO, FGCAuditLogger.AuditConstants.FGC_PROCESS, FGCAuditLogger.AuditConstants.SERVICE_MERGE, getUgi().getUserName(), null, serviceName, FGCAuditLogger.AuditConstants.SUCCESS);
                return;
            }
            // The controller is re-checked before and after the job, as its state can change meanwhile.
            if (!(this.fgcController == null || this.fgcController.getControllerState() == FGCServiceState.ACTIVE)) {
                LOG.warn("Merge job incomplete as controller is not active");
                return;
            }
            setServiceState(ServiceState.STARTING);
            final String mergeJobId = runMergeMRJob(jobStartTime);
            mark(1, null);
            if (!(this.fgcController == null || this.fgcController.getControllerState() == FGCServiceState.ACTIVE)) {
                LOG.warn("Merge job incomplete as controller is not active");
                return;
            }
            postAppFinish(mergeJobId, jobStartTime);
            FGCAuditLogger.logAuditEvent(FGCAuditLogger.LogLevel.INFO, FGCAuditLogger.AuditConstants.FGC_PROCESS, FGCAuditLogger.AuditConstants.SERVICE_MERGE, getUgi().getUserName(), null, serviceName, FGCAuditLogger.AuditConstants.SUCCESS);
            setServiceState(ServiceState.SUCCESS);
            finalizeService();
        } catch (Exception failure) {
            finalizeService();
            setServiceFailure(failure);
            throw new SmallFSException(failure);
        }
    }

    /**
     * Removes the merge work directories and marks, then reports the current job as finished to
     * the job listener. The job id is forgotten only when a listener was notified.
     */
    private void finalizeService() {
        clear();
        cleanMarks();
        ServiceJobListener listener = getServiceJobListener();
        if (listener != null && currentJobID != null) {
            listener.jobFinished(currentJobID);
            currentJobID = null;
        }
    }

    /**
     * @return the {@code merge/input} directory below the SmallFS root directory
     */
    private Path inputPath() {
        return new Path(cu.getSFSRootDir(), "merge/input");
    }

    /**
     * @return the {@code merge/output} directory below the SmallFS root directory
     */
    private Path outputPath() {
        return new Path(cu.getSFSRootDir(), "merge/output");
    }

    /**
     * Records a failed merge cycle: writes a FAILURE audit event, moves the service to the
     * {@code FAILED} state and logs the cause.
     *
     * @param exc the failure that ended the cycle
     */
    private void setServiceFailure(Exception exc) {
        FGCAuditLogger.logAuditEvent(FGCAuditLogger.LogLevel.ERROR, FGCAuditLogger.AuditConstants.FGC_PROCESS, FGCAuditLogger.AuditConstants.SERVICE_MERGE, getUgi().getUserName(), null, FileMergedService.class.getSimpleName(), FGCAuditLogger.AuditConstants.FAILURE);
        setServiceState(ServiceState.FAILED);
        // Passing the exception as the throwable argument keeps its stack trace in the log.
        LOG.error("Merge service failed", exc);
    }

    /**
     * Configures and submits the map-only merge job, announces it to the job listener, and waits
     * for it to finish.
     *
     * @param str job start time, stored in the job configuration and in the session detail
     * @return id of the submitted job
     * @throws IOException if the job does not finish successfully
     */
    private String runMergeMRJob(String str) throws IOException, InterruptedException, ClassNotFoundException {
        Job mergeJob = Job.getInstance(this.conf, ServiceTypesConstants.MERGE_DESC);
        mergeJob.setJarByClass(FileMergedService.class);
        mergeJob.setInputFormatClass(FileMergeInputFormat.class);
        mergeJob.setMapperClass(FileMergeMapper.class);
        mergeJob.setNumReduceTasks(0);
        mergeJob.setSpeculativeExecution(false);
        MapReduceUtil.addDependencyJars(mergeJob.getConfiguration(), SmallFSConfigKeys.class);
        FileInputFormat.addInputPath(mergeJob, inputPath());
        FileOutputFormat.setOutputPath(mergeJob, outputPath());
        LOG.info("Submit job: " + mergeJob.getJobName());
        mergeJob.getConfiguration().set(MapReduceUtil.JOB_START_TIME, str);
        mergeJob.submit();
        setServiceState(ServiceState.RUNNING);

        final String submittedJobId = mergeJob.getJobID().toString();
        FGCServiceHelper.setRunnningJob(submittedJobId);
        ServiceJobListener listener = getServiceJobListener();
        if (listener != null) {
            FGCSessionDetail session = new FGCSessionDetail();
            session.jobId = submittedJobId;
            session.jobName = ServiceTypesConstants.MERGE_DESC;
            session.jobStartTime = str;
            listener.jobSubmitted(session);
        }
        this.currentJobID = submittedJobId;

        if (!FGCServiceHelper.waitForJobSucess(submittedJobId, this.conf, mergeJob)) {
            LOG.error("Job " + mergeJob.getJobName() + " finished with failed");
            throw new IOException("Merge Job is failed at merging big file.");
        }
        LOG.info("Job " + mergeJob.getJobName() + " finished with " + FGCAuditLogger.AuditConstants.SUCCESS);
        return submittedJobId;
    }

    /**
     * Recursively deletes the merge input and output directories. Failures are logged and not
     * propagated; when deleting the input directory fails, the output directory is not attempted.
     */
    private void clear() {
        try {
            this.fs.delete(inputPath(), true);
            LOG.info("Delete " + inputPath());
            this.fs.delete(outputPath(), true);
            LOG.info("Delete " + outputPath());
        } catch (IOException e) {
            // Best effort by design, but the cause is logged so a failed cleanup can be diagnosed.
            LOG.error("Error deleting", e);
        }
    }

    /**
     * Creates the merge input sequence file, fills it with the scheduled big files, and records
     * through {@code setRunApp} whether there is anything to merge. The input file path and the
     * job start time are also stored in the service configuration.
     *
     * @param str job start time
     * @throws IOException if the input file cannot be created or written
     */
    private void preAppStart(String str) throws IOException {
        Path mergeInputFile = new Path(inputPath(), MERGE_INPUT);
        this.conf.set(MERGE_INPUT, mergeInputFile.toUri().getPath());
        this.conf.set(MapReduceUtil.JOB_START_TIME, str);
        SequenceFile.Writer inputWriter = null;
        try {
            inputWriter = SequenceFile.createWriter(
                    this.conf,
                    SequenceFile.Writer.file(mergeInputFile),
                    SequenceFile.Writer.keyClass(Text.class),
                    SequenceFile.Writer.valueClass(FilesList.class),
                    SequenceFile.Writer.replication(SEQFILE_REPLICATION));
            this.fs.setPermission(mergeInputFile, FSHelper.secureFilePermission());
            LOG.info("create map input file: " + mergeInputFile.toString());
            setRunApp(formMergeMRInput(inputWriter));
            inputWriter.hsync();
        } finally {
            // Closes the writer on success and on failure alike; a null writer is ignored.
            IOUtils.closeStream(inputWriter);
        }
    }

    /**
     * Walks every monitored directory tree and writes the big files scheduled for each visited
     * directory to the writer. Monitored paths ending in {@code .sfs} are not traversed.
     *
     * @param writer sequence file writer receiving the scheduled big files
     * @return {@code true} when at least one big file was scheduled
     */
    private boolean formMergeMRInput(SequenceFile.Writer writer) throws IOException {
        MergeContext mergeContext = new MergeContext(cu, conf);
        Stack<Path> pendingDirs = new Stack<>();
        for (String monitored : cu.getMonitorPath()) {
            if (monitored.endsWith(".sfs")) {
                continue;
            }
            pendingDirs.push(new Path(monitored));
        }
        LOG.info("Start to traverse the file list");
        while (!pendingDirs.empty()) {
            Path dir = pendingDirs.pop();
            if (!fs.exists(dir)) {
                LOG.warn("Folder " + dir.toString() + " does not exist.");
                continue;
            }
            // Candidates are collected, emitted and reset once per visited directory.
            createInputFile(mergeContext, pendingDirs, dir);
            mergeContext.write(writer);
            mergeContext.clear();
        }
        LOG.info("Finish to traverse the file list, " + mergeContext.getCount() + " files need to be merged");
        return mergeContext.hasMerge();
    }

    /**
     * Lists one directory: sub-directories other than {@code .sfs} are pushed onto the stack for
     * a later visit, and plain files are offered to the merge context. Listing stops as soon as
     * the merge context reports that the maximum file count has been reached.
     *
     * @param mergeContext collector of merge candidates
     * @param stack directories still to be visited
     * @param path directory to list
     */
    private void createInputFile(MergeContext mergeContext, Stack<Path> stack, Path path) throws IOException {
        RemoteIterator<FileStatus> entries = this.fs.listStatusIterator(path);
        try {
            while (entries.hasNext()) {
                FileStatus entry = entries.next();
                boolean traversableDir = entry.isDirectory() && !entry.getPath().getName().equals(".sfs");
                if (traversableDir) {
                    stack.push(entry.getPath());
                } else if (entry.isFile()) {
                    mergeContext.addMergeFile(entry);
                }
            }
        } catch (ExceedMaxMergeFileCountException ignored) {
            // Limit reached: the rest of this directory is intentionally left for a later cycle.
        }
    }

    /**
     * Runs the post-processing that follows a finished merge application.
     *
     * <p>Sets the service state to {@code WAITING}, stores {@code str2} in the
     * configuration under {@code MapReduceUtil.JOB_START_TIME}, checkpoints both
     * {@code str} and {@code str2}, and then continues from the stage recorded in the
     * mark directory (see {@link #getMark()}):
     * <ul>
     *   <li>mark 1: run the distributed delete job, then continue with the mark 2 steps;</li>
     *   <li>mark 2: if a job id was stored with the mark, wait for that job; then rename
     *       the edit log files for {@code str2} and {@code str}, write mark 3 and remove
     *       it again;</li>
     *   <li>any other mark: no stage-specific work is done.</li>
     * </ul>
     * On normal completion the service state becomes {@code SUCCESS}. The mark directory
     * is always removed at the end, whether or not processing succeeded.
     *
     * @param str  identifier passed to checkpointing, the distributed delete and the
     *             edit log rename
     * @param str2 value stored as {@code MapReduceUtil.JOB_START_TIME}; also checkpointed
     *             and used for the edit log rename
     * @throws SmallFSException wrapping any {@code FGCCheckpointException},
     *                          {@code IOException}, {@code ClassNotFoundException} or
     *                          {@code InterruptedException} raised during processing
     */
    private void postAppFinish(String str, String str2) throws SmallFSException {
        setServiceState(ServiceState.WAITING);
        this.conf.set(MapReduceUtil.JOB_START_TIME, str2);
        int mark = getMark();
        String markAdd = getMarkAdd(mark);
        // try-with-resources closes the stop watch before the catch/finally clauses run.
        try (StopWatch stopWatch = new StopWatch()) {
            LOG.info("Start to do post process for delete opration");
            stopWatch.reset().start();
            LOG.info("Start to do checkpoint for merge service");
            FGCCheckpointService.getInstance(this.conf, this.ns).doCheckpointing(str, new FGCExtensionFilter[0]);
            FGCCheckpointService.getInstance(this.conf, this.ns).doCheckpointing(str2, new FGCExtensionFilter[0]);
            LOG.info("End to do checkpoint for merge service");
            FGCServiceHelper.resetRunnningJob();
            switch (mark) {
                case 1:
                    LOG.info("Start to distributed delete small files");
                    runDistributedDelete(str, mark);
                    LOG.info("End to distributed delete small files");
                    // fall through: the mark 2 steps also run after a fresh delete job
                case 2:
                    if (null != markAdd) {
                        LOG.info("Find exist distributed delete small files job " + markAdd + " in mark file, try to get its status");
                        // NOTE(review): the result messages log `str` although the job waited on is `markAdd` - confirm intent.
                        if (FGCServiceHelper.waitForJob(markAdd, this.conf, new Job[0])) {
                            LOG.info("Distributed delete small files job " + str + " success");
                        } else {
                            LOG.error("Distributed delete small files job " + str + " error");
                            LOG.info("Distribute delete incomplete, some merged file cleared in next cleanup cycle");
                        }
                    }
                    FSHelper.renameFilesExtension(str2, FSHelper.getActiveEditlogExt(), this.fs, this.conf.get("fgc.editlogs.dir"));
                    FSHelper.renameFilesExtension(str, FSHelper.getActiveEditlogExt(), this.fs, this.conf.get("fgc.editlogs.dir"));
                    mark(3, null);
                    unmark(3);
                    LOG.info("Renamed editlog for standby FGC checkpointing");
                    break;
                default:
                    // Any other recorded stage needs no stage-specific work.
                    break;
            }
            setServiceState(ServiceState.SUCCESS);
            LOG.info("End to do post process for delete opration, time spent: " + stopWatch.now(TimeUnit.SECONDS) + " seconds");
        } catch (FGCCheckpointException | IOException | ClassNotFoundException | InterruptedException e) {
            throw new SmallFSException(e);
        } finally {
            cleanMarks();
        }
    }

    /**
     * Prepares the input for, submits, and waits on the map-only job that deletes
     * small files.
     *
     * <p>After {@code clear()}, the input directory is created, the job input file
     * location is stored in the configuration under {@code MERGE_INPUT} and {@code str}
     * under {@code JOBID}, and the input file is generated by
     * {@link #createDistributedDeleteInput(String)}. Once the job is submitted its id is
     * recorded with mark 2 and mark 1 is removed, so the stage on record reflects the
     * running job. The result of waiting for the job is not inspected here.
     *
     * @param str value stored under {@code JOBID} and used to build the job input
     * @param i   not used by this method
     * @throws IOException            if a file system or job operation fails
     * @throws InterruptedException   if job submission or waiting is interrupted
     * @throws ClassNotFoundException if job submission cannot resolve a class
     */
    private void runDistributedDelete(String str, int i) throws IOException, InterruptedException, ClassNotFoundException {
        clear();
        this.fs.mkdirs(inputPath(), FSHelper.secureDirPermission());
        String mergeInputFile = new Path(inputPath(), MERGE_INPUT).toUri().getPath();
        this.conf.set(MERGE_INPUT, mergeInputFile);
        this.conf.set(JOBID, str);

        LOG.info("Start create the small file delete mr job input file");
        createDistributedDeleteInput(str);
        LOG.info("End create the small file delete mr job input file");

        Job deleteJob = Job.getInstance(this.conf, ServiceTypesConstants.MERGE_DELETE_DESC);
        deleteJob.setJarByClass(FileMergedService.class);
        deleteJob.setInputFormatClass(FileDitributedDeleteInputFormat.class);
        deleteJob.setMapperClass(FileDistributedDeleteMapper.class);
        // Map-only job: no reduce phase.
        deleteJob.setNumReduceTasks(0);
        MapReduceUtil.addDependencyJars(deleteJob.getConfiguration(), SmallFSConfigKeys.class);
        FileInputFormat.addInputPath(deleteJob, inputPath());
        FileOutputFormat.setOutputPath(deleteJob, outputPath());

        LOG.info("Submit job: " + deleteJob.getJobName());
        deleteJob.submit();

        String submittedJobId = deleteJob.getJobID().toString();
        mark(2, submittedJobId);
        unmark(1);
        FGCServiceHelper.waitForJob(submittedJobId, this.conf, deleteJob);
    }

    /**
     * Writes the input sequence file for the distributed delete job.
     *
     * <p>The file location is read from the configuration key {@code MERGE_INPUT}. One
     * record is appended per edit log entry returned by the remote reader: the key is
     * the entry's small file path and the value is a single zero byte. A sync marker is
     * written after every record. When the service is not in HA mode, the reader's
     * entries are purged after they have all been read.
     *
     * @param str identifier handed to the edit log reader
     * @throws IOException if creating, writing or closing the sequence file fails, or
     *                     if reading the edit logs fails
     */
    private void createDistributedDeleteInput(String str) throws IOException {
        Path path = new Path(this.conf.get(MERGE_INPUT));
        // try-with-resources closes the writer on every path and keeps a close failure
        // as a suppressed exception instead of masking the primary one.
        try (SequenceFile.Writer createWriter = SequenceFile.createWriter(this.conf,
                SequenceFile.Writer.file(path),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(BytesWritable.class),
                SequenceFile.Writer.replication((short) 10))) {
            this.fs.setPermission(path, FSHelper.secureFilePermission());
            // Every record carries the same one-byte value.
            BytesWritable bytesWritable = new BytesWritable(new byte[]{0});
            FGCEditLogRemoteReader fGCEditLogRemoteReader = new FGCEditLogRemoteReader(this.conf, str, !this.cu.isHAMode(), new FGCExtensionFilter[0]);
            FGCEditLog editLog;
            while ((editLog = fGCEditLogRemoteReader.readEditLogsFroEditFolder()) != null) {
                createWriter.append(new Text(editLog.getSmallFilePath()), bytesWritable);
                createWriter.sync();
            }
            if (!this.cu.isHAMode()) {
                fGCEditLogRemoteReader.purgeAll();
            }
            createWriter.hsync();
        }
    }

    /**
     * Resumes post-processing for {@code str}, using the value stored in {@code map}
     * under {@code MapReduceUtil.JOB_START_TIME} as the second argument of
     * {@link #postAppFinish(String, String)}.
     *
     * @param str identifier forwarded to the post-processing step
     * @param map attributes of the run being resumed; the job start time entry is read
     *            and cast to {@code String}
     * @throws SmallFSException if post-processing fails
     */
    @Override // com.huawei.datasight.smallfs.server.Service
    public void resume(String str, Map<String, Object> map) throws SmallFSException {
        Object jobStartTime = map.get(MapReduceUtil.JOB_START_TIME);
        postAppFinish(str, (String) jobStartTime);
    }

    /**
     * Builds the location of the mark directory, {@code merge/mark} under the SFS root
     * directory.
     *
     * @return path of the directory that holds the stage mark files
     */
    private Path markPath() {
        final String markDirectory = "merge/mark";
        return new Path(this.cu.getSFSRootDir(), markDirectory);
    }

    /**
     * Records stage {@code i} by creating a file named after it in the mark directory,
     * creating the directory first when it does not exist.
     *
     * <p>When {@code str} is non-null it is written to the file with {@code writeUTF}
     * and synced; otherwise the file is left empty. This is best-effort: an
     * {@code IOException} is logged as a warning, together with its cause, and not
     * propagated.
     *
     * @param i   stage number, used as the mark file name
     * @param str optional additional info stored in the mark file; may be {@code null}
     */
    private void mark(int i, String str) {
        try {
            if (!this.fs.exists(markPath())) {
                this.fs.mkdirs(markPath());
            }
            // try-with-resources closes the stream on every path without letting a
            // close failure mask a write failure.
            try (FSDataOutputStream out = this.fs.create(new Path(markPath() + "/" + i))) {
                if (str != null) {
                    out.writeUTF(str);
                    out.hsync();
                }
            }
        } catch (IOException e) {
            String message = "Failed to mark " + i;
            if (null != str) {
                message = message + " with additional info " + str;
            }
            LOG.warn(message, e);
        }
    }

    /**
     * Removes the mark file for stage {@code i}, if the mark directory exists.
     *
     * <p>This is best-effort: an {@code IOException} is logged as a warning, together
     * with its cause, and not propagated.
     *
     * @param i stage number whose mark file is deleted
     */
    private void unmark(int i) {
        try {
            if (this.fs.exists(markPath())) {
                // Non-recursive delete: the mark is a single file.
                this.fs.delete(new Path(markPath() + "/" + i), false);
            }
        } catch (IOException e) {
            LOG.warn("Failed to unmark " + new Path(markPath() + "/" + i).toString(), e);
        }
    }

    /**
     * Recursively deletes the mark directory and everything in it.
     *
     * <p>This is best-effort: an {@code IOException} is logged as a warning, together
     * with its cause, and not propagated.
     */
    private void cleanMarks() {
        try {
            this.fs.delete(markPath(), true);
        } catch (IOException e) {
            LOG.warn("Failed to delete " + markPath().toString(), e);
        }
    }

    /**
     * Determines the stage recorded in the mark directory.
     *
     * <p>The entries of the mark directory are sorted by the natural ordering of
     * {@code FileStatus}, and the name of the last one is parsed as an integer.
     *
     * @return the parsed stage number, or {@code 1} when the directory has no entries,
     *         cannot be listed, or the last entry's name is not a valid integer
     */
    private int getMark() {
        try {
            FileStatus[] markFiles = this.fs.listStatus(markPath());
            boolean hasMarks = markFiles != null && markFiles.length > 0;
            if (hasMarks) {
                Arrays.sort(markFiles);
                FileStatus last = markFiles[markFiles.length - 1];
                return Integer.parseInt(last.getPath().getName());
            }
        } catch (IOException | IllegalArgumentException ignored) {
            // Unreadable directory or non-numeric name: use the default stage below.
        }
        return 1;
    }

    /**
     * Reads the additional info stored in the mark file for stage {@code i}.
     *
     * @param i stage number whose mark file is read
     * @return the string read with {@code readUTF}, or {@code null} when the file cannot
     *         be opened or read ({@code IOException} or {@code IllegalArgumentException})
     */
    private String getMarkAdd(int i) {
        FSDataInputStream markStream = null;
        try {
            markStream = this.fs.open(new Path(markPath() + "/" + i));
            return markStream.readUTF();
        } catch (IOException | IllegalArgumentException ignored) {
            // No readable additional info for this mark.
            return null;
        } finally {
            // Runs on every exit path, including unexpected throwables.
            IOUtils.closeStream(markStream);
        }
    }
}
