package org.apache.flink.streaming.api.functions.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.CheckpointableInputFormat;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.operators.StreamSourceContexts;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.class */
public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<TimestampedFileInputSplit, OUT>, OutputTypeConfigurable<OUT> {
    private static final long serialVersionUID = 1;
    private static final Logger LOG;
    private FileInputFormat<OUT> format;
    private TypeSerializer<OUT> serializer;
    private transient Object checkpointLock;
    private transient ContinuousFileReaderOperator<OUT>.SplitReader<OUT> reader;
    private transient SourceFunction.SourceContext<OUT> readerContext;
    private transient ListState<TimestampedFileInputSplit> checkpointedState;
    private transient List<TimestampedFileInputSplit> restoredReaderState;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator$SplitReader.class */
    public class SplitReader<OT> extends Thread {
        private volatile boolean shouldClose;
        private volatile boolean isRunning;
        private final FileInputFormat<OT> format;
        private final TypeSerializer<OT> serializer;
        private final Object checkpointLock;
        private final SourceFunction.SourceContext<OT> readerContext;
        private final Queue<TimestampedFileInputSplit> pendingSplits;
        private TimestampedFileInputSplit currentSplit;
        private volatile boolean isSplitOpen;

        private SplitReader(FileInputFormat<OT> fileInputFormat, TypeSerializer<OT> typeSerializer, SourceFunction.SourceContext<OT> sourceContext, Object obj, List<TimestampedFileInputSplit> list) {
            this.format = (FileInputFormat) Preconditions.checkNotNull(fileInputFormat, "Unspecified FileInputFormat.");
            this.serializer = (TypeSerializer) Preconditions.checkNotNull(typeSerializer, "Unspecified Serializer.");
            this.readerContext = (SourceFunction.SourceContext) Preconditions.checkNotNull(sourceContext, "Unspecified Reader Context.");
            this.checkpointLock = Preconditions.checkNotNull(obj, "Unspecified checkpoint lock.");
            this.shouldClose = false;
            this.isRunning = true;
            this.pendingSplits = new PriorityQueue();
            if (list != null) {
                this.pendingSplits.addAll(list);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Adds a split to the pending queue while holding the checkpoint lock.
         *
         * @param timestampedFileInputSplit the split to enqueue; a null value is rejected by
         *     {@code Preconditions.checkNotNull}
         */
        public void addSplit(TimestampedFileInputSplit timestampedFileInputSplit) {
            final TimestampedFileInputSplit split = Preconditions.checkNotNull(timestampedFileInputSplit, "Cannot insert a null value in the pending splits queue.");
            synchronized (checkpointLock) {
                pendingSplits.add(split);
            }
        }

        /**
         * Reports the reader's running flag.
         *
         * @return {@code true} until the flag is cleared by {@link #cancel()} or by {@link #run()} itself
         */
        public boolean isRunning() {
            return isRunning;
        }

        /**
         * Reader loop: takes pending splits one at a time, reads each split's records into the
         * source context, and releases the input format when the thread terminates.
         *
         * <p>The checkpoint lock is held only while a split is selected and opened, while a single
         * record is read and emitted, and while a split is closed. Between those steps the lock is
         * released so that {@link #addSplit} (which synchronizes on the same lock) is not blocked
         * for the duration of a whole split.
         *
         * <p>Any failure is reported through the containing task's {@code handleAsyncException};
         * on every exit path the format is closed, the running flag is cleared and waiters on the
         * checkpoint lock are notified.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                Counter completedSplitsCounter = ContinuousFileReaderOperator.this.getMetricGroup().counter("numSplitsProcessed");
                this.format.openInputFormat();

                while (this.isRunning) {
                    synchronized (this.checkpointLock) {
                        if (this.currentSplit == null) {
                            this.currentSplit = this.pendingSplits.poll();
                            if (this.currentSplit == null) {
                                if (this.shouldClose) {
                                    // Nothing left to read and close() was requested: stop the loop.
                                    this.isRunning = false;
                                } else {
                                    // Wait briefly for new splits; the timeout also lets the loop
                                    // observe cancel()/close() without an explicit notification.
                                    this.checkpointLock.wait(50L);
                                }
                                // No split to work on: re-evaluate the loop condition instead of
                                // falling through and dereferencing a null split.
                                continue;
                            }
                        }

                        // A split carrying saved state is resumed from that state; otherwise it is opened fresh.
                        if ((this.format instanceof CheckpointableInputFormat) && this.currentSplit.getSplitState() != null) {
                            ((CheckpointableInputFormat) this.format).reopen(this.currentSplit, this.currentSplit.getSplitState());
                        } else {
                            this.format.open((FileInputSplit) this.currentSplit);
                        }
                        // The saved state has been consumed by the (re)open above.
                        this.currentSplit.resetSplitState();
                        this.isSplitOpen = true;
                    }

                    ContinuousFileReaderOperator.LOG.debug("Reading split: " + this.currentSplit);

                    try {
                        // NOTE(review): mo3688createInstance is a decompiler-renamed method, presumably
                        // TypeSerializer.createInstance() - confirm against the serializer class in this tree.
                        OT nextElement = this.serializer.mo3688createInstance();
                        while (!this.format.reachedEnd()) {
                            // Reading and emitting one record is atomic with respect to the checkpoint lock.
                            synchronized (this.checkpointLock) {
                                nextElement = this.format.nextRecord(nextElement);
                                if (nextElement == null) {
                                    break;
                                }
                                this.readerContext.collect(nextElement);
                            }
                        }
                        completedSplitsCounter.inc();
                    } finally {
                        // Runs on success and on failure, so a split never stays open.
                        synchronized (this.checkpointLock) {
                            this.format.close();
                            this.isSplitOpen = false;
                            this.currentSplit = null;
                        }
                    }
                }
            } catch (Throwable t) {
                ContinuousFileReaderOperator.this.getContainingTask().handleAsyncException("Caught exception when processing split: " + this.currentSplit, t);
            } finally {
                synchronized (this.checkpointLock) {
                    ContinuousFileReaderOperator.LOG.debug("Reader terminated, and exiting...");
                    try {
                        this.format.closeInputFormat();
                    } catch (IOException e) {
                        ContinuousFileReaderOperator.this.getContainingTask().handleAsyncException("Caught exception from " + this.format.getClass().getName() + ".closeInputFormat() : " + e.getMessage(), e);
                    }
                    this.isSplitOpen = false;
                    this.currentSplit = null;
                    this.isRunning = false;
                    // Wake up any thread waiting on the lock for this reader to finish.
                    this.checkpointLock.notifyAll();
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Builds a list describing the work this reader still has to do: the split currently being
         * processed (if any) followed by all pending splits.
         *
         * <p>If the format is a {@code CheckpointableInputFormat} and a split is open, the format's
         * current state is stored on the in-flight split before it is added to the list.
         *
         * <p>NOTE(review): this method does not synchronize itself; presumably the caller holds the
         * checkpoint lock - confirm.
         *
         * @return a new list with the in-flight split first, then the pending splits
         * @throws IOException declared by the signature; presumably raised when the format state
         *     cannot be obtained - confirm
         */
        public List<TimestampedFileInputSplit> getReaderState() throws IOException {
            final List<TimestampedFileInputSplit> snapshot = new ArrayList<>(pendingSplits.size());

            final TimestampedFileInputSplit inFlight = currentSplit;
            if (inFlight != null) {
                if ((format instanceof CheckpointableInputFormat) && isSplitOpen) {
                    inFlight.setSplitState(((CheckpointableInputFormat) format).getCurrentState());
                }
                snapshot.add(inFlight);
            }

            snapshot.addAll(pendingSplits);
            return snapshot;
        }

        /**
         * Clears the running flag so that the loop in {@link #run()} stops at its next condition check.
         * Pending splits are not consulted.
         */
        public void cancel() {
            isRunning = false;
        }

        /**
         * Requests a graceful stop: once the pending queue is found empty, {@link #run()} clears its
         * running flag instead of waiting for more splits.
         */
        public void close() {
            shouldClose = true;
        }
    }

    public ContinuousFileReaderOperator(FileInputFormat<OUT> fileInputFormat) {
        this.format = (FileInputFormat) Preconditions.checkNotNull(fileInputFormat);
    }

    /**
     * Derives and stores the serializer for the operator's output type.
     *
     * @param typeInformation type information of the produced records
     * @param executionConfig execution config passed to the serializer factory
     */
    @Override
    public void setOutputType(TypeInformation<OUT> typeInformation, ExecutionConfig executionConfig) {
        final TypeSerializer<OUT> outputSerializer = typeInformation.createSerializer(executionConfig);
        this.serializer = outputSerializer;
    }

    /**
     * Registers the "splits" operator list state and, when restoring, copies its content into
     * {@code restoredReaderState} so that {@link #open()} can hand it to the reader.
     *
     * @param stateInitializationContext context giving access to the operator state store
     * @throws Exception if the superclass initialization or the state access fails
     */
    @Override
    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);

        Preconditions.checkState(checkpointedState == null, "The reader state has already been initialized.");
        checkpointedState = stateInitializationContext.getOperatorStateStore().getSerializableListState("splits");

        final int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();

        if (!stateInitializationContext.isRestored()) {
            LOG.info("No state to restore for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIdx);
            return;
        }

        LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIdx);

        // Restored splits already present: leave them untouched.
        if (restoredReaderState != null) {
            return;
        }

        restoredReaderState = new ArrayList<>();
        for (TimestampedFileInputSplit split : checkpointedState.get()) {
            restoredReaderState.add(split);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} (taskIdx={}) restored {}.", getClass().getSimpleName(), subtaskIdx, restoredReaderState);
        }
    }

    /**
     * Configures the input format, creates the source context and starts the split reader thread.
     *
     * <p>Any splits collected in {@code restoredReaderState} are passed to the reader and the field
     * is then cleared.
     *
     * @throws Exception if the superclass fails to open, or if the reader already exists or the
     *     serializer has not been set (reported by {@code Preconditions.checkState})
     */
    @Override
    public void open() throws Exception {
        super.open();

        Preconditions.checkState(reader == null, "The reader is already initialized.");
        Preconditions.checkState(serializer != null, "The serializer has not been set. Probably the setOutputType() was not called. Please report it.");

        format.setRuntimeContext(getRuntimeContext());
        format.configure(new Configuration());

        checkpointLock = getContainingTask().getCheckpointLock();

        readerContext = StreamSourceContexts.getSourceContext(
                getOperatorConfig().getTimeCharacteristic(),
                getProcessingTimeService(),
                checkpointLock,
                getContainingTask().getStreamStatusMaintainer(),
                output,
                getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(),
                -1L);

        // Publish the reader in the field before starting it, then drop the restored splits.
        final SplitReader<OUT> splitReader = new SplitReader<>(format, serializer, readerContext, checkpointLock, restoredReaderState);
        reader = splitReader;
        restoredReaderState = null;
        splitReader.start();
    }

    /**
     * Forwards the split carried by the incoming record to the reader's pending queue.
     *
     * @param streamRecord record whose value is the split to read
     * @throws Exception declared by the operator interface
     */
    @Override
    public void processElement(StreamRecord<TimestampedFileInputSplit> streamRecord) throws Exception {
        final TimestampedFileInputSplit split = streamRecord.getValue();
        reader.addSplit(split);
    }

    /**
     * Handles an incoming watermark by doing nothing: the override has an empty body and does not
     * delegate to the superclass.
     *
     * <p>NOTE(review): presumably upstream watermarks are ignored because this operator emits its
     * own through the reader context (see {@code waitSplitReaderFinished}) - confirm.
     *
     * @param watermark the incoming watermark; unused
     * @throws Exception declared by the overridden method; never thrown here
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.OneInputStreamOperator
    public void processWatermark(Watermark watermark) throws Exception {
        // Intentionally empty.
    }

    /**
     * Stops the reader thread and releases the operator's references.
     *
     * <p>The reader is first cancelled cooperatively and given 200 ms to finish; if it is still
     * alive it is interrupted repeatedly (logging its stack trace each round) until it terminates.
     * When no reader was ever created (for example because {@link #open()} did not complete), the
     * shutdown steps are skipped and only the references are cleared.
     *
     * @throws Exception if the superclass disposal fails
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator, org.apache.flink.util.Disposable
    public void dispose() throws Exception {
        super.dispose();

        final SplitReader<OUT> activeReader = this.reader;
        if (activeReader != null) {
            // First try a cooperative shutdown and give the thread some time to finish.
            activeReader.cancel();
            try {
                activeReader.join(200L);
            } catch (InterruptedException ignored) {
                // Best effort: the liveness check below decides whether more work is needed.
            }

            // If that was not enough, interrupt the thread until it terminates.
            while (activeReader.isAlive()) {
                StringBuilder sb = new StringBuilder();
                for (StackTraceElement stackTraceElement : activeReader.getStackTrace()) {
                    sb.append(stackTraceElement).append('\n');
                }
                LOG.warn("The reader is stuck in method:\n {}", sb.toString());

                activeReader.interrupt();
                try {
                    activeReader.join(50L);
                } catch (InterruptedException ignored) {
                    // Deliberately not re-interrupting here: a set interrupt flag would make the
                    // next join() fail immediately and turn this loop into a busy spin.
                }
            }
        }

        this.reader = null;
        this.readerContext = null;
        this.restoredReaderState = null;
        this.format = null;
        this.serializer = null;
    }

    /**
     * Closes the operator: closes the superclass, waits for the split reader to finish via
     * {@code waitSplitReaderFinished()}, then closes the output.
     *
     * @throws Exception if any of the three steps fails
     */
    @Override
    public void close() throws Exception {
        super.close();

        // The reader must be done before the output is closed.
        waitSplitReaderFinished();

        output.close();
    }

    /**
     * Asks the reader to finish once its queue is empty and blocks on the checkpoint lock until the
     * reader is no longer alive or running; afterwards emits {@code Watermark.MAX_WATERMARK}
     * through the reader context and closes that context.
     *
     * <p>When assertions are enabled, the calling thread must hold the checkpoint lock.
     *
     * @throws InterruptedException if the thread is interrupted while waiting on the lock
     */
    private void waitSplitReaderFinished() throws InterruptedException {
        // Explicit form of an assertion; the flag is the class's own $assertionsDisabled field.
        if (!($assertionsDisabled || Thread.holdsLock(checkpointLock))) {
            throw new AssertionError();
        }

        // The reader notifies on the checkpoint lock when it terminates, which wakes this loop.
        while (reader != null && reader.isAlive() && reader.isRunning()) {
            reader.close();
            checkpointLock.wait();
        }

        if (readerContext == null) {
            return;
        }
        readerContext.emitWatermark(Watermark.MAX_WATERMARK);
        readerContext.close();
        readerContext = null;
    }

    /**
     * Replaces the content of the "splits" operator state with the reader's current state (the
     * in-flight split, if any, followed by the pending splits).
     *
     * <p>If adding a split to the state fails, the state is cleared again and the failure is
     * rethrown wrapped in an {@link Exception} naming this operator.
     *
     * @param stateSnapshotContext context of the snapshot being taken
     * @throws Exception if the superclass snapshot, the reader state collection, or the state update fails
     */
    @Override
    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);

        Preconditions.checkState(checkpointedState != null, "The operator state has not been properly initialized.");

        final int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();

        checkpointedState.clear();
        final List<TimestampedFileInputSplit> splits = reader.getReaderState();

        try {
            for (TimestampedFileInputSplit split : splits) {
                checkpointedState.add(split);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("{} (taskIdx={}) checkpointed {} splits: {}.", getClass().getSimpleName(), subtaskIdx, splits.size(), splits);
            }
        } catch (Exception e) {
            // Do not leave a partially written list behind.
            checkpointedState.clear();
            throw new Exception("Could not add timestamped file input splits to to operator state backend of operator " + getOperatorName() + '.', e);
        }
    }

    static {
        // Explicitly declared counterpart of the compiler's assertion flag: true when assertions
        // are disabled for this class.
        $assertionsDisabled = !ContinuousFileReaderOperator.class.desiredAssertionStatus();
        // Class-wide logger.
        LOG = LoggerFactory.getLogger(ContinuousFileReaderOperator.class);
    }
}
