package org.apache.flink.yarn;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.shared.SharedValue;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.yarn.ReservationProvider;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.QueueStatistics;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/yarn/ZookeeperResourceReservationProvider.class */
/**
 * {@link ReservationProvider} that coordinates resource reservations between clusters through
 * ZooKeeper (via Curator recipes).
 *
 * <p>State shared between all participants of one YARN queue:
 *
 * <ul>
 *   <li>an {@link InterProcessMutex} that serializes every read-modify-write cycle,
 *   <li>a {@link SharedCount} holding the number of clusters that currently own a reservation,
 *   <li>a {@link SharedValue} holding a serialized map {@code clusterId -> (cpu, memoryMb)}.
 * </ul>
 *
 * <p>All ZooKeeper nodes live below {@code <basePath>-<queue>}, where the queue is the one the
 * YARN application identified by the cluster id is running in.
 */
public class ZookeeperResourceReservationProvider implements ReservationProvider {
    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperResourceReservationProvider.class);

    /** Suffix (relative to the lock path) of the node storing the reservation counter. */
    private static final String SHARED_COUNTER_PATH = "/bucket-counter";

    /** Suffix (relative to the lock path) of the node storing the serialized reservation map. */
    private static final String RESERVED_RESOURCES_PATH = "/reserved";

    private final YarnClient yarnClient;
    private final MapSerializer<String, Tuple2<Double, Long>> resourceMapSerializer;
    private final String clusterId;
    private final int jobReservationCount;
    private final Duration jobReservationTimeout;
    private final SharedCount rescaleSharedCounter;
    private final SharedValue sharedResourceMap;
    private final InterProcessMutex rescaleCounterLock;
    private final String queueId;

    /**
     * Creates the provider, resolves the YARN queue of the application and starts the shared
     * ZooKeeper counter and value.
     *
     * @param curatorFramework Curator client used for the lock, the counter and the shared map
     * @param yarnClient YARN client used to resolve the queue and to query its statistics
     * @param i maximum number of clusters that may hold a reservation at the same time
     * @param duration maximum time to wait for the ZooKeeper lock
     * @param str cluster id; parsed as a YARN {@link ApplicationId}
     * @param str2 base ZooKeeper path; the queue name is appended as {@code str2 + "-" + queue}
     * @throws FlinkRuntimeException if the queue cannot be resolved or the shared state cannot be
     *     started
     */
    public ZookeeperResourceReservationProvider(@Nonnull CuratorFramework curatorFramework, YarnClient yarnClient, int i, Duration duration, String str, String str2) {
        Preconditions.checkNotNull(curatorFramework, "Curator framework cannot be null");
        Preconditions.checkNotNull(yarnClient, "YARN client cannot be null");
        // The timeout is dereferenced on every lock acquisition, so fail fast here.
        Preconditions.checkNotNull(duration, "Job reservation timeout cannot be null");
        Preconditions.checkNotNull(str, "Cluster ID cannot be null");
        LOG.info("ZookeeperResourceReservationProvider initializing with job reservation timeout: {} and bucket rescaling size: {} for cluster {}.", duration, i, str);

        final String resolvedQueue;
        try {
            resolvedQueue = yarnClient.getApplicationReport(ApplicationId.fromString(str)).getQueue();
        } catch (YarnException | IOException e) {
            throw new FlinkRuntimeException(String.format("Error during initializing %s", getClass().getSimpleName()), e);
        }

        this.yarnClient = yarnClient;
        this.queueId = resolvedQueue;
        this.resourceMapSerializer = createResourceMapSerializer();
        this.jobReservationCount = i;
        this.jobReservationTimeout = duration;
        this.clusterId = str;

        final String lockPath = str2 + "-" + resolvedQueue;
        this.rescaleCounterLock = new InterProcessMutex(curatorFramework, lockPath);
        this.rescaleSharedCounter = new SharedCount(curatorFramework, lockPath + SHARED_COUNTER_PATH, 0);

        // Seed value: a single int 0, which the map serializer reads back as an empty map.
        final ByteBuffer emptyMap = ByteBuffer.allocate(4);
        emptyMap.putInt(0);
        this.sharedResourceMap = new SharedValue(curatorFramework, lockPath + RESERVED_RESOURCES_PATH, emptyMap.array());

        try {
            this.rescaleSharedCounter.start();
            this.sharedResourceMap.start();
        } catch (Exception e) {
            final FlinkRuntimeException failure = new FlinkRuntimeException(String.format("Error during initializing %s", getClass().getSimpleName()), e);
            // Do not leave half-started recipes (and their listeners) behind.
            try {
                this.sharedResourceMap.close();
            } catch (IOException closeError) {
                failure.addSuppressed(closeError);
            }
            try {
                this.rescaleSharedCounter.close();
            } catch (IOException closeError) {
                failure.addSuppressed(closeError);
            }
            throw failure;
        }
    }

    /**
     * Builds the serializer for the shared reservation map: cluster id mapped to a tuple of
     * (reserved cpu, reserved memory in MB).
     */
    @VisibleForTesting
    static MapSerializer<String, Tuple2<Double, Long>> createResourceMapSerializer() {
        // Tuple2.class carries no type arguments; the double cast is the usual way to obtain a
        // parameterized Class token and is safe because of erasure.
        @SuppressWarnings("unchecked")
        final Class<Tuple2<Double, Long>> tupleClass = (Class<Tuple2<Double, Long>>) (Class<?>) Tuple2.class;
        final TypeSerializer<?>[] fieldSerializers = {DoubleSerializer.INSTANCE, LongSerializer.INSTANCE};
        return new MapSerializer<>(StringSerializer.INSTANCE, new TupleSerializer<>(tupleClass, fieldSerializers));
    }

    /**
     * Queries YARN for the resources currently available in the given queue.
     *
     * @param str name of the YARN queue
     * @return available vcores and memory (MB) as reported by the queue statistics
     * @throws Exception if the YARN request fails
     */
    @Override // org.apache.flink.yarn.ReservationProvider
    public ReservationProvider.AvailableResources getYarnAvailableResources(String str) throws Exception {
        final QueueStatistics stats = this.yarnClient.getQueueInfo(str).getQueueStatistics();
        final long vCores = stats.getAvailableVCores();
        final long memoryMb = stats.getAvailableMemoryMB();
        LOG.debug("Try to reserve yarn resources with available YARN cpu: {}, memory: {}mb", vCores, memoryMb);
        return new ReservationProvider.AvailableResources(vCores, memoryMb);
    }

    /**
     * Tries to reserve resources for up to {@code i} units of the given size for this cluster.
     *
     * @param i maximum number of units to reserve
     * @param d cpu required per unit
     * @param memorySize memory required per unit
     * @return number of units actually reserved; {@code 0} if the lock could not be acquired in
     *     time, if the maximum number of reserving clusters is reached, or if the queue has no
     *     room left
     * @throws FlinkRuntimeException if acquiring/releasing the lock or updating the shared state
     *     fails
     */
    @Override // org.apache.flink.yarn.ReservationProvider
    public int tryReserveResources(int i, double d, MemorySize memorySize) {
        if (!acquireLock("resource reservation")) {
            LOG.warn("Timeout for waiting lock for rescaling exceeded.");
            return 0;
        }
        try {
            return reserveUnderLock(i, d, memorySize);
        } catch (Exception e) {
            throw new FlinkRuntimeException("Resource reservation error for yarn cluster " + this.clusterId, e);
        } finally {
            // Single release point: releasing a mutex that is no longer owned would throw.
            releaseLock("resource reservation");
        }
    }

    /**
     * Removes this cluster's reservation (if any) from the shared map and decrements the shared
     * counter. Does nothing besides logging when the lock cannot be acquired in time.
     *
     * @throws FlinkRuntimeException if acquiring/releasing the lock or updating the shared state
     *     fails
     */
    @Override // org.apache.flink.yarn.ReservationProvider
    public void clearResourceReservation() {
        if (!acquireLock("resource reservation clearing")) {
            LOG.warn("Timeout for waiting lock exceeded, resource reservation for cluster {} was not cleared.", this.clusterId);
            return;
        }
        try {
            final int count = this.rescaleSharedCounter.getCount();
            final Map<String, Tuple2<Double, Long>> reservations = readReservations();
            if (reservations.remove(this.clusterId) != null) {
                this.rescaleSharedCounter.setCount(count - 1);
                writeReservations(reservations);
                LOG.info("Resource reservation for cluster {} cleared", this.clusterId);
            }
        } catch (Exception e) {
            throw new FlinkRuntimeException("Resource reservation clearing error for yarn cluster " + this.clusterId, e);
        } finally {
            releaseLock("resource reservation clearing");
        }
    }

    /** Performs the actual reservation; must only be called while holding the lock. */
    private int reserveUnderLock(int requested, double cpuPerUnit, MemorySize memoryPerUnit) throws Exception {
        final int count = this.rescaleSharedCounter.getCount();
        LOG.info("Current rescaling bucket size is {}.", count);
        if (count >= this.jobReservationCount) {
            return 0;
        }

        final Map<String, Tuple2<Double, Long>> reservations = readReservations();
        double reservedCpu = 0.0d;
        long reservedMemoryMb = 0L;
        for (Tuple2<Double, Long> reservation : reservations.values()) {
            reservedCpu += reservation.f0;
            reservedMemoryMb += reservation.f1;
        }
        LOG.debug("Already reserved cpu: {}, memory: {}mb", reservedCpu, reservedMemoryMb);

        // Query YARN once so cpu and memory come from the same snapshot.
        final ReservationProvider.AvailableResources yarnAvailable = getYarnAvailableResources(this.queueId);
        final double availableVCores = yarnAvailable.getAvailableVCores() - reservedCpu;
        final long availableMemoryMB = yarnAvailable.getAvailableMemoryMB() - reservedMemoryMb;
        LOG.info("Try to reserve resources with available cpu: {}, memory: {}mb", availableVCores, availableMemoryMB);

        final int unitMemoryMb = memoryPerUnit.getMebiBytes();
        final int granted = Math.min((int) Math.min(availableVCores / cpuPerUnit, availableMemoryMB / unitMemoryMb), requested);
        if (granted <= 0) {
            // Existing reservations may exceed what YARN reports; never report a negative count.
            return 0;
        }

        final double addedCpu = granted * cpuPerUnit;
        // Widen before multiplying to avoid int overflow.
        final long addedMemoryMb = (long) granted * unitMemoryMb;

        final Tuple2<Double, Long> existing = reservations.get(this.clusterId);
        final double totalCpu;
        final long totalMemoryMb;
        if (existing == null) {
            // First reservation of this cluster: it now occupies one slot of the bucket.
            this.rescaleSharedCounter.setCount(count + 1);
            totalCpu = addedCpu;
            totalMemoryMb = addedMemoryMb;
        } else {
            totalCpu = existing.f0 + addedCpu;
            totalMemoryMb = existing.f1 + addedMemoryMb;
        }
        LOG.info("Reservation resources (cpu: {}, memory: {}) for cluster {}", totalCpu, totalMemoryMb, this.clusterId);

        reservations.put(this.clusterId, new Tuple2<>(totalCpu, totalMemoryMb));
        writeReservations(reservations);
        LOG.info("Succeed reservation resources for cluster {}", this.clusterId);
        return granted;
    }

    /** Deserializes the reservation map from the shared ZooKeeper value. */
    private Map<String, Tuple2<Double, Long>> readReservations() throws IOException {
        final ByteArrayInputStream bytes = new ByteArrayInputStream(this.sharedResourceMap.getValue());
        return this.resourceMapSerializer.deserialize(new DataInputViewStreamWrapper(bytes));
    }

    /** Serializes the reservation map and stores it in the shared ZooKeeper value. */
    private void writeReservations(Map<String, Tuple2<Double, Long>> reservations) throws Exception {
        final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        this.resourceMapSerializer.serialize(reservations, new DataOutputViewStreamWrapper(bytes));
        this.sharedResourceMap.setValue(bytes.toByteArray());
    }

    /**
     * Acquires the inter-process lock, waiting at most the configured reservation timeout.
     *
     * @param action description of the running operation, used in the error message
     * @return {@code true} if the lock is now held, {@code false} on timeout
     */
    private boolean acquireLock(String action) {
        try {
            // Millisecond precision: getSeconds() would truncate sub-second timeouts to zero.
            return this.rescaleCounterLock.acquire(this.jobReservationTimeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new FlinkRuntimeException("Mutex acquire error during " + action + " for yarn cluster " + this.clusterId, e);
        }
    }

    /** Releases the inter-process lock, wrapping any failure in a {@link FlinkRuntimeException}. */
    private void releaseLock(String action) {
        try {
            this.rescaleCounterLock.release();
        } catch (Exception e) {
            throw new FlinkRuntimeException("Mutex release error during " + action + " for yarn cluster " + this.clusterId, e);
        }
    }
}
