package org.apache.flink.yarn;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.FunctionUtils;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/yarn/YarnApplicationFileUploader.class */
class YarnApplicationFileUploader implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(YarnApplicationFileUploader.class);
    private final FileSystem fileSystem;
    private final ApplicationId applicationId;
    private final Path homeDir;
    private final Path applicationDir;
    private final Map<String, FileStatus> providedSharedLibs;
    private final int fileReplication;
    private YarnLocalResourceDescriptor flinkDist;
    private final Map<String, LocalResource> localResources = new HashMap();
    private final List<Path> remotePaths = new ArrayList();
    private final List<YarnLocalResourceDescriptor> envShipResourceList = new ArrayList();

    private YarnApplicationFileUploader(FileSystem fileSystem, Path path, List<Path> list, ApplicationId applicationId, int i) throws IOException {
        this.fileSystem = (FileSystem) Preconditions.checkNotNull(fileSystem);
        this.homeDir = (Path) Preconditions.checkNotNull(path);
        this.applicationId = (ApplicationId) Preconditions.checkNotNull(applicationId);
        this.applicationDir = getApplicationDir(applicationId);
        this.providedSharedLibs = getAllFilesInProvidedLibDirs(list);
        Preconditions.checkArgument(i >= 1);
        this.fileReplication = i;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Map<String, LocalResource> getRegisteredLocalResources() {
        return this.localResources;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public List<Path> getRemotePaths() {
        return this.remotePaths;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public List<YarnLocalResourceDescriptor> getEnvShipResourceList() {
        return this.envShipResourceList;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Path getHomeDir() {
        return this.homeDir;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Path getApplicationDir() {
        return this.applicationDir;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        IOUtils.closeQuietly(this.fileSystem);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public YarnLocalResourceDescriptor registerSingleLocalResource(String str, Path path, String str2, LocalResourceType localResourceType, boolean z, boolean z2) throws IOException {
        addToRemotePaths(z, path);
        if (Utils.isRemotePath(path.toString())) {
            FileStatus fileStatus = this.fileSystem.getFileStatus(path);
            LOG.debug("Using remote file {} to register local resource", fileStatus.getPath());
            YarnLocalResourceDescriptor fromFileStatus = YarnLocalResourceDescriptor.fromFileStatus(str, fileStatus, LocalResourceVisibility.APPLICATION, localResourceType);
            addToEnvShipResourceList(z2, fromFileStatus);
            this.localResources.put(str, fromFileStatus.toLocalResource());
            return fromFileStatus;
        }
        File file = new File(path.toUri().getPath());
        Tuple2<Path, Long> uploadLocalFileToRemote = uploadLocalFileToRemote(path, str2);
        YarnLocalResourceDescriptor yarnLocalResourceDescriptor = new YarnLocalResourceDescriptor(str, (Path) uploadLocalFileToRemote.f0, file.length(), ((Long) uploadLocalFileToRemote.f1).longValue(), LocalResourceVisibility.APPLICATION, localResourceType);
        addToEnvShipResourceList(z2, yarnLocalResourceDescriptor);
        this.localResources.put(str, yarnLocalResourceDescriptor.toLocalResource());
        return yarnLocalResourceDescriptor;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Tuple2<Path, Long> uploadLocalFileToRemote(Path path, String str) throws IOException {
        File file = new File(path.toUri().getPath());
        Preconditions.checkArgument(!file.isDirectory(), "File to copy cannot be a directory: " + path);
        Path copyToRemoteApplicationDir = copyToRemoteApplicationDir(path, str, this.fileReplication);
        FileStatus[] waitForTransferToComplete = waitForTransferToComplete(copyToRemoteApplicationDir);
        if (waitForTransferToComplete == null || waitForTransferToComplete.length <= 0) {
            LOG.debug("Failed to fetch remote modification time from {}, using local timestamp {}", copyToRemoteApplicationDir, Long.valueOf(file.lastModified()));
            return Tuple2.of(copyToRemoteApplicationDir, Long.valueOf(file.lastModified()));
        }
        LOG.debug("Got modification time {} from remote path {}", Long.valueOf(waitForTransferToComplete[0].getModificationTime()), copyToRemoteApplicationDir);
        return Tuple2.of(copyToRemoteApplicationDir, Long.valueOf(waitForTransferToComplete[0].getModificationTime()));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public List<String> registerMultipleLocalResources(Collection<Path> collection, final String str, LocalResourceType localResourceType) throws IOException {
        final ArrayList arrayList = new ArrayList();
        final ArrayList arrayList2 = new ArrayList();
        for (Path path : collection) {
            if (!Utils.isRemotePath(path.toString())) {
                File file = new File(path.toUri().getPath());
                if (file.isDirectory()) {
                    java.nio.file.Path path2 = file.toPath();
                    final java.nio.file.Path parent = path2.getParent();
                    Files.walkFileTree(path2, new SimpleFileVisitor<java.nio.file.Path>() { // from class: org.apache.flink.yarn.YarnApplicationFileUploader.1
                        @Override // java.nio.file.SimpleFileVisitor, java.nio.file.FileVisitor
                        public FileVisitResult visitFile(java.nio.file.Path path3, BasicFileAttributes basicFileAttributes) {
                            arrayList.add(new Path(path3.toUri()));
                            arrayList2.add(new Path(str, parent.relativize(path3).toString()));
                            return FileVisitResult.CONTINUE;
                        }
                    });
                } else {
                    arrayList.add(path);
                    arrayList2.add(new Path(str, path.getName()));
                }
            } else if (this.fileSystem.isDirectory(path)) {
                URI uri = path.getParent().toUri();
                RemoteIterator listFiles = this.fileSystem.listFiles(path, true);
                while (listFiles.hasNext()) {
                    Path path3 = ((LocatedFileStatus) listFiles.next()).getPath();
                    arrayList.add(path3);
                    arrayList2.add(new Path(str, uri.relativize(path3.toUri()).getPath()));
                }
            } else {
                arrayList.add(path);
                arrayList2.add(new Path(str, path.getName()));
            }
        }
        HashSet hashSet = new HashSet();
        HashSet hashSet2 = new HashSet();
        for (int i = 0; i < arrayList.size(); i++) {
            Path path4 = (Path) arrayList.get(i);
            Path path5 = (Path) arrayList2.get(i);
            if (!isFlinkDistJar(path5.getName())) {
                String path6 = path5.toString();
                if (!registerSingleLocalResource(path6, path4, path5.getParent().toString(), localResourceType, true, true).alreadyRegisteredAsLocalResource()) {
                    if (path6.endsWith("jar")) {
                        hashSet.add(path5.toString());
                    } else {
                        hashSet2.add(path5.getParent().toString());
                    }
                }
            }
        }
        ArrayList arrayList3 = new ArrayList();
        Stream sorted = hashSet2.stream().sorted();
        arrayList3.getClass();
        sorted.forEach((v1) -> {
            r1.add(v1);
        });
        Stream sorted2 = hashSet.stream().sorted();
        arrayList3.getClass();
        sorted2.forEach((v1) -> {
            r1.add(v1);
        });
        return arrayList3;
    }

    public YarnLocalResourceDescriptor uploadFlinkDist(Path path) throws IOException, ClusterDeploymentException {
        if (this.flinkDist != null) {
            return this.flinkDist;
        }
        if (!this.providedSharedLibs.isEmpty()) {
            throw new ClusterDeploymentException("The \"" + YarnConfigOptions.PROVIDED_LIB_DIRS.key() + "\" has to also include the lib/, plugin/ and flink-dist jar. In other case, it cannot be used.");
        }
        this.flinkDist = registerSingleLocalResource(path.getName(), path, "", LocalResourceType.FILE, true, false);
        return this.flinkDist;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public List<String> registerProvidedLocalResources() {
        Preconditions.checkNotNull(this.localResources);
        ArrayList arrayList = new ArrayList();
        this.providedSharedLibs.forEach((str, fileStatus) -> {
            Path path = fileStatus.getPath();
            LOG.debug("Using remote file {} to register local resource", path);
            YarnLocalResourceDescriptor fromFileStatus = YarnLocalResourceDescriptor.fromFileStatus(str, fileStatus, LocalResourceVisibility.PUBLIC, LocalResourceType.FILE);
            this.localResources.put(str, fromFileStatus.toLocalResource());
            this.remotePaths.add(path);
            this.envShipResourceList.add(fromFileStatus);
            if (!isFlinkDistJar(path.getName()) && !isPlugin(path)) {
                arrayList.add(str);
            } else if (isFlinkDistJar(path.getName())) {
                this.flinkDist = fromFileStatus;
            }
        });
        return arrayList;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static YarnApplicationFileUploader from(FileSystem fileSystem, Path path, List<Path> list, ApplicationId applicationId, int i) throws IOException {
        return new YarnApplicationFileUploader(fileSystem, path, list, applicationId, i);
    }

    private Path copyToRemoteApplicationDir(Path path, String str, int i) throws IOException {
        Path path2 = new Path(getApplicationDirPath(this.homeDir, this.applicationId), (str.isEmpty() ? "" : str + "/") + path.getName());
        LOG.debug("Copying from {} to {} with replication factor {}", new Object[]{path, path2, Integer.valueOf(i)});
        this.fileSystem.copyFromLocalFile(false, true, path, path2);
        this.fileSystem.setReplication(path2, (short) i);
        return path2;
    }

    private FileStatus[] waitForTransferToComplete(Path path) throws IOException {
        for (int i = 1; i <= 4; i++) {
            try {
                return this.fileSystem.listStatus(path);
            } catch (FileNotFoundException e) {
                LOG.debug("Got FileNotFoundException while fetching uploaded remote resources at retry num {}", Integer.valueOf(i));
                try {
                    LOG.debug("Sleeping for {}ms", 100);
                    TimeUnit.MILLISECONDS.sleep(100L);
                } catch (InterruptedException e2) {
                    LOG.warn("Failed to sleep for {}ms at retry num {} while fetching uploaded remote resources", new Object[]{100, Integer.valueOf(i), e2});
                }
            }
        }
        return null;
    }

    private static boolean isFlinkDistJar(String str) {
        return str.startsWith("flink-dist") && str.endsWith("jar");
    }

    private static boolean isPlugin(Path path) {
        Path parent = path.getParent();
        while (true) {
            Path path2 = parent;
            if (path2 == null) {
                return false;
            }
            if ("plugins".equals(path2.getName())) {
                return true;
            }
            parent = path2.getParent();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Path getApplicationDirPath(Path path, ApplicationId applicationId) {
        return new Path((Path) Preconditions.checkNotNull(path), ".flink/" + Preconditions.checkNotNull(applicationId) + '/');
    }

    private Path getApplicationDir(ApplicationId applicationId) throws IOException {
        Path applicationDirPath = getApplicationDirPath(this.homeDir, applicationId);
        if (!this.fileSystem.exists(applicationDirPath)) {
            this.fileSystem.mkdirs(applicationDirPath, new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
        }
        return applicationDirPath;
    }

    private Map<String, FileStatus> getAllFilesInProvidedLibDirs(List<Path> list) {
        HashMap hashMap = new HashMap();
        ((List) Preconditions.checkNotNull(list)).forEach(FunctionUtils.uncheckedConsumer(path -> {
            if (!this.fileSystem.exists(path) || !this.fileSystem.isDirectory(path)) {
                LOG.warn("Provided lib dir {} does not exist or is not a directory. Ignoring.", path);
                return;
            }
            RemoteIterator listFiles = this.fileSystem.listFiles(path, true);
            while (listFiles.hasNext()) {
                LocatedFileStatus locatedFileStatus = (LocatedFileStatus) listFiles.next();
                FileStatus fileStatus = (FileStatus) hashMap.put(path.getParent().toUri().relativize(locatedFileStatus.getPath().toUri()).toString(), locatedFileStatus);
                if (fileStatus != null) {
                    throw new IOException("Two files with the same filename exist in the shared libs: " + fileStatus.getPath() + " - " + locatedFileStatus.getPath() + ". Please deduplicate.");
                }
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("The following files were found in the shared lib dir: {}", hashMap.values().stream().map(fileStatus2 -> {
                    return fileStatus2.getPath().toString();
                }).collect(Collectors.joining(", ")));
            }
        }));
        return Collections.unmodifiableMap(hashMap);
    }

    private void addToRemotePaths(boolean z, Path path) {
        if (z) {
            this.remotePaths.add(path);
        }
    }

    private void addToEnvShipResourceList(boolean z, YarnLocalResourceDescriptor yarnLocalResourceDescriptor) {
        if (z) {
            this.envShipResourceList.add(yarnLocalResourceDescriptor);
        }
    }
}
