package org.apache.flink.table.runtime.operators.aggregate;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.RowDataUtil;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.context.ExecutionContext;
import org.apache.flink.table.runtime.dataview.PerKeyStateDataViewStore;
import org.apache.flink.table.runtime.generated.AggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.operators.bundle.MapBundleFunction;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.StateConfigUtil;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.class */
public class MiniBatchGroupAggFunction extends MapBundleFunction<RowData, List<RowData>, RowData, RowData> {
    private static final long serialVersionUID = 7455939331036508477L;
    private final GeneratedAggsHandleFunction genAggsHandler;
    private final GeneratedRecordEqualiser genRecordEqualiser;
    private final LogicalType[] accTypes;
    private final RowType inputType;
    private final RecordCounter recordCounter;
    private final boolean generateUpdateBefore;
    private final long stateRetentionTime;
    private final boolean isColdStateEnabled;
    private final long coldStateRetentionTime;
    private transient TypeSerializer<RowData> inputRowSerializer;
    private transient JoinedRowData resultRow = new JoinedRowData();
    private transient AggsHandleFunction function = null;
    private transient RecordEqualiser equaliser = null;
    private transient ValueState<RowData> accState = null;

    public MiniBatchGroupAggFunction(GeneratedAggsHandleFunction generatedAggsHandleFunction, GeneratedRecordEqualiser generatedRecordEqualiser, LogicalType[] logicalTypeArr, RowType rowType, int i, boolean z, long j, boolean z2, long j2) {
        this.genAggsHandler = generatedAggsHandleFunction;
        this.genRecordEqualiser = generatedRecordEqualiser;
        this.recordCounter = RecordCounter.of(i);
        this.accTypes = logicalTypeArr;
        this.inputType = rowType;
        this.generateUpdateBefore = z;
        this.stateRetentionTime = j;
        this.isColdStateEnabled = z2;
        this.coldStateRetentionTime = j2;
    }

    @Override // org.apache.flink.table.runtime.operators.bundle.MapBundleFunction
    public void open(ExecutionContext executionContext) throws Exception {
        super.open(executionContext);
        StateTtlConfig createTtlConfig = StateConfigUtil.createTtlConfig(this.stateRetentionTime, this.isColdStateEnabled, this.coldStateRetentionTime);
        this.function = this.genAggsHandler.newInstance(executionContext.getRuntimeContext().getUserCodeClassLoader());
        this.function.open(new PerKeyStateDataViewStore(executionContext.getRuntimeContext(), createTtlConfig));
        this.equaliser = this.genRecordEqualiser.newInstance(executionContext.getRuntimeContext().getUserCodeClassLoader());
        ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("accState", InternalTypeInfo.ofFields(this.accTypes));
        if (createTtlConfig.isEnabled()) {
            valueStateDescriptor.enableTimeToLive(createTtlConfig);
        }
        this.accState = executionContext.getRuntimeContext().getState(valueStateDescriptor);
        this.inputRowSerializer = InternalSerializers.create(this.inputType);
        this.resultRow = new JoinedRowData();
    }

    @Override // org.apache.flink.table.runtime.operators.bundle.MapBundleFunction
    public List<RowData> addInput(RowData rowData, @Nullable List<RowData> list, RowData rowData2, Collector<RowData> collector) throws Exception {
        List<RowData> list2 = list;
        if (list == null) {
            list2 = new ArrayList();
        }
        list2.add((RowData) this.inputRowSerializer.copy(rowData2));
        return list2;
    }

    @Override // org.apache.flink.table.runtime.operators.bundle.MapBundleFunction
    public void finishBundle(Map<RowData, List<RowData>> map, Collector<RowData> collector) throws Exception {
        for (Map.Entry<RowData, List<RowData>> entry : map.entrySet()) {
            RowData key = entry.getKey();
            List<RowData> value = entry.getValue();
            boolean z = false;
            this.ctx.setCurrentKey(key);
            RowData rowData = (RowData) this.accState.value();
            if (rowData == null) {
                Iterator<RowData> it = value.iterator();
                while (it.hasNext() && RowDataUtil.isRetractMsg(it.next())) {
                    it.remove();
                }
                if (value.isEmpty()) {
                    return;
                }
                rowData = this.function.createAccumulators();
                z = true;
            }
            this.function.setAccumulators(rowData);
            RowData value2 = this.function.getValue();
            for (RowData rowData2 : value) {
                if (RowDataUtil.isAccumulateMsg(rowData2)) {
                    this.function.accumulate(rowData2);
                } else {
                    this.function.retract(rowData2);
                }
            }
            RowData value3 = this.function.getValue();
            RowData accumulators = this.function.getAccumulators();
            if (this.recordCounter.recordCountIsZero(accumulators)) {
                if (!z) {
                    this.resultRow.replace(key, value2).setRowKind(RowKind.DELETE);
                    collector.collect(this.resultRow);
                }
                this.accState.clear();
                this.function.cleanup();
            } else {
                this.accState.update(accumulators);
                if (z) {
                    this.resultRow.replace(key, value3).setRowKind(RowKind.INSERT);
                    collector.collect(this.resultRow);
                } else if (this.stateRetentionTime > 0 || !this.equaliser.equals(value2, value3)) {
                    if (this.generateUpdateBefore) {
                        this.resultRow.replace(key, value2).setRowKind(RowKind.UPDATE_BEFORE);
                        collector.collect(this.resultRow);
                    }
                    this.resultRow.replace(key, value3).setRowKind(RowKind.UPDATE_AFTER);
                    collector.collect(this.resultRow);
                }
            }
        }
    }

    @Override // org.apache.flink.table.runtime.operators.bundle.MapBundleFunction
    public void close() throws Exception {
        if (this.function != null) {
            this.function.close();
        }
    }
}
