package org.apache.spark.network.yarn;

import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.HttpCrossOriginFilterInitializer;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.sasl.SaslServerBootstrap;
import org.apache.spark.network.sasl.ShuffleSecretManager;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.yarn.util.HadoopConfigProvider;
import org.p001sparkproject.guava.annotations.VisibleForTesting;
import org.p001sparkproject.guava.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/spark/network/yarn/YarnShuffleService.class */
/**
 * YARN auxiliary service, registered under the name {@code spark_shuffle}, that starts a Spark
 * shuffle {@link TransportServer} backed by an {@link ExternalShuffleBlockHandler}.
 *
 * <p>When {@code spark.authenticate} is true in the service configuration, a
 * {@link ShuffleSecretManager} is created and a SASL bootstrap is installed on the server;
 * per-application secrets are then registered and unregistered as applications start and stop.
 *
 * <p>Registered-executor state is kept in a file named {@code registeredExecutors.ldb} located
 * under the directory returned by {@link #getRecoveryPath()}.
 */
public class YarnShuffleService extends AuxiliaryService {
    private static final Logger logger = LoggerFactory.getLogger(YarnShuffleService.class);

    private static final String SPARK_SHUFFLE_SERVICE_PORT_KEY = "spark.shuffle.service.port";
    private static final int DEFAULT_SPARK_SHUFFLE_SERVICE_PORT = 7337;
    private static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate";
    private static final boolean DEFAULT_SPARK_AUTHENTICATE = false;
    private static final String RECOVERY_FILE_NAME = "registeredExecutors.ldb";

    /** Non-null only when authentication was enabled in {@link #serviceInit(Configuration)}. */
    private ShuffleSecretManager secretManager;
    private TransportServer shuffleServer;
    /** Configuration handed to {@link #serviceInit(Configuration)}; null before initialization. */
    private Configuration _conf;

    /** Directory holding the recovery file; set via {@link #setRecoveryPath(Path)} or derived lazily. */
    @VisibleForTesting
    Path _recoveryPath;

    /** May remain null if construction failed during {@link #serviceInit(Configuration)}. */
    @VisibleForTesting
    ExternalShuffleBlockHandler blockHandler;

    @VisibleForTesting
    File registeredExecutorFile;

    /** Port the most recently initialized service bound to, or -1 if none has started. */
    @VisibleForTesting
    static int boundPort = -1;

    /** The most recently constructed instance. */
    @VisibleForTesting
    static YarnShuffleService instance;

    /** Creates the service and publishes it through the static {@link #instance} field. */
    public YarnShuffleService() {
        super("spark_shuffle");
        logger.info("Initializing YARN shuffle service for Spark");
        instance = this;
    }

    /** Authentication is considered enabled exactly when a secret manager was created. */
    private boolean isAuthenticationEnabled() {
        return this.secretManager != null;
    }

    /**
     * Resolves the recovery file, builds the block handler and starts the shuffle server.
     *
     * <p>A failure to build the block handler is logged and initialization continues, leaving
     * {@link #blockHandler} null (best-effort start-up, as in the original code).
     *
     * <p>NOTE(review): the decompiler reports this method was originally {@code protected};
     * it is kept {@code public} so existing callers keep compiling.
     *
     * @param configuration service configuration; read for the port, the authentication flag
     *                      and the local directories
     */
    @Override // org.apache.hadoop.service.AbstractService
    public void serviceInit(Configuration configuration) {
        this._conf = configuration;
        this.registeredExecutorFile = new File(getRecoveryPath().toUri().getPath(), RECOVERY_FILE_NAME);
        TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(configuration));
        boolean authEnabled = configuration.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
        try {
            this.blockHandler = new ExternalShuffleBlockHandler(transportConf, this.registeredExecutorFile);
        } catch (Exception e) {
            logger.error("Failed to initialize external shuffle service", e);
        }

        // Raw type retained: the element type is not among this file's imports.
        ArrayList bootstraps = Lists.newArrayList();
        if (authEnabled) {
            this.secretManager = new ShuffleSecretManager();
            bootstraps.add(new SaslServerBootstrap(transportConf, this.secretManager));
        }

        int requestedPort = configuration.getInt(SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
        TransportContext transportContext = new TransportContext(transportConf, this.blockHandler);
        this.shuffleServer = transportContext.createServer(requestedPort, bootstraps);

        // Record the port reported by the server rather than the requested one.
        int port = this.shuffleServer.getPort();
        boundPort = port;
        String authStatus = authEnabled ? "enabled" : "not enabled";
        logger.info("Started YARN shuffle service for Spark on port {}. Authentication is {}.  Registered executor file is {}", port, authStatus, this.registeredExecutorFile);
    }

    /**
     * Registers the application's secret with the secret manager when authentication is enabled.
     * Any exception is logged and not propagated.
     *
     * @param applicationInitializationContext supplies the application id and its service data
     */
    @Override // org.apache.hadoop.yarn.server.api.AuxiliaryService
    public void initializeApplication(ApplicationInitializationContext applicationInitializationContext) {
        String applicationId = applicationInitializationContext.getApplicationId().toString();
        try {
            ByteBuffer applicationDataForService = applicationInitializationContext.getApplicationDataForService();
            logger.info("Initializing application {}", applicationId);
            if (isAuthenticationEnabled()) {
                this.secretManager.registerApp(applicationId, applicationDataForService);
            }
        } catch (Exception e) {
            logger.error("Exception when initializing application {}", applicationId, e);
        }
    }

    /**
     * Unregisters the application's secret (when authentication is enabled) and notifies the
     * block handler that the application was removed. Any exception, including one caused by a
     * null {@link #blockHandler}, is logged and not propagated.
     *
     * @param applicationTerminationContext supplies the id of the stopping application
     */
    @Override // org.apache.hadoop.yarn.server.api.AuxiliaryService
    public void stopApplication(ApplicationTerminationContext applicationTerminationContext) {
        String applicationId = applicationTerminationContext.getApplicationId().toString();
        try {
            logger.info("Stopping application {}", applicationId);
            if (isAuthenticationEnabled()) {
                this.secretManager.unregisterApp(applicationId);
            }
            this.blockHandler.applicationRemoved(applicationId, false);
        } catch (Exception e) {
            logger.error("Exception when stopping application {}", applicationId, e);
        }
    }

    /** Logs the container id; no other work is done on container start. */
    @Override // org.apache.hadoop.yarn.server.api.AuxiliaryService
    public void initializeContainer(ContainerInitializationContext containerInitializationContext) {
        logger.info("Initializing container {}", containerInitializationContext.getContainerId());
    }

    /** Logs the container id; no other work is done on container stop. */
    @Override // org.apache.hadoop.yarn.server.api.AuxiliaryService
    public void stopContainer(ContainerTerminationContext containerTerminationContext) {
        logger.info("Stopping container {}", containerTerminationContext.getContainerId());
    }

    /**
     * Closes the shuffle server and the block handler.
     *
     * <p>Each resource is closed in its own try block so that a failure while closing the server
     * does not prevent the block handler from being closed. Failures are logged, not propagated.
     *
     * <p>NOTE(review): the decompiler reports this method was originally {@code protected}.
     */
    @Override // org.apache.hadoop.service.AbstractService
    public void serviceStop() {
        try {
            if (this.shuffleServer != null) {
                this.shuffleServer.close();
            }
        } catch (Exception e) {
            logger.error("Exception when stopping service", e);
        }
        try {
            if (this.blockHandler != null) {
                this.blockHandler.close();
            }
        } catch (Exception e) {
            logger.error("Exception when stopping service", e);
        }
    }

    /**
     * @return an empty buffer; this service publishes no metadata
     */
    @Override // org.apache.hadoop.yarn.server.api.AuxiliaryService
    public ByteBuffer getMetaData() {
        return ByteBuffer.allocate(0);
    }

    /**
     * Sets the directory under which the recovery file should live.
     *
     * @param path recovery directory
     */
    @Override // org.apache.hadoop.yarn.server.api.AuxiliaryService
    public void setRecoveryPath(Path path) {
        this._recoveryPath = path;
    }

    /**
     * Determines the directory that holds the recovery file.
     *
     * <p>The configured local directories are scanned in order for an existing recovery file and
     * the scan stops at the first one found:
     * <ul>
     *   <li>if no recovery path has been set, that directory becomes the recovery path;</li>
     *   <li>otherwise the file is moved into the already-set recovery path, and a failed move
     *       is logged.</li>
     * </ul>
     * If no recovery path is known after the scan, the first local directory is used.
     *
     * <p>Must not be called before {@link #serviceInit(Configuration)} has stored the
     * configuration.
     *
     * <p>NOTE(review): the decompiler reports this method was originally {@code protected}.
     *
     * @return the recovery directory, never null
     * @throws IllegalStateException if no recovery path was set and no local directories are
     *                               configured
     */
    @Override // org.apache.hadoop.yarn.server.api.AuxiliaryService
    public Path getRecoveryPath() {
        String[] localDirs = this._conf.getTrimmedStrings(YarnConfiguration.NM_LOCAL_DIRS);
        for (String dir : localDirs) {
            File existing = new File(new Path(dir).toUri().getPath(), RECOVERY_FILE_NAME);
            if (!existing.exists()) {
                continue;
            }
            if (this._recoveryPath == null) {
                this._recoveryPath = new Path(dir);
            } else {
                File target = new File(this._recoveryPath.toUri().getPath(), RECOVERY_FILE_NAME);
                if (!existing.renameTo(target)) {
                    logger.error("Failed to move recovery file {} to the path {}", RECOVERY_FILE_NAME, this._recoveryPath.toString());
                }
            }
            // Only the first existing recovery file is considered; without this break the loop
            // would re-examine the same directory forever.
            break;
        }
        if (this._recoveryPath == null) {
            if (localDirs.length == 0) {
                throw new IllegalStateException("Cannot determine recovery path: no recovery path set and "
                        + YarnConfiguration.NM_LOCAL_DIRS + " is empty");
            }
            this._recoveryPath = new Path(localDirs[0]);
        }
        return this._recoveryPath;
    }
}
