package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.AbstractDataOutput;
import org.apache.flink.streaming.runtime.io.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.io.InputGateUtil;
import org.apache.flink.streaming.runtime.io.InputProcessorUtil;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.io.StreamOneInputProcessor;
import org.apache.flink.streaming.runtime.io.StreamTaskInput;
import org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.class */
public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> {
    private final WatermarkGauge inputWatermarkGauge;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OneInputStreamTask$StreamTaskNetworkOutput.class */
    public static class StreamTaskNetworkOutput<IN> extends AbstractDataOutput<IN> {
        private final OneInputStreamOperator<IN, ?> operator;
        private final WatermarkGauge watermarkGauge;
        private final Counter numRecordsIn;

        private StreamTaskNetworkOutput(OneInputStreamOperator<IN, ?> oneInputStreamOperator, StreamStatusMaintainer streamStatusMaintainer, Object obj, WatermarkGauge watermarkGauge, Counter counter) {
            super(streamStatusMaintainer, obj);
            this.operator = (OneInputStreamOperator) Preconditions.checkNotNull(oneInputStreamOperator);
            this.watermarkGauge = (WatermarkGauge) Preconditions.checkNotNull(watermarkGauge);
            this.numRecordsIn = (Counter) Preconditions.checkNotNull(counter);
        }

        @Override // org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitRecord(StreamRecord<IN> streamRecord) throws Exception {
            synchronized (this.lock) {
                this.numRecordsIn.inc();
                this.operator.setKeyContextElement1(streamRecord);
                this.operator.processElement(streamRecord);
            }
        }

        @Override // org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitWatermark(Watermark watermark) throws Exception {
            synchronized (this.lock) {
                this.watermarkGauge.setCurrentWatermark(watermark.getTimestamp());
                this.operator.processWatermark(watermark);
            }
        }

        @Override // org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
            synchronized (this.lock) {
                this.operator.processLatencyMarker(latencyMarker);
            }
        }
    }

    public OneInputStreamTask(Environment environment) {
        super(environment);
        this.inputWatermarkGauge = new WatermarkGauge();
    }

    @VisibleForTesting
    public OneInputStreamTask(Environment environment, @Nullable TimerService timerService) {
        super(environment, timerService);
        this.inputWatermarkGauge = new WatermarkGauge();
    }

    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    public void init() throws Exception {
        if (getConfiguration().getNumberOfInputs() > 0) {
            CheckpointedInputGate createCheckpointedInputGate = createCheckpointedInputGate();
            TaskIOMetricGroup iOMetricGroup = getEnvironment().getMetricGroup().getIOMetricGroup();
            createCheckpointedInputGate.getClass();
            iOMetricGroup.gauge("checkpointAlignmentTime", createCheckpointedInputGate::getAlignmentDurationNanos);
            PushingAsyncDataInput.DataOutput<IN> createDataOutput = createDataOutput();
            this.inputProcessor = new StreamOneInputProcessor(createTaskInput(createCheckpointedInputGate, createDataOutput), createDataOutput, getCheckpointLock(), this.operatorChain);
        }
        ((OneInputStreamOperator) this.headOperator).getMetricGroup().gauge("currentInputWatermark", this.inputWatermarkGauge);
        TaskMetricGroup metricGroup = getEnvironment().getMetricGroup();
        WatermarkGauge watermarkGauge = this.inputWatermarkGauge;
        watermarkGauge.getClass();
        metricGroup.gauge("currentInputWatermark", watermarkGauge::m89getValue);
    }

    private CheckpointedInputGate createCheckpointedInputGate() throws IOException {
        return InputProcessorUtil.createCheckpointedInputGate(this, this.configuration.getCheckpointMode(), getEnvironment().getIOManager(), InputGateUtil.createInputGate(getEnvironment().getAllInputGates()), getEnvironment().getTaskManagerInfo().getConfiguration(), getTaskNameWithSubtaskAndId());
    }

    private PushingAsyncDataInput.DataOutput<IN> createDataOutput() {
        return new StreamTaskNetworkOutput((OneInputStreamOperator) this.headOperator, getStreamStatusMaintainer(), getCheckpointLock(), this.inputWatermarkGauge, setupNumRecordsInCounter(this.headOperator));
    }

    private StreamTaskInput<IN> createTaskInput(CheckpointedInputGate checkpointedInputGate, PushingAsyncDataInput.DataOutput<IN> dataOutput) {
        return new StreamTaskNetworkInput(checkpointedInputGate, (TypeSerializer<?>) this.configuration.getTypeSerializerIn1(getUserCodeClassLoader()), getEnvironment().getIOManager(), new StatusWatermarkValve(checkpointedInputGate.getNumberOfInputChannels(), dataOutput), 0);
    }
}
