package org.apache.flink.runtime.operators.resettable;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.SpillingBuffer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.ListMemorySegmentSource;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.util.ResettableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/operators/resettable/SpillingResettableIterator.class */
public class SpillingResettableIterator<T> implements ResettableIterator<T> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) SpillingResettableIterator.class);
    private T next;
    private T instance;
    protected DataInputView inView;
    protected final TypeSerializer<T> serializer;
    private long elementCount;
    private long currentElementNum;
    protected final SpillingBuffer buffer;
    protected final Iterator<T> input;
    protected final MemoryManager memoryManager;
    private final List<MemorySegment> memorySegments;
    private final boolean releaseMemoryOnClose;

    public SpillingResettableIterator(Iterator<T> it, TypeSerializer<T> typeSerializer, MemoryManager memoryManager, IOManager iOManager, int i, AbstractInvokable abstractInvokable) throws MemoryAllocationException {
        this((Iterator) it, (TypeSerializer) typeSerializer, memoryManager, iOManager, memoryManager.allocatePages(abstractInvokable, i), true);
    }

    public SpillingResettableIterator(Iterator<T> it, TypeSerializer<T> typeSerializer, MemoryManager memoryManager, IOManager iOManager, List<MemorySegment> list) {
        this((Iterator) it, (TypeSerializer) typeSerializer, memoryManager, iOManager, list, false);
    }

    private SpillingResettableIterator(Iterator<T> it, TypeSerializer<T> typeSerializer, MemoryManager memoryManager, IOManager iOManager, List<MemorySegment> list, boolean z) {
        this.memoryManager = memoryManager;
        this.input = it;
        this.instance = typeSerializer.mo4656createInstance();
        this.serializer = typeSerializer;
        this.memorySegments = list;
        this.releaseMemoryOnClose = z;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Creating spilling resettable iterator with " + list.size() + " pages of memory.");
        }
        this.buffer = new SpillingBuffer(iOManager, new ListMemorySegmentSource(list), memoryManager.getPageSize());
    }

    public void open() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Spilling Resettable Iterator opened.");
        }
    }

    @Override // org.apache.flink.runtime.util.ResettableIterator
    public void reset() throws IOException {
        this.inView = this.buffer.flip();
        this.currentElementNum = 0L;
    }

    @Override // java.util.Iterator
    public boolean hasNext() {
        if (this.next != null) {
            return true;
        }
        if (this.inView != null) {
            if (this.currentElementNum >= this.elementCount) {
                return false;
            }
            try {
                this.instance = this.serializer.deserialize(this.instance, this.inView);
                this.next = this.instance;
                this.currentElementNum++;
                return true;
            } catch (IOException e) {
                throw new RuntimeException("SpillingIterator: Error reading element from buffer.", e);
            }
        }
        if (!this.input.hasNext()) {
            return false;
        }
        this.next = this.input.next();
        try {
            this.serializer.serialize(this.next, this.buffer);
            this.elementCount++;
            return true;
        } catch (IOException e2) {
            throw new RuntimeException("SpillingIterator: Error writing element to buffer.", e2);
        }
    }

    @Override // java.util.Iterator
    public T next() {
        if (this.next == null && !hasNext()) {
            throw new NoSuchElementException();
        }
        T t = this.next;
        this.next = null;
        return t;
    }

    @Override // java.util.Iterator
    public void remove() {
        throw new UnsupportedOperationException();
    }

    public List<MemorySegment> close() throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Spilling Resettable Iterator closing. Stored " + this.elementCount + " records.");
        }
        this.inView = null;
        List<MemorySegment> close = this.buffer.close();
        close.addAll(this.memorySegments);
        this.memorySegments.clear();
        if (!this.releaseMemoryOnClose) {
            return close;
        }
        this.memoryManager.release(close);
        return Collections.emptyList();
    }
}
