package org.apache.flink.runtime.state.heap.space;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.state.heap.SpillableOptions;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.Preconditions;
import org.xbill.DNS.TTL;

/* loaded from: input_file:org/apache/flink/runtime/state/heap/space/SpaceAllocator.class */
public class SpaceAllocator implements Allocator {
    private volatile Chunk[] totalSpace;
    private final List<Chunk> totalSpaceForNormal;
    private final List<Chunk> totalSpaceForHuge;
    private final AtomicInteger chunkIdGenerator;
    private final ChunkAllocator chunkAllocator;

    /* loaded from: input_file:org/apache/flink/runtime/state/heap/space/SpaceAllocator$SpaceType.class */
    public enum SpaceType {
        HEAP,
        OFFHEAP,
        MMAP
    }

    public SpaceAllocator(Configuration configuration, @Nullable File[] fileArr) {
        this.totalSpace = new Chunk[16];
        this.totalSpaceForNormal = new ArrayList();
        this.totalSpaceForHuge = new ArrayList();
        this.chunkIdGenerator = new AtomicInteger(0);
        this.chunkAllocator = createChunkAllocator(configuration, fileArr);
    }

    @VisibleForTesting
    SpaceAllocator(ChunkAllocator chunkAllocator) {
        this.totalSpace = new Chunk[16];
        this.totalSpaceForNormal = new ArrayList();
        this.totalSpaceForHuge = new ArrayList();
        this.chunkIdGenerator = new AtomicInteger(0);
        this.chunkAllocator = (ChunkAllocator) Preconditions.checkNotNull(chunkAllocator);
    }

    @VisibleForTesting
    void addTotalSpace(Chunk chunk, int i) {
        if (i >= this.totalSpace.length) {
            Chunk[] chunkArr = new Chunk[this.totalSpace.length * 2];
            System.arraycopy(this.totalSpace, 0, chunkArr, 0, this.totalSpace.length);
            this.totalSpace = chunkArr;
        }
        this.totalSpace[i] = chunk;
    }

    @Override // org.apache.flink.runtime.state.heap.space.Allocator
    public long allocate(int i) {
        return i >= 1048576 ? doAllocate(this.totalSpaceForHuge, i, AllocateStrategy.HugeBucket) : doAllocate(this.totalSpaceForNormal, i, AllocateStrategy.SmallBucket);
    }

    @Override // org.apache.flink.runtime.state.heap.space.Allocator
    public void free(long j) {
        int chunkIdByAddress = SpaceUtils.getChunkIdByAddress(j);
        getChunkById(chunkIdByAddress).free(SpaceUtils.getChunkOffsetByAddress(j));
    }

    @Override // org.apache.flink.runtime.state.heap.space.Allocator
    public Chunk getChunkById(int i) {
        return this.totalSpace[i];
    }

    private long doAllocate(List<Chunk> list, int i, AllocateStrategy allocateStrategy) {
        Iterator<Chunk> it = list.iterator();
        while (it.hasNext()) {
            int allocate = it.next().allocate(i);
            if (allocate != -1) {
                return ((r0.getChunkId() & 4294967295L) << 32) | (allocate & 4294967295L);
            }
        }
        Chunk createChunk = createChunk(allocateStrategy);
        list.add(createChunk);
        addTotalSpace(createChunk, createChunk.getChunkId());
        int allocate2 = createChunk.allocate(i);
        if (allocate2 != -1) {
            return ((createChunk.getChunkId() & 4294967295L) << 32) | (allocate2 & 4294967295L);
        }
        throw new RuntimeException("There is no space to allocate");
    }

    private Chunk createChunk(AllocateStrategy allocateStrategy) {
        return this.chunkAllocator.createChunk(this.chunkIdGenerator.getAndIncrement(), allocateStrategy);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.chunkAllocator.close();
    }

    @VisibleForTesting
    ChunkAllocator getChunkAllocator() {
        return this.chunkAllocator;
    }

    @VisibleForTesting
    AtomicInteger getChunkIdGenerator() {
        return this.chunkIdGenerator;
    }

    private static ChunkAllocator createChunkAllocator(Configuration configuration, @Nullable File[] fileArr) {
        SpaceType valueOf = SpaceType.valueOf(((String) configuration.get(SpillableOptions.SPACE_TYPE)).toUpperCase());
        long bytes = ((MemorySize) configuration.get(SpillableOptions.CHUNK_SIZE)).getBytes();
        Preconditions.checkArgument(bytes <= TTL.MAX_VALUE, "Chunk size should be less than Integer.MAX_VALUE, but is actually %s", Long.valueOf(bytes));
        Preconditions.checkArgument(MathUtils.isPowerOf2(bytes), "Chunk size should be a power of two, but is actually %s", Long.valueOf(bytes));
        switch (valueOf) {
            case HEAP:
                return new HeapBufferChunkAllocator((int) bytes);
            case OFFHEAP:
                return new DirectBufferChunkAllocator((int) bytes);
            case MMAP:
                return new MmapChunkAllocator((int) bytes, fileArr, configuration);
            default:
                throw new UnsupportedOperationException("Unsupported space type " + valueOf.name());
        }
    }
}
