package org.apache.hudi.operator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.HoodieFlinkStreamer;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/operator/InstantGenerateOperator.class */
/**
 * Stream operator that forwards every non-null {@link HoodieRecord} downstream while keeping a
 * buffer of the records seen since the last state snapshot.
 *
 * <p>Before each checkpoint barrier ({@link #prepareSnapshotPreBarrier(long)}) the operator:
 * <ol>
 *   <li>waits until the instant it created previously is no longer reported by the write client
 *       as inflight/requested (see {@link #doCheck()}), and</li>
 *   <li>starts a new commit instant through the write client, but only when records were
 *       buffered since the last snapshot.</li>
 * </ol>
 *
 * <p>The latest instant and the buffered records are stored in operator list state so they can
 * be read back in {@link #initializeState(StateInitializationContext)} after a restore.
 */
public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord> implements OneInputStreamOperator<HoodieRecord, HoodieRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(InstantGenerateOperator.class);
    public static final String NAME = "InstantGenerateOperator";

    /** Streamer configuration, read from the global job parameters in {@link #open()}. */
    private HoodieFlinkStreamer.Config cfg;
    /** Client used to start commits and to query inflight/requested instants. */
    private HoodieFlinkWriteClient writeClient;
    private SerializableConfiguration serializableHadoopConf;
    private transient FileSystem fs;
    private transient ListState<String> latestInstantState;
    private transient ListState<StreamRecord> recordsState;
    /** Maximum number of polls performed by {@link #doCheck()}. */
    private Integer retryTimes;
    /** Pause between two polls of {@link #doCheck()}, in seconds. */
    private Integer retryInterval;
    /** Instant created at the previous barrier; empty when there is none to wait for. */
    private String latestInstant = "";
    /** Reusable one-element holder used to write {@link #latestInstant} into list state. */
    private final List<String> latestInstantList = new ArrayList<>(1);
    /** Records received since the last call to {@link #snapshotState(StateSnapshotContext)}. */
    private final List<StreamRecord> bufferedRecords = new LinkedList<>();

    /**
     * Buffers the incoming record and emits it unchanged. Records whose value is {@code null}
     * are neither buffered nor forwarded.
     *
     * @param streamRecord the incoming record
     * @throws Exception declared by the operator contract
     */
    public void processElement(StreamRecord<HoodieRecord> streamRecord) throws Exception {
        if (streamRecord.getValue() != null) {
            this.bufferedRecords.add(streamRecord);
            this.output.collect(streamRecord);
        }
    }

    /**
     * Reads the streamer configuration from the global job parameters, creates the file system
     * handle and the write client, and creates the target table when its base path is missing.
     *
     * @throws Exception if the parent operator, the file system or the table setup fails
     */
    public void open() throws Exception {
        super.open();
        // The global job parameters are exposed through their base type, so the concrete
        // streamer configuration has to be recovered with an explicit downcast.
        this.cfg = (HoodieFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        this.retryTimes = Integer.valueOf(this.cfg.blockRetryTime);
        this.retryInterval = Integer.valueOf(this.cfg.blockRetryInterval);
        this.serializableHadoopConf = new SerializableConfiguration(StreamerUtil.getHadoopConf());
        this.fs = FSUtils.getFs(this.cfg.targetBasePath, this.serializableHadoopConf.get());
        this.writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null)), StreamerUtil.getHoodieClientConfig(this.cfg), true);
        initTable();
    }

    /**
     * Runs before the checkpoint barrier is emitted: waits for the previously created instant
     * (if any) to finish, then starts a new instant when records were buffered.
     *
     * @param checkpointId id of the checkpoint being prepared
     * @throws Exception if the parent hook fails or the previous instant does not finish within
     *                   the configured retries
     */
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        super.prepareSnapshotPreBarrier(checkpointId);
        if (!StringUtils.isNullOrEmpty(this.latestInstant)) {
            doCheck();
            // Reset so that a checkpoint without buffered records does not re-check this instant.
            this.latestInstant = "";
        }
        if (this.bufferedRecords.isEmpty()) {
            return;
        }
        this.latestInstant = startNewInstant(checkpointId);
    }

    /**
     * Registers the two operator list states and, when restoring, reloads the latest instant
     * and the buffered records from them.
     *
     * @param stateInitializationContext context giving access to the operator state store
     * @throws Exception if the state cannot be obtained or read
     */
    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        this.latestInstantState = stateInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor<>("latestInstant", String.class));
        this.recordsState = stateInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor<>("recordsState", StreamRecord.class));
        if (!stateInitializationContext.isRestored()) {
            return;
        }
        // When the state holds several entries the last one wins.
        for (String restoredInstant : this.latestInstantState.get()) {
            this.latestInstant = restoredInstant;
        }
        LOG.info("InstantGenerateOperator initializeState get latestInstant [{}]", this.latestInstant);
        this.bufferedRecords.clear();
        for (StreamRecord restoredRecord : this.recordsState.get()) {
            this.bufferedRecords.add(restoredRecord);
        }
    }

    /**
     * Writes the latest instant and the buffered records into operator state, then empties the
     * in-memory buffer.
     *
     * @param stateSnapshotContext snapshot context (not used by this implementation)
     * @throws Exception if updating the state fails
     */
    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        if (this.latestInstantList.isEmpty()) {
            this.latestInstantList.add(this.latestInstant);
        } else {
            this.latestInstantList.set(0, this.latestInstant);
        }
        this.latestInstantState.update(this.latestInstantList);
        LOG.info("Update latest instant [{}]", this.latestInstant);
        this.recordsState.update(this.bufferedRecords);
        LOG.info("Update records state size = [{}]", this.bufferedRecords.size());
        this.bufferedRecords.clear();
    }

    /**
     * Starts a new commit through the write client.
     *
     * @param checkpointId id of the checkpoint that triggered the commit (used for logging only)
     * @return the instant returned by the write client
     */
    private String startNewInstant(long checkpointId) {
        String newInstant = this.writeClient.startCommit();
        LOG.info("create instant [{}], at checkpoint [{}]", newInstant, checkpointId);
        return newInstant;
    }

    /**
     * Polls the write client until {@link #latestInstant} is no longer listed among the
     * inflight/requested instants, sleeping {@link #retryInterval} seconds between polls and
     * giving up after {@link #retryTimes} polls.
     *
     * @throws InterruptedException if the sleep is interrupted, or if the instant is still
     *                              pending after the last poll
     */
    private void doCheck() throws InterruptedException {
        String commitAction = this.cfg.tableType.equals(HoodieTableType.COPY_ON_WRITE.name()) ? HoodieTimeline.COMMIT_ACTION : HoodieTimeline.DELTA_COMMIT_ACTION;
        LOG.info("Query latest instant [{}]", this.latestInstant);
        List<String> pendingInstants = this.writeClient.getInflightsAndRequestedInstants(commitAction);
        for (int attempt = 1; attempt <= this.retryTimes; attempt++) {
            if (!pendingInstants.contains(this.latestInstant)) {
                LOG.warn("Latest transaction [{}] is completed! Completed transaction, try times [{}]", this.latestInstant, attempt);
                return;
            }
            // Each instant is followed by a comma, including the last one.
            StringBuilder pendingList = new StringBuilder();
            for (String pendingInstant : pendingInstants) {
                pendingList.append(pendingInstant).append(",");
            }
            LOG.warn("Latest transaction [{}] is not completed! unCompleted transaction:[{}],try times [{}]", this.latestInstant, pendingList.toString(), attempt);
            TimeUnit.SECONDS.sleep(this.retryInterval);
            pendingInstants = this.writeClient.getInflightsAndRequestedInstants(commitAction);
        }
        throw new InterruptedException(String.format("Last instant costs more than %s second, stop task now", this.retryTimes * this.retryInterval));
    }

    /**
     * Creates the table at the configured base path unless that path already exists.
     *
     * @throws IOException if the existence check or the table initialization fails
     */
    private void initTable() throws IOException {
        if (this.fs.exists(new Path(this.cfg.targetBasePath))) {
            LOG.info("Table already [{}/{}] exists, do nothing here", this.cfg.targetBasePath, this.cfg.targetTableName);
            return;
        }
        HoodieTableMetaClient.initTableType(new Configuration(this.serializableHadoopConf.get()), this.cfg.targetBasePath, HoodieTableType.valueOf(this.cfg.tableType), this.cfg.targetTableName, "archived", this.cfg.payloadClassName, Integer.valueOf(1));
        LOG.info("Table initialized");
    }

    /**
     * Closes the write client and the file system handle. The file system is closed even when
     * closing the write client throws.
     *
     * @throws Exception if closing either resource fails
     */
    public void close() throws Exception {
        try {
            if (this.writeClient != null) {
                this.writeClient.close();
            }
        } finally {
            // NOTE(review): if FSUtils.getFs hands out a cached/shared FileSystem, closing it
            // here may affect other users in the same JVM — confirm before relying on this.
            if (this.fs != null) {
                this.fs.close();
            }
        }
    }
}
