package org.apache.flink.addons.redis.core.output.async;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.flink.addons.redis.conf.FlinkRedisConf;
import org.apache.flink.addons.redis.core.manager.RedisManager;
import org.apache.flink.addons.redis.core.output.datatype.RedisDataTypeWriter;
import org.apache.flink.addons.redis.core.output.datatype.RedisWriteCommandPayload;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.sink.writer.RetryableAsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.RetryableRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/addons/redis/core/output/async/RedisAsyncSinkWriter.class */
/**
 * Sink writer that submits {@link RedisWriteCommandPayload} requests to Redis asynchronously
 * through a {@link RedisManager}.
 *
 * <p>Automatic flushing of the manager's async buffer is disabled at construction time; the
 * buffer is flushed explicitly from {@link #flush()}.
 *
 * @param <T> element type accepted by the sink
 * @param <U> value type carried by the Redis write payload
 */
public class RedisAsyncSinkWriter<T, U> extends RetryableAsyncSinkWriter<T, RedisWriteCommandPayload<U>> {
    private static final Logger LOG = LoggerFactory.getLogger(RedisAsyncSinkWriter.class);
    private final RedisDataTypeWriter<T, U> dataTypeWriter;
    private final RedisManager redisManager;

    /**
     * Creates the writer, obtains a {@link RedisManager} for the configured deploy mode and
     * disables its async auto-flush.
     *
     * <p>{@code elementConverter}, {@code initContext}, the numeric arguments and
     * {@code collection} are forwarded unchanged, in declaration order, to the superclass
     * constructor. NOTE(review): the numeric arguments are presumably batching/buffering limits;
     * their names were lost in decompilation - confirm against {@code RetryableAsyncSinkWriter}.
     *
     * @param flinkRedisConf configuration used to build the {@link RedisManager}
     * @param redisDataTypeWriter writer passed along with every async write and used for
     *     payload size estimation
     * @param elementConverter forwarded to the superclass
     * @param initContext forwarded to the superclass
     * @param collection buffered request state forwarded to the superclass
     */
    public RedisAsyncSinkWriter(FlinkRedisConf flinkRedisConf, RedisDataTypeWriter<T, U> redisDataTypeWriter, ElementConverter<T, RetryableRequest<RedisWriteCommandPayload<U>>> elementConverter, Sink.InitContext initContext, int i, int i2, int i3, long j, long j2, long j3, int i4, Collection<BufferedRequestState<RetryableRequest<RedisWriteCommandPayload<U>>>> collection) {
        super(elementConverter, initContext, i, i2, i3, j, j2, j3, i4, collection);
        this.dataTypeWriter = redisDataTypeWriter;
        this.redisManager = RedisManager.forDeployMode(flinkRedisConf);
        this.redisManager.disableAsyncAutoFlush();
    }

    /**
     * Issues one async Redis write per entry and, once every write has completed, passes the
     * entries whose write failed to {@code requestResult}.
     *
     * <p>Entries whose {@code getRequest()} is {@code null} are skipped: they are neither
     * written nor reported as failed.
     *
     * @param requestEntries entries to write
     * @param requestResult callback receiving the list of failed entries (empty when all
     *     writes succeeded)
     */
    protected void submitRequestEntries(List<RetryableRequest<RedisWriteCommandPayload<U>>> requestEntries, Consumer<List<RetryableRequest<RedisWriteCommandPayload<U>>>> requestResult) {
        // The failure callbacks below run on whichever thread completes each write future, so
        // several of them may add concurrently; a plain ArrayList is not safe for that.
        List<RetryableRequest<RedisWriteCommandPayload<U>>> failedEntries =
                Collections.synchronizedList(new ArrayList<>());

        CompletableFuture<?>[] pendingWrites = requestEntries.stream()
                .filter(entry -> entry.getRequest() != null)
                .map(entry -> this.redisManager
                        .asyncWriteRequest(entry.getRequest(), this.dataTypeWriter)
                        .exceptionally(error -> {
                            LOG.error("Error during redis async write.", error);
                            failedEntries.add(entry);
                            // Recover with null so the combined future completes normally.
                            return null;
                        }))
                .toArray(CompletableFuture[]::new);

        // Hand over a plain snapshot so the consumer never sees the synchronized wrapper.
        CompletableFuture.allOf(pendingWrites)
                .thenRun(() -> requestResult.accept(new ArrayList<>(failedEntries)));
    }

    /**
     * Estimates the size of a payload as the length of its Redis key plus the data type
     * writer's estimate for its value.
     *
     * <p>Decompiler note: the access modifier of this method was changed from
     * {@code protected}; it is kept {@code public} here.
     *
     * @param payload payload to measure; may be {@code null}
     * @return {@code 0} for a {@code null} payload; otherwise the key length, plus the estimated
     *     value size when the value is non-null
     */
    public long getRequestSizeInBytes(RedisWriteCommandPayload<U> payload) {
        if (payload == null) {
            return 0L;
        }
        long estimatedSize = payload.getRedisKey().length();
        if (payload.getValue() != null) {
            estimatedSize += this.dataTypeWriter.estimatePayloadValueSize(payload.getValue());
        }
        return estimatedSize;
    }

    /**
     * Runs the superclass flush and then flushes the Redis manager's async buffer, which is
     * required because auto-flush was disabled in the constructor.
     *
     * @throws InterruptedException declared for the superclass flush
     */
    protected void flush() throws InterruptedException {
        super.flush();
        this.redisManager.flushAsyncBuffer();
    }

    /** Closes the underlying {@link RedisManager}, if one was created. */
    public void close() {
        if (this.redisManager != null) {
            this.redisManager.close();
        }
    }
}
