package org.apache.flink.table.planner.plan.nodes.exec.common;

import java.util.List;
import java.util.Optional;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.connectors.TransformationScanProvider;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.utils.TransformationMetadata;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;

/* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.class */
public abstract class CommonExecTableSourceScan extends ExecNodeBase<RowData> implements MultipleTransformationTranslator<RowData> {
    public static final String SOURCE_TRANSFORMATION = "source";
    public static final String FIELD_NAME_SCAN_TABLE_SOURCE = "scanTableSource";

    @JsonProperty(FIELD_NAME_SCAN_TABLE_SOURCE)
    private final DynamicTableSourceSpec tableSourceSpec;

    /* JADX INFO: Access modifiers changed from: protected */
    public CommonExecTableSourceScan(int i, ExecNodeContext execNodeContext, ReadableConfig readableConfig, DynamicTableSourceSpec dynamicTableSourceSpec, List<InputProperty> list, LogicalType logicalType, String str) {
        super(i, execNodeContext, readableConfig, list, logicalType, str);
        this.tableSourceSpec = dynamicTableSourceSpec;
    }

    @Override // org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase
    public String getSimplifiedName() {
        return this.tableSourceSpec.getContextResolvedTable().getIdentifier().getObjectName();
    }

    public DynamicTableSourceSpec getTableSourceSpec() {
        return this.tableSourceSpec;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase
    public Transformation<RowData> translateToPlanInternal(PlannerBase plannerBase, ExecNodeConfig execNodeConfig) {
        StreamExecutionEnvironment execEnv = plannerBase.getExecEnv();
        TransformationMetadata createTransformationMeta = createTransformationMeta(SOURCE_TRANSFORMATION, execNodeConfig);
        InternalTypeInfo of = InternalTypeInfo.of(getOutputType());
        SourceFunctionProvider scanRuntimeProvider = this.tableSourceSpec.getScanTableSource(plannerBase.getFlinkContext(), ShortcutUtils.unwrapTypeFactory(plannerBase)).getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
        if (scanRuntimeProvider instanceof SourceFunctionProvider) {
            SourceFunctionProvider sourceFunctionProvider = scanRuntimeProvider;
            return createTransformationMeta.fill(createSourceFunctionTransformation(execEnv, sourceFunctionProvider.createSourceFunction(), sourceFunctionProvider.isBounded(), createTransformationMeta.getName(), of, scanRuntimeProvider.getParallelism()));
        }
        if (scanRuntimeProvider instanceof InputFormatProvider) {
            return createTransformationMeta.fill(createInputFormatTransformation(execEnv, ((InputFormatProvider) scanRuntimeProvider).createInputFormat(), of, createTransformationMeta.getName(), scanRuntimeProvider.getParallelism().isPresent() ? ((Integer) scanRuntimeProvider.getParallelism().get()).intValue() : -1));
        }
        if (scanRuntimeProvider instanceof SourceProvider) {
            Transformation transformation = execEnv.fromSource(((SourceProvider) scanRuntimeProvider).createSource(), WatermarkStrategy.noWatermarks(), createTransformationMeta.getName(), of).getTransformation();
            Optional parallelism = scanRuntimeProvider.getParallelism();
            transformation.getClass();
            parallelism.ifPresent((v1) -> {
                r1.setParallelism(v1);
            });
            return createTransformationMeta.fill(transformation);
        }
        if (scanRuntimeProvider instanceof DataStreamScanProvider) {
            Transformation<RowData> transformation2 = ((DataStreamScanProvider) scanRuntimeProvider).produceDataStream(createProviderContext(execNodeConfig), execEnv).getTransformation();
            createTransformationMeta.fill(transformation2);
            transformation2.setOutputType(of);
            return transformation2;
        }
        if (!(scanRuntimeProvider instanceof TransformationScanProvider)) {
            throw new UnsupportedOperationException(scanRuntimeProvider.getClass().getSimpleName() + " is unsupported now.");
        }
        Transformation<RowData> createTransformation = ((TransformationScanProvider) scanRuntimeProvider).createTransformation(createProviderContext(execNodeConfig));
        createTransformationMeta.fill(createTransformation);
        createTransformation.setOutputType(of);
        return createTransformation;
    }

    private ProviderContext createProviderContext(ExecNodeConfig execNodeConfig) {
        return str -> {
            return ((this instanceof StreamExecNode) && execNodeConfig.shouldSetUid()) ? Optional.of(createTransformationUid(str, execNodeConfig)) : Optional.empty();
        };
    }

    protected Transformation<RowData> createSourceFunctionTransformation(StreamExecutionEnvironment streamExecutionEnvironment, SourceFunction<RowData> sourceFunction, boolean z, String str, TypeInformation<RowData> typeInformation, Optional<Integer> optional) {
        int i;
        streamExecutionEnvironment.clean(sourceFunction);
        boolean z2 = false;
        if (sourceFunction instanceof ParallelSourceFunction) {
            i = streamExecutionEnvironment.getParallelism();
        } else {
            i = 1;
            z2 = true;
        }
        Boundedness boundedness = z ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED;
        StreamSource streamSource = new StreamSource(sourceFunction, !z);
        if (streamExecutionEnvironment.getConfig().isDisabledSourceSinkChaining()) {
            streamSource.setChainingStrategy(ChainingStrategy.NEVER);
        }
        return ((sourceFunction instanceof ParallelSourceFunction) && optional.isPresent()) ? new LegacySourceTransformation(str, streamSource, typeInformation, i, optional.get().intValue(), boundedness, z2) : new LegacySourceTransformation(str, streamSource, typeInformation, i, boundedness, z2);
    }

    protected abstract Transformation<RowData> createInputFormatTransformation(StreamExecutionEnvironment streamExecutionEnvironment, InputFormat<RowData, ?> inputFormat, InternalTypeInfo<RowData> internalTypeInfo, String str, int i);
}
