package org.apache.flink.addons.redis.core.output.datatype.list;

import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;
import io.lettuce.core.cluster.api.sync.RedisClusterCommands;
import io.lettuce.core.command.BatchCommands;
import io.lettuce.core.dynamic.batch.CommandBatching;
import java.util.Arrays;
import java.util.concurrent.CompletionStage;
import org.apache.flink.addons.redis.core.RedisSchema;
import org.apache.flink.addons.redis.core.exception.RedisConnectorException;
import org.apache.flink.addons.redis.core.output.datatype.RedisDataTypeWriter;
import org.apache.flink.addons.redis.core.output.datatype.RedisWriteCommandPayload;
import org.apache.flink.addons.redis.core.output.serializer.RedisDataSerializer;
import org.apache.flink.addons.redis.core.ttl.TTLStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

/* loaded from: input_file:org/apache/flink/addons/redis/core/output/datatype/list/RedisListWriter.class */
/**
 * Base writer for the Redis LIST data type.
 *
 * <p>The payload value is a {@code String[]}. Every write path (synchronous, asynchronous and
 * batched) issues an {@code rpush} for the payload key with the payload values and then hands
 * the same key to the supplied {@link TTLStrategy}.
 *
 * @param <T> type of the records handled by the {@link RedisDataSerializer} given to this writer
 */
public abstract class RedisListWriter<T> extends RedisDataTypeWriter<T, String[]> {

    /**
     * Passes the schema and serializer straight through to {@link RedisDataTypeWriter}.
     *
     * @param redisSchema schema forwarded to the superclass
     * @param redisDataSerializer serializer forwarded to the superclass
     */
    public RedisListWriter(RedisSchema redisSchema, RedisDataSerializer<T> redisDataSerializer) {
        super(redisSchema, redisDataSerializer);
    }

    /**
     * Describes the payload value type of this writer.
     *
     * @return type information for an object array of strings
     */
    @Override
    public TypeInformation<String[]> getPayloadValueTypeInfo() {
        return Types.OBJECT_ARRAY(Types.STRING);
    }

    /**
     * Estimates the payload size as the sum of {@link String#length()} over all elements.
     *
     * <p>The result counts {@code char} units, not encoded bytes.
     *
     * @param strArr list elements of one payload
     * @return total length of all elements
     */
    @Override
    public long estimatePayloadValueSize(String[] strArr) {
        long totalLength = 0L;
        for (String element : strArr) {
            totalLength += element.length();
        }
        return totalLength;
    }

    /**
     * Synchronously pushes the payload values onto the list stored at the payload key, then lets
     * the TTL strategy handle the key on the same connection.
     *
     * @param redisClusterCommands synchronous command interface used for both calls
     * @param redisWriteCommandPayload key and values to push
     * @param tTLStrategy strategy invoked for the key after the push
     */
    @Override
    public void write(
            RedisClusterCommands<String, String> redisClusterCommands,
            RedisWriteCommandPayload<String[]> redisWriteCommandPayload,
            TTLStrategy tTLStrategy) {
        redisClusterCommands.rpush(
                redisWriteCommandPayload.getRedisKey(),
                redisWriteCommandPayload.getValue());
        tTLStrategy.setKeyTTL(
                redisWriteCommandPayload.getRedisKey(),
                redisClusterCommands);
    }

    /**
     * Asynchronous variant of the write: the TTL step is chained onto the push and only starts
     * once the push stage has completed.
     *
     * @param redisClusterAsyncCommands asynchronous command interface used for both calls
     * @param redisWriteCommandPayload key and values to push
     * @param tTLStrategy strategy invoked for the key after the push
     * @return stage produced by composing the push with the TTL step
     */
    @Override
    public CompletionStage<?> writeAsync(
            RedisClusterAsyncCommands<String, String> redisClusterAsyncCommands,
            RedisWriteCommandPayload<String[]> redisWriteCommandPayload,
            TTLStrategy tTLStrategy) {
        CompletionStage<Long> pushStage = redisClusterAsyncCommands.rpush(
                redisWriteCommandPayload.getRedisKey(),
                redisWriteCommandPayload.getValue());
        // The push result is not needed; the key is read from the payload when the push completes.
        return pushStage.thenCompose(ignoredPushResult ->
                tTLStrategy.setKeyTTLAsync(
                        redisWriteCommandPayload.getRedisKey(),
                        redisClusterAsyncCommands));
    }

    /**
     * Adds the push and the TTL step to a command batch.
     *
     * <p>The push is queued with the batching mode returned by
     * {@link TTLStrategy#transformWriteBatching}, while the TTL step receives the caller's
     * original {@code commandBatching}.
     *
     * @param batchCommands batch the commands are added to
     * @param redisWriteCommandPayload key and values to push
     * @param tTLStrategy strategy that adjusts the push batching and adds the TTL step
     * @param commandBatching batching mode requested by the caller
     */
    @Override
    public void writeToBatch(
            BatchCommands batchCommands,
            RedisWriteCommandPayload<String[]> redisWriteCommandPayload,
            TTLStrategy tTLStrategy,
            CommandBatching commandBatching) {
        batchCommands.rpush(
                redisWriteCommandPayload.getRedisKey(),
                redisWriteCommandPayload.getValue(),
                tTLStrategy.transformWriteBatching(commandBatching));
        tTLStrategy.setKeyTTLBatch(
                redisWriteCommandPayload.getRedisKey(),
                batchCommands,
                commandBatching);
    }

    /**
     * Selects the list writer implementation that matches the schema syntax.
     *
     * @param redisSchema schema whose syntax decides which writer is created
     * @param redisDataSerializer serializer handed to the created writer
     * @param <T> record type of the serializer
     * @return a {@code FieldsToListWriter} for {@code FIELDS}, an {@code ArrayToListWriter} for
     *     {@code ARRAY}
     * @throws RedisConnectorException if the schema syntax is neither {@code FIELDS} nor
     *     {@code ARRAY}
     */
    public static <T> RedisListWriter<T> forSchemaSyntax(
            RedisSchema redisSchema,
            RedisDataSerializer<T> redisDataSerializer) {
        switch (redisSchema.getRedisSchemaSyntax()) {
            case FIELDS:
                return new FieldsToListWriter(redisSchema, redisDataSerializer);
            case ARRAY:
                return new ArrayToListWriter(redisSchema, redisDataSerializer);
            default:
                throw new RedisConnectorException(
                        String.format(
                                "Unsupported Redis schema syntax '%s' for LIST data type.",
                                redisSchema.getRedisSchemaSyntax().getTitle()));
        }
    }
}
