package org.apache.flink.addons.redis.core.input.lookup.sync;

import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.flink.addons.redis.conf.FlinkRedisConf;
import org.apache.flink.addons.redis.conf.RedisConnectorOptions;
import org.apache.flink.addons.redis.core.input.datatype.RedisDataTypeReader;
import org.apache.flink.addons.redis.core.input.deserializer.RedisDataDeserializer;
import org.apache.flink.addons.redis.core.input.lookup.RedisLookupHandler;
import org.apache.flink.addons.redis.core.manager.RedisManager;
import org.apache.flink.addons.redis.core.output.serializer.RedisDataSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.groups.LookupMetricGroup;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.connector.source.lookup.LookupOptions;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.LookupFunction;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.table.types.inference.TypeStrategies;

/* loaded from: input_file:org/apache/flink/addons/redis/core/input/lookup/sync/RedisLookupFunction.class */
/**
 * Synchronous lookup function that resolves lookup-join keys against Redis.
 *
 * <p>For every incoming key row, the {@link RedisLookupHandler} derives the Redis keys to query
 * and each key is read through the {@link RedisManager} using the configured
 * {@link RedisDataTypeReader}. The Redis connection is created in {@link #open(FunctionContext)}
 * and released in {@link #close()}.
 */
public class RedisLookupFunction extends LookupFunction {
    /** Connector configuration (schema and connector parameters). */
    private final FlinkRedisConf conf;
    /** Reader used to turn the values stored under a Redis key into rows. */
    private final RedisDataTypeReader<RowData> dataTypeReader;
    /** Translates a lookup key row into the Redis keys that have to be read. */
    private final RedisLookupHandler<RowData> lookupHandler;
    /** Redis access object; {@code null} until {@link #open(FunctionContext)} has completed. */
    private RedisManager redisManager;

    /**
     * Creates the lookup function.
     *
     * @param flinkRedisConf connector configuration; its schema selects the data type reader
     * @param redisDataSerializer serializer handed to the lookup handler
     * @param redisDataDeserializer deserializer handed to the data type reader
     * @param list names passed on to the lookup handler (presumably the lookup key columns)
     */
    public RedisLookupFunction(FlinkRedisConf flinkRedisConf, RedisDataSerializer<RowData> redisDataSerializer, RedisDataDeserializer<RowData> redisDataDeserializer, List<String> list) {
        this.conf = flinkRedisConf;
        this.dataTypeReader = RedisDataTypeReader.forDataType(flinkRedisConf.getSchema(), redisDataDeserializer);
        this.lookupHandler = new RedisLookupHandler<>(flinkRedisConf, list, redisDataSerializer);
    }

    /**
     * Looks up all rows stored in Redis for the given key row.
     *
     * @param rowData lookup key row; must be a {@link GenericRowData}
     * @return the rows read for every Redis key derived from {@code rowData}, in key order
     */
    public Collection<RowData> lookup(RowData rowData) {
        // The raw cast is kept on purpose: the generic signatures of the handler and the
        // manager are not visible here, so the element type of the stream cannot be relied on.
        return (Collection) this.lookupHandler
                .getLookupRedisKeys(this.redisManager, (GenericRowData) rowData)
                .stream()
                .flatMap(redisKey -> StreamSupport.stream(
                        this.redisManager.read(redisKey, this.dataTypeReader).spliterator(), false))
                .collect(Collectors.toList());
    }

    /**
     * Prepares the function for use: merges job parameters into the connector configuration,
     * creates the {@link RedisManager} and configures the data type reader.
     *
     * @param functionContext runtime context used to read global job parameters
     */
    public void open(FunctionContext functionContext) {
        Configuration parameters = this.conf.getParameters();
        RedisConnectorOptions.PARAMETERS.forEach(configOption -> {
            // Options that are set explicitly in the configuration always win over job parameters.
            if (parameters.contains(configOption)) {
                return;
            }
            String fallbackValue = parameters.getValue(configOption);
            String jobValue = functionContext.getJobParameter(configOption.key(), fallbackValue);
            // Only write the option when the job parameter actually differs from the fallback.
            if (!Objects.equals(jobValue, fallbackValue)) {
                parameters.setString(configOption.key(), jobValue);
            }
        });
        // The connector's retry count follows the lookup max-retries option.
        parameters.set(RedisConnectorOptions.RETRY_COUNT, parameters.get(LookupOptions.MAX_RETRIES));
        this.redisManager = RedisManager.forDeployMode(this.conf);
        this.dataTypeReader.configure(parameters);
    }

    /**
     * Registers lookup metrics with the superclass and forwards the resulting failure counter
     * and request-time histogram to the {@link RedisManager}.
     *
     * @param lookupMetricGroup metric group passed to the superclass
     * @param histogram histogram passed to the superclass
     */
    public void open(LookupMetricGroup lookupMetricGroup, Histogram histogram) {
        super.open(lookupMetricGroup, histogram);
        this.redisManager.setMetrics(this.failsCounter, this.lookupRequestTimeHist);
    }

    /**
     * Releases the Redis connection.
     *
     * <p>Safe to call when {@link #open(FunctionContext)} never ran or failed before the manager
     * was created, and safe to call more than once; in those cases it does nothing.
     */
    public void close() {
        RedisManager manager = this.redisManager;
        if (manager == null) {
            // Nothing was opened (or it was already closed); avoid an NPE that would mask
            // the original failure during cleanup.
            return;
        }
        this.redisManager = null;
        manager.close();
    }

    /**
     * Declares the output type of this function as the row type built from the configured schema.
     *
     * @param dataTypeFactory unused
     * @return type inference with an explicit output type strategy
     */
    public TypeInference getTypeInference(DataTypeFactory dataTypeFactory) {
        return TypeInference.newBuilder()
                .outputTypeStrategy(TypeStrategies.explicit(this.conf.getSchema().buildDataType(RowData.class)))
                .build();
    }
}
