package com.huawei.streaming.process.join;

import com.huawei.streaming.common.MultiKey;
import com.huawei.streaming.common.Pair;
import com.huawei.streaming.event.IEvent;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/streaming/process/join/CrossBiJoinComposer.class */
/**
 * Join composer for exactly two streams that performs a cross join: every
 * updated event of one stream is paired with every event the opposite stream
 * returns from {@code lookupAllWithNull()}, without any key matching.
 *
 * <p>Stream index 0 is the left stream and index 1 is the right stream. Each
 * joined row is an {@code IEvent[]} of length {@link #STREAM_NUM} wrapped in a
 * {@link MultiKey}, with the left event at index 0 and the right event at
 * index 1.
 *
 * <p>NOTE(review): {@code lookupAllWithNull()} presumably yields a null
 * placeholder when the opposite stream holds no events (outer-join style
 * output) - confirm against {@code SimpleEventCollection}.
 *
 * <p>The result sets are instance state that is cleared and refilled by every
 * {@link #join} call, so a single instance must not be used for concurrent
 * joins.
 */
public class CrossBiJoinComposer implements IJoinComposer {
    private static final long serialVersionUID = -705647270478065074L;
    private static final Logger LOG = LoggerFactory.getLogger(CrossBiJoinComposer.class);

    /** Number of streams handled by this composer (left = 0, right = 1). */
    private static final int STREAM_NUM = 2;

    private final SimpleEventCollection leftStream;
    private final SimpleEventCollection rightStream;

    /** Rows composed from the old-data argument of the latest {@link #join} call. */
    private final Set<MultiKey> oldResults = new LinkedHashSet<>();

    /** Rows composed from the new-data argument of the latest {@link #join} call. */
    private final Set<MultiKey> newResults = new LinkedHashSet<>();

    /** When false, the old-data argument of {@link #join} is ignored. */
    private final boolean joinRStream;

    /**
     * Creates a composer over the two given event collections.
     *
     * @param simpleEventCollection  collection backing the left stream (index 0)
     * @param simpleEventCollection2 collection backing the right stream (index 1)
     * @param z                      whether {@link #join} should also compose rows
     *                               for the old-data argument
     */
    public CrossBiJoinComposer(SimpleEventCollection simpleEventCollection, SimpleEventCollection simpleEventCollection2, boolean z) {
        LOG.debug("Initiate CrossBiJoinComposer. Left stream name={}, Right stream name={}, joinRStream={}.", new Object[]{simpleEventCollection.getStreamName(), simpleEventCollection2.getStreamName(), Boolean.valueOf(z)});
        this.leftStream = simpleEventCollection;
        this.rightStream = simpleEventCollection2;
        this.joinRStream = z;
    }

    /**
     * Composes the joined rows for one batch of per-stream updates.
     *
     * <p>Both result sets are cleared first. Old-data rows are composed before
     * new-data rows, and only when this composer was created with
     * {@code joinRStream == true}.
     *
     * @param iEventArr  new events per stream ({@code [streamIndex][event]}); may
     *                   be null, otherwise must have length {@link #STREAM_NUM}
     * @param iEventArr2 old events per stream, same layout and constraints
     * @return a pair of (new results, old results); the pair is built from this
     *         composer's own sets, which are cleared by the next call
     * @throws RuntimeException if both arguments are null, or if a non-null
     *                          argument does not have exactly {@link #STREAM_NUM}
     *                          entries
     */
    @Override
    public Pair<Set<MultiKey>, Set<MultiKey>> join(IEvent[][] iEventArr, IEvent[][] iEventArr2) {
        if (iEventArr == null && iEventArr2 == null) {
            throw new RuntimeException("Mission impossible.");
        }
        boolean newSizeInvalid = iEventArr != null && iEventArr.length != STREAM_NUM;
        boolean oldSizeInvalid = iEventArr2 != null && iEventArr2.length != STREAM_NUM;
        if (newSizeInvalid || oldSizeInvalid) {
            LOG.error("This updated streams numbers is not for Cross Bi Stream Join.");
            throw new RuntimeException("This updated streams numbers is not for Bi Stream Join.");
        }

        this.newResults.clear();
        this.oldResults.clear();

        if (this.joinRStream && iEventArr2 != null) {
            for (int streamIndex = 0; streamIndex < iEventArr2.length; streamIndex++) {
                compose(iEventArr2[streamIndex], streamIndex, this.oldResults);
            }
        }
        if (iEventArr != null) {
            for (int streamIndex = 0; streamIndex < iEventArr.length; streamIndex++) {
                compose(iEventArr[streamIndex], streamIndex, this.newResults);
            }
        }
        return new Pair<>(this.newResults, this.oldResults);
    }

    /**
     * Pairs every non-null event of one stream with every event currently
     * returned by the opposite stream and adds the rows to {@code results}.
     *
     * @param events      updated events of the stream at {@code streamIndex};
     *                    null or empty means nothing to compose
     * @param streamIndex 0 when {@code events} belong to the left stream,
     *                    otherwise they are treated as right-stream events
     * @param results     set receiving one {@link MultiKey} per composed row
     */
    private void compose(IEvent[] events, int streamIndex, Set<MultiKey> results) {
        if (events == null || events.length == 0) {
            return;
        }
        SimpleEventCollection oppositeStream = streamIndex == 0 ? this.rightStream : this.leftStream;
        for (IEvent event : events) {
            if (event == null) {
                continue;
            }
            // The lookup is repeated per event on purpose: it mirrors the original
            // call pattern and is never invoked when there is no non-null event.
            for (IEvent oppositeEvent : oppositeStream.lookupAllWithNull()) {
                IEvent[] row = new IEvent[STREAM_NUM];
                row[streamIndex] = event;
                // The remaining slot belongs to the opposite stream.
                row[STREAM_NUM - streamIndex - 1] = oppositeEvent;
                results.add(new MultiKey(row));
            }
        }
    }

    /**
     * Applies one batch of per-stream updates to the two underlying event
     * collections via {@code addRemove}.
     *
     * <p>Unlike {@link #join}, neither argument may be null here: both are
     * dereferenced for their length, so a null argument results in a
     * {@link NullPointerException}.
     *
     * @param iEventArr  per-stream events passed as the first argument of
     *                   {@code addRemove}; must have length {@link #STREAM_NUM}
     * @param iEventArr2 per-stream events passed as the second argument of
     *                   {@code addRemove}; must have length {@link #STREAM_NUM}
     * @throws RuntimeException if either array does not have exactly
     *                          {@link #STREAM_NUM} entries
     */
    @Override
    public void maintainData(IEvent[][] iEventArr, IEvent[][] iEventArr2) {
        if (iEventArr.length != STREAM_NUM || iEventArr2.length != STREAM_NUM) {
            LOG.error("This updated streams numbers is not for Bi Stream Join.");
            throw new RuntimeException("This updated streams numbers is not for Bi Stream Join.");
        }
        this.leftStream.addRemove(iEventArr[0], iEventArr2[0]);
        this.rightStream.addRemove(iEventArr[1], iEventArr2[1]);
    }

    /**
     * Returns the number of streams this composer joins.
     *
     * @return always {@link #STREAM_NUM}
     */
    @Override
    public int getStreamsSize() {
        return STREAM_NUM;
    }

    /**
     * Returns the event collection backing the left stream.
     *
     * @return the left stream collection supplied at construction
     */
    public SimpleEventCollection getLeftStream() {
        return this.leftStream;
    }

    /**
     * Returns the event collection backing the right stream.
     *
     * @return the right stream collection supplied at construction
     */
    public SimpleEventCollection getRightStream() {
        return this.rightStream;
    }
}
