package org.apache.flink.streaming.api.operators.source;

import java.time.Duration;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.class */
public class ProgressiveTimestampsAndWatermarks<T> implements TimestampsAndWatermarks<T> {
    private final TimestampAssigner<T> timestampAssigner;
    private final WatermarkGeneratorSupplier<T> watermarksFactory;
    private final WatermarkGeneratorSupplier.Context watermarksContext;
    private final ProcessingTimeService timeService;
    private final long periodicWatermarkInterval;

    @Nullable
    private SplitLocalOutputs<T> currentPerSplitOutputs;

    @Nullable
    private StreamingReaderOutput<T> currentMainOutput;

    @Nullable
    private ScheduledFuture<?> periodicEmitHandle;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks$IdlenessManager.class */
    public static class IdlenessManager {
        private final WatermarkOutput underlyingOutput;
        private final IdlenessAwareWatermarkOutput splitLocalOutput;
        private final IdlenessAwareWatermarkOutput mainOutput;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks$IdlenessManager$IdlenessAwareWatermarkOutput.class */
        public class IdlenessAwareWatermarkOutput implements WatermarkOutput {
            private final WatermarkOutput underlyingOutput;
            private boolean isIdle;

            private IdlenessAwareWatermarkOutput(WatermarkOutput watermarkOutput) {
                this.isIdle = true;
                this.underlyingOutput = watermarkOutput;
            }

            public void emitWatermark(Watermark watermark) {
                this.underlyingOutput.emitWatermark(watermark);
                this.isIdle = false;
            }

            public void markIdle() {
                this.isIdle = true;
                IdlenessManager.this.maybeMarkUnderlyingOutputAsIdle();
            }

            public void markActive() {
                this.isIdle = false;
                this.underlyingOutput.markActive();
            }
        }

        IdlenessManager(WatermarkOutput watermarkOutput) {
            this.underlyingOutput = watermarkOutput;
            this.splitLocalOutput = new IdlenessAwareWatermarkOutput(watermarkOutput);
            this.mainOutput = new IdlenessAwareWatermarkOutput(watermarkOutput);
        }

        IdlenessAwareWatermarkOutput getSplitLocalOutput() {
            return this.splitLocalOutput;
        }

        IdlenessAwareWatermarkOutput getMainOutput() {
            return this.mainOutput;
        }

        void maybeMarkUnderlyingOutputAsIdle() {
            if (this.splitLocalOutput.isIdle && this.mainOutput.isIdle) {
                this.underlyingOutput.markIdle();
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks$SplitLocalOutputs.class */
    private static final class SplitLocalOutputs<T> {
        private final WatermarkOutputMultiplexer watermarkMultiplexer;
        private final Map<String, SourceOutputWithWatermarks<T>> localOutputs;
        private final PushingAsyncDataInput.DataOutput<T> recordOutput;
        private final TimestampAssigner<T> timestampAssigner;
        private final WatermarkGeneratorSupplier<T> watermarksFactory;
        private final WatermarkGeneratorSupplier.Context watermarkContext;
        private final TimestampsAndWatermarks.WatermarkUpdateListener watermarkUpdateListener;

        private SplitLocalOutputs(PushingAsyncDataInput.DataOutput<T> dataOutput, WatermarkOutput watermarkOutput, TimestampsAndWatermarks.WatermarkUpdateListener watermarkUpdateListener, TimestampAssigner<T> timestampAssigner, WatermarkGeneratorSupplier<T> watermarkGeneratorSupplier, WatermarkGeneratorSupplier.Context context) {
            this.recordOutput = dataOutput;
            this.timestampAssigner = timestampAssigner;
            this.watermarksFactory = watermarkGeneratorSupplier;
            this.watermarkContext = context;
            this.watermarkUpdateListener = watermarkUpdateListener;
            this.watermarkMultiplexer = new WatermarkOutputMultiplexer(watermarkOutput);
            this.localOutputs = new LinkedHashMap();
        }

        SourceOutput<T> createOutputForSplit(String str) {
            SourceOutputWithWatermarks<T> sourceOutputWithWatermarks = this.localOutputs.get(str);
            if (sourceOutputWithWatermarks != null) {
                return sourceOutputWithWatermarks;
            }
            this.watermarkMultiplexer.registerNewOutput(str, j -> {
                this.watermarkUpdateListener.updateCurrentSplitWatermark(str, j);
            });
            SourceOutputWithWatermarks<T> createWithSeparateOutputs = SourceOutputWithWatermarks.createWithSeparateOutputs(this.recordOutput, this.watermarkMultiplexer.getImmediateOutput(str), this.watermarkMultiplexer.getDeferredOutput(str), this.timestampAssigner, this.watermarksFactory.createWatermarkGenerator(this.watermarkContext));
            this.localOutputs.put(str, createWithSeparateOutputs);
            return createWithSeparateOutputs;
        }

        void releaseOutputForSplit(String str) {
            this.localOutputs.remove(str);
            this.watermarkMultiplexer.unregisterOutput(str);
        }

        void emitPeriodicWatermark() {
            Iterator<SourceOutputWithWatermarks<T>> it = this.localOutputs.values().iterator();
            while (it.hasNext()) {
                it.next().emitPeriodicWatermark();
            }
            this.watermarkMultiplexer.onPeriodicEmit();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks$StreamingReaderOutput.class */
    private static final class StreamingReaderOutput<T> extends SourceOutputWithWatermarks<T> implements ReaderOutput<T> {
        private final SplitLocalOutputs<T> splitLocalOutputs;

        StreamingReaderOutput(PushingAsyncDataInput.DataOutput<T> dataOutput, WatermarkOutput watermarkOutput, TimestampAssigner<T> timestampAssigner, WatermarkGenerator<T> watermarkGenerator, SplitLocalOutputs<T> splitLocalOutputs) {
            super(dataOutput, watermarkOutput, watermarkOutput, timestampAssigner, watermarkGenerator);
            this.splitLocalOutputs = splitLocalOutputs;
        }

        public SourceOutput<T> createOutputForSplit(String str) {
            return this.splitLocalOutputs.createOutputForSplit(str);
        }

        public void releaseOutputForSplit(String str) {
            this.splitLocalOutputs.releaseOutputForSplit(str);
        }
    }

    public ProgressiveTimestampsAndWatermarks(TimestampAssigner<T> timestampAssigner, WatermarkGeneratorSupplier<T> watermarkGeneratorSupplier, WatermarkGeneratorSupplier.Context context, ProcessingTimeService processingTimeService, Duration duration) {
        long j;
        this.timestampAssigner = timestampAssigner;
        this.watermarksFactory = watermarkGeneratorSupplier;
        this.watermarksContext = context;
        this.timeService = processingTimeService;
        try {
            j = duration.toMillis();
        } catch (ArithmeticException e) {
            j = Long.MAX_VALUE;
        }
        this.periodicWatermarkInterval = j;
    }

    @Override // org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks
    public ReaderOutput<T> createMainOutput(PushingAsyncDataInput.DataOutput<T> dataOutput, TimestampsAndWatermarks.WatermarkUpdateListener watermarkUpdateListener) {
        Preconditions.checkState(this.currentMainOutput == null && this.currentPerSplitOutputs == null, "already created a main output");
        IdlenessManager idlenessManager = new IdlenessManager(new WatermarkToDataOutput(dataOutput, watermarkUpdateListener));
        WatermarkGenerator createWatermarkGenerator = this.watermarksFactory.createWatermarkGenerator(this.watermarksContext);
        this.currentPerSplitOutputs = new SplitLocalOutputs<>(dataOutput, idlenessManager.getSplitLocalOutput(), watermarkUpdateListener, this.timestampAssigner, this.watermarksFactory, this.watermarksContext);
        this.currentMainOutput = new StreamingReaderOutput<>(dataOutput, idlenessManager.getMainOutput(), this.timestampAssigner, createWatermarkGenerator, this.currentPerSplitOutputs);
        return this.currentMainOutput;
    }

    @Override // org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks
    public void startPeriodicWatermarkEmits() {
        Preconditions.checkState(this.periodicEmitHandle == null, "periodic emitter already started");
        if (this.periodicWatermarkInterval == 0) {
            return;
        }
        this.periodicEmitHandle = this.timeService.scheduleWithFixedDelay(this::triggerPeriodicEmit, this.periodicWatermarkInterval, this.periodicWatermarkInterval);
    }

    @Override // org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks
    public void stopPeriodicWatermarkEmits() {
        if (this.periodicEmitHandle != null) {
            this.periodicEmitHandle.cancel(false);
            this.periodicEmitHandle = null;
        }
    }

    @Override // org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks
    public void triggerPeriodicEmit(long j) {
        if (this.currentPerSplitOutputs != null) {
            this.currentPerSplitOutputs.emitPeriodicWatermark();
        }
        if (this.currentMainOutput != null) {
            this.currentMainOutput.emitPeriodicWatermark();
        }
    }
}
