package org.apache.flink.addons.redis.core.output;

import java.io.IOException;
import org.apache.flink.addons.redis.conf.FlinkRedisConf;
import org.apache.flink.addons.redis.core.manager.RedisManager;
import org.apache.flink.addons.redis.core.output.datatype.RedisDataTypeWriter;
import org.apache.flink.addons.redis.core.output.datatype.RedisWriteCommandPayload;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;

/* loaded from: input_file:org/apache/flink/addons/redis/core/output/RedisBatchWriter.class */
public class RedisBatchWriter<T, U> implements SinkWriter<T> {
    private final RedisDataTypeWriter<T, U> dataTypeWriter;
    private final RedisWriteElementConverter<T, U> elementConverter;
    private final RedisManager redisManager;
    private final ProcessingTimeService timeService;
    private final boolean flushOnCheckpoint;
    private final int maxBatchSize;
    private final long bufferFlushIntervalMillis;
    private final boolean periodicFlushingEnabled;
    private final boolean batchSizeFlushEnabled;
    private long numPendingRequests = 0;
    private boolean existsActiveTimerCallback = false;

    public RedisBatchWriter(FlinkRedisConf flinkRedisConf, RedisDataTypeWriter<T, U> redisDataTypeWriter, Sink.InitContext initContext) {
        this.dataTypeWriter = redisDataTypeWriter;
        this.redisManager = RedisManager.forDeployMode(flinkRedisConf);
        this.elementConverter = new RedisWriteElementConverter<>(redisDataTypeWriter, flinkRedisConf.getWriteOptions().isIgnoreRetractions(), true);
        this.timeService = initContext.getProcessingTimeService();
        this.flushOnCheckpoint = flinkRedisConf.getWriteOptions().isFlushOnCheckpoint();
        this.maxBatchSize = flinkRedisConf.getWriteOptions().getNumRecordsInBatch();
        this.bufferFlushIntervalMillis = flinkRedisConf.getWriteOptions().getBufferFlushIntervalMillis();
        this.periodicFlushingEnabled = this.bufferFlushIntervalMillis > 0;
        this.batchSizeFlushEnabled = this.maxBatchSize > 1;
        this.redisManager.initBatchCommands();
    }

    public void write(T t, SinkWriter.Context context) throws IOException, InterruptedException {
        if (this.periodicFlushingEnabled && !this.existsActiveTimerCallback) {
            registerCallback();
        }
        RedisWriteCommandPayload<U> convert = this.elementConverter.convert(t);
        if (convert != null) {
            this.numPendingRequests++;
            boolean z = this.batchSizeFlushEnabled && this.numPendingRequests == ((long) this.maxBatchSize);
            this.redisManager.putWriteRequestToBatch(convert, this.dataTypeWriter, z);
            if (z) {
                this.numPendingRequests = 0L;
            }
        }
    }

    public void flush(boolean z) {
        if (this.flushOnCheckpoint || z) {
            flush();
        }
    }

    public void close() {
        if (this.redisManager != null) {
            this.redisManager.close();
        }
    }

    private void flush() {
        if (this.numPendingRequests != 0) {
            this.redisManager.flushBatch();
            this.numPendingRequests = 0L;
        }
    }

    private void registerCallback() {
        this.timeService.registerTimer(this.timeService.getCurrentProcessingTime() + this.bufferFlushIntervalMillis, j -> {
            this.existsActiveTimerCallback = false;
            flush();
        });
        this.existsActiveTimerCallback = true;
    }
}
