package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.MultiJoinSpec;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.JoinUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.runtime.operators.join.stream.StreamingMultiJoinOperatorFactory;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ExecNodeMetadata(name = "stream-exec-multi-join", version = 1, producedTransformations = {StreamExecMultiJoin.MULTI_JOIN_TRANSFORMATION}, minPlanVersion = FlinkVersion.v1_15, minStateVersion = FlinkVersion.v1_15)
/* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMultiJoin.class */
public class StreamExecMultiJoin extends ExecNodeBase<RowData> implements StreamExecNode<RowData>, SingleTransformationTranslator<RowData> {
    private static final Logger LOG = LoggerFactory.getLogger(StreamExecMultiJoin.class);
    public static final String MULTI_JOIN_TRANSFORMATION = "multi-join";
    public static final String FIELD_NAME_JOIN_SPEC = "multiJoinSpec";
    public static final String FIELD_NAME_INPUT_UNIQUE_KEYS = "inputUniqueKeys";
    public static final String FILE_NAME_STATE_RETENTION_TIME = "stateRetentionTime";

    @JsonProperty(FIELD_NAME_JOIN_SPEC)
    private final MultiJoinSpec joinSpec;

    @JsonProperty(FIELD_NAME_INPUT_UNIQUE_KEYS)
    private final List<List<int[]>> inputUniqueKeys;
    private final long stateRetentionTime;

    public StreamExecMultiJoin(ReadableConfig readableConfig, MultiJoinSpec multiJoinSpec, List<List<int[]>> list, List<InputProperty> list2, RowType rowType, String str, Long l) {
        this(ExecNodeContext.newNodeId(), ExecNodeContext.newContext(StreamExecMultiJoin.class), ExecNodeContext.newPersistedConfig(StreamExecMultiJoin.class, readableConfig), multiJoinSpec, list, list2, rowType, str, l.longValue());
    }

    @JsonCreator
    public StreamExecMultiJoin(@JsonProperty("id") int i, @JsonProperty("type") ExecNodeContext execNodeContext, @JsonProperty("configuration") ReadableConfig readableConfig, @JsonProperty("multiJoinSpec") MultiJoinSpec multiJoinSpec, @JsonProperty("inputUniqueKeys") List<List<int[]>> list, @JsonProperty("inputProperties") List<InputProperty> list2, @JsonProperty("outputType") RowType rowType, @JsonProperty("description") String str, @JsonProperty("stateRetentionTime") long j) {
        super(i, execNodeContext, readableConfig, list2, rowType, str);
        this.joinSpec = (MultiJoinSpec) Preconditions.checkNotNull(multiJoinSpec);
        this.inputUniqueKeys = list;
        this.stateRetentionTime = j;
    }

    @Override // org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase
    protected Transformation<RowData> translateToPlanInternal(PlannerBase plannerBase, ExecNodeConfig execNodeConfig) {
        List list = (List) getInputEdges().stream().map(execEdge -> {
            return execEdge.translateToPlan(plannerBase);
        }).collect(Collectors.toList());
        List list2 = (List) ((List) getInputEdges().stream().map(execEdge2 -> {
            return execEdge2.getOutputType();
        }).collect(Collectors.toList())).stream().map(InternalTypeInfo::of).collect(Collectors.toList());
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (int i = 0; i < list2.size(); i++) {
            int[] iArr = this.joinSpec.getJoinKeys().get(i);
            arrayList.add(i, JoinUtil.analyzeJoinInput(plannerBase.getFlinkContext().getClassLoader(), (InternalTypeInfo) list2.get(i), iArr, this.inputUniqueKeys.get(i)));
            arrayList2.add(i, KeySelectorUtil.getRowDataSelector(plannerBase.getFlinkContext().getClassLoader(), iArr, (InternalTypeInfo) list2.get(i)));
        }
        boolean isColdStateOptionEnabled = execNodeConfig.isColdStateOptionEnabled();
        LOG.info("Ttl of every join side: {}, Is cold state enabled: {}, Cold state retantion time: {}.", new Object[]{Long.valueOf(this.stateRetentionTime), Boolean.valueOf(isColdStateOptionEnabled), Long.valueOf(execNodeConfig.getColdStateRetentionTime())});
        return ExecNodeUtil.createMultipleInputTransformation(list, createTransformationMeta(MULTI_JOIN_TRANSFORMATION, execNodeConfig), new StreamingMultiJoinOperatorFactory(list2, arrayList, this.stateRetentionTime, isColdStateOptionEnabled, 0L, this.joinSpec.getJoinType().isOuter()), InternalTypeInfo.of(getOutputType()), arrayList2, ((Transformation) list.get(0)).getParallelism(), 0L);
    }
}
