package org.apache.flink.runtime.io.network.api.reader;

import java.io.IOException;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
import org.apache.flink.runtime.io.network.api.TaskEventHandler;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/io/network/api/reader/AbstractReader.class */
/**
 * Base class for readers that consume from a single {@link InputGate}.
 *
 * <p>It forwards task events to subscribed listeners via a {@link TaskEventHandler} and, once
 * switched into iterative mode, counts the {@link EndOfSuperstepEvent}s received so that the end
 * of a superstep can be detected (one event per input channel of the gate).
 *
 * <p>The class performs no synchronization of its own.
 */
public abstract class AbstractReader implements ReaderBase {

    /** The input gate this reader delegates to. */
    protected final InputGate inputGate;

    /** Dispatches task events to the listeners registered on this reader. */
    private final TaskEventHandler taskEventHandler = new TaskEventHandler();

    /** Set once {@link #setIterativeReader()} has been called; never reset by this class. */
    private boolean isIterative;

    /** Number of end-of-superstep events seen since the current superstep was started. */
    private int currentNumberOfEndOfSuperstepEvents;

    /**
     * Creates a reader on top of the given input gate.
     *
     * <p>Decompiler note: the access modifier of this constructor was changed from protected.
     *
     * @param inputGate the gate to read from; stored as-is without validation
     */
    public AbstractReader(InputGate inputGate) {
        this.inputGate = inputGate;
    }

    /**
     * Reports whether the underlying input gate is finished.
     *
     * @return the value of {@code inputGate.isFinished()}
     */
    @Override
    public boolean isFinished() {
        return this.inputGate.isFinished();
    }

    /**
     * Subscribes a listener for task events of the given type.
     *
     * @param eventListener the listener to subscribe
     * @param cls the task event class the listener is subscribed for
     */
    @Override
    public void registerTaskEventListener(EventListener<TaskEvent> eventListener, Class<? extends TaskEvent> cls) {
        this.taskEventHandler.subscribe(eventListener, cls);
    }

    /**
     * Hands the given task event to the input gate for sending.
     *
     * @param taskEvent the event to send
     * @throws IOException if the input gate fails to send the event
     */
    @Override
    public void sendTaskEvent(TaskEvent taskEvent) throws IOException {
        this.inputGate.sendTaskEvent(taskEvent);
    }

    /**
     * Processes an event received by this reader.
     *
     * <ul>
     *   <li>{@link EndOfPartitionEvent}: returns {@code true} without further processing.
     *   <li>{@link EndOfSuperstepEvent}: counts the event and returns whether all input channels
     *       have now delivered one.
     *   <li>Any {@link TaskEvent}: published to the registered listeners; returns {@code false}.
     *   <li>Anything else: rejected with an {@link IllegalStateException}, which is wrapped like
     *       every other failure (see below).
     * </ul>
     *
     * <p>Decompiler note: the access modifier of this method was changed from protected.
     *
     * @param abstractEvent the event to handle
     * @return {@code true} for an end-of-partition event or when the end of the superstep has
     *     been reached, {@code false} for a published task event
     * @throws IOException wrapping any {@link Throwable} raised while handling an event other
     *     than {@link EndOfPartitionEvent}
     */
    public boolean handleEvent(AbstractEvent abstractEvent) throws IOException {
        final Class<?> eventType = abstractEvent.getClass();

        // End-of-partition is answered before entering the wrapping try block.
        if (EndOfPartitionEvent.class == eventType) {
            return true;
        }

        try {
            if (EndOfSuperstepEvent.class == eventType) {
                return incrementEndOfSuperstepEventAndCheck();
            } else if (abstractEvent instanceof TaskEvent) {
                this.taskEventHandler.publish((TaskEvent) abstractEvent);
                return false;
            } else {
                throw new IllegalStateException("Received unexpected event of type " + eventType + " at reader.");
            }
        } catch (Throwable cause) {
            // Every failure, including the IllegalStateException above, surfaces as an IOException.
            throw new IOException("Error while handling event of type " + eventType + ": " + cause.getMessage(), cause);
        }
    }

    /**
     * Publishes the given task event directly to the listeners registered on this reader.
     *
     * @param taskEvent the event to publish
     */
    public void publish(TaskEvent taskEvent) {
        this.taskEventHandler.publish(taskEvent);
    }

    /** Switches this reader into iterative mode, enabling superstep tracking. */
    @Override
    public void setIterativeReader() {
        this.isIterative = true;
    }

    /**
     * Resets the end-of-superstep counter so that the next superstep can begin.
     *
     * @throws IllegalStateException if the reader is not iterative, or if not every input channel
     *     has delivered its end-of-superstep event yet (as raised by
     *     {@code Preconditions.checkState})
     */
    @Override
    public void startNextSuperstep() {
        Preconditions.checkState(this.isIterative, "Tried to start next superstep in a non-iterative reader.");

        // Evaluated only after the iterative check has passed.
        final boolean previousSuperstepComplete =
                this.currentNumberOfEndOfSuperstepEvents == this.inputGate.getNumberOfInputChannels();
        Preconditions.checkState(previousSuperstepComplete, "Tried to start next superstep before reaching end of previous superstep.");

        this.currentNumberOfEndOfSuperstepEvents = 0;
    }

    /**
     * Tells whether the current superstep is complete.
     *
     * @return {@code true} only if the reader is iterative and the number of counted
     *     end-of-superstep events equals the gate's number of input channels
     */
    @Override
    public boolean hasReachedEndOfSuperstep() {
        if (!this.isIterative) {
            return false;
        }
        return this.currentNumberOfEndOfSuperstepEvents == this.inputGate.getNumberOfInputChannels();
    }

    /**
     * Counts one more end-of-superstep event.
     *
     * @return {@code true} if, after counting this event, the count equals the gate's number of
     *     input channels
     * @throws IllegalStateException if the reader is not iterative or the incremented count would
     *     exceed the number of input channels (as raised by {@code Preconditions.checkState})
     */
    private boolean incrementEndOfSuperstepEventAndCheck() {
        Preconditions.checkState(this.isIterative, "Tried to increment superstep count in a non-iterative reader.");

        final int updatedCount = this.currentNumberOfEndOfSuperstepEvents + 1;
        // The message reports the count as it was before this event.
        Preconditions.checkState(updatedCount <= this.inputGate.getNumberOfInputChannels(), "Received too many (" + this.currentNumberOfEndOfSuperstepEvents + ") end of superstep events.");

        this.currentNumberOfEndOfSuperstepEvents = updatedCount;
        return updatedCount == this.inputGate.getNumberOfInputChannels();
    }
}
