package org.apache.flink.streaming.runtime.operators.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;

/**
 * Streaming operator that drives a {@link GlobalCommitter}.
 *
 * <p>The operator delegates to the wrapped committer for every step visible in this class:
 * filtering committables handed to {@link #recoveredCommittables(List)}, combining incoming
 * committables into a single global committable in {@link #prepareCommit(List)}, committing in
 * {@link #commit(List)}, signalling end of input after a completed checkpoint, and closing.
 *
 * <p>NOTE(review): when and in which order the base class invokes the package-private hooks is
 * defined by {@code AbstractStreamingCommitterOperator}, which is not part of this file.
 *
 * @param <CommT> type of the committables received as input
 * @param <GlobalCommT> type of the aggregated (global) committables
 */
@Internal
public final class StreamingGlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamingCommitterOperator<CommT, GlobalCommT> implements BoundedOneInput {

    /** Committer that combines and commits the global committables. */
    private final GlobalCommitter<CommT, GlobalCommT> globalCommitter;

    /** Recovered global committables that still have to be handed out by the next prepareCommit. */
    private final List<GlobalCommT> recoveredGlobalCommittables;

    /** Set once {@link #endInput()} has been called. */
    private boolean endOfInput;

    /**
     * Creates the operator.
     *
     * @param globalCommitter committer to delegate to; must not be {@code null}
     * @param simpleVersionedSerializer serializer for global committables, passed to the base class
     * @throws NullPointerException if {@code globalCommitter} is {@code null}
     */
    public StreamingGlobalCommitterOperator(GlobalCommitter<CommT, GlobalCommT> globalCommitter, SimpleVersionedSerializer<GlobalCommT> simpleVersionedSerializer) {
        super(simpleVersionedSerializer);
        this.globalCommitter = Preconditions.checkNotNull(globalCommitter);
        this.recoveredGlobalCommittables = new ArrayList<>();
        this.endOfInput = false;
    }

    /**
     * Buffers the recovered global committables that the committer's filter lets through; they are
     * emitted by the next call to {@link #prepareCommit(List)}.
     *
     * @param recovered recovered global committables; must not be {@code null}
     * @throws IOException if the committer fails while filtering
     */
    @Override
    void recoveredCommittables(List<GlobalCommT> recovered) throws IOException {
        final List<GlobalCommT> stillPending =
                globalCommitter.filterRecoveredCommittables(Preconditions.checkNotNull(recovered));
        recoveredGlobalCommittables.addAll(stillPending);
    }

    /**
     * Builds the list of global committables to commit next: all buffered recovered committables
     * (the buffer is drained) followed by one combined committable for {@code input}, if
     * {@code input} is non-empty.
     *
     * @param input committables received since the previous call; must not be {@code null}
     * @return a new list holding the global committables to commit
     * @throws IOException if the committer fails while combining
     */
    @Override
    List<GlobalCommT> prepareCommit(List<CommT> input) throws IOException {
        Preconditions.checkNotNull(input);
        final List<GlobalCommT> toCommit = new ArrayList<>(recoveredGlobalCommittables);
        // The recovered committables are handed out exactly once.
        recoveredGlobalCommittables.clear();
        if (!input.isEmpty()) {
            toCommit.add(globalCommitter.combine(input));
        }
        return toCommit;
    }

    /**
     * Commits the given global committables through the wrapped committer.
     *
     * @param committables global committables to commit; must not be {@code null}
     * @return whatever the committer returns from its {@code commit} call
     * @throws Exception if the committer fails
     */
    @Override
    List<GlobalCommT> commit(List<GlobalCommT> committables) throws Exception {
        return globalCommitter.commit(Preconditions.checkNotNull(committables));
    }

    /** Records that the input has ended; acted upon in {@link #notifyCheckpointComplete(long)}. */
    @Override
    public void endInput() {
        this.endOfInput = true;
    }

    /**
     * Closes the base operator and then the wrapped committer.
     *
     * <p>The committer is closed even when closing the base operator throws an exception, so that
     * it is not leaked. The first failure is rethrown; a failure of the second close is attached
     * to it as a suppressed exception.
     *
     * @throws Exception the first failure raised by either close call
     */
    @Override
    public void close() throws Exception {
        Exception failure = null;
        try {
            super.close();
        } catch (Exception e) {
            failure = e;
        }
        try {
            globalCommitter.close();
        } catch (Exception e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Forwards the notification to the base class and, once the input has ended, signals end of
     * input to the committer.
     *
     * <p>NOTE(review): the signal is sent on every completed checkpoint after end of input, not
     * only the first one — confirm the committer tolerates repeated calls.
     *
     * @param checkpointId id of the completed checkpoint
     * @throws Exception if the base class or the committer fails
     */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        if (endOfInput) {
            globalCommitter.endOfInput();
        }
    }

    // The three overrides below only forward to the base class. They were marked as
    // compiler-generated bridge methods in the decompiled output and are retained so the declared
    // surface of this class stays unchanged.

    /** Forwards to the base class implementation. */
    @Override
    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
    }

    /** Forwards to the base class implementation. */
    @Override
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw parameter kept to match the original erased signature
    public void processElement(StreamRecord streamRecord) throws Exception {
        super.processElement(streamRecord);
    }

    /** Forwards to the base class implementation. */
    @Override
    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
    }
}
