package org.apache.flink.addons.redis.core.output.function;

import org.apache.flink.addons.redis.conf.FlinkRedisConf;
import org.apache.flink.addons.redis.conf.RedisConnectorOptions;
import org.apache.flink.addons.redis.core.manager.RedisManager;
import org.apache.flink.addons.redis.core.output.RedisWriteElementConverter;
import org.apache.flink.addons.redis.core.output.datatype.RedisDataTypeWriter;
import org.apache.flink.addons.redis.core.output.datatype.RedisWriteCommandPayload;
import org.apache.flink.addons.redis.core.output.serializer.RedisDataSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

/* loaded from: input_file:org/apache/flink/addons/redis/core/output/function/RedisSinkFunction.class */
/**
 * Base sink function that writes incoming stream elements to Redis.
 *
 * <p>Each incoming element of type {@code T} is first mapped to a record of type {@code U} by
 * {@link #toProcessedRecord(Object)}, then converted into a {@link RedisWriteCommandPayload} and,
 * if the converter produced a payload, handed to the {@link RedisManager} together with the
 * configured {@link RedisDataTypeWriter}.
 *
 * @param <T> type of the elements received by this sink
 * @param <U> type of the processed record passed to the element converter
 */
public abstract class RedisSinkFunction<T, U> extends RichSinkFunction<T> {
    private final FlinkRedisConf conf;
    private final RedisDataTypeWriter<U, ?> dataTypeWriter;

    // Runtime-only state: both fields are created in open() and released in close(), so they are
    // marked transient to keep them out of the serialized form of the function.
    private transient RedisWriteElementConverter<U, ?> elementConverter;
    private transient RedisManager redisManager;

    /**
     * Creates the sink function and resolves the data type writer for the given configuration.
     *
     * @param flinkRedisConf connector configuration used by this sink
     * @param redisDataSerializer serializer passed to {@link RedisDataTypeWriter#forDataType}
     */
    public RedisSinkFunction(FlinkRedisConf flinkRedisConf, RedisDataSerializer<U> redisDataSerializer) {
        this.conf = flinkRedisConf;
        this.dataTypeWriter = RedisDataTypeWriter.forDataType(flinkRedisConf, redisDataSerializer);
    }

    /**
     * Merges the connector parameters with the supplied configuration, configures the data type
     * writer, and creates the Redis manager and the element converter.
     *
     * <p>Note that both {@code configuration} and the connector parameters are mutated: the
     * connector parameters are added to {@code configuration} before the writer is configured, and
     * the resulting configuration is added back to the connector parameters afterwards.
     *
     * @param configuration configuration passed in by the runtime; modified in place
     */
    public void open(Configuration configuration) {
        // Statement order matters here: the writer must see the merged configuration, and the
        // manager is created from the connector configuration only after the merge-back.
        configuration.addAll(this.conf.getParameters());
        this.dataTypeWriter.configure(configuration);
        this.conf.getParameters().addAll(configuration);
        this.redisManager = RedisManager.forDeployMode(this.conf);

        // Retractions are ignored when either the write options or the merged configuration say so.
        boolean ignoreRetractions = this.conf.getWriteOptions().isIgnoreRetractions()
                || ((Boolean) configuration.get(RedisConnectorOptions.IGNORE_RETRACTION)).booleanValue();
        // NOTE(review): the meaning of the trailing `true` flag is defined by
        // RedisWriteElementConverter and is not visible from this class - confirm before changing.
        this.elementConverter = new RedisWriteElementConverter<>(this.dataTypeWriter, ignoreRetractions, true);
    }

    /**
     * Maps an incoming element to the record type understood by the element converter.
     *
     * @param t the element received by the sink
     * @return the processed record to convert into a Redis write payload
     */
    protected abstract U toProcessedRecord(T t);

    /**
     * Converts the element into a write payload and submits it to Redis.
     *
     * <p>Elements for which the converter returns {@code null} are skipped.
     *
     * @param t the element to write
     * @param context sink context supplied by the runtime; not used by this implementation
     */
    public void invoke(T t, SinkFunction.Context context) {
        RedisWriteCommandPayload<?> payload = this.elementConverter.convert(toProcessedRecord(t));
        if (payload != null) {
            this.redisManager.writeRequest(payload, this.dataTypeWriter);
        }
    }

    /**
     * Closes the Redis manager if one was created.
     *
     * <p>The manager only exists after {@link #open(Configuration)} got far enough to create it,
     * so this method tolerates a missing manager instead of failing with a
     * {@link NullPointerException}. The reference is cleared before closing so that repeated
     * calls are no-ops.
     */
    public void close() {
        RedisManager manager = this.redisManager;
        if (manager != null) {
            this.redisManager = null;
            manager.close();
        }
    }
}
