package org.apache.flink.streaming.runtime.io;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.sort.MultiInputSortingDataInput;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;

/* loaded from: input_file:org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.class */
public class StreamTwoInputProcessorFactory {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory$StreamStatusTracker.class */
    public static class StreamStatusTracker {
        private StreamStatus firstStatus;
        private StreamStatus secondStatus;

        private StreamStatusTracker() {
            this.firstStatus = StreamStatus.ACTIVE;
            this.secondStatus = StreamStatus.ACTIVE;
        }

        public StreamStatus getFirstStatus() {
            return this.firstStatus;
        }

        public void setFirstStatus(StreamStatus streamStatus) {
            this.firstStatus = streamStatus;
        }

        public StreamStatus getSecondStatus() {
            return this.secondStatus;
        }

        public void setSecondStatus(StreamStatus streamStatus) {
            this.secondStatus = streamStatus;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.class */
    private static class StreamTaskNetworkOutput<T> extends AbstractDataOutput<T> {
        private final TwoInputStreamOperator<?, ?, ?> operator;
        private final ThrowingConsumer<StreamRecord<T>, Exception> recordConsumer;
        private final WatermarkGauge inputWatermarkGauge;
        private final int inputIndex;
        private final Counter numRecordsIn;
        private final StreamStatusTracker statusTracker;

        private StreamTaskNetworkOutput(TwoInputStreamOperator<?, ?, ?> twoInputStreamOperator, ThrowingConsumer<StreamRecord<T>, Exception> throwingConsumer, StreamStatusMaintainer streamStatusMaintainer, WatermarkGauge watermarkGauge, StreamStatusTracker streamStatusTracker, int i, Counter counter) {
            super(streamStatusMaintainer);
            this.operator = (TwoInputStreamOperator) Preconditions.checkNotNull(twoInputStreamOperator);
            this.recordConsumer = (ThrowingConsumer) Preconditions.checkNotNull(throwingConsumer);
            this.inputWatermarkGauge = (WatermarkGauge) Preconditions.checkNotNull(watermarkGauge);
            this.statusTracker = streamStatusTracker;
            this.inputIndex = i;
            this.numRecordsIn = counter;
        }

        @Override // org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitRecord(StreamRecord<T> streamRecord) throws Exception {
            this.numRecordsIn.inc();
            this.recordConsumer.accept(streamRecord);
        }

        @Override // org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitWatermark(Watermark watermark) throws Exception {
            this.inputWatermarkGauge.setCurrentWatermark(watermark.getTimestamp());
            if (this.inputIndex == 0) {
                this.operator.processWatermark1(watermark);
            } else {
                this.operator.processWatermark2(watermark);
            }
        }

        @Override // org.apache.flink.streaming.runtime.io.AbstractDataOutput, org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitStreamStatus(StreamStatus streamStatus) {
            StreamStatus firstStatus;
            if (this.inputIndex == 0) {
                this.statusTracker.setFirstStatus(streamStatus);
                firstStatus = this.statusTracker.getSecondStatus();
            } else {
                this.statusTracker.setSecondStatus(streamStatus);
                firstStatus = this.statusTracker.getFirstStatus();
            }
            if (streamStatus.equals(this.streamStatusMaintainer.getStreamStatus())) {
                return;
            }
            if (streamStatus.isActive()) {
                this.streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
            } else if (firstStatus.isIdle()) {
                this.streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
            }
        }

        @Override // org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
            if (this.inputIndex == 0) {
                this.operator.processLatencyMarker1(latencyMarker);
            } else {
                this.operator.processLatencyMarker2(latencyMarker);
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v29, types: [org.apache.flink.streaming.runtime.io.StreamTaskInput] */
    /* JADX WARN: Type inference failed for: r0v33, types: [org.apache.flink.streaming.runtime.io.StreamTaskInput] */
    public static <IN1, IN2> StreamTwoInputProcessor<IN1, IN2> create(AbstractInvokable abstractInvokable, CheckpointedInputGate[] checkpointedInputGateArr, IOManager iOManager, MemoryManager memoryManager, TaskIOMetricGroup taskIOMetricGroup, StreamStatusMaintainer streamStatusMaintainer, TwoInputStreamOperator<IN1, IN2, ?> twoInputStreamOperator, WatermarkGauge watermarkGauge, WatermarkGauge watermarkGauge2, BoundedMultiInput boundedMultiInput, StreamConfig streamConfig, Configuration configuration, Configuration configuration2, ExecutionConfig executionConfig, ClassLoader classLoader, Counter counter) {
        Preconditions.checkNotNull(boundedMultiInput);
        StreamStatusTracker streamStatusTracker = new StreamStatusTracker();
        taskIOMetricGroup.reuseRecordsInputCounter(counter);
        TypeSerializer typeSerializerIn = streamConfig.getTypeSerializerIn(0, classLoader);
        StreamTaskNetworkInput streamTaskNetworkInput = new StreamTaskNetworkInput(checkpointedInputGateArr[0], (TypeSerializer<?>) typeSerializerIn, iOManager, new StatusWatermarkValve(checkpointedInputGateArr[0].getNumberOfInputChannels()), 0);
        TypeSerializer typeSerializerIn2 = streamConfig.getTypeSerializerIn(1, classLoader);
        StreamTaskNetworkInput streamTaskNetworkInput2 = new StreamTaskNetworkInput(checkpointedInputGateArr[1], (TypeSerializer<?>) typeSerializerIn2, iOManager, new StatusWatermarkValve(checkpointedInputGateArr[1].getNumberOfInputChannels()), 1);
        InputSelectable inputSelectable = twoInputStreamOperator instanceof InputSelectable ? (InputSelectable) twoInputStreamOperator : null;
        if (streamConfig.shouldSortInputs()) {
            if (inputSelectable != null) {
                throw new IllegalStateException("The InputSelectable interface is not supported with sorting inputs");
            }
            MultiInputSortingDataInput.SelectableSortingInputs wrapInputs = MultiInputSortingDataInput.wrapInputs(abstractInvokable, new StreamTaskInput[]{streamTaskNetworkInput, streamTaskNetworkInput2}, new KeySelector[]{streamConfig.getStatePartitioner(0, classLoader), streamConfig.getStatePartitioner(1, classLoader)}, new TypeSerializer[]{typeSerializerIn, typeSerializerIn2}, streamConfig.getStateKeySerializer(classLoader), memoryManager, iOManager, executionConfig.isObjectReuseEnabled(), streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase.BATCH_OP, configuration, classLoader), configuration2);
            inputSelectable = wrapInputs.getInputSelectable();
            streamTaskNetworkInput = getSortedInput(wrapInputs.getSortingInputs()[0]);
            streamTaskNetworkInput2 = getSortedInput(wrapInputs.getSortingInputs()[1]);
        }
        return new StreamTwoInputProcessor<>(new TwoInputSelectionHandler(inputSelectable), new StreamOneInputProcessor(streamTaskNetworkInput, new StreamTaskNetworkOutput(twoInputStreamOperator, streamRecord -> {
            processRecord1(streamRecord, twoInputStreamOperator);
        }, streamStatusMaintainer, watermarkGauge, streamStatusTracker, 0, counter), boundedMultiInput), new StreamOneInputProcessor(streamTaskNetworkInput2, new StreamTaskNetworkOutput(twoInputStreamOperator, streamRecord2 -> {
            processRecord2(streamRecord2, twoInputStreamOperator);
        }, streamStatusMaintainer, watermarkGauge2, streamStatusTracker, 1, counter), boundedMultiInput));
    }

    /* JADX WARN: Multi-variable type inference failed */
    private static <IN1> StreamTaskInput<IN1> getSortedInput(StreamTaskInput<?> streamTaskInput) {
        return streamTaskInput;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <T> void processRecord1(StreamRecord<T> streamRecord, TwoInputStreamOperator<T, ?, ?> twoInputStreamOperator) throws Exception {
        twoInputStreamOperator.setKeyContextElement1(streamRecord);
        twoInputStreamOperator.processElement1(streamRecord);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <T> void processRecord2(StreamRecord<T> streamRecord, TwoInputStreamOperator<?, T, ?> twoInputStreamOperator) throws Exception {
        twoInputStreamOperator.setKeyContextElement2(streamRecord);
        twoInputStreamOperator.processElement2(streamRecord);
    }
}
