package org.apache.flink.streaming.runtime.operators.sink;

import java.util.ArrayList;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterOperator.class */
final class StatefulSinkWriterOperator<InputT, CommT, WriterStateT> extends AbstractSinkWriterOperator<InputT, CommT> {
    private static final ListStateDescriptor<byte[]> WRITER_RAW_STATES_DESC = new ListStateDescriptor<>("writer_raw_states", BytePrimitiveArraySerializer.INSTANCE);
    private final Sink<InputT, CommT, WriterStateT, ?> sink;
    private final SimpleVersionedSerializer<WriterStateT> writerStateSimpleVersionedSerializer;

    @Nullable
    private final String previousSinkStateName;

    @Nullable
    private ListState<WriterStateT> previousSinkState;
    private ListState<WriterStateT> writerState;

    /* JADX INFO: Access modifiers changed from: package-private */
    public StatefulSinkWriterOperator(@Nullable String str, ProcessingTimeService processingTimeService, Sink<InputT, CommT, WriterStateT, ?> sink, SimpleVersionedSerializer<WriterStateT> simpleVersionedSerializer) {
        super(processingTimeService);
        this.sink = sink;
        this.writerStateSimpleVersionedSerializer = simpleVersionedSerializer;
        this.previousSinkStateName = str;
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.CheckpointedStreamOperator
    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        this.writerState = new SimpleVersionedListState(stateInitializationContext.getOperatorStateStore().getListState(WRITER_RAW_STATES_DESC), this.writerStateSimpleVersionedSerializer);
        if (this.previousSinkStateName != null) {
            this.previousSinkState = new SimpleVersionedListState(stateInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor(this.previousSinkStateName, BytePrimitiveArraySerializer.INSTANCE)), this.writerStateSimpleVersionedSerializer);
        }
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.CheckpointedStreamOperator
    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        this.writerState.update(this.sinkWriter.snapshotState());
        if (this.previousSinkState != null) {
            this.previousSinkState.clear();
        }
    }

    @Override // org.apache.flink.streaming.runtime.operators.sink.AbstractSinkWriterOperator
    SinkWriter<InputT, CommT, WriterStateT> createWriter() throws Exception {
        ArrayList arrayList = new ArrayList(CollectionUtil.iterableToList((Iterable) this.writerState.get()));
        if (this.previousSinkStateName != null) {
            Preconditions.checkNotNull(this.previousSinkState);
            arrayList.addAll(CollectionUtil.iterableToList((Iterable) this.previousSinkState.get()));
        }
        return this.sink.createWriter(createInitContext(), arrayList);
    }
}
