package org.apache.storm.hbase.state;

import com.google.common.collect.Maps;
import com.google.common.primitives.UnsignedBytes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria;
import org.apache.storm.hbase.common.ColumnList;
import org.apache.storm.hbase.common.HBaseClient;
import org.apache.storm.state.DefaultStateEncoder;
import org.apache.storm.state.DefaultStateSerializer;
import org.apache.storm.state.KeyValueState;
import org.apache.storm.state.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/hbase/state/HBaseKeyValueState.class */
/**
 * {@link KeyValueState} implementation that buffers changes in memory and persists them to
 * HBase through an {@link HBaseClient} using a prepare/commit protocol.
 *
 * <p>Row layout used by this class (all rows live in the configured column family):
 * <ul>
 *   <li>{@code <namespace>$key:<encodedKey>} - one row per state key; the encoded value is
 *       stored under the {@link #STATE_QUALIFIER} column.</li>
 *   <li>{@code <namespace>$prepare} - the prepared (not yet committed) changes, one column
 *       per encoded key.</li>
 *   <li>{@code <namespace>$txid} - the last prepared and committed transaction ids, stored
 *       as decimal strings under the {@code prepare} and {@code commit} columns.</li>
 * </ul>
 *
 * <p>String/byte conversions use the platform default charset, exactly as before, so that
 * keys stay compatible with rows that were already persisted.
 *
 * @param <K> type of the state keys
 * @param <V> type of the state values
 */
public class HBaseKeyValueState<K, V> implements KeyValueState<K, V> {
    /** Chunk size handed to the {@link HBaseKeyValueStateIterator} created by {@link #iterator()}. */
    public static final int ITERATOR_CHUNK_SIZE = 1000;

    private static final Logger LOG = LoggerFactory.getLogger(HBaseKeyValueState.class);

    /** Column qualifier under which the encoded value of a state key is stored. */
    public static byte[] STATE_QUALIFIER = "s".getBytes();

    /** Shared, unmodifiable empty map used whenever there is nothing waiting to be committed. */
    public static final NavigableMap<byte[], byte[]> EMPTY_PENDING_COMMIT_MAP =
            Maps.unmodifiableNavigableMap(new TreeMap<byte[], byte[]>(UnsignedBytes.lexicographicalComparator()));

    private static final byte[] COMMIT_TXID_KEY = "commit".getBytes();
    private static final byte[] PREPARE_TXID_KEY = "prepare".getBytes();

    private final byte[] keyNamespace;
    private final byte[] prepareNamespace;
    private final byte[] txidNamespace;
    private final String namespace;
    private final byte[] columnFamily;
    private final DefaultStateEncoder<K, V> encoder;
    private final HBaseClient hBaseClient;

    /** Changes made since the last prepare; swapped for a fresh map by prepareCommit/commit/rollback. */
    private ConcurrentNavigableMap<byte[], byte[]> pendingPrepare;
    /** Prepared changes waiting for {@link #commit(long)}; always an unmodifiable view. */
    private NavigableMap<byte[], byte[]> pendingCommit;
    /** In-memory copy of the columns of the txid row. */
    private NavigableMap<byte[], byte[]> txIds;

    /**
     * Creates a state that uses {@link DefaultStateSerializer} for both keys and values.
     *
     * @param hBaseClient  client used for every HBase read and write
     * @param columnFamily name of the column family holding all rows of this state
     * @param namespace    prefix of every row key written by this state
     */
    public HBaseKeyValueState(HBaseClient hBaseClient, String columnFamily, String namespace) {
        this(hBaseClient, columnFamily, namespace, new DefaultStateSerializer<K>(), new DefaultStateSerializer<V>());
    }

    /**
     * Creates a state with explicit serializers and loads the stored txids and any previously
     * prepared changes from HBase.
     *
     * @param hBaseClient     client used for every HBase read and write
     * @param columnFamily    name of the column family holding all rows of this state
     * @param namespace       prefix of every row key written by this state
     * @param keySerializer   serializer for keys
     * @param valueSerializer serializer for values
     * @throws RuntimeException if reading the txid or prepare row fails
     */
    public HBaseKeyValueState(HBaseClient hBaseClient, String columnFamily, String namespace,
                              Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this.hBaseClient = hBaseClient;
        this.columnFamily = columnFamily.getBytes();
        this.namespace = namespace;
        this.keyNamespace = (namespace + "$key:").getBytes();
        this.prepareNamespace = (namespace + "$prepare").getBytes();
        this.txidNamespace = (namespace + "$txid").getBytes();
        this.encoder = new DefaultStateEncoder<>(keySerializer, valueSerializer);
        this.pendingPrepare = createPendingPrepareMap();
        initTxids();
        initPendingCommit();
    }

    /** Loads the txid row into {@link #txIds}, or starts with an empty map when the row is absent. */
    private void initTxids() {
        try {
            Result result = getColumnFamily(this.txidNamespace, this.columnFamily);
            if (result.isEmpty()) {
                this.txIds = new TreeMap<byte[], byte[]>(UnsignedBytes.lexicographicalComparator());
            } else {
                // Typed as SortedMap so the TreeMap(SortedMap) constructor is chosen and the
                // stored map's byte[] comparator is carried over (byte[] has no natural order).
                SortedMap<byte[], byte[]> stored = result.getFamilyMap(this.columnFamily);
                this.txIds = new TreeMap<byte[], byte[]>(stored);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("initTxids, txIds {}", describe(this.txIds));
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Loads the prepare row into {@link #pendingCommit}, or uses the empty map when the row is absent. */
    private void initPendingCommit() {
        try {
            Result result = getColumnFamily(this.prepareNamespace, this.columnFamily);
            if (result.isEmpty()) {
                LOG.debug("No previously prepared commits.");
                this.pendingCommit = EMPTY_PENDING_COMMIT_MAP;
            } else {
                // Log the row key as text; a raw byte[] would only print its identity hash.
                LOG.debug("Loading previously prepared commit from {}", new String(this.prepareNamespace));
                this.pendingCommit = Maps.unmodifiableNavigableMap(result.getFamilyMap(this.columnFamily));
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Buffers a key/value pair in memory; nothing is written to HBase by this call.
     *
     * @param key   state key
     * @param value state value
     */
    public void put(K key, V value) {
        LOG.debug("put key '{}', value '{}'", key, value);
        this.pendingPrepare.put(this.encoder.encodeKey(key), this.encoder.encodeValue(value));
    }

    /**
     * Looks a key up in the un-prepared changes, then in the prepared changes, and finally in HBase.
     *
     * @param key state key
     * @return the decoded value, or {@code null} when no encoded value is found or the encoder
     *         decodes the stored bytes to {@code null}
     * @throws RuntimeException if the HBase read fails
     */
    public V get(K key) {
        LOG.debug("get key '{}'", key);
        byte[] encodedKey = this.encoder.encodeKey(key);

        // Read each map field once and do a single lookup per map, so a swap of the field by
        // prepareCommit/commit/rollback cannot happen between a containsKey and the get.
        // ConcurrentSkipListMap never holds null values, so null reliably means "absent".
        NavigableMap<byte[], byte[]> prepared = this.pendingPrepare;
        NavigableMap<byte[], byte[]> committing = this.pendingCommit;

        byte[] encodedValue = prepared.get(encodedKey);
        if (encodedValue == null) {
            encodedValue = committing.get(encodedKey);
        }
        if (encodedValue == null) {
            encodedValue = readStoredValue(encodedKey);
        }

        V value = null;
        if (encodedValue != null) {
            value = this.encoder.decodeValue(encodedValue);
        }
        LOG.debug("Value for key '{}' is '{}'", key, value);
        return value;
    }

    /** Reads the {@link #STATE_QUALIFIER} column of the state row for the given encoded key from HBase. */
    private byte[] readStoredValue(byte[] encodedKey) {
        HBaseProjectionCriteria criteria = new HBaseProjectionCriteria();
        HBaseProjectionCriteria.ColumnMetaData column =
                new HBaseProjectionCriteria.ColumnMetaData(this.columnFamily, STATE_QUALIFIER);
        criteria.addColumn(column);
        try {
            Result result = this.hBaseClient.batchGet(Collections.singletonList(
                    this.hBaseClient.constructGetRequests(getRowKeyForStateKey(encodedKey), criteria)))[0];
            return result.getValue(column.getColumnFamily(), column.getQualifier());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Same as {@link #get(Object)} but substitutes a default for a {@code null} result.
     *
     * @param key          state key
     * @param defaultValue value returned when {@link #get(Object)} yields {@code null}
     * @return the stored value, or {@code defaultValue}
     */
    public V get(K key, V defaultValue) {
        V value = get(key);
        return value != null ? value : defaultValue;
    }

    /**
     * Marks a key as deleted by buffering the encoder's tombstone value for it.
     *
     * @param key state key
     * @return the value {@link #get(Object)} returned just before the tombstone was buffered
     */
    public V delete(K key) {
        LOG.debug("delete key '{}'", key);
        byte[] encodedKey = this.encoder.encodeKey(key);
        V previous = get(key);
        this.pendingPrepare.put(encodedKey, this.encoder.getTombstoneValue());
        return previous;
    }

    /**
     * Creates an {@link HBaseKeyValueStateIterator} over the current un-prepared changes, the
     * prepared changes and the rows stored in HBase.
     *
     * @return iterator over the state entries
     */
    public Iterator<Map.Entry<K, V>> iterator() {
        return (Iterator<Map.Entry<K, V>>) new HBaseKeyValueStateIterator(this.namespace, this.columnFamily, this.hBaseClient, this.pendingPrepare.entrySet().iterator(), this.pendingCommit.entrySet().iterator(), ITERATOR_CHUNK_SIZE, this.encoder.getKeySerializer(), this.encoder.getValueSerializer());
    }

    /**
     * Persists the buffered changes to the prepare row and records {@code txid} as the last
     * prepared transaction. Changes of an earlier, still un-committed prepare are merged in;
     * newer buffered values win over the earlier ones.
     *
     * @param txid transaction id being prepared
     * @throws RuntimeException if {@code txid} is not greater than the last committed txid, or
     *                          if an HBase operation fails
     */
    public void prepareCommit(long txid) {
        LOG.debug("prepareCommit txid {}", Long.valueOf(txid));
        validatePrepareTxid(txid);
        try {
            // Take ownership of the current buffer and start a fresh one for subsequent writes.
            ConcurrentNavigableMap<byte[], byte[]> toPrepare = this.pendingPrepare;
            this.pendingPrepare = createPendingPrepareMap();

            if (getColumnFamily(this.prepareNamespace, this.columnFamily).isEmpty()) {
                LOG.debug("No previously prepared txn to merge, txid {}.", Long.valueOf(txid));
            } else {
                LOG.debug("Prepared txn already exists, will merge, txid {}.", Long.valueOf(txid));
                for (Map.Entry<byte[], byte[]> earlier : this.pendingCommit.entrySet()) {
                    if (!toPrepare.containsKey(earlier.getKey())) {
                        toPrepare.put(earlier.getKey(), earlier.getValue());
                    }
                }
            }

            if (toPrepare.isEmpty()) {
                LOG.debug("Nothing to save for prepareCommit, txid {}.", Long.valueOf(txid));
            } else {
                mutateRow(this.prepareNamespace, this.columnFamily, toPrepare);
            }

            this.txIds.put(PREPARE_TXID_KEY, String.valueOf(txid).getBytes());
            mutateRow(this.txidNamespace, this.columnFamily, this.txIds);
            this.pendingCommit = Maps.unmodifiableNavigableMap(toPrepare);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Applies the prepared changes to the state rows, records {@code txid} as the last committed
     * transaction and deletes the prepare row.
     *
     * @param txid transaction id being committed
     * @throws RuntimeException if {@code txid} is older than the last committed txid, differs
     *                          from the last prepared txid, or if an HBase operation fails
     */
    public void commit(long txid) {
        LOG.debug("commit txid {}", Long.valueOf(txid));
        validateCommitTxid(txid);
        try {
            if (this.pendingCommit.isEmpty()) {
                LOG.debug("Nothing to save for commit, txid {}.", Long.valueOf(txid));
            } else {
                applyPendingStateToHBase(this.pendingCommit);
            }
            this.txIds.put(COMMIT_TXID_KEY, String.valueOf(txid).getBytes());
            mutateRow(this.txidNamespace, this.columnFamily, this.txIds);
            deleteRow(this.prepareNamespace);
            this.pendingCommit = EMPTY_PENDING_COMMIT_MAP;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Applies the buffered (un-prepared) changes directly to the state rows, bypassing the
     * prepare row and the txid bookkeeping, then starts a fresh buffer.
     *
     * @throws RuntimeException if an HBase operation fails
     */
    public void commit() {
        if (this.pendingPrepare.isEmpty()) {
            LOG.debug("Nothing to save for commit");
        } else {
            try {
                applyPendingStateToHBase(this.pendingPrepare);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        this.pendingPrepare = createPendingPrepareMap();
    }

    /**
     * Discards the prepared and the buffered changes: deletes the prepare row if it exists,
     * resets the prepared txid to the last committed txid and clears both in-memory maps.
     *
     * @throws RuntimeException if an HBase operation fails
     */
    public void rollback() {
        LOG.debug("rollback");
        try {
            if (existsRow(this.prepareNamespace)) {
                deleteRow(this.prepareNamespace);
            } else {
                LOG.debug("Nothing to rollback, prepared data is empty");
            }

            Long lastCommitted = lastCommittedTxid();
            if (lastCommitted != null) {
                this.txIds.put(PREPARE_TXID_KEY, String.valueOf(lastCommitted).getBytes());
            } else {
                // NOTE(review): this only removes the prepare id from the in-memory map. When the
                // map ends up empty nothing is written below, so a prepare column already stored
                // in the txid row presumably stays there - confirm this is intended.
                this.txIds.remove(PREPARE_TXID_KEY);
            }

            if (!this.txIds.isEmpty()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("put txidNamespace {}, txIds {}", new String(this.txidNamespace), describe(this.txIds));
                }
                mutateRow(this.txidNamespace, this.columnFamily, this.txIds);
            }
            this.pendingCommit = EMPTY_PENDING_COMMIT_MAP;
            this.pendingPrepare = createPendingPrepareMap();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Rejects a prepare whose txid is not strictly greater than the last committed txid. */
    private void validatePrepareTxid(long txid) {
        Long lastCommitted = lastCommittedTxid();
        if (lastCommitted != null && txid <= lastCommitted.longValue()) {
            throw new RuntimeException("Invalid txid '" + txid + "' for prepare. Txid '" + lastCommitted + "' is already committed");
        }
    }

    /** Rejects a commit whose txid is older than the last committed one or differs from the prepared one. */
    private void validateCommitTxid(long txid) {
        Long lastCommitted = lastCommittedTxid();
        if (lastCommitted != null && txid < lastCommitted.longValue()) {
            throw new RuntimeException("Invalid txid '" + txid + "' txid '" + lastCommitted + "' is already committed");
        }
        Long lastPrepared = lastPreparedTxid();
        if (lastPrepared != null && txid != lastPrepared.longValue()) {
            throw new RuntimeException("Invalid txid '" + txid + "' not same as prepared txid '" + lastPrepared + "'");
        }
    }

    /** Returns the last committed txid held in {@link #txIds}, or {@code null} when there is none. */
    private Long lastCommittedTxid() {
        return lastId(COMMIT_TXID_KEY);
    }

    /** Returns the last prepared txid held in {@link #txIds}, or {@code null} when there is none. */
    private Long lastPreparedTxid() {
        return lastId(PREPARE_TXID_KEY);
    }

    /** Parses the decimal-string txid stored under {@code txidKey}; {@code null} when the key is absent. */
    private Long lastId(byte[] txidKey) {
        byte[] stored = this.txIds.get(txidKey);
        if (stored == null) {
            return null;
        }
        return Long.valueOf(new String(stored));
    }

    /** Builds the HBase row key for a state key: the key namespace prefix followed by the encoded key. */
    private byte[] getRowKeyForStateKey(byte[] encodedKey) {
        byte[] rowKey = new byte[this.keyNamespace.length + encodedKey.length];
        System.arraycopy(this.keyNamespace, 0, rowKey, 0, this.keyNamespace.length);
        System.arraycopy(encodedKey, 0, rowKey, this.keyNamespace.length, encodedKey.length);
        return rowKey;
    }

    /**
     * Writes every pending entry to its state row in a single batch: tombstone values become
     * row deletes, everything else is stored under the {@link #STATE_QUALIFIER} column.
     */
    private void applyPendingStateToHBase(NavigableMap<byte[], byte[]> pending) throws Exception {
        byte[] tombstone = this.encoder.getTombstoneValue();
        List<Mutation> mutations = new ArrayList<Mutation>();
        for (Map.Entry<byte[], byte[]> change : pending.entrySet()) {
            byte[] rowKey = getRowKeyForStateKey(change.getKey());
            byte[] encodedValue = change.getValue();
            if (Arrays.equals(encodedValue, tombstone)) {
                mutations.add(new Delete(rowKey));
            } else {
                mutations.addAll(prepareMutateRow(rowKey, this.columnFamily,
                        Collections.singletonMap(STATE_QUALIFIER, encodedValue)));
            }
        }
        this.hBaseClient.batchMutate(mutations);
    }

    /** Fetches all columns of {@code family} for the row {@code rowKey}. */
    private Result getColumnFamily(byte[] rowKey, byte[] family) throws Exception {
        HBaseProjectionCriteria criteria = new HBaseProjectionCriteria();
        criteria.addColumnFamily(family);
        return this.hBaseClient.batchGet(Collections.singletonList(this.hBaseClient.constructGetRequests(rowKey, criteria)))[0];
    }

    /** Builds the mutations for writing {@code columns} to a row, using the default durability. */
    private List<Mutation> prepareMutateRow(byte[] rowKey, byte[] family, Map<byte[], byte[]> columns) throws IOException {
        return prepareMutateRow(rowKey, family, columns, Durability.USE_DEFAULT);
    }

    /** Builds the mutations for writing {@code columns} to a row with the given durability. */
    private List<Mutation> prepareMutateRow(byte[] rowKey, byte[] family, Map<byte[], byte[]> columns, Durability durability) throws IOException {
        return this.hBaseClient.constructMutationReq(rowKey, buildColumnList(family, columns), durability);
    }

    /** Writes {@code columns} to a row using the default durability. */
    private void mutateRow(byte[] rowKey, byte[] family, Map<byte[], byte[]> columns) throws Exception {
        mutateRow(rowKey, family, columns, Durability.USE_DEFAULT);
    }

    /** Writes {@code columns} to a row with the given durability. */
    private void mutateRow(byte[] rowKey, byte[] family, Map<byte[], byte[]> columns, Durability durability) throws Exception {
        this.hBaseClient.batchMutate(prepareMutateRow(rowKey, family, columns, durability));
    }

    /** Asks HBase whether a row with the given key exists. */
    private boolean existsRow(byte[] rowKey) throws Exception {
        return this.hBaseClient.exists(new Get(rowKey));
    }

    /** Deletes the row with the given key. */
    private void deleteRow(byte[] rowKey) throws Exception {
        this.hBaseClient.batchMutate(Collections.singletonList(new Delete(rowKey)));
    }

    /** Converts a qualifier-to-value map into a {@link ColumnList} for the given column family. */
    private ColumnList buildColumnList(byte[] family, Map<byte[], byte[]> columns) {
        ColumnList columnList = new ColumnList();
        for (Map.Entry<byte[], byte[]> column : columns.entrySet()) {
            columnList.addColumn(family, column.getKey(), column.getValue());
        }
        return columnList;
    }

    /** Creates an empty buffer ordered by unsigned lexicographical comparison of the encoded keys. */
    private ConcurrentNavigableMap<byte[], byte[]> createPendingPrepareMap() {
        return new ConcurrentSkipListMap<byte[], byte[]>(UnsignedBytes.lexicographicalComparator());
    }

    /** Renders a byte[]-to-byte[] map as text for log output (byte arrays otherwise print as identity hashes). */
    private static String describe(Map<byte[], byte[]> columns) {
        StringBuilder text = new StringBuilder("{");
        for (Map.Entry<byte[], byte[]> column : columns.entrySet()) {
            if (text.length() > 1) {
                text.append(", ");
            }
            byte[] value = column.getValue();
            text.append(new String(column.getKey()))
                .append('=')
                .append(value == null ? "null" : new String(value));
        }
        return text.append('}').toString();
    }
}
