package org.apache.flink.addons.redis;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.addons.redis.conf.FlinkRedisConf;
import org.apache.flink.addons.redis.core.exception.RedisConnectorException;
import org.apache.flink.addons.redis.core.input.deserializer.RedisInternalDataDeserializer;
import org.apache.flink.addons.redis.core.input.format.RedisInputFormat;
import org.apache.flink.addons.redis.core.input.lookup.async.RedisAsyncLookupFunction;
import org.apache.flink.addons.redis.core.input.lookup.sync.RedisLookupFunction;
import org.apache.flink.addons.redis.core.output.serializer.RedisInternalDataSerializer;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.config.lookup.LookupConfig;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.connector.source.lookup.AsyncLookupFunctionProvider;
import org.apache.flink.table.connector.source.lookup.FullCachingLookupProvider;
import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.connector.source.lookup.cache.trigger.CacheReloadTrigger;
import org.apache.flink.table.connector.source.lookup.filter.ProbabilisticFilter;
import org.apache.flink.table.connector.source.lookup.filter.trigger.ProbFilterReloadTrigger;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/addons/redis/RedisDynamicTableSource.class */
/**
 * Flink dynamic table source backed by Redis.
 *
 * <p>The source can be used for scans ({@link ScanTableSource}) and for lookup joins
 * ({@link LookupTableSource}), and accepts non-nested projection push-down
 * ({@link SupportsProjectionPushDown}).
 *
 * <p>Instances are mutable: {@link #applyProjection(int[][])} replaces the schema held by the
 * configuration object and records the projected field indices.
 */
public class RedisDynamicTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown {
    private final FlinkRedisConf conf;
    private final LookupConfig lookupConfig;

    /** Parallelism handed to the scan provider; {@code null} is passed on as an empty {@link Optional}. */
    private final Integer parallelism;

    @Nullable
    private final LookupCache lookupCache;

    @Nullable
    private final CacheReloadTrigger cacheReloadTrigger;

    @Nullable
    protected final ProbabilisticFilter probabilisticFilter;

    @Nullable
    protected final ProbFilterReloadTrigger probFilterReloadTrigger;

    /** Indices recorded by the last {@link #applyProjection(int[][])} call, or {@code null} if none. */
    private int[] projectedFields;

    /**
     * Creates a new source.
     *
     * @param conf connector configuration, including the table schema
     * @param lookupConfig lookup settings (e.g. whether lookups run asynchronously)
     * @param parallelism scan parallelism, may be {@code null}
     * @param lookupCache cache handed to the lookup provider, may be {@code null}
     * @param cacheReloadTrigger when non-null, lookups are served by a full-caching provider
     * @param probabilisticFilter filter handed to the lookup provider, may be {@code null}
     * @param probFilterReloadTrigger reload trigger for the filter, may be {@code null}
     */
    public RedisDynamicTableSource(FlinkRedisConf conf, LookupConfig lookupConfig, Integer parallelism, @Nullable LookupCache lookupCache, @Nullable CacheReloadTrigger cacheReloadTrigger, @Nullable ProbabilisticFilter probabilisticFilter, @Nullable ProbFilterReloadTrigger probFilterReloadTrigger) {
        this.conf = conf;
        this.lookupConfig = lookupConfig;
        this.parallelism = parallelism;
        this.lookupCache = lookupCache;
        this.cacheReloadTrigger = cacheReloadTrigger;
        this.probabilisticFilter = probabilisticFilter;
        this.probFilterReloadTrigger = probFilterReloadTrigger;
    }

    /** Returns the insert-only changelog mode. */
    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    /**
     * Builds the scan provider from a row-data {@link RedisInputFormat} and the configured
     * parallelism.
     *
     * @param scanContext not used by this implementation
     */
    @Override
    public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext scanContext) {
        return InputFormatProvider.of(RedisInputFormat.forRowData(this.conf), Optional.ofNullable(this.parallelism));
    }

    /**
     * Builds the lookup provider.
     *
     * <p>If a cache reload trigger is configured, a {@link FullCachingLookupProvider} based on the
     * scan provider is returned. Otherwise a synchronous or asynchronous Redis lookup function is
     * created, depending on {@link LookupConfig#isLookupAsync()}.
     *
     * @throws IllegalStateException if the schema has no key (non-full-caching path only)
     * @throws IllegalArgumentException if a join key is nested or is not a PRIMARY KEY field
     */
    @Override
    public LookupTableSource.LookupRuntimeProvider getLookupRuntimeProvider(LookupTableSource.LookupContext lookupContext) {
        // The scan provider is only created when one of the reload triggers is configured.
        // getScanRuntimeProvider ignores its context argument, so null is passed.
        boolean needsScan = this.cacheReloadTrigger != null || this.probFilterReloadTrigger != null;
        ScanTableSource.ScanRuntimeProvider scanRuntimeProvider = needsScan ? getScanRuntimeProvider(null) : null;
        if (this.cacheReloadTrigger != null) {
            return FullCachingLookupProvider.of(scanRuntimeProvider, this.cacheReloadTrigger);
        }

        Preconditions.checkState(this.conf.getSchema().hasKey(), "Redis connector must have PRIMARY KEY to perform a lookup.");
        int[][] keys = lookupContext.getKeys();
        Preconditions.checkArgument(
                Arrays.stream(keys).allMatch(keyPath -> keyPath.length == 1),
                "Redis connector does not support nested keys.");
        List<String> lookupKeys = getLookupKeys(keys);

        RedisInternalDataSerializer serializer = new RedisInternalDataSerializer(this.conf.getSchema());
        RedisInternalDataDeserializer deserializer = new RedisInternalDataDeserializer(this.conf.getSchema());
        if (this.lookupConfig.isLookupAsync()) {
            return AsyncLookupFunctionProvider.createImplementation(
                    new RedisAsyncLookupFunction(this.conf, serializer, deserializer, lookupKeys),
                    this.lookupCache,
                    this.probabilisticFilter,
                    this.probFilterReloadTrigger,
                    scanRuntimeProvider);
        }
        return LookupFunctionProvider.createImplementation(
                new RedisLookupFunction(this.conf, serializer, deserializer, lookupKeys),
                this.lookupCache,
                this.probabilisticFilter,
                this.probFilterReloadTrigger,
                scanRuntimeProvider);
    }

    /** Returns the lookup configuration this source was created with. */
    public LookupConfig getLookupConfig() {
        return this.lookupConfig;
    }

    /**
     * Creates a copy of this source with a cloned configuration.
     *
     * <p>The recorded projection is carried over so that the copy reports the same summary string
     * as the instance it was copied from.
     *
     * @throws RedisConnectorException if the configuration cannot be cloned
     */
    @Override
    public DynamicTableSource copy() {
        try {
            RedisDynamicTableSource copied = new RedisDynamicTableSource(
                    this.conf.clone(),
                    this.lookupConfig,
                    this.parallelism,
                    this.lookupCache,
                    this.cacheReloadTrigger,
                    this.probabilisticFilter,
                    this.probFilterReloadTrigger);
            if (this.projectedFields != null) {
                // Defensive copy: the array is mutable state of this instance.
                copied.projectedFields = this.projectedFields.clone();
            }
            return copied;
        } catch (CloneNotSupportedException e) {
            throw new RedisConnectorException("Failed to clone FlinkRedisConf.", e);
        }
    }

    /** Returns a summary listing the schema field names and, if present, the projected indices. */
    @Override
    public String asSummaryString() {
        StringBuilder summary = new StringBuilder("RedisDynamicTableSource[schema=")
                .append(Arrays.toString(this.conf.getSchema().getTableSchema().getFieldNames()));
        if (this.projectedFields != null) {
            summary.append(", projectFields=").append(Arrays.toString(this.projectedFields));
        }
        return summary.append("]").toString();
    }

    /** Nested projection is not supported. */
    @Override
    public boolean supportsNestedProjection() {
        return false;
    }

    /**
     * Records the projected top-level field indices and replaces the configuration's schema with
     * the projected schema.
     *
     * @param iArr projection paths; each path must have exactly one element
     * @throws IllegalArgumentException if any path is nested
     */
    @Override
    public void applyProjection(int[][] iArr) {
        this.projectedFields = Arrays.stream(iArr).mapToInt(fieldPath -> {
            Preconditions.checkArgument(fieldPath.length == 1, "Nested projection is not supported.");
            return fieldPath[0];
        }).toArray();
        this.conf.setSchema(this.conf.getSchema().projectFields(this.projectedFields));
    }

    /**
     * Translates lookup-join key indices into schema field names.
     *
     * <p>Indices refer to positions in the iteration order of the schema's field map. Duplicates
     * are dropped and the order of first occurrence in {@code iArr} is kept, so the returned names
     * line up with the join keys as they were handed in.
     *
     * @param iArr join key paths; only the first element of each path is read (callers validate
     *     that every path has length 1)
     * @return field names of the join keys
     * @throws IllegalArgumentException if any index does not denote a PRIMARY KEY field
     */
    private List<String> getLookupKeys(int[][] iArr) {
        Set<String> keyColumnNames = this.conf.getSchema().getKeyColumnNames();
        // Position i in this list is the name of the i-th schema field.
        List<String> fieldNames = this.conf.getSchema().getFields().keySet().stream().collect(Collectors.toList());
        // distinct() keeps encounter order, unlike collecting into a hash set.
        List<Integer> joinKeyIndices = Arrays.stream(iArr)
                .map(keyPath -> keyPath[0])
                .distinct()
                .collect(Collectors.toList());
        boolean allPrimaryKeys = joinKeyIndices.stream().allMatch(index ->
                index >= 0 && index < fieldNames.size() && keyColumnNames.contains(fieldNames.get(index)));
        Preconditions.checkArgument(allPrimaryKeys, "Redis lookup join keys should be subset of PRIMARY KEY fields of the source connector");
        return joinKeyIndices.stream().map(fieldNames::get).collect(Collectors.toList());
    }
}
