package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.class */
public final class StreamTwoInputProcessor<IN1, IN2> implements StreamInputProcessor {
    private final TwoInputSelectionHandler inputSelectionHandler;
    private final Object lock;
    private final StreamTaskInput<IN1> input1;
    private final StreamTaskInput<IN2> input2;
    private final OperatorChain<?, ?> operatorChain;
    private final PushingAsyncDataInput.DataOutput<IN1> output1;
    private final PushingAsyncDataInput.DataOutput<IN2> output2;
    private InputStatus firstInputStatus = InputStatus.MORE_AVAILABLE;
    private InputStatus secondInputStatus = InputStatus.MORE_AVAILABLE;
    private StreamStatus firstStatus = StreamStatus.ACTIVE;
    private StreamStatus secondStatus = StreamStatus.ACTIVE;
    private int lastReadInputIndex = 1;
    private boolean isPrepared;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor$StreamTaskNetworkOutput.class */
    private class StreamTaskNetworkOutput<T> extends AbstractDataOutput<T> {
        private final TwoInputStreamOperator<IN1, IN2, ?> operator;
        private final ThrowingConsumer<StreamRecord<T>, Exception> recordConsumer;
        private final WatermarkGauge inputWatermarkGauge;
        private final int inputIndex;

        private StreamTaskNetworkOutput(TwoInputStreamOperator<IN1, IN2, ?> twoInputStreamOperator, ThrowingConsumer<StreamRecord<T>, Exception> throwingConsumer, Object obj, StreamStatusMaintainer streamStatusMaintainer, WatermarkGauge watermarkGauge, int i) {
            super(streamStatusMaintainer, obj);
            this.operator = (TwoInputStreamOperator) Preconditions.checkNotNull(twoInputStreamOperator);
            this.recordConsumer = (ThrowingConsumer) Preconditions.checkNotNull(throwingConsumer);
            this.inputWatermarkGauge = (WatermarkGauge) Preconditions.checkNotNull(watermarkGauge);
            this.inputIndex = i;
        }

        @Override // org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitRecord(StreamRecord<T> streamRecord) throws Exception {
            synchronized (this.lock) {
                this.recordConsumer.accept(streamRecord);
            }
        }

        @Override // org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitWatermark(Watermark watermark) throws Exception {
            synchronized (this.lock) {
                this.inputWatermarkGauge.setCurrentWatermark(watermark.getTimestamp());
                if (this.inputIndex == 0) {
                    this.operator.processWatermark1(watermark);
                } else {
                    this.operator.processWatermark2(watermark);
                }
            }
        }

        @Override // org.apache.flink.streaming.runtime.io.AbstractDataOutput, org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitStreamStatus(StreamStatus streamStatus) {
            StreamStatus streamStatus2;
            synchronized (this.lock) {
                if (this.inputIndex == 0) {
                    StreamTwoInputProcessor.this.firstStatus = streamStatus;
                    streamStatus2 = StreamTwoInputProcessor.this.secondStatus;
                } else {
                    StreamTwoInputProcessor.this.secondStatus = streamStatus;
                    streamStatus2 = StreamTwoInputProcessor.this.firstStatus;
                }
                if (!streamStatus.equals(this.streamStatusMaintainer.getStreamStatus())) {
                    if (streamStatus.isActive()) {
                        this.streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
                    } else if (streamStatus2.isIdle()) {
                        this.streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
                    }
                }
            }
        }

        @Override // org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
            synchronized (this.lock) {
                if (this.inputIndex == 0) {
                    this.operator.processLatencyMarker1(latencyMarker);
                } else {
                    this.operator.processLatencyMarker2(latencyMarker);
                }
            }
        }
    }

    public StreamTwoInputProcessor(CheckpointedInputGate[] checkpointedInputGateArr, TypeSerializer<IN1> typeSerializer, TypeSerializer<IN2> typeSerializer2, Object obj, IOManager iOManager, StreamStatusMaintainer streamStatusMaintainer, TwoInputStreamOperator<IN1, IN2, ?> twoInputStreamOperator, TwoInputSelectionHandler twoInputSelectionHandler, WatermarkGauge watermarkGauge, WatermarkGauge watermarkGauge2, OperatorChain<?, ?> operatorChain, Counter counter) {
        this.lock = Preconditions.checkNotNull(obj);
        this.inputSelectionHandler = (TwoInputSelectionHandler) Preconditions.checkNotNull(twoInputSelectionHandler);
        this.output1 = new StreamTaskNetworkOutput(twoInputStreamOperator, streamRecord -> {
            processRecord1(streamRecord, twoInputStreamOperator, counter);
        }, obj, streamStatusMaintainer, watermarkGauge, 0);
        this.output2 = new StreamTaskNetworkOutput(twoInputStreamOperator, streamRecord2 -> {
            processRecord2(streamRecord2, twoInputStreamOperator, counter);
        }, obj, streamStatusMaintainer, watermarkGauge2, 1);
        this.input1 = new StreamTaskNetworkInput(checkpointedInputGateArr[0], (TypeSerializer<?>) typeSerializer, iOManager, new StatusWatermarkValve(checkpointedInputGateArr[0].getNumberOfInputChannels(), this.output1), 0);
        this.input2 = new StreamTaskNetworkInput(checkpointedInputGateArr[1], (TypeSerializer<?>) typeSerializer2, iOManager, new StatusWatermarkValve(checkpointedInputGateArr[1].getNumberOfInputChannels(), this.output2), 1);
        this.operatorChain = (OperatorChain) Preconditions.checkNotNull(operatorChain);
    }

    private void processRecord1(StreamRecord<IN1> streamRecord, TwoInputStreamOperator<IN1, IN2, ?> twoInputStreamOperator, Counter counter) throws Exception {
        twoInputStreamOperator.setKeyContextElement1(streamRecord);
        twoInputStreamOperator.processElement1(streamRecord);
        postProcessRecord(counter);
    }

    private void processRecord2(StreamRecord<IN2> streamRecord, TwoInputStreamOperator<IN1, IN2, ?> twoInputStreamOperator, Counter counter) throws Exception {
        twoInputStreamOperator.setKeyContextElement2(streamRecord);
        twoInputStreamOperator.processElement2(streamRecord);
        postProcessRecord(counter);
    }

    private void postProcessRecord(Counter counter) {
        counter.inc();
        this.inputSelectionHandler.nextSelection();
    }

    public CompletableFuture<?> getAvailableFuture() {
        if (this.inputSelectionHandler.areAllInputsSelected()) {
            return isAnyInputAvailable();
        }
        return (this.inputSelectionHandler.isFirstInputSelected() ? this.input1 : this.input2).getAvailableFuture();
    }

    @Override // org.apache.flink.streaming.runtime.io.StreamInputProcessor
    public InputStatus processInput() throws Exception {
        int selectFirstReadingInputIndex;
        if (this.isPrepared) {
            selectFirstReadingInputIndex = selectNextReadingInputIndex();
            if (!$assertionsDisabled && selectFirstReadingInputIndex == -1) {
                throw new AssertionError();
            }
        } else {
            selectFirstReadingInputIndex = selectFirstReadingInputIndex();
            if (selectFirstReadingInputIndex == -1) {
                return InputStatus.NOTHING_AVAILABLE;
            }
        }
        this.lastReadInputIndex = selectFirstReadingInputIndex;
        if (selectFirstReadingInputIndex == 0) {
            this.firstInputStatus = this.input1.emitNext(this.output1);
            checkFinished(this.firstInputStatus, this.lastReadInputIndex);
        } else {
            this.secondInputStatus = this.input2.emitNext(this.output2);
            checkFinished(this.secondInputStatus, this.lastReadInputIndex);
        }
        return getInputStatus();
    }

    private int selectFirstReadingInputIndex() throws IOException {
        this.inputSelectionHandler.nextSelection();
        this.isPrepared = true;
        return selectNextReadingInputIndex();
    }

    private void checkFinished(InputStatus inputStatus, int i) throws Exception {
        if (inputStatus == InputStatus.END_OF_INPUT) {
            synchronized (this.lock) {
                this.operatorChain.endHeadOperatorInput(getInputId(i));
                this.inputSelectionHandler.nextSelection();
            }
        }
    }

    private InputStatus getInputStatus() {
        if (this.firstInputStatus == InputStatus.END_OF_INPUT && this.secondInputStatus == InputStatus.END_OF_INPUT) {
            return InputStatus.END_OF_INPUT;
        }
        if (this.inputSelectionHandler.areAllInputsSelected()) {
            return (this.firstInputStatus == InputStatus.MORE_AVAILABLE || this.secondInputStatus == InputStatus.MORE_AVAILABLE) ? InputStatus.MORE_AVAILABLE : InputStatus.NOTHING_AVAILABLE;
        }
        InputStatus inputStatus = this.inputSelectionHandler.isFirstInputSelected() ? this.firstInputStatus : this.secondInputStatus;
        return inputStatus == InputStatus.END_OF_INPUT ? this.inputSelectionHandler.isFirstInputSelected() ? this.secondInputStatus : this.firstInputStatus : inputStatus;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        IOException iOException = null;
        try {
            this.input1.close();
        } catch (IOException e) {
            iOException = (IOException) ExceptionUtils.firstOrSuppressed(e, (Throwable) null);
        }
        try {
            this.input2.close();
        } catch (IOException e2) {
            iOException = (IOException) ExceptionUtils.firstOrSuppressed(e2, iOException);
        }
        if (iOException != null) {
            throw iOException;
        }
    }

    private int selectNextReadingInputIndex() throws IOException {
        updateAvailability();
        checkInputSelectionAgainstIsFinished();
        int selectNextInputIndex = this.inputSelectionHandler.selectNextInputIndex(this.lastReadInputIndex);
        if (selectNextInputIndex == -1) {
            return -1;
        }
        if (this.inputSelectionHandler.shouldSetAvailableForAnotherInput()) {
            checkAndSetAvailable(1 - selectNextInputIndex);
        }
        return selectNextInputIndex;
    }

    private void checkInputSelectionAgainstIsFinished() throws IOException {
        if (this.inputSelectionHandler.areAllInputsSelected()) {
            return;
        }
        if (this.inputSelectionHandler.isFirstInputSelected() && this.firstInputStatus == InputStatus.END_OF_INPUT) {
            throw new IOException("Can not make a progress: only first input is selected but it is already finished");
        }
        if (this.inputSelectionHandler.isSecondInputSelected() && this.secondInputStatus == InputStatus.END_OF_INPUT) {
            throw new IOException("Can not make a progress: only second input is selected but it is already finished");
        }
    }

    private void updateAvailability() {
        updateAvailability(this.firstInputStatus, this.input1);
        updateAvailability(this.secondInputStatus, this.input2);
    }

    private void updateAvailability(InputStatus inputStatus, StreamTaskInput streamTaskInput) {
        if (inputStatus == InputStatus.MORE_AVAILABLE || (inputStatus != InputStatus.END_OF_INPUT && streamTaskInput.isApproximatelyAvailable())) {
            this.inputSelectionHandler.setAvailableInput(streamTaskInput.getInputIndex());
        } else {
            this.inputSelectionHandler.setUnavailableInput(streamTaskInput.getInputIndex());
        }
    }

    private void checkAndSetAvailable(int i) {
        if ((i == 0 ? this.firstInputStatus : this.secondInputStatus) != InputStatus.END_OF_INPUT && getInput(i).isAvailable()) {
            this.inputSelectionHandler.setAvailableInput(i);
        }
    }

    private CompletableFuture<?> isAnyInputAvailable() {
        return this.firstInputStatus == InputStatus.END_OF_INPUT ? this.input2.getAvailableFuture() : this.secondInputStatus == InputStatus.END_OF_INPUT ? this.input1.getAvailableFuture() : (this.input1.isApproximatelyAvailable() || this.input2.isApproximatelyAvailable()) ? AVAILABLE : CompletableFuture.anyOf(this.input1.getAvailableFuture(), this.input2.getAvailableFuture());
    }

    private StreamTaskInput getInput(int i) {
        return i == 0 ? this.input1 : this.input2;
    }

    private int getInputId(int i) {
        return i + 1;
    }

    static {
        $assertionsDisabled = !StreamTwoInputProcessor.class.desiredAssertionStatus();
    }
}
