package org.apache.flink.streaming.runtime.operators.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.class */
/**
 * Writer operator that forwards incoming elements to the sink writer and, when the sink
 * exposes a committer ({@link SupportsCommitter} or {@link TwoPhaseCommittingSink}), emits
 * the writer's committables downstream as {@link CommittableMessage}s.
 *
 * <p>For every batch of committables a {@link CommittableSummary} is emitted first, followed
 * by one {@link CommittableWithLineage} per committable.
 *
 * @param <InputT> type of the elements handed to the sink writer
 * @param <CommT> type of the committables produced by the sink writer
 */
class SinkWriterOperator<InputT, CommT> extends WriterOperator<InputT, CommittableMessage<CommT>, InputT> {

    /** Checkpoint id under which committables restored from the legacy committer state are emitted. */
    private static final long LEGACY_COMMITTABLES_CHECKPOINT_ID = 1L;

    /** Operator list state holding serialized committables written by the legacy committer. */
    private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
            new ListStateDescriptor<>("streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE);

    /** Operator list state remembering whether end of input had been reached before the snapshot. */
    private static final ListStateDescriptor<Boolean> END_OF_INPUT_STATE_DESC =
            new ListStateDescriptor<>("end_of_input_state", BooleanSerializer.INSTANCE);

    /** Serializer for committables; {@code null} when the sink does not expose a committer. */
    @Nullable
    private final SimpleVersionedSerializer<CommT> committableSerializer;

    /** Committables restored from the legacy committer state that still have to be emitted. */
    private final List<CommT> legacyCommittables;

    /** Whether committables are forwarded downstream, i.e. whether the sink has a committer. */
    private final boolean emitDownstream;

    /**
     * Creates the operator and derives from the sink's type whether committables must be
     * emitted downstream and which serializer is used for them.
     *
     * @param sink the sink whose writer this operator drives
     * @param processingTimeService processing time service passed on to the base operator
     * @param mailboxExecutor mailbox executor passed on to the base operator
     */
    @SuppressWarnings("unchecked") // the sink's committable type is CommT by construction of the topology
    public SinkWriterOperator(Sink<InputT> sink, ProcessingTimeService processingTimeService, MailboxExecutor mailboxExecutor) {
        super(sink, processingTimeService, mailboxExecutor);
        this.legacyCommittables = new ArrayList<>();

        final boolean supportsCommitter = sink instanceof SupportsCommitter;
        final boolean twoPhaseCommitting = sink instanceof TwoPhaseCommittingSink;
        this.emitDownstream = supportsCommitter || twoPhaseCommitting;

        if (supportsCommitter) {
            this.committableSerializer = ((SupportsCommitter<CommT>) sink).getCommittableSerializer();
        } else if (twoPhaseCommitting) {
            this.committableSerializer = ((TwoPhaseCommittingSink<InputT, CommT>) sink).getCommittableSerializer();
        } else {
            this.committableSerializer = null;
        }
    }

    /**
     * Sets up the operator and makes its IO metric group reuse the task-level output metrics
     * (record and byte counters).
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.SetupableStreamOperator
    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<CommittableMessage<CommT>>> output) {
        super.setup(streamTask, streamConfig, output);
        this.metrics.getIOMetricGroup().reuseOutputMetricsForTask();
        this.metrics.getIOMetricGroup().reuseBytesOutputMetricsForTask();
    }

    /**
     * Restores operator state and creates the sink writer.
     *
     * <p>When restoring and a committable serializer is available, committables stored in the
     * legacy committer state are moved into {@link #legacyCommittables} and that state is
     * cleared. When committables are emitted downstream, the end-of-input flag is recovered
     * from state.
     *
     * @param stateInitializationContext context giving access to the restored operator state
     * @throws Exception if reading state or creating the writer fails
     */
    @Override // org.apache.flink.streaming.runtime.operators.sink.WriterOperator, org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.CheckpointedStreamOperator
    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        WriterInitContext initContext = createInitContext(stateInitializationContext.getRestoredCheckpointId());

        if (stateInitializationContext.isRestored() && this.committableSerializer != null) {
            SimpleVersionedListState<List<CommT>> legacyCommitterState =
                    new SimpleVersionedListState<>(
                            stateInitializationContext.getOperatorStateStore().getListState(STREAMING_COMMITTER_RAW_STATES_DESC),
                            new SinkV1WriterCommittableSerializer<>(this.committableSerializer));
            legacyCommitterState.get().forEach(this.legacyCommittables::addAll);
            // The committables now live in memory and are emitted with the next batch,
            // so they must not be restored a second time.
            legacyCommitterState.clear();
        }

        this.sinkWriter = this.writerStateHandler.createWriter(initContext, stateInitializationContext);

        if (this.emitDownstream) {
            this.endOfInputState = stateInitializationContext.getOperatorStateStore().getListState(END_OF_INPUT_STATE_DESC);
            List<Boolean> previousState = Lists.newArrayList(this.endOfInputState.get());
            // End of input counts as reached only if state exists and every restored entry is true.
            this.endOfInput = !previousState.isEmpty() && !previousState.contains(false);
        }
    }

    /** Hands the record's value to the sink writer. */
    @Override // org.apache.flink.streaming.runtime.operators.sink.WriterOperator
    protected void writeElement(StreamRecord<InputT> streamRecord) throws Exception {
        this.sinkWriter.write(streamRecord.getValue(), this.context);
    }

    /**
     * Flushes the writer and emits its committables for the given checkpoint, unless end of
     * input has already been reached.
     *
     * @param checkpointId id of the checkpoint about to be taken
     * @throws Exception if flushing or preparing the commit fails
     */
    @Override // org.apache.flink.streaming.runtime.operators.sink.WriterOperator, org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        if (this.endOfInput) {
            return;
        }
        this.sinkWriter.flush(false);
        emitCommittables(checkpointId);
    }

    /**
     * Delegates to the base class and then emits the final committables tagged with
     * {@link CommittableMessage#EOI}, unless end of input had already been reached before
     * this call.
     *
     * @throws Exception if the base class handling or preparing the commit fails
     */
    @Override // org.apache.flink.streaming.runtime.operators.sink.WriterOperator, org.apache.flink.streaming.api.operators.BoundedOneInput
    public void endInput() throws Exception {
        // Read the flag before delegating: NOTE(review) super.endInput() presumably updates it - confirm.
        final boolean alreadyAtEndOfInput = this.endOfInput;
        super.endInput();
        if (!alreadyAtEndOfInput) {
            emitCommittables(CommittableMessage.EOI);
        }
    }

    /**
     * Asks the writer to prepare its commit and, if the sink has a committer, emits the
     * resulting committables for {@code checkpointId}. Pending legacy committables are emitted
     * first, under {@link #LEGACY_COMMITTABLES_CHECKPOINT_ID}.
     *
     * @param checkpointId checkpoint id attached to the emitted messages
     * @throws IOException if the writer fails to prepare the commit
     * @throws InterruptedException if preparing the commit is interrupted
     */
    private void emitCommittables(long checkpointId) throws IOException, InterruptedException {
        if (!this.emitDownstream) {
            // Nothing is forwarded, but a committing writer still gets its prepareCommit call.
            if (this.sinkWriter instanceof CommittingSinkWriter) {
                ((CommittingSinkWriter<?, ?>) this.sinkWriter).prepareCommit();
            }
            return;
        }

        @SuppressWarnings("unchecked") // NOTE(review): assumes a sink with a committer always creates a CommittingSinkWriter of CommT
        Collection<CommT> committables = ((CommittingSinkWriter<?, CommT>) this.sinkWriter).prepareCommit();

        StreamingRuntimeContext runtimeContext = getRuntimeContext();
        final int subtaskIndex = runtimeContext.getTaskInfo().getIndexOfThisSubtask();
        final int parallelism = runtimeContext.getTaskInfo().getNumberOfParallelSubtasks();

        if (!this.legacyCommittables.isEmpty()) {
            Preconditions.checkState(
                    checkpointId > LEGACY_COMMITTABLES_CHECKPOINT_ID,
                    "Checkpoint id %s must be greater than the id %s used for legacy committables.",
                    checkpointId,
                    LEGACY_COMMITTABLES_CHECKPOINT_ID);
            emit(subtaskIndex, parallelism, LEGACY_COMMITTABLES_CHECKPOINT_ID, this.legacyCommittables);
            this.legacyCommittables.clear();
        }
        emit(subtaskIndex, parallelism, checkpointId, committables);
    }

    /**
     * Emits one summary for the batch followed by one lineage message per committable.
     *
     * @param subtaskIndex index of this subtask
     * @param parallelism number of parallel subtasks
     * @param checkpointId checkpoint id attached to every emitted message
     * @param committables committables to emit; may be empty, in which case only the summary is sent
     */
    private void emit(int subtaskIndex, int parallelism, long checkpointId, Collection<CommT> committables) {
        final int count = committables.size();
        // All committables are reported as pending and none as failed.
        emit(new StreamRecord<>(new CommittableSummary<>(subtaskIndex, parallelism, checkpointId, count, count, 0)));
        for (CommT committable : committables) {
            emit(new StreamRecord<>(new CommittableWithLineage<>(committable, checkpointId, subtaskIndex)));
        }
    }

    /** Logs the message at debug level and forwards it to the operator output. */
    private void emit(StreamRecord<CommittableMessage<CommT>> streamRecord) {
        LOG.debug("Sending message to committer: {}", streamRecord);
        this.output.collect(streamRecord);
    }
}
