package org.apache.hudi.operator;

import java.util.LinkedList;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.hudi.HoodieFlinkStreamer;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.exception.HoodieFlinkStreamerException;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/operator/KeyedWriteProcessFunction.class */
/**
 * Keyed process function that buffers incoming {@link HoodieRecord}s in memory and writes them
 * out through a {@link HoodieFlinkWriteClient} each time a checkpoint snapshot is taken.
 *
 * <p>For every snapshot in which records were written, one
 * {@code Tuple3<instant, write statuses, subtask index>} is emitted downstream.
 *
 * <p>NOTE(review): the buffer lives only in this object's memory; {@link #initializeState} is a
 * no-op, so buffered records are not restored from Flink state.
 */
public class KeyedWriteProcessFunction extends KeyedProcessFunction<String, HoodieRecord, Tuple3<String, List<WriteStatus>, Integer>> implements CheckpointedFunction {
    private static final Logger LOG = LoggerFactory.getLogger(KeyedWriteProcessFunction.class);

    /** Records received since the last successful write. */
    private List<HoodieRecord> bufferedRecords = new LinkedList<>();

    /** Collector captured from the first {@code processElement} call; {@code null} until then. */
    private Collector<Tuple3<String, List<WriteStatus>, Integer>> output;

    /** Index of this parallel subtask, read from the runtime context in {@link #open}. */
    private int indexOfThisSubtask;

    /** First inflight/requested instant seen at the last snapshot, or {@code null} if none. */
    private String latestInstant;

    /** Whether the buffer held any record at the last snapshot. */
    private boolean hasRecordsIn;

    /** Streamer configuration taken from the job's global parameters. */
    private HoodieFlinkStreamer.Config cfg;

    // Kept as a raw type, as in the original declaration; the payload type parameter is not
    // known from this file.
    private transient HoodieFlinkWriteClient writeClient;

    /**
     * Reads the subtask index and streamer config from the runtime context and creates the
     * write client.
     *
     * @param configuration Flink configuration passed through to {@code super.open}
     * @throws Exception if the parent initialization fails
     */
    @Override
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
        // getGlobalJobParameters() is typed as ExecutionConfig.GlobalJobParameters, so an
        // explicit downcast to the streamer config is required.
        this.cfg = (HoodieFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();

        SerializableConfiguration hadoopConf = new SerializableConfiguration(new org.apache.hadoop.conf.Configuration());
        HoodieFlinkEngineContext engineContext = new HoodieFlinkEngineContext(hadoopConf, new FlinkTaskContextSupplier(getRuntimeContext()));
        this.writeClient = new HoodieFlinkWriteClient(engineContext, StreamerUtil.getHoodieClientConfig(this.cfg));
    }

    /**
     * Refreshes {@link #latestInstant} and, if records are buffered and both a collector and an
     * instant are available, writes the buffer, emits the result downstream and clears the buffer.
     *
     * <p>When records are buffered but no collector or instant is available, the buffer is left
     * untouched so the records remain for a later snapshot.
     *
     * @param functionSnapshotContext snapshot context; only its checkpoint id is used (for logging)
     * @throws HoodieFlinkStreamerException if the configured operation is neither INSERT nor UPSERT
     */
    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
        String commitAction = this.cfg.tableType.equals(HoodieTableType.COPY_ON_WRITE.name())
                ? HoodieTimeline.COMMIT_ACTION
                : HoodieTimeline.DELTA_COMMIT_ACTION;
        List<String> pendingInstants = this.writeClient.getInflightsAndRequestedInstants(commitAction);
        this.latestInstant = pendingInstants.isEmpty() ? null : pendingInstants.get(0);

        if (this.bufferedRecords.isEmpty()) {
            LOG.info("No data in subtask [{}]", this.indexOfThisSubtask);
            this.hasRecordsIn = false;
            return;
        }
        this.hasRecordsIn = true;

        if (this.output == null || this.latestInstant == null) {
            // Nothing to emit to, or no instant to write against: keep the records buffered.
            return;
        }

        String instant = this.latestInstant;
        LOG.info("Write records, subtask id = [{}]  checkpoint_id = [{}] instant = [{}], record size = [{}]",
                this.indexOfThisSubtask, functionSnapshotContext.getCheckpointId(), instant, this.bufferedRecords.size());

        List<WriteStatus> writeStatuses = writeBufferedRecords(instant);
        this.output.collect(new Tuple3<>(instant, writeStatuses, this.indexOfThisSubtask));
        this.bufferedRecords.clear();
    }

    /**
     * Writes the buffered records with the configured operation against the given instant.
     *
     * @param instant instant time passed to the write client
     * @return the write statuses returned by the write client
     * @throws HoodieFlinkStreamerException if the configured operation is neither INSERT nor UPSERT
     */
    // The write client field is a raw type, so its calls and return value are unchecked.
    @SuppressWarnings("unchecked")
    private List<WriteStatus> writeBufferedRecords(String instant) {
        switch (this.cfg.operation) {
            case INSERT:
                return this.writeClient.insert(this.bufferedRecords, instant);
            case UPSERT:
                return this.writeClient.upsert(this.bufferedRecords, instant);
            default:
                throw new HoodieFlinkStreamerException("Unknown operation : " + this.cfg.operation);
        }
    }

    /**
     * No-op: this function does not register or restore any Flink state.
     *
     * @param functionInitializationContext unused
     */
    @Override
    public void initializeState(FunctionInitializationContext functionInitializationContext) {
    }

    /**
     * Appends the record to the in-memory buffer; nothing is emitted here.
     *
     * @param hoodieRecord record to buffer
     * @param context      unused
     * @param collector    remembered on the first call so that {@link #snapshotState} can emit
     */
    @Override
    public void processElement(HoodieRecord hoodieRecord, KeyedProcessFunction<String, HoodieRecord, Tuple3<String, List<WriteStatus>, Integer>>.Context context, Collector<Tuple3<String, List<WriteStatus>, Integer>> collector) {
        if (this.output == null) {
            this.output = collector;
        }
        this.bufferedRecords.add(hoodieRecord);
    }

    /**
     * @return {@code true} if the buffer was non-empty at the most recent snapshot
     */
    public boolean hasRecordsIn() {
        return this.hasRecordsIn;
    }

    /**
     * @return the instant resolved at the most recent snapshot, or {@code null} if there was none
     */
    public String getLatestInstant() {
        return this.latestInstant;
    }

    /** Closes the write client if one was created. */
    @Override
    public void close() {
        if (this.writeClient != null) {
            this.writeClient.close();
        }
    }

    // The erased processElement(Object, Context, Collector) bridge is generated by the compiler
    // and must not be declared by hand.
}
