package org.apache.flink.table.runtime.operators.join.stream;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.operators.AbstractInput;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.RowDataUtil;
import org.apache.flink.table.data.utils.MultiJoinedRowData;
import org.apache.flink.table.runtime.operators.join.stream.state.JoinInputSideSpec;
import org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateView;
import org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.types.RowKind;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/stream/StreamingMultiJoinOperator.class */
public class StreamingMultiJoinOperator extends AbstractStreamOperatorV2<RowData> implements MultipleInputStreamOperator<RowData> {
    private static final long serialVersionUID = 6573894621504275587L;
    /** Inputs of this operator keyed by input id; ids are assigned 1..n by {@code createInputs}. */
    private final Map<Integer, StreamingMultiJoinInput> inputs;
    /** Row type of each input, indexed by {@code inputId - 1}. */
    private final List<InternalTypeInfo<RowData>> inputTypes;
    /** State-view spec of each input, indexed by {@code inputId - 1}; passed to the state-view factory in {@code open()}. */
    private final List<JoinInputSideSpec> inputSideSpecs;
    /** Retention time handed to {@code JoinRecordStateViews.create}. */
    private final long stateRetentionTime;
    /** Cold-state flag handed to {@code JoinRecordStateViews.create}. */
    private final boolean isColdStateEnabled;
    /** Cold-state retention time handed to {@code JoinRecordStateViews.create}. */
    private final long coldStateRetentionTime;
    /** When {@code true}, inputs without records are padded with null rows instead of suppressing output. */
    private final boolean isOuterJoin;
    /** Reusable output record; its payload is swapped via {@code replace} for every emitted row. */
    private transient StreamRecord<MultiJoinedRowData> outStreamRecord;
    /** Per-input null row (a {@link GenericRowData} sized to the input's row), keyed by input id; created in {@code open()}. */
    private transient Map<Integer, RowData> nullRecords;
    /** Per-input record state view, keyed by input id; created in {@code open()}. */
    private transient Map<Integer, JoinRecordStateView> recordStateViews;

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/stream/StreamingMultiJoinOperator$StreamingMultiJoinInput.class */
    public class StreamingMultiJoinInput extends AbstractInput<RowData, RowData> implements Serializable {
        private static final long serialVersionUID = -6429454607601928139L;

        /**
         * Creates the input with the given id and registers it with its owning operator.
         *
         * @param streamingMultiJoinOperator operator that owns this input
         * @param i id of this input; forwarded unchanged to the {@link AbstractInput} constructor
         */
        public StreamingMultiJoinInput(StreamingMultiJoinOperator streamingMultiJoinOperator, int i) {
            super(streamingMultiJoinOperator, i);
        }

        /**
         * Returns the id this input was created with.
         *
         * @return the input id
         */
        public int getInputId() {
            return inputId;
        }

        /**
         * Joins one incoming change of this input against the records currently stored for all
         * other inputs, updates this input's state and emits the resulting changes.
         *
         * <p>The row kind of the incoming row is reset to {@link RowKind#INSERT} before it is
         * stored or joined; the original kind is only used as the output kind for inner joins.
         *
         * <p>Inner join: if any other input has no records, the row is only added to / retracted
         * from state and nothing is emitted.
         *
         * <p>Outer join: when the first record of this input arrives, the null-padded rows that
         * stood in for it are deleted before the joined rows are inserted; when the last record
         * is retracted, null-padded rows are inserted again after the joined rows are deleted.
         *
         * @param streamRecord record carrying the incoming row
         * @throws Exception if state access fails
         */
        @Override
        public void processElement(StreamRecord<RowData> streamRecord) throws Exception {
            final StreamingMultiJoinOperator operator = StreamingMultiJoinOperator.this;
            final RowData input = streamRecord.getValue();
            final boolean isAccumulateMsg = RowDataUtil.isAccumulateMsg(input);
            final RowKind inputKind = input.getRowKind();
            input.setRowKind(RowKind.INSERT);

            // Snapshot the records of every other input, keyed by input id.
            final Map<Integer, Iterable<RowData>> recordsByOtherInput = new HashMap<>();
            for (StreamingMultiJoinInput other : operator.getAllInputsExcept(this.inputId)) {
                Iterable<RowData> records = operator.getRecords(other.getInputId());
                if (!operator.isOuterJoin && Iterables.isEmpty(records)) {
                    // Inner join with an empty side: no output possible, only maintain state.
                    if (isAccumulateMsg) {
                        operator.addRecord(this.inputId, input);
                    } else {
                        operator.retractRecord(this.inputId, input);
                    }
                    return;
                }
                recordsByOtherInput.put(other.getInputId(), records);
            }

            final List<MultiJoinedRowData> otherRows = getCartesianOtherRecords(recordsByOtherInput);

            if (isAccumulateMsg) {
                // Must run before addRecord and before the join mutates otherRows.
                if (operator.isOuterJoin && Iterables.isEmpty(operator.getRecords(this.inputId))) {
                    sendNullPaddingMessage(otherRows, this.inputId, RowKind.DELETE);
                }
                operator.addRecord(this.inputId, input);
                RowKind outputKind = operator.isOuterJoin ? RowKind.INSERT : inputKind;
                for (MultiJoinedRowData joined : joinInputRecordWithOthers(this.inputId, input, outputKind, otherRows)) {
                    this.output.collect(operator.outStreamRecord.replace(joined));
                }
                return;
            }

            // joinInputRecordWithOthers appends a synthesized row to an empty list for outer
            // joins. Remember whether real rows existed so that synthesized row is not turned
            // into an all-null INSERT below (the accumulate path never retracts such a row).
            final boolean hadOtherRows = !otherRows.isEmpty();
            operator.retractRecord(this.inputId, input);
            RowKind outputKind = operator.isOuterJoin ? RowKind.DELETE : inputKind;
            for (MultiJoinedRowData joined : joinInputRecordWithOthers(this.inputId, input, outputKind, otherRows)) {
                this.output.collect(operator.outStreamRecord.replace(joined));
            }
            if (operator.isOuterJoin && hadOtherRows && Iterables.isEmpty(operator.getRecords(this.inputId))) {
                sendNullPaddingMessage(otherRows, this.inputId, RowKind.INSERT);
            }
        }

        /**
         * Writes {@code rowData} into slot {@code i - 1} of every row in {@code list} and stamps
         * each row with {@code rowKind}. The list and its rows are modified in place.
         *
         * <p>For an outer join with an empty {@code list}, a single row built from the values of
         * {@code nullRecords} is appended instead, with slot {@code i - 1} set to {@code rowData}.
         *
         * @param i id of the input that produced {@code rowData}
         * @param rowData row to place into each joined row
         * @param rowKind kind assigned to each joined row
         * @param list rows built from the other inputs
         * @return the same {@code list} instance
         */
        private List<MultiJoinedRowData> joinInputRecordWithOthers(int i, RowData rowData, RowKind rowKind, List<MultiJoinedRowData> list) {
            final int slot = i - 1;
            final boolean synthesizeRow = list.isEmpty() && StreamingMultiJoinOperator.this.isOuterJoin;

            if (!synthesizeRow) {
                for (MultiJoinedRowData joined : list) {
                    joined.replace(slot, rowData);
                    joined.setRowKind(rowKind);
                }
                return list;
            }

            RowData[] nullRows = StreamingMultiJoinOperator.this.nullRecords.values().toArray(new RowData[0]);
            MultiJoinedRowData padded = new MultiJoinedRowData(nullRows);
            padded.replace(slot, rowData);
            padded.setRowKind(rowKind);
            list.add(padded);
            return list;
        }

        /**
         * Builds the cartesian product of the given per-input records.
         *
         * <p>Each result row starts from the values of {@code nullRecords}; slot
         * {@code inputId - 1} is then set for every input that has records. Inputs without
         * records are handled as follows:
         * <ul>
         *   <li>inner join: the product is empty and an immutable empty list is returned;</li>
         *   <li>outer join: the input's slot is set to its null row in every result row.</li>
         * </ul>
         *
         * @param map records per input id; may be {@code null} or empty
         * @return the product rows; a new mutable list, except for the immutable empty list
         *     returned in the inner-join case above
         */
        public List<MultiJoinedRowData> getCartesianOtherRecords(Map<Integer, Iterable<RowData>> map) {
            List<MultiJoinedRowData> product = new ArrayList<>();
            if (map == null || map.isEmpty()) {
                return product;
            }

            List<Integer> emptyInputIds = new ArrayList<>();
            for (Map.Entry<Integer, Iterable<RowData>> entry : map.entrySet()) {
                Integer inputId = entry.getKey();
                Iterable<RowData> records = entry.getValue();

                if (Iterables.isEmpty(records)) {
                    if (!StreamingMultiJoinOperator.this.isOuterJoin) {
                        return Collections.emptyList();
                    }
                    emptyInputIds.add(inputId);
                } else if (product.isEmpty()) {
                    // First input with records: seed one row per record.
                    for (RowData record : records) {
                        MultiJoinedRowData seed = new MultiJoinedRowData(StreamingMultiJoinOperator.this.nullRecords.values().toArray(new RowData[0]));
                        seed.replace(inputId - 1, record);
                        product.add(seed);
                    }
                } else {
                    // Cross every existing row with every record of this input.
                    List<MultiJoinedRowData> expanded = new ArrayList<>();
                    for (MultiJoinedRowData partial : product) {
                        for (RowData record : records) {
                            MultiJoinedRowData combined = new MultiJoinedRowData(partial);
                            combined.replace(inputId - 1, record);
                            expanded.add(combined);
                        }
                    }
                    product = expanded;
                }
            }

            for (Integer emptyInputId : emptyInputIds) {
                RowData nullRow = StreamingMultiJoinOperator.this.nullRecords.get(emptyInputId);
                for (MultiJoinedRowData row : product) {
                    row.replace(emptyInputId - 1, nullRow);
                }
            }
            return product;
        }

        /**
         * Emits, for every row in {@code list}, a new row of kind {@code rowKind} whose slot
         * {@code i - 1} holds the null row of input {@code i}. The rows in {@code list} are
         * passed to the {@code MultiJoinedRowData(RowKind, MultiJoinedRowData)} constructor and
         * are not otherwise touched here.
         *
         * @param list rows built from the other inputs
         * @param i id of the input to pad with nulls
         * @param rowKind kind of the emitted rows
         */
        private void sendNullPaddingMessage(List<MultiJoinedRowData> list, int i, RowKind rowKind) {
            final RowData nullRow = StreamingMultiJoinOperator.this.nullRecords.get(i);
            for (MultiJoinedRowData source : list) {
                MultiJoinedRowData padded = new MultiJoinedRowData(rowKind, source);
                padded.replace(i - 1, nullRow);
                this.output.collect(StreamingMultiJoinOperator.this.outStreamRecord.replace(padded));
            }
        }
    }

    /**
     * Creates a multi-input join operator with one input per entry of {@code list}.
     *
     * @param streamOperatorParameters operator parameters forwarded to the base class
     * @param list row type of each input; its size determines the number of inputs
     * @param list2 state-view spec of each input, in the same order as {@code list}
     * @param j state retention time passed to the record state views
     * @param z whether cold state is enabled for the record state views
     * @param j2 cold-state retention time passed to the record state views
     * @param z2 whether the join pads missing sides with null rows (outer join)
     */
    public StreamingMultiJoinOperator(StreamOperatorParameters<RowData> streamOperatorParameters, List<InternalTypeInfo<RowData>> list, List<JoinInputSideSpec> list2, long j, boolean z, long j2, boolean z2) {
        super(streamOperatorParameters, list.size());
        // Per-input descriptors.
        this.inputTypes = list;
        this.inputSideSpecs = list2;
        // State configuration.
        this.stateRetentionTime = j;
        this.isColdStateEnabled = z;
        this.coldStateRetentionTime = j2;
        // Join semantics.
        this.isOuterJoin = z2;
        // Inputs are created last; creation only needs the number of input types.
        this.inputs = createInputs(list);
    }

    /**
     * Creates one {@link StreamingMultiJoinInput} per input type, with ids {@code 1..list.size()}.
     *
     * <p>The result is a {@link LinkedHashMap} filled in ascending id order, so iterating
     * {@code values()} yields the inputs by id. A plain {@code HashMap} gives no such guarantee,
     * and {@code getInputs()} exposes that iteration order to the runtime.
     *
     * @param list row type of each input; only its size is used
     * @return inputs keyed by input id, iterable in ascending id order
     */
    private Map<Integer, StreamingMultiJoinInput> createInputs(List<InternalTypeInfo<RowData>> list) {
        Map<Integer, StreamingMultiJoinInput> inputsById = new LinkedHashMap<>();
        for (int inputId = 1; inputId <= list.size(); inputId++) {
            inputsById.put(inputId, new StreamingMultiJoinInput(this, inputId));
        }
        return inputsById;
    }

    /**
     * Initializes the runtime state of the operator: the reusable output record, one record
     * state view per input and one null row per input.
     *
     * <p>{@code nullRecords} is a {@link LinkedHashMap} filled in ascending input-id order
     * because other code turns {@code nullRecords.values()} into an array and addresses it by
     * {@code inputId - 1}; a plain {@code HashMap} does not guarantee that iteration order.
     *
     * @throws Exception if the base class or state creation fails
     */
    @Override
    public void open() throws Exception {
        getRuntimeContext().setKeyedStateStore((KeyedStateStore) getKeyedStateStore().orElse(null));
        super.open();
        this.outStreamRecord = new StreamRecord<>(null);
        this.nullRecords = new LinkedHashMap<>();
        this.recordStateViews = new HashMap<>();
        for (int i = 0; i < this.inputs.size(); i++) {
            // Input ids are 1-based, the type/spec lists are 0-based.
            final int inputId = i + 1;
            final InternalTypeInfo<RowData> inputType = this.inputTypes.get(i);
            this.recordStateViews.put(inputId, JoinRecordStateViews.create(getRuntimeContext(), "multi-join-records-" + i, this.inputSideSpecs.get(i), inputType, this.stateRetentionTime, this.isColdStateEnabled, this.coldStateRetentionTime));
            this.nullRecords.put(inputId, new GenericRowData(inputType.toRowSize()));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Adds a row to the record state view of the given input.
     *
     * @param i input id
     * @param rowData row to add
     * @throws Exception if the state view fails to add the row
     */
    public void addRecord(int i, RowData rowData) throws Exception {
        JoinRecordStateView stateView = this.recordStateViews.get(i);
        stateView.addRecord(rowData);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Retracts a row from the record state view of the given input.
     *
     * @param i input id
     * @param rowData row to retract
     * @throws RuntimeException wrapping any exception raised while retracting
     */
    public void retractRecord(int i, RowData rowData) {
        try {
            JoinRecordStateView stateView = this.recordStateViews.get(i);
            stateView.retractRecord(rowData);
        } catch (Exception cause) {
            throw new RuntimeException(cause);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns an immutable copy of the rows currently held by the record state view of the
     * given input.
     *
     * @param i input id
     * @return immutable copy of the input's records
     * @throws RuntimeException wrapping any exception raised while reading or copying
     */
    public Iterable<RowData> getRecords(int i) {
        try {
            JoinRecordStateView stateView = this.recordStateViews.get(i);
            Iterable<RowData> stored = stateView.getRecords();
            return ImmutableList.copyOf(stored);
        } catch (Exception cause) {
            throw new RuntimeException(cause);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns every input of this operator whose id differs from {@code i}, in the iteration
     * order of the input map's values.
     *
     * @param i id of the input to leave out
     * @return a new list with all other inputs
     */
    public List<StreamingMultiJoinInput> getAllInputsExcept(int i) {
        List<StreamingMultiJoinInput> others = new ArrayList<>();
        for (StreamingMultiJoinInput candidate : this.inputs.values()) {
            if (candidate.getInputId() != i) {
                others.add(candidate);
            }
        }
        return others;
    }

    /**
     * Returns the inputs of this operator ordered by input id (1..n).
     *
     * <p>The list is built by explicit id lookup rather than from the map's iteration order, so
     * position {@code k} always holds the input with id {@code k + 1}.
     * NOTE(review): the runtime is assumed to address inputs by list position; confirm against
     * the Flink version in use.
     *
     * @return a new list containing all inputs in ascending id order
     */
    @Override
    public List<Input> getInputs() {
        final int inputCount = this.inputs.size();
        List<Input> ordered = new ArrayList<>(inputCount);
        for (int inputId = 1; inputId <= inputCount; inputId++) {
            ordered.add(this.inputs.get(inputId));
        }
        return ordered;
    }
}
