package com.huawei.hudi.hst.table;

import com.huawei.hudi.hst.source.HoodieSource;
import com.huawei.hudi.hst.source.enumerator.HoodieSplitEnumerator;
import java.lang.invoke.SerializedLambda;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.hudi.common.model.hst.MetaKafka;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.table.HoodieTableSource;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/hudi/hst/table/HoodieStreamTableSource.class */
public class HoodieStreamTableSource implements ScanTableSource, SupportsWatermarkPushDown, SupportsReadingMetadata {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieStreamTableSource.class);
    private final HoodieTableSource hoodieTableSource;
    private final KafkaDynamicSource kafkaTableSource;
    private final String kafkaTopic;

    public HoodieStreamTableSource(HoodieTableSource hoodieTableSource, KafkaDynamicSource kafkaDynamicSource, String str) {
        this.hoodieTableSource = hoodieTableSource;
        this.kafkaTableSource = kafkaDynamicSource;
        this.kafkaTopic = str;
    }

    public ChangelogMode getChangelogMode() {
        return this.hoodieTableSource.getChangelogMode();
    }

    public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext scanContext) {
        HoodieSource hybrid = HoodieSource.hybrid(this.hoodieTableSource.getInputFormat(false));
        String str = this.kafkaTopic;
        KafkaSourceBuilder createKafkaSourceBuilder = this.kafkaTableSource.createKafkaSourceBuilder(scanContext);
        return SourceProvider.of(HybridSource.builder(hybrid).addSource(sourceSwitchContext -> {
            MetaKafka lastCommitKafkaMetadata = ((HoodieSplitEnumerator) sourceSwitchContext.getPreviousEnumerator()).getLastCommitKafkaMetadata();
            HashMap hashMap = new HashMap();
            if (lastCommitKafkaMetadata != null) {
                for (Map.Entry entry : lastCommitKafkaMetadata.getPartitions().entrySet()) {
                    hashMap.put(new TopicPartition(str, ((Integer) entry.getKey()).intValue()), Long.valueOf(((Long) entry.getValue()).longValue() + 1));
                }
            }
            if (hashMap.isEmpty()) {
                throw new HoodieValidationException("There are no metadata info in Hudi about Kafka offsets.");
            }
            LOG.info("Hudi stream table source switch from Hudi to Kafka. Metadata: {}", lastCommitKafkaMetadata.toJson());
            return createKafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.offsets(hashMap)).build();
        }, Boundedness.CONTINUOUS_UNBOUNDED).build());
    }

    /* renamed from: copy, reason: merged with bridge method [inline-methods] */
    public HoodieStreamTableSource m10copy() {
        return new HoodieStreamTableSource(this.hoodieTableSource.m69copy(), this.kafkaTableSource.copy(), this.kafkaTopic);
    }

    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
        this.kafkaTableSource.applyWatermark(watermarkStrategy);
    }

    public Map<String, DataType> listReadableMetadata() {
        return this.kafkaTableSource.listReadableMetadata();
    }

    public void applyReadableMetadata(List<String> list, DataType dataType) {
        this.kafkaTableSource.applyReadableMetadata(list, dataType);
    }

    public boolean supportsMetadataProjection() {
        return this.kafkaTableSource.supportsMetadataProjection();
    }

    public String asSummaryString() {
        return "HudiStreamTableSource";
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 649741339:
                if (implMethodName.equals("lambda$getScanRuntimeProvider$2deb975e$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/connector/base/source/hybrid/HybridSource$SourceFactory") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/flink/connector/base/source/hybrid/HybridSource$SourceSwitchContext;)Lorg/apache/flink/api/connector/source/Source;") && serializedLambda.getImplClass().equals("com/huawei/hudi/hst/table/HoodieStreamTableSource") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Lorg/apache/flink/connector/kafka/source/KafkaSourceBuilder;Lorg/apache/flink/connector/base/source/hybrid/HybridSource$SourceSwitchContext;)Lorg/apache/flink/connector/kafka/source/KafkaSource;")) {
                    String str = (String) serializedLambda.getCapturedArg(0);
                    KafkaSourceBuilder kafkaSourceBuilder = (KafkaSourceBuilder) serializedLambda.getCapturedArg(1);
                    return sourceSwitchContext -> {
                        MetaKafka lastCommitKafkaMetadata = ((HoodieSplitEnumerator) sourceSwitchContext.getPreviousEnumerator()).getLastCommitKafkaMetadata();
                        HashMap hashMap = new HashMap();
                        if (lastCommitKafkaMetadata != null) {
                            for (Map.Entry entry : lastCommitKafkaMetadata.getPartitions().entrySet()) {
                                hashMap.put(new TopicPartition(str, ((Integer) entry.getKey()).intValue()), Long.valueOf(((Long) entry.getValue()).longValue() + 1));
                            }
                        }
                        if (hashMap.isEmpty()) {
                            throw new HoodieValidationException("There are no metadata info in Hudi about Kafka offsets.");
                        }
                        LOG.info("Hudi stream table source switch from Hudi to Kafka. Metadata: {}", lastCommitKafkaMetadata.toJson());
                        return kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.offsets(hashMap)).build();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
