package org.apache.flink.table.runtime.operators.join.stream;

import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.RowDataUtil;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator;
import org.apache.flink.table.runtime.operators.join.stream.state.JoinInputSideSpec;
import org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViewWithAssociationCounter;
import org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViewsWithAssociationCounter;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperatorWithStateElimination.class */
public class StreamingJoinOperatorWithStateElimination extends AbstractStreamingJoinOperator {
    private int leftEliminationThreshold;
    private int rightEliminationThreshold;
    private final boolean leftDuplicate;
    private final boolean rightDuplicate;
    private transient JoinedRowData outRow;
    private transient JoinRecordStateViewWithAssociationCounter leftRecordStateView;
    private transient JoinRecordStateViewWithAssociationCounter rightRecordStateView;

    public StreamingJoinOperatorWithStateElimination(InternalTypeInfo<RowData> internalTypeInfo, InternalTypeInfo<RowData> internalTypeInfo2, GeneratedJoinCondition generatedJoinCondition, JoinInputSideSpec joinInputSideSpec, JoinInputSideSpec joinInputSideSpec2, int i, int i2, boolean[] zArr, long j, long j2, boolean z, long j3, @Nullable StreamOperatorFactory<RowData> streamOperatorFactory, boolean z2, boolean z3) {
        super(internalTypeInfo, internalTypeInfo2, generatedJoinCondition, joinInputSideSpec, joinInputSideSpec2, zArr, j, j2, z, j3, streamOperatorFactory);
        Preconditions.checkState(i > 0 || i2 > 0, "StreamingJoinOperatorWithStateElimination requires either left or right side to support state elimination");
        this.leftEliminationThreshold = i;
        this.rightEliminationThreshold = i2;
        this.leftDuplicate = z2;
        this.rightDuplicate = z3;
    }

    @Override // org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator
    public void open() throws Exception {
        super.open();
        this.outRow = new JoinedRowData();
        this.leftRecordStateView = JoinRecordStateViewsWithAssociationCounter.create(getRuntimeContext(), "left-records", this.leftInputSideSpec, this.leftType, this.leftStateRetentionTime, this.isColdStateEnabled, this.coldStateRetentionTime);
        this.rightRecordStateView = JoinRecordStateViewsWithAssociationCounter.create(getRuntimeContext(), "right-records", this.rightInputSideSpec, this.rightType, this.rightStateRetentionTime, this.isColdStateEnabled, this.coldStateRetentionTime);
    }

    @Override // org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator
    public void endInput(int i) throws Exception {
        if (this.batchJoinOperator == null || i != 2) {
            return;
        }
        this.batchJoinOperator.close();
    }

    public void processElement1(StreamRecord<RowData> streamRecord) throws Exception {
        processElement((RowData) streamRecord.getValue(), this.leftRecordStateView, this.rightRecordStateView, true);
    }

    public void processElement2(StreamRecord<RowData> streamRecord) throws Exception {
        processElement((RowData) streamRecord.getValue(), this.rightRecordStateView, this.leftRecordStateView, false);
    }

    public Configuration updateOperatorConfigs(Configuration configuration) {
        Configuration updateOperatorConfigs;
        synchronized (this.updateConfigsLock) {
            updateOperatorConfigs = super.updateOperatorConfigs(configuration);
            tryUpdateOperatorConfig(configuration, ExecutionConfigOptions.TABLE_EXEC_JOIN_LEFT_ELIMINATE_STATE_THRESHOLD, updateOperatorConfigs).ifPresent((v1) -> {
                setLeftEliminationThreshold(v1);
            });
            tryUpdateOperatorConfig(configuration, ExecutionConfigOptions.TABLE_EXEC_JOIN_RIGHT_ELIMINATE_STATE_THRESHOLD, updateOperatorConfigs).ifPresent((v1) -> {
                setRightEliminationThreshold(v1);
            });
        }
        return updateOperatorConfigs;
    }

    private void setLeftEliminationThreshold(int i) {
        this.leftEliminationThreshold = i;
    }

    private void setRightEliminationThreshold(int i) {
        this.rightEliminationThreshold = i;
    }

    private void processElement(RowData rowData, JoinRecordStateViewWithAssociationCounter joinRecordStateViewWithAssociationCounter, JoinRecordStateViewWithAssociationCounter joinRecordStateViewWithAssociationCounter2, boolean z) throws Exception {
        int i;
        int i2;
        synchronized (this.updateConfigsLock) {
            i = z ? this.leftEliminationThreshold : this.rightEliminationThreshold;
            i2 = z ? this.rightEliminationThreshold : this.leftEliminationThreshold;
        }
        boolean isAccumulateMsg = RowDataUtil.isAccumulateMsg(rowData);
        this.outRow.setRowKind(rowData.getRowKind());
        if ((z && this.leftDuplicate) || (!z && this.rightDuplicate)) {
            int hashCode = rowData.hashCode();
            for (RowData rowData2 : joinRecordStateViewWithAssociationCounter.getRecords()) {
                if (rowData2.hashCode() == hashCode && rowData2.equals(rowData)) {
                    LOG.debug("Duplicate record is received in the {}", z ? "leftInput" : "RightInput");
                    return;
                }
            }
        }
        AbstractStreamingJoinOperator.AssociatedRecords of = AbstractStreamingJoinOperator.AssociatedRecords.of(rowData, z, joinRecordStateViewWithAssociationCounter2, this.joinCondition);
        if (!isAccumulateMsg) {
            joinRecordStateViewWithAssociationCounter.retractRecord(rowData);
            if (of.isEmpty()) {
                return;
            }
            for (AbstractStreamingJoinOperator.RecordWithAssociations recordWithAssociations : of.getRecordsWithAssociations()) {
                output(rowData, recordWithAssociations.record, z);
                int i3 = recordWithAssociations.numOfAssociations;
                if (i3 > 0) {
                    joinRecordStateViewWithAssociationCounter2.updateNumOfAssociations(recordWithAssociations.record, i3 - 1);
                }
            }
            return;
        }
        if (i == 0 || i > of.size()) {
            joinRecordStateViewWithAssociationCounter.addRecord(rowData, of.size());
        }
        if (of.isEmpty()) {
            return;
        }
        int i4 = 0;
        for (AbstractStreamingJoinOperator.RecordWithAssociations recordWithAssociations2 : of.getRecordsWithAssociations()) {
            if (i > 0 && i4 >= i) {
                return;
            }
            output(rowData, recordWithAssociations2.record, z);
            if (i2 > 0) {
                int i5 = recordWithAssociations2.numOfAssociations;
                if (i5 + 1 >= i2) {
                    joinRecordStateViewWithAssociationCounter2.retractRecord(recordWithAssociations2.record);
                } else {
                    joinRecordStateViewWithAssociationCounter2.updateNumOfAssociations(recordWithAssociations2.record, i5 + 1);
                }
            }
            i4++;
        }
    }

    private void output(RowData rowData, RowData rowData2, boolean z) {
        if (z) {
            this.outRow.replace(rowData, rowData2);
        } else {
            this.outRow.replace(rowData2, rowData);
        }
        this.collector.collect(this.outRow);
    }

    @VisibleForTesting
    public int getLeftEliminationThreshold() {
        return this.leftEliminationThreshold;
    }
}
