package org.apache.hadoop.yarn.server.resourcemanager.reservation;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.UTCClock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.LimitedPrivate({MRConfig.YARN_FRAMEWORK_NAME})
@InterfaceStability.Unstable
/* loaded from: input_file:org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.class */
public abstract class AbstractReservationSystem extends AbstractService implements ReservationSystem {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractReservationSystem.class);
    private final ReentrantReadWriteLock readWriteLock;
    private final Lock readLock;
    private final Lock writeLock;
    private boolean initialized;
    private final Clock clock;
    private AtomicLong resCounter;
    private Map<String, Plan> plans;
    private Map<ReservationId, String> resQMap;
    private RMContext rmContext;
    private ResourceScheduler scheduler;
    private ScheduledExecutorService scheduledExecutorService;
    protected Configuration conf;
    protected long planStepSize;
    private PlanFollower planFollower;

    public AbstractReservationSystem(String str) {
        super(str);
        this.readWriteLock = new ReentrantReadWriteLock(true);
        this.readLock = this.readWriteLock.readLock();
        this.writeLock = this.readWriteLock.writeLock();
        this.initialized = false;
        this.clock = new UTCClock();
        this.resCounter = new AtomicLong();
        this.plans = new HashMap();
        this.resQMap = new HashMap();
    }

    @Override // org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem
    public void setRMContext(RMContext rMContext) {
        this.writeLock.lock();
        try {
            this.rmContext = rMContext;
            this.writeLock.unlock();
        } catch (Throwable th) {
            this.writeLock.unlock();
            throw th;
        }
    }

    @Override // org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem
    public void reinitialize(Configuration configuration, RMContext rMContext) throws YarnException {
        this.writeLock.lock();
        try {
            if (this.initialized) {
                initializeNewPlans(configuration);
            } else {
                initialize(configuration);
                this.initialized = true;
            }
        } finally {
            this.writeLock.unlock();
        }
    }

    private void initialize(Configuration configuration) throws YarnException {
        LOG.info("Initializing Reservation system");
        this.conf = configuration;
        this.scheduler = this.rmContext.getScheduler();
        this.planStepSize = configuration.getTimeDuration(YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP, 1000L, TimeUnit.MILLISECONDS);
        if (this.planStepSize < 0) {
            this.planStepSize = 1000L;
        }
        for (String str : this.scheduler.getPlanQueues()) {
            this.plans.put(str, initializePlan(str));
        }
    }

    private void initializeNewPlans(Configuration configuration) {
        LOG.info("Refreshing Reservation system");
        this.writeLock.lock();
        try {
            try {
                for (String str : this.scheduler.getPlanQueues()) {
                    if (this.plans.containsKey(str)) {
                        LOG.warn("Plan based on reservation queue {0} already exists.", str);
                    } else {
                        this.plans.put(str, initializePlan(str));
                    }
                }
                if (this.planFollower != null) {
                    this.planFollower.setPlans(this.plans.values());
                }
                this.writeLock.unlock();
            } catch (YarnException e) {
                LOG.warn("Exception while trying to refresh reservable queues", (Throwable) e);
                this.writeLock.unlock();
            }
        } catch (Throwable th) {
            this.writeLock.unlock();
            throw th;
        }
    }

    private PlanFollower createPlanFollower() {
        String str = this.conf.get(YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER, getDefaultPlanFollower());
        if (str == null) {
            return null;
        }
        LOG.info("Using PlanFollowerPolicy: " + str);
        try {
            Class<?> classByName = this.conf.getClassByName(str);
            if (PlanFollower.class.isAssignableFrom(classByName)) {
                return (PlanFollower) ReflectionUtils.newInstance(classByName, this.conf);
            }
            throw new YarnRuntimeException("Class: " + str + " not instance of " + PlanFollower.class.getCanonicalName());
        } catch (ClassNotFoundException e) {
            throw new YarnRuntimeException("Could not instantiate PlanFollowerPolicy: " + str, e);
        }
    }

    private String getDefaultPlanFollower() {
        if (this.scheduler instanceof CapacityScheduler) {
            return CapacitySchedulerPlanFollower.class.getName();
        }
        if (this.scheduler instanceof FairScheduler) {
            return FairSchedulerPlanFollower.class.getName();
        }
        return null;
    }

    @Override // org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem
    public Plan getPlan(String str) {
        this.readLock.lock();
        try {
            Plan plan = this.plans.get(str);
            this.readLock.unlock();
            return plan;
        } catch (Throwable th) {
            this.readLock.unlock();
            throw th;
        }
    }

    @Override // org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem
    public long getPlanFollowerTimeStep() {
        this.readLock.lock();
        try {
            long j = this.planStepSize;
            this.readLock.unlock();
            return j;
        } catch (Throwable th) {
            this.readLock.unlock();
            throw th;
        }
    }

    @Override // org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem
    public void synchronizePlan(String str) {
        this.writeLock.lock();
        try {
            Plan plan = this.plans.get(str);
            if (plan != null) {
                this.planFollower.synchronizePlan(plan);
            }
        } finally {
            this.writeLock.unlock();
        }
    }

    @Override // org.apache.hadoop.service.AbstractService
    public void serviceInit(Configuration configuration) throws Exception {
        reinitialize(new Configuration(configuration), this.rmContext);
        this.planFollower = createPlanFollower();
        if (this.planFollower != null) {
            this.planFollower.init(this.clock, this.scheduler, this.plans.values());
        }
        super.serviceInit(configuration);
    }

    @Override // org.apache.hadoop.service.AbstractService
    public void serviceStart() throws Exception {
        if (this.planFollower != null) {
            this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
            this.scheduledExecutorService.scheduleWithFixedDelay(this.planFollower, 0L, this.planStepSize, TimeUnit.MILLISECONDS);
        }
        super.serviceStart();
    }

    @Override // org.apache.hadoop.service.AbstractService
    public void serviceStop() {
        if (this.scheduledExecutorService != null && !this.scheduledExecutorService.isShutdown()) {
            this.scheduledExecutorService.shutdown();
        }
        this.plans.clear();
    }

    @Override // org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem
    public String getQueueForReservation(ReservationId reservationId) {
        this.readLock.lock();
        try {
            String str = this.resQMap.get(reservationId);
            this.readLock.unlock();
            return str;
        } catch (Throwable th) {
            this.readLock.unlock();
            throw th;
        }
    }

    @Override // org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem
    public void setQueueForReservation(ReservationId reservationId, String str) {
        this.writeLock.lock();
        try {
            this.resQMap.put(reservationId, str);
            this.writeLock.unlock();
        } catch (Throwable th) {
            this.writeLock.unlock();
            throw th;
        }
    }

    @Override // org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem
    public ReservationId getNewReservationId() {
        this.writeLock.lock();
        try {
            ReservationId newInstance = ReservationId.newInstance(ResourceManager.getClusterTimeStamp(), this.resCounter.incrementAndGet());
            LOG.info("Allocated new reservationId: " + newInstance);
            this.writeLock.unlock();
            return newInstance;
        } catch (Throwable th) {
            this.writeLock.unlock();
            throw th;
        }
    }

    @Override // org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem
    public Map<String, Plan> getAllPlans() {
        return this.plans;
    }

    public static String getDefaultReservationSystem(ResourceScheduler resourceScheduler) {
        if (resourceScheduler instanceof CapacityScheduler) {
            return CapacityReservationSystem.class.getName();
        }
        if (resourceScheduler instanceof FairScheduler) {
            return FairReservationSystem.class.getName();
        }
        return null;
    }

    protected Plan initializePlan(String str) throws YarnException {
        String planQueuePath = getPlanQueuePath(str);
        SharingPolicy admissionPolicy = getAdmissionPolicy(planQueuePath);
        admissionPolicy.init(planQueuePath, getReservationSchedulerConfiguration());
        Resource minAllocation = getMinAllocation();
        Resource maxAllocation = getMaxAllocation();
        ResourceCalculator resourceCalculator = getResourceCalculator();
        InMemoryPlan inMemoryPlan = new InMemoryPlan(getRootQueueMetrics(), admissionPolicy, getAgent(planQueuePath), getPlanQueueCapacity(str), this.planStepSize, resourceCalculator, minAllocation, maxAllocation, str, getReplanner(planQueuePath), getReservationSchedulerConfiguration().getMoveOnExpiry(planQueuePath));
        LOG.info("Intialized plan {0} based on reservable queue {1}", inMemoryPlan.toString(), str);
        return inMemoryPlan;
    }

    protected Planner getReplanner(String str) {
        ReservationSchedulerConfiguration reservationSchedulerConfiguration = getReservationSchedulerConfiguration();
        String replanner = reservationSchedulerConfiguration.getReplanner(str);
        LOG.info("Using Replanner: " + replanner + " for queue: " + str);
        try {
            Class<?> classByName = this.conf.getClassByName(replanner);
            if (!Planner.class.isAssignableFrom(classByName)) {
                throw new YarnRuntimeException("Class: " + classByName + " not instance of " + Planner.class.getCanonicalName());
            }
            Planner planner = (Planner) ReflectionUtils.newInstance(classByName, this.conf);
            planner.init(str, reservationSchedulerConfiguration);
            return planner;
        } catch (ClassNotFoundException e) {
            throw new YarnRuntimeException("Could not instantiate Planner: " + replanner + " for queue: " + str, e);
        }
    }

    protected ReservationAgent getAgent(String str) {
        String reservationAgent = getReservationSchedulerConfiguration().getReservationAgent(str);
        LOG.info("Using Agent: " + reservationAgent + " for queue: " + str);
        try {
            Class<?> classByName = this.conf.getClassByName(reservationAgent);
            if (ReservationAgent.class.isAssignableFrom(classByName)) {
                return (ReservationAgent) ReflectionUtils.newInstance(classByName, this.conf);
            }
            throw new YarnRuntimeException("Class: " + reservationAgent + " not instance of " + ReservationAgent.class.getCanonicalName());
        } catch (ClassNotFoundException e) {
            throw new YarnRuntimeException("Could not instantiate Agent: " + reservationAgent + " for queue: " + str, e);
        }
    }

    protected SharingPolicy getAdmissionPolicy(String str) {
        String reservationAdmissionPolicy = getReservationSchedulerConfiguration().getReservationAdmissionPolicy(str);
        LOG.info("Using AdmissionPolicy: " + reservationAdmissionPolicy + " for queue: " + str);
        try {
            Class<?> classByName = this.conf.getClassByName(reservationAdmissionPolicy);
            if (SharingPolicy.class.isAssignableFrom(classByName)) {
                return (SharingPolicy) ReflectionUtils.newInstance(classByName, this.conf);
            }
            throw new YarnRuntimeException("Class: " + reservationAdmissionPolicy + " not instance of " + SharingPolicy.class.getCanonicalName());
        } catch (ClassNotFoundException e) {
            throw new YarnRuntimeException("Could not instantiate AdmissionPolicy: " + reservationAdmissionPolicy + " for queue: " + str, e);
        }
    }

    protected abstract ReservationSchedulerConfiguration getReservationSchedulerConfiguration();

    protected abstract String getPlanQueuePath(String str);

    protected abstract Resource getPlanQueueCapacity(String str);

    protected abstract Resource getMinAllocation();

    protected abstract Resource getMaxAllocation();

    protected abstract ResourceCalculator getResourceCalculator();

    protected abstract QueueMetrics getRootQueueMetrics();
}
