package org.apache.flink.table.runtime.operators.bundle;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractProcessOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.table.runtime.operators.bundle.trigger.BundleTrigger;
import org.apache.flink.table.runtime.operators.bundle.trigger.BundleTriggerCallback;
import org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger;

@Internal
/* loaded from: input_file:org/apache/flink/table/runtime/operators/bundle/ListBundleProcessOperator.class */
public class ListBundleProcessOperator<IN, OUT> extends AbstractProcessOperator<List<IN>, IN, OUT> implements BundleTriggerCallback {
    private static final long serialVersionUID = 1;
    private static final String STATE_NAME = "_list_bundle_process_state_";
    private final BundleTrigger<IN> bundleTrigger;
    private final TypeSerializer<IN> inputSerializer;
    private final boolean isObjectReuseEnabled;
    private transient List<StreamRecord<IN>> bundle;
    private transient ListState<StreamElement> bundleState;
    private transient StreamElementSerializer<IN> streamRecordsSerializer;
    private boolean bundleProcessingFailed;

    public ListBundleProcessOperator(ProcessFunction<List<IN>, OUT> processFunction, long j, TypeSerializer<IN> typeSerializer, boolean z) {
        super(processFunction);
        this.bundleTrigger = new CountBundleTrigger(j);
        this.inputSerializer = typeSerializer;
        this.isObjectReuseEnabled = z;
    }

    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<OUT>> output) {
        super.setup(streamTask, streamConfig, output);
        this.streamRecordsSerializer = new StreamElementSerializer<>(getOperatorConfig().getTypeSerializerIn(0, getUserCodeClassloader()));
    }

    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
        this.bundleState.clear();
        try {
            this.bundleState.addAll(new ArrayList(this.bundle));
        } catch (Exception e) {
            this.bundleState.clear();
            throw new Exception("Could not add stream element bundle entries to operator state backend of operator " + getOperatorName(), e);
        }
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        this.bundleState = stateInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor(STATE_NAME, this.streamRecordsSerializer));
    }

    public void open() throws Exception {
        super.open();
        this.bundle = new ArrayList();
        this.bundleTrigger.registerCallback(this);
        this.bundleTrigger.reset();
        maybeRecoverBundle();
    }

    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        super.processElement(streamRecord);
        if (this.isObjectReuseEnabled) {
            streamRecord = streamRecord.copy(this.inputSerializer.copy(streamRecord.getValue()));
        }
        this.bundle.add(streamRecord);
        this.bundleTrigger.onElement(streamRecord.getValue());
        this.context.reset();
    }

    @Override // org.apache.flink.table.runtime.operators.bundle.trigger.BundleTriggerCallback
    public void finishBundle() throws Exception {
        if (!this.bundle.isEmpty()) {
            try {
                this.userFunction.processElement((List) this.bundle.stream().map((v0) -> {
                    return v0.getValue();
                }).collect(Collectors.toList()), this.context, this.collector);
                this.bundle.clear();
            } catch (Exception e) {
                this.bundleProcessingFailed = true;
                throw e;
            }
        }
        this.bundleTrigger.reset();
    }

    public void processWatermark(Watermark watermark) throws Exception {
        finishBundle();
        super.processWatermark(watermark);
    }

    public void close() throws Exception {
        try {
            if (!this.bundleProcessingFailed) {
                finishBundle();
            }
        } finally {
            try {
                super.close();
            } catch (Exception e) {
                LOG.warn("Errors occurred while closing the ListBundleProcessOperator.", e);
            }
        }
    }

    private void maybeRecoverBundle() throws Exception {
        Iterable iterable = (Iterable) this.bundleState.get();
        if (iterable != null) {
            Iterator it = iterable.iterator();
            while (it.hasNext()) {
                processElement(((StreamElement) it.next()).asRecord());
            }
        }
    }
}
