package org.apache.storm.hdfs.ha.codedistributor;

import backtype.storm.codedistributor.ICodeDistributor;
import com.google.common.collect.Lists;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.Validate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/hdfs/ha/codedistributor/HDFSCodeDistributor.class */
public class HDFSCodeDistributor implements ICodeDistributor {
    private static final Logger LOG = LoggerFactory.getLogger(HDFSCodeDistributor.class);
    private static final String HDFS_STORM_DIR = "hdfs.storm.dir";
    private FileSystem fs;
    private Path stormDir;

    public void prepare(Map map) throws Exception {
        Validate.notNull(map.get(HDFS_STORM_DIR), "you must specify hdfs.storm.dir");
        Configuration configuration = new Configuration();
        HdfsSecurityUtil.login(map, configuration);
        this.fs = FileSystem.get(configuration);
        this.stormDir = new Path(String.valueOf(map.get(HDFS_STORM_DIR)));
        if (this.fs.exists(this.stormDir)) {
            return;
        }
        this.fs.mkdirs(this.stormDir);
    }

    public File upload(String str, String str2) throws Exception {
        File file = new File(str);
        LOG.info("Copying the storm code from directory: {} to {}{}{}", file.getAbsolutePath(), this.stormDir.toString(), "/", str2);
        File[] listFiles = file.listFiles();
        Path path = new Path(this.stormDir, new Path(str2));
        this.fs.mkdirs(path);
        for (File file2 : listFiles) {
            this.fs.copyFromLocalFile(new Path(file2.getAbsolutePath()), path);
        }
        File file3 = new File(str, "storm-code-distributor.meta");
        RemoteIterator<LocatedFileStatus> listFiles2 = this.fs.listFiles(path, false);
        BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(file3));
        while (listFiles2.hasNext()) {
            bufferedWriter.write(listFiles2.next().getPath().toString());
            bufferedWriter.newLine();
        }
        bufferedWriter.close();
        return file3;
    }

    public List<File> download(String str, File file) throws Exception {
        File parentFile = file.getParentFile();
        Iterator<String> it = IOUtils.readLines(new FileInputStream(file)).iterator();
        while (it.hasNext()) {
            this.fs.copyToLocalFile(new Path(it.next()), new Path(parentFile.getAbsolutePath()));
        }
        return Lists.newArrayList(parentFile.listFiles());
    }

    public short getReplicationCount(String str) throws IOException {
        Path path = new Path(this.stormDir, new Path(str));
        if (this.fs.exists(path)) {
            return this.fs.getFileStatus(path).getReplication();
        }
        LOG.warn("getReplicationCount called for {} but no such directory exists, returning 0", str);
        return (short) 0;
    }

    public void cleanup(String str) throws IOException {
        this.fs.delete(new Path(this.stormDir, new Path(str)), true);
    }

    public void close(Map map) {
    }
}
