package org.apache.flink.streaming.api.operators.sort;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.AlgorithmOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.sort.ExternalSorter;
import org.apache.flink.runtime.operators.sort.PushSorter;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.io.StreamTaskInput;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.MutableObjectIterator;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException
    */
/*  JADX ERROR: NullPointerException in pass: ProcessKotlinInternals
    java.lang.NullPointerException
    */
/* loaded from: input_file:org/apache/flink/streaming/api/operators/sort/SortingDataInput.class */
/**
 * A {@link StreamTaskInput} that wraps another input, pushes every record it produces into a
 * {@link PushSorter} together with the record's serialized key, and only starts emitting records
 * downstream (in the sorter's order) once the wrapped input has reported
 * {@link DataInputStatus#END_OF_DATA}.
 *
 * <p>Watermarks from the wrapped input are not forwarded individually. Only the largest timestamp
 * seen is remembered, and a single {@link Watermark} carrying it is emitted after the last sorted
 * record. Watermark statuses and latency markers from the wrapped input are dropped.
 *
 * <p>Checkpointing is not supported: {@link #prepareSnapshot(ChannelStateWriter, long)} always
 * throws.
 *
 * @param <T> type of the record values
 * @param <K> type of the key extracted from each record value
 */
public final class SortingDataInput<T, K> implements StreamTaskInput<T> {

    /** Initial buffer size used for key serialization when the key has no fixed length. */
    private static final int DEFAULT_KEY_BUFFER_SIZE = 64;

    private final StreamTaskInput<T> wrappedInput;
    private final PushSorter<Tuple2<byte[], StreamRecord<T>>> sorter;
    private final KeySelector<T, K> keySelector;
    private final TypeSerializer<K> keySerializer;
    private final DataOutputSerializer dataOutputSerializer;
    private final ForwardingDataOutput forwardingDataOutput;

    /** Set once the sorted iterator is exhausted and the final watermark has been handled. */
    private boolean emittedLast;

    /** {@code null} while still reading from the wrapped input; the sorter's iterator afterwards. */
    private MutableObjectIterator<Tuple2<byte[], StreamRecord<T>>> sortedInput = null;

    /** Largest watermark timestamp received so far; {@link Long#MIN_VALUE} if none was seen. */
    private long watermarkSeen = Long.MIN_VALUE;

    /**
     * Creates the sorting input and the sorter backing it.
     *
     * @param wrappedInput input whose records are consumed and sorted
     * @param typeSerializer serializer for the record values
     * @param keySerializer serializer for the keys; its {@code getLength()} selects between the
     *     fixed-length and the variable-length key comparator
     * @param keySelector extracts the key from a record value
     * @param memoryManager memory manager handed to the sorter builder
     * @param ioManager I/O manager the sorter uses for spilling
     * @param objectReuse forwarded to the sorter builder's {@code objectReuse} setting
     * @param managedMemoryFraction forwarded to the sorter builder's {@code memoryFraction} setting
     * @param configuration source of the {@link AlgorithmOptions} values for the sorter
     * @param taskInvokable task handed to the sorter builder
     * @param executionConfig execution config handed to the sorter builder
     * @throws RuntimeException wrapping a {@link MemoryAllocationException} raised while building
     *     the sorter
     */
    public SortingDataInput(
            StreamTaskInput<T> wrappedInput,
            TypeSerializer<T> typeSerializer,
            TypeSerializer<K> keySerializer,
            KeySelector<T, K> keySelector,
            MemoryManager memoryManager,
            IOManager ioManager,
            boolean objectReuse,
            double managedMemoryFraction,
            Configuration configuration,
            TaskInvokable taskInvokable,
            ExecutionConfig executionConfig) {
        try {
            this.forwardingDataOutput = new ForwardingDataOutput();
            this.keySelector = keySelector;
            this.keySerializer = keySerializer;

            final int keyLength = keySerializer.getLength();
            final TypeComparator<Tuple2<byte[], StreamRecord<T>>> comparator;
            if (keyLength > 0) {
                // Fixed-length keys: the buffer can be sized exactly.
                this.dataOutputSerializer = new DataOutputSerializer(keyLength);
                comparator = new FixedLengthByteKeyComparator<>(keyLength);
            } else {
                this.dataOutputSerializer = new DataOutputSerializer(DEFAULT_KEY_BUFFER_SIZE);
                comparator = new VariableLengthByteKeyComparator<>();
            }

            final KeyAndValueSerializer<T> keyAndValueSerializer =
                    new KeyAndValueSerializer<>(typeSerializer, keyLength);
            this.wrappedInput = wrappedInput;

            final float spillingThreshold =
                    configuration.get(AlgorithmOptions.SORT_SPILLING_THRESHOLD);
            final int maxFileHandles = configuration.get(AlgorithmOptions.SPILLING_MAX_FAN);
            final boolean useLargeRecordsHandler =
                    configuration.get(AlgorithmOptions.USE_LARGE_RECORDS_HANDLER);

            this.sorter =
                    ExternalSorter.newBuilder(
                                    memoryManager,
                                    taskInvokable,
                                    keyAndValueSerializer,
                                    comparator,
                                    executionConfig)
                            .memoryFraction(managedMemoryFraction)
                            .enableSpilling(ioManager, spillingThreshold)
                            .maxNumFileHandles(maxFileHandles)
                            .objectReuse(objectReuse)
                            .largeRecords(useLargeRecordsHandler)
                            .build();
        } catch (MemoryAllocationException e) {
            throw new RuntimeException(e);
        }
    }

    /** Returns the input index of the wrapped input. */
    @Override
    public int getInputIndex() {
        return this.wrappedInput.getInputIndex();
    }

    /**
     * Always fails: this input does not support checkpoints.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public CompletableFuture<Void> prepareSnapshot(ChannelStateWriter channelStateWriter, long j)
            throws CheckpointException {
        throw new UnsupportedOperationException(
                "Checkpoints are not supported with sorted inputs in the BATCH runtime.");
    }

    /**
     * Closes the wrapped input and then the sorter. Both are attempted even if the first fails;
     * the failures are combined via {@link ExceptionUtils#firstOrSuppressed} and rethrown.
     *
     * @throws IOException if closing the wrapped input or the sorter failed
     */
    @Override
    public void close() throws IOException {
        IOException failure = null;
        try {
            this.wrappedInput.close();
        } catch (IOException e) {
            failure = ExceptionUtils.firstOrSuppressed(e, failure);
        }
        try {
            this.sorter.close();
        } catch (IOException e) {
            failure = ExceptionUtils.firstOrSuppressed(e, failure);
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * While the wrapped input still has data, pulls from it into the sorter and returns its status
     * unchanged. When the wrapped input reports {@link DataInputStatus#END_OF_DATA}, finishes the
     * sorting phase; from then on each call emits the next sorted record to {@code dataOutput}.
     *
     * @param dataOutput receiver of the sorted records and the final watermark
     * @return the wrapped input's status during the reading phase, otherwise the status of the
     *     sorted emission
     */
    @Override
    public DataInputStatus emitNext(PushingAsyncDataInput.DataOutput<T> dataOutput)
            throws Exception {
        if (this.sortedInput != null) {
            return emitNextSortedRecord(dataOutput);
        }

        final DataInputStatus wrappedStatus = this.wrappedInput.emitNext(this.forwardingDataOutput);
        if (wrappedStatus != DataInputStatus.END_OF_DATA) {
            return wrappedStatus;
        }

        endSorting();
        return emitNextSortedRecord(dataOutput);
    }

    /**
     * Emits the next record from the sorted iterator. When the iterator is exhausted, emits the
     * largest seen watermark (if any) and returns {@link DataInputStatus#END_OF_DATA}; every later
     * call returns {@link DataInputStatus#END_OF_INPUT}.
     */
    @Nonnull
    private DataInputStatus emitNextSortedRecord(PushingAsyncDataInput.DataOutput<T> dataOutput)
            throws Exception {
        if (this.emittedLast) {
            return DataInputStatus.END_OF_INPUT;
        }

        final Tuple2<byte[], StreamRecord<T>> next = this.sortedInput.next();
        if (next != null) {
            dataOutput.emitRecord(next.f1);
            return DataInputStatus.MORE_AVAILABLE;
        }

        this.emittedLast = true;
        if (this.watermarkSeen > Long.MIN_VALUE) {
            dataOutput.emitWatermark(new Watermark(this.watermarkSeen));
        }
        return DataInputStatus.END_OF_DATA;
    }

    /** Tells the sorter that no more records will be written and obtains its iterator. */
    private void endSorting() throws Exception {
        this.sorter.finishReading();
        this.sortedInput = this.sorter.getIterator();
    }

    /**
     * Returns {@link AvailabilityProvider#AVAILABLE} once the sorted iterator exists; before that,
     * delegates to the wrapped input.
     */
    @Override
    public CompletableFuture<?> getAvailableFuture() {
        return this.sortedInput != null
                ? AvailabilityProvider.AVAILABLE
                : this.wrappedInput.getAvailableFuture();
    }

    /**
     * Output handed to the wrapped input: records go to the sorter, watermarks only update the
     * maximum seen timestamp, and everything else is ignored.
     */
    private class ForwardingDataOutput implements PushingAsyncDataInput.DataOutput<T> {

        /** Serializes the record's key and writes the (key bytes, record) pair to the sorter. */
        @Override
        public void emitRecord(StreamRecord<T> streamRecord) throws Exception {
            final K key = keySelector.getKey(streamRecord.getValue());
            keySerializer.serialize(key, dataOutputSerializer);
            final byte[] serializedKey = dataOutputSerializer.getCopyOfBuffer();
            // Reset the shared buffer so the next key starts from an empty state.
            dataOutputSerializer.clear();
            sorter.writeRecord(Tuple2.of(serializedKey, streamRecord));
        }

        /** Remembers the largest watermark timestamp; nothing is emitted here. */
        @Override
        public void emitWatermark(Watermark watermark) {
            watermarkSeen = Math.max(watermarkSeen, watermark.getTimestamp());
        }

        /** Watermark statuses are intentionally dropped. */
        @Override
        public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {
        }

        /** Latency markers are intentionally dropped. */
        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) {
        }
    }
}
