package org.apache.hudi.source;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.common.prune.PartitionPruner;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.metrics.FlinkStreamReadMetrics;
import org.apache.hudi.source.IncrementalInputSplits;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/source/StreamReadMonitoringFunction.class */
public class StreamReadMonitoringFunction extends RichSourceFunction<MergeOnReadInputSplit> implements CheckpointedFunction {
    private static final Logger LOG = LoggerFactory.getLogger(StreamReadMonitoringFunction.class);
    private static final long serialVersionUID = 1;
    protected final Path path;
    protected final long interval;
    protected final boolean cdcEnabled;
    protected transient Object checkpointLock;
    protected volatile boolean isRunning = true;
    protected String issuedInstant;
    protected String issuedOffset;
    protected transient ListState<String> instantState;
    protected final Configuration conf;
    private HoodieTableMetaClient metaClient;
    protected final IncrementalInputSplits incrementalInputSplits;
    protected transient FlinkStreamReadMetrics readMetrics;

    public StreamReadMonitoringFunction(Configuration configuration, Path path, RowType rowType, long j, @Nullable PartitionPruner partitionPruner) {
        this.conf = configuration;
        this.path = path;
        this.interval = configuration.getInteger(FlinkOptions.READ_STREAMING_CHECK_INTERVAL);
        this.cdcEnabled = configuration.getBoolean(FlinkOptions.CDC_ENABLED);
        this.incrementalInputSplits = IncrementalInputSplits.builder().conf(configuration).path(path).rowType(rowType).maxCompactionMemoryInBytes(j).partitionPruner(partitionPruner).skipCompaction(configuration.getBoolean(FlinkOptions.READ_STREAMING_SKIP_COMPACT)).skipClustering(configuration.getBoolean(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING)).skipReplace(configuration.getBoolean(FlinkOptions.READ_STREAMING_SKIP_REPLACE)).build();
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        ValidationUtils.checkState(this.instantState == null, "The " + getClass().getSimpleName() + " has already been initialized.");
        registerMetrics();
        this.instantState = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("file-monitoring-state", StringSerializer.INSTANCE));
        if (functionInitializationContext.isRestored()) {
            LOG.info("Restoring state for the class {} with table {} and base path {}.", new Object[]{getClass().getSimpleName(), this.conf.getString(FlinkOptions.TABLE_NAME), this.path});
            ArrayList arrayList = new ArrayList();
            Iterator it = ((Iterable) this.instantState.get()).iterator();
            while (it.hasNext()) {
                arrayList.add((String) it.next());
            }
            ValidationUtils.checkArgument(arrayList.size() <= 2, getClass().getSimpleName() + " retrieved invalid state.");
            if (arrayList.size() == 1 && this.issuedInstant != null) {
                throw new IllegalArgumentException("The " + getClass().getSimpleName() + " has already restored from a previous Flink version.");
            }
            if (arrayList.size() == 1) {
                this.issuedInstant = (String) arrayList.get(0);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} retrieved an issued instant of time {} for table {} with path {}.", new Object[]{getClass().getSimpleName(), this.issuedInstant, this.conf.get(FlinkOptions.TABLE_NAME), this.path});
                    return;
                }
                return;
            }
            if (arrayList.size() == 2) {
                this.issuedInstant = (String) arrayList.get(0);
                this.issuedOffset = (String) arrayList.get(1);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} retrieved an issued instant of time [{}, {}] for table {} with path {}.", new Object[]{getClass().getSimpleName(), this.issuedInstant, this.issuedOffset, this.conf.get(FlinkOptions.TABLE_NAME), this.path});
                }
            }
        }
    }

    public void run(SourceFunction.SourceContext<MergeOnReadInputSplit> sourceContext) throws Exception {
        this.checkpointLock = sourceContext.getCheckpointLock();
        while (this.isRunning) {
            synchronized (this.checkpointLock) {
                monitorDirAndForwardSplits(sourceContext);
            }
            TimeUnit.SECONDS.sleep(this.interval);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Nullable
    public HoodieTableMetaClient getOrCreateMetaClient() {
        if (this.metaClient != null) {
            return this.metaClient;
        }
        org.apache.hadoop.conf.Configuration hadoopConf = HadoopConfigurations.getHadoopConf(this.conf);
        if (!StreamerUtil.tableExists(this.path.toString(), hadoopConf)) {
            return null;
        }
        this.metaClient = StreamerUtil.createMetaClient(this.path.toString(), hadoopConf);
        return this.metaClient;
    }

    @VisibleForTesting
    public void monitorDirAndForwardSplits(SourceFunction.SourceContext<MergeOnReadInputSplit> sourceContext) {
        HoodieTableMetaClient orCreateMetaClient = getOrCreateMetaClient();
        if (orCreateMetaClient == null) {
            return;
        }
        IncrementalInputSplits.Result inputSplits = this.incrementalInputSplits.inputSplits(orCreateMetaClient, this.issuedInstant, this.issuedOffset, this.cdcEnabled);
        if (inputSplits.isEmpty()) {
            return;
        }
        Iterator<MergeOnReadInputSplit> it = inputSplits.getInputSplits().iterator();
        while (it.hasNext()) {
            sourceContext.collect(it.next());
        }
        this.issuedInstant = inputSplits.getEndInstant();
        this.issuedOffset = inputSplits.getOffset();
        LOG.info("\n------------------------------------------------------------\n----------Table {} is consumed to instant: {}\n------------------------------------------------------------", orCreateMetaClient.getTableConfig().getTableName(), this.issuedInstant);
    }

    public void close() throws Exception {
        super.close();
        if (this.checkpointLock != null) {
            synchronized (this.checkpointLock) {
                this.issuedInstant = null;
                this.isRunning = false;
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Closed File Monitoring Source for path: " + this.path + ".");
        }
    }

    public void cancel() {
        if (this.checkpointLock == null) {
            this.isRunning = false;
            return;
        }
        synchronized (this.checkpointLock) {
            this.isRunning = false;
        }
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        this.instantState.clear();
        if (this.issuedInstant != null) {
            this.instantState.add(this.issuedInstant);
            this.readMetrics.setIssuedInstant(this.issuedInstant);
        }
        if (this.issuedOffset != null) {
            this.instantState.add(this.issuedOffset);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void registerMetrics() {
        this.readMetrics = new FlinkStreamReadMetrics(getRuntimeContext().getMetricGroup());
        this.readMetrics.registerMetrics();
    }
}
