package com.huawei.huaweichain.shard;

import com.huawei.huaweichain.Flow;
import com.huawei.huaweichain.FlowException;
import com.huawei.huaweichain.FutureResult;
import com.huawei.huaweichain.Iterator;
import com.huawei.huaweichain.Result;
import com.huawei.huaweichain.Stub;
import com.huawei.huaweichain.shard.ConfigFlow;
import com.huawei.huaweichain.shard.Wrapper;
import com.huawei.huaweichain.user.EventFilter;
import com.huawei.wienerchain.proto.common.Ledger;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

/* loaded from: input_file:com/huawei/huaweichain/shard/EventFlow.class */
public class EventFlow extends Flow {
    /** Chain name; handed to {@code target(chain, node)} of the per-shard flows and to {@code ShardFlow.config(chain)}. */
    private String chain;
    /** Node name; handed to {@code target(chain, node)} of the per-shard flows and to the config {@code reader(node)}. */
    private String node;
    /**
     * Start number applied to all shards by the {@code block*Iterator()} methods when
     * {@link #startNumOfShards} is empty. The constructor sets it to -1; a negative value
     * means no start number is set on the per-shard flows.
     */
    private long startNum;
    /** Per-shard start numbers; when non-empty the {@code block*Iterator()} methods use it instead of {@link #startNum}. */
    private List<Wrapper.BlockNumber> startNumOfShards;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/huawei/huaweichain/shard/EventFlow$MergeIterator.class */
    /**
     * Merges several source iterators into one by taking elements in round-robin order:
     * one element from the first source, one from the second, and so on, skipping
     * sources that have run out of elements.
     *
     * <p>Closing the merged iterator closes <em>every</em> source passed to the
     * constructor, including sources that were already exhausted.
     *
     * @param <T> element type
     */
    public static class MergeIterator<T> implements Iterator<T> {
        /** Every source handed to the constructor; kept so {@link #close()} can also release drained sources. */
        private final List<Iterator<T>> sources;
        /** Sources that may still produce elements, in round-robin order. */
        private final Queue<Iterator<T>> queue = new LinkedList<>();

        /**
         * @param list the source iterators, in the order in which they are first polled
         */
        public MergeIterator(List<Iterator<T>> list) {
            this.sources = new ArrayList<>(list);
            this.queue.addAll(this.sources);
        }

        /**
         * Drops exhausted sources from the head of the rotation until a source with a
         * pending element is found.
         *
         * @return {@code true} if the source at the head of the rotation has an element
         */
        @Override
        public boolean hasNext() {
            while (!this.queue.isEmpty()) {
                if (this.queue.peek().hasNext()) {
                    return true;
                }
                // Exhausted: remove from the rotation; it stays in `sources` so close() still reaches it.
                this.queue.poll();
            }
            return false;
        }

        /**
         * Returns the next element of the source at the head of the rotation and moves
         * that source to the back.
         *
         * @throws NoSuchElementException if no source has an element left
         */
        @Override
        public T next() {
            // Skip exhausted sources first so a call without a preceding hasNext()
            // still finds the elements of the remaining sources.
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            Iterator<T> head = this.queue.poll();
            T element = head.next();
            // Re-queue unconditionally: exhaustion is detected lazily by hasNext(), so
            // handing out this element does not first ask the source for its following one.
            this.queue.offer(head);
            return element;
        }

        /**
         * Closes all source iterators. A failure to close one source does not prevent
         * the others from being closed; the first failure is rethrown with later ones
         * attached as suppressed exceptions.
         *
         * @throws IOException if closing a source failed with an {@code IOException}
         */
        @Override
        public void close() throws IOException {
            Exception failure = null;
            for (Iterator<T> source : this.sources) {
                try {
                    source.close();
                } catch (IOException | RuntimeException e) {
                    if (failure == null) {
                        failure = e;
                    } else {
                        failure.addSuppressed(e);
                    }
                }
            }
            if (failure instanceof IOException) {
                throw (IOException) failure;
            }
            if (failure != null) {
                throw (RuntimeException) failure;
            }
        }
    }

    /**
     * Creates a sharded event flow on top of the given stub.
     *
     * @param stub stub forwarded to the {@code Flow} super constructor
     */
    public EventFlow(Stub stub) {
        super(stub);
        // Negative start number: the block*Iterator() methods then set no start number per shard.
        this.startNum = -1L;
    }

    /**
     * Selects the chain and node that all subsequent operations of this flow address.
     *
     * @param str  chain name
     * @param str2 node name
     * @return this flow, for call chaining
     */
    public EventFlow setTarget(String str, String str2) {
        this.node = str2;
        this.chain = str;
        return this;
    }

    /**
     * Sets the start number used for all shards by the {@code block*Iterator()} methods
     * when no per-shard start numbers are configured.
     *
     * @param j start number; a negative value means no start number is set on the per-shard flows
     * @return this flow, for call chaining
     */
    public EventFlow setStartNum(long j) {
        this.startNum = j;
        return this;
    }

    /**
     * Sets individual start numbers per shard. When the stored list is non-empty, the
     * {@code block*Iterator()} methods use it instead of the value given to
     * {@link #setStartNum(long)}.
     *
     * @param list per-shard start numbers; stored by reference, not copied
     * @return this flow, for call chaining
     */
    public EventFlow setStartNumOfShards(List<Wrapper.BlockNumber> list) {
        this.startNumOfShards = list;
        return this;
    }

    /**
     * Fetches the events matching the filter from every shard of the target chain.
     *
     * @param eventFilter filter applied on each shard
     * @return future holding the events of all shards, each tagged with its shard id
     */
    public FutureResult<List<Wrapper.Event>> get(EventFilter eventFilter) {
        FutureResult<List<Wrapper.EventFilter>> perShardFilters = allShardsFilters(eventFilter);
        return perShardFilters.flatMap(filters -> getOfShards(filters));
    }

    /**
     * Fetches events for the given per-shard filters and concatenates them in filter order.
     *
     * <p>The shards are queried through a parallel stream inside an asynchronous task.
     * If any shard fails, the overall result is the failure of the first failing shard
     * (in filter order) and no events are returned.
     *
     * @param list one filter per shard to query
     * @return future holding the combined events, or the first shard failure
     */
    public FutureResult<List<Wrapper.Event>> getOfShards(List<Wrapper.EventFilter> list) {
        return FutureResult.convert(CompletableFuture.supplyAsync(() -> {
            List<Result<List<Wrapper.Event>>> shardResults = list.stream()
                    .parallel()
                    .map(shardFilter -> getOfShard(shardFilter).res())
                    .collect(Collectors.toList());

            // First pass: any failed shard fails the whole request.
            for (Result<List<Wrapper.Event>> shardResult : shardResults) {
                if (!shardResult.ok()) {
                    return Result.failure(shardResult.exception());
                }
            }

            // Second pass: all shards succeeded, so flatten their events in order.
            List<Wrapper.Event> merged = new ArrayList<>();
            for (Result<List<Wrapper.Event>> shardResult : shardResults) {
                merged.addAll(shardResult.value());
            }
            return Result.success(merged);
        }));
    }

    /**
     * Fetches the events of a single shard and tags each one with the shard id.
     *
     * @param eventFilter shard id plus the filter to apply on that shard
     * @return future holding the tagged events; a failed future if the request raised a {@code FlowException}
     */
    private FutureResult<List<Wrapper.Event>> getOfShard(Wrapper.EventFilter eventFilter) {
        try {
            com.huawei.huaweichain.user.EventFlow shardFlow =
                    user().event().target(this.chain, this.node).setShardID(eventFilter.shardID);
            return shardFlow.get(eventFilter.filter).map(events -> events.stream()
                    .map(event -> new Wrapper.Event(eventFilter.shardID, event))
                    .collect(Collectors.toList()));
        } catch (FlowException flowError) {
            // Surface the synchronous failure through the future instead of throwing.
            return FutureResult.failure(flowError);
        }
    }

    /**
     * Opens an event iterator over every shard of the target chain.
     *
     * @param eventFilter filter applied on each shard
     * @return iterator merging the events of all shards
     * @throws FlowException if the shard layout cannot be resolved or a shard iterator cannot be opened
     */
    public Iterator<Result<Wrapper.Event>> iterator(EventFilter eventFilter) throws FlowException {
        Result<List<Wrapper.EventFilter>> shardFilters = allShardsFilters(eventFilter).res();
        Result<Iterator<Result<Wrapper.Event>>> merged = shardFilters.flatMap(filters -> {
            try {
                return Result.success(iteratorOfShards(filters));
            } catch (FlowException flowError) {
                // Carry the checked exception through the Result; orElseThrow() raises it below.
                return Result.failure(flowError);
            }
        });
        return merged.orElseThrow();
    }

    /**
     * Subscribes the observer to the events of every shard of the target chain.
     *
     * @param eventFilter    filter applied on each shard
     * @param streamObserver receiver of the events of all shards
     * @throws FlowException if the shard layout cannot be resolved or a subscription fails
     */
    public void listen(EventFilter eventFilter, StreamObserver<Result<Wrapper.Event>> streamObserver) throws FlowException {
        final Result<List<Wrapper.EventFilter>> shardFilters = allShardsFilters(eventFilter).res();
        final boolean resolved = shardFilters.ok();
        if (!resolved) {
            shardFilters.throwException();
        }
        listenOfShards(shardFilters.value(), streamObserver);
    }

    /**
     * Subscribes the observer to the events of the given shards, tagging every event
     * with the id of the shard it came from.
     *
     * <p>The per-shard callbacks are funnelled through a single synchronized fan-in, so
     * the downstream observer:
     * <ul>
     *   <li>never has two of its methods invoked concurrently,</li>
     *   <li>receives exactly one terminal signal: {@code onError} for the first shard
     *       that fails, or {@code onCompleted} once every shard has completed,</li>
     *   <li>receives nothing after that terminal signal.</li>
     * </ul>
     * With an empty list no subscription is made and the observer receives no signal.
     *
     * @param list           one filter per shard to subscribe to
     * @param streamObserver receiver of the merged events
     * @throws FlowException if subscribing to a shard fails
     */
    public void listenOfShards(List<Wrapper.EventFilter> list, final StreamObserver<Result<Wrapper.Event>> streamObserver) throws FlowException {
        final ShardFanIn fanIn = new ShardFanIn(list.size(), streamObserver);
        for (final Wrapper.EventFilter eventFilter : list) {
            user().event().target(this.chain, this.node).setShardID(eventFilter.shardID).listen(eventFilter.filter, new StreamObserver<Result<Ledger.Event>>() {
                @Override
                public void onNext(Result<Ledger.Event> result) {
                    // Tag outside the fan-in lock; only the downstream call is serialised.
                    fanIn.next(result.map(event -> new Wrapper.Event(eventFilter.shardID, event)));
                }

                @Override
                public void onError(Throwable th) {
                    fanIn.fail(th);
                }

                @Override
                public void onCompleted() {
                    fanIn.shardCompleted();
                }
            });
        }
    }

    /** Serialises the callbacks of all shard listeners onto one downstream observer. */
    private static final class ShardFanIn {
        private final StreamObserver<Result<Wrapper.Event>> downstream;
        /** Number of shards that have not reported completion yet. */
        private int pending;
        /** Set once a terminal signal has been forwarded; nothing is forwarded afterwards. */
        private boolean terminated;

        ShardFanIn(int shardCount, StreamObserver<Result<Wrapper.Event>> downstream) {
            this.pending = shardCount;
            this.downstream = downstream;
        }

        /** Forwards an event unless the merged stream has already terminated. */
        synchronized void next(Result<Wrapper.Event> event) {
            if (!this.terminated) {
                this.downstream.onNext(event);
            }
        }

        /** Forwards the first shard error as the terminal signal; later errors are dropped. */
        synchronized void fail(Throwable cause) {
            if (this.terminated) {
                return;
            }
            this.terminated = true;
            this.downstream.onError(cause);
        }

        /** Records one shard completion and completes downstream when it was the last one. */
        synchronized void shardCompleted() {
            if (this.terminated) {
                return;
            }
            this.pending--;
            if (this.pending == 0) {
                this.terminated = true;
                this.downstream.onCompleted();
            }
        }
    }

    /**
     * Opens one event iterator per shard filter and merges them into a single iterator
     * whose events are tagged with their shard id.
     *
     * <p>If opening the iterator of one shard fails, the iterators already opened for
     * the preceding shards are closed before the exception propagates; failures while
     * closing are attached to it as suppressed exceptions.
     *
     * @param list one filter per shard to iterate
     * @return iterator merging the events of the given shards
     * @throws FlowException if a shard iterator cannot be opened
     */
    public Iterator<Result<Wrapper.Event>> iteratorOfShards(List<Wrapper.EventFilter> list) throws FlowException {
        List<Iterator<Result<Wrapper.Event>>> opened = new ArrayList<>(list.size());
        try {
            for (Wrapper.EventFilter eventFilter : list) {
                opened.add(new Iterator.IteratorWrapper<>(
                        user().event().target(this.chain, this.node).setShardID(eventFilter.shardID).iterator(eventFilter.filter),
                        result -> result.map(event -> new Wrapper.Event(eventFilter.shardID, event))));
            }
        } catch (FlowException e) {
            closeAfterFailure(opened, e);
            throw e;
        } catch (RuntimeException e) {
            closeAfterFailure(opened, e);
            throw e;
        }
        return new MergeIterator<>(opened);
    }

    /** Closes the already opened iterators, recording close failures as suppressed on {@code cause}. */
    private static void closeAfterFailure(List<? extends Iterator<?>> opened, Throwable cause) {
        for (Iterator<?> iterator : opened) {
            try {
                iterator.close();
            } catch (IOException | RuntimeException closeError) {
                cause.addSuppressed(closeError);
            }
        }
    }

    /**
     * Builds one shard-specific filter per shard of the target chain.
     *
     * <p>The shard count is read from the sharding configuration obtained via
     * {@code sharding(0)}; shard ids run from 0 to count - 1.
     *
     * @param eventFilter filter to replicate for every shard
     * @return future holding the per-shard filters in ascending shard id order
     */
    private FutureResult<List<Wrapper.EventFilter>> allShardsFilters(EventFilter eventFilter) {
        ConfigFlow.ConfigReaderFlow reader = new ShardFlow(this.stub).config(this.chain).reader(this.node);
        return reader.sharding(0).map(sharding -> {
            int shardCount = sharding.getShardNum();
            List<Wrapper.EventFilter> filters = new ArrayList<>();
            for (int shard = 0; shard < shardCount; shard++) {
                filters.add(new Wrapper.EventFilter(shard, eventFilter));
            }
            return filters;
        });
    }

    /**
     * Opens a block iterator over all shards of the target chain.
     *
     * <p>Start position, in order of precedence:
     * <ol>
     *   <li>per-shard start numbers, when a non-empty list was configured;</li>
     *   <li>the common start number, when it is non-negative;</li>
     *   <li>otherwise no start number is set on the per-shard flows.</li>
     * </ol>
     *
     * @return future holding an iterator that merges the blocks of all shards
     */
    public FutureResult<Iterator<Wrapper.Block>> blockIterator() {
        // The list is null unless setStartNumOfShards was called, so guard before isEmpty().
        final List<Wrapper.BlockNumber> explicitStarts = this.startNumOfShards;
        if (explicitStarts != null && !explicitStarts.isEmpty()) {
            return blockIteratorOfShards(explicitStarts, true);
        }
        ConfigFlow.ConfigReaderFlow reader = new ShardFlow(this.stub).config(this.chain).reader(this.node);
        if (this.startNum >= 0) {
            return reader.allShardNumber(this.startNum).flatMap(starts -> blockIteratorOfShards(starts, true));
        }
        // No start number requested: allShardNumber(0) presumably only enumerates the shards
        // here (TODO confirm); the block numbers it returns are ignored (flag = false).
        return reader.allShardNumber(0L).flatMap(shards -> blockIteratorOfShards(shards, false));
    }

    /**
     * Requests a block iterator from every listed shard and merges them, tagging each
     * block with its shard id.
     *
     * @param list shard ids, each paired with a start block number
     * @param z    {@code true} to apply each entry's block number as that shard's start number
     * @return future holding the merged iterator
     */
    private FutureResult<Iterator<Wrapper.Block>> blockIteratorOfShards(List<Wrapper.BlockNumber> list, boolean z) {
        List<FutureResult<Iterator<Wrapper.Block>>> perShard = new ArrayList<>(list.size());
        for (Wrapper.BlockNumber start : list) {
            com.huawei.huaweichain.user.EventFlow shardFlow =
                    user().event().target(this.chain, this.node).setShardID(start.shardID);
            if (z) {
                shardFlow.setStartNum(start.blockNum);
            }
            perShard.add(shardFlow.blockIterator().map(blocks ->
                    new Iterator.IteratorWrapper<>(blocks, block -> new Wrapper.Block(start.shardID, block))));
        }
        return FutureResult.wrap(() -> FutureResult.unwrap(perShard).map(MergeIterator::new));
    }

    /**
     * Opens a block-result iterator over all shards of the target chain.
     *
     * <p>Start position, in order of precedence:
     * <ol>
     *   <li>per-shard start numbers, when a non-empty list was configured;</li>
     *   <li>the common start number, when it is non-negative;</li>
     *   <li>otherwise no start number is set on the per-shard flows.</li>
     * </ol>
     *
     * @return future holding an iterator that merges the block results of all shards
     */
    public FutureResult<Iterator<Wrapper.BlockResult>> blockResultIterator() {
        // The list is null unless setStartNumOfShards was called, so guard before isEmpty().
        final List<Wrapper.BlockNumber> explicitStarts = this.startNumOfShards;
        if (explicitStarts != null && !explicitStarts.isEmpty()) {
            return blockResultIteratorOfShards(explicitStarts, true);
        }
        ConfigFlow.ConfigReaderFlow reader = new ShardFlow(this.stub).config(this.chain).reader(this.node);
        if (this.startNum >= 0) {
            return reader.allShardNumber(this.startNum).flatMap(starts -> blockResultIteratorOfShards(starts, true));
        }
        // No start number requested: allShardNumber(0) presumably only enumerates the shards
        // here (TODO confirm); the block numbers it returns are ignored (flag = false).
        return reader.allShardNumber(0L).flatMap(shards -> blockResultIteratorOfShards(shards, false));
    }

    /**
     * Requests a block-result iterator from every listed shard and merges them, tagging
     * each block result with its shard id.
     *
     * @param list shard ids, each paired with a start block number
     * @param z    {@code true} to apply each entry's block number as that shard's start number
     * @return future holding the merged iterator
     */
    private FutureResult<Iterator<Wrapper.BlockResult>> blockResultIteratorOfShards(List<Wrapper.BlockNumber> list, boolean z) {
        List<FutureResult<Iterator<Wrapper.BlockResult>>> perShard = new ArrayList<>(list.size());
        for (Wrapper.BlockNumber start : list) {
            com.huawei.huaweichain.user.EventFlow shardFlow =
                    user().event().target(this.chain, this.node).setShardID(start.shardID);
            if (z) {
                shardFlow.setStartNum(start.blockNum);
            }
            perShard.add(shardFlow.blockResultIterator().map(results ->
                    new Iterator.IteratorWrapper<>(results, blockResult -> new Wrapper.BlockResult(start.shardID, blockResult))));
        }
        return FutureResult.wrap(() -> FutureResult.unwrap(perShard).map(MergeIterator::new));
    }

    /**
     * Opens an iterator over blocks together with their results for all shards of the
     * target chain.
     *
     * <p>Start position, in order of precedence:
     * <ol>
     *   <li>per-shard start numbers, when a non-empty list was configured;</li>
     *   <li>the common start number, when it is non-negative;</li>
     *   <li>otherwise no start number is set on the per-shard flows.</li>
     * </ol>
     *
     * @return future holding an iterator that merges the block-and-result pairs of all shards
     */
    public FutureResult<Iterator<Wrapper.BlockAndResult>> blockWithResultIterator() {
        // The list is null unless setStartNumOfShards was called, so guard before isEmpty().
        final List<Wrapper.BlockNumber> explicitStarts = this.startNumOfShards;
        if (explicitStarts != null && !explicitStarts.isEmpty()) {
            return blockWithResultIteratorOfShards(explicitStarts, true);
        }
        ConfigFlow.ConfigReaderFlow reader = new ShardFlow(this.stub).config(this.chain).reader(this.node);
        if (this.startNum >= 0) {
            return reader.allShardNumber(this.startNum).flatMap(starts -> blockWithResultIteratorOfShards(starts, true));
        }
        // No start number requested: allShardNumber(0) presumably only enumerates the shards
        // here (TODO confirm); the block numbers it returns are ignored (flag = false).
        return reader.allShardNumber(0L).flatMap(shards -> blockWithResultIteratorOfShards(shards, false));
    }

    /**
     * Requests a block-with-result iterator from every listed shard and merges them,
     * tagging each element with its shard id.
     *
     * @param list shard ids, each paired with a start block number
     * @param z    {@code true} to apply each entry's block number as that shard's start number
     * @return future holding the merged iterator
     */
    private FutureResult<Iterator<Wrapper.BlockAndResult>> blockWithResultIteratorOfShards(List<Wrapper.BlockNumber> list, boolean z) {
        List<FutureResult<Iterator<Wrapper.BlockAndResult>>> perShard = new ArrayList<>(list.size());
        for (Wrapper.BlockNumber start : list) {
            com.huawei.huaweichain.user.EventFlow shardFlow =
                    user().event().target(this.chain, this.node).setShardID(start.shardID);
            if (z) {
                shardFlow.setStartNum(start.blockNum);
            }
            perShard.add(shardFlow.blockWithResultIterator().map(pairs ->
                    new Iterator.IteratorWrapper<>(pairs, blockAndResult -> new Wrapper.BlockAndResult(start.shardID, blockAndResult))));
        }
        return FutureResult.wrap(() -> FutureResult.unwrap(perShard).map(MergeIterator::new));
    }

    /**
     * Opens a block-number iterator over all shards of the target chain.
     *
     * <p>Start position, in order of precedence:
     * <ol>
     *   <li>per-shard start numbers, when a non-empty list was configured;</li>
     *   <li>the common start number, when it is non-negative;</li>
     *   <li>otherwise no start number is set on the per-shard flows.</li>
     * </ol>
     *
     * @return future holding an iterator that merges the block numbers of all shards
     */
    public FutureResult<Iterator<Wrapper.BlockNumber>> blockNumberIterator() {
        // The list is null unless setStartNumOfShards was called, so guard before isEmpty().
        final List<Wrapper.BlockNumber> explicitStarts = this.startNumOfShards;
        if (explicitStarts != null && !explicitStarts.isEmpty()) {
            return blockNumberIteratorOfShards(explicitStarts, true);
        }
        ConfigFlow.ConfigReaderFlow reader = new ShardFlow(this.stub).config(this.chain).reader(this.node);
        if (this.startNum >= 0) {
            return reader.allShardNumber(this.startNum).flatMap(starts -> blockNumberIteratorOfShards(starts, true));
        }
        // No start number requested: allShardNumber(0) presumably only enumerates the shards
        // here (TODO confirm); the block numbers it returns are ignored (flag = false).
        return reader.allShardNumber(0L).flatMap(shards -> blockNumberIteratorOfShards(shards, false));
    }

    /**
     * Requests a block-number iterator from every listed shard and merges them, pairing
     * each number with its shard id.
     *
     * @param list shard ids, each paired with a start block number
     * @param z    {@code true} to apply each entry's block number as that shard's start number
     * @return future holding the merged iterator
     */
    private FutureResult<Iterator<Wrapper.BlockNumber>> blockNumberIteratorOfShards(List<Wrapper.BlockNumber> list, boolean z) {
        List<FutureResult<Iterator<Wrapper.BlockNumber>>> perShard = new ArrayList<>(list.size());
        for (Wrapper.BlockNumber start : list) {
            com.huawei.huaweichain.user.EventFlow shardFlow =
                    user().event().target(this.chain, this.node).setShardID(start.shardID);
            if (z) {
                shardFlow.setStartNum(start.blockNum);
            }
            perShard.add(shardFlow.blockNumberIterator().map(numbers ->
                    new Iterator.IteratorWrapper<>(numbers, number -> new Wrapper.BlockNumber(start.shardID, number.longValue()))));
        }
        return FutureResult.wrap(() -> FutureResult.unwrap(perShard).map(MergeIterator::new));
    }
}
