package org.apache.hadoop.mapred;

import com.google.protobuf.ByteString;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import javax.crypto.SecretKey;
import org.apache.avro.ipc.NettyTransceiver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.ReadaheadPool;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.proto.ShuffleHandlerRecoveryProtos;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.security.proto.SecurityProtos;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.webapp.MimeType;
import org.fusesource.leveldbjni.JniDBFactory;
import org.fusesource.leveldbjni.internal.NativeDB;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBException;
import org.iq80.leveldb.Logger;
import org.iq80.leveldb.Options;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.frame.TooLongFrameException;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.jboss.netty.handler.codec.http.QueryStringDecoder;
import org.jboss.netty.handler.ssl.SslHandler;
import org.jboss.netty.handler.stream.ChunkedWriteHandler;
import org.jboss.netty.util.CharsetUtil;
import org.p001sparkproject.guava.annotations.VisibleForTesting;
import org.p001sparkproject.guava.base.Charsets;
import org.p001sparkproject.guava.cache.CacheBuilder;
import org.p001sparkproject.guava.cache.CacheLoader;
import org.p001sparkproject.guava.cache.LoadingCache;
import org.p001sparkproject.guava.cache.RemovalListener;
import org.p001sparkproject.guava.cache.RemovalNotification;
import org.p001sparkproject.guava.cache.Weigher;
import org.p001sparkproject.guava.util.concurrent.ThreadFactoryBuilder;
import py4j.commands.ReflectionCommand;

/* loaded from: input_file:org/apache/hadoop/mapred/ShuffleHandler.class */
public class ShuffleHandler extends AuxiliaryService {
    public static final String SHUFFLE_MANAGE_OS_CACHE = "mapreduce.shuffle.manage.os.cache";
    public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;
    public static final String SHUFFLE_READAHEAD_BYTES = "mapreduce.shuffle.readahead.bytes";
    public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4194304;
    private static final String STATE_DB_NAME = "mapreduce_shuffle_state";
    private static final String STATE_DB_SCHEMA_VERSION_KEY = "shuffle-schema-version";
    private static final String DATA_FILE_NAME = "file.out";
    private static final String INDEX_FILE_NAME = "file.out.index";
    private int port;
    private ChannelFactory selector;
    private final ChannelGroup accepted;
    protected HttpPipelineFactory pipelineFact;
    private int sslFileBufferSize;
    private boolean manageOsCache;
    private int readaheadLength;
    private int maxShuffleConnections;
    private int shuffleBufferSize;
    private boolean shuffleTransferToAllowed;
    private int maxSessionOpenFiles;
    private ReadaheadPool readaheadPool;
    private Map<String, String> userRsrc;
    private JobTokenSecretManager secretManager;
    private DB stateDb;
    public static final String MAPREDUCE_SHUFFLE_SERVICEID = "mapreduce_shuffle";
    public static final String SHUFFLE_PORT_CONFIG_KEY = "mapreduce.shuffle.port";
    public static final int DEFAULT_SHUFFLE_PORT = 13562;
    public static final String SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED = "mapreduce.shuffle.connection-keep-alive.enable";
    public static final boolean DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED = false;
    public static final String SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT = "mapreduce.shuffle.connection-keep-alive.timeout";
    public static final int DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT = 5;
    public static final String SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE = "mapreduce.shuffle.mapoutput-info.meta.cache.size";
    public static final int DEFAULT_SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE = 1000;
    public static final String CONNECTION_CLOSE = "close";
    public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY = "mapreduce.shuffle.ssl.file.buffer.size";
    public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 61440;
    public static final String MAX_SHUFFLE_CONNECTIONS = "mapreduce.shuffle.max.connections";
    public static final int DEFAULT_MAX_SHUFFLE_CONNECTIONS = 0;
    public static final String MAX_SHUFFLE_THREADS = "mapreduce.shuffle.max.threads";
    public static final int DEFAULT_MAX_SHUFFLE_THREADS = 0;
    public static final String SHUFFLE_BUFFER_SIZE = "mapreduce.shuffle.transfer.buffer.size";
    public static final int DEFAULT_SHUFFLE_BUFFER_SIZE = 131072;
    public static final String SHUFFLE_TRANSFERTO_ALLOWED = "mapreduce.shuffle.transferTo.allowed";
    public static final boolean DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED = true;
    public static final boolean WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED = false;
    public static final String SHUFFLE_MAX_SESSION_OPEN_FILES = "mapreduce.shuffle.max.session-open-files";
    public static final int DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES = 3;
    boolean connectionKeepAliveEnabled;
    int connectionKeepAliveTimeOut;
    int mapOutputMetaInfoCacheSize;
    final ShuffleMetrics metrics;
    private static final Log LOG = LogFactory.getLog(ShuffleHandler.class);
    private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile("^.*(?:connection.*reset|connection.*closed|broken.*pipe).*$", 2);
    protected static final Version CURRENT_VERSION_INFO = Version.newInstance(1, 0);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/mapred/ShuffleHandler$AttemptPathIdentifier.class */
    public static class AttemptPathIdentifier {
        private final String jobId;
        private final String user;
        private final String attemptId;

        public AttemptPathIdentifier(String str, String str2, String str3) {
            this.jobId = str;
            this.user = str2;
            this.attemptId = str3;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            AttemptPathIdentifier attemptPathIdentifier = (AttemptPathIdentifier) obj;
            return this.attemptId.equals(attemptPathIdentifier.attemptId) && this.jobId.equals(attemptPathIdentifier.jobId);
        }

        public int hashCode() {
            return (31 * this.jobId.hashCode()) + this.attemptId.hashCode();
        }

        public String toString() {
            return "AttemptPathIdentifier{attemptId='" + this.attemptId + "', jobId='" + this.jobId + "'}";
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/mapred/ShuffleHandler$AttemptPathInfo.class */
    public static class AttemptPathInfo {
        private final Path indexPath;
        private final Path dataPath;

        public AttemptPathInfo(Path path, Path path2) {
            this.indexPath = path;
            this.dataPath = path2;
        }
    }

    /* loaded from: input_file:org/apache/hadoop/mapred/ShuffleHandler$HttpPipelineFactory.class */
    class HttpPipelineFactory implements ChannelPipelineFactory {
        final Shuffle SHUFFLE;
        private SSLFactory sslFactory;

        public HttpPipelineFactory(Configuration configuration) throws Exception {
            this.SHUFFLE = ShuffleHandler.this.getShuffle(configuration);
            if (configuration.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, false)) {
                ShuffleHandler.LOG.info("Encrypted shuffle is enabled.");
                this.sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, configuration);
                this.sslFactory.init();
            }
        }

        public Shuffle getSHUFFLE() {
            return this.SHUFFLE;
        }

        public void destroy() {
            if (this.sslFactory != null) {
                this.sslFactory.destroy();
            }
        }

        @Override // org.jboss.netty.channel.ChannelPipelineFactory
        public ChannelPipeline getPipeline() throws Exception {
            ChannelPipeline pipeline = Channels.pipeline();
            if (this.sslFactory != null) {
                pipeline.addLast("ssl", new SslHandler(this.sslFactory.createSSLEngine()));
            }
            pipeline.addLast("decoder", new HttpRequestDecoder());
            pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
            pipeline.addLast("encoder", new HttpResponseEncoder());
            pipeline.addLast("chunking", new ChunkedWriteHandler());
            pipeline.addLast("shuffle", this.SHUFFLE);
            return pipeline;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/mapred/ShuffleHandler$LevelDBLogger.class */
    public static class LevelDBLogger implements Logger {
        private static final Log LOG = LogFactory.getLog(LevelDBLogger.class);

        private LevelDBLogger() {
        }

        @Override // org.iq80.leveldb.Logger
        public void log(String str) {
            LOG.info(str);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/mapred/ShuffleHandler$ReduceContext.class */
    public static class ReduceContext {
        private List<String> mapIds;
        private AtomicInteger mapsToWait;
        private AtomicInteger mapsToSend = new AtomicInteger(0);
        private int reduceId;
        private ChannelHandlerContext ctx;
        private String user;
        private Map<String, Shuffle.MapOutputInfo> infoMap;
        private String jobId;

        public ReduceContext(List<String> list, int i, ChannelHandlerContext channelHandlerContext, String str, Map<String, Shuffle.MapOutputInfo> map, String str2) {
            this.mapIds = list;
            this.reduceId = i;
            this.mapsToWait = new AtomicInteger(list.size());
            this.ctx = channelHandlerContext;
            this.user = str;
            this.infoMap = map;
            this.jobId = str2;
        }

        public int getReduceId() {
            return this.reduceId;
        }

        public ChannelHandlerContext getCtx() {
            return this.ctx;
        }

        public String getUser() {
            return this.user;
        }

        public Map<String, Shuffle.MapOutputInfo> getInfoMap() {
            return this.infoMap;
        }

        public String getJobId() {
            return this.jobId;
        }

        public List<String> getMapIds() {
            return this.mapIds;
        }

        public AtomicInteger getMapsToSend() {
            return this.mapsToSend;
        }

        public AtomicInteger getMapsToWait() {
            return this.mapsToWait;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/mapred/ShuffleHandler$ReduceMapFileCount.class */
    public class ReduceMapFileCount implements ChannelFutureListener {
        private ReduceContext reduceContext;

        public ReduceMapFileCount(ReduceContext reduceContext) {
            this.reduceContext = reduceContext;
        }

        @Override // org.jboss.netty.channel.ChannelFutureListener
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            if (!channelFuture.isSuccess()) {
                channelFuture.getChannel().close();
            } else if (this.reduceContext.getMapsToWait().decrementAndGet() != 0) {
                ShuffleHandler.this.pipelineFact.getSHUFFLE().sendMap(this.reduceContext);
            } else {
                ShuffleHandler.this.metrics.operationComplete(channelFuture);
                channelFuture.getChannel().close();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/mapred/ShuffleHandler$Shuffle.class */
    public class Shuffle extends SimpleChannelUpstreamHandler {
        private static final int MAX_WEIGHT = 10485760;
        private static final int EXPIRE_AFTER_ACCESS_MINUTES = 5;
        private static final int ALLOWED_CONCURRENCY = 16;
        private final Configuration conf;
        private final IndexCache indexCache;
        private int port;
        private final LocalDirAllocator lDirAlloc = new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS);
        private final LoadingCache<AttemptPathIdentifier, AttemptPathInfo> pathCache = CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).softValues().concurrencyLevel(16).removalListener(new RemovalListener<AttemptPathIdentifier, AttemptPathInfo>() { // from class: org.apache.hadoop.mapred.ShuffleHandler.Shuffle.3
            @Override // org.p001sparkproject.guava.cache.RemovalListener
            public void onRemoval(RemovalNotification<AttemptPathIdentifier, AttemptPathInfo> removalNotification) {
                if (ShuffleHandler.LOG.isDebugEnabled()) {
                    ShuffleHandler.LOG.debug("PathCache Eviction: " + removalNotification.getKey() + ", Reason=" + removalNotification.getCause());
                }
            }
        }).maximumWeight(10485760).weigher(new Weigher<AttemptPathIdentifier, AttemptPathInfo>() { // from class: org.apache.hadoop.mapred.ShuffleHandler.Shuffle.2
            @Override // org.p001sparkproject.guava.cache.Weigher
            public int weigh(AttemptPathIdentifier attemptPathIdentifier, AttemptPathInfo attemptPathInfo) {
                return attemptPathIdentifier.jobId.length() + attemptPathIdentifier.user.length() + attemptPathIdentifier.attemptId.length() + attemptPathInfo.indexPath.toString().length() + attemptPathInfo.dataPath.toString().length();
            }
        }).build(new CacheLoader<AttemptPathIdentifier, AttemptPathInfo>() { // from class: org.apache.hadoop.mapred.ShuffleHandler.Shuffle.1
            @Override // org.p001sparkproject.guava.cache.CacheLoader
            public AttemptPathInfo load(AttemptPathIdentifier attemptPathIdentifier) throws Exception {
                String str = Shuffle.this.getBaseLocation(attemptPathIdentifier.jobId, attemptPathIdentifier.user) + attemptPathIdentifier.attemptId;
                Path localPathToRead = Shuffle.this.lDirAlloc.getLocalPathToRead(str + "/" + ShuffleHandler.INDEX_FILE_NAME, Shuffle.this.conf);
                Path localPathToRead2 = Shuffle.this.lDirAlloc.getLocalPathToRead(str + "/" + ShuffleHandler.DATA_FILE_NAME, Shuffle.this.conf);
                if (ShuffleHandler.LOG.isDebugEnabled()) {
                    ShuffleHandler.LOG.debug("Loaded : " + attemptPathIdentifier + " via loader");
                }
                return new AttemptPathInfo(localPathToRead, localPathToRead2);
            }
        });

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/hadoop/mapred/ShuffleHandler$Shuffle$MapOutputInfo.class */
        public class MapOutputInfo {
            final Path mapOutputFileName;
            final IndexRecord indexRecord;

            MapOutputInfo(Path path, IndexRecord indexRecord) {
                this.mapOutputFileName = path;
                this.indexRecord = indexRecord;
            }
        }

        public Shuffle(Configuration configuration) {
            this.conf = configuration;
            this.indexCache = new IndexCache(new JobConf(configuration));
            this.port = configuration.getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT);
        }

        public void setPort(int i) {
            this.port = i;
        }

        private List<String> splitMaps(List<String> list) {
            if (null == list) {
                return null;
            }
            ArrayList arrayList = new ArrayList();
            Iterator<String> it = list.iterator();
            while (it.hasNext()) {
                Collections.addAll(arrayList, it.next().split(","));
            }
            return arrayList;
        }

        @Override // org.jboss.netty.channel.SimpleChannelUpstreamHandler, akka.remote.transport.netty.NettyServerHelpers
        public void channelOpen(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
            if (ShuffleHandler.this.maxShuffleConnections <= 0 || ShuffleHandler.this.accepted.size() < ShuffleHandler.this.maxShuffleConnections) {
                ShuffleHandler.this.accepted.add(channelStateEvent.getChannel());
                super.channelOpen(channelHandlerContext, channelStateEvent);
            } else {
                ShuffleHandler.LOG.info(String.format("Current number of shuffle connections (%d) is greater than or equal to the max allowed shuffle connections (%d)", Integer.valueOf(ShuffleHandler.this.accepted.size()), Integer.valueOf(ShuffleHandler.this.maxShuffleConnections)));
                channelStateEvent.getChannel().close();
            }
        }

        @Override // org.jboss.netty.channel.SimpleChannelUpstreamHandler, akka.remote.transport.netty.NettyServerHelpers
        public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) throws Exception {
            HttpRequest httpRequest = (HttpRequest) messageEvent.getMessage();
            if (httpRequest.getMethod() != HttpMethod.GET) {
                sendError(channelHandlerContext, HttpResponseStatus.METHOD_NOT_ALLOWED);
                return;
            }
            if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(httpRequest.getHeader("name")) || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(httpRequest.getHeader("version"))) {
                sendError(channelHandlerContext, "Incompatible shuffle request version", HttpResponseStatus.BAD_REQUEST);
            }
            Map<String, List<String>> parameters = new QueryStringDecoder(httpRequest.getUri()).getParameters();
            List<String> list = parameters.get(NettyTransceiver.NETTY_KEEPALIVE_OPTION);
            boolean z = false;
            if (list != null && list.size() == 1) {
                z = Boolean.valueOf(list.get(0)).booleanValue();
                if (ShuffleHandler.LOG.isDebugEnabled()) {
                    ShuffleHandler.LOG.debug("KeepAliveParam : " + list + " : " + z);
                }
            }
            List<String> splitMaps = splitMaps(parameters.get("map"));
            List<String> list2 = parameters.get("reduce");
            List<String> list3 = parameters.get(org.apache.hadoop.mapreduce.JobID.JOB);
            if (ShuffleHandler.LOG.isDebugEnabled()) {
                ShuffleHandler.LOG.debug("RECV: " + httpRequest.getUri() + "\n  mapId: " + splitMaps + "\n  reduceId: " + list2 + "\n  jobId: " + list3 + "\n  keepAlive: " + z);
            }
            if (splitMaps == null || list2 == null || list3 == null) {
                sendError(channelHandlerContext, "Required param job, map and reduce", HttpResponseStatus.BAD_REQUEST);
                return;
            }
            if (list2.size() != 1 || list3.size() != 1) {
                sendError(channelHandlerContext, "Too many job/reduce parameters", HttpResponseStatus.BAD_REQUEST);
                return;
            }
            try {
                int parseInt = Integer.parseInt(list2.get(0));
                String str = list3.get(0);
                String uri = httpRequest.getUri();
                if (null == uri) {
                    sendError(channelHandlerContext, HttpResponseStatus.FORBIDDEN);
                    return;
                }
                DefaultHttpResponse defaultHttpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
                try {
                    verifyRequest(str, channelHandlerContext, httpRequest, defaultHttpResponse, new URL("http", "", this.port, uri));
                    HashMap hashMap = new HashMap();
                    Channel channel = messageEvent.getChannel();
                    String str2 = (String) ShuffleHandler.this.userRsrc.get(str);
                    try {
                        populateHeaders(splitMaps, str, str2, parseInt, httpRequest, defaultHttpResponse, z, hashMap);
                        channel.write(defaultHttpResponse);
                        ReduceContext reduceContext = new ReduceContext(splitMaps, parseInt, channelHandlerContext, str2, hashMap, str);
                        for (int i = 0; i < Math.min(ShuffleHandler.this.maxSessionOpenFiles, splitMaps.size()) && sendMap(reduceContext) != null; i++) {
                        }
                    } catch (IOException e) {
                        channel.write(defaultHttpResponse);
                        ShuffleHandler.LOG.error("Shuffle error in populating headers :", e);
                        sendError(channelHandlerContext, getErrorMessage(e), HttpResponseStatus.INTERNAL_SERVER_ERROR);
                    }
                } catch (IOException e2) {
                    ShuffleHandler.LOG.warn("Shuffle failure ", e2);
                    sendError(channelHandlerContext, e2.getMessage(), HttpResponseStatus.UNAUTHORIZED);
                }
            } catch (NumberFormatException e3) {
                sendError(channelHandlerContext, "Bad reduce parameter", HttpResponseStatus.BAD_REQUEST);
            } catch (IllegalArgumentException e4) {
                sendError(channelHandlerContext, "Bad job parameter", HttpResponseStatus.BAD_REQUEST);
            }
        }

        public ChannelFuture sendMap(ReduceContext reduceContext) throws Exception {
            ChannelFuture channelFuture = null;
            if (reduceContext.getMapsToSend().get() < reduceContext.getMapIds().size()) {
                String str = reduceContext.getMapIds().get(reduceContext.getMapsToSend().getAndIncrement());
                try {
                    MapOutputInfo mapOutputInfo = reduceContext.getInfoMap().get(str);
                    if (mapOutputInfo == null) {
                        mapOutputInfo = getMapOutputInfo(str, reduceContext.getReduceId(), reduceContext.getJobId(), reduceContext.getUser());
                    }
                    channelFuture = sendMapOutput(reduceContext.getCtx(), reduceContext.getCtx().getChannel(), reduceContext.getUser(), str, reduceContext.getReduceId(), mapOutputInfo);
                    if (null == channelFuture) {
                        sendError(reduceContext.getCtx(), HttpResponseStatus.NOT_FOUND);
                        return null;
                    }
                    channelFuture.addListener(new ReduceMapFileCount(reduceContext));
                } catch (IOException e) {
                    ShuffleHandler.LOG.error("Shuffle error :", e);
                    sendError(reduceContext.getCtx(), getErrorMessage(e), HttpResponseStatus.INTERNAL_SERVER_ERROR);
                    return null;
                }
            }
            return channelFuture;
        }

        private String getErrorMessage(Throwable th) {
            StringBuffer stringBuffer = new StringBuffer(th.getMessage());
            while (th.getCause() != null) {
                stringBuffer.append(th.getCause().getMessage());
                th = th.getCause();
            }
            return stringBuffer.toString();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public String getBaseLocation(String str, String str2) {
            JobID forName = JobID.forName(str);
            return "usercache/" + str2 + "/appcache/" + ConverterUtils.toString(ApplicationId.newInstance(Long.parseLong(forName.getJtIdentifier()), forName.getId())) + "/output/";
        }

        protected MapOutputInfo getMapOutputInfo(String str, int i, String str2, String str3) throws IOException {
            try {
                AttemptPathIdentifier attemptPathIdentifier = new AttemptPathIdentifier(str2, str3, str);
                AttemptPathInfo attemptPathInfo = this.pathCache.get(attemptPathIdentifier);
                if (ShuffleHandler.LOG.isDebugEnabled()) {
                    ShuffleHandler.LOG.debug("Retrieved pathInfo for " + attemptPathIdentifier + " check for corresponding loaded messages to determine whether it was loaded or cached");
                }
                IndexRecord indexInformation = this.indexCache.getIndexInformation(str, i, attemptPathInfo.indexPath, str3);
                if (ShuffleHandler.LOG.isDebugEnabled()) {
                    ShuffleHandler.LOG.debug("getMapOutputInfo: jobId=" + str2 + ", mapId=" + str + ",dataFile=" + attemptPathInfo.dataPath + ", indexFile=" + attemptPathInfo.indexPath);
                }
                return new MapOutputInfo(attemptPathInfo.dataPath, indexInformation);
            } catch (ExecutionException e) {
                if (e.getCause() instanceof IOException) {
                    throw ((IOException) e.getCause());
                }
                throw new RuntimeException(e.getCause());
            }
        }

        protected void populateHeaders(List<String> list, String str, String str2, int i, HttpRequest httpRequest, HttpResponse httpResponse, boolean z, Map<String, MapOutputInfo> map) throws IOException {
            long j = 0;
            for (String str3 : list) {
                MapOutputInfo mapOutputInfo = getMapOutputInfo(str3, i, str, str2);
                if (map.size() < ShuffleHandler.this.mapOutputMetaInfoCacheSize) {
                    map.put(str3, mapOutputInfo);
                }
                new ShuffleHeader(str3, mapOutputInfo.indexRecord.partLength, mapOutputInfo.indexRecord.rawLength, i).write(new DataOutputBuffer());
                j = j + mapOutputInfo.indexRecord.partLength + r0.getLength();
            }
            setResponseHeaders(httpResponse, z, j);
        }

        protected void setResponseHeaders(HttpResponse httpResponse, boolean z, long j) {
            if (!ShuffleHandler.this.connectionKeepAliveEnabled && !z) {
                ShuffleHandler.LOG.info("Setting connection close header...");
                httpResponse.setHeader("Connection", "close");
            } else {
                httpResponse.setHeader("Content-Length", String.valueOf(j));
                httpResponse.setHeader("Connection", "Keep-Alive");
                httpResponse.setHeader("Keep-Alive", "timeout=" + ShuffleHandler.this.connectionKeepAliveTimeOut);
                ShuffleHandler.LOG.info("Content Length in shuffle : " + j);
            }
        }

        protected void verifyRequest(String str, ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest, HttpResponse httpResponse, URL url) throws IOException {
            SecretKey retrieveTokenSecret = ShuffleHandler.this.secretManager.retrieveTokenSecret(str);
            if (null == retrieveTokenSecret) {
                ShuffleHandler.LOG.info("Request for unknown token " + str);
                throw new IOException("could not find jobid");
            }
            String buildMsgFrom = SecureShuffleUtils.buildMsgFrom(url);
            String header = httpRequest.getHeader(SecureShuffleUtils.HTTP_HEADER_URL_HASH);
            if (header == null) {
                ShuffleHandler.LOG.info("Missing header hash for " + str);
                throw new IOException("fetcher cannot be authenticated");
            }
            if (ShuffleHandler.LOG.isDebugEnabled()) {
                int length = header.length();
                ShuffleHandler.LOG.debug("verifying request. enc_str=" + buildMsgFrom + "; hash=..." + header.substring(length - (length / 2), length - 1));
            }
            SecureShuffleUtils.verifyReply(header, buildMsgFrom, retrieveTokenSecret);
            String generateHash = SecureShuffleUtils.generateHash(header.getBytes(Charsets.UTF_8), retrieveTokenSecret);
            httpResponse.setHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, generateHash);
            httpResponse.setHeader("name", ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
            httpResponse.setHeader("version", ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
            if (ShuffleHandler.LOG.isDebugEnabled()) {
                int length2 = generateHash.length();
                ShuffleHandler.LOG.debug("Fetcher request verfied. enc_str=" + buildMsgFrom + ";reply=" + generateHash.substring(length2 - (length2 / 2), length2 - 1));
            }
        }

        protected ChannelFuture sendMapOutput(ChannelHandlerContext channelHandlerContext, Channel channel, String str, String str2, int i, MapOutputInfo mapOutputInfo) throws IOException {
            ChannelFuture write;
            IndexRecord indexRecord = mapOutputInfo.indexRecord;
            ShuffleHeader shuffleHeader = new ShuffleHeader(str2, indexRecord.partLength, indexRecord.rawLength, i);
            DataOutputBuffer dataOutputBuffer = new DataOutputBuffer();
            shuffleHeader.write(dataOutputBuffer);
            channel.write(ChannelBuffers.wrappedBuffer(dataOutputBuffer.getData(), 0, dataOutputBuffer.getLength()));
            File file = new File(mapOutputInfo.mapOutputFileName.toString());
            try {
                RandomAccessFile openForRandomRead = SecureIOUtils.openForRandomRead(file, ReflectionCommand.REFLECTION_COMMAND_NAME, str, null);
                if (channel.getPipeline().get(SslHandler.class) == null) {
                    final FadvisedFileRegion fadvisedFileRegion = new FadvisedFileRegion(openForRandomRead, indexRecord.startOffset, indexRecord.partLength, ShuffleHandler.this.manageOsCache, ShuffleHandler.this.readaheadLength, ShuffleHandler.this.readaheadPool, file.getAbsolutePath(), ShuffleHandler.this.shuffleBufferSize, ShuffleHandler.this.shuffleTransferToAllowed);
                    write = channel.write(fadvisedFileRegion);
                    write.addListener(new ChannelFutureListener() { // from class: org.apache.hadoop.mapred.ShuffleHandler.Shuffle.4
                        @Override // org.jboss.netty.channel.ChannelFutureListener
                        public void operationComplete(ChannelFuture channelFuture) {
                            if (channelFuture.isSuccess()) {
                                fadvisedFileRegion.transferSuccessful();
                            }
                            fadvisedFileRegion.releaseExternalResources();
                        }
                    });
                } else {
                    write = channel.write(new FadvisedChunkedFile(openForRandomRead, indexRecord.startOffset, indexRecord.partLength, ShuffleHandler.this.sslFileBufferSize, ShuffleHandler.this.manageOsCache, ShuffleHandler.this.readaheadLength, ShuffleHandler.this.readaheadPool, file.getAbsolutePath()));
                }
                ShuffleHandler.this.metrics.shuffleConnections.incr();
                ShuffleHandler.this.metrics.shuffleOutputBytes.incr(indexRecord.partLength);
                return write;
            } catch (FileNotFoundException e) {
                ShuffleHandler.LOG.info(file + " not found");
                return null;
            }
        }

        protected void sendError(ChannelHandlerContext channelHandlerContext, HttpResponseStatus httpResponseStatus) {
            sendError(channelHandlerContext, "", httpResponseStatus);
        }

        protected void sendError(ChannelHandlerContext channelHandlerContext, String str, HttpResponseStatus httpResponseStatus) {
            DefaultHttpResponse defaultHttpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, httpResponseStatus);
            defaultHttpResponse.setHeader("Content-Type", MimeType.TEXT);
            defaultHttpResponse.setHeader("name", ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
            defaultHttpResponse.setHeader("version", ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
            defaultHttpResponse.setContent(ChannelBuffers.copiedBuffer(str, CharsetUtil.UTF_8));
            channelHandlerContext.getChannel().write(defaultHttpResponse).addListener(ChannelFutureListener.CLOSE);
        }

        @Override // org.jboss.netty.channel.SimpleChannelUpstreamHandler, akka.remote.transport.netty.NettyServerHelpers
        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, ExceptionEvent exceptionEvent) throws Exception {
            Channel channel = exceptionEvent.getChannel();
            Throwable cause = exceptionEvent.getCause();
            if (cause instanceof TooLongFrameException) {
                sendError(channelHandlerContext, HttpResponseStatus.BAD_REQUEST);
                return;
            }
            if (cause instanceof IOException) {
                if (cause instanceof ClosedChannelException) {
                    ShuffleHandler.LOG.debug("Ignoring closed channel error", cause);
                    return;
                }
                if (ShuffleHandler.IGNORABLE_ERROR_MESSAGE.matcher(String.valueOf(cause.getMessage())).matches()) {
                    ShuffleHandler.LOG.debug("Ignoring client socket close", cause);
                    return;
                }
            }
            ShuffleHandler.LOG.error("Shuffle error: ", cause);
            if (channel.isConnected()) {
                ShuffleHandler.LOG.error("Shuffle error " + exceptionEvent);
                sendError(channelHandlerContext, HttpResponseStatus.INTERNAL_SERVER_ERROR);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Metrics(about = "Shuffle output metrics", context = "mapred")
    /* loaded from: input_file:org/apache/hadoop/mapred/ShuffleHandler$ShuffleMetrics.class */
    public static class ShuffleMetrics implements ChannelFutureListener {

        @Metric({"Shuffle output in bytes"})
        MutableCounterLong shuffleOutputBytes;

        @Metric({"# of failed shuffle outputs"})
        MutableCounterInt shuffleOutputsFailed;

        @Metric({"# of succeeeded shuffle outputs"})
        MutableCounterInt shuffleOutputsOK;

        @Metric({"# of current shuffle connections"})
        MutableGaugeInt shuffleConnections;

        ShuffleMetrics() {
        }

        @Override // org.jboss.netty.channel.ChannelFutureListener
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            if (channelFuture.isSuccess()) {
                this.shuffleOutputsOK.incr();
            } else {
                this.shuffleOutputsFailed.incr();
            }
            this.shuffleConnections.decr();
        }
    }

    ShuffleHandler(MetricsSystem metricsSystem) {
        super("httpshuffle");
        this.accepted = new DefaultChannelGroup();
        this.readaheadPool = ReadaheadPool.getInstance();
        this.stateDb = null;
        this.connectionKeepAliveEnabled = false;
        this.metrics = (ShuffleMetrics) metricsSystem.register((MetricsSystem) new ShuffleMetrics());
    }

    public ShuffleHandler() {
        this(DefaultMetricsSystem.instance());
    }

    public static ByteBuffer serializeMetaData(int i) throws IOException {
        DataOutputBuffer dataOutputBuffer = new DataOutputBuffer();
        dataOutputBuffer.writeInt(i);
        return ByteBuffer.wrap(dataOutputBuffer.getData(), 0, dataOutputBuffer.getLength());
    }

    public static int deserializeMetaData(ByteBuffer byteBuffer) throws IOException {
        DataInputByteBuffer dataInputByteBuffer = new DataInputByteBuffer();
        dataInputByteBuffer.reset(byteBuffer);
        return dataInputByteBuffer.readInt();
    }

    public static ByteBuffer serializeServiceData(Token<JobTokenIdentifier> token) throws IOException {
        DataOutputBuffer dataOutputBuffer = new DataOutputBuffer();
        token.write(dataOutputBuffer);
        return ByteBuffer.wrap(dataOutputBuffer.getData(), 0, dataOutputBuffer.getLength());
    }

    static Token<JobTokenIdentifier> deserializeServiceData(ByteBuffer byteBuffer) throws IOException {
        DataInputByteBuffer dataInputByteBuffer = new DataInputByteBuffer();
        dataInputByteBuffer.reset(byteBuffer);
        Token<JobTokenIdentifier> token = new Token<>();
        token.readFields(dataInputByteBuffer);
        return token;
    }

    @Override // org.apache.hadoop.yarn.server.api.AuxiliaryService
    public void initializeApplication(ApplicationInitializationContext applicationInitializationContext) {
        String user = applicationInitializationContext.getUser();
        ApplicationId applicationId = applicationInitializationContext.getApplicationId();
        try {
            recordJobShuffleInfo(new JobID(Long.toString(applicationId.getClusterTimestamp()), applicationId.getId()), user, deserializeServiceData(applicationInitializationContext.getApplicationDataForService()));
        } catch (IOException e) {
            LOG.error("Error during initApp", e);
        }
    }

    @Override // org.apache.hadoop.yarn.server.api.AuxiliaryService
    public void stopApplication(ApplicationTerminationContext applicationTerminationContext) {
        ApplicationId applicationId = applicationTerminationContext.getApplicationId();
        try {
            removeJobShuffleInfo(new JobID(Long.toString(applicationId.getClusterTimestamp()), applicationId.getId()));
        } catch (IOException e) {
            LOG.error("Error during stopApp", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.hadoop.service.AbstractService
    public void serviceInit(Configuration configuration) throws Exception {
        this.manageOsCache = configuration.getBoolean(SHUFFLE_MANAGE_OS_CACHE, true);
        this.readaheadLength = configuration.getInt(SHUFFLE_READAHEAD_BYTES, 4194304);
        this.maxShuffleConnections = configuration.getInt(MAX_SHUFFLE_CONNECTIONS, 0);
        int i = configuration.getInt(MAX_SHUFFLE_THREADS, 0);
        if (i == 0) {
            i = 2 * Runtime.getRuntime().availableProcessors();
        }
        this.shuffleBufferSize = configuration.getInt(SHUFFLE_BUFFER_SIZE, 131072);
        this.shuffleTransferToAllowed = configuration.getBoolean(SHUFFLE_TRANSFERTO_ALLOWED, !Shell.WINDOWS);
        this.maxSessionOpenFiles = configuration.getInt(SHUFFLE_MAX_SESSION_OPEN_FILES, 3);
        this.selector = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("ShuffleHandler Netty Boss #%d").build()), Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("ShuffleHandler Netty Worker #%d").build()), i);
        super.serviceInit(new Configuration(configuration));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.hadoop.service.AbstractService
    public void serviceStart() throws Exception {
        Configuration config = getConfig();
        this.userRsrc = new ConcurrentHashMap();
        this.secretManager = new JobTokenSecretManager();
        recoverState(config);
        ServerBootstrap serverBootstrap = new ServerBootstrap(this.selector);
        try {
            this.pipelineFact = new HttpPipelineFactory(config);
            serverBootstrap.setOption("child.keepAlive", true);
            serverBootstrap.setPipelineFactory(this.pipelineFact);
            this.port = config.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
            Channel bind = serverBootstrap.bind(new InetSocketAddress(this.port));
            this.accepted.add(bind);
            this.port = ((InetSocketAddress) bind.getLocalAddress()).getPort();
            config.set(SHUFFLE_PORT_CONFIG_KEY, Integer.toString(this.port));
            this.pipelineFact.SHUFFLE.setPort(this.port);
            LOG.info(getName() + " listening on port " + this.port);
            super.serviceStart();
            this.sslFileBufferSize = config.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY, 61440);
            this.connectionKeepAliveEnabled = config.getBoolean(SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, false);
            this.connectionKeepAliveTimeOut = Math.max(1, config.getInt(SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, 5));
            this.mapOutputMetaInfoCacheSize = Math.max(1, config.getInt(SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE, 1000));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.hadoop.service.AbstractService
    public void serviceStop() throws Exception {
        this.accepted.close().awaitUninterruptibly(10L, TimeUnit.SECONDS);
        if (this.selector != null) {
            new ServerBootstrap(this.selector).releaseExternalResources();
        }
        if (this.pipelineFact != null) {
            this.pipelineFact.destroy();
        }
        if (this.stateDb != null) {
            this.stateDb.close();
        }
        super.serviceStop();
    }

    @Override // org.apache.hadoop.yarn.server.api.AuxiliaryService
    public synchronized ByteBuffer getMetaData() {
        try {
            return serializeMetaData(this.port);
        } catch (IOException e) {
            LOG.error("Error during getMeta", e);
            return null;
        }
    }

    protected Shuffle getShuffle(Configuration configuration) {
        return new Shuffle(configuration);
    }

    private void recoverState(Configuration configuration) throws IOException {
        Path recoveryPath = getRecoveryPath();
        if (recoveryPath != null) {
            startStore(recoveryPath);
            Pattern compile = Pattern.compile(org.apache.hadoop.mapreduce.JobID.JOBID_REGEX);
            LeveldbIterator leveldbIterator = null;
            try {
                try {
                    leveldbIterator = new LeveldbIterator(this.stateDb);
                    leveldbIterator.seek(JniDBFactory.bytes(org.apache.hadoop.mapreduce.JobID.JOB));
                    while (leveldbIterator.hasNext()) {
                        Map.Entry<byte[], byte[]> next = leveldbIterator.next();
                        String asString = JniDBFactory.asString(next.getKey());
                        if (!compile.matcher(asString).matches()) {
                            break;
                        } else {
                            recoverJobShuffleInfo(asString, next.getValue());
                        }
                    }
                    if (leveldbIterator != null) {
                        leveldbIterator.close();
                    }
                } catch (DBException e) {
                    throw new IOException("Database error during recovery", e);
                }
            } catch (Throwable th) {
                if (leveldbIterator != null) {
                    leveldbIterator.close();
                }
                throw th;
            }
        }
    }

    private void startStore(Path path) throws IOException {
        Options options = new Options();
        options.createIfMissing(false);
        options.logger(new LevelDBLogger());
        Path path2 = new Path(path, STATE_DB_NAME);
        LOG.info("Using state database at " + path2 + " for recovery");
        File file = new File(path2.toString());
        try {
            this.stateDb = JniDBFactory.factory.open(file, options);
        } catch (NativeDB.DBException e) {
            if (!e.isNotFound() && !e.getMessage().contains(" does not exist ")) {
                throw e;
            }
            LOG.info("Creating state database at " + file);
            options.createIfMissing(true);
            try {
                this.stateDb = JniDBFactory.factory.open(file, options);
                storeVersion();
            } catch (DBException e2) {
                throw new IOException("Unable to create state store", e2);
            }
        }
        checkVersion();
    }

    @VisibleForTesting
    Version loadVersion() throws IOException {
        byte[] bArr = this.stateDb.get(JniDBFactory.bytes(STATE_DB_SCHEMA_VERSION_KEY));
        return (bArr == null || bArr.length == 0) ? getCurrentVersion() : new VersionPBImpl(YarnServerCommonProtos.VersionProto.parseFrom(bArr));
    }

    private void storeSchemaVersion(Version version) throws IOException {
        try {
            this.stateDb.put(JniDBFactory.bytes(STATE_DB_SCHEMA_VERSION_KEY), ((VersionPBImpl) version).getProto().toByteArray());
        } catch (DBException e) {
            throw new IOException(e.getMessage(), e);
        }
    }

    private void storeVersion() throws IOException {
        storeSchemaVersion(CURRENT_VERSION_INFO);
    }

    @VisibleForTesting
    void storeVersion(Version version) throws IOException {
        storeSchemaVersion(version);
    }

    protected Version getCurrentVersion() {
        return CURRENT_VERSION_INFO;
    }

    private void checkVersion() throws IOException {
        Version loadVersion = loadVersion();
        LOG.info("Loaded state DB schema version info " + loadVersion);
        if (loadVersion.equals(getCurrentVersion())) {
            return;
        }
        if (!loadVersion.isCompatibleTo(getCurrentVersion())) {
            throw new IOException("Incompatible version for state DB schema: expecting DB schema version " + getCurrentVersion() + ", but loading version " + loadVersion);
        }
        LOG.info("Storing state DB schedma version info " + getCurrentVersion());
        storeVersion();
    }

    private void addJobToken(JobID jobID, String str, Token<JobTokenIdentifier> token) {
        this.userRsrc.put(jobID.toString(), str);
        this.secretManager.addTokenForJob(jobID.toString(), token);
        LOG.info("Added token for " + jobID.toString());
    }

    private void recoverJobShuffleInfo(String str, byte[] bArr) throws IOException {
        try {
            JobID forName = JobID.forName(str);
            ShuffleHandlerRecoveryProtos.JobShuffleInfoProto parseFrom = ShuffleHandlerRecoveryProtos.JobShuffleInfoProto.parseFrom(bArr);
            String user = parseFrom.getUser();
            SecurityProtos.TokenProto jobToken = parseFrom.getJobToken();
            addJobToken(forName, user, new Token<>(jobToken.getIdentifier().toByteArray(), jobToken.getPassword().toByteArray(), new Text(jobToken.getKind()), new Text(jobToken.getService())));
        } catch (IllegalArgumentException e) {
            throw new IOException("Bad job ID " + str + " in state store", e);
        }
    }

    private void recordJobShuffleInfo(JobID jobID, String str, Token<JobTokenIdentifier> token) throws IOException {
        if (this.stateDb != null) {
            try {
                this.stateDb.put(JniDBFactory.bytes(jobID.toString()), ShuffleHandlerRecoveryProtos.JobShuffleInfoProto.newBuilder().setUser(str).setJobToken(SecurityProtos.TokenProto.newBuilder().setIdentifier(ByteString.copyFrom(token.getIdentifier())).setPassword(ByteString.copyFrom(token.getPassword())).setKind(token.getKind().toString()).setService(token.getService().toString()).build()).build().toByteArray());
            } catch (DBException e) {
                throw new IOException("Error storing " + jobID, e);
            }
        }
        addJobToken(jobID, str, token);
    }

    private void removeJobShuffleInfo(JobID jobID) throws IOException {
        String jobID2 = jobID.toString();
        this.secretManager.removeTokenForJob(jobID2);
        this.userRsrc.remove(jobID2);
        if (this.stateDb != null) {
            try {
                this.stateDb.delete(JniDBFactory.bytes(jobID2));
            } catch (DBException e) {
                throw new IOException("Unable to remove " + jobID + " from state store", e);
            }
        }
    }
}
