package org.apache.flink.streaming.api.operators.sort;

import java.io.IOException;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.AlgorithmOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.sort.ExternalSorter;
import org.apache.flink.runtime.operators.sort.PushSorter;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.io.StreamTaskInput;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.MutableObjectIterator;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException
    */
/*  JADX ERROR: NullPointerException in pass: ProcessKotlinInternals
    java.lang.NullPointerException
    */
/* loaded from: input_file:org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput.class */
public final class MultiInputSortingDataInput<IN, K> implements StreamTaskInput<IN> {
    private final int idx;
    private final StreamTaskInput<IN> wrappedInput;
    private final PushSorter<Tuple2<byte[], StreamRecord<IN>>> sorter;
    private final CommonContext commonContext;
    private final KeySelector<IN, K> keySelector;
    private final TypeSerializer<K> keySerializer;
    private final DataOutputSerializer dataOutputSerializer;
    private MutableObjectIterator<Tuple2<byte[], StreamRecord<IN>>> sortedInput;
    private final MultiInputSortingDataInput<IN, K>.SortingPhaseDataOutput sortingPhaseDataOutput = new SortingPhaseDataOutput();
    private long seenWatermark = Long.MIN_VALUE;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput$CommonContext.class */
    /**
     * State shared by all sorting inputs created together: which inputs are still sorting, which
     * have emitted everything, and the queue of {@link HeadElement}s.
     *
     * <p>Both input sets are stored as bits of a {@code long} keyed by input index, so only
     * indices 0 to 63 can be told apart.
     */
    public static final class CommonContext {
        /** Bit {@code i} is set while the input with index {@code i} has not finished sorting. */
        private long notFinishedSortingMask;

        /** Head elements; without an explicit comparator the queue orders them by {@code compareTo}. */
        private final PriorityQueue<HeadElement> queueOfHeads = new PriorityQueue<>();

        /** Handed out by {@link #getAllFinished()}; this class never changes its state itself. */
        private final AvailabilityProvider.AvailabilityHelper allFinished = new AvailabilityProvider.AvailabilityHelper();

        /** Bit {@code i} is set once the input with index {@code i} was marked as finished emitting. */
        private long finishedEmitting = 0;

        /**
         * Marks every given input as "not finished sorting".
         *
         * @param sortingInputs inputs whose {@code getInputIndex()} values are recorded in the mask
         */
        public CommonContext(StreamTaskInput<Object>[] sortingInputs) {
            long mask = 0L;
            for (StreamTaskInput<Object> input : sortingInputs) {
                mask = setBitMask(mask, input.getInputIndex());
            }
            this.notFinishedSortingMask = mask;
        }

        /** @return {@code true} once no input is marked as still sorting */
        public boolean allSorted() {
            return notFinishedSortingMask == 0;
        }

        /** Clears the "still sorting" bit of the given input index. */
        public void setFinishedSorting(int inputIndex) {
            notFinishedSortingMask = unsetBitMask(notFinishedSortingMask, inputIndex);
        }

        /** Sets the "finished emitting" bit of the given input index. */
        public void setFinishedEmitting(int inputIndex) {
            finishedEmitting = setBitMask(finishedEmitting, inputIndex);
        }

        /** @return whether the "finished emitting" bit of the given input index is set */
        public boolean isFinishedEmitting(int inputIndex) {
            return checkBitMask(finishedEmitting, inputIndex);
        }

        public PriorityQueue<HeadElement> getQueueOfHeads() {
            return queueOfHeads;
        }

        public AvailabilityProvider.AvailabilityHelper getAllFinished() {
            return allFinished;
        }

        // The helpers below shift the long literal 1L. Shifting the int literal 1 would mask the
        // shift distance to five bits (index 32 would alias index 0) and index 31 would
        // sign-extend into bits 31..63 when widened to long.

        private static long setBitMask(long mask, int inputIndex) {
            return mask | (1L << inputIndex);
        }

        private static long unsetBitMask(long mask, int inputIndex) {
            return mask & ~(1L << inputIndex);
        }

        private static boolean checkBitMask(long mask, int inputIndex) {
            return (mask & (1L << inputIndex)) != 0;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput$HeadElement.class */
    /**
     * The record currently at the front of one sorted input, tagged with that input's index.
     *
     * <p>Natural ordering: first by the serialized key bytes ({@code f0}), then by the record
     * timestamp.
     */
    public static final class HeadElement implements Comparable<HeadElement> {
        final int inputIndex;
        Tuple2<byte[], StreamRecord<Object>> streamElement;

        private HeadElement(int inputIndex) {
            this.inputIndex = inputIndex;
        }

        @Override
        public int compareTo(HeadElement other) {
            int byKey = compareKeys(streamElement.f0, other.streamElement.f0);
            if (byKey != 0) {
                return byKey;
            }
            long ownTimestamp = streamElement.f1.asRecord().getTimestamp();
            long otherTimestamp = other.streamElement.f1.asRecord().getTimestamp();
            return Long.compare(ownTimestamp, otherTimestamp);
        }

        /**
         * Lexicographic comparison of two byte arrays using signed byte order; when one array is a
         * prefix of the other, the shorter one sorts first.
         */
        private static int compareKeys(byte[] left, byte[] right) {
            int shared = Math.min(left.length, right.length);
            int pos = 0;
            while (pos < shared) {
                int diff = Byte.compare(left[pos], right[pos]);
                if (diff != 0) {
                    return diff;
                }
                pos++;
            }
            return Integer.compare(left.length, right.length);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput$InputSelector.class */
    /**
     * Chooses the input to read next: pass-through inputs first (in queue order), then, once all
     * sorting inputs are sorted, the input that owns the smallest head element.
     */
    private static class InputSelector implements InputSelectable, BoundedMultiInput {
        private final CommonContext commonContext;
        private final int numInputs;
        /** Pass-through inputs that have not been reported as ended yet. */
        private final Queue<Integer> passThroughInputsIndices;

        private InputSelector(CommonContext commonContext, int numInputs, List<Integer> passThroughInputsIndices) {
            this.commonContext = commonContext;
            this.numInputs = numInputs;
            // Private copy, so that endInput() never mutates the caller's list.
            this.passThroughInputsIndices = new LinkedList<>(passThroughInputsIndices);
        }

        /**
         * Drops the given input from the pending pass-through inputs. The value is boxed so that
         * removal is by element, not by position.
         */
        @Override
        public void endInput(int finishedInput) throws Exception {
            passThroughInputsIndices.remove(Integer.valueOf(finishedInput));
        }

        /**
         * @return a selection of the first pending pass-through input if there is one; otherwise
         *     {@link InputSelection#ALL} while sorting is unfinished or no head element is queued;
         *     otherwise a selection of the input owning the smallest head element
         */
        @Override
        public InputSelection nextSelection() {
            Integer passThrough = passThroughInputsIndices.peek();
            if (passThrough != null) {
                return selectOnly(passThrough);
            }
            if (!commonContext.allSorted()) {
                return InputSelection.ALL;
            }
            HeadElement smallest = commonContext.getQueueOfHeads().peek();
            if (smallest == null) {
                return InputSelection.ALL;
            }
            return selectOnly(smallest.inputIndex);
        }

        /**
         * Builds a selection containing exactly one input.
         *
         * <p>NOTE(review): the stored index is incremented before being passed to {@code select};
         * presumably indices are 0-based and {@code select} expects 1-based ids. Confirm against
         * the InputSelection documentation.
         */
        private InputSelection selectOnly(int inputIndex) {
            return new InputSelection.Builder().select(inputIndex + 1).build(numInputs);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput$SelectableSortingInputs.class */
    /**
     * Plain holder for the result of wrapping: the sorted inputs, the pass-through inputs and the
     * {@link InputSelectable} that goes with them. The arrays are stored and returned as given,
     * without copying.
     */
    public static class SelectableSortingInputs {
        private final StreamTaskInput<?>[] sortedInputs;
        private final StreamTaskInput<?>[] passThroughInputs;
        private final InputSelectable inputSelectable;

        public SelectableSortingInputs(
                StreamTaskInput<?>[] sortedInputs,
                StreamTaskInput<?>[] passThroughInputs,
                InputSelectable inputSelectable) {
            this.inputSelectable = inputSelectable;
            this.passThroughInputs = passThroughInputs;
            this.sortedInputs = sortedInputs;
        }

        /** @return the selector passed to the constructor */
        public InputSelectable getInputSelectable() {
            return inputSelectable;
        }

        /** @return the sorted inputs passed to the constructor (same array instance) */
        public StreamTaskInput<?>[] getSortedInputs() {
            return sortedInputs;
        }

        /** @return the pass-through inputs passed to the constructor (same array instance) */
        public StreamTaskInput<?>[] getPassThroughInputs() {
            return passThroughInputs;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput$SortingPhaseDataOutput.class */
    /**
     * Output used while the wrapped input is still being read: records are written to the sorter
     * together with their serialized key, and only the largest watermark timestamp is remembered.
     */
    private class SortingPhaseDataOutput implements PushingAsyncDataInput.DataOutput<IN> {
        private SortingPhaseDataOutput() {
        }

        /**
         * Serializes the record's key into a fresh byte array and hands the (key bytes, record)
         * pair to the sorter.
         */
        @Override
        public void emitRecord(StreamRecord<IN> streamRecord) throws Exception {
            K key = keySelector.getKey(streamRecord.getValue());
            keySerializer.serialize(key, dataOutputSerializer);
            byte[] serializedKey = dataOutputSerializer.getCopyOfBuffer();
            // Reset the serializer before the next use; the key bytes were copied out above.
            dataOutputSerializer.clear();
            sorter.writeRecord(Tuple2.of(serializedKey, streamRecord));
        }

        /**
         * Remembers the largest watermark timestamp seen so far in the enclosing instance's
         * {@code seenWatermark} field. Nothing is forwarded during the sorting phase.
         */
        @Override
        public void emitWatermark(Watermark watermark) {
            seenWatermark = Math.max(seenWatermark, watermark.getTimestamp());
        }

        /** Stream status changes are ignored while sorting. */
        @Override
        public void emitStreamStatus(StreamStatus streamStatus) {
        }

        /** Latency markers are ignored while sorting. */
        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) {
        }
    }

    /**
     * Stores the collaborators of one sorting input; no other work is done here.
     *
     * @param commonContext state shared with the other sorting inputs
     * @param wrappedInput the input whose records get sorted
     * @param inputIndex value later returned by {@link #getInputIndex()}
     * @param sorter receives (serialized key, record) pairs during the sorting phase
     * @param keySelector extracts the key from a record value
     * @param keySerializer serializes the extracted key
     * @param dataOutputSerializer buffer the key is serialized into
     */
    private MultiInputSortingDataInput(
            CommonContext commonContext,
            StreamTaskInput<IN> wrappedInput,
            int inputIndex,
            PushSorter<Tuple2<byte[], StreamRecord<IN>>> sorter,
            KeySelector<IN, K> keySelector,
            TypeSerializer<K> keySerializer,
            DataOutputSerializer dataOutputSerializer) {
        this.commonContext = commonContext;
        this.idx = inputIndex;
        this.wrappedInput = wrappedInput;
        this.sorter = sorter;
        this.keySelector = keySelector;
        this.keySerializer = keySerializer;
        this.dataOutputSerializer = dataOutputSerializer;
    }

    /**
     * Wraps every sorting input in a {@link MultiInputSortingDataInput} backed by its own external
     * sorter, wraps every pass-through input in an {@code ObservableStreamTaskInput}, and returns
     * both together with the {@link InputSelectable} that coordinates them.
     *
     * <p>The managed-memory fraction and the maximum spilling fan-in are each divided by the total
     * number of inputs (sorting plus pass-through). One {@link DataOutputSerializer} instance is
     * shared by all created sorting inputs.
     *
     * @param containingTask task passed to the sorter builder
     * @param sortingInputs inputs to sort
     * @param keySelectors key selector per sorting input, indexed like {@code sortingInputs}
     * @param inputSerializers record serializer per sorting input, indexed like {@code sortingInputs}
     * @param keySerializer serializer for the keys; a positive {@code getLength()} selects the
     *     fixed-length key comparator, anything else the variable-length one
     * @param passThroughInputs inputs that are not sorted
     * @param memoryManager memory manager passed to the sorter builder
     * @param ioManager I/O manager used for spilling
     * @param objectReuse forwarded to the sorter builder's {@code objectReuse} setting
     * @param managedMemoryFraction memory fraction to split between the inputs
     * @param jobConfiguration source of the spilling threshold and the maximum spilling fan-in
     * @return the wrapped inputs and their selector
     * @throws RuntimeException wrapping the {@link MemoryAllocationException} if a sorter cannot
     *     be built
     */
    public static <K> SelectableSortingInputs wrapInputs(
            AbstractInvokable containingTask,
            StreamTaskInput<Object>[] sortingInputs,
            KeySelector<Object, K>[] keySelectors,
            TypeSerializer<Object>[] inputSerializers,
            TypeSerializer<K> keySerializer,
            StreamTaskInput<Object>[] passThroughInputs,
            MemoryManager memoryManager,
            IOManager ioManager,
            boolean objectReuse,
            double managedMemoryFraction,
            Configuration jobConfiguration) {
        int keyLength = keySerializer.getLength();
        DataOutputSerializer keyOutput;
        TypeComparator<Tuple2<byte[], StreamRecord<Object>>> comparator;
        if (keyLength > 0) {
            keyOutput = new DataOutputSerializer(keyLength);
            comparator = new FixedLengthByteKeyComparator<>(keyLength);
        } else {
            // No fixed key length: start with a 64-byte buffer.
            keyOutput = new DataOutputSerializer(64);
            comparator = new VariableLengthByteKeyComparator<>();
        }

        List<Integer> passThroughIndices =
                Arrays.stream(passThroughInputs)
                        .map(StreamTaskInput::getInputIndex)
                        .collect(Collectors.toList());
        int numberOfInputs = sortingInputs.length + passThroughInputs.length;
        CommonContext commonContext = new CommonContext(sortingInputs);
        InputSelector inputSelector = new InputSelector(commonContext, numberOfInputs, passThroughIndices);

        StreamTaskInput<?>[] sortedInputs = new StreamTaskInput<?>[sortingInputs.length];
        for (int idx = 0; idx < sortingInputs.length; idx++) {
            try {
                KeyAndValueSerializer<Object> keyAndValueSerializer =
                        new KeyAndValueSerializer<>(inputSerializers[idx], keyLength);
                PushSorter<Tuple2<byte[], StreamRecord<Object>>> sorter =
                        ExternalSorter.newBuilder(memoryManager, containingTask, keyAndValueSerializer, comparator)
                                .memoryFraction(managedMemoryFraction / numberOfInputs)
                                .enableSpilling(ioManager, jobConfiguration.get(AlgorithmOptions.SORT_SPILLING_THRESHOLD))
                                .maxNumFileHandles(jobConfiguration.get(AlgorithmOptions.SPILLING_MAX_FAN) / numberOfInputs)
                                .objectReuse(objectReuse)
                                .largeRecords(true)
                                .build();
                sortedInputs[idx] =
                        new MultiInputSortingDataInput<>(
                                commonContext,
                                sortingInputs[idx],
                                sortingInputs[idx].getInputIndex(),
                                sorter,
                                keySelectors[idx],
                                keySerializer,
                                keyOutput);
            } catch (MemoryAllocationException e) {
                // Keep the cause so the allocation failure stays diagnosable.
                throw new RuntimeException(e);
            }
        }

        StreamTaskInput<?>[] observedPassThroughInputs =
                Arrays.stream(passThroughInputs)
                        .map(input -> new ObservableStreamTaskInput<>(input, inputSelector))
                        .toArray(StreamTaskInput[]::new);

        return new SelectableSortingInputs(sortedInputs, observedPassThroughInputs, inputSelector);
    }

    /** @return the input index this instance was constructed with */
    @Override
    public int getInputIndex() {
        return idx;
    }

    /**
     * Always fails: this input cannot take part in checkpoints.
     *
     * @throws UnsupportedOperationException on every call
     */
    @Override
    public CompletableFuture<Void> prepareSnapshot(ChannelStateWriter channelStateWriter, long checkpointId) {
        final String reason = "Checkpoints are not supported with sorted inputs in the BATCH runtime.";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Closes the wrapped input and then the sorter. An {@link IOException} from the first close
     * does not prevent the second; failures are merged through
     * {@code ExceptionUtils.firstOrSuppressed} and the result is thrown at the end.
     *
     * @throws IOException if closing the wrapped input or the sorter failed with one
     */
    @Override
    public void close() throws IOException {
        IOException failure = null;

        try {
            wrappedInput.close();
        } catch (IOException e) {
            failure = ExceptionUtils.firstOrSuppressed(e, failure);
        }

        try {
            sorter.close();
        } catch (IOException e) {
            failure = ExceptionUtils.firstOrSuppressed(e, failure);
        }

        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Drives this input one step. Until the wrapped input reports {@code END_OF_INPUT}, records
     * are pulled from it into the sorter and its status is returned unchanged. When it ends,
     * sorting is finalized and the first sorted record is queued. Afterwards every call emits
     * from the sorted data.
     *
     * @param dataOutput receiver of sorted records (unused while still sorting)
     * @return the status of this input after the step
     */
    @Override
    public InputStatus emitNext(PushingAsyncDataInput.DataOutput<IN> dataOutput) throws Exception {
        if (sortedInput == null) {
            InputStatus wrappedStatus = wrappedInput.emitNext(sortingPhaseDataOutput);
            if (wrappedStatus == InputStatus.END_OF_INPUT) {
                endSorting();
                return addNextToQueue(new HeadElement(idx), dataOutput);
            }
            return wrappedStatus;
        }
        return emitNextAfterSorting(dataOutput);
    }

    /**
     * Emits this input's head record if, and only if, all inputs are sorted and this input owns
     * the smallest element of the shared queue; then queues this input's next record.
     *
     * @return {@code END_OF_INPUT} if this input already emitted everything,
     *     {@code NOTHING_AVAILABLE} if it is not this input's turn, otherwise the result of
     *     queueing the next record
     */
    @Nonnull
    private InputStatus emitNextAfterSorting(PushingAsyncDataInput.DataOutput<IN> dataOutput) throws Exception {
        if (commonContext.isFinishedEmitting(idx)) {
            return InputStatus.END_OF_INPUT;
        }

        PriorityQueue<HeadElement> heads = commonContext.getQueueOfHeads();
        HeadElement smallest = commonContext.allSorted() ? heads.peek() : null;
        if (smallest == null || smallest.inputIndex != idx) {
            return InputStatus.NOTHING_AVAILABLE;
        }

        HeadElement head = heads.poll();
        // The element was queued by this input, so its record is really a StreamRecord<IN>.
        @SuppressWarnings("unchecked")
        StreamRecord<IN> record = (StreamRecord<IN>) head.streamElement.f1;
        dataOutput.emitRecord(record);
        return addNextToQueue(head, dataOutput);
    }

    /**
     * Switches this input from the sorting phase to the emitting phase: tells the sorter that no
     * more records will come, marks this input as sorted, and obtains the sorted iterator. If this
     * was the last input still sorting, the shared "all finished" future is completed.
     */
    private void endSorting() throws Exception {
        sorter.finishReading();
        commonContext.setFinishedSorting(idx);
        sortedInput = sorter.getIterator();

        if (!commonContext.allSorted()) {
            return;
        }
        commonContext.getAllFinished().getUnavailableToResetAvailable().complete(null);
    }

    /**
     * Pulls the next sorted record of this input and puts it into the shared queue, reusing the
     * given head element as its carrier.
     *
     * <p>If the sorted data is exhausted, this input is marked as finished emitting and, when a
     * watermark was seen during sorting, that watermark is emitted.
     *
     * @param headElement carrier object to refill and re-insert into the queue
     * @param dataOutput receiver of the final watermark
     * @return {@code END_OF_INPUT} when exhausted; {@code MORE_AVAILABLE} if all inputs are sorted
     *     and this input now owns the smallest queued element; {@code NOTHING_AVAILABLE} otherwise
     */
    @Nonnull
    private InputStatus addNextToQueue(HeadElement headElement, PushingAsyncDataInput.DataOutput<IN> dataOutput) throws Exception {
        Tuple2<byte[], StreamRecord<IN>> next = sortedInput.next();
        if (next == null) {
            commonContext.setFinishedEmitting(idx);
            if (seenWatermark > Long.MIN_VALUE) {
                dataOutput.emitWatermark(new Watermark(seenWatermark));
            }
            return InputStatus.END_OF_INPUT;
        }

        headElement.streamElement = getAsObject(next);
        PriorityQueue<HeadElement> heads = commonContext.getQueueOfHeads();
        heads.add(headElement);

        if (!commonContext.allSorted()) {
            return InputStatus.NOTHING_AVAILABLE;
        }
        HeadElement smallest = heads.peek();
        boolean ownHeadIsSmallest = smallest != null && smallest.inputIndex == idx;
        return ownHeadIsSmallest ? InputStatus.MORE_AVAILABLE : InputStatus.NOTHING_AVAILABLE;
    }

    /**
     * Re-types a sorted element so it can be stored in a {@link HeadElement}, whose payload is
     * declared with {@code Object} as the record type. The same instance is returned; nothing is
     * copied.
     *
     * <p>Generic types are invariant, so a plain {@code return} of the argument does not compile;
     * the cast has to go through the raw type.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private Tuple2<byte[], StreamRecord<Object>> getAsObject(Tuple2<byte[], StreamRecord<IN>> tuple2) {
        return (Tuple2<byte[], StreamRecord<Object>>) (Tuple2) tuple2;
    }

    /**
     * @return the wrapped input's availability future while still sorting; afterwards the future
     *     of the shared "all finished" helper
     */
    @Override
    public CompletableFuture<?> getAvailableFuture() {
        if (sortedInput == null) {
            return wrappedInput.getAvailableFuture();
        }
        return commonContext.getAllFinished().getAvailableFuture();
    }

    /**
     * Compiler-generated accessor that lets a nested class assign the private
     * {@code seenWatermark} field.
     *
     * <p>The decompiler could not decode this method; the body is reconstructed from the partial
     * listing it produced, which showed the assignment {@code seenWatermark = value}.
     * NOTE(review): the returned register was garbled in that listing; returning the assigned
     * value is assumed from the {@code dup2_x1} instruction named in the decode error.
     *
     * @param input instance whose field is written
     * @param value new value of {@code seenWatermark}
     * @return the value that was assigned
     */
    static /* synthetic */ long access$702(MultiInputSortingDataInput<?, ?> input, long value) {
        input.seenWatermark = value;
        return value;
    }
}
