package org.apache.hudi.sink.hbase;

import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes;
import java.time.Duration;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.table.AsyncSinkConnectorOptions;
import org.apache.flink.connector.hbase.sink.RowDataToMutationConverter;
import org.apache.flink.connector.hbase.table.HBaseConnectorOptions;
import org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.connector.hbase2.sink.HBaseAsyncSink;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.hbase.converter.HBaseRowConverter;

/* loaded from: input_file:org/apache/hudi/sink/hbase/HBaseSyncOperatorFactory.class */
public class HBaseSyncOperatorFactory extends AbstractStreamOperatorFactory<RowData> implements OneInputStreamOperatorFactory<RowData, RowData>, YieldingOperatorFactory<RowData> {
    private final HBaseRowConverter hBaseRowConverter;
    private final HBaseAsyncSink<RowData> hBaseAsyncSink;

    public HBaseSyncOperatorFactory(Configuration configuration, DataType dataType) {
        this.hBaseRowConverter = HBaseRowConverter.bySyncRowKeyMode(configuration, dataType);
        DataType hbaseDataType = this.hBaseRowConverter.getHbaseDataType();
        Configuration hbaseTableConfiguration = HBaseRowConverter.getHbaseTableConfiguration(configuration);
        this.hBaseAsyncSink = createHbaseSink(new RowDataToMutationConverter(HBaseTableSchema.fromDataType(hbaseDataType), (String) hbaseTableConfiguration.get(HBaseConnectorOptions.NULL_STRING_LITERAL), ((Boolean) hbaseTableConfiguration.get(HBaseConnectorOptions.FILTER_DELETE_RECORDS)).booleanValue()), configuration, hbaseTableConfiguration);
    }

    public <T extends StreamOperator<RowData>> T createStreamOperator(StreamOperatorParameters<RowData> streamOperatorParameters) {
        try {
            HBaseSyncOperator hBaseSyncOperator = new HBaseSyncOperator(this.hBaseRowConverter, this.hBaseAsyncSink, this.processingTimeService, getMailboxExecutor());
            hBaseSyncOperator.setup(streamOperatorParameters.getContainingTask(), streamOperatorParameters.getStreamConfig(), streamOperatorParameters.getOutput());
            return hBaseSyncOperator;
        } catch (Exception e) {
            throw new IllegalStateException("Cannot create sink operator for " + streamOperatorParameters.getStreamConfig().getOperatorName(), e);
        }
    }

    public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
        return HBaseSyncOperator.class;
    }

    private static HBaseAsyncSink<RowData> createHbaseSink(RowDataToMutationConverter rowDataToMutationConverter, Configuration configuration, Configuration configuration2) {
        HBaseAsyncSink.Builder hTableName = HBaseAsyncSink.builder().setHadoopConf(HBaseConnectorOptionsUtil.getHBaseConfiguration(configuration2)).setMutationConverter(rowDataToMutationConverter).setHTableName((String) configuration.getOptional(FlinkOptions.HBASE_SYNC_TABLE_NAME).orElse(configuration.get(FlinkOptions.TABLE_NAME)));
        hTableName.setMaxBufferedRequests(((Integer) configuration2.getOptional(AsyncSinkConnectorOptions.MAX_BUFFERED_REQUESTS).orElse(Integer.valueOf(((((Double) configuration.get(FlinkOptions.WRITE_TASK_MAX_SIZE)).intValue() * Opcodes.ACC_ABSTRACT) * Opcodes.ACC_ABSTRACT) / 1048576))).intValue());
        Option ofNullable = Option.ofNullable(configuration2.get(AsyncSinkConnectorOptions.MAX_BATCH_SIZE));
        hTableName.getClass();
        ofNullable.ifPresent((v1) -> {
            r1.setMaxBatchSize(v1);
        });
        Option ofNullable2 = Option.ofNullable(configuration2.get(AsyncSinkConnectorOptions.FLUSH_BUFFER_SIZE));
        hTableName.getClass();
        ofNullable2.ifPresent((v1) -> {
            r1.setMaxBatchSizeInBytes(v1);
        });
        Option ofNullable3 = Option.ofNullable(configuration2.get(AsyncSinkConnectorOptions.MAX_IN_FLIGHT_REQUESTS));
        hTableName.getClass();
        ofNullable3.ifPresent((v1) -> {
            r1.setMaxInFlightRequests(v1);
        });
        Option ofNullable4 = Option.ofNullable(configuration2.get(AsyncSinkConnectorOptions.FLUSH_BUFFER_TIMEOUT));
        hTableName.getClass();
        ofNullable4.ifPresent((v1) -> {
            r1.setMaxTimeInBufferMS(v1);
        });
        Option ofNullable5 = Option.ofNullable(configuration2.get(AsyncSinkConnectorOptions.MAX_REQUEST_RETRY_COUNT));
        hTableName.getClass();
        ofNullable5.ifPresent(hTableName::setMaxRetriesPerRequest);
        Option ofNullable6 = Option.ofNullable(Long.valueOf(((Duration) configuration2.get(FlinkOptions.HBASE_SYNC_TTL)).toMillis()));
        hTableName.getClass();
        ofNullable6.ifPresent(hTableName::setWriteTtlMs);
        return hTableName.build();
    }
}
