package org.apache.flink.streaming.runtime.operators;

import java.util.Set;
import org.apache.flink.api.common.eventtime.NoWatermarksGenerator;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.runtime.jobgraph.OperationKindTag;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.class */
public class TimestampsAndWatermarksOperator<T> extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {
    private static final long serialVersionUID = 1;
    private final WatermarkStrategy<T> watermarkStrategy;
    private transient TimestampAssigner<T> timestampAssigner;
    private transient WatermarkGenerator<T> watermarkGenerator;
    private transient WatermarkOutput wmOutput;
    private transient long watermarkInterval;
    private final boolean emitProgressiveWatermarks;

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator$WatermarkEmitter.class */
    public static final class WatermarkEmitter implements WatermarkOutput {
        private final Output<?> output;
        private final StreamStatusMaintainer statusMaintainer;
        private long currentWatermark = Long.MIN_VALUE;
        private boolean idle;

        public WatermarkEmitter(Output<?> output, StreamStatusMaintainer streamStatusMaintainer) {
            this.output = output;
            this.statusMaintainer = streamStatusMaintainer;
        }

        public void emitWatermark(Watermark watermark) {
            long timestamp = watermark.getTimestamp();
            if (timestamp <= this.currentWatermark) {
                return;
            }
            this.currentWatermark = timestamp;
            if (this.idle) {
                this.idle = false;
                this.statusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
            }
            this.output.emitWatermark(new org.apache.flink.streaming.api.watermark.Watermark(timestamp));
        }

        public void markIdle() {
            this.idle = true;
            this.statusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
        }
    }

    public TimestampsAndWatermarksOperator(WatermarkStrategy<T> watermarkStrategy, boolean z) {
        this.watermarkStrategy = (WatermarkStrategy) Preconditions.checkNotNull(watermarkStrategy);
        this.emitProgressiveWatermarks = z;
        this.chainingStrategy = ChainingStrategy.DEFAULT_CHAINING_STRATEGY;
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void open() throws Exception {
        super.open();
        this.timestampAssigner = this.watermarkStrategy.createTimestampAssigner(this::getMetricGroup);
        this.watermarkGenerator = this.emitProgressiveWatermarks ? this.watermarkStrategy.createWatermarkGenerator(this::getMetricGroup) : new NoWatermarksGenerator<>();
        this.wmOutput = new WatermarkEmitter(this.output, getContainingTask().getStreamStatusMaintainer());
        this.watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
        if (this.watermarkInterval <= 0 || !this.emitProgressiveWatermarks) {
            return;
        }
        getProcessingTimeService().registerTimer(getProcessingTimeService().getCurrentProcessingTime() + this.watermarkInterval, this);
    }

    @Override // org.apache.flink.streaming.api.operators.Input
    public void processElement(StreamRecord<T> streamRecord) throws Exception {
        T value = streamRecord.getValue();
        long extractTimestamp = this.timestampAssigner.extractTimestamp(value, streamRecord.hasTimestamp() ? streamRecord.getTimestamp() : Long.MIN_VALUE);
        streamRecord.setTimestamp(extractTimestamp);
        this.output.collect(streamRecord);
        this.watermarkGenerator.onEvent(value, extractTimestamp, this.wmOutput);
    }

    @Override // org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback
    public void onProcessingTime(long j) throws Exception {
        this.watermarkGenerator.onPeriodicEmit(this.wmOutput);
        getProcessingTimeService().registerTimer(getProcessingTimeService().getCurrentProcessingTime() + this.watermarkInterval, this);
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.Input
    public void processWatermark(org.apache.flink.streaming.api.watermark.Watermark watermark) throws Exception {
        if (watermark.getTimestamp() == Long.MAX_VALUE) {
            this.wmOutput.emitWatermark(Watermark.MAX_WATERMARK);
        }
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void close() throws Exception {
        super.close();
        this.watermarkGenerator.onPeriodicEmit(this.wmOutput);
    }

    @Override // org.apache.flink.streaming.api.operators.StreamOperator
    public Set<OperationKindTag> getOperationKindTags() {
        return OperationKindTag.asSet(new OperationKindTag[]{OperationKindTag.TIMESTAMP_WATERMARKS, OperationKindTag.UDF, OperationKindTag.ONE_INPUT});
    }
}
