package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput;
import org.apache.flink.streaming.api.collector.selector.DirectedOutput;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.XORShiftRandom;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OperatorChain.class */
public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusMaintainer {
    /** Logger of this class; the nested {@code ChainingOutput} also reports metric-setup failures through it. */
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) OperatorChain.class);
    /** All operators of the chain; the configuration-based constructor appends the head operator as the last element. */
    private final StreamOperator<?>[] allOperators;
    /** Record-writer outputs of the chain; the configuration-based constructor creates one per out edge, in edge order. */
    private final RecordWriterOutput<?>[] streamOutputs;
    /** Output that the configuration-based constructor passes to the head operator's {@code setup} call. */
    private final WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint;
    /** Head operator of the chain; the configuration-based constructor tolerates a {@code null} value here. */
    private final OP headOperator;
    /** Current stream status; starts as ACTIVE and is only replaced by {@code toggleStreamStatus}. */
    private StreamStatus streamStatus = StreamStatus.ACTIVE;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OperatorChain$BroadcastingOutputCollector.class */
    /**
     * Output that forwards every record and watermark to all wrapped outputs. Latency markers
     * are sent to exactly one wrapped output, chosen at random when there is more than one.
     *
     * @param <T> type of the values carried by the forwarded stream records
     */
    public static class BroadcastingOutputCollector<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
        protected final Output<StreamRecord<T>>[] outputs;
        private final StreamStatusProvider streamStatusProvider;
        private final Random random = new XORShiftRandom();
        private final WatermarkGauge watermarkGauge = new WatermarkGauge();

        /**
         * @param outputArr outputs that receive the broadcast elements
         * @param streamStatusProvider consulted before watermarks are forwarded
         */
        public BroadcastingOutputCollector(Output<StreamRecord<T>>[] outputArr, StreamStatusProvider streamStatusProvider) {
            this.outputs = outputArr;
            this.streamStatusProvider = streamStatusProvider;
        }

        /**
         * Records the watermark timestamp in the gauge and, while the stream status is active,
         * forwards the watermark to every wrapped output.
         */
        @Override
        public void emitWatermark(Watermark watermark) {
            // The gauge is updated even when the watermark is not forwarded.
            this.watermarkGauge.setCurrentWatermark(watermark.getTimestamp());
            if (!this.streamStatusProvider.getStreamStatus().isActive()) {
                return;
            }
            for (int i = 0; i < this.outputs.length; i++) {
                this.outputs[i].emitWatermark(watermark);
            }
        }

        /** Sends the marker to a single wrapped output; does nothing when there are no outputs. */
        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            final int outputCount = this.outputs.length;
            if (outputCount == 1) {
                this.outputs[0].emitLatencyMarker(latencyMarker);
            } else if (outputCount > 1) {
                // Pick one of the outputs at random.
                final int target = this.random.nextInt(outputCount);
                this.outputs[target].emitLatencyMarker(latencyMarker);
            }
        }

        /** Returns the gauge that tracks the last watermark passed to {@code emitWatermark}. */
        @Override
        public Gauge<Long> getWatermarkGauge() {
            return this.watermarkGauge;
        }

        /** Hands the same record instance to every wrapped output, in array order. */
        @Override
        public void collect(StreamRecord<T> streamRecord) {
            for (int i = 0; i < this.outputs.length; i++) {
                this.outputs[i].collect(streamRecord);
            }
        }

        /** Hands the same tagged record instance to every wrapped output, in array order. */
        @Override
        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
            for (int i = 0; i < this.outputs.length; i++) {
                this.outputs[i].collect(outputTag, streamRecord);
            }
        }

        /** Closes every wrapped output, in array order. */
        @Override
        public void close() {
            for (int i = 0; i < this.outputs.length; i++) {
                this.outputs[i].close();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OperatorChain$ChainingOutput.class */
    /**
     * Output that passes elements directly to a chained operator by calling its processing
     * methods. Failures raised by the operator are wrapped in
     * {@link ExceptionInChainedOperatorException}.
     *
     * @param <T> input type of the chained operator
     */
    public static class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
        protected final OneInputStreamOperator<T, ?> operator;
        protected final Counter numRecordsIn;
        protected final WatermarkGauge watermarkGauge = new WatermarkGauge();
        protected final StreamStatusProvider streamStatusProvider;

        /** Tag this output is bound to; {@code null} means it handles untagged records. */
        @Nullable
        protected final OutputTag<T> outputTag;

        /**
         * @param oneInputStreamOperator operator that receives the elements
         * @param streamStatusProvider consulted before watermarks are forwarded
         * @param outputTag tag handled by this output, or {@code null} for untagged records
         */
        public ChainingOutput(OneInputStreamOperator<T, ?> oneInputStreamOperator, StreamStatusProvider streamStatusProvider, @Nullable OutputTag<T> outputTag) {
            this.operator = oneInputStreamOperator;

            // The local must be typed as Counter: the metric group hands back a Counter,
            // and only the fallback is a SimpleCounter.
            Counter recordsInCounter;
            try {
                recordsInCounter = ((OperatorMetricGroup) oneInputStreamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
            } catch (Exception e) {
                // Metrics are best effort: fall back to a standalone counter.
                OperatorChain.LOG.warn("An exception occurred during the metrics setup.", e);
                recordsInCounter = new SimpleCounter();
            }
            this.numRecordsIn = recordsInCounter;
            this.streamStatusProvider = streamStatusProvider;
            this.outputTag = outputTag;
        }

        /** Pushes an untagged record to the operator; ignored when this output is bound to a tag. */
        @Override
        public void collect(StreamRecord<T> streamRecord) {
            if (this.outputTag != null) {
                return;
            }
            pushToOperator(streamRecord);
        }

        /** Pushes a tagged record to the operator only when the tag equals this output's tag. */
        @Override
        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
            if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
                return;
            }
            pushToOperator(streamRecord);
        }

        /**
         * Counts the record and lets the operator process it.
         *
         * @throws ExceptionInChainedOperatorException wrapping any exception raised on the way
         */
        protected <X> void pushToOperator(StreamRecord<X> streamRecord) {
            try {
                // Both callers only get here when the record belongs to this output, so it is
                // treated as the operator's input type; generics are erased at runtime.
                @SuppressWarnings("unchecked")
                StreamRecord<T> castRecord = (StreamRecord<T>) streamRecord;

                this.numRecordsIn.inc();
                this.operator.setKeyContextElement1(castRecord);
                this.operator.processElement(castRecord);
            } catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }

        /**
         * Records the watermark timestamp in the gauge and, while the stream status is active,
         * lets the operator process the watermark.
         */
        @Override
        public void emitWatermark(Watermark watermark) {
            try {
                this.watermarkGauge.setCurrentWatermark(watermark.getTimestamp());
                if (this.streamStatusProvider.getStreamStatus().isActive()) {
                    this.operator.processWatermark(watermark);
                }
            } catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }

        /** Lets the operator process the latency marker. */
        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            try {
                this.operator.processLatencyMarker(latencyMarker);
            } catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }

        /** Closes the chained operator. */
        @Override
        public void close() {
            try {
                this.operator.close();
            } catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }

        /** Returns the gauge that tracks the last watermark passed to {@code emitWatermark}. */
        @Override
        public Gauge<Long> getWatermarkGauge() {
            return this.watermarkGauge;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OperatorChain$CopyingBroadcastingOutputCollector.class */
    /**
     * Broadcasting output that gives every wrapped output except the last one its own
     * {@link StreamRecord} wrapper around the same value; the last output receives the
     * original record instance.
     *
     * @param <T> type of the values carried by the forwarded stream records
     */
    public static final class CopyingBroadcastingOutputCollector<T> extends BroadcastingOutputCollector<T> {
        public CopyingBroadcastingOutputCollector(Output<StreamRecord<T>>[] outputArr, StreamStatusProvider streamStatusProvider) {
            super(outputArr, streamStatusProvider);
        }

        @Override
        public void collect(StreamRecord<T> streamRecord) {
            final int lastIndex = this.outputs.length - 1;
            if (lastIndex < 0) {
                // No outputs at all: nothing to emit.
                return;
            }
            for (int i = 0; i < lastIndex; i++) {
                final StreamRecord<T> shallowCopy = streamRecord.copy(streamRecord.getValue());
                this.outputs[i].collect(shallowCopy);
            }
            // The last output gets the original record, so no copy is made for it.
            this.outputs[lastIndex].collect(streamRecord);
        }

        @Override
        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
            final int lastIndex = this.outputs.length - 1;
            if (lastIndex < 0) {
                // No outputs at all: nothing to emit.
                return;
            }
            for (int i = 0; i < lastIndex; i++) {
                final StreamRecord<X> shallowCopy = streamRecord.copy(streamRecord.getValue());
                this.outputs[i].collect(outputTag, shallowCopy);
            }
            // The last output gets the original record, so no copy is made for it.
            this.outputs[lastIndex].collect(outputTag, streamRecord);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OperatorChain$CopyingChainingOutput.class */
    /**
     * Chaining output that copies each record's value with the given serializer before handing
     * the record to the chained operator.
     *
     * @param <T> input type of the chained operator
     */
    public static final class CopyingChainingOutput<T> extends ChainingOutput<T> {
        private final TypeSerializer<T> serializer;

        /**
         * @param oneInputStreamOperator operator that receives the copied elements
         * @param typeSerializer serializer used to copy record values
         * @param outputTag tag handled by this output, or {@code null} for untagged records
         * @param streamStatusProvider consulted before watermarks are forwarded
         */
        public CopyingChainingOutput(OneInputStreamOperator<T, ?> oneInputStreamOperator, TypeSerializer<T> typeSerializer, OutputTag<T> outputTag, StreamStatusProvider streamStatusProvider) {
            super(oneInputStreamOperator, streamStatusProvider, outputTag);
            this.serializer = typeSerializer;
        }

        /** Pushes an untagged record to the operator; ignored when this output is bound to a tag. */
        @Override
        public void collect(StreamRecord<T> streamRecord) {
            if (this.outputTag != null) {
                return;
            }
            pushToOperator(streamRecord);
        }

        /** Pushes a tagged record to the operator only when the tag equals this output's tag. */
        @Override
        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
            if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
                return;
            }
            pushToOperator(streamRecord);
        }

        /**
         * Counts the record, copies its value with the serializer and lets the operator process
         * the copy.
         *
         * @throws ExceptionInChainedOperatorException wrapping any exception raised on the way;
         *     for a {@link ClassCastException} on a tagged output the message is enriched with
         *     the tag id
         */
        @Override
        protected <X> void pushToOperator(StreamRecord<X> streamRecord) {
            try {
                // Both callers only get here when the record belongs to this output, so it is
                // treated as the type the operator and serializer expect. Generics are erased,
                // so a real mismatch surfaces later as the ClassCastException handled below.
                @SuppressWarnings("unchecked")
                StreamRecord<T> castRecord = (StreamRecord<T>) streamRecord;

                this.numRecordsIn.inc();
                StreamRecord<T> copy = castRecord.copy(this.serializer.copy(castRecord.getValue()));
                this.operator.setKeyContextElement1(copy);
                this.operator.processElement(copy);
            } catch (ClassCastException e) {
                if (this.outputTag == null) {
                    throw new ExceptionInChainedOperatorException(e);
                }
                ClassCastException enriched = new ClassCastException(String.format("%s. Failed to push OutputTag with id '%s' to operator. This can occur when multiple OutputTags with different types but identical names are being used.", e.getMessage(), this.outputTag.getId()));
                // Keep the original exception so its stack trace is not lost.
                enriched.initCause(e);
                throw new ExceptionInChainedOperatorException(enriched);
            } catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OperatorChain$WatermarkGaugeExposingOutput.class */
    /**
     * An {@link Output} that additionally exposes a gauge for watermark values.
     *
     * @param <T> type of the elements accepted by this output
     */
    public interface WatermarkGaugeExposingOutput<T> extends Output<T> {
        /** Returns the watermark gauge of this output; the chain registers it as an operator metric. */
        Gauge<Long> getWatermarkGauge();
    }

    /**
     * Builds the operator chain of a task from the task's configuration: creates one
     * record-writer output per out edge, wires up the chained operators and sets up the head
     * operator.
     *
     * <p>If anything fails after the first record-writer output has been created, every output
     * created so far is closed before the failure propagates.
     *
     * @param streamTask task that owns this chain; supplies class loader, configuration and environment
     * @param list record writers, indexed in the same order as the configuration's out edges
     */
    public OperatorChain(StreamTask<OUT, OP> streamTask, List<StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>> list) {
        final ClassLoader userCodeClassLoader = streamTask.getUserCodeClassLoader();
        final StreamConfig configuration = streamTask.getConfiguration();
        this.headOperator = (OP) configuration.getStreamOperator(userCodeClassLoader);

        final Map<Integer, StreamConfig> chainedConfigs = configuration.getTransitiveChainedTaskConfigsWithSelf(userCodeClassLoader);
        final List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(userCodeClassLoader);
        final Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap = new HashMap<>(outEdgesInOrder.size());
        this.streamOutputs = new RecordWriterOutput<?>[outEdgesInOrder.size()];

        // From here on the outputs must be closed again if construction fails at any point,
        // including while the chained operators are being created and set up.
        boolean success = false;
        try {
            for (int i = 0; i < outEdgesInOrder.size(); i++) {
                final StreamEdge outEdge = outEdgesInOrder.get(i);
                final RecordWriterOutput<?> streamOutput = createStreamOutput(list.get(i), outEdge, chainedConfigs.get(outEdge.getSourceId()), streamTask.getEnvironment());
                this.streamOutputs[i] = streamOutput;
                streamOutputMap.put(outEdge, streamOutput);
            }

            // Create the chained operators and obtain the output that leads into the chain.
            final List<StreamOperator<?>> allOps = new ArrayList<>(chainedConfigs.size());
            this.chainEntryPoint = createOutputCollector(streamTask, configuration, chainedConfigs, userCodeClassLoader, streamOutputMap, allOps);

            if (this.headOperator != null) {
                final WatermarkGaugeExposingOutput<StreamRecord<OUT>> output = getChainEntryPoint();
                this.headOperator.setup(streamTask, configuration, output);
                this.headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, output.getWatermarkGauge());
            }

            // The head operator goes last in the operator array (it may be null).
            allOps.add(this.headOperator);
            this.allOperators = allOps.toArray(new StreamOperator<?>[allOps.size()]);

            success = true;
        } finally {
            if (!success) {
                // Slots not yet filled are still null and are skipped.
                for (RecordWriterOutput<?> output : this.streamOutputs) {
                    if (output != null) {
                        output.close();
                    }
                }
            }
        }
    }

    @VisibleForTesting
    OperatorChain(StreamOperator<?>[] streamOperatorArr, RecordWriterOutput<?>[] recordWriterOutputArr, WatermarkGaugeExposingOutput<StreamRecord<OUT>> watermarkGaugeExposingOutput, OP op) {
        this.allOperators = (StreamOperator[]) Preconditions.checkNotNull(streamOperatorArr);
        this.streamOutputs = (RecordWriterOutput[]) Preconditions.checkNotNull(recordWriterOutputArr);
        this.chainEntryPoint = (WatermarkGaugeExposingOutput) Preconditions.checkNotNull(watermarkGaugeExposingOutput);
        this.headOperator = (OP) Preconditions.checkNotNull(op);
    }

    /** Returns the stream status most recently set through {@code toggleStreamStatus} (initially ACTIVE). */
    @Override // org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider
    public StreamStatus getStreamStatus() {
        return this.streamStatus;
    }

    /**
     * Switches the chain to the given stream status and emits the new status on every
     * record-writer output. Does nothing when the status is unchanged.
     *
     * @param streamStatus the status to switch to
     */
    @Override
    public void toggleStreamStatus(StreamStatus streamStatus) {
        if (!streamStatus.equals(this.streamStatus)) {
            this.streamStatus = streamStatus;
            // Forward the change to all outgoing record writers.
            for (int i = 0; i < this.streamOutputs.length; i++) {
                this.streamOutputs[i].emitStreamStatus(streamStatus);
            }
        }
    }

    /**
     * Creates one {@link CheckpointBarrier} and broadcasts it on every record-writer output.
     *
     * @param j first argument of the barrier constructor (presumably the checkpoint id; confirm against CheckpointBarrier)
     * @param j2 second argument of the barrier constructor (presumably the checkpoint timestamp; confirm against CheckpointBarrier)
     * @param checkpointOptions options carried by the barrier
     * @throws IOException if broadcasting on an output fails
     */
    public void broadcastCheckpointBarrier(long j, long j2, CheckpointOptions checkpointOptions) throws IOException {
        final CheckpointBarrier barrier = new CheckpointBarrier(j, j2, checkpointOptions);
        for (int i = 0; i < this.streamOutputs.length; i++) {
            this.streamOutputs[i].broadcastEvent(barrier);
        }
    }

    /**
     * Creates one {@link CancelCheckpointMarker} and broadcasts it on every record-writer output.
     *
     * @param j value passed to the marker constructor (presumably the checkpoint id; confirm against CancelCheckpointMarker)
     * @throws IOException if broadcasting on an output fails
     */
    public void broadcastCheckpointCancelMarker(long j) throws IOException {
        final CancelCheckpointMarker marker = new CancelCheckpointMarker(j);
        for (int i = 0; i < this.streamOutputs.length; i++) {
            this.streamOutputs[i].broadcastEvent(marker);
        }
    }

    /**
     * Calls {@code prepareSnapshotPreBarrier} on every non-null operator of the chain, walking
     * the operator array from its last element to its first.
     *
     * @param j value passed through to each operator (presumably the checkpoint id; confirm against StreamOperator)
     * @throws Exception if an operator fails while preparing
     */
    public void prepareSnapshotPreBarrier(long j) throws Exception {
        final StreamOperator<?>[] operators = this.allOperators;
        int index = operators.length;
        while (--index >= 0) {
            final StreamOperator<?> current = operators[index];
            if (current == null) {
                // Null entries are skipped.
                continue;
            }
            current.prepareSnapshotPreBarrier(j);
        }
    }

    /** Returns the internal array of record-writer outputs (not a copy). */
    public RecordWriterOutput<?>[] getStreamOutputs() {
        return this.streamOutputs;
    }

    /** Returns the internal array of all operators in the chain (not a copy). */
    public StreamOperator<?>[] getAllOperators() {
        return this.allOperators;
    }

    /** Returns the output stored as the chain's entry point. */
    public WatermarkGaugeExposingOutput<StreamRecord<OUT>> getChainEntryPoint() {
        return this.chainEntryPoint;
    }

    /**
     * Flushes every record-writer output of the chain, in array order.
     *
     * @throws IOException if flushing an output fails
     */
    public void flushOutputs() throws IOException {
        final RecordWriterOutput<?>[] outputs = getStreamOutputs();
        for (int i = 0; i < outputs.length; i++) {
            outputs[i].flush();
        }
    }

    /** Closes every record-writer output of the chain, in array order. */
    public void releaseOutputs() {
        final RecordWriterOutput<?>[] outputs = this.streamOutputs;
        for (int i = 0; i < outputs.length; i++) {
            outputs[i].close();
        }
    }

    /** Returns the head operator of the chain; may be {@code null} when the chain was built from a task configuration. */
    public OP getHeadOperator() {
        return this.headOperator;
    }

    /** Returns the number of entries in the operator array, or 0 when there is no array. */
    public int getChainLength() {
        return this.allOperators == null ? 0 : this.allOperators.length;
    }

    /**
     * Creates the output an operator writes to. It covers the operator's non-chained outputs
     * (existing record-writer outputs) and its chained outputs (operators created recursively).
     *
     * <ul>
     *   <li>With output selectors configured, a directed output routes between all outputs.
     *   <li>With no selectors and exactly one output, that output is returned directly.
     *   <li>Otherwise a broadcasting output over all outputs (possibly none) is returned.
     * </ul>
     *
     * <p>When object reuse is enabled the copying variants of the directed and broadcasting
     * outputs are used.
     *
     * @param streamTask task that owns the chain
     * @param streamConfig configuration of the operator whose output is created
     * @param map configurations of the chained operators, keyed by the id used for edge targets
     * @param classLoader user-code class loader used to read the configuration
     * @param map2 record-writer outputs keyed by their stream edge
     * @param list accumulator that receives every chained operator created along the way
     * @return the output for the operator described by {@code streamConfig}
     */
    private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Map<Integer, StreamConfig> map, ClassLoader classLoader, Map<StreamEdge, RecordWriterOutput<?>> map2, List<StreamOperator<?>> list) {
        final List<Tuple2<WatermarkGaugeExposingOutput<StreamRecord<T>>, StreamEdge>> allOutputs = new ArrayList<>(4);

        // Outputs that leave the chain through a record writer.
        for (StreamEdge outputEdge : streamConfig.getNonChainedOutputs(classLoader)) {
            // The map stores writers with a wildcard type; the writer for this edge is taken
            // to emit this operator's output type.
            @SuppressWarnings("unchecked")
            WatermarkGaugeExposingOutput<StreamRecord<T>> output = (RecordWriterOutput<T>) map2.get(outputEdge);
            allOutputs.add(new Tuple2<>(output, outputEdge));
        }

        // Outputs that feed another operator inside the chain; created recursively.
        for (StreamEdge outputEdge : streamConfig.getChainedOutputs(classLoader)) {
            final StreamConfig chainedOpConfig = map.get(outputEdge.getTargetId());
            @SuppressWarnings("unchecked")
            WatermarkGaugeExposingOutput<StreamRecord<T>> output = createChainedOperator(streamTask, chainedOpConfig, map, classLoader, map2, list, outputEdge.getOutputTag());
            allOutputs.add(new Tuple2<>(output, outputEdge));
        }

        final List<OutputSelector<T>> outputSelectors = streamConfig.getOutputSelectors(classLoader);
        if (outputSelectors != null && !outputSelectors.isEmpty()) {
            // Selectors present: elements are routed rather than broadcast.
            if (streamTask.getExecutionConfig().isObjectReuseEnabled()) {
                return new CopyingDirectedOutput<>(outputSelectors, allOutputs);
            }
            return new DirectedOutput<>(outputSelectors, allOutputs);
        }

        if (allOutputs.size() == 1) {
            // Single output and no selector: no wrapper is needed.
            return allOutputs.get(0).f0;
        }

        // Broadcast to N outputs; this includes the case of zero outputs.
        // Generic arrays cannot be created directly, hence the raw element type.
        @SuppressWarnings({"unchecked", "rawtypes"})
        Output<StreamRecord<T>>[] asArray = new Output[allOutputs.size()];
        for (int i = 0; i < allOutputs.size(); i++) {
            asArray[i] = allOutputs.get(i).f0;
        }

        if (streamTask.getExecutionConfig().isObjectReuseEnabled()) {
            return new CopyingBroadcastingOutputCollector<>(asArray, this);
        }
        return new BroadcastingOutputCollector<>(asArray, this);
    }

    /**
     * Creates one chained operator together with everything downstream of it, and returns the
     * output through which the upstream operator feeds it.
     *
     * <p>The operator's own output is created first (which may recursively create further
     * operators). The operator is then set up with that output and appended to {@code list}.
     * A {@link ChainingOutput} is used when object reuse is enabled, otherwise a
     * {@link CopyingChainingOutput}. Finally the input and output watermark gauges are
     * registered on the operator's metric group.
     *
     * @param streamTask task that owns the chain
     * @param streamConfig configuration of the operator to create
     * @param map configurations of the chained operators, keyed by the id used for edge targets
     * @param classLoader user-code class loader used to read the configuration
     * @param map2 record-writer outputs keyed by their stream edge
     * @param list accumulator that receives the created operator
     * @param outputTag tag of the edge leading to the operator, or {@code null} for a regular edge
     * @return the output that pushes elements into the created operator
     */
    private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createChainedOperator(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Map<Integer, StreamConfig> map, ClassLoader classLoader, Map<StreamEdge, RecordWriterOutput<?>> map2, List<StreamOperator<?>> list, OutputTag<IN> outputTag) {
        // Create the operator's own output first; this may recursively create more operators.
        final WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainedOperatorOutput = createOutputCollector(streamTask, streamConfig, map, classLoader, map2, list);

        // The configuration does not carry static type information for the operator.
        @SuppressWarnings("unchecked")
        final OneInputStreamOperator<IN, OUT> chainedOperator = (OneInputStreamOperator<IN, OUT>) streamConfig.getStreamOperator(classLoader);
        chainedOperator.setup(streamTask, streamConfig, chainedOperatorOutput);
        list.add(chainedOperator);

        final WatermarkGaugeExposingOutput<StreamRecord<IN>> currentOperatorOutput;
        if (streamTask.getExecutionConfig().isObjectReuseEnabled()) {
            currentOperatorOutput = new ChainingOutput<>(chainedOperator, this, outputTag);
        } else {
            final TypeSerializer<IN> inSerializer = streamConfig.getTypeSerializerIn1(classLoader);
            currentOperatorOutput = new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this);
        }

        // Register method references rather than the gauge objects themselves; each reference
        // is a new Gauge that delegates to getValue().
        chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, currentOperatorOutput.getWatermarkGauge()::getValue);
        chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, chainedOperatorOutput.getWatermarkGauge()::getValue);

        return currentOperatorOutput;
    }

    /**
     * Wraps a record writer into a {@link RecordWriterOutput} for the given edge.
     *
     * <p>The serializer comes from the upstream configuration: the side-output serializer when
     * the edge carries an output tag, the regular output serializer otherwise.
     *
     * @param streamRecordWriter writer the output emits to
     * @param streamEdge edge served by the output; its tag (possibly {@code null}) is passed on
     * @param streamConfig configuration that provides the serializer
     * @param environment supplies the user class loader used to load the serializer
     * @return the newly created output, with this chain as its stream status provider
     */
    private RecordWriterOutput<OUT> createStreamOutput(StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> streamRecordWriter, StreamEdge streamEdge, StreamConfig streamConfig, Environment environment) {
        final TypeSerializer<OUT> outSerializer;
        if (streamEdge.getOutputTag() == null) {
            // Main output.
            outSerializer = streamConfig.getTypeSerializerOut(environment.getUserClassLoader());
        } else {
            // Side output.
            outSerializer = streamConfig.getTypeSerializerSideOut(streamEdge.getOutputTag(), environment.getUserClassLoader());
        }
        return new RecordWriterOutput<>(streamRecordWriter, outSerializer, streamEdge.getOutputTag(), this);
    }
}
