package org.apache.flink.table.runtime.operators.bundle;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.async.AbstractAsyncWaitOperator;
import org.apache.flink.streaming.api.operators.async.queue.OrderedStreamBundleQueue;
import org.apache.flink.streaming.api.operators.async.queue.StreamBundleQueue;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.table.runtime.operators.bundle.trigger.BundleTrigger;
import org.apache.flink.table.runtime.operators.bundle.trigger.BundleTriggerCallback;
import org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger;

@Internal
/* loaded from: input_file:org/apache/flink/table/runtime/operators/bundle/AsyncBundleWaitOperator.class */
public class AsyncBundleWaitOperator<IN, OUT> extends AbstractAsyncWaitOperator<IN, OUT, List<IN>> implements BundleTriggerCallback {
    private static final long serialVersionUID = 1;
    private static final String QUEUE_STATE_NAME = "_async_bundle_queue_wait_operator_state_";
    private static final String BUNDLE_STATE_NAME = "_async_bundle_wait_operator_state_";
    private final BundleTrigger<IN> bundleTrigger;
    private final TypeSerializer<IN> inputSerializer;
    private final boolean isObjectReuseEnabled;
    private transient List<StreamRecord<IN>> bundle;
    private transient ListSerializer<StreamElement> queueSerializer;
    private transient ListState<List<StreamElement>> queueState;
    private transient ListState<StreamElement> bundleState;

    public AsyncBundleWaitOperator(AsyncFunction<List<IN>, OUT> asyncFunction, AsyncDataStream.OutputMode outputMode, ProcessingTimeService processingTimeService, AsyncRetryStrategy<OUT> asyncRetryStrategy, MailboxExecutor mailboxExecutor, TypeSerializer<IN> typeSerializer, long j, int i, long j2, boolean z) {
        super(asyncFunction, j, i, outputMode, asyncRetryStrategy, processingTimeService, mailboxExecutor);
        this.bundleTrigger = new CountBundleTrigger(j2);
        this.inputSerializer = typeSerializer;
        this.isObjectReuseEnabled = z;
    }

    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<OUT>> output) {
        super.setup(streamTask, streamConfig, output);
        this.queueSerializer = new ListSerializer<>(this.inStreamElementSerializer);
        if (this.outputMode != AsyncDataStream.OutputMode.ORDERED) {
            throw new IllegalStateException("Unsupported output mode: " + this.outputMode);
        }
        this.queue = new OrderedStreamBundleQueue(this.capacity);
    }

    public void open() throws Exception {
        super.open();
        this.bundle = new ArrayList();
        this.bundleTrigger.registerCallback(this);
        this.bundleTrigger.reset();
        maybeRestoreFromState();
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        this.queueState = stateInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor(QUEUE_STATE_NAME, this.queueSerializer));
        this.bundleState = stateInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor(BUNDLE_STATE_NAME, this.inStreamElementSerializer));
    }

    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        if (this.isObjectReuseEnabled) {
            streamRecord = streamRecord.copy(this.inputSerializer.copy(streamRecord.getValue()));
        }
        this.bundle.add(streamRecord);
        this.bundleTrigger.onElement(streamRecord.getValue());
    }

    public void processWatermark(Watermark watermark) throws Exception {
        finishBundle();
        super.processWatermark(watermark);
    }

    @Override // org.apache.flink.table.runtime.operators.bundle.trigger.BundleTriggerCallback
    public void finishBundle() throws Exception {
        if (this.bundle.isEmpty()) {
            return;
        }
        List list = (List) this.bundle.stream().map((v0) -> {
            return v0.getValue();
        }).collect(Collectors.toList());
        AbstractAsyncWaitOperator.ResultHandler resultHandler = new AbstractAsyncWaitOperator.ResultHandler(this, list, addBundleToWorkQueue());
        maybeRegisterTimer(resultHandler);
        this.userFunction.asyncInvoke(list, resultHandler);
        this.bundle.clear();
        this.bundleTrigger.reset();
    }

    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
        writeToState(this.queueState, getQueue().bundles());
        writeToState(this.bundleState, new ArrayList(this.bundle));
    }

    public void endInput() throws Exception {
        finishBundle();
        super.endInput();
    }

    public void close() throws Exception {
        try {
            finishBundle();
        } finally {
            try {
                super.close();
            } catch (Exception e) {
                LOG.warn("Errors occurred while closing the AsyncBundleWaitOperator.", e);
            }
        }
    }

    private <T> void writeToState(ListState<T> listState, List<T> list) {
        try {
            listState.clear();
            listState.addAll(list);
        } catch (Exception e) {
            listState.clear();
            throw new RuntimeException("Could not add stream element queue entries to operator state backend of operator " + getOperatorName(), e);
        }
    }

    private void maybeRestoreFromState() throws Exception {
        Iterable<List> iterable = (Iterable) this.queueState.get();
        if (iterable != null) {
            for (List list : iterable) {
                StreamElement streamElement = (StreamElement) list.get(0);
                if (streamElement.isRecord()) {
                    this.bundle = (List) list.stream().map((v0) -> {
                        return v0.asRecord();
                    }).collect(Collectors.toList());
                    finishBundle();
                } else if (streamElement.isWatermark()) {
                    super.processWatermark(streamElement.asWatermark());
                } else {
                    if (!streamElement.isLatencyMarker()) {
                        throw new IllegalStateException("Unknown record type " + streamElement.getClass() + " encountered while opening the operator.");
                    }
                    processLatencyMarker(streamElement.asLatencyMarker());
                }
            }
        }
        if (this.bundleState != null) {
            Iterator it = ((Iterable) this.bundleState.get()).iterator();
            while (it.hasNext()) {
                processElement(((StreamElement) it.next()).asRecord());
            }
        }
    }

    private ResultFuture<OUT> addBundleToWorkQueue() throws InterruptedException {
        ArrayList arrayList = new ArrayList(this.bundle);
        while (true) {
            Optional tryPut = getQueue().tryPut(arrayList);
            if (tryPut.isPresent()) {
                return (ResultFuture) tryPut.get();
            }
            this.mailboxExecutor.yield();
        }
    }

    private StreamBundleQueue<OUT> getQueue() {
        return this.queue;
    }
}
