package org.apache.flink.table.runtime.operators.join.stream;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingMultiJoinOperator;
import org.apache.flink.table.runtime.operators.join.stream.state.JoinInputSideSpec;
import org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViewWithAssociationCounter;
import org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViewsWithAssociationCounter;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/stream/StreamingMultiJoinOperatorWithStateElimination.class */
/**
 * Streaming multi-join operator whose per-input state views store, next to each record, a
 * counter of associations.
 *
 * <p>When the counter of a stored record is advanced while it already equals
 * {@code stateEliminationThreshold - 1}, the record is retracted from its state view instead of
 * being updated. The operator refuses to be constructed unless the threshold is positive and the
 * last boolean constructor flag is {@code false} (the corresponding error message states that
 * only INNER JOIN is supported).
 */
public class StreamingMultiJoinOperatorWithStateElimination extends AbstractStreamingMultiJoinOperator<JoinRecordStateViewWithAssociationCounter> {
    private static final long serialVersionUID = -3909081023236071918L;

    /** Counter value at which a stored record is dropped from state; always greater than zero. */
    private final int stateEliminationThreshold;

    /**
     * Input of the enclosing operator that maintains the association counters kept in the
     * operator's record state views.
     */
    private class StreamingMultiJoinOperatorWithStateEliminationInput extends AbstractStreamingMultiJoinOperator<JoinRecordStateViewWithAssociationCounter>.AbstractStreamingMultiJoinInput {
        private static final long serialVersionUID = 5643213707866199928L;

        public StreamingMultiJoinOperatorWithStateEliminationInput(StreamingMultiJoinOperatorWithStateElimination operator, int i) {
            super(operator, i);
        }

        /**
         * Stores {@code rowData} in the state view of input {@code i} with an initial counter
         * of 1. Nothing is stored when the elimination threshold is 1 or lower.
         */
        @Override // org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingMultiJoinOperator.AbstractStreamingMultiJoinInput
        protected void addInputRecord(int i, RowData rowData) throws Exception {
            if (StreamingMultiJoinOperatorWithStateElimination.this.stateEliminationThreshold <= 1) {
                return;
            }
            StreamingMultiJoinOperatorWithStateElimination.this.associationStateView(i).addRecord(rowData, 1);
        }

        /**
         * Adjusts the association counter of every record contained in {@code map}.
         *
         * @param z {@code true} to advance each counter by one (retracting the record when the
         *     threshold is reached), {@code false} to decrease each counter by one
         * @param map records to adjust, keyed by the id used to look up their state view
         */
        @Override // org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingMultiJoinOperator.AbstractStreamingMultiJoinInput
        protected void changeOtherRecordsState(boolean z, Map<Integer, Iterable<AbstractStreamingMultiJoinOperator.RowDataContainer>> map) throws Exception {
            for (Map.Entry<Integer, Iterable<AbstractStreamingMultiJoinOperator.RowDataContainer>> perInput : map.entrySet()) {
                int otherInputId = perInput.getKey().intValue();
                JoinRecordStateViewWithAssociationCounter stateView =
                        StreamingMultiJoinOperatorWithStateElimination.this.associationStateView(otherInputId);
                for (AbstractStreamingMultiJoinOperator.RowDataContainer container : perInput.getValue()) {
                    if (z) {
                        countAssociation(stateView, container);
                    } else {
                        uncountAssociation(stateView, container);
                    }
                }
            }
        }

        /** Advances the counter of one record, or retracts the record once the threshold is hit. */
        private void countAssociation(JoinRecordStateViewWithAssociationCounter stateView, AbstractStreamingMultiJoinOperator.RowDataContainer container) throws Exception {
            int associations = container.getJoinCounter();
            int lastValueBeforeElimination = StreamingMultiJoinOperatorWithStateElimination.this.stateEliminationThreshold - 1;
            if (associations != lastValueBeforeElimination) {
                stateView.updateNumOfAssociations(container.getRowData(), associations + 1);
                return;
            }
            stateView.retractRecord(container.getRowData());
        }

        /** Decreases the counter of one record; a counter that is not positive is only logged. */
        private void uncountAssociation(JoinRecordStateViewWithAssociationCounter stateView, AbstractStreamingMultiJoinOperator.RowDataContainer container) throws Exception {
            int associations = container.getJoinCounter();
            if (associations <= 0) {
                StreamingMultiJoinOperatorWithStateElimination.LOG.error("The association counter is trying to become less than 0.");
                return;
            }
            stateView.updateNumOfAssociations(container.getRowData(), associations - 1);
        }
    }

    /**
     * Creates the operator.
     *
     * <p>The first seven arguments are forwarded unchanged to the superclass constructor.
     *
     * @param z2 must be {@code false}; otherwise construction fails with the message that only
     *     INNER JOIN is supported
     * @param i state elimination threshold, must be greater than zero
     * @throws IllegalStateException if one of the two checks above fails
     */
    public StreamingMultiJoinOperatorWithStateElimination(StreamOperatorParameters<RowData> streamOperatorParameters, List<InternalTypeInfo<RowData>> list, List<JoinInputSideSpec> list2, long j, boolean z, long j2, boolean z2, int i) {
        super(streamOperatorParameters, list, list2, j, z, j2, z2);
        Preconditions.checkState(!z2, "StreamingMultiJoinOperatorWithStateElimination supports only INNER JOIN.");
        Preconditions.checkState(i > 0, "StreamingMultiJoinOperatorWithStateElimination requires state elimination.");
        this.stateEliminationThreshold = i;
    }

    /**
     * Builds the counter-aware record state view for the input at index {@code i}, registered
     * under the state name {@code "multi-join-records-" + i}.
     *
     * <p>NOTE(review): the decompiler reported that the access modifier of this method was
     * changed from {@code protected}; it is left {@code public} here.
     */
    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingMultiJoinOperator
    public JoinRecordStateViewWithAssociationCounter createJoinRecordStateView(int i) {
        String stateName = "multi-join-records-" + i;
        return JoinRecordStateViewsWithAssociationCounter.create(
                getRuntimeContext(),
                stateName,
                this.inputSideSpecs.get(i),
                this.inputTypes.get(i),
                this.stateRetentionTime,
                this.isColdStateEnabled,
                this.coldStateRetentionTime);
    }

    /**
     * Returns an iterator over the records stored for {@code i}; each element exposes the row
     * and its current association counter.
     */
    @Override // org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingMultiJoinOperator
    protected Iterator<? extends AbstractStreamingMultiJoinOperator.RowDataContainer> getRecordsIterator(int i) throws Exception {
        return StreamSupport.stream(associationStateView(i).getRecordsAndNumOfAssociations().spliterator(), false)
                .map(recordWithCount -> new AbstractStreamingMultiJoinOperator.RowDataContainer() { // from class: org.apache.flink.table.runtime.operators.join.stream.StreamingMultiJoinOperatorWithStateElimination.1
                    @Override // org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingMultiJoinOperator.RowDataContainer
                    public RowData getRowData() {
                        return (RowData) recordWithCount.f0;
                    }

                    @Override // org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingMultiJoinOperator.RowDataContainer
                    public int getJoinCounter() {
                        return (Integer) recordWithCount.f1;
                    }
                })
                .iterator();
    }

    /** Stores {@code rowData} in the state view of {@code i} with an association counter of 0. */
    @Override // org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingMultiJoinOperator
    protected void addRecord(int i, RowData rowData) throws Exception {
        associationStateView(i).addRecord(rowData, 0);
    }

    /**
     * Creates one input object for each value from 1 to {@code list.size()} inclusive and
     * returns them keyed by the value of their {@code getInputId()}.
     */
    @Override // org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingMultiJoinOperator
    protected Map<Integer, StreamingMultiJoinOperatorWithStateEliminationInput> createInputs(List<InternalTypeInfo<RowData>> list) {
        return (Map) IntStream.rangeClosed(1, list.size())
                .mapToObj(inputIndex -> new StreamingMultiJoinOperatorWithStateEliminationInput(this, inputIndex))
                .collect(Collectors.toMap(input -> input.getInputId(), input -> input));
    }

    /** Looks up the record state view registered under {@code inputId}. */
    private JoinRecordStateViewWithAssociationCounter associationStateView(int inputId) {
        return (JoinRecordStateViewWithAssociationCounter) this.recordStateViews.get(Integer.valueOf(inputId));
    }
}
