package org.apache.hudi.common.util.collection;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.Serializable;
import java.net.InetAddress;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
import org.apache.hudi.common.fs.SizeAwareDataOutputStream;
import org.apache.hudi.common.util.BufferedRandomAccessFile;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.SerializationUtils;
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/hudi/common/util/collection/BitCaskDiskMap.class */
public final class BitCaskDiskMap<T extends Serializable, R extends Serializable> extends DiskMap<T, R> {
    /** Buffer size (128 KiB) used for both the append stream and the per-thread read handles. */
    public static final int BUFFER_SIZE = 131072;
    private static final Logger LOG = LogManager.getLogger(BitCaskDiskMap.class);
    /** One {@link CompressionHandler} per thread, created lazily on first use. */
    private static final ThreadLocal<CompressionHandler> DISK_COMPRESSION_REF = ThreadLocal.withInitial(CompressionHandler::new);
    /** Maps each key to the metadata of the entry most recently written for it. */
    private final Map<T, ValueMetadata> valueMetadataMap;
    /** Whether serialized values are compressed before being written and decompressed after being read. */
    private final boolean isCompressionEnabled;
    /** The spill file that backs this map. */
    private final File writeOnlyFile;
    /** Stream used to append entries to {@link #writeOnlyFile}; wraps {@link #fileOutputStream}. */
    private final SizeAwareDataOutputStream writeOnlyFileHandle;
    /** Underlying stream of the append handle; its channel is forced in {@link #close()}. */
    private final FileOutputStream fileOutputStream;
    /** Position returned by the most recent spill; reported by {@link #sizeOfFileOnDiskInBytes()}. */
    private final AtomicLong filePosition;
    /** Path of {@link #writeOnlyFile}; recorded in every {@link ValueMetadata}. */
    private final String filePath;
    /** Read-only handle on the spill file for the current thread; opened lazily by getRandomAccessFile(). */
    private final ThreadLocal<BufferedRandomAccessFile> randomAccessFile;
    /** Every read handle opened by any thread, tracked so that {@link #close()} can close them all. */
    private final Queue<BufferedRandomAccessFile> openedAccessFiles;
    /** Iterators handed out by {@link #iterator()}, tracked so that {@link #close()} can close them. */
    private final List<ClosableIterator<R>> iterators;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hudi/common/util/collection/BitCaskDiskMap$CompressionHandler.class */
    /**
     * Deflates and inflates value payloads, reusing its internal buffers between calls.
     *
     * <p>An instance is not safe for concurrent use because every call resets and refills the
     * shared buffers; {@code BitCaskDiskMap} keeps one instance per thread in
     * {@code DISK_COMPRESSION_REF}.
     *
     * <p>NOTE(review): the class is {@link Serializable} but its buffers are {@code transient}
     * and only set by field initializers, so a deserialized instance would presumably have
     * {@code null} buffers — confirm that instances are never actually serialized.
     */
    public static class CompressionHandler implements Serializable {
        private static final int DISK_COMPRESSION_INITIAL_BUFFER_SIZE = 1048576;
        private static final int DECOMPRESS_INTERMEDIATE_BUFFER_SIZE = 8192;
        private final transient ByteArrayOutputStream compressBaos = new ByteArrayOutputStream(DISK_COMPRESSION_INITIAL_BUFFER_SIZE);
        private final transient ByteArrayOutputStream decompressBaos = new ByteArrayOutputStream(DISK_COMPRESSION_INITIAL_BUFFER_SIZE);
        private final byte[] decompressIntermediateBuffer = new byte[DECOMPRESS_INTERMEDIATE_BUFFER_SIZE];

        CompressionHandler() {
        }

        /**
         * Compresses the given bytes with DEFLATE at the best-compression level.
         *
         * @param bArr the uncompressed bytes
         * @return a fresh array holding the compressed bytes
         * @throws IOException if writing to or closing the deflater stream fails
         */
        public byte[] compressBytes(byte[] bArr) throws IOException {
            this.compressBaos.reset();
            Deflater deflater = new Deflater(Deflater.BEST_COMPRESSION);
            try {
                // Closing the stream finishes the deflate block; it does NOT end a caller-supplied
                // Deflater, so the native resources are released in the outer finally.
                try (DeflaterOutputStream deflaterStream = new DeflaterOutputStream(this.compressBaos, deflater)) {
                    deflaterStream.write(bArr);
                }
            } finally {
                deflater.end();
            }
            return this.compressBaos.toByteArray();
        }

        /**
         * Inflates bytes previously produced by {@link #compressBytes(byte[])}.
         *
         * @param bArr the compressed bytes
         * @return a fresh array holding the decompressed bytes
         * @throws HoodieIOException if the input cannot be inflated
         */
        public byte[] decompressBytes(byte[] bArr) throws IOException {
            this.decompressBaos.reset();
            // try-with-resources closes the stream, which ends the Inflater it created internally;
            // without this the native inflater memory is only reclaimed by garbage collection.
            try (InflaterInputStream inflaterStream = new InflaterInputStream(new ByteArrayInputStream(bArr))) {
                int read;
                while ((read = inflaterStream.read(this.decompressIntermediateBuffer)) > 0) {
                    this.decompressBaos.write(this.decompressIntermediateBuffer, 0, read);
                }
                return this.decompressBaos.toByteArray();
            } catch (IOException e) {
                throw new HoodieIOException("IOException while decompressing bytes", e);
            }
        }
    }

    /* loaded from: input_file:org/apache/hudi/common/util/collection/BitCaskDiskMap$FileEntry.class */
    /**
     * One record of the spill file: the serialized key and value together with their sizes,
     * a checksum and the write timestamp.
     */
    public static final class FileEntry {
        /** Checksum supplied by the writer (computed over the stored value bytes in {@code put}). */
        private final Long crc;
        /** Length of {@link #key} in bytes, as supplied by the writer. */
        private final Integer sizeOfKey;
        /** Length of {@link #value} in bytes, as supplied by the writer. */
        private final Integer sizeOfValue;
        /** Serialized key. */
        private final byte[] key;
        /** Serialized (and possibly compressed) value. */
        private final byte[] value;
        /** Timestamp supplied by the writer. */
        private final Long timestamp;

        /**
         * @param j     checksum of the value bytes
         * @param i     size of the serialized key
         * @param i2    size of the stored value
         * @param bArr  serialized key (stored by reference, not copied)
         * @param bArr2 stored value bytes (stored by reference, not copied)
         * @param j2    write timestamp
         */
        public FileEntry(long j, int i, int i2, byte[] bArr, byte[] bArr2, long j2) {
            this.key = bArr;
            this.value = bArr2;
            this.sizeOfKey = i;
            this.sizeOfValue = i2;
            this.crc = j;
            this.timestamp = j2;
        }

        /** @return the checksum of the value bytes */
        public long getCrc() {
            return crc;
        }

        /** @return the size of the serialized key */
        public int getSizeOfKey() {
            return sizeOfKey;
        }

        /** @return the size of the stored value */
        public int getSizeOfValue() {
            return sizeOfValue;
        }

        /** @return the serialized key (the same array that was passed to the constructor) */
        public byte[] getKey() {
            return key;
        }

        /** @return the stored value bytes (the same array that was passed to the constructor) */
        public byte[] getValue() {
            return value;
        }

        /** @return the write timestamp */
        public long getTimestamp() {
            return timestamp;
        }
    }

    /* loaded from: input_file:org/apache/hudi/common/util/collection/BitCaskDiskMap$ValueMetadata.class */
    /**
     * In-memory pointer to a value stored in the spill file: which file it lives in, the
     * position recorded for its entry, the size of the stored value and when it was written.
     *
     * <p>Instances are immutable. One instance is kept per key, so the numeric fields are
     * primitives rather than boxed objects.
     */
    public static final class ValueMetadata implements Comparable<ValueMetadata> {
        private final String filePath;
        private final int sizeOfValue;
        private final long offsetOfValue;
        private final long timestamp;

        /**
         * @param str path of the spill file holding the value
         * @param i   size of the stored value in bytes
         * @param j   file position recorded for the entry
         * @param j2  write timestamp
         */
        protected ValueMetadata(String str, int i, long j, long j2) {
            this.filePath = str;
            this.sizeOfValue = i;
            this.offsetOfValue = j;
            this.timestamp = j2;
        }

        /** @return path of the spill file holding the value */
        public String getFilePath() {
            return this.filePath;
        }

        /** @return size of the stored value in bytes */
        public int getSizeOfValue() {
            return this.sizeOfValue;
        }

        /** @return file position recorded for the entry (boxed to keep the existing signature) */
        public Long getOffsetOfValue() {
            return this.offsetOfValue;
        }

        /** @return write timestamp */
        public long getTimestamp() {
            return this.timestamp;
        }

        /**
         * Orders metadata by ascending file offset, so that sorted access reads the file
         * front to back.
         */
        @Override
        public int compareTo(ValueMetadata valueMetadata) {
            return Long.compare(this.offsetOfValue, valueMetadata.offsetOfValue);
        }
    }

    public BitCaskDiskMap(String str, boolean z) throws IOException {
        super(str, ExternalSpillableMap.DiskMapType.BITCASK.name());
        this.randomAccessFile = new ThreadLocal<>();
        this.openedAccessFiles = new ConcurrentLinkedQueue();
        this.iterators = new ArrayList();
        this.valueMetadataMap = new ConcurrentHashMap();
        this.isCompressionEnabled = z;
        this.writeOnlyFile = new File(this.diskMapPath, UUID.randomUUID().toString());
        this.filePath = this.writeOnlyFile.getPath();
        initFile(this.writeOnlyFile);
        this.fileOutputStream = new FileOutputStream(this.writeOnlyFile, true);
        this.writeOnlyFileHandle = new SizeAwareDataOutputStream(this.fileOutputStream, BUFFER_SIZE);
        this.filePosition = new AtomicLong(0L);
    }

    /**
     * Creates a disk map with compression disabled.
     *
     * @param str base path handed to the {@code DiskMap} superclass
     * @throws IOException if the spill file cannot be created or opened
     */
    public BitCaskDiskMap(String str) throws IOException {
        this(str, false);
    }

    /**
     * Returns the calling thread's read-only handle on the spill file, opening it on first use.
     * Every handle opened here is also recorded in {@code openedAccessFiles} so that
     * {@link #close()} can close it.
     *
     * @throws HoodieException if the file cannot be opened or positioned
     */
    private BufferedRandomAccessFile getRandomAccessFile() {
        BufferedRandomAccessFile cached = randomAccessFile.get();
        if (cached != null) {
            return cached;
        }
        try {
            BufferedRandomAccessFile opened = new BufferedRandomAccessFile(filePath, "r", BUFFER_SIZE);
            opened.seek(0L);
            randomAccessFile.set(opened);
            openedAccessFiles.offer(opened);
            return opened;
        } catch (IOException openFailure) {
            throw new HoodieException(openFailure);
        }
    }

    /**
     * Prepares a fresh, empty spill file: removes a stale file of the same name, creates the
     * parent directory if it is missing, creates the file and registers it for deletion on JVM
     * exit.
     *
     * @param file the spill file to create
     * @throws IOException     if the file cannot be created or a path cannot be canonicalized
     * @throws HoodieException if the parent directory or the file cannot be set up
     */
    private void initFile(File file) throws IOException {
        if (file.exists() && !file.delete()) {
            LOG.warn("Failed to delete file, path is " + file.getCanonicalPath());
        }
        File parentFile = file.getParentFile();
        if (!parentFile.exists()) {
            if (!parentFile.mkdir()) {
                throw new HoodieException("Failed to mkdir, path is " + parentFile.getCanonicalPath());
            }
            parentFile.setReadable(true, false);
            if (!parentFile.setWritable(true, false)) {
                throw new HoodieException("Failed to setWritable for dir, path is " + parentFile.getCanonicalPath());
            }
            parentFile.setExecutable(true, false);
        }
        if (!file.createNewFile()) {
            throw new HoodieException("Failed to create file, path is " + file.getCanonicalPath());
        }
        // Register cleanup as soon as the file exists, before anything else gets a chance to fail.
        file.deleteOnExit();

        // The host details are informational only: resolve them once and never let an
        // unresolvable local host (UnknownHostException) abort creation of the map.
        String hostAddress = "unknown";
        String hostName = "unknown";
        try {
            InetAddress localHost = InetAddress.getLocalHost();
            hostAddress = localHost.getHostAddress();
            hostName = localHost.getHostName();
        } catch (IOException e) {
            LOG.warn("Unable to resolve local host for the spill file log message", e);
        }
        LOG.info("Spilling to file location " + file.getAbsolutePath() + " in host (" + hostAddress + ") with hostname (" + hostName + ")");
    }

    /**
     * Flushes buffered writes of the append handle.
     *
     * @throws HoodieIOException if the flush fails
     */
    private void flushToDisk() {
        try {
            writeOnlyFileHandle.flush();
        } catch (IOException flushFailure) {
            throw new HoodieIOException("Failed to flush to BitCaskDiskMap file", flushFailure);
        }
    }

    /**
     * Returns a lazy iterator over the stored values and remembers it so that
     * {@link #close()} can close it.
     */
    @Override
    public Iterator<R> iterator() {
        LazyFileIterable lazyValues = new LazyFileIterable(filePath, valueMetadataMap, isCompressionEnabled);
        ClosableIterator<R> valueIterator = lazyValues.iterator();
        iterators.add(valueIterator);
        return valueIterator;
    }

    /** Returns the file position recorded after the most recent write to the spill file. */
    @Override
    public long sizeOfFileOnDiskInBytes() {
        return filePosition.get();
    }

    /** Returns the number of keys currently tracked in the in-memory metadata index. */
    @Override
    public int size() {
        return valueMetadataMap.size();
    }

    /** Returns {@code true} when no key is tracked in the in-memory metadata index. */
    @Override
    public boolean isEmpty() {
        return valueMetadataMap.isEmpty();
    }

    /** Answers from the in-memory metadata index only; the spill file is not read. */
    @Override
    public boolean containsKey(Object obj) {
        return valueMetadataMap.containsKey(obj);
    }

    /**
     * Not supported by this map.
     *
     * @throws HoodieNotSupportedException always
     */
    @Override // java.util.Map
    public boolean containsValue(Object obj) {
        throw new HoodieNotSupportedException("unable to compare values in map");
    }

    /**
     * Looks up the key in the metadata index and, if present, reads its value from the spill file.
     *
     * @return the stored value, or {@code null} if the key is not tracked
     */
    @Override
    public R get(Object obj) {
        ValueMetadata metadata = valueMetadataMap.get(obj);
        return metadata == null ? null : get(metadata);
    }

    /** Reads the value described by the metadata through the calling thread's read handle. */
    private R get(ValueMetadata valueMetadata) {
        return BitCaskDiskMap.<R>get(valueMetadata, getRandomAccessFile(), isCompressionEnabled);
    }

    /**
     * Reads, optionally decompresses, and deserializes one value from a spill file.
     *
     * @param valueMetadata    offset and size of the stored value
     * @param randomAccessFile handle to read from
     * @param z                whether the stored bytes are compressed
     * @return the deserialized value
     * @throws HoodieIOException if reading from disk fails
     */
    public static <R> R get(ValueMetadata valueMetadata, RandomAccessFile randomAccessFile, boolean z) {
        try {
            long offset = valueMetadata.getOffsetOfValue().longValue();
            int length = valueMetadata.getSizeOfValue();
            byte[] payload = SpillableMapUtils.readBytesFromDisk(randomAccessFile, offset, length);
            if (z) {
                payload = DISK_COMPRESSION_REF.get().decompressBytes(payload);
            }
            return (R) SerializationUtils.deserialize(payload);
        } catch (IOException readFailure) {
            throw new HoodieIOException("Unable to readFromDisk Hoodie Record from disk", readFailure);
        }
    }

    /**
     * Serializes the key and value, appends them to the spill file as one entry and then
     * records the entry's metadata under the key.
     *
     * <p>The metadata is published only after serialization and the write (and the flush,
     * when requested) have succeeded. A failure therefore never leaves the index pointing at
     * bytes that were not written, and never replaces the existing mapping of the key.
     *
     * @param t the key
     * @param r the value to store
     * @param z whether to flush the append handle after writing this entry
     * @return {@code r}, the value just stored (not the previously mapped value)
     * @throws HoodieIOException if serialization, compression or the write fails
     */
    private synchronized R put(T t, R r, boolean z) {
        try {
            byte[] serializedValue = SerializationUtils.serialize(r);
            byte[] valueBytes = this.isCompressionEnabled
                ? DISK_COMPRESSION_REF.get().compressBytes(serializedValue)
                : serializedValue;
            byte[] keyBytes = SerializationUtils.serialize(t);
            long timestamp = System.currentTimeMillis();
            // The entry starts at the position recorded after the previous write.
            long entryOffset = this.filePosition.get();
            FileEntry entry = new FileEntry(SpillableMapUtils.generateChecksum(valueBytes), keyBytes.length,
                valueBytes.length, keyBytes, valueBytes, timestamp);
            this.filePosition.set(SpillableMapUtils.spillToDisk(this.writeOnlyFileHandle, entry));
            if (z) {
                flushToDisk();
            }
            this.valueMetadataMap.put(t, new ValueMetadata(this.filePath, valueBytes.length, entryOffset, timestamp));
            return r;
        } catch (IOException e) {
            throw new HoodieIOException("Unable to store data in Disk Based map", e);
        }
    }

    /**
     * Stores the value under the key and flushes the write immediately.
     *
     * @return {@code r}, the value just stored (not the previously mapped value)
     */
    @Override // java.util.Map
    public R put(T t, R r) {
        return put(t, r, true);
    }

    /**
     * Reads the current value of the key, then drops the key from the metadata index.
     * The bytes already written to the spill file are left in place.
     *
     * @return the value that was mapped to the key, or {@code null} if there was none
     */
    @Override
    public R remove(Object obj) {
        R previous = get(obj);
        valueMetadataMap.remove(obj);
        return previous;
    }

    /** Writes every entry of the given map without flushing, then flushes once at the end. */
    @Override
    public void putAll(Map<? extends T, ? extends R> map) {
        Iterator<? extends Map.Entry<? extends T, ? extends R>> pending = map.entrySet().iterator();
        while (pending.hasNext()) {
            Map.Entry<? extends T, ? extends R> next = pending.next();
            put(next.getKey(), next.getValue(), false);
        }
        flushToDisk();
    }

    /** Empties the in-memory metadata index; the spill file itself is not touched. */
    @Override
    public void clear() {
        valueMetadataMap.clear();
    }

    /**
     * Releases everything this map holds: clears the metadata index, closes the append handle,
     * all read handles and all handed-out iterators, deletes the spill file and finally
     * delegates to the superclass.
     *
     * <p>Each step runs independently of the others, so a failure while flushing no longer
     * prevents the read handles and iterators from being closed. Failures are logged instead
     * of being silently dropped, and {@code super.close()} always runs.
     */
    @Override
    public void close() {
        this.valueMetadataMap.clear();
        try {
            closeWriter();
            closeReaders();
            closeIterators();
            // Delete last, once nothing of ours still has the file open.
            deleteSpillFile();
        } finally {
            super.close();
        }
    }

    /** Flushes, syncs and closes the append handle; the handle is closed even if flushing fails. */
    private void closeWriter() {
        try {
            try {
                this.writeOnlyFileHandle.flush();
                this.fileOutputStream.getChannel().force(false);
            } finally {
                this.writeOnlyFileHandle.close();
            }
        } catch (Exception e) {
            LOG.warn("Failed to flush and close BitCaskDiskMap file " + this.filePath, e);
        }
    }

    /** Closes every read handle opened by any thread, continuing past individual failures. */
    private void closeReaders() {
        BufferedRandomAccessFile reader;
        while ((reader = this.openedAccessFiles.poll()) != null) {
            try {
                reader.close();
            } catch (Exception e) {
                LOG.warn("Failed to close a read handle of BitCaskDiskMap file " + this.filePath, e);
            }
        }
    }

    /** Closes every iterator handed out by {@link #iterator()}, continuing past individual failures. */
    private void closeIterators() {
        // Iterate over a snapshot so the tracking list can be cleared afterwards.
        for (ClosableIterator<R> openIterator : new ArrayList<>(this.iterators)) {
            try {
                openIterator.close();
            } catch (Exception e) {
                LOG.warn("Failed to close an iterator over BitCaskDiskMap file " + this.filePath, e);
            }
        }
        this.iterators.clear();
    }

    /** Deletes the spill file, logging a warning when the deletion does not succeed. */
    private void deleteSpillFile() {
        if (!this.writeOnlyFile.delete()) {
            // getAbsolutePath() cannot throw, unlike getCanonicalPath().
            LOG.warn("Failed to delete file, path is " + this.writeOnlyFile.getAbsolutePath());
        }
    }

    /** Returns the key set of the in-memory metadata index (the map's own view, not a copy). */
    @Override
    public Set<T> keySet() {
        return valueMetadataMap.keySet();
    }

    /**
     * Not supported by this map; {@link #valueStream()} and {@link #iterator()} provide
     * access to the values instead.
     *
     * @throws HoodieException always
     */
    @Override // java.util.Map
    public Collection<R> values() {
        throw new HoodieException("Unsupported Operation Exception");
    }

    /**
     * Returns a sequential stream of all values, read in ascending file-offset order through
     * the calling thread's read handle. Values are read lazily as the stream is consumed.
     */
    @Override
    public Stream<R> valueStream() {
        final BufferedRandomAccessFile reader = getRandomAccessFile();
        return valueMetadataMap.values().stream()
            .sorted()
            .sequential()
            .map(metadata -> BitCaskDiskMap.<R>get(metadata, reader, isCompressionEnabled));
    }

    /**
     * Builds a new set holding one entry per tracked key, reading each value from the spill
     * file. The result is a snapshot, not a live view of the map.
     */
    @Override
    public Set<Map.Entry<T, R>> entrySet() {
        Set<Map.Entry<T, R>> entries = new HashSet<>();
        for (T key : valueMetadataMap.keySet()) {
            R value = get((Object) key);
            entries.add(new AbstractMap.SimpleEntry<>(key, value));
        }
        return entries;
    }
}
