package org.lemon.ipc;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.lemon.common.Configurations;
import org.lemon.common.HBaseUtils;
import org.lemon.common.IndexCellWrapper;
import org.lemon.common.LemonConstants;
import org.lemon.filter.ListKeysFilter;
import org.lemon.filter.PagingQueryFilter;
import org.lemon.index.ShardServiceException;
import org.lemon.protobuf.LemonServices;
import org.lemon.shard.QueryService;
import org.lemon.shard.Shard;
import org.lemon.shard.ShardContainer;
import org.lemon.shard.WriteStatus;

/* loaded from: input_file:org/lemon/ipc/IndexCoprocessor.class */
public class IndexCoprocessor extends BaseRegionObserver {
    private static final Log LOG = LogFactory.getLog(IndexCoprocessor.class);
    private static final ShardContainer shardContainer = new ShardContainer();
    private static final Map<byte[], Region> key2Regions = new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR);
    private TableName indexTableName;
    private Shard shard;
    private byte[] shardKey;
    private boolean traceEnabled;
    private int queryRequestKeyLength;

    public void start(CoprocessorEnvironment coprocessorEnvironment) throws IOException {
        if (coprocessorEnvironment instanceof RegionCoprocessorEnvironment) {
            Configuration configuration = coprocessorEnvironment.getConfiguration();
            if (!shardContainer.isInitialized()) {
                shardContainer.initialize(configuration);
            }
            HRegionInfo regionInfo = ((RegionCoprocessorEnvironment) coprocessorEnvironment).getRegionInfo();
            shardContainer.ensureBitmapDataDirExist(regionInfo);
            this.indexTableName = regionInfo.getTable();
            this.shardKey = HBaseUtils.getShardKey(regionInfo.getStartKey());
            this.queryRequestKeyLength = this.shardKey.length + 8 + RpcRequestWrapper.QUERY_REQUEST_SUFFIX.length;
        }
    }

    public void postOpen(ObserverContext<RegionCoprocessorEnvironment> observerContext) {
        this.shard = shardContainer.openShard(observerContext.getEnvironment().getRegion());
        this.traceEnabled = observerContext.getEnvironment().getConfiguration().getBoolean(Configurations.Optional.QUERY_DUMP_TRACING, true);
        if (this.traceEnabled) {
            LOG.info("Tracing log is enabled");
        }
    }

    public void prePut(ObserverContext<RegionCoprocessorEnvironment> observerContext, Put put, WALEdit wALEdit, Durability durability) throws IOException {
        try {
            handleIndexPut(observerContext, put);
        } catch (RuntimeException e) {
            throw new DoNotRetryIOException(e);
        }
    }

    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> observerContext, Get get, List<Cell> list) throws IOException {
        Map familyMap;
        NavigableSet navigableSet;
        try {
            byte[] row = get.getRow();
            if (row.length != this.queryRequestKeyLength || !org.lemon.common.Bytes.endOf(row, RpcRequestWrapper.QUERY_REQUEST_SUFFIX) || get == null || (familyMap = get.getFamilyMap()) == null || (navigableSet = (NavigableSet) familyMap.get(LemonConstants.INDEX_FAMILY)) == null) {
                return;
            }
            observerContext.bypass();
            long j = Bytes.toLong(row, this.shardKey.length, 8);
            if (navigableSet.contains(RpcRequestWrapper.QUERY_HINT)) {
                if (this.traceEnabled) {
                    LOG.info("Receiving one normal query request " + j);
                }
                handleQueryRequest(j, get, list, navigableSet.contains(RpcRequestWrapper.QUERY_PACKED_HINT));
            } else if (navigableSet.contains(RpcRequestWrapper.LISTKEYS_HINT)) {
                if (this.traceEnabled) {
                    LOG.info("Receiving one list keys request " + j);
                }
                handleListKeysRequest(j, get, list);
            }
        } catch (RuntimeException e) {
            throw new DoNotRetryIOException(e);
        }
    }

    public void postClose(ObserverContext<RegionCoprocessorEnvironment> observerContext, boolean z) {
        key2Regions.remove(HBaseUtils.getShardKey(observerContext.getEnvironment().getRegion().getRegionInfo().getStartKey()));
        shardContainer.closeShard((HRegion) observerContext.getEnvironment().getRegion());
    }

    private void handleIndexPut(ObserverContext<RegionCoprocessorEnvironment> observerContext, Put put) throws IOException {
        if (put.get(LemonConstants.INDEX_FAMILY, RpcRequestWrapper.TERMS_QUALIFIER).isEmpty()) {
            return;
        }
        if (put.has(LemonConstants.INDEX_FAMILY, RpcRequestWrapper.PACKED_INDEX_HINT)) {
            handleBatchIndexRequest(put);
        } else {
            handleSingleIndexRequest(put);
        }
        observerContext.bypass();
    }

    private void handleBatchIndexRequest(Put put) throws IOException {
        int size;
        Optional findFirst = put.get(LemonConstants.INDEX_FAMILY, RpcRequestWrapper.TERMS_QUALIFIER).stream().findFirst();
        if (!findFirst.isPresent()) {
            LOG.warn("Illegal packed index put without find any term cells");
            return;
        }
        try {
            List dataPerRegionList = LemonServices.PackedIndexData.parseFrom(CellUtil.cloneValue((Cell) findFirst.get())).getDataPerRegionList();
            if (dataPerRegionList == null || (size = dataPerRegionList.size()) == 0) {
                LOG.warn("Illegal packed index put without find data for any region");
                return;
            }
            Shard[] shardArr = new Shard[size];
            for (int i = 0; i < size; i++) {
                byte[] byteArray = ((LemonServices.IndexDataPerRegion) dataPerRegionList.get(i)).getRegionStartKey().toByteArray();
                Optional<Shard> shard = shardContainer.getShard(this.indexTableName, byteArray);
                if (!shard.isPresent()) {
                    ShardServiceException shardServiceException = new ShardServiceException("Shard with start key " + Bytes.toString(byteArray) + " of table " + this.indexTableName + "does not exist");
                    LOG.error(shardServiceException);
                    throw shardServiceException;
                }
                shardArr[i] = shard.get();
            }
            if (size == 1) {
                try {
                    shardArr[0].write((List<WriteStatus>) ((LemonServices.IndexDataPerRegion) dataPerRegionList.get(0)).getIndexRecordList().stream().map(indexData -> {
                        return new WriteStatus(indexData.getRow().toByteArray(), (List) indexData.getTermList().stream().map(byteString -> {
                            return byteString.toByteArray();
                        }).collect(Collectors.toList()));
                    }).collect(Collectors.toList()));
                    return;
                } catch (IOException e) {
                    LOG.error("Write shard " + shardArr[0].getShardName() + " failed", e);
                    throw new ShardServiceException(e);
                }
            }
            ArrayList arrayList = new ArrayList(size);
            AtomicInteger atomicInteger = new AtomicInteger(0);
            for (int i2 = 0; i2 < size; i2++) {
                CompletableFuture<Void> writeAsync = shardArr[i2].writeAsync((List) ((LemonServices.IndexDataPerRegion) dataPerRegionList.get(i2)).getIndexRecordList().stream().map(indexData2 -> {
                    return new WriteStatus(indexData2.getRow().toByteArray(), (List) indexData2.getTermList().stream().map(byteString -> {
                        return byteString.toByteArray();
                    }).collect(Collectors.toList()));
                }).collect(Collectors.toList()));
                arrayList.add(writeAsync);
                writeAsync.exceptionally(th -> {
                    atomicInteger.incrementAndGet();
                    return null;
                });
            }
            try {
                CompletableFuture.allOf((CompletableFuture[]) arrayList.toArray(new CompletableFuture[0])).join();
            } catch (CompletionException e2) {
                LOG.error("Got " + atomicInteger.get() + " shard write failures", e2);
                throw new ShardServiceException(e2);
            }
        } catch (InvalidProtocolBufferException e3) {
            LOG.error("Illegal packed index data that cannot readable", e3);
        }
    }

    private void handleSingleIndexRequest(Put put) throws IOException {
        if (this.shard == null) {
            throw new DoNotRetryIOException("Shard is un-initialized, shard Key: " + Bytes.toString(this.shardKey));
        }
        Optional findFirst = put.get(LemonConstants.INDEX_FAMILY, RpcRequestWrapper.TERMS_QUALIFIER).stream().findFirst();
        if (!findFirst.isPresent()) {
            LOG.warn("Illegal single index put without find any term cells");
            return;
        }
        ByteBuffer wrap = ByteBuffer.wrap(CellUtil.cloneValue((Cell) findFirst.get()));
        int i = wrap.getShort();
        ArrayList arrayList = new ArrayList(i);
        for (int i2 = 0; i2 < i; i2++) {
            byte[] bArr = new byte[wrap.getShort()];
            wrap.get(bArr);
            arrayList.add(bArr);
        }
        this.shard.write(new WriteStatus(put.getRow(), arrayList));
    }

    private void handleQueryRequest(long j, Get get, List<Cell> list, boolean z) {
        LemonServices.QueryRequest extractQueryRequest = extractQueryRequest(get);
        if (extractQueryRequest == null) {
            if (this.traceEnabled) {
                tracing("Seems like a query request but could not build query request");
                return;
            }
            return;
        }
        if (!z) {
            QueryService.ResponsePerShard query = this.shard.query(extractQueryRequest);
            if (query != null) {
                buildResultCells(extractQueryRequest, query, list);
                return;
            }
            return;
        }
        if (this.traceEnabled) {
            LOG.info("Packed query request: " + j);
        }
        Optional<Map<byte[], Shard>> shards = shardContainer.getShards(this.indexTableName);
        if (!shards.isPresent()) {
            LOG.warn("Shard of index table " + this.indexTableName + " does not exist on this server");
            return;
        }
        Map<byte[], Shard> map = shards.get();
        ConcurrentSkipListMap concurrentSkipListMap = new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR);
        Collection<Shard> values = map.values();
        CountDownLatch countDownLatch = new CountDownLatch(values.size());
        values.forEach(shard -> {
            shard.queryAsync(extractQueryRequest).whenComplete((responsePerShard, th) -> {
                if (responsePerShard != null) {
                    concurrentSkipListMap.put(shard.getShardKey(), responsePerShard);
                }
                if (this.traceEnabled) {
                    LOG.info("Query response from " + shard.getShardName() + ": " + responsePerShard + ". Request ID: " + j);
                }
                countDownLatch.countDown();
            });
        });
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            LOG.warn("Latch on shard queries got interrupted");
        }
        buildResultCells(j, extractQueryRequest, concurrentSkipListMap, list);
    }

    private void handleListKeysRequest(long j, Get get, List<Cell> list) {
        SingleColumnValueFilter filter = get.getFilter();
        if (filter == null || !(filter instanceof SingleColumnValueFilter)) {
            LOG.warn("Filter was not set correctly for list keys request");
            return;
        }
        SingleColumnValueFilter singleColumnValueFilter = filter;
        ByteArrayComparable comparator = singleColumnValueFilter.getComparator();
        if (comparator == null || comparator.getValue() == null) {
            LOG.warn("Query condition was not set correctly");
            return;
        }
        byte[] value = comparator.getValue();
        byte[] qualifier = singleColumnValueFilter.getQualifier();
        if (Bytes.equals(RpcRequestWrapper.LISTKEYS_HINT, qualifier)) {
            List<Integer> resolve = ListKeysFilter.resolve(value);
            if (resolve == null) {
                LOG.warn("Could not resolve list key request");
                return;
            }
            try {
                buildEntityKeysCell(j, this.shard.getEntityKeys(resolve), list);
                return;
            } catch (IOException e) {
                LOG.warn("List entity keys failed, request id: " + j, e);
                return;
            }
        }
        if (!Bytes.equals(RpcRequestWrapper.PAGING_QUERY_HINT, qualifier)) {
            LOG.warn("Unknown list keys request");
            return;
        }
        LemonServices.PagingQueryRequest resolve2 = PagingQueryFilter.resolve(value);
        if (resolve2 == null) {
            LOG.warn("Could not resolve paging query request");
            return;
        }
        try {
            buildEntityKeysCell(j, this.shard.getEntityKeys(resolve2.getQuery(), resolve2.getStart(), resolve2.getLimit()), list);
        } catch (IOException e2) {
            LOG.warn("Paging query failed, request id: " + j, e2);
        }
    }

    private void buildEntityKeysCell(long j, byte[][] bArr, List<Cell> list) {
        IndexCellWrapper.EntityKeysBuilder entityKeysBuilder = IndexCellWrapper.entityKeysBuilder();
        entityKeysBuilder.setRequestId(j);
        entityKeysBuilder.setKeys(bArr);
        list.add(entityKeysBuilder.build());
    }

    void buildResultCells(LemonServices.QueryRequest queryRequest, QueryService.ResponsePerShard responsePerShard, List<Cell> list) {
        long requestID = queryRequest.getRequestID();
        IndexCellWrapper.CountBuilder countBuilder = IndexCellWrapper.countBuilder();
        countBuilder.setRequestId(requestID);
        countBuilder.setCount(responsePerShard.getShard(), Integer.valueOf(responsePerShard.getCount()), responsePerShard.isFailed());
        list.add(countBuilder.build());
        if (responsePerShard.isFailed()) {
            LOG.error("Query failed, shard name is [" + this.shard.getShardName() + "], shardId is [" + this.shard.getShardId() + "]");
            return;
        }
        if (this.traceEnabled) {
            tracing("Found " + responsePerShard.getCount() + " from shard " + this.shard.getShardName());
        }
        if (queryRequest.getCountOnly()) {
            return;
        }
        if (!queryRequest.getSampling()) {
            IndexCellWrapper.EntityListBuilder entityListBuilder = IndexCellWrapper.entityListBuilder();
            entityListBuilder.setRequestId(requestID);
            entityListBuilder.add(responsePerShard.getShard(), responsePerShard.getEntities());
            list.add(entityListBuilder.build());
        }
        byte[][] keys = responsePerShard.getKeys();
        if (keys != null) {
            IndexCellWrapper.EntityKeysBuilder entityKeysBuilder = IndexCellWrapper.entityKeysBuilder();
            entityKeysBuilder.setRequestId(requestID);
            entityKeysBuilder.setKeys(keys);
            list.add(entityKeysBuilder.build());
        }
    }

    static void buildResultCells(long j, LemonServices.QueryRequest queryRequest, ConcurrentSkipListMap<byte[], QueryService.ResponsePerShard> concurrentSkipListMap, List<Cell> list) {
        Map.Entry<byte[], QueryService.ResponsePerShard> ceilingEntry;
        int i = 0;
        ArrayList arrayList = new ArrayList(concurrentSkipListMap.entrySet());
        ByteString startShard = queryRequest.getStartShard();
        if (startShard != null && (ceilingEntry = concurrentSkipListMap.ceilingEntry(startShard.toByteArray())) != null) {
            i = arrayList.indexOf(ceilingEntry);
        }
        int caching = queryRequest.getCaching();
        int i2 = caching > 0 ? caching : 32;
        IndexCellWrapper.CountBuilder countBuilder = IndexCellWrapper.countBuilder();
        countBuilder.setRequestId(queryRequest.getRequestID());
        IndexCellWrapper.EntityListBuilder entityListBuilder = null;
        int i3 = 0;
        if (!queryRequest.getCountOnly()) {
            entityListBuilder = IndexCellWrapper.entityListBuilder();
            entityListBuilder.setRequestId(j);
        }
        int size = arrayList.size();
        for (int i4 = 0; i4 < size; i4++) {
            Map.Entry entry = (Map.Entry) arrayList.get((i + i4) % size);
            int shard = ((QueryService.ResponsePerShard) entry.getValue()).getShard();
            QueryService.ResponsePerShard responsePerShard = (QueryService.ResponsePerShard) entry.getValue();
            if (responsePerShard.isFailed()) {
                LOG.error("Query failed, shardId is [" + shard + "]");
            }
            countBuilder.setCount(shard, Integer.valueOf(responsePerShard.getCount()), responsePerShard.isFailed());
            if (!queryRequest.getCountOnly() && responsePerShard.getCount() != 0 && !responsePerShard.isFailed() && i3 < i2) {
                List<Integer> entities = responsePerShard.getEntities();
                int size2 = entities.size();
                int i5 = i2 - i3;
                if (i5 >= size2) {
                    entityListBuilder.add(shard, entities);
                    i3 += size2;
                } else {
                    entityListBuilder.add(shard, entities.subList(0, i5));
                    i3 += i5;
                }
            }
        }
        list.add(countBuilder.build());
        if (entityListBuilder == null || i3 <= 0) {
            return;
        }
        list.add(entityListBuilder.build());
    }

    private static void tracing(String str) {
        LOG.info(str);
    }

    private static LemonServices.QueryRequest extractQueryRequest(Get get) {
        ByteArrayComparable comparator;
        SingleColumnValueFilter filter = get.getFilter();
        if (filter == null || !(filter instanceof SingleColumnValueFilter)) {
            LOG.warn("Filter was not set correctly for index based query");
            return null;
        }
        SingleColumnValueFilter singleColumnValueFilter = filter;
        if (!Bytes.equals(RpcRequestWrapper.QUERY_HINT, singleColumnValueFilter.getQualifier()) || (comparator = singleColumnValueFilter.getComparator()) == null) {
            LOG.warn("Qualifier or query condition was not set correctly");
            return null;
        }
        try {
            return LemonServices.QueryRequest.parseFrom(comparator.getValue());
        } catch (InvalidProtocolBufferException e) {
            LOG.error("Invalid Protocol Buffer bytes");
            return null;
        }
    }
}
