package org.apache.flink.connector.file.table.stream;

import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
import org.apache.flink.connector.file.table.FileSystemFactory;
import org.apache.flink.connector.file.table.TableMetaStoreFactory;
import org.apache.flink.connector.file.table.batch.BatchSink;
import org.apache.flink.connector.file.table.stream.compact.CompactBucketWriter;
import org.apache.flink.connector.file.table.stream.compact.CompactCoordinator;
import org.apache.flink.connector.file.table.stream.compact.CompactFileWriter;
import org.apache.flink.connector.file.table.stream.compact.CompactFileWriterWithMaxNumRecords;
import org.apache.flink.connector.file.table.stream.compact.CompactMessages;
import org.apache.flink.connector.file.table.stream.compact.CompactOperator;
import org.apache.flink.connector.file.table.stream.compact.CompactReader;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.util.function.SupplierWithException;

@Internal
/* loaded from: input_file:org/apache/flink/connector/file/table/stream/StreamingSink.class */
public class StreamingSink {
    private StreamingSink() {
    }

    public static <T> DataStream<PartitionCommitInfo> writer(ProviderContext providerContext, DataStream<T> dataStream, long j, StreamingFileSink.BucketsBuilder<T, String, ? extends StreamingFileSink.BucketsBuilder<T, String, ?>> bucketsBuilder, int i, List<String> list, Configuration configuration, boolean z) {
        return writer(providerContext, dataStream, new StreamingFileWriter(j, bucketsBuilder, list, configuration), i, z);
    }

    public static <T> DataStream<PartitionCommitInfo> writer(ProviderContext providerContext, DataStream<T> dataStream, long j, StreamingFileSink.BucketsBuilder<T, String, ? extends StreamingFileSink.BucketsBuilder<T, String, ?>> bucketsBuilder, int i, List<String> list, Configuration configuration, boolean z, int i2) {
        return writer(providerContext, dataStream, new StreamingFileWriterWithMaxNumRecords(j, bucketsBuilder, list, configuration, i2), i, z);
    }

    private static <T> DataStream<PartitionCommitInfo> writer(ProviderContext providerContext, DataStream<T> dataStream, StreamingFileWriter<T> streamingFileWriter, int i, boolean z) {
        SingleOutputStreamOperator transform = dataStream.transform(StreamingFileWriter.class.getSimpleName(), TypeInformation.of(PartitionCommitInfo.class), streamingFileWriter);
        transform.getTransformation().setParallelism(i, z);
        Optional generateUid = providerContext.generateUid("streaming-writer");
        Objects.requireNonNull(transform);
        generateUid.ifPresent(transform::uid);
        return transform;
    }

    public static <T> DataStream<PartitionCommitInfo> compactionWriter(ProviderContext providerContext, DataStream<T> dataStream, long j, StreamingFileSink.BucketsBuilder<T, String, ? extends StreamingFileSink.BucketsBuilder<T, String, ?>> bucketsBuilder, FileSystemFactory fileSystemFactory, Path path, CompactReader.Factory<T> factory, long j2, int i, boolean z) {
        return compactionWriter(providerContext, dataStream, new CompactFileWriter(j, bucketsBuilder), bucketsBuilder, fileSystemFactory, path, factory, j2, i, z);
    }

    public static <T> DataStream<PartitionCommitInfo> compactionWriter(ProviderContext providerContext, DataStream<T> dataStream, long j, StreamingFileSink.BucketsBuilder<T, String, ? extends StreamingFileSink.BucketsBuilder<T, String, ?>> bucketsBuilder, FileSystemFactory fileSystemFactory, Path path, CompactReader.Factory<T> factory, long j2, int i, boolean z, int i2) {
        return compactionWriter(providerContext, dataStream, new CompactFileWriterWithMaxNumRecords(j, bucketsBuilder, i2), bucketsBuilder, fileSystemFactory, path, factory, j2, i, z);
    }

    private static <T> DataStream<PartitionCommitInfo> compactionWriter(ProviderContext providerContext, DataStream<T> dataStream, CompactFileWriter<T> compactFileWriter, StreamingFileSink.BucketsBuilder<T, String, ? extends StreamingFileSink.BucketsBuilder<T, String, ?>> bucketsBuilder, FileSystemFactory fileSystemFactory, Path path, CompactReader.Factory<T> factory, long j, int i, boolean z) {
        SupplierWithException supplierWithException = (Serializable) () -> {
            return fileSystemFactory.create(path.toUri());
        };
        CompactCoordinator compactCoordinator = new CompactCoordinator(supplierWithException, j);
        SingleOutputStreamOperator transform = dataStream.transform("streaming-writer", TypeInformation.of(CompactMessages.CoordinatorInput.class), compactFileWriter);
        transform.getTransformation().setParallelism(i, z);
        Optional generateUid = providerContext.generateUid("streaming-writer");
        Objects.requireNonNull(transform);
        generateUid.ifPresent(transform::uid);
        SingleOutputStreamOperator maxParallelism = transform.transform(BatchSink.COORDINATOR_OP_NAME, TypeInformation.of(CompactMessages.CoordinatorOutput.class), compactCoordinator).setParallelism(1).setMaxParallelism(1);
        Optional generateUid2 = providerContext.generateUid(BatchSink.COORDINATOR_OP_NAME);
        Objects.requireNonNull(maxParallelism);
        generateUid2.ifPresent(maxParallelism::uid);
        Objects.requireNonNull(bucketsBuilder);
        SingleOutputStreamOperator transform2 = maxParallelism.broadcast().transform(BatchSink.COMPACT_OP_NAME, TypeInformation.of(PartitionCommitInfo.class), new CompactOperator(supplierWithException, factory, CompactBucketWriter.factory((Serializable) bucketsBuilder::createBucketWriter)));
        transform2.getTransformation().setParallelism(i, z);
        Optional generateUid3 = providerContext.generateUid(BatchSink.COMPACT_OP_NAME);
        Objects.requireNonNull(transform2);
        generateUid3.ifPresent(transform2::uid);
        return transform2;
    }

    public static DataStreamSink<?> sink(ProviderContext providerContext, DataStream<PartitionCommitInfo> dataStream, Path path, ObjectIdentifier objectIdentifier, List<String> list, TableMetaStoreFactory tableMetaStoreFactory, FileSystemFactory fileSystemFactory, Configuration configuration) {
        DataStream<PartitionCommitInfo> dataStream2 = dataStream;
        if (list.size() > 0 && configuration.contains(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_KIND)) {
            DataStream<PartitionCommitInfo> maxParallelism = dataStream.transform(PartitionCommitter.class.getSimpleName(), Types.VOID, new PartitionCommitter(path, objectIdentifier, list, tableMetaStoreFactory, fileSystemFactory, configuration)).setParallelism(1).setMaxParallelism(1);
            Optional generateUid = providerContext.generateUid("partition-committer");
            Objects.requireNonNull(maxParallelism);
            generateUid.ifPresent(maxParallelism::uid);
            dataStream2 = maxParallelism;
        }
        DataStreamSink<?> parallelism = dataStream2.sinkTo(new DiscardingSink()).name("end").setParallelism(1);
        Optional generateUid2 = providerContext.generateUid("discarding-sink");
        Objects.requireNonNull(parallelism);
        generateUid2.ifPresent(parallelism::uid);
        return parallelism;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1543003460:
                if (implMethodName.equals("lambda$compactionWriter$b91f92fa$1")) {
                    z = true;
                    break;
                }
                break;
            case 1968872985:
                if (implMethodName.equals("createBucketWriter")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case AsyncSinkWriterConfiguration.DEFAULT_FAIL_ON_TIMEOUT /* 0 */:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/util/function/SupplierWithException") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink$BucketsBuilder") && serializedLambda.getImplMethodSignature().equals("()Lorg/apache/flink/streaming/api/functions/sink/filesystem/BucketWriter;")) {
                    StreamingFileSink.BucketsBuilder bucketsBuilder = (StreamingFileSink.BucketsBuilder) serializedLambda.getCapturedArg(0);
                    return bucketsBuilder::createBucketWriter;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/util/function/SupplierWithException") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/connector/file/table/stream/StreamingSink") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/connector/file/table/FileSystemFactory;Lorg/apache/flink/core/fs/Path;)Lorg/apache/flink/core/fs/FileSystem;")) {
                    FileSystemFactory fileSystemFactory = (FileSystemFactory) serializedLambda.getCapturedArg(0);
                    Path path = (Path) serializedLambda.getCapturedArg(1);
                    return () -> {
                        return fileSystemFactory.create(path.toUri());
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
