package org.apache.flink.streaming.api.operators.async;

import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.jobgraph.OperationKindTag;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.Preconditions;

/**
 * Shared base for asynchronous wait operators.
 *
 * <p>The class keeps the operator configuration (capacity, output mode, timeout), pushes incoming
 * stream elements into a {@link StreamElementQueue}, and routes results reported by the user's
 * {@link AsyncFunction} back through the {@link MailboxExecutor} before emitting them.
 *
 * <p>The {@link #queue} field is never assigned in this class.
 * NOTE(review): presumably concrete subclasses create it before any element is processed - confirm.
 *
 * @param <IN> type of the records arriving at the operator
 * @param <OUT> type of the records emitted by the operator
 * @param <FIN> type of the value handed to the wrapped {@link AsyncFunction}
 */
@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/operators/async/AbstractAsyncWaitOperator.class */
public abstract class AbstractAsyncWaitOperator<IN, OUT, FIN> extends AbstractUdfStreamOperator<OUT, AsyncFunction<FIN, OUT>> implements OneInputStreamOperator<IN, OUT>, BoundedOneInput {
    private static final long serialVersionUID = 1;

    /** Maximum number of concurrent async operations; validated to be strictly positive. */
    protected final int capacity;

    /** Output mode this operator was configured with. */
    protected final AsyncDataStream.OutputMode outputMode;

    /** Serializer for input stream elements, created in {@link #setup}. */
    protected transient StreamElementSerializer<IN> inStreamElementSerializer;

    /** Queue of in-flight stream elements; not initialised by this class. */
    protected transient StreamElementQueue<OUT> queue;

    /** Executor used to run result processing and to yield while waiting on the queue. */
    protected final transient MailboxExecutor mailboxExecutor;

    /** Timeout added to the current processing time to obtain a timer deadline; values {@code <= 0} disable the timer. */
    private final long timeout;

    /** Collector wrapping the operator output, created in {@link #setup}. */
    private transient TimestampedCollector<OUT> timestampedCollector;

    /**
     * {@link ResultFuture} decorator handed to the user function.
     *
     * <p>Only the first call to {@link #complete} or {@link #completeExceptionally} has an effect
     * (guarded by an {@link AtomicBoolean}); the actual result processing is submitted to the
     * enclosing operator's mailbox executor.
     */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/async/AbstractAsyncWaitOperator$ResultHandler.class */
    protected class ResultHandler implements ResultFuture<OUT> {
        /** Timer registered for this handler, or {@code null} when no timeout timer was set. */
        private ScheduledFuture<?> timeoutTimer;

        /** Delegate that receives the results once they are processed. */
        private final ResultFuture<OUT> resultFuture;

        /** Flipped exactly once by whichever completion call arrives first. */
        private final AtomicBoolean completed = new AtomicBoolean(false);

        /**
         * @param resultFuture delegate to complete when results are processed
         */
        public ResultHandler(ResultFuture<OUT> resultFuture) {
            this.resultFuture = resultFuture;
        }

        /**
         * Remembers the timeout timer so it can be cancelled once results are processed.
         *
         * @param scheduledFuture handle of the registered timer
         */
        public void setTimeoutTimer(ScheduledFuture<?> scheduledFuture) {
            this.timeoutTimer = scheduledFuture;
        }

        /**
         * Completes this handler with the given results; later completion attempts are ignored.
         *
         * @param collection results to emit; must not be {@code null} (use an empty collection to emit nothing)
         */
        @Override // org.apache.flink.streaming.api.functions.async.ResultFuture
        public void complete(Collection<OUT> collection) {
            Preconditions.checkNotNull(collection, "Results must not be null, use empty collection to emit nothing");
            if (this.completed.compareAndSet(false, true)) {
                processInMailbox(collection);
            }
        }

        /** Submits {@link #processResults} for the given results to the operator's mailbox executor. */
        private void processInMailbox(Collection<OUT> collection) {
            AbstractAsyncWaitOperator.this.mailboxExecutor.execute(() -> {
                processResults(collection);
            }, "Result in AsyncWaitOperator of input %s", new Object[]{collection});
        }

        /** Cancels the pending timer (if any), completes the delegate and emits completed elements. */
        private void processResults(Collection<OUT> collection) {
            if (this.timeoutTimer != null) {
                this.timeoutTimer.cancel(true);
            }
            this.resultFuture.complete(collection);
            AbstractAsyncWaitOperator.this.outputCompletedElement();
        }

        /**
         * Reports a failure; later completion attempts are ignored.
         *
         * <p>The wrapped error is passed to {@code failExternally} on the containing task's
         * environment, then an empty result is submitted so the regular result path (timer
         * cancellation, delegate completion, emission) still runs for this entry.
         *
         * @param th cause of the failure
         */
        @Override // org.apache.flink.streaming.api.functions.async.ResultFuture
        public void completeExceptionally(Throwable th) {
            if (this.completed.compareAndSet(false, true)) {
                AbstractAsyncWaitOperator.this.getContainingTask().getEnvironment().failExternally(wrapError(th));
                processInMailbox(Collections.emptyList());
            }
        }

        /**
         * Wraps a failure cause into the exception reported to the task environment.
         *
         * @param th original cause
         * @return a new exception carrying {@code th} as its cause
         */
        protected Exception wrapError(Throwable th) {
            return new Exception("Could not complete the stream elements batch", th);
        }
    }

    /**
     * Creates the operator and validates its configuration.
     *
     * @param asyncFunction user function performing the asynchronous work
     * @param j timeout added to the current processing time when a timer is registered; {@code <= 0} disables timers
     * @param i capacity, i.e. the number of concurrent async operations; must be greater than 0
     * @param outputMode output mode of the operator
     * @param processingTimeService service used to register timeout timers
     * @param mailboxExecutor executor used for result processing and yielding
     * @throws IllegalArgumentException if {@code i} is not positive
     * @throws NullPointerException if {@code outputMode}, {@code processingTimeService} or {@code mailboxExecutor} is {@code null}
     */
    public AbstractAsyncWaitOperator(@Nonnull AsyncFunction<FIN, OUT> asyncFunction, long j, int i, @Nonnull AsyncDataStream.OutputMode outputMode, @Nonnull ProcessingTimeService processingTimeService, @Nonnull MailboxExecutor mailboxExecutor) {
        super(asyncFunction);
        setChainingStrategy(ChainingStrategy.ALWAYS);
        Preconditions.checkArgument(i > 0, "The number of concurrent async operation should be greater than 0.");
        this.capacity = i;
        this.outputMode = (AsyncDataStream.OutputMode) Preconditions.checkNotNull(outputMode, "outputMode");
        this.timeout = j;
        this.processingTimeService = (ProcessingTimeService) Preconditions.checkNotNull(processingTimeService);
        // Fail fast here: the executor is dereferenced later from result callbacks, where a
        // null would surface far away from the faulty construction site.
        this.mailboxExecutor = (MailboxExecutor) Preconditions.checkNotNull(mailboxExecutor, "mailboxExecutor");
    }

    /**
     * Runs the superclass setup, then creates the input element serializer and the collector
     * wrapping {@code output}.
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator, org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.SetupableStreamOperator
    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<OUT>> output) {
        super.setup(streamTask, streamConfig, output);
        this.inStreamElementSerializer = new StreamElementSerializer<>(getOperatorConfig().getTypeSerializerIn1(getUserCodeClassloader()));
        this.timestampedCollector = new TimestampedCollector<>(output);
    }

    /**
     * Enqueues the watermark and then emits whatever is already completed in the queue.
     *
     * @param watermark watermark to enqueue
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.Input
    public void processWatermark(Watermark watermark) throws Exception {
        addToWorkQueue(watermark);
        outputCompletedElement();
    }

    /** Blocks (yielding to the mailbox) until the queue reports no remaining elements. */
    @Override // org.apache.flink.streaming.api.operators.BoundedOneInput
    public void endInput() throws Exception {
        waitInFlightInputsFinished();
    }

    /**
     * Puts an element into the queue, yielding to the mailbox executor for as long as the queue
     * refuses it.
     *
     * @param streamElement element to enqueue
     * @return the result future the queue associated with the element
     * @throws InterruptedException if interrupted while yielding
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public ResultFuture<OUT> addToWorkQueue(StreamElement streamElement) throws InterruptedException {
        Optional<ResultFuture<OUT>> entry = this.queue.tryPut(streamElement);
        while (!entry.isPresent()) {
            this.mailboxExecutor.yield();
            entry = this.queue.tryPut(streamElement);
        }
        return entry.get();
    }

    /**
     * Registers a processing-time timer that invokes the user function's {@code timeout} callback,
     * unless the configured timeout is {@code <= 0}.
     *
     * <p>The deadline is {@code currentProcessingTime + timeout}, saturated at
     * {@link Long#MAX_VALUE}: a very large timeout must not wrap around into a deadline in the past.
     *
     * @param resultHandler handler that receives the timer handle and is passed to the timeout callback
     * @param fin value passed to the user function's timeout callback
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public void maybeRegisterTimer(AbstractAsyncWaitOperator<IN, OUT, FIN>.ResultHandler resultHandler, FIN fin) {
        if (this.timeout <= 0) {
            return;
        }
        final long now = getProcessingTimeService().getCurrentProcessingTime();
        long deadline = now + this.timeout;
        // timeout is strictly positive here, so a sum smaller than 'now' can only mean overflow.
        if (deadline < now) {
            deadline = Long.MAX_VALUE;
        }
        resultHandler.setTimeoutTimer(getProcessingTimeService().registerTimer(deadline, timestamp -> this.userFunction.timeout(fin, resultHandler)));
    }

    /** Yields to the mailbox executor until the queue is empty. */
    private void waitInFlightInputsFinished() throws InterruptedException {
        while (!this.queue.isEmpty()) {
            this.mailboxExecutor.yield();
        }
    }

    /**
     * Emits one completed element, if there is one, and re-submits itself to the mailbox executor
     * while more completed elements remain, so each invocation emits at most one element.
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void outputCompletedElement() {
        if (this.queue.hasCompletedElements()) {
            this.queue.emitCompletedElement(this.timestampedCollector);
            if (this.queue.hasCompletedElements()) {
                this.mailboxExecutor.execute(this::outputCompletedElement, "AsyncWaitOperator#outputCompletedElement");
            }
        }
    }

    /**
     * @return the tags {@code ASYNC}, {@code UDF} and {@code ONE_INPUT} as a set
     */
    public Set<OperationKindTag> getOperationKindTags() {
        return OperationKindTag.asSet(new OperationKindTag[]{OperationKindTag.ASYNC, OperationKindTag.UDF, OperationKindTag.ONE_INPUT});
    }
}
