package org.apache.flink.streaming.api.functions.sink.async;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.ListIterator;
import java.util.function.Consumer;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/async/AsyncSinkFunction.class */
/**
 * Sink function that converts incoming elements into request entries, buffers them, and submits
 * them to a destination in asynchronous batches.
 *
 * <p>A batch is bounded by an entry count ({@code maxBatchSize}, further capped by the current
 * rate limit) and by a cumulative byte size ({@code maxBatchSizeInBytes}). The number of
 * concurrently outstanding requests is bounded by {@code maxInFlightRequests}, and the number of
 * outstanding entries by the limit reported by the {@link AIMDRateLimitingStrategy}. Entries that
 * a request reports back as failed are put back at the head of the buffer and retried.
 *
 * <p>Completion callbacks are re-dispatched through the inherited {@code mailboxExecutor} before
 * they touch the counters or the buffer of this sink.
 *
 * @param <InputT> type of the elements handed to {@link #invoke}
 * @param <RequestEntryT> type of the entries that are buffered and sent to the destination
 */
public abstract class AsyncSinkFunction<InputT, RequestEntryT> extends YieldingStatefulSinkFunction<InputT> {
    private static final long serialVersionUID = 1L;
    private static final int INFLIGHT_MESSAGES_LIMIT_INCREASE_RATE = 10;
    private static final double INFLIGHT_MESSAGES_LIMIT_DECREASE_FACTOR = 0.5d;
    private static final String STATE_NAME = "_async_sink_function_state_";

    /** Supplies the current limit on in-flight entries; created in {@link #open}. */
    private transient AIMDRateLimitingStrategy rateLimitingStrategy;

    private final ElementConverter<InputT, RequestEntryT> elementConverter;
    private final TypeSerializer<RequestEntryT> typeSerializer;
    protected final int maxBatchSize;
    protected final int maxInFlightRequests;
    protected final int maxBufferedRequests;
    protected final int maxRetryCount;
    protected final long maxBatchSizeInBytes;
    protected final long maxTimeInBufferMS;
    protected final long maxRecordSizeInBytes;
    protected final boolean snapshotsEnabled;

    /** Number of submitted requests whose completion has not been processed yet. */
    private int inFlightRequestsCount;

    /** Number of entries contained in the requests counted by {@link #inFlightRequestsCount}. */
    private int inFlightMessages;

    /** Sum of the sizes of all entries currently held in {@link #bufferedRequestEntries}. */
    private double bufferedRequestEntriesTotalSizeInBytes;

    /** Receives unrecoverable errors; created in {@link #open}. */
    private transient Consumer<Exception> fatalExceptionCons;

    /** Operator state holding the buffered entries; only set when snapshots are enabled. */
    private transient ListState<RequestEntryWrapper<RequestEntryT>> bufferedRequestsState;

    /** Entries waiting to be sent; retried entries are inserted at the head. */
    private final Deque<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries = new ArrayDeque<>();

    /** Whether a flush timer has been registered and has not fired yet. */
    private boolean existsActiveTimerCallback = false;

    /**
     * Sends a batch of entries to the destination.
     *
     * <p>The implementation must eventually call {@code requestResult} with the entries that
     * should be sent again. Passing an empty list reports the whole batch as successful.
     *
     * @param requestEntries the batch to send
     * @param requestResult callback that takes the entries to retry
     */
    protected abstract void submitRequestEntries(List<RequestEntryWrapper<RequestEntryT>> requestEntries, Consumer<List<RequestEntryWrapper<RequestEntryT>>> requestResult);

    /**
     * Returns the size of a single request entry. The value is compared against
     * {@code maxRecordSizeInBytes} and accumulated against {@code maxBatchSizeInBytes}.
     *
     * @param requestEntry the entry to measure
     * @return the size of the entry in bytes
     */
    protected abstract long getSizeInBytes(RequestEntryT requestEntry);

    /**
     * Creates the sink and validates its configuration.
     *
     * @param elementConverter converts input elements into request entries; must not be null
     * @param typeSerializer serializer for request entries; required when
     *     {@code snapshotsEnabled} is true
     * @param maxBatchSize maximum number of entries per batch; must be positive
     * @param maxInFlightRequests maximum number of concurrently outstanding requests; must be
     *     positive
     * @param maxBufferedRequests buffer size at which {@link #invoke} blocks and flushes; must be
     *     positive and at least {@code maxBatchSize}
     * @param maxBatchSizeInBytes maximum cumulative entry size per batch; must be positive and at
     *     least {@code maxRecordSizeInBytes}
     * @param maxTimeInBufferMS delay of the flush timer in milliseconds; must be positive
     * @param maxRecordSizeInBytes maximum size of a single entry; must be positive
     * @param maxRetryCount number of retries after which an entry is reported as a fatal error;
     *     must not be negative
     * @param snapshotsEnabled whether buffered entries are written to and restored from operator
     *     state
     */
    public AsyncSinkFunction(ElementConverter<InputT, RequestEntryT> elementConverter, TypeSerializer<RequestEntryT> typeSerializer, int maxBatchSize, int maxInFlightRequests, int maxBufferedRequests, long maxBatchSizeInBytes, long maxTimeInBufferMS, long maxRecordSizeInBytes, int maxRetryCount, boolean snapshotsEnabled) {
        Preconditions.checkNotNull(elementConverter);
        // A serializer is only needed when the buffer is written to state.
        Preconditions.checkArgument(!snapshotsEnabled || typeSerializer != null);
        Preconditions.checkArgument(maxBatchSize > 0);
        Preconditions.checkArgument(maxBufferedRequests > 0);
        Preconditions.checkArgument(maxInFlightRequests > 0);
        Preconditions.checkArgument(maxBatchSizeInBytes > 0);
        Preconditions.checkArgument(maxTimeInBufferMS > 0);
        Preconditions.checkArgument(maxRecordSizeInBytes > 0);
        Preconditions.checkArgument(maxRetryCount >= 0);
        // Note: the check accepts equality although the message says "greater than".
        Preconditions.checkArgument(maxBufferedRequests >= maxBatchSize, "The maximum number of requests that may be buffered should be greater than the maximum number of requests per batch.");
        Preconditions.checkArgument(maxBatchSizeInBytes >= maxRecordSizeInBytes, "The maximum allowed size in bytes per flush must be greater than or equal to the maximum allowed size in bytes of a single record.");
        this.elementConverter = elementConverter;
        this.typeSerializer = typeSerializer;
        this.maxBatchSize = maxBatchSize;
        this.maxInFlightRequests = maxInFlightRequests;
        this.maxBufferedRequests = maxBufferedRequests;
        this.maxBatchSizeInBytes = maxBatchSizeInBytes;
        this.maxTimeInBufferMS = maxTimeInBufferMS;
        this.maxRecordSizeInBytes = maxRecordSizeInBytes;
        this.inFlightRequestsCount = 0;
        this.bufferedRequestEntriesTotalSizeInBytes = 0.0d;
        this.snapshotsEnabled = snapshotsEnabled;
        this.maxRetryCount = maxRetryCount;
        this.inFlightMessages = 0;
    }

    /**
     * Creates the rate limiting strategy and the fatal exception consumer.
     *
     * @param configuration not used by this implementation
     */
    public void open(Configuration configuration) throws Exception {
        // Saturate instead of overflowing: a wrapped (negative) limit would make
        // isInFlightRequestOrMessageLimitExceeded() permanently true and flush() would never return.
        int maxInFlightMessages = (int) Math.min((long) this.maxBatchSize * this.maxInFlightRequests, Integer.MAX_VALUE);
        this.rateLimitingStrategy = new AIMDRateLimitingStrategy(INFLIGHT_MESSAGES_LIMIT_INCREASE_RATE, INFLIGHT_MESSAGES_LIMIT_DECREASE_FACTOR, maxInFlightMessages, maxInFlightMessages);
        // Fatal errors are rethrown from a mail so that they surface through the mailbox executor.
        this.fatalExceptionCons = exc -> {
            this.mailboxExecutor.execute(() -> {
                throw exc;
            }, "A fatal exception occurred in the sink that cannot be recovered from or should not be retried.");
        };
    }

    /**
     * Registers a timer {@code maxTimeInBufferMS} after the current processing time that flushes
     * until the buffer is empty.
     */
    private void registerCallback() {
        long triggerTime = this.timeService.getCurrentProcessingTime() + this.maxTimeInBufferMS;
        this.timeService.registerTimer(triggerTime, ignored -> {
            this.existsActiveTimerCallback = false;
            while (!this.bufferedRequestEntries.isEmpty()) {
                flush();
            }
        });
        this.existsActiveTimerCallback = true;
    }

    /**
     * Converts the element into a request entry and buffers it.
     *
     * <p>Before buffering, flushes for as long as the buffer holds {@code maxBufferedRequests}
     * entries or more. After buffering, sends full batches as far as the in-flight limits allow.
     *
     * @throws IllegalArgumentException if the converted entry is larger than
     *     {@code maxRecordSizeInBytes}
     */
    @Override // org.apache.flink.streaming.api.functions.sink.SinkFunction
    public void invoke(InputT inputt, SinkFunction.Context context) throws Exception {
        while (this.bufferedRequestEntries.size() >= this.maxBufferedRequests) {
            flush();
        }
        RequestEntryT requestEntry = this.elementConverter.apply(inputt, context);
        addEntryToBuffer(requestEntry, false);
        nonBlockingFlush();
    }

    /**
     * Sends batches while an in-flight limit is not reached and the buffer holds either a full
     * batch (by entry count) or at least {@code maxBatchSizeInBytes} bytes.
     */
    private void nonBlockingFlush() throws InterruptedException {
        while (!isInFlightRequestOrMessageLimitExceeded()) {
            boolean batchFullByCount = this.bufferedRequestEntries.size() >= getNextBatchSizeLimit();
            boolean batchFullByBytes = this.bufferedRequestEntriesTotalSizeInBytes >= this.maxBatchSizeInBytes;
            if (!batchFullByCount && !batchFullByBytes) {
                return;
            }
            flush();
        }
    }

    /** Whether the in-flight request count or the in-flight entry count has reached its limit. */
    private boolean isInFlightRequestOrMessageLimitExceeded() {
        return this.inFlightRequestsCount >= this.maxInFlightRequests || this.inFlightMessages >= this.rateLimitingStrategy.getRateLimit();
    }

    /**
     * Submits the next batch, yielding to the mailbox first for as long as an in-flight limit is
     * reached. Returns without submitting anything when no batch can be formed.
     */
    protected void flush() throws InterruptedException {
        while (isInFlightRequestOrMessageLimitExceeded()) {
            this.mailboxExecutor.yield();
        }
        List<RequestEntryWrapper<RequestEntryT>> batch = createNextAvailableBatch();
        if (batch.isEmpty()) {
            return;
        }
        final int batchSize = batch.size();
        // The result is handled in a mail rather than directly in the callback.
        Consumer<List<RequestEntryWrapper<RequestEntryT>>> requestResult = failedEntries -> {
            this.mailboxExecutor.execute(() -> {
                completeRequest(failedEntries, batchSize);
            }, "Mark in-flight request as completed and requeue %d request entries", new Object[] {failedEntries.size()});
        };
        this.inFlightRequestsCount++;
        this.inFlightMessages += batchSize;
        submitRequestEntries(batch, requestResult);
    }

    /**
     * Removes entries from the head of the buffer until the batch count limit is reached or the
     * next entry would push the batch over {@code maxBatchSizeInBytes}.
     *
     * @return the removed entries in buffer order; possibly empty
     */
    private List<RequestEntryWrapper<RequestEntryT>> createNextAvailableBatch() {
        int entryLimit = Math.min(getNextBatchSizeLimit(), this.bufferedRequestEntries.size());
        List<RequestEntryWrapper<RequestEntryT>> batch = new ArrayList<>(entryLimit);
        // Accumulate in a long: the limit is a long, and narrowing to int would let the running
        // total wrap around once it passes Integer.MAX_VALUE.
        long batchSizeInBytes = 0L;
        for (int taken = 0; taken < entryLimit; taken++) {
            long entrySize = this.bufferedRequestEntries.peek().getSize();
            if (batchSizeInBytes + entrySize > this.maxBatchSizeInBytes) {
                break;
            }
            batch.add(this.bufferedRequestEntries.remove());
            this.bufferedRequestEntriesTotalSizeInBytes -= entrySize;
            batchSizeInBytes += entrySize;
        }
        return batch;
    }

    /**
     * Processes the result of a finished request.
     *
     * <p>Releases the in-flight accounting of the request, adjusts the rate limit, and puts the
     * failed entries back at the head of the buffer. The entries are re-inserted back to front so
     * that they keep their relative order. An entry whose retry counter exceeds
     * {@code maxRetryCount} is additionally reported to the fatal exception consumer.
     *
     * @param failedEntries entries of the request that have to be sent again
     * @param batchSize number of entries the request was submitted with
     */
    private void completeRequest(List<RequestEntryWrapper<RequestEntryT>> failedEntries, int batchSize) throws InterruptedException {
        this.inFlightRequestsCount--;
        this.inFlightMessages -= batchSize;
        updateInFlightMessagesLimit(failedEntries.isEmpty());
        ListIterator<RequestEntryWrapper<RequestEntryT>> reverseIterator = failedEntries.listIterator(failedEntries.size());
        while (reverseIterator.hasPrevious()) {
            RequestEntryWrapper<RequestEntryT> entry = reverseIterator.previous();
            entry.incrementRetryNum();
            if (entry.getRetryNum() > this.maxRetryCount) {
                getFatalExceptionCons().accept(new RuntimeException("Max retries exceeded for: " + entry.getRequestEntry()));
            }
            addEntryToBuffer(entry, true);
        }
        nonBlockingFlush();
    }

    /** Scales the rate limit up after a fully successful request and down otherwise. */
    private void updateInFlightMessagesLimit(boolean isSuccessfulRequest) {
        if (isSuccessfulRequest) {
            this.rateLimitingStrategy.scaleUp();
        } else {
            this.rateLimitingStrategy.scaleDown();
        }
    }

    /**
     * Wraps a request entry together with its size and buffers it.
     *
     * @throws IllegalArgumentException if the entry is larger than {@code maxRecordSizeInBytes}
     */
    private void addEntryToBuffer(RequestEntryT requestEntry, boolean insertAtHead) {
        RequestEntryWrapper<RequestEntryT> wrapper = new RequestEntryWrapper<>(requestEntry, getSizeInBytes(requestEntry));
        if (wrapper.getSize() > this.maxRecordSizeInBytes) {
            throw new IllegalArgumentException(String.format("The request entry sent to the buffer was of size [%s], when the maxRecordSizeInBytes was set to [%s].", Long.valueOf(wrapper.getSize()), Long.valueOf(this.maxRecordSizeInBytes)));
        }
        addEntryToBuffer(wrapper, insertAtHead);
    }

    /**
     * Buffers an already wrapped entry at the head or the tail and updates the buffered size.
     * Registers the flush timer when the buffer was empty and no timer is pending.
     */
    private void addEntryToBuffer(RequestEntryWrapper<RequestEntryT> entry, boolean insertAtHead) {
        if (this.bufferedRequestEntries.isEmpty() && !this.existsActiveTimerCallback) {
            registerCallback();
        }
        if (insertAtHead) {
            this.bufferedRequestEntries.addFirst(entry);
        } else {
            this.bufferedRequestEntries.add(entry);
        }
        this.bufferedRequestEntriesTotalSizeInBytes += entry.getSize();
    }

    /** Delegates to {@link #flush(boolean)}. */
    @Override // org.apache.flink.streaming.api.functions.sink.async.StatefulSinkFunction
    public void prepareCommit(boolean flushBuffered) throws InterruptedException {
        flush(flushBuffered);
    }

    /**
     * Blocks until no request is in flight any more.
     *
     * @param flushBuffered when true, additionally keeps submitting batches until the buffer is
     *     empty; when false, buffered entries are left in the buffer
     */
    public void flush(boolean flushBuffered) throws InterruptedException {
        while (this.inFlightRequestsCount > 0 || (flushBuffered && !this.bufferedRequestEntries.isEmpty())) {
            yieldIfThereExistsInFlightRequests();
            if (flushBuffered) {
                flush();
            }
        }
    }

    /** Yields to the mailbox once if at least one request is in flight. */
    private void yieldIfThereExistsInFlightRequests() throws InterruptedException {
        if (this.inFlightRequestsCount > 0) {
            this.mailboxExecutor.yield();
        }
    }

    /**
     * Appends restored entries to the buffer.
     *
     * <p>NOTE(review): unlike {@code addEntryToBuffer}, this path does not register the flush
     * timer; confirm whether restored entries are meant to wait for a size-triggered flush.
     *
     * @throws IllegalStateException if a restored entry is larger than
     *     {@code maxRecordSizeInBytes}
     */
    private void initializeState(Iterable<RequestEntryWrapper<RequestEntryT>> restoredEntries) {
        for (RequestEntryWrapper<RequestEntryT> entry : restoredEntries) {
            if (entry.getSize() > this.maxRecordSizeInBytes) {
                throw new IllegalStateException(String.format("State contains record of size %d which exceeds sink maximum record size %d.", Long.valueOf(entry.getSize()), Long.valueOf(this.maxRecordSizeInBytes)));
            }
            this.bufferedRequestEntries.add(entry);
            this.bufferedRequestEntriesTotalSizeInBytes += entry.getSize();
        }
    }

    /** Entry count limit of the next batch: {@code maxBatchSize} capped by the rate limit. */
    private int getNextBatchSizeLimit() {
        return Math.min(this.maxBatchSize, this.rateLimitingStrategy.getRateLimit());
    }

    /**
     * Returns the consumer to report unrecoverable errors to. The consumer is created in
     * {@link #open}.
     */
    protected Consumer<Exception> getFatalExceptionCons() {
        return this.fatalExceptionCons;
    }

    /** Writes a copy of the buffered entries to operator state when snapshots are enabled. */
    @Override // org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        if (this.snapshotsEnabled) {
            this.bufferedRequestsState.update(new ArrayList<>(this.bufferedRequestEntries));
        }
    }

    /** Obtains the operator list state and restores the buffer from it when snapshots are enabled. */
    @Override // org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        if (this.snapshotsEnabled) {
            // Kept raw on purpose: the generic signature of RequestEntryWrapperSerializer is not
            // visible from this file.
            @SuppressWarnings({"unchecked", "rawtypes"})
            ListStateDescriptor<RequestEntryWrapper<RequestEntryT>> descriptor = new ListStateDescriptor(STATE_NAME, new RequestEntryWrapperSerializer(this.typeSerializer));
            this.bufferedRequestsState = functionInitializationContext.getOperatorStateStore().getListState(descriptor);
            initializeState(this.bufferedRequestsState.get());
        }
    }
}
