package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.class */
public final class StreamTaskNetworkInput<T> implements StreamTaskInput<T> {
    private final CheckpointedInputGate checkpointedInputGate;
    private final DeserializationDelegate<StreamElement> deserializationDelegate;
    private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;
    private final StatusWatermarkValve statusWatermarkValve;
    private final int inputIndex;
    private final Map<InputChannelInfo, Integer> channelIndexes;
    private int lastChannel = -1;
    private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer = null;

    public StreamTaskNetworkInput(CheckpointedInputGate checkpointedInputGate, TypeSerializer<?> typeSerializer, IOManager iOManager, StatusWatermarkValve statusWatermarkValve, int i) {
        this.checkpointedInputGate = checkpointedInputGate;
        this.deserializationDelegate = new NonReusingDeserializationDelegate(new StreamElementSerializer(typeSerializer));
        this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[checkpointedInputGate.getNumberOfInputChannels()];
        for (int i2 = 0; i2 < this.recordDeserializers.length; i2++) {
            this.recordDeserializers[i2] = new SpillingAdaptiveSpanningRecordDeserializer(iOManager.getSpillingDirectoriesPaths());
        }
        this.statusWatermarkValve = (StatusWatermarkValve) Preconditions.checkNotNull(statusWatermarkValve);
        this.inputIndex = i;
        this.channelIndexes = getChannelIndexes(checkpointedInputGate);
    }

    @Nonnull
    private static Map<InputChannelInfo, Integer> getChannelIndexes(CheckpointedInputGate checkpointedInputGate) {
        int i = 0;
        List<InputChannelInfo> channelInfos = checkpointedInputGate.getChannelInfos();
        HashMap hashMap = new HashMap(channelInfos.size());
        Iterator<InputChannelInfo> it = channelInfos.iterator();
        while (it.hasNext()) {
            int i2 = i;
            i++;
            hashMap.put(it.next(), Integer.valueOf(i2));
        }
        return hashMap;
    }

    @VisibleForTesting
    StreamTaskNetworkInput(CheckpointedInputGate checkpointedInputGate, TypeSerializer<?> typeSerializer, StatusWatermarkValve statusWatermarkValve, int i, RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializerArr) {
        this.checkpointedInputGate = checkpointedInputGate;
        this.deserializationDelegate = new NonReusingDeserializationDelegate(new StreamElementSerializer(typeSerializer));
        this.recordDeserializers = recordDeserializerArr;
        this.statusWatermarkValve = statusWatermarkValve;
        this.inputIndex = i;
        this.channelIndexes = getChannelIndexes(checkpointedInputGate);
    }

    @Override // org.apache.flink.streaming.runtime.io.PushingAsyncDataInput
    public InputStatus emitNext(PushingAsyncDataInput.DataOutput<T> dataOutput) throws Exception {
        while (true) {
            if (this.currentRecordDeserializer != null) {
                RecordDeserializer.DeserializationResult nextRecord = this.currentRecordDeserializer.getNextRecord(this.deserializationDelegate);
                if (nextRecord.isBufferConsumed()) {
                    this.currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                    this.currentRecordDeserializer = null;
                }
                if (nextRecord.isFullRecord()) {
                    processElement(this.deserializationDelegate.getInstance(), dataOutput);
                    return InputStatus.MORE_AVAILABLE;
                }
            }
            Optional<BufferOrEvent> pollNext = this.checkpointedInputGate.pollNext();
            if (!pollNext.isPresent()) {
                if (!this.checkpointedInputGate.isFinished()) {
                    return InputStatus.NOTHING_AVAILABLE;
                }
                Preconditions.checkState(this.checkpointedInputGate.getAvailableFuture().isDone(), "Finished BarrierHandler should be available");
                return InputStatus.END_OF_INPUT;
            }
            if (!pollNext.get().isBuffer()) {
                processEvent(pollNext.get());
                return InputStatus.MORE_AVAILABLE;
            }
            processBuffer(pollNext.get());
        }
    }

    private void processElement(StreamElement streamElement, PushingAsyncDataInput.DataOutput<T> dataOutput) throws Exception {
        if (streamElement.isRecord()) {
            dataOutput.emitRecord(streamElement.asRecord());
            return;
        }
        if (streamElement.isWatermark()) {
            this.statusWatermarkValve.inputWatermark(streamElement.asWatermark(), this.lastChannel, dataOutput);
        } else if (streamElement.isLatencyMarker()) {
            dataOutput.emitLatencyMarker(streamElement.asLatencyMarker());
        } else {
            if (!streamElement.isStreamStatus()) {
                throw new UnsupportedOperationException("Unknown type of StreamElement");
            }
            this.statusWatermarkValve.inputStreamStatus(streamElement.asStreamStatus(), this.lastChannel, dataOutput);
        }
    }

    private void processEvent(BufferOrEvent bufferOrEvent) {
        if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
            releaseDeserializer(this.channelIndexes.get(bufferOrEvent.getChannelInfo()).intValue());
        }
    }

    private void processBuffer(BufferOrEvent bufferOrEvent) throws IOException {
        this.lastChannel = this.channelIndexes.get(bufferOrEvent.getChannelInfo()).intValue();
        Preconditions.checkState(this.lastChannel != -1);
        this.currentRecordDeserializer = this.recordDeserializers[this.lastChannel];
        Preconditions.checkState(this.currentRecordDeserializer != null, "currentRecordDeserializer has already been released");
        this.currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
    }

    @Override // org.apache.flink.streaming.runtime.io.StreamTaskInput
    public int getInputIndex() {
        return this.inputIndex;
    }

    @Override // org.apache.flink.runtime.io.AvailabilityProvider
    public CompletableFuture<?> getAvailableFuture() {
        return this.currentRecordDeserializer != null ? AVAILABLE : this.checkpointedInputGate.getAvailableFuture();
    }

    @Override // org.apache.flink.streaming.runtime.io.StreamTaskInput
    public CompletableFuture<Void> prepareSnapshot(ChannelStateWriter channelStateWriter, long j) throws IOException {
        for (int i = 0; i < this.recordDeserializers.length; i++) {
            RecordDeserializer<DeserializationDelegate<StreamElement>> recordDeserializer = this.recordDeserializers[i];
            if (recordDeserializer != null) {
                channelStateWriter.addInputData(j, this.checkpointedInputGate.getChannel(i).getChannelInfo(), -2, recordDeserializer.getUnconsumedBuffer());
            }
        }
        return this.checkpointedInputGate.getAllBarriersReceivedFuture(j);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        for (int i = 0; i < this.recordDeserializers.length; i++) {
            releaseDeserializer(i);
        }
        this.checkpointedInputGate.close();
    }

    private void releaseDeserializer(int i) {
        RecordDeserializer<DeserializationDelegate<StreamElement>> recordDeserializer = this.recordDeserializers[i];
        if (recordDeserializer != null) {
            Buffer currentBuffer = recordDeserializer.getCurrentBuffer();
            if (currentBuffer != null && !currentBuffer.isRecycled()) {
                currentBuffer.recycleBuffer();
            }
            recordDeserializer.clear();
            this.recordDeserializers[i] = null;
        }
    }
}
