package org.apache.flink.runtime.operators;

import java.io.IOException;
import java.util.ArrayList;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.InputViewIterator;
import org.apache.flink.runtime.io.disk.SpillingBuffer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.ListMemorySegmentSource;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.util.CloseableInputProvider;
import org.apache.flink.util.MutableObjectIterator;

/* loaded from: input_file:org/apache/flink/runtime/operators/TempBarrier.class */
public class TempBarrier<T> implements CloseableInputProvider<T> {
    private final SpillingBuffer buffer;
    private final TypeSerializer<T> serializer;
    private final TempBarrier<T>.TempWritingThread tempWriter;
    private final MemoryManager memManager;
    private final Object lock = new Object();
    private volatile Throwable exception;
    private final ArrayList<MemorySegment> memory;
    private volatile boolean writingDone;
    private volatile boolean closed;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/operators/TempBarrier$TempWritingThread.class */
    public final class TempWritingThread extends Thread {
        private final MutableObjectIterator<T> input;
        private final TypeSerializer<T> serializer;
        private final SpillingBuffer buffer;
        private volatile boolean running;

        private TempWritingThread(MutableObjectIterator<T> mutableObjectIterator, TypeSerializer<T> typeSerializer, SpillingBuffer spillingBuffer) {
            super("Temp writer");
            this.running = true;
            setDaemon(true);
            this.input = mutableObjectIterator;
            this.serializer = typeSerializer;
            this.buffer = spillingBuffer;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            MutableObjectIterator<T> mutableObjectIterator = this.input;
            TypeSerializer<T> typeSerializer = this.serializer;
            SpillingBuffer spillingBuffer = this.buffer;
            try {
                T mo3688createInstance = typeSerializer.mo3688createInstance();
                while (this.running) {
                    T next = mutableObjectIterator.next(mo3688createInstance);
                    mo3688createInstance = next;
                    if (next == null) {
                        break;
                    } else {
                        typeSerializer.serialize(mo3688createInstance, spillingBuffer);
                    }
                }
                TempBarrier.this.writingDone();
            } catch (Throwable th) {
                TempBarrier.this.setException(th);
            }
        }

        public void shutdown() {
            this.running = false;
            interrupt();
        }
    }

    public TempBarrier(AbstractInvokable abstractInvokable, MutableObjectIterator<T> mutableObjectIterator, TypeSerializerFactory<T> typeSerializerFactory, MemoryManager memoryManager, IOManager iOManager, int i) throws MemoryAllocationException {
        this.serializer = typeSerializerFactory.getSerializer();
        this.memManager = memoryManager;
        this.memory = new ArrayList<>(i);
        memoryManager.allocatePages(abstractInvokable, this.memory, i);
        this.buffer = new SpillingBuffer(iOManager, new ListMemorySegmentSource(this.memory), memoryManager.getPageSize());
        this.tempWriter = new TempWritingThread(mutableObjectIterator, typeSerializerFactory.getSerializer(), this.buffer);
    }

    public void startReading() {
        this.tempWriter.start();
    }

    @Override // org.apache.flink.runtime.operators.util.CloseableInputProvider
    public MutableObjectIterator<T> getIterator() throws InterruptedException, IOException {
        synchronized (this.lock) {
            while (this.exception == null && !this.writingDone) {
                this.lock.wait(5000L);
            }
        }
        if (this.exception != null) {
            throw new RuntimeException("An error occurred creating the temp table.", this.exception);
        }
        if (this.writingDone) {
            return new InputViewIterator(this.buffer.flip(), this.serializer);
        }
        return null;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        synchronized (this.lock) {
            if (this.closed) {
                return;
            }
            if (this.exception == null) {
                this.exception = new Exception("The dam has been closed.");
            }
            this.lock.notifyAll();
            try {
                this.tempWriter.shutdown();
                this.tempWriter.join();
                this.memManager.release(this.buffer.close());
                this.memManager.release(this.memory);
            } catch (InterruptedException e) {
                throw new IOException("Interrupted");
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void setException(Throwable th) {
        synchronized (this.lock) {
            this.exception = th;
            this.lock.notifyAll();
        }
        try {
            close();
        } catch (Throwable th2) {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void writingDone() throws IOException {
        synchronized (this.lock) {
            this.writingDone = true;
            this.lock.notifyAll();
        }
    }
}
