package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
import org.apache.flink.streaming.runtime.tasks.WatermarkGaugeExposingOutput;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/io/RecordWriterOutput.class */
public class RecordWriterOutput<OUT> implements WatermarkGaugeExposingOutput<StreamRecord<OUT>> {
    private RecordWriter<SerializationDelegate<StreamElement>> recordWriter;
    private SerializationDelegate<StreamElement> serializationDelegate;
    private final StreamStatusProvider streamStatusProvider;
    private final OutputTag outputTag;
    private final WatermarkGauge watermarkGauge = new WatermarkGauge();

    public RecordWriterOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter, TypeSerializer<OUT> typeSerializer, OutputTag outputTag, StreamStatusProvider streamStatusProvider) {
        Preconditions.checkNotNull(recordWriter);
        this.outputTag = outputTag;
        this.recordWriter = recordWriter;
        StreamElementSerializer streamElementSerializer = new StreamElementSerializer(typeSerializer);
        if (typeSerializer != null) {
            this.serializationDelegate = new SerializationDelegate<>(streamElementSerializer);
        }
        this.streamStatusProvider = (StreamStatusProvider) Preconditions.checkNotNull(streamStatusProvider);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.util.Collector
    public void collect(StreamRecord<OUT> streamRecord) {
        if (this.outputTag != null) {
            return;
        }
        pushToRecordWriter(streamRecord);
    }

    @Override // org.apache.flink.streaming.api.operators.Output
    public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
        if (OutputTag.isResponsibleFor(this.outputTag, outputTag)) {
            pushToRecordWriter(streamRecord);
        }
    }

    private <X> void pushToRecordWriter(StreamRecord<X> streamRecord) {
        this.serializationDelegate.setInstance(streamRecord);
        try {
            this.recordWriter.emit(this.serializationDelegate);
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    @Override // org.apache.flink.streaming.api.operators.Output
    public void emitWatermark(Watermark watermark) {
        this.watermarkGauge.setCurrentWatermark(watermark.getTimestamp());
        this.serializationDelegate.setInstance(watermark);
        if (this.streamStatusProvider.getStreamStatus().isActive()) {
            try {
                this.recordWriter.broadcastEmit(this.serializationDelegate);
            } catch (Exception e) {
                throw new RuntimeException(e.getMessage(), e);
            }
        }
    }

    public void emitStreamStatus(StreamStatus streamStatus) {
        this.serializationDelegate.setInstance(streamStatus);
        try {
            this.recordWriter.broadcastEmit(this.serializationDelegate);
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    @Override // org.apache.flink.streaming.api.operators.Output
    public void emitLatencyMarker(LatencyMarker latencyMarker) {
        this.serializationDelegate.setInstance(latencyMarker);
        try {
            this.recordWriter.randomEmit(this.serializationDelegate);
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    public void broadcastEvent(AbstractEvent abstractEvent, boolean z) throws IOException {
        this.recordWriter.broadcastEvent(abstractEvent, z);
    }

    public void flush() throws IOException {
        this.recordWriter.flushAll();
    }

    @Override // org.apache.flink.util.Collector
    public void close() {
        this.recordWriter.close();
    }

    @Override // org.apache.flink.streaming.runtime.tasks.WatermarkGaugeExposingOutput
    public Gauge<Long> getWatermarkGauge() {
        return this.watermarkGauge;
    }
}
