package org.apache.flink.table.runtime.operators.join.temporal;

import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.RowDataUtil;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.generated.JoinCondition;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.class */
public class TemporalProcessTimeJoinOperator extends BaseTwoInputStreamOperatorWithStateRetention {
    private static final Logger LOG = LoggerFactory.getLogger(TemporalProcessTimeJoinOperator.class);
    private static final long serialVersionUID = -5182289624027523612L;
    private static final String LEFT_STATE_NAME = "backlogLeftRows";
    private static final String RIGHT_STATE_NAME = "latestRight";
    private static final String PARALLELISM_STATE_NAME = "previousParallelism";
    private final boolean isLeftOuterJoin;
    private final RowDataKeySelector leftKeySelector;
    private final InternalTypeInfo<RowData> leftType;
    private final InternalTypeInfo<RowData> rightType;
    private final GeneratedJoinCondition generatedJoinCondition;
    private boolean isBacklogPhase;
    private transient ListState<RowData> backlogLeftRowsState;
    private transient ValueState<RowData> rightState;
    private transient ListState<Integer> previousParallelismState;
    private transient JoinCondition joinCondition;
    private transient JoinedRowData outRow;
    private transient GenericRowData rightNullRow;
    private transient TimestampedCollector<RowData> collector;

    public TemporalProcessTimeJoinOperator(RowDataKeySelector rowDataKeySelector, InternalTypeInfo<RowData> internalTypeInfo, InternalTypeInfo<RowData> internalTypeInfo2, GeneratedJoinCondition generatedJoinCondition, long j, long j2, boolean z, boolean z2) {
        super(j, j2);
        this.leftKeySelector = rowDataKeySelector;
        this.leftType = internalTypeInfo;
        this.rightType = internalTypeInfo2;
        this.generatedJoinCondition = generatedJoinCondition;
        this.isLeftOuterJoin = z;
        this.isBacklogPhase = z2;
    }

    @Override // org.apache.flink.table.runtime.operators.join.temporal.BaseTwoInputStreamOperatorWithStateRetention
    public void open() throws Exception {
        super.open();
        this.joinCondition = this.generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader());
        FunctionUtils.setFunctionRuntimeContext(this.joinCondition, getRuntimeContext());
        FunctionUtils.openFunction(this.joinCondition, new Configuration());
        this.rightState = getRuntimeContext().getState(new ValueStateDescriptor(RIGHT_STATE_NAME, this.rightType));
        this.collector = new TimestampedCollector<>(this.output);
        this.outRow = new JoinedRowData();
        this.rightNullRow = new GenericRowData(this.rightType.toRowSize());
        super.processWatermark2(Watermark.MAX_WATERMARK);
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        this.previousParallelismState = getOperatorStateBackend().getUnionListState(new ListStateDescriptor(PARALLELISM_STATE_NAME, Types.INT));
        int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
        int intValue = ((Iterable) this.previousParallelismState.get()).iterator().hasNext() ? ((Integer) ((Iterable) this.previousParallelismState.get()).iterator().next()).intValue() : numberOfParallelSubtasks;
        this.backlogLeftRowsState = getOperatorStateBackend().getListState(new ListStateDescriptor(LEFT_STATE_NAME, this.leftType));
        if (stateInitializationContext.isRestored() && ((Iterable) this.backlogLeftRowsState.get()).iterator().hasNext() && intValue != numberOfParallelSubtasks) {
            throw new IllegalStateException(String.format("Processing Time Temporal Join operator was restored after rescaling in the backlog phase with non-empty left side rows buffer. Previous parallelism was %d, but current parallelism is %d. Unfortunately, rescaling makes left side row buffer state inconsistent, which could lead to incorrect join results. Please, return parallelism back to %d in order to recover properly. Note, that you can still rescale after the end of the backlog phase.", Integer.valueOf(intValue), Integer.valueOf(numberOfParallelSubtasks), Integer.valueOf(numberOfParallelSubtasks)));
        }
        this.previousParallelismState.add(Integer.valueOf(numberOfParallelSubtasks));
    }

    public void processRecordAttributes2(RecordAttributes recordAttributes) throws Exception {
        super.processRecordAttributes2(recordAttributes);
        if (!this.isBacklogPhase || recordAttributes.isBacklog()) {
            return;
        }
        LOG.info("PTTJ operator received RecordAttributes{isBacklog=false}, denoting the end of the backlog phase. Join is unblocked.");
        LOG.info("PTTJ operator processed {} left side backlog records. Left side state was cleared. Realtime phase begins.", Long.valueOf(flushBacklogLeftBuffer()));
    }

    public void processElement1(StreamRecord<RowData> streamRecord) throws Exception {
        RowData rowData = (RowData) streamRecord.getValue();
        RowData rowData2 = (RowData) this.rightState.value();
        if (this.isBacklogPhase) {
            this.backlogLeftRowsState.add(rowData);
            return;
        }
        if (rowData2 == null) {
            if (this.isLeftOuterJoin) {
                collectJoinedRow(rowData, this.rightNullRow);
            }
        } else {
            if (this.joinCondition.apply(rowData, rowData2)) {
                collectJoinedRow(rowData, rowData2);
            } else if (this.isLeftOuterJoin) {
                collectJoinedRow(rowData, this.rightNullRow);
            }
            registerProcessingCleanupTimer();
        }
    }

    private void collectJoinedRow(RowData rowData, RowData rowData2) {
        this.outRow.setRowKind(rowData.getRowKind());
        this.outRow.replace(rowData, rowData2);
        this.collector.collect(this.outRow);
    }

    public void processElement2(StreamRecord<RowData> streamRecord) throws Exception {
        if (RowDataUtil.isAccumulateMsg((RowData) streamRecord.getValue())) {
            this.rightState.update(streamRecord.getValue());
            registerProcessingCleanupTimer();
        } else {
            this.rightState.clear();
            cleanupLastTimer();
        }
    }

    public void finish() throws Exception {
        if (this.isBacklogPhase && ((Iterable) this.backlogLeftRowsState.get()).iterator().hasNext()) {
            LOG.warn("Flushed {} left side rows which were buffered during backlog phase before closing the operator. Couldn't wait for RecordAttributes{isBacklog=false}.", Long.valueOf(flushBacklogLeftBuffer()));
        }
    }

    private long flushBacklogLeftBuffer() throws Exception {
        this.isBacklogPhase = false;
        StreamRecord<RowData> streamRecord = new StreamRecord<>((Object) null);
        long j = 0;
        for (RowData rowData : (Iterable) this.backlogLeftRowsState.get()) {
            streamRecord.replace(rowData);
            setCurrentKey((RowData) this.leftKeySelector.getKey(rowData));
            processElement1(streamRecord);
            j++;
        }
        this.backlogLeftRowsState.clear();
        return j;
    }

    public void close() throws Exception {
        FunctionUtils.closeFunction(this.joinCondition);
        super.close();
    }

    @Override // org.apache.flink.table.runtime.operators.join.temporal.BaseTwoInputStreamOperatorWithStateRetention
    public void cleanupState(long j) {
        this.rightState.clear();
    }

    public void onEventTime(InternalTimer<Object, VoidNamespace> internalTimer) throws Exception {
    }
}
