package org.apache.flink.addons.redis.core.input.format;

import io.lettuce.core.RedisURI;
import io.lettuce.core.cluster.models.partitions.Partitions;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.addons.redis.conf.FlinkRedisConf;
import org.apache.flink.addons.redis.conf.RedisConnectorOptions;
import org.apache.flink.addons.redis.conf.RedisDeployMode;
import org.apache.flink.addons.redis.core.exception.RedisConnectorException;
import org.apache.flink.addons.redis.core.input.datatype.RedisDataTypeReader;
import org.apache.flink.addons.redis.core.input.deserializer.RedisDataDeserializer;
import org.apache.flink.addons.redis.core.input.deserializer.RedisInternalDataDeserializer;
import org.apache.flink.addons.redis.core.manager.ClusterRedisManager;
import org.apache.flink.addons.redis.core.manager.StandaloneRedisManager;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import org.apache.flink.table.data.RowData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/addons/redis/core/input/format/RedisInputFormat.class */
/**
 * Flink input format that reads records of type {@code T} from Redis.
 *
 * <p>Splits are produced according to the configured {@link RedisDeployMode}: a single split for
 * standalone and master/replica deployments, and one split per cluster node group (keyed by the
 * master node id) in cluster mode. For each split the format scans the matching keys and uses the
 * {@link RedisDataTypeReader} to turn every key into zero or more records.
 *
 * @param <T> type of the records emitted by this format
 */
public class RedisInputFormat<T> extends RichInputFormat<T, RedisTableSplit> implements ResultTypeQueryable<T>, Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(RedisInputFormat.class);

    /** Pattern suffix matching every key; used alone when the schema has no namespace. */
    private static final String MATCH_ALL_KEYS = "*";

    private final FlinkRedisConf conf;
    private final RedisDataTypeReader<T> dataTypeReader;

    // Per-split runtime state; (re)initialised in open(...) and configure(...).
    private transient boolean reachEnd = false;
    private transient StandaloneRedisManager redisManager;
    private transient Iterator<String> keysIter;
    private transient int scanKeysCount;
    private transient Iterator<T> valueIter;

    /**
     * Creates an input format for the given connector configuration.
     *
     * @param flinkRedisConf connector configuration (hosts, deploy mode, schema, credentials)
     * @param redisDataDeserializer deserializer handed to the data type reader built from the schema
     */
    public RedisInputFormat(FlinkRedisConf flinkRedisConf, RedisDataDeserializer<T> redisDataDeserializer) {
        this.conf = flinkRedisConf;
        this.dataTypeReader = RedisDataTypeReader.forDataType(flinkRedisConf.getSchema(), redisDataDeserializer);
    }

    /**
     * Builds the input splits for the configured deploy mode.
     *
     * <p>Standalone: one split whose master is the first configured host. Master/replica: one
     * split whose master is the first host and whose replicas are the remaining hosts. Cluster:
     * delegated to {@link #createClusterInputSplits()}.
     *
     * @param i split-count hint supplied by the caller; not used by this implementation
     * @return the created splits
     * @throws IllegalStateException if the deploy mode is not one of the handled values
     */
    public RedisTableSplit[] createInputSplits(int i) {
        switch (this.conf.getRedisMode()) {
            case STANDALONE: {
                RedisTableSplit split = new RedisTableSplit(0);
                split.setMaster(this.conf.getHosts().get(0));
                LOG.debug("Created 1 split of Redis connector in standalone mode.");
                return new RedisTableSplit[]{split};
            }
            case MASTER_REPLICA: {
                RedisTableSplit split = new RedisTableSplit(0);
                RedisURI master = this.conf.getHosts().get(0);
                // Every host after the first one is treated as a replica of the first.
                ArrayList<RedisURI> replicas =
                        new ArrayList<>(this.conf.getHosts().subList(1, this.conf.getHosts().size()));
                split.setMaster(master);
                split.setReplicas(replicas);
                LOG.debug("Created 1 split of Redis connector in master/replica mode.");
                return new RedisTableSplit[]{split};
            }
            case CLUSTER:
                return createClusterInputSplits();
            default:
                throw new IllegalStateException("Unknown Redis deploy mode.");
        }
    }

    /**
     * Alias of {@link #createInputSplits(int)} kept under the name the decompiler assigned, so
     * that any existing reference to that name keeps working.
     *
     * @param i split-count hint, forwarded unchanged
     * @return the created splits
     */
    public RedisTableSplit[] m299createInputSplits(int i) {
        return createInputSplits(i);
    }

    /**
     * Opens the given split: connects to its hosts and starts a key scan.
     *
     * <p>The scan pattern is {@code <namespace><delimiter>*} when the schema declares a namespace
     * and {@code *} otherwise. A warning is logged when the scan yields no keys.
     *
     * @param redisTableSplit split to read
     */
    public void open(RedisTableSplit redisTableSplit) {
        LOG.debug("Redis Connector. Open split = " + redisTableSplit.toString());
        this.reachEnd = false;
        this.valueIter = Collections.emptyIterator();

        // NOTE(review): the flag is true for master/replica mode, or for a cluster split that has
        // replicas; presumably it enables replica-aware connections - confirm in StandaloneRedisManager.
        boolean withReplicas = this.conf.getRedisMode() == RedisDeployMode.MASTER_REPLICA
                || (this.conf.getRedisMode() == RedisDeployMode.CLUSTER && !redisTableSplit.getReplicas().isEmpty());
        this.redisManager = new StandaloneRedisManager(redisTableSplit.getAllHosts(), this.conf, withReplicas);

        String keyPattern;
        if (this.conf.getSchema().hasNamespace()) {
            keyPattern = this.conf.getSchema().getNamespace() + this.conf.getSchema().getNamespaceDelimiter() + MATCH_ALL_KEYS;
        } else {
            keyPattern = MATCH_ALL_KEYS;
        }

        this.keysIter = this.redisManager.scanAll(keyPattern, this.scanKeysCount).iterator();
        if (!this.keysIter.hasNext()) {
            LOG.warn(String.format("Keys size is 0 by pattern '%s' on '%s:%d'", keyPattern, redisTableSplit.getMaster().getHost(), redisTableSplit.getMaster().getPort()));
        }
    }

    /**
     * Returns the assigner that hands the given splits out to readers.
     *
     * @param redisTableSplitArr splits previously created by this format
     * @return a new {@link RedisSplitAssigner} over the splits
     */
    public InputSplitAssigner getInputSplitAssigner(RedisTableSplit[] redisTableSplitArr) {
        return new RedisSplitAssigner(redisTableSplitArr);
    }

    /**
     * Tells whether the current split is exhausted.
     *
     * @return {@code true} once {@link #nextRecord(Object)} has run out of keys and values
     */
    public boolean reachedEnd() {
        return this.reachEnd;
    }

    /**
     * Returns the next record of the current split.
     *
     * <p>Values of the current key are drained first; then keys are read one by one until a key
     * yields at least one value. Keys for which the reader returns {@code null} or nothing are
     * skipped iteratively, so a long run of such keys cannot exhaust the call stack.
     *
     * @param t reuse object offered by the caller; not used
     * @return the next record, or {@code null} when the split is exhausted (in which case
     *     {@link #reachedEnd()} starts returning {@code true})
     */
    public T nextRecord(T t) {
        while (!this.valueIter.hasNext()) {
            if (!this.keysIter.hasNext()) {
                this.reachEnd = true;
                return null;
            }
            Iterable<T> values = this.redisManager.read(this.keysIter.next(), this.dataTypeReader);
            if (values != null) {
                // An empty iterator simply sends the loop on to the next key.
                this.valueIter = values.iterator();
            }
        }
        return this.valueIter.next();
    }

    /** Closes the Redis manager of the current split, if one was opened. */
    public void close() {
        if (this.redisManager != null) {
            this.redisManager.close();
        }
    }

    /**
     * Returns the statistics passed in; this format computes none of its own.
     *
     * @param baseStatistics cached statistics, possibly {@code null}
     * @return {@code baseStatistics} unchanged
     */
    public BaseStatistics getStatistics(BaseStatistics baseStatistics) {
        return baseStatistics;
    }

    /**
     * Merges the connector parameters into the given configuration, reads the scan batch size
     * from it, configures the data type reader, and copies the merged configuration back into
     * the connector parameters.
     *
     * @param configuration configuration supplied by the caller; it is modified in place
     */
    public void configure(Configuration configuration) {
        configuration.addAll(this.conf.getParameters());
        this.scanKeysCount = configuration.get(RedisConnectorOptions.SCAN_KEYS_COUNT);
        this.dataTypeReader.configure(configuration);
        this.conf.getParameters().addAll(configuration);
    }

    /**
     * Returns the type information of the emitted records.
     *
     * @return the produced type reported by the reader's deserializer
     */
    public TypeInformation<T> getProducedType() {
        return this.dataTypeReader.getDeserializer().getProducedType();
    }

    /**
     * Builds one split per cluster node group from the cluster's partition table.
     *
     * <p>Splits are keyed by master node id. Master/upstream nodes that own no slots are ignored;
     * replica nodes are attached to the split of the node they replicate. The connector's
     * credentials and SSL setting are then applied to every host of every split.
     *
     * @return the created splits, in the map's iteration order
     * @throws RedisConnectorException if no split could be derived from the partition table
     */
    private RedisTableSplit[] createClusterInputSplits() {
        // The cluster manager is only needed to fetch the topology; it is closed on every exit path.
        try (ClusterRedisManager clusterManager = new ClusterRedisManager(this.conf)) {
            Partitions partitions = clusterManager.getConnection().getPartitions();
            HashMap<String, RedisTableSplit> splitsByMasterId = new HashMap<>();
            AtomicInteger nextSplitNumber = new AtomicInteger(0);

            partitions.forEach(node -> {
                switch (node.getRole()) {
                    case UPSTREAM:
                    case MASTER:
                        if (!node.getSlots().isEmpty()) {
                            splitsByMasterId
                                    .computeIfAbsent(node.getNodeId(), id -> new RedisTableSplit(nextSplitNumber.getAndIncrement()))
                                    .setMaster(node.getUri());
                        }
                        break;
                    case REPLICA:
                        // NOTE(review): a replica whose master was skipped (no slots) or is absent
                        // still creates a split without a master - confirm this is intended.
                        splitsByMasterId
                                .computeIfAbsent(node.getSlaveOf(), id -> new RedisTableSplit(nextSplitNumber.getAndIncrement()))
                                .addSlave(node.getUri());
                        break;
                    default:
                        break;
                }
            });

            if (splitsByMasterId.isEmpty()) {
                throw new RedisConnectorException("Count of splits is 0, can't retrieve information from Redis cluster");
            }
            splitsByMasterId.values().forEach(split -> updateHosts(split.getAllHosts()));

            LOG.debug(String.format("Created %d splits of Redis connector.", splitsByMasterId.size()));
            return splitsByMasterId.values().toArray(new RedisTableSplit[0]);
        }
    }

    /**
     * Applies the connector's password, username and SSL setting to each given URI.
     * Password and username are only set when configured (non-null); SSL is always set.
     *
     * @param collection URIs to update in place
     */
    private void updateHosts(Collection<RedisURI> collection) {
        for (RedisURI uri : collection) {
            Optional.ofNullable(this.conf.getPassword())
                    .map(password -> password.toCharArray())
                    .ifPresent(uri::setPassword);
            Optional.ofNullable(this.conf.getUsername()).ifPresent(uri::setUsername);
            uri.setSsl(this.conf.isSsl());
        }
    }

    /**
     * Creates an input format that emits {@link RowData}, using a
     * {@link RedisInternalDataDeserializer} built from the configuration's schema.
     *
     * @param flinkRedisConf connector configuration
     * @return a new input format producing {@link RowData}
     */
    public static RedisInputFormat<RowData> forRowData(FlinkRedisConf flinkRedisConf) {
        return new RedisInputFormat<>(flinkRedisConf, new RedisInternalDataDeserializer(flinkRedisConf.getSchema()));
    }
}
