package org.apache.flink.python.api.streaming.data;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.api.PythonOptions;

/* loaded from: input_file:org/apache/flink/python/api/streaming/data/PythonSender.class */
/**
 * Base class that serializes records into a memory-mapped file.
 *
 * <p>Usage as visible in this class: {@link #open(File)} creates and maps the file,
 * {@link #sendBuffer(SingleElementPushBackIterator, Serializer)} fills the mapping with as many
 * serialized records as fit, and {@link #close()} releases the file handles.
 *
 * <p>NOTE(review): judging by the package name the mapped file is presumably read by a Python
 * process; the consumer is not visible in this file.
 */
public abstract class PythonSender implements Serializable {
    private static final long serialVersionUID = -2004095650353962110L;

    /** Leading type byte written by the serializer for plain {@code byte[]} records. */
    public static final byte TYPE_ARRAY = 63;

    /** Leading type byte written by the serializer for {@code Tuple2<Tuple, byte[]>} records. */
    public static final byte TYPE_KEY_VALUE = 62;

    /** Leading type byte written by the serializer for {@code Tuple2<byte[], byte[]>} records. */
    public static final byte TYPE_VALUE_VALUE = 61;

    private transient RandomAccessFile outputRAF;
    private transient FileChannel outputChannel;
    private transient MappedByteBuffer fileBuffer;

    /** Size of the mapped file in bytes: the configured value shifted left by 10 (i.e. times 1024). */
    private final long mappedFileSizeBytes;

    /** Configuration handed to the constructor; not read again inside this class. */
    private final Configuration config;

    /**
     * Base class of the record serializers. Each call to {@link #serialize(Object)} produces a
     * buffer that is ready for reading.
     *
     * @param <T> type of the records this serializer accepts
     */
    public static abstract class Serializer<T> {
        /** Buffer that {@link #serializeInternal(Object)} must allocate and fill. */
        protected ByteBuffer buffer;

        protected Serializer() {
        }

        /**
         * Serializes the given record and flips the resulting buffer.
         *
         * @param t record to serialize
         * @return the flipped buffer; its remaining bytes are the serialized record
         */
        public ByteBuffer serialize(T t) {
            serializeInternal(t);
            this.buffer.flip();
            return this.buffer;
        }

        /**
         * Allocates {@link #buffer} and writes the serialized form of the record into it.
         * The buffer must not be flipped here; {@link #serialize(Object)} does that.
         *
         * @param t record to serialize
         */
        protected abstract void serializeInternal(T t);
    }

    /** Writes {@link #TYPE_ARRAY} followed by the raw bytes of the array. */
    private static class ArraySerializer extends Serializer<byte[]> {
        private ArraySerializer() {
        }

        @Override
        public void serializeInternal(byte[] bArr) {
            this.buffer = ByteBuffer.allocate(1 + bArr.length);
            this.buffer.put(TYPE_ARRAY);
            this.buffer.put(bArr);
        }
    }

    /**
     * Writes {@link #TYPE_KEY_VALUE}, the key arity as a single byte, every key field (each
     * expected to be a {@code byte[]}) in order, and finally the value bytes.
     */
    private static class KeyValuePairSerializer extends Serializer<Tuple2<Tuple, byte[]>> {
        /** Type byte plus arity byte. */
        private static final int HEADER_BYTES = 2;

        private KeyValuePairSerializer() {
        }

        @Override
        public void serializeInternal(Tuple2<Tuple, byte[]> tuple2) {
            Tuple key = tuple2.f0;
            byte[] value = tuple2.f1;
            int arity = key.getArity();

            int keyBytes = 0;
            for (int field = 0; field < arity; field++) {
                keyBytes += ((byte[]) key.getField(field)).length;
            }

            // Exactly HEADER_BYTES + keyBytes + value.length bytes are written below, so no
            // extra slack needs to be allocated.
            this.buffer = ByteBuffer.allocate(HEADER_BYTES + keyBytes + value.length);
            this.buffer.put(TYPE_KEY_VALUE);
            this.buffer.put((byte) arity);
            for (int field = 0; field < arity; field++) {
                this.buffer.put((byte[]) key.getField(field));
            }
            this.buffer.put(value);
        }
    }

    /** Writes {@link #TYPE_VALUE_VALUE} followed by the bytes of both tuple fields. */
    private static class ValuePairSerializer extends Serializer<Tuple2<byte[], byte[]>> {
        private ValuePairSerializer() {
        }

        @Override
        public void serializeInternal(Tuple2<byte[], byte[]> tuple2) {
            byte[] first = tuple2.f0;
            byte[] second = tuple2.f1;
            this.buffer = ByteBuffer.allocate(1 + first.length + second.length);
            this.buffer.put(TYPE_VALUE_VALUE);
            this.buffer.put(first);
            this.buffer.put(second);
        }
    }

    /**
     * Creates a sender whose mapped file size is {@link PythonOptions#MMAP_FILE_SIZE} from the
     * given configuration multiplied by 1024.
     *
     * <p>NOTE(review): the shift by 10 suggests the option is expressed in KiB - confirm against
     * the option's declaration.
     *
     * @param configuration configuration to read the mapped file size from
     */
    public PythonSender(Configuration configuration) {
        this.config = configuration;
        this.mappedFileSizeBytes = configuration.getLong(PythonOptions.MMAP_FILE_SIZE) << 10;
    }

    /**
     * Creates a fresh file of the configured size at the given path and maps it read/write.
     * Any file (or empty directory) already present at that path is deleted first.
     *
     * @param file path of the file to create and map
     * @throws IOException if an existing entry at the path cannot be deleted, or if creating,
     *     sizing or mapping the file fails; in the latter case the file handle opened by this
     *     call is closed again before the exception propagates
     */
    public void open(File file) throws IOException {
        // Create only the parent directories; the target path itself must become a regular file.
        File parent = file.getParentFile();
        if (parent != null) {
            // Result intentionally unchecked: a missing parent surfaces as an IOException below.
            parent.mkdirs();
        }
        if (file.exists() && !file.delete()) {
            throw new IOException("Could not delete existing file: " + file);
        }
        file.createNewFile();

        RandomAccessFile raf = new RandomAccessFile(file, "rw");
        boolean opened = false;
        try {
            raf.setLength(this.mappedFileSizeBytes);
            // Touch the last byte so the file really spans the full mapped size.
            raf.seek(this.mappedFileSizeBytes - 1);
            raf.writeByte(0);
            raf.seek(0L);
            FileChannel channel = raf.getChannel();
            MappedByteBuffer mapped =
                    channel.map(FileChannel.MapMode.READ_WRITE, 0L, this.mappedFileSizeBytes);
            this.outputRAF = raf;
            this.outputChannel = channel;
            this.fileBuffer = mapped;
            opened = true;
        } finally {
            if (!opened) {
                try {
                    raf.close();
                } catch (IOException ignored) {
                    // The failure that brought us here is the one worth reporting.
                }
            }
        }
    }

    /**
     * Closes the channel and the file opened by {@link #open(File)}. Does nothing if the sender
     * was never opened.
     *
     * @throws IOException if closing the channel or the file fails
     */
    public void close() throws IOException {
        closeMappedFile();
    }

    /** Closes channel and file; the file is closed even if closing the channel throws. */
    private void closeMappedFile() throws IOException {
        try {
            if (this.outputChannel != null) {
                this.outputChannel.close();
            }
        } finally {
            if (this.outputRAF != null) {
                this.outputRAF.close();
            }
        }
    }

    /**
     * Writes serialized elements from the iterator into the mapped file, starting at offset 0,
     * until the iterator is exhausted or the next element no longer fits. An element that does
     * not fit into the remaining space is pushed back onto the iterator so that a later call can
     * send it.
     *
     * @param singleElementPushBackIterator source of the elements to send
     * @param serializer serializer applied to every element
     * @param <IN> element type
     * @return number of bytes written into the mapped file by this call
     * @throws IllegalStateException if the sender has not been opened
     * @throws RuntimeException if a single serialized element is larger than the whole mapped file
     * @throws IOException declared for callers; not thrown by the code visible here
     */
    public <IN> int sendBuffer(SingleElementPushBackIterator<IN> singleElementPushBackIterator, Serializer<IN> serializer) throws IOException {
        if (this.fileBuffer == null) {
            throw new IllegalStateException("The sender must be opened before data can be sent.");
        }
        this.fileBuffer.clear();
        while (singleElementPushBackIterator.hasNext()) {
            IN element = singleElementPushBackIterator.next();
            ByteBuffer serialized = serializer.serialize(element);
            if (serialized.remaining() > this.mappedFileSizeBytes) {
                throw new RuntimeException("Serialized object does not fit into a single buffer.");
            }
            if (serialized.remaining() > this.fileBuffer.remaining()) {
                // Mapped file is full for this round; keep the element for the next call.
                singleElementPushBackIterator.pushBack(element);
                break;
            }
            this.fileBuffer.put(serialized);
        }
        return this.fileBuffer.position();
    }

    /**
     * Picks the serializer matching the runtime type of the given sample record.
     *
     * <p>Supported are {@code byte[]}, {@code Tuple2} whose first field is a {@code byte[]}, and
     * {@code Tuple2} whose first field is a {@code Tuple}.
     *
     * @param in sample record used to select the serializer
     * @param <IN> record type
     * @return a new serializer for records of that type
     * @throws IllegalArgumentException if the record is {@code null} or of an unsupported type
     */
    @SuppressWarnings("unchecked") // each serializer is only returned after its input type was verified
    public <IN> Serializer<IN> getSerializer(IN in) {
        if (in instanceof byte[]) {
            return (Serializer<IN>) new ArraySerializer();
        }
        if (in instanceof Tuple2) {
            Object first = ((Tuple2<?, ?>) in).f0;
            if (first instanceof byte[]) {
                return (Serializer<IN>) new ValuePairSerializer();
            }
            if (first instanceof Tuple) {
                return (Serializer<IN>) new KeyValuePairSerializer();
            }
        }
        throw new IllegalArgumentException("This object can't be serialized: " + in);
    }
}
