package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.util.ExceptionUtils;

/**
 * Input processor that multiplexes two inputs, each wrapped in its own
 * {@link StreamOneInputProcessor}, and consults a {@link TwoInputSelectionHandler} to decide
 * which side is read on each {@link #processInput()} call.
 *
 * <p>The {@link InputStatus} most recently returned by each side is remembered and the two are
 * merged into the single status reported to the caller.
 *
 * @param <IN1> type parameter of the first input's processor
 * @param <IN2> type parameter of the second input's processor
 */
@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.class */
public final class StreamTwoInputProcessor<IN1, IN2> implements StreamInputProcessor {

    /**
     * Index produced by the selection handler when there is no input to read from;
     * {@link #processInput()} maps it to {@link InputStatus#NOTHING_AVAILABLE}.
     */
    private static final int NO_INPUT_SELECTED = -1;

    private final TwoInputSelectionHandler inputSelectionHandler;
    private final StreamOneInputProcessor<IN1> processor1;
    private final StreamOneInputProcessor<IN2> processor2;

    /** Status returned by the latest read of the first input (index 0). */
    private InputStatus firstInputStatus = InputStatus.MORE_AVAILABLE;

    /** Status returned by the latest read of the second input (index 1). */
    private InputStatus secondInputStatus = InputStatus.MORE_AVAILABLE;

    /**
     * Index of the input read most recently; handed to the selection handler when choosing the
     * next one. NOTE(review): starting at 1 presumably biases the very first pick towards
     * input 0 - confirm against TwoInputSelectionHandler#selectNextInputIndex.
     */
    private int lastReadInputIndex = 1;

    /** Set once the first {@link #processInput()} call has run its one-time selection step. */
    private boolean isPrepared;

    /**
     * Creates a processor over the given selection handler and the two per-input processors.
     *
     * @param twoInputSelectionHandler handler tracking which inputs are selected and available
     * @param streamOneInputProcessor processor for the first input (index 0)
     * @param streamOneInputProcessor2 processor for the second input (index 1)
     */
    public StreamTwoInputProcessor(TwoInputSelectionHandler twoInputSelectionHandler, StreamOneInputProcessor<IN1> streamOneInputProcessor, StreamOneInputProcessor<IN2> streamOneInputProcessor2) {
        this.inputSelectionHandler = twoInputSelectionHandler;
        this.processor1 = streamOneInputProcessor;
        this.processor2 = streamOneInputProcessor2;
    }

    /**
     * Returns the availability future matching the current selection: a combined future when
     * both inputs are selected, otherwise the future of the single selected input.
     *
     * @return future of the input(s) currently eligible for reading
     */
    @Override // org.apache.flink.runtime.io.AvailabilityProvider
    public CompletableFuture<?> getAvailableFuture() {
        if (inputSelectionHandler.areAllInputsSelected()) {
            return isAnyInputAvailable();
        }
        final StreamOneInputProcessor<?> selected =
                inputSelectionHandler.isFirstInputSelected() ? processor1 : processor2;
        return selected.getAvailableFuture();
    }

    /**
     * Picks an input, delegates one {@code processInput()} call to its processor, records the
     * returned status, advances the selection handler and reports the merged status.
     *
     * @return {@link InputStatus#NOTHING_AVAILABLE} if no input could be picked, otherwise the
     *     merged status of both inputs
     * @throws Exception propagated from input selection or from the delegated processor
     */
    @Override // org.apache.flink.streaming.runtime.io.StreamInputProcessor
    public InputStatus processInput() throws Exception {
        final int inputIndex;
        if (isPrepared) {
            inputIndex = selectNextReadingInputIndex();
        } else {
            // Very first call: run the one-time selection step before choosing.
            inputIndex = selectFirstReadingInputIndex();
        }

        if (inputIndex == NO_INPUT_SELECTED) {
            return InputStatus.NOTHING_AVAILABLE;
        }

        lastReadInputIndex = inputIndex;
        if (inputIndex == 0) {
            firstInputStatus = processor1.processInput();
        } else {
            secondInputStatus = processor2.processInput();
        }

        inputSelectionHandler.nextSelection();
        return getInputStatus();
    }

    /**
     * Asks both per-input processors to prepare a snapshot and joins the two results.
     *
     * @param channelStateWriter writer forwarded unchanged to both processors
     * @param j value forwarded unchanged to both processors (presumably the checkpoint id -
     *     confirm against StreamInputProcessor)
     * @return future completing once both processors' futures have completed
     * @throws IOException propagated from either processor
     */
    @Override // org.apache.flink.streaming.runtime.io.StreamInputProcessor
    public CompletableFuture<Void> prepareSnapshot(ChannelStateWriter channelStateWriter, long j) throws IOException {
        final CompletableFuture<?> firstSnapshot = processor1.prepareSnapshot(channelStateWriter, j);
        final CompletableFuture<?> secondSnapshot = processor2.prepareSnapshot(channelStateWriter, j);
        return CompletableFuture.allOf(firstSnapshot, secondSnapshot);
    }

    /**
     * One-time variant of {@link #selectNextReadingInputIndex()}: first asks the handler for
     * its initial selection and marks this processor as prepared.
     */
    private int selectFirstReadingInputIndex() throws IOException {
        inputSelectionHandler.nextSelection();
        isPrepared = true;
        return selectNextReadingInputIndex();
    }

    /**
     * Merges the two remembered per-input statuses into the status reported to the caller.
     *
     * <ul>
     *   <li>Both inputs ended: {@code END_OF_INPUT}.
     *   <li>Both inputs selected: {@code MORE_AVAILABLE} if either side has more, otherwise
     *       {@code NOTHING_AVAILABLE}.
     *   <li>One input selected: that input's status, unless it has ended, in which case the
     *       other input's status is reported.
     * </ul>
     */
    private InputStatus getInputStatus() {
        if (firstInputStatus == InputStatus.END_OF_INPUT
                && secondInputStatus == InputStatus.END_OF_INPUT) {
            return InputStatus.END_OF_INPUT;
        }

        if (inputSelectionHandler.areAllInputsSelected()) {
            if (firstInputStatus == InputStatus.MORE_AVAILABLE
                    || secondInputStatus == InputStatus.MORE_AVAILABLE) {
                return InputStatus.MORE_AVAILABLE;
            }
            return InputStatus.NOTHING_AVAILABLE;
        }

        final InputStatus selectedStatus =
                inputSelectionHandler.isFirstInputSelected() ? firstInputStatus : secondInputStatus;
        if (selectedStatus != InputStatus.END_OF_INPUT) {
            return selectedStatus;
        }
        // The selected side is finished; fall back to whatever the other side reported.
        return inputSelectionHandler.isFirstInputSelected() ? secondInputStatus : firstInputStatus;
    }

    /**
     * Closes both per-input processors. The second processor is still closed when the first
     * one throws an {@link IOException}; failures are combined through
     * {@code ExceptionUtils.firstOrSuppressed} and rethrown at the end.
     *
     * @throws IOException if closing either processor failed with an {@code IOException}
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        IOException failure = null;

        try {
            processor1.close();
        } catch (IOException e) {
            // failure is still null here, so this yields e itself.
            failure = ExceptionUtils.firstOrSuppressed(e, failure);
        }

        try {
            processor2.close();
        } catch (IOException e) {
            failure = ExceptionUtils.firstOrSuppressed(e, failure);
        }

        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Refreshes the handler's availability flags, validates the current selection and asks the
     * handler which input to read next.
     *
     * @return index of the input to read (0 or 1), or {@code -1} if the handler picked none
     * @throws IOException if the only selected input has already ended
     */
    private int selectNextReadingInputIndex() throws IOException {
        updateAvailability();
        checkInputSelectionAgainstIsFinished();

        final int chosenIndex = inputSelectionHandler.selectNextInputIndex(lastReadInputIndex);
        if (chosenIndex == NO_INPUT_SELECTED) {
            return NO_INPUT_SELECTED;
        }

        if (inputSelectionHandler.shouldSetAvailableForAnotherInput()) {
            // Re-check the input that was NOT chosen (1 - index flips 0 <-> 1).
            checkAndSetAvailable(1 - chosenIndex);
        }
        return chosenIndex;
    }

    /**
     * Fails when exactly one input is selected and that input has already reached its end,
     * because no progress would be possible.
     *
     * @throws IOException if the sole selected input is finished
     */
    private void checkInputSelectionAgainstIsFinished() throws IOException {
        if (inputSelectionHandler.areAllInputsSelected()) {
            return;
        }

        if (inputSelectionHandler.isFirstInputSelected()
                && firstInputStatus == InputStatus.END_OF_INPUT) {
            throw new IOException("Can not make a progress: only first input is selected but it is already finished");
        }

        if (inputSelectionHandler.isSecondInputSelected()
                && secondInputStatus == InputStatus.END_OF_INPUT) {
            throw new IOException("Can not make a progress: only second input is selected but it is already finished");
        }
    }

    /** Refreshes the handler's availability flag for both inputs, first input first. */
    private void updateAvailability() {
        updateAvailability(firstInputStatus, processor1, 0);
        updateAvailability(secondInputStatus, processor2, 1);
    }

    /**
     * Marks one input as available or unavailable in the selection handler.
     *
     * <p>An input counts as available when its last status was {@code MORE_AVAILABLE}, or when
     * it has not ended and its processor reports {@code isApproximatelyAvailable()}.
     *
     * @param lastStatus status remembered for the input
     * @param processor processor of the input
     * @param inputIndex index of the input (0 or 1)
     */
    private void updateAvailability(InputStatus lastStatus, StreamOneInputProcessor<?> processor, int inputIndex) {
        final boolean available;
        if (lastStatus == InputStatus.MORE_AVAILABLE) {
            available = true;
        } else if (lastStatus == InputStatus.END_OF_INPUT) {
            available = false;
        } else {
            available = processor.isApproximatelyAvailable();
        }

        if (available) {
            inputSelectionHandler.setAvailableInput(inputIndex);
        } else {
            inputSelectionHandler.setUnavailableInput(inputIndex);
        }
    }

    /**
     * Marks the given input as available in the handler if it has not ended and its processor
     * reports {@code isAvailable()}; otherwise leaves the handler untouched.
     *
     * @param inputIndex index of the input to check (0 or 1)
     */
    private void checkAndSetAvailable(int inputIndex) {
        final InputStatus lastStatus = inputIndex == 0 ? firstInputStatus : secondInputStatus;
        if (lastStatus == InputStatus.END_OF_INPUT) {
            return;
        }
        if (getInput(inputIndex).isAvailable()) {
            inputSelectionHandler.setAvailableInput(inputIndex);
        }
    }

    /**
     * Returns a future for "some input is available": the other side's future when one side
     * has ended, otherwise the combination of both produced by {@code AvailabilityProvider.or}.
     */
    private CompletableFuture<?> isAnyInputAvailable() {
        if (firstInputStatus == InputStatus.END_OF_INPUT) {
            return processor2.getAvailableFuture();
        }
        if (secondInputStatus == InputStatus.END_OF_INPUT) {
            return processor1.getAvailableFuture();
        }
        return AvailabilityProvider.or(processor1.getAvailableFuture(), processor2.getAvailableFuture());
    }

    /** Maps index 0 to the first processor and any other index to the second. */
    private StreamOneInputProcessor<?> getInput(int inputIndex) {
        if (inputIndex == 0) {
            return processor1;
        }
        return processor2;
    }
}
