package org.apache.flink.streaming.runtime.operators.sink;

import java.util.Iterator;
import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;

/* JADX INFO: Access modifiers changed from: package-private */
@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/AbstractSinkWriterOperator.class */
public abstract class AbstractSinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<CommT> implements OneInputStreamOperator<InputT, CommT>, BoundedOneInput {
    private static final long serialVersionUID = 1;
    private final AbstractSinkWriterOperator<InputT, CommT>.Context<InputT> context;
    private Long currentWatermark;
    protected SinkWriter<InputT, CommT, ?> sinkWriter;

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/AbstractSinkWriterOperator$Context.class */
    private class Context<IN> implements SinkWriter.Context {
        private StreamRecord<IN> element;

        private Context() {
        }

        @Override // org.apache.flink.api.connector.sink.SinkWriter.Context
        public long currentWatermark() {
            return AbstractSinkWriterOperator.this.currentWatermark.longValue();
        }

        @Override // org.apache.flink.api.connector.sink.SinkWriter.Context
        public Long timestamp() {
            if (this.element.hasTimestamp()) {
                return Long.valueOf(this.element.getTimestamp());
            }
            return null;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/AbstractSinkWriterOperator$InitContextImpl.class */
    private static class InitContextImpl implements Sink.InitContext {
        private final int subtaskIdx;
        private final ProcessingTimeService processingTimeService;
        private final MetricGroup metricGroup;

        public InitContextImpl(int i, ProcessingTimeService processingTimeService, MetricGroup metricGroup) {
            this.subtaskIdx = i;
            this.processingTimeService = (ProcessingTimeService) Preconditions.checkNotNull(processingTimeService);
            this.metricGroup = (MetricGroup) Preconditions.checkNotNull(metricGroup);
        }

        @Override // org.apache.flink.api.connector.sink.Sink.InitContext
        public Sink.ProcessingTimeService getProcessingTimeService() {
            return new ProcessingTimerServiceImpl(this.processingTimeService);
        }

        @Override // org.apache.flink.api.connector.sink.Sink.InitContext
        public int getSubtaskId() {
            return this.subtaskIdx;
        }

        @Override // org.apache.flink.api.connector.sink.Sink.InitContext
        public MetricGroup metricGroup() {
            return this.metricGroup;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/AbstractSinkWriterOperator$ProcessingTimerServiceImpl.class */
    private static class ProcessingTimerServiceImpl implements Sink.ProcessingTimeService {
        private final ProcessingTimeService processingTimeService;

        public ProcessingTimerServiceImpl(ProcessingTimeService processingTimeService) {
            this.processingTimeService = (ProcessingTimeService) Preconditions.checkNotNull(processingTimeService);
        }

        @Override // org.apache.flink.api.connector.sink.Sink.ProcessingTimeService
        public long getCurrentProcessingTime() {
            return this.processingTimeService.getCurrentProcessingTime();
        }

        @Override // org.apache.flink.api.connector.sink.Sink.ProcessingTimeService
        public void registerProcessingTimer(long j, Sink.ProcessingTimeService.ProcessingTimeCallback processingTimeCallback) {
            Preconditions.checkNotNull(processingTimeCallback);
            ProcessingTimeService processingTimeService = this.processingTimeService;
            processingTimeCallback.getClass();
            processingTimeService.registerTimer(j, processingTimeCallback::onProcessingTime);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractSinkWriterOperator(ProcessingTimeService processingTimeService) {
        this.processingTimeService = (ProcessingTimeService) Preconditions.checkNotNull(processingTimeService);
        this.context = new Context<>();
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void open() throws Exception {
        super.open();
        this.currentWatermark = Long.MIN_VALUE;
        this.sinkWriter = createWriter();
    }

    @Override // org.apache.flink.streaming.api.operators.Input
    public void processElement(StreamRecord<InputT> streamRecord) throws Exception {
        ((Context) this.context).element = streamRecord;
        this.sinkWriter.write(streamRecord.getValue(), this.context);
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void prepareSnapshotPreBarrier(long j) throws Exception {
        super.prepareSnapshotPreBarrier(j);
        sendCommittables(this.sinkWriter.prepareCommit(false));
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.Input
    public void processWatermark(Watermark watermark) throws Exception {
        super.processWatermark(watermark);
        this.currentWatermark = Long.valueOf(watermark.getTimestamp());
    }

    @Override // org.apache.flink.streaming.api.operators.BoundedOneInput
    public void endInput() throws Exception {
        sendCommittables(this.sinkWriter.prepareCommit(true));
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void close() throws Exception {
        super.close();
        this.sinkWriter.close();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Sink.InitContext createInitContext() {
        return new InitContextImpl(getRuntimeContext().getIndexOfThisSubtask(), this.processingTimeService, getMetricGroup());
    }

    abstract SinkWriter<InputT, CommT, ?> createWriter() throws Exception;

    private void sendCommittables(List<CommT> list) {
        Iterator<CommT> it = list.iterator();
        while (it.hasNext()) {
            this.output.collect(new StreamRecord(it.next()));
        }
    }
}
