package org.apache.flink.table.runtime.operators.join.stream;

import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.RowDataUtil;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator;
import org.apache.flink.table.runtime.operators.join.stream.state.JoinInputSideSpec;
import org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateView;
import org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViewWithAssociationCounter;
import org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews;
import org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViewsWithAssociationCounter;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.types.RowKind;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/stream/StreamingSemiAntiJoinOperator.class */
public class StreamingSemiAntiJoinOperator extends AbstractStreamingJoinOperator {
    private static final long serialVersionUID = -3135772379944924519L;
    private final boolean isAntiJoin;
    private transient JoinRecordStateViewWithAssociationCounter leftRecordStateView;
    private transient JoinRecordStateView rightRecordStateView;

    public StreamingSemiAntiJoinOperator(boolean z, InternalTypeInfo<RowData> internalTypeInfo, InternalTypeInfo<RowData> internalTypeInfo2, GeneratedJoinCondition generatedJoinCondition, JoinInputSideSpec joinInputSideSpec, JoinInputSideSpec joinInputSideSpec2, boolean[] zArr, long j, long j2) {
        super(internalTypeInfo, internalTypeInfo2, generatedJoinCondition, joinInputSideSpec, joinInputSideSpec2, zArr, j, j2);
        this.isAntiJoin = z;
    }

    @Override // org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator
    public void open() throws Exception {
        super.open();
        this.leftRecordStateView = JoinRecordStateViewsWithAssociationCounter.create(getRuntimeContext(), "left-records", this.leftInputSideSpec, this.leftType, this.leftStateRetentionTime);
        this.rightRecordStateView = JoinRecordStateViews.create(getRuntimeContext(), "right-records", this.rightInputSideSpec, this.rightType, this.rightStateRetentionTime);
    }

    public void processElement1(StreamRecord<RowData> streamRecord) throws Exception {
        RowData rowData = (RowData) streamRecord.getValue();
        AbstractStreamingJoinOperator.AssociatedRecords of = AbstractStreamingJoinOperator.AssociatedRecords.of(rowData, true, this.rightRecordStateView, this.joinCondition);
        if (of.isEmpty()) {
            if (this.isAntiJoin) {
                this.collector.collect(rowData);
            }
        } else if (!this.isAntiJoin) {
            this.collector.collect(rowData);
        }
        if (RowDataUtil.isAccumulateMsg(rowData)) {
            rowData.setRowKind(RowKind.INSERT);
            this.leftRecordStateView.addRecord(rowData, of.size());
        } else {
            rowData.setRowKind(RowKind.INSERT);
            this.leftRecordStateView.retractRecord(rowData);
        }
    }

    public void processElement2(StreamRecord<RowData> streamRecord) throws Exception {
        RowData rowData = (RowData) streamRecord.getValue();
        boolean isAccumulateMsg = RowDataUtil.isAccumulateMsg(rowData);
        RowKind rowKind = rowData.getRowKind();
        rowData.setRowKind(RowKind.INSERT);
        AbstractStreamingJoinOperator.AssociatedRecords of = AbstractStreamingJoinOperator.AssociatedRecords.of(rowData, false, this.leftRecordStateView, this.joinCondition);
        if (isAccumulateMsg) {
            this.rightRecordStateView.addRecord(rowData);
            if (of.isEmpty()) {
                return;
            }
            for (AbstractStreamingJoinOperator.RecordWithAssociations recordWithAssociations : of.getRecordsWithAssociations()) {
                RowData rowData2 = recordWithAssociations.record;
                if (recordWithAssociations.numOfAssociations == 0) {
                    if (this.isAntiJoin) {
                        rowData2.setRowKind(RowKind.DELETE);
                    } else {
                        rowData2.setRowKind(rowKind);
                    }
                    this.collector.collect(rowData2);
                    rowData2.setRowKind(RowKind.INSERT);
                }
                this.leftRecordStateView.updateNumOfAssociations(rowData2, recordWithAssociations.numOfAssociations + 1);
            }
            return;
        }
        this.rightRecordStateView.retractRecord(rowData);
        if (of.isEmpty()) {
            return;
        }
        for (AbstractStreamingJoinOperator.RecordWithAssociations recordWithAssociations2 : of.getRecordsWithAssociations()) {
            RowData rowData3 = recordWithAssociations2.record;
            if (recordWithAssociations2.numOfAssociations == 1) {
                if (this.isAntiJoin) {
                    rowData3.setRowKind(RowKind.INSERT);
                } else {
                    rowData3.setRowKind(rowKind);
                }
                this.collector.collect(rowData3);
                rowData3.setRowKind(RowKind.INSERT);
            }
            this.leftRecordStateView.updateNumOfAssociations(rowData3, recordWithAssociations2.numOfAssociations - 1);
        }
    }
}
