package com.huawei.streaming.processor;

import com.huawei.streaming.common.MultiKey;
import com.huawei.streaming.common.Pair;
import com.huawei.streaming.event.IEvent;
import com.huawei.streaming.lock.ILock;
import com.huawei.streaming.lock.LockImpl;
import com.huawei.streaming.output.IOutput;
import com.huawei.streaming.output.OutputType;
import com.huawei.streaming.process.join.IJoinComposer;
import com.huawei.streaming.process.join.IJoinSetProcessor;
import com.huawei.streaming.process.join.JoinFilterProcessor;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/streaming/processor/JoinProcessor.class */
public class JoinProcessor extends ProcessorImpl {
    private static final long serialVersionUID = -3565183563328255491L;
    private static final Logger LOG = LoggerFactory.getLogger(JoinProcessor.class);
    private IJoinComposer joinComposer;
    private JoinFilterProcessor joinFilter;
    private IJoinSetProcessor joinSetProcess;
    private final IOutput output;
    private final OutputType outputType;
    private String[] streamNames;
    private int streamNum;
    private boolean unidirectional = false;
    private int uniStreamIndex = 0;
    private boolean selfJoin = false;
    private boolean synFlag = false;
    private ILock lock = new LockImpl();

    public JoinProcessor(IJoinComposer iJoinComposer, String[] strArr, JoinFilterProcessor joinFilterProcessor, IJoinSetProcessor iJoinSetProcessor, IOutput iOutput, OutputType outputType) {
        LOG.debug("Initiate JoinProcessor.");
        if (null == iJoinComposer) {
            LOG.error("The join composer is null.");
            throw new RuntimeException("The join composer is null.");
        }
        if (null == iJoinSetProcessor) {
            LOG.error("The selct process is null.");
            throw new RuntimeException("The selct process is null.");
        }
        if (null == iOutput) {
            LOG.error("The output process is null.");
            throw new RuntimeException("The output process is null.");
        }
        if (strArr.length != iJoinComposer.getStreamsSize()) {
            LOG.error("The streams number are not match. Composer size is {}, and stream names size is {}.", Integer.valueOf(iJoinComposer.getStreamsSize()), Integer.valueOf(strArr.length));
            throw new RuntimeException("The streams number are not match.");
        }
        this.joinComposer = iJoinComposer;
        this.joinFilter = joinFilterProcessor;
        this.joinSetProcess = iJoinSetProcessor;
        this.streamNames = strArr;
        this.streamNum = strArr.length;
        this.output = iOutput;
        if (null != outputType) {
            this.outputType = outputType;
        } else {
            this.outputType = OutputType.I;
        }
    }

    public void setUnidirectional(Boolean bool) {
        this.unidirectional = bool.booleanValue();
    }

    public void setUniStreamIndex(int i) {
        this.uniStreamIndex = i;
    }

    public void setSelfJoin(Boolean bool) {
        this.selfJoin = bool.booleanValue();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v15, types: [com.huawei.streaming.event.IEvent[], com.huawei.streaming.event.IEvent[][]] */
    /* JADX WARN: Type inference failed for: r0v18, types: [com.huawei.streaming.event.IEvent[], com.huawei.streaming.event.IEvent[][]] */
    @Override // com.huawei.streaming.processor.IProcessor
    public void process(IEvent[] iEventArr, IEvent[] iEventArr2) {
        boolean isLocked;
        IEvent currentEvent = getCurrentEvent(iEventArr, iEventArr2);
        if (null == currentEvent) {
            return;
        }
        try {
            this.lock.lock();
            int streamIndex = getStreamIndex(currentEvent);
            ?? r0 = new IEvent[this.streamNum];
            ?? r02 = new IEvent[this.streamNum];
            r0[streamIndex] = iEventArr;
            r02[streamIndex] = iEventArr2;
            this.joinComposer.maintainData(r0, r02);
            if (this.selfJoin) {
                if (!this.synFlag) {
                    this.synFlag = true;
                    if (this.lock.isLocked()) {
                        this.lock.unlock();
                        return;
                    }
                    return;
                }
                this.synFlag = false;
            }
            if (this.unidirectional && !this.selfJoin && streamIndex != this.uniStreamIndex) {
                if (isLocked) {
                    return;
                } else {
                    return;
                }
            }
            Pair<Set<MultiKey>, Set<MultiKey>> join = this.joinComposer.join(r0, r02);
            if (null != this.joinFilter) {
                this.joinFilter.filter(join.getFirst());
                this.joinFilter.filter(join.getSecond());
            }
            this.output.output(this.joinSetProcess.processJoinResult(join.getFirst(), join.getSecond(), this.outputType));
            if (this.lock.isLocked()) {
                this.lock.unlock();
            }
        } finally {
            if (this.lock.isLocked()) {
                this.lock.unlock();
            }
        }
    }

    private int getStreamIndex(IEvent iEvent) {
        String streamName = iEvent.getStreamName();
        for (int i = 0; i < this.streamNum; i++) {
            if (StringUtils.equals(streamName, this.streamNames[i])) {
                return i;
            }
        }
        LOG.error("Wrong stream name. name={}.", streamName);
        throw new RuntimeException("Wrong stream name.");
    }

    private IEvent getCurrentEvent(IEvent[] iEventArr, IEvent[] iEventArr2) {
        IEvent iEvent;
        if (null == iEventArr && null == iEventArr2) {
            return null;
        }
        return (null == iEventArr || null == (iEvent = iEventArr[0])) ? iEventArr2[0] : iEvent;
    }
}
