package org.apache.flink.addons.redis.core.output.datatype.hash;

import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;
import io.lettuce.core.cluster.api.sync.RedisClusterCommands;
import io.lettuce.core.command.BatchCommands;
import io.lettuce.core.dynamic.batch.CommandBatching;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import org.apache.flink.addons.redis.conf.FlinkRedisConf;
import org.apache.flink.addons.redis.conf.RedisWriteOptions;
import org.apache.flink.addons.redis.core.RedisSchema;
import org.apache.flink.addons.redis.core.exception.RedisConnectorException;
import org.apache.flink.addons.redis.core.output.datatype.RedisDataTypeWriter;
import org.apache.flink.addons.redis.core.output.datatype.RedisWriteCommandPayload;
import org.apache.flink.addons.redis.core.output.serializer.RedisDataSerializer;
import org.apache.flink.addons.redis.core.ttl.TTLStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

/* loaded from: input_file:org/apache/flink/addons/redis/core/output/datatype/hash/RedisHashWriter.class */
/**
 * Base class for writers that store a record as a Redis hash using {@code HMSET}.
 *
 * <p>The payload value has one of two runtime shapes, fixed when the writer is constructed:
 * <ul>
 *   <li>batch-write mode: a {@code String[]} that is passed as-is to {@code BatchCommands.hmset}
 *       (presumably alternating field/value entries; confirm against the subclasses);</li>
 *   <li>otherwise: a {@code Map<String, String>} of hash field to value.</li>
 * </ul>
 * Because both shapes travel through a single {@code Object}-typed payload, this class
 * contains several unchecked casts; each one is guarded by the {@code isBatchWrite} flag.
 *
 * @param <T> type of the records handed to this writer
 */
public abstract class RedisHashWriter<T> extends RedisDataTypeWriter<T, Object> {
    /** {@code true} when payload values are {@code String[]}, {@code false} when they are maps. */
    private final boolean isBatchWrite;

    /**
     * @param redisSchema schema forwarded to the base writer
     * @param redisDataSerializer serializer forwarded to the base writer
     * @param z whether this writer operates in batch-write mode ({@code String[]} payloads)
     */
    public RedisHashWriter(RedisSchema redisSchema, RedisDataSerializer<T> redisDataSerializer, boolean z) {
        super(redisSchema, redisDataSerializer);
        this.isBatchWrite = z;
    }

    /**
     * Builds the {@code String[]} payload used in batch-write mode.
     *
     * @param t record to convert
     * @param redisWriteCommandPayload payload instance supplied by the caller
     *     (presumably reused between records; confirm against the base class)
     * @return payload carrying the array value
     */
    protected abstract RedisWriteCommandPayload<String[]> buildArrayPayload(T t, RedisWriteCommandPayload<String[]> redisWriteCommandPayload);

    /**
     * Builds the {@code Map<String, String>} payload used outside batch-write mode.
     *
     * @param t record to convert
     * @param redisWriteCommandPayload payload instance supplied by the caller
     *     (presumably reused between records; confirm against the base class)
     * @return payload carrying the map value
     */
    protected abstract RedisWriteCommandPayload<Map<String, String>> buildMapPayload(T t, RedisWriteCommandPayload<Map<String, String>> redisWriteCommandPayload);

    /**
     * Returns the Flink type information matching the payload shape of this writer:
     * a string array in batch-write mode, a string-to-string map otherwise.
     */
    @Override
    @SuppressWarnings("unchecked") // the declared payload type is Object; the real shape depends on isBatchWrite
    public TypeInformation<Object> getPayloadValueTypeInfo() {
        TypeInformation<?> typeInfo;
        if (this.isBatchWrite) {
            typeInfo = Types.OBJECT_ARRAY(Types.STRING);
        } else {
            typeInfo = Types.MAP(Types.STRING, Types.STRING);
        }
        return (TypeInformation<Object>) typeInfo;
    }

    /**
     * Delegates to {@link #buildArrayPayload} in batch-write mode and to
     * {@link #buildMapPayload} otherwise.
     *
     * @param t record to convert
     * @param redisWriteCommandPayload payload instance handed through to the delegate
     * @return the payload returned by the delegate, viewed as an {@code Object}-valued payload
     */
    @Override
    @SuppressWarnings("unchecked") // payload generics are erased; the shape is selected by isBatchWrite
    public RedisWriteCommandPayload<Object> buildCommandPayload(T t, RedisWriteCommandPayload<Object> redisWriteCommandPayload) {
        // Go through a wildcard type: a direct cast between two concrete
        // parameterizations (<Object> vs <String[]>) is rejected by the compiler.
        RedisWriteCommandPayload<?> input = redisWriteCommandPayload;
        RedisWriteCommandPayload<?> built;
        if (this.isBatchWrite) {
            built = buildArrayPayload(t, (RedisWriteCommandPayload<String[]>) input);
        } else {
            built = buildMapPayload(t, (RedisWriteCommandPayload<Map<String, String>>) input);
        }
        return (RedisWriteCommandPayload<Object>) built;
    }

    /**
     * Estimates the payload size as the total number of {@code char}s
     * ({@link String#length()}, i.e. UTF-16 code units, not encoded bytes).
     *
     * <p>Null array elements, map keys or map values are not tolerated and
     * cause a {@link NullPointerException}.
     *
     * @param obj a {@code String[]} in batch-write mode, a {@code Map<String, String>} otherwise
     * @return sum of the lengths of all contained strings
     */
    @Override
    public long estimatePayloadValueSize(Object obj) {
        if (this.isBatchWrite) {
            return Arrays.stream((String[]) obj).mapToLong(String::length).sum();
        }
        long total = 0L;
        for (Map.Entry<String, String> field : asFieldMap(obj).entrySet()) {
            // Add as long so a huge key plus a huge value cannot overflow an int.
            total += (long) field.getKey().length() + field.getValue().length();
        }
        return total;
    }

    /**
     * Synchronously writes the hash with {@code HMSET}, then lets the TTL strategy act on the key.
     *
     * <p>Expects the payload value to be a {@code Map<String, String>}; an array payload
     * fails with a {@link ClassCastException}.
     */
    @Override
    public void write(RedisClusterCommands<String, String> redisClusterCommands, RedisWriteCommandPayload<Object> redisWriteCommandPayload, TTLStrategy tTLStrategy) {
        redisClusterCommands.hmset(redisWriteCommandPayload.getRedisKey(), asFieldMap(redisWriteCommandPayload.getValue()));
        tTLStrategy.setKeyTTL(redisWriteCommandPayload.getRedisKey(), redisClusterCommands);
    }

    /**
     * Queues an {@code HMSET} on the batch interface, followed by the TTL strategy's batch call.
     *
     * <p>Expects the payload value to be a {@code String[]}; a map payload
     * fails with a {@link ClassCastException}. The batching mode used for the
     * {@code HMSET} itself is whatever {@code tTLStrategy.transformWriteBatching} returns.
     */
    @Override
    public void writeToBatch(BatchCommands batchCommands, RedisWriteCommandPayload<Object> redisWriteCommandPayload, TTLStrategy tTLStrategy, CommandBatching commandBatching) {
        String[] fieldsAndValues = (String[]) redisWriteCommandPayload.getValue();
        batchCommands.hmset(redisWriteCommandPayload.getRedisKey(), fieldsAndValues, tTLStrategy.transformWriteBatching(commandBatching));
        tTLStrategy.setKeyTTLBatch(redisWriteCommandPayload.getRedisKey(), batchCommands, commandBatching);
    }

    /**
     * Asynchronously writes the hash with {@code HMSET} and, once that completes normally,
     * chains the TTL strategy's asynchronous call for the same key.
     *
     * <p>Expects the payload value to be a {@code Map<String, String>}; an array payload
     * fails with a {@link ClassCastException}.
     *
     * @return a stage that completes when both the write and the TTL step have completed
     */
    @Override
    public CompletionStage<?> writeAsync(RedisClusterAsyncCommands<String, String> redisClusterAsyncCommands, RedisWriteCommandPayload<Object> redisWriteCommandPayload, TTLStrategy tTLStrategy) {
        // Read the key now rather than inside the callback, so the TTL step targets the
        // same key as the HMSET even if the payload object is reused or mutated before
        // the write completes. NOTE(review): payload reuse is assumed, not visible here.
        final String redisKey = redisWriteCommandPayload.getRedisKey();
        return redisClusterAsyncCommands.hmset(redisKey, asFieldMap(redisWriteCommandPayload.getValue())).thenCompose(ignored -> {
            return tTLStrategy.setKeyTTLAsync(redisKey, redisClusterAsyncCommands);
        });
    }

    /**
     * Creates the hash writer matching the configured schema syntax.
     *
     * <p>Batch-write mode is enabled only when batching is turned on and the
     * write is not asynchronous.
     *
     * @param flinkRedisConf connector configuration providing the schema and write options
     * @param redisDataSerializer serializer passed to the created writer
     * @return a writer for the {@code FIELDS} or {@code MAP} schema syntax
     * @throws RedisConnectorException if the schema syntax is not supported for the HASH data type
     */
    public static <T> RedisHashWriter<T> forSchemaSyntax(FlinkRedisConf flinkRedisConf, RedisDataSerializer<T> redisDataSerializer) {
        RedisWriteOptions writeOptions = flinkRedisConf.getWriteOptions();
        boolean batchWrite = writeOptions.isBatchingEnabled() && !writeOptions.isAsync();
        switch (flinkRedisConf.getSchema().getRedisSchemaSyntax()) {
            case FIELDS:
                return new FieldsToHashWriter(flinkRedisConf.getSchema(), redisDataSerializer, batchWrite);
            case MAP:
                return new MapToHashWriter(flinkRedisConf.getSchema(), redisDataSerializer, batchWrite);
            default:
                throw new RedisConnectorException(String.format("Unsupported Redis schema syntax '%s' for HASH data type.", flinkRedisConf.getSchema().getRedisSchemaSyntax().getTitle()));
        }
    }

    /** Views an {@code Object}-typed payload value as the field map used outside batch-write mode. */
    @SuppressWarnings("unchecked") // element types cannot be checked at runtime; callers guarantee the shape
    private static Map<String, String> asFieldMap(Object value) {
        return (Map<String, String>) value;
    }
}
