package org.apache.flink.table.runtime.operators.join.stream;

import java.util.Iterator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.RowDataUtil;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator;
import org.apache.flink.table.runtime.operators.join.stream.state.JoinInputSideSpec;
import org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateView;
import org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViewWithAssociationCounter;
import org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews;
import org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViewsWithAssociationCounter;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.types.RowKind;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.class */
/**
 * Two-input streaming join operator over {@link RowData} records.
 *
 * <p>Each input keeps its rows in a {@link JoinRecordStateView}. A side flagged as outer uses a
 * {@link JoinRecordStateViewWithAssociationCounter}, which additionally stores a number of
 * associations per row. For every incoming row the operator looks up the associated rows of the
 * opposite side, updates the state of its own side, and emits joined rows through the collector.
 * Rows of an outer side that have no association are emitted padded with a null row.
 *
 * <p>Optionally (per side) an incoming row that is already present in the state of its own side is
 * dropped as a duplicate, and null-padded rows can be suppressed for update messages.
 */
public class StreamingJoinOperator extends AbstractStreamingJoinOperator {
    private static final Logger LOG = LoggerFactory.getLogger(StreamingJoinOperator.class);
    private static final long serialVersionUID = -376944622236540545L;

    /** Whether the left input is an outer side (its unmatched rows are emitted null-padded). */
    private final boolean leftIsOuter;

    /** Whether the right input is an outer side (its unmatched rows are emitted null-padded). */
    private final boolean rightIsOuter;

    /** Enables the duplicate check against stored rows for rows arriving on the left input. */
    private final boolean leftDuplicate;

    /** Enables the duplicate check against stored rows for rows arriving on the right input. */
    private final boolean rightDuplicate;

    /** When set, null-padded rows are not emitted for UPDATE_AFTER / UPDATE_BEFORE inputs. */
    private final boolean isSkipNullUpdateInOuterJoin;

    /** Reused output row; its two halves and its row kind are overwritten before each emit. */
    private transient JoinedRowData outRow;

    /** Row sized to the left type, used as the left half when padding a right row. */
    private transient RowData leftNullRow;

    /** Row sized to the right type, used as the right half when padding a left row. */
    private transient RowData rightNullRow;

    /** State of the rows received on the left input. */
    private transient JoinRecordStateView leftRecordStateView;

    /** State of the rows received on the right input. */
    private transient JoinRecordStateView rightRecordStateView;

    /**
     * Creates the operator.
     *
     * <p>All arguments not listed below are passed unchanged to the
     * {@link AbstractStreamingJoinOperator} constructor; their meaning is defined there.
     *
     * @param z whether the left input is an outer side
     * @param z2 whether the right input is an outer side
     * @param z4 whether rows on the left input are checked for duplicates
     * @param z5 whether rows on the right input are checked for duplicates
     * @param z6 whether null-padded rows are skipped for update messages
     */
    public StreamingJoinOperator(InternalTypeInfo<RowData> internalTypeInfo, InternalTypeInfo<RowData> internalTypeInfo2, GeneratedJoinCondition generatedJoinCondition, JoinInputSideSpec joinInputSideSpec, JoinInputSideSpec joinInputSideSpec2, boolean z, boolean z2, boolean[] zArr, long j, long j2, boolean z3, long j3, boolean z4, boolean z5, boolean z6) {
        super(internalTypeInfo, internalTypeInfo2, generatedJoinCondition, joinInputSideSpec, joinInputSideSpec2, zArr, j, j2, z3, j3);
        this.leftIsOuter = z;
        this.rightIsOuter = z2;
        this.leftDuplicate = z4;
        this.rightDuplicate = z5;
        this.isSkipNullUpdateInOuterJoin = z6;
    }

    /**
     * Initializes the reusable output row, the two padding rows and the state views of both
     * inputs. An outer side gets a state view with association counter, any other side a plain
     * one.
     *
     * @throws Exception if the parent initialization or the state view creation fails
     */
    @Override // org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator
    public void open() throws Exception {
        super.open();
        this.outRow = new JoinedRowData();
        this.leftNullRow = new GenericRowData(this.leftType.toRowSize());
        this.rightNullRow = new GenericRowData(this.rightType.toRowSize());
        this.leftRecordStateView = this.leftIsOuter
                ? JoinRecordStateViewsWithAssociationCounter.create(getRuntimeContext(), "left-records", this.leftInputSideSpec, this.leftType, this.leftStateRetentionTime, this.isColdStateEnabled, this.coldStateRetentionTime)
                : JoinRecordStateViews.create(getRuntimeContext(), "left-records", this.leftInputSideSpec, this.leftType, this.leftStateRetentionTime, this.isColdStateEnabled, this.coldStateRetentionTime);
        this.rightRecordStateView = this.rightIsOuter
                ? JoinRecordStateViewsWithAssociationCounter.create(getRuntimeContext(), "right-records", this.rightInputSideSpec, this.rightType, this.rightStateRetentionTime, this.isColdStateEnabled, this.coldStateRetentionTime)
                : JoinRecordStateViews.create(getRuntimeContext(), "right-records", this.rightInputSideSpec, this.rightType, this.rightStateRetentionTime, this.isColdStateEnabled, this.coldStateRetentionTime);
    }

    /**
     * Handles a row arriving on the left input.
     *
     * @param streamRecord the record carrying the left row
     * @throws Exception if state access or the join processing fails
     */
    public void processElement1(StreamRecord<RowData> streamRecord) throws Exception {
        RowData leftRow = streamRecord.getValue();
        processElement(leftRow, this.leftRecordStateView, this.rightRecordStateView, true);
    }

    /**
     * Handles a row arriving on the right input.
     *
     * @param streamRecord the record carrying the right row
     * @throws Exception if state access or the join processing fails
     */
    public void processElement2(StreamRecord<RowData> streamRecord) throws Exception {
        RowData rightRow = streamRecord.getValue();
        processElement(rightRow, this.rightRecordStateView, this.leftRecordStateView, false);
    }

    /**
     * Joins one incoming row against the opposite side and maintains the state of both sides.
     *
     * <p>The row kind of {@code input} is remembered and then overwritten with
     * {@link RowKind#INSERT} before the row is compared with or written to state.
     *
     * @param input the incoming row; its row kind is modified by this method
     * @param inputSideStateView state of the side the row arrived on
     * @param otherSideStateView state of the opposite side
     * @param inputIsLeft {@code true} if the row arrived on the left input
     * @throws Exception if state access fails
     */
    private void processElement(RowData input, JoinRecordStateView inputSideStateView, JoinRecordStateView otherSideStateView, boolean inputIsLeft) throws Exception {
        final boolean inputIsOuter = inputIsLeft ? this.leftIsOuter : this.rightIsOuter;
        final boolean otherIsOuter = inputIsLeft ? this.rightIsOuter : this.leftIsOuter;
        final boolean accumulate = RowDataUtil.isAccumulateMsg(input);
        final RowKind inputRowKind = input.getRowKind();
        // Normalize the kind so the row matches what is kept in state.
        input.setRowKind(RowKind.INSERT);

        // NOTE(review): this check also runs for retraction messages. A retraction whose row is
        // still stored would be reported as a duplicate and dropped without touching state —
        // confirm the duplicate flags are only enabled for insert-only inputs.
        final boolean checkDuplicates = inputIsLeft ? this.leftDuplicate : this.rightDuplicate;
        if (checkDuplicates && isStored(inputSideStateView, input)) {
            LOG.debug("Duplicate record is received in the {}", inputIsLeft ? "leftInput" : "RightInput");
            return;
        }

        AbstractStreamingJoinOperator.AssociatedRecords associated = AbstractStreamingJoinOperator.AssociatedRecords.of(input, inputIsLeft, otherSideStateView, this.joinCondition);

        if (accumulate) {
            if (inputIsOuter) {
                JoinRecordStateViewWithAssociationCounter inputCounterView = (JoinRecordStateViewWithAssociationCounter) inputSideStateView;
                if (associated.isEmpty()) {
                    // No match: emit +I[input + null] and store the row with zero associations.
                    this.outRow.setRowKind(RowKind.INSERT);
                    outputNullPadding(input, inputIsLeft);
                    inputCounterView.addRecord(input, 0);
                } else {
                    if (otherIsOuter) {
                        JoinRecordStateViewWithAssociationCounter otherCounterView = (JoinRecordStateViewWithAssociationCounter) otherSideStateView;
                        for (AbstractStreamingJoinOperator.RecordWithAssociations match : associated.getRecordsWithAssociations()) {
                            RowData other = match.record;
                            // The other row was unmatched so far: withdraw its null-padded row.
                            if (match.numOfAssociations == 0 && !skipNullAccumulate(inputRowKind)) {
                                this.outRow.setRowKind(RowKind.DELETE);
                                outputNullPadding(other, !inputIsLeft);
                            }
                            otherCounterView.updateNumOfAssociations(other, match.numOfAssociations + 1);
                        }
                    }
                    // Emit +I[input + other] for every match, then store the row with its count.
                    this.outRow.setRowKind(RowKind.INSERT);
                    for (RowData other : associated.getRecords()) {
                        output(input, other, inputIsLeft);
                    }
                    inputCounterView.addRecord(input, associated.size());
                }
            } else {
                inputSideStateView.addRecord(input);
                if (!associated.isEmpty()) {
                    if (otherIsOuter) {
                        JoinRecordStateViewWithAssociationCounter otherCounterView = (JoinRecordStateViewWithAssociationCounter) otherSideStateView;
                        for (AbstractStreamingJoinOperator.RecordWithAssociations match : associated.getRecordsWithAssociations()) {
                            // The other row was unmatched so far: withdraw its null-padded row.
                            if (match.numOfAssociations == 0 && !skipNullAccumulate(inputRowKind)) {
                                this.outRow.setRowKind(RowKind.DELETE);
                                outputNullPadding(match.record, !inputIsLeft);
                            }
                            otherCounterView.updateNumOfAssociations(match.record, match.numOfAssociations + 1);
                        }
                        this.outRow.setRowKind(RowKind.INSERT);
                    } else {
                        // Neither side is outer: forward the original kind of the input.
                        this.outRow.setRowKind(inputRowKind);
                    }
                    for (RowData other : associated.getRecords()) {
                        output(input, other, inputIsLeft);
                    }
                }
            }
        } else {
            inputSideStateView.retractRecord(input);
            if (associated.isEmpty()) {
                if (inputIsOuter) {
                    // No match: withdraw the null-padded row of the input.
                    this.outRow.setRowKind(RowKind.DELETE);
                    outputNullPadding(input, inputIsLeft);
                }
            } else {
                // With an outer side involved the joined rows are withdrawn as DELETE,
                // otherwise the original kind of the input is forwarded.
                this.outRow.setRowKind((inputIsOuter || otherIsOuter) ? RowKind.DELETE : inputRowKind);
                for (RowData other : associated.getRecords()) {
                    output(input, other, inputIsLeft);
                }
                if (otherIsOuter) {
                    JoinRecordStateViewWithAssociationCounter otherCounterView = (JoinRecordStateViewWithAssociationCounter) otherSideStateView;
                    for (AbstractStreamingJoinOperator.RecordWithAssociations match : associated.getRecordsWithAssociations()) {
                        // The input was the last association of the other row: emit it null-padded.
                        if (match.numOfAssociations == 1 && !skipNullRetract(inputRowKind)) {
                            this.outRow.setRowKind(RowKind.INSERT);
                            outputNullPadding(match.record, !inputIsLeft);
                        }
                        otherCounterView.updateNumOfAssociations(match.record, match.numOfAssociations - 1);
                    }
                }
            }
        }
    }

    /**
     * Tells whether a row equal to {@code candidate} is returned by the given state view. The hash
     * codes are compared first and {@code equals} is only consulted when they agree.
     *
     * @param stateView the state view to scan
     * @param candidate the row to look for
     * @return {@code true} if an equal row is found
     * @throws Exception if reading the state fails
     */
    private static boolean isStored(JoinRecordStateView stateView, RowData candidate) throws Exception {
        final int candidateHash = candidate.hashCode();
        for (RowData stored : stateView.getRecords()) {
            if (stored.hashCode() == candidateHash && stored.equals(candidate)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Emits the joined row of {@code rowData} and {@code rowData2} with the row kind currently set
     * on the reused output row.
     *
     * @param rowData the incoming row
     * @param rowData2 the associated row of the opposite side
     * @param z {@code true} if {@code rowData} belongs on the left of the joined row
     */
    private void output(RowData rowData, RowData rowData2, boolean z) {
        RowData leftPart = z ? rowData : rowData2;
        RowData rightPart = z ? rowData2 : rowData;
        this.outRow.replace(leftPart, rightPart);
        this.collector.collect(this.outRow);
    }

    /**
     * Emits {@code rowData} joined with the padding row of the opposite side, using the row kind
     * currently set on the reused output row.
     *
     * @param rowData the row to pad
     * @param z {@code true} if {@code rowData} belongs on the left of the joined row
     */
    private void outputNullPadding(RowData rowData, boolean z) {
        RowData leftPart = z ? rowData : this.leftNullRow;
        RowData rightPart = z ? this.rightNullRow : rowData;
        this.outRow.replace(leftPart, rightPart);
        this.collector.collect(this.outRow);
    }

    /**
     * @param rowKind the original kind of the incoming row
     * @return {@code true} if skipping is enabled and the row is an {@link RowKind#UPDATE_AFTER}
     */
    private boolean skipNullAccumulate(RowKind rowKind) {
        if (!this.isSkipNullUpdateInOuterJoin) {
            return false;
        }
        return rowKind == RowKind.UPDATE_AFTER;
    }

    /**
     * @param rowKind the original kind of the incoming row
     * @return {@code true} if skipping is enabled and the row is an {@link RowKind#UPDATE_BEFORE}
     */
    private boolean skipNullRetract(RowKind rowKind) {
        if (!this.isSkipNullUpdateInOuterJoin) {
            return false;
        }
        return rowKind == RowKind.UPDATE_BEFORE;
    }
}
