package com.huawei.datasight.smallfs.server;

import com.huawei.datasight.smallfs.SmallFSException;
import com.huawei.datasight.smallfs.conf.SmallFSConfigKeys;
import com.huawei.datasight.smallfs.meta.FileIndexMeta;
import com.huawei.datasight.smallfs.server.FGCAuditLogger;
import com.huawei.datasight.smallfs.server.ha.FGCCheckpointException;
import com.huawei.datasight.smallfs.server.ha.FGCCheckpointService;
import com.huawei.datasight.smallfs.server.ha.FGCEditLogger;
import com.huawei.datasight.smallfs.server.ha.FGCExtensionFilter;
import com.huawei.datasight.smallfs.server.ha.FGCOperation;
import com.huawei.datasight.smallfs.server.ha.FGCServiceState;
import com.huawei.datasight.smallfs.server.ha.FGCSessionDetail;
import com.huawei.datasight.smallfs.tools.FSHelper;
import com.huawei.datasight.smallfs.tools.MapReduceUtil;
import com.huawei.datasight.smallfs.utils.ConfigUtil;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.yarn.api.records.ApplicationId;

/* loaded from: input_file:com/huawei/datasight/smallfs/server/FileDeletedService.class */
public class FileDeletedService extends Service {
    private static final Log LOG = LogFactory.getLog(FileDeletedService.class);
    private static final String DELETE_INPUT = "_delete_input_";
    private static final short SEQFILE_REPLICATION = 10;
    private Configuration conf;
    private DistributedFileSystem fs;
    private ConfigUtil cu;
    private FGCNameSpace ns;
    private String currentJobID = null;

    /* loaded from: input_file:com/huawei/datasight/smallfs/server/FileDeletedService$FileDeleteInputFormat.class */
    public static class FileDeleteInputFormat extends InputFormat<Text, FilesEntry> {
        public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException {
            Path path = new Path(jobContext.getConfiguration().get(FileDeletedService.DELETE_INPUT));
            Text text = new Text();
            FilesEntry filesEntry = new FilesEntry();
            ArrayList arrayList = new ArrayList();
            long j = 0;
            SequenceFile.Reader reader = new SequenceFile.Reader(jobContext.getConfiguration(), new SequenceFile.Reader.Option[]{SequenceFile.Reader.file(path)});
            Throwable th = null;
            while (reader.next(text, filesEntry)) {
                try {
                    try {
                        long position = reader.getPosition();
                        arrayList.add(new FileSplit(path, j, position - j, (String[]) null));
                        if (FileDeletedService.LOG.isDebugEnabled()) {
                            FileDeletedService.LOG.debug("Split file: " + path.toUri().getPath() + ", start: " + j + ", end: " + position);
                        }
                        j = position;
                    } catch (Throwable th2) {
                        if (reader != null) {
                            if (th != null) {
                                try {
                                    reader.close();
                                } catch (Throwable th3) {
                                    th.addSuppressed(th3);
                                }
                            } else {
                                reader.close();
                            }
                        }
                        throw th2;
                    }
                } finally {
                }
            }
            if (reader != null) {
                if (0 != 0) {
                    try {
                        reader.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    reader.close();
                }
            }
            return arrayList;
        }

        public RecordReader<Text, FilesEntry> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            return new SequenceFileRecordReader();
        }
    }

    /* loaded from: input_file:com/huawei/datasight/smallfs/server/FileDeletedService$FileDeleteMapper.class */
    public static class FileDeleteMapper extends Mapper<Text, FilesEntry, Object, Object> {
        private void getLogFileName(Mapper<Text, FilesEntry, Object, Object>.Context context, FGCEditLogger fGCEditLogger) {
            fGCEditLogger.setFileName(context.getConfiguration().get(MapReduceUtil.JOB_START_TIME) + "." + context.getTaskAttemptID().toString());
        }

        /* JADX WARN: Finally extract failed */
        protected void map(Text text, FilesEntry filesEntry, Mapper<Text, FilesEntry, Object, Object>.Context context) throws IOException, InterruptedException {
            FileSystem fileSystem = FileSystem.get(context.getConfiguration());
            String jobID = context.getJobID().toString();
            String jobID2 = context.getTaskAttemptID().getJobID().toString();
            if (!jobID2.contains("local")) {
                String[] split = jobID2.split("_");
                ApplicationId newInstance = ApplicationId.newInstance(Long.parseLong(split[1]), Integer.parseInt(split[2]));
                String[] split2 = context.getConfiguration().get("yarn.nodemanager.local-dirs").split(",");
                String str = null;
                int i = 0;
                while (true) {
                    if (i >= split2.length) {
                        break;
                    }
                    String str2 = split2[i] + "/usercache/" + context.getUser() + "/appcache/" + newInstance.toString();
                    if (new File(str2).exists()) {
                        str = str2 + "/" + context.getTaskAttemptID();
                        break;
                    }
                    i++;
                }
                FileDeletedService.LOG.info("Local dir " + str);
                if (str != null) {
                    context.getConfiguration().set("fgc.editlogs.local.dir", str);
                }
            }
            FGCEditLogger fGCEditLogger = FGCEditLogger.getInstance(context.getConfiguration(), fileSystem);
            fGCEditLogger.clear(jobID);
            getLogFileName(context, fGCEditLogger);
            FileDeletedService.LOG.info("Clear subdirectory and files without extension jobID in local editlog directory");
            Path path = new Path(text.toString());
            Path path2 = new Path(path.getParent(), FSHelper.generateNewBigFileName(context));
            OutputStream outputStream = null;
            FSDataInputStream fSDataInputStream = null;
            FileDeletedService.LOG.info("Old big file: " + path + " will megred to new big file: " + path2);
            try {
                try {
                    fSDataInputStream = fileSystem.open(path);
                    outputStream = fileSystem.create(path2, true);
                    long j = 0;
                    for (FileIndexMeta fileIndexMeta : filesEntry.metas) {
                        FileDeletedService.LOG.info("Merge " + fileIndexMeta.getFilePath() + " to new big file " + path2 + ", start " + j + " length " + fileIndexMeta.getLength());
                        fSDataInputStream.seek(fileIndexMeta.getOffset());
                        IOUtils.copyBytes(fSDataInputStream, outputStream, fileIndexMeta.getLength(), false);
                        fGCEditLogger.log(fileIndexMeta.getFilePath(), new FileIndexMeta(path2.toUri().getPath(), fileIndexMeta.getLength(), j), FGCOperation.CREATE, jobID);
                        context.progress();
                        j = outputStream.getPos();
                    }
                    outputStream.hsync();
                    fGCEditLogger.sync();
                    IOUtils.closeStream(fSDataInputStream);
                    IOUtils.closeStream(outputStream);
                    FileStatus fileStatus = fileSystem.getFileStatus(path);
                    fileSystem.setOwner(path2, fileStatus.getOwner(), fileStatus.getGroup());
                } catch (RuntimeException e) {
                    FileDeletedService.LOG.error("Error in job;", e);
                    throw e;
                } catch (Exception e2) {
                    FileDeletedService.LOG.error("Error in job", e2);
                    FileDeletedService.LOG.warn("There are some error data in the big file" + path2 + ". Wait the next time to Merge");
                    throw new RuntimeException(e2);
                }
            } catch (Throwable th) {
                IOUtils.closeStream(fSDataInputStream);
                IOUtils.closeStream(outputStream);
                throw th;
            }
        }

        protected /* bridge */ /* synthetic */ void map(Object obj, Object obj2, Mapper.Context context) throws IOException, InterruptedException {
            map((Text) obj, (FilesEntry) obj2, (Mapper<Text, FilesEntry, Object, Object>.Context) context);
        }
    }

    /* loaded from: input_file:com/huawei/datasight/smallfs/server/FileDeletedService$FilesEntry.class */
    public static class FilesEntry implements Writable {
        private List<FileIndexMeta> metas;

        public FilesEntry(List<FileIndexMeta> list) {
            this.metas = list;
        }

        public FilesEntry() {
        }

        public void write(DataOutput dataOutput) throws IOException {
            dataOutput.writeInt(this.metas.size());
            Iterator<FileIndexMeta> it = this.metas.iterator();
            while (it.hasNext()) {
                it.next().write(dataOutput);
            }
        }

        public void readFields(DataInput dataInput) throws IOException {
            int readInt = dataInput.readInt();
            this.metas = new ArrayList(readInt);
            for (int i = 0; i < readInt; i++) {
                FileIndexMeta fileIndexMeta = new FileIndexMeta();
                fileIndexMeta.readFields(dataInput);
                this.metas.add(fileIndexMeta);
            }
        }
    }

    public FileDeletedService(Configuration configuration, DistributedFileSystem distributedFileSystem, FGCNameSpace fGCNameSpace) throws IOException {
        setServiceState(ServiceState.INITIAL);
        this.conf = configuration;
        this.cu = new ConfigUtil(configuration);
        this.fs = distributedFileSystem;
        this.ns = fGCNameSpace;
    }

    @Override // com.huawei.datasight.smallfs.server.Service
    public void run() throws SmallFSException {
        FGCAuditLogger.logAuditEvent(FGCAuditLogger.LogLevel.INFO, FGCAuditLogger.AuditConstants.FGC_PROCESS, FGCAuditLogger.AuditConstants.SERVICE_DELETE, getUgi().getUserName(), null, FileDeletedService.class.getSimpleName(), FGCAuditLogger.AuditConstants.INIT);
        setRunApp(false);
        try {
            clear();
            String str = "" + System.currentTimeMillis();
            FGCServiceHelper.setRunnningJob(str);
            preAppStart();
            if (!isRunApp()) {
                LOG.info("No need to run delete service...");
                setServiceState(ServiceState.SUCCESS);
                FGCAuditLogger.logAuditEvent(FGCAuditLogger.LogLevel.INFO, FGCAuditLogger.AuditConstants.FGC_PROCESS, FGCAuditLogger.AuditConstants.SERVICE_DELETE, getUgi().getUserName(), null, FileDeletedService.class.getSimpleName(), FGCAuditLogger.AuditConstants.SUCCESS);
                DeleteLogger.instance(this.conf, this.fs).clearHis();
                LOG.info("Cleaned delete history log.");
                return;
            }
            if (this.fgcController != null && this.fgcController.getControllerState() != FGCServiceState.ACTIVE) {
                LOG.warn("Delete job incomplete as controller is not active");
                return;
            }
            String runDeleteMRJob = runDeleteMRJob(str);
            if (this.fgcController != null && this.fgcController.getControllerState() != FGCServiceState.ACTIVE) {
                LOG.warn("Delete job incomplete as controller is not active");
                return;
            }
            postAppFinish(runDeleteMRJob, str);
            FGCAuditLogger.logAuditEvent(FGCAuditLogger.LogLevel.INFO, FGCAuditLogger.AuditConstants.FGC_PROCESS, FGCAuditLogger.AuditConstants.SERVICE_DELETE, getUgi().getUserName(), null, FileDeletedService.class.getSimpleName(), FGCAuditLogger.AuditConstants.SUCCESS);
            setServiceState(ServiceState.SUCCESS);
            finalizeService();
        } catch (Exception e) {
            finalizeService();
            FGCAuditLogger.logAuditEvent(FGCAuditLogger.LogLevel.ERROR, FGCAuditLogger.AuditConstants.FGC_PROCESS, FGCAuditLogger.AuditConstants.SERVICE_DELETE, getUgi().getUserName(), null, FileDeletedService.class.getSimpleName(), FGCAuditLogger.AuditConstants.FAILURE);
            LOG.error(e);
            setServiceState(ServiceState.FAILED);
            throw new SmallFSException(e);
        }
    }

    private void finalizeService() {
        clear();
        ServiceJobListener serviceJobListener = getServiceJobListener();
        if (null == serviceJobListener || this.currentJobID == null) {
            return;
        }
        serviceJobListener.jobFinished(this.currentJobID);
        this.currentJobID = null;
    }

    private String runDeleteMRJob(String str) throws IOException, ClassNotFoundException, InterruptedException {
        String str2 = null;
        if (isRunApp()) {
            setServiceState(ServiceState.STARTING);
            Job job = Job.getInstance(this.conf, ServiceTypesConstants.DELETE_DESC);
            job.setJarByClass(FileMergedService.class);
            job.setInputFormatClass(FileDeleteInputFormat.class);
            job.setMapperClass(FileDeleteMapper.class);
            job.setNumReduceTasks(0);
            job.setSpeculativeExecution(false);
            MapReduceUtil.addDependencyJars(job.getConfiguration(), SmallFSConfigKeys.class);
            FileInputFormat.addInputPath(job, inputPath());
            FileOutputFormat.setOutputPath(job, outputPath());
            job.getConfiguration().set(MapReduceUtil.JOB_START_TIME, str);
            job.submit();
            LOG.info("Submit job: " + job.getJobName());
            setServiceState(ServiceState.RUNNING);
            str2 = job.getJobID().toString();
            FGCServiceHelper.setRunnningJob(str2);
            ServiceJobListener serviceJobListener = getServiceJobListener();
            if (null != serviceJobListener) {
                FGCSessionDetail fGCSessionDetail = new FGCSessionDetail();
                fGCSessionDetail.jobId = str2;
                fGCSessionDetail.jobName = ServiceTypesConstants.DELETE_DESC;
                fGCSessionDetail.jobStartTime = str;
                serviceJobListener.jobSubmitted(fGCSessionDetail);
            }
            this.currentJobID = str2;
            if (!FGCServiceHelper.waitForJobSucess(str2, this.conf, job)) {
                LOG.error("Job " + job.getJobName() + " finished with ailed");
                setServiceState(ServiceState.FAILED);
                throw new IOException("Delete Job is failed at merging new big file.");
            }
            LOG.info("Job " + job.getJobName() + " finished with success");
            setServiceState(ServiceState.SUCCESS);
        } else {
            setServiceState(ServiceState.SUCCESS);
            LOG.info("Don't need to remerge new big file!");
        }
        return str2;
    }

    private Path inputPath() {
        return new Path(this.cu.getSFSRootDir(), "delete/input");
    }

    private Path outputPath() {
        return new Path(this.cu.getSFSRootDir(), "delete/output");
    }

    private void clear() {
        try {
            this.fs.delete(inputPath(), true);
            LOG.info("Delete " + inputPath());
            this.fs.delete(outputPath(), true);
            LOG.info("Delete " + outputPath());
        } catch (IOException e) {
            LOG.error("Error deleting");
        }
    }

    private void preAppStart() throws SmallFSException {
        HashMap hashMap = new HashMap();
        LOG.info("Start to scan the delete log to get big files to be process");
        try {
            DeleteLogger.instance(this.conf, this.fs).closeWrite();
            Iterator<String> createReader = DeleteLogger.instance(this.conf, this.fs).createReader();
            while (createReader != null && createReader.hasNext()) {
                File file = new File(createReader.next());
                if (this.fs.exists(new Path(file.getPath()))) {
                    if (!new File(this.ns.table(file.getParentFile().getParentFile().getPath())).exists()) {
                        this.fs.delete(new Path(file.getPath()), true);
                    } else if (hashMap.get(file.getPath()) == null) {
                        List<FileIndexMeta> smallFileListOfBigFile = this.ns.smallFileListOfBigFile(file.getPath());
                        if (smallFileListOfBigFile == null || smallFileListOfBigFile.size() == 0) {
                            this.fs.delete(new Path(file.getPath()), false);
                        } else {
                            hashMap.put(file.getPath(), smallFileListOfBigFile);
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Add file into list of to be processed: " + file.getPath());
                            }
                            setRunApp(true);
                        }
                    }
                }
            }
            LOG.info("End to scan the delete log to get big files to be process");
            writeToSeqFile(hashMap);
            LOG.info("Writed big file list to be processed into SequenceFile");
        } catch (IOException e) {
            setServiceState(ServiceState.FAILED);
            throw new SmallFSException(e.toString());
        }
    }

    private void writeToSeqFile(Map<String, List<FileIndexMeta>> map) throws IOException {
        Path path = new Path(inputPath(), DELETE_INPUT);
        this.conf.set(DELETE_INPUT, path.toUri().getPath());
        SequenceFile.Writer createWriter = SequenceFile.createWriter(this.conf, new SequenceFile.Writer.Option[]{SequenceFile.Writer.file(path), SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(FilesEntry.class), SequenceFile.Writer.replication((short) 10)});
        Throwable th = null;
        try {
            try {
                this.fs.setPermission(path, FSHelper.secureFilePermission());
                for (Map.Entry<String, List<FileIndexMeta>> entry : map.entrySet()) {
                    createWriter.append(new Text(entry.getKey()), new FilesEntry(entry.getValue()));
                    createWriter.sync();
                }
                createWriter.hsync();
                if (createWriter != null) {
                    if (0 == 0) {
                        createWriter.close();
                        return;
                    }
                    try {
                        createWriter.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createWriter != null) {
                if (th != null) {
                    try {
                        createWriter.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createWriter.close();
                }
            }
            throw th4;
        }
    }

    private void postAppFinish(String str, String str2) throws SmallFSException {
        LOG.info("Start to do post process for delete opration");
        try {
            setServiceState(ServiceState.WAITING);
            if (str != null) {
                LOG.info("Start to do checkpoint for delete service");
                FGCCheckpointService.getInstance(this.conf, this.ns).doCheckpointing(str, new FGCExtensionFilter[0]);
                FGCCheckpointService.getInstance(this.conf, this.ns).doCheckpointing(str2, new FGCExtensionFilter[0]);
                LOG.info("End to do checkpoint for delete service");
                FGCServiceHelper.resetRunnningJob();
            }
            DeleteLogger.instance(this.conf, this.fs).clearHis();
            LOG.info("Cleaned delete history log.");
            LOG.info("Start to delete old big file");
            Path path = new Path(inputPath(), DELETE_INPUT);
            if (this.fs.exists(path)) {
                Text text = new Text();
                FilesEntry filesEntry = new FilesEntry();
                SequenceFile.Reader reader = new SequenceFile.Reader(this.conf, new SequenceFile.Reader.Option[]{SequenceFile.Reader.file(path)});
                Throwable th = null;
                while (reader.next(text, filesEntry)) {
                    try {
                        try {
                            if (this.fs.delete(new Path(text.toString()), false)) {
                                LOG.info("Deleted big file: " + text.toString());
                            } else {
                                LOG.info("Failed to delete big file: " + text.toString());
                            }
                        } finally {
                        }
                    } finally {
                    }
                }
                if (reader != null) {
                    if (0 != 0) {
                        try {
                            reader.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        reader.close();
                    }
                }
            }
            LOG.info("End to delete old big file");
            FSHelper.renameFilesExtension(str2, FSHelper.getActiveEditlogExt(), this.fs, this.conf.get("fgc.editlogs.dir"));
            LOG.info("Renamed intermediate editlog for standby FGC checkpointing");
            if (str != null) {
                FSHelper.renameFilesExtension(str, FSHelper.getActiveEditlogExt(), this.fs, this.conf.get("fgc.editlogs.dir"));
                LOG.info("Renamed editlog for standby FGC checkpointing");
            }
            setServiceState(ServiceState.SUCCESS);
            LOG.info("End to do post process for delete opration");
        } catch (FGCCheckpointException | IOException e) {
            setServiceState(ServiceState.FAILED);
            throw new SmallFSException(e);
        }
    }

    @Override // com.huawei.datasight.smallfs.server.Service
    public void resume(String str, Map<String, Object> map) throws SmallFSException {
        postAppFinish(str, (String) map.get(MapReduceUtil.JOB_START_TIME));
    }
}
