package org.apache.flink.streaming.api.operators.python;

import java.util.Collections;
import java.util.LinkedList;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.python.PythonFunctionRunner;
import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
import org.apache.flink.streaming.api.utils.PythonTypeUtils;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.functions.python.PythonEnv;

/**
 * Base class for one-input operators that hand every incoming element to a Python function
 * runner.
 *
 * <p>Each element is serialized with the serializer derived from the runner input type
 * information and the resulting bytes are passed to the {@link PythonFunctionRunner}. The runner
 * itself is a {@link BeamDataStreamPythonFunctionRunner} configured with the function URN, coder
 * URN and job options exposed by this class.
 *
 * @param <IN> type of the elements carried by the incoming stream records
 * @param <OUT> type of the elements emitted through the {@link TimestampedCollector}
 * @param <UDFIN> type described by the runner input type information
 * @param <UDFOUT> type described by the runner output type information
 */
@Internal
public abstract class OneInputPythonFunctionOperator<IN, OUT, UDFIN, UDFOUT>
        extends AbstractOneInputPythonFunctionOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    /** Function URN returned by the default {@link #getFunctionUrn()} implementation. */
    protected static final String DATA_STREAM_STATELESS_FUNCTION_URN =
            "flink:transform:datastream_stateless_function:v1";

    /** Snapshot of the configuration (as a string map) taken at construction time. */
    protected final Map<String, String> jobOptions;

    /** Type information used to derive {@link #runnerInputTypeSerializer}. */
    private final TypeInformation<UDFIN> runnerInputTypeInfo;

    /** Type information used to derive {@link #runnerOutputTypeSerializer}. */
    final TypeInformation<UDFOUT> runnerOutputTypeInfo;

    /** Description of the Python function; also the source of the Python environment. */
    private final DataStreamPythonFunctionInfo pythonFunctionInfo;

    /** Serializer for the data sent to the runner; created in {@link #open()}. */
    transient TypeSerializer<UDFIN> runnerInputTypeSerializer;

    /** Serializer built from the runner output type information; created in {@link #open()}. */
    transient TypeSerializer<UDFOUT> runnerOutputTypeSerializer;

    /** Reusable input buffer; not used in this class (presumably read by subclasses). */
    protected transient ByteArrayInputStreamWithPos bais;

    /** Data-input view over {@link #bais}. */
    protected transient DataInputViewStreamWrapper baisWrapper;

    /** Reusable output buffer that holds the serialized form of the current element. */
    protected transient ByteArrayOutputStreamWithPos baos;

    /** Data-output view over {@link #baos}. */
    protected transient DataOutputViewStreamWrapper baosWrapper;

    /** Collector wrapping the operator output; created in {@link #open()}. */
    protected transient TimestampedCollector<OUT> collector;

    /**
     * Timestamps of the elements handed to the runner, in arrival order. This class only appends
     * to the queue; presumably the result-emitting code consumes it (not visible here).
     */
    transient LinkedList<Long> bufferedTimestamp;

    /**
     * Creates the operator.
     *
     * @param configuration configuration passed to the superclass and copied into
     *     {@link #jobOptions}
     * @param typeInformation type information of the data sent to the Python function runner
     * @param typeInformation2 type information of the data produced by the Python function runner
     * @param dataStreamPythonFunctionInfo description of the Python function to execute
     */
    public OneInputPythonFunctionOperator(
            Configuration configuration,
            TypeInformation<UDFIN> typeInformation,
            TypeInformation<UDFOUT> typeInformation2,
            DataStreamPythonFunctionInfo dataStreamPythonFunctionInfo) {
        super(configuration);
        this.jobOptions = configuration.toMap();
        this.runnerInputTypeInfo = typeInformation;
        this.runnerOutputTypeInfo = typeInformation2;
        this.pythonFunctionInfo = dataStreamPythonFunctionInfo;
    }

    /**
     * Opens the superclass and then initializes all transient state: the reusable byte buffers
     * and their views, the timestamp queue, both serializers and the output collector.
     *
     * @throws Exception if the superclass fails to open or a serializer cannot be created
     */
    @Override
    public void open() throws Exception {
        super.open();
        this.bais = new ByteArrayInputStreamWithPos();
        this.baisWrapper = new DataInputViewStreamWrapper(this.bais);
        this.baos = new ByteArrayOutputStreamWithPos();
        this.baosWrapper = new DataOutputViewStreamWrapper(this.baos);
        this.bufferedTimestamp = new LinkedList<>();
        this.runnerInputTypeSerializer =
                PythonTypeUtils.TypeInfoToSerializerConverter.typeInfoSerializerConverter(
                        this.runnerInputTypeInfo);
        this.runnerOutputTypeSerializer =
                PythonTypeUtils.TypeInfoToSerializerConverter.typeInfoSerializerConverter(
                        this.runnerOutputTypeInfo);
        this.collector = new TimestampedCollector<>(this.output);
    }

    /**
     * Builds the {@link BeamDataStreamPythonFunctionRunner} that executes the Python function.
     *
     * @return a newly created runner
     * @throws Exception if any of the runner's inputs cannot be created
     */
    @Override
    public PythonFunctionRunner createPythonFunctionRunner() throws Exception {
        return new BeamDataStreamPythonFunctionRunner(
                getRuntimeContext().getTaskName(),
                createPythonEnvironmentManager(),
                this.runnerInputTypeInfo,
                this.runnerOutputTypeInfo,
                getFunctionUrn(),
                PythonOperatorUtils.getUserDefinedDataStreamFunctionProto(
                        this.pythonFunctionInfo, getRuntimeContext(), getInternalParameters()),
                getCoderUrn(),
                this.jobOptions,
                getFlinkMetricContainer(),
                // NOTE(review): the next two arguments are deliberately not supplied; their
                // meaning is defined by the runner constructor and is not visible here.
                null,
                null,
                getContainingTask().getEnvironment().getMemoryManager(),
                getOperatorConfig()
                        .getManagedMemoryFractionOperatorUseCaseOfSlot(
                                ManagedMemoryUseCase.PYTHON,
                                getContainingTask()
                                        .getEnvironment()
                                        .getTaskManagerInfo()
                                        .getConfiguration(),
                                getContainingTask()
                                        .getEnvironment()
                                        .getUserCodeClassLoader()
                                        .asClassLoader()));
    }

    /**
     * Returns the Python environment of the wrapped Python function.
     *
     * @return the Python environment taken from the function description
     */
    @Override
    public PythonEnv getPythonEnv() {
        return this.pythonFunctionInfo.getPythonFunction().getPythonEnv();
    }

    /**
     * Records the element's timestamp, serializes its value, forwards the bytes to the Python
     * function runner and then runs the inherited bundle-count check and result emission.
     *
     * @param streamRecord the incoming record
     * @throws Exception if serialization, the runner or result emission fails
     */
    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        this.bufferedTimestamp.offer(streamRecord.getTimestamp());

        // The runner input serializer is typed on UDFIN while records carry IN. The cast is
        // erased at runtime, so the serializer receives exactly the record's value.
        @SuppressWarnings("unchecked")
        final UDFIN runnerInput = (UDFIN) streamRecord.getValue();
        this.runnerInputTypeSerializer.serialize(runnerInput, this.baosWrapper);

        this.pythonFunctionRunner.process(this.baos.toByteArray());
        // Clear the shared buffer so the next element starts from an empty stream.
        this.baos.reset();

        this.elementCount++;
        checkInvokeFinishBundleByCount();
        emitResults();
    }

    /**
     * Returns the URN identifying the kind of Python function executed by the runner.
     *
     * @return {@link #DATA_STREAM_STATELESS_FUNCTION_URN} unless overridden
     */
    public String getFunctionUrn() {
        return DATA_STREAM_STATELESS_FUNCTION_URN;
    }

    /**
     * Returns extra parameters included when building the user-defined function proto.
     *
     * @return an immutable empty map unless overridden
     */
    public Map<String, String> getInternalParameters() {
        return Collections.emptyMap();
    }

    /**
     * Returns the URN of the coder passed to the Python function runner.
     *
     * @return the coder URN
     */
    public abstract String getCoderUrn();
}
