package org.apache.flink.addons.redis.core.manager;

import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.KeyScanCursor;
import io.lettuce.core.RedisConnectionException;
import io.lettuce.core.RedisException;
import io.lettuce.core.ScanArgs;
import io.lettuce.core.ScanCursor;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;
import io.lettuce.core.cluster.api.sync.RedisClusterCommands;
import io.lettuce.core.command.BatchCommands;
import io.lettuce.core.dynamic.RedisCommandFactory;
import io.lettuce.core.dynamic.batch.CommandBatching;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.addons.redis.conf.FlinkRedisConf;
import org.apache.flink.addons.redis.conf.RedisConnectorOptions;
import org.apache.flink.addons.redis.core.exception.RedisConnectorException;
import org.apache.flink.addons.redis.core.input.datatype.RedisDataTypeReader;
import org.apache.flink.addons.redis.core.output.datatype.RedisDataTypeWriter;
import org.apache.flink.addons.redis.core.output.datatype.RedisWriteCommandPayload;
import org.apache.flink.addons.redis.core.ttl.TTLStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/addons/redis/core/manager/RedisManager.class */
/**
 * Base class for the Redis access layer of the connector.
 *
 * <p>Wraps a Lettuce client and exposes synchronous, asynchronous and batched read/write
 * operations. Synchronous operations and some asynchronous ones go through a retry helper that,
 * on a {@link RedisException}, sleeps for the configured reconnect delay, runs the
 * subclass-provided connection operation and tries again.
 *
 * <p>Subclasses supply the actual connection and command objects.
 */
public abstract class RedisManager implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(RedisManager.class);
    protected final AbstractRedisClient redisClient;
    protected final TTLStrategy ttlStrategy;
    /** Batch command interface; stays {@code null} until {@link #initBatchCommands()} is called. */
    private BatchCommands batchCommands;
    /** Total number of attempts per operation: configured retry count plus the initial attempt. */
    private final int maxAttempts;
    /** Delay in milliseconds slept before each reconnect attempt. */
    private final int reconnectDelay;

    /** Incremented on Redis failures; optional, set via {@link #setMetrics(Counter, Histogram)}. */
    @Nullable
    transient Counter failsCounter;

    /** Receives request durations in milliseconds; optional, set via {@link #setMetrics(Counter, Histogram)}. */
    @Nullable
    transient Histogram requestTimeHist;

    /**
     * Creates a manager around the given client.
     *
     * @param abstractRedisClient the Lettuce client that {@link #close()} shuts down
     * @param flinkRedisConf connector configuration supplying retry count, reconnect delay and
     *     key TTL options
     * @throws IllegalArgumentException if the resulting attempt count or the reconnect delay is
     *     not positive
     */
    public RedisManager(AbstractRedisClient abstractRedisClient, FlinkRedisConf flinkRedisConf) {
        this.redisClient = abstractRedisClient;
        Configuration parameters = flinkRedisConf.getParameters();
        int retryCount = parameters.get(RedisConnectorOptions.RETRY_COUNT);
        this.maxAttempts = retryCount + 1;
        this.reconnectDelay = parameters.getInteger(RedisConnectorOptions.RECONNECT_DELAY);
        this.ttlStrategy = TTLStrategy.of(flinkRedisConf.getWriteOptions().getKeyTTLOptions());
        Preconditions.checkArgument(this.maxAttempts > 0, "Retry count must not be negative.");
        Preconditions.checkArgument(this.reconnectDelay > 0, "Reconnect delay must be positive.");
    }

    /** Returns the synchronous command interface of the current connection. */
    protected abstract RedisClusterCommands<String, String> getCommands();

    /** Returns the asynchronous command interface of the current connection. */
    protected abstract RedisClusterAsyncCommands<String, String> getAsyncCommands();

    /** Returns the underlying stateful connection. */
    public abstract StatefulConnection<?, ?> getConnection();

    /** Returns the action that is run to (re-)establish the connection during a retry. */
    public abstract Runnable getConnectionOperation();

    /**
     * Installs the optional metrics updated by the retry helpers.
     *
     * @param counter counter incremented on Redis failures
     * @param histogram histogram receiving request durations in milliseconds
     */
    public void setMetrics(Counter counter, Histogram histogram) {
        this.failsCounter = counter;
        this.requestTimeHist = histogram;
    }

    /** Creates the batch command interface on top of {@link #getConnection()}. */
    public void initBatchCommands() {
        RedisCommandFactory commandFactory = new RedisCommandFactory(getConnection());
        this.batchCommands = commandFactory.getCommands(BatchCommands.class);
    }

    /** Turns off auto-flush on the async commands; see {@link #flushAsyncBuffer()}. */
    public void disableAsyncAutoFlush() {
        getAsyncCommands().setAutoFlushCommands(false);
    }

    /**
     * Reads all values stored under a key using the given data type reader, with retries.
     *
     * @param str the Redis key
     * @param redisDataTypeReader reader that knows how to fetch the key's data type
     * @return the values produced by the reader
     */
    // The cast mirrors the original unchecked conversion; the reader's declared return type is
    // not visible here.
    @SuppressWarnings("unchecked")
    public <T> Iterable<T> read(String str, RedisDataTypeReader<T> redisDataTypeReader) {
        return (Iterable<T>) doWithRetry(() -> {
            return redisDataTypeReader.readAll(getCommands(), str);
        });
    }

    /**
     * Asynchronous variant of {@link #read(String, RedisDataTypeReader)}.
     *
     * @param str the Redis key
     * @param redisDataTypeReader reader that knows how to fetch the key's data type
     * @return a stage completing with the values produced by the reader
     */
    public <T> CompletionStage<Iterable<T>> readAsync(String str, RedisDataTypeReader<T> redisDataTypeReader) {
        return doWithRetryAsync(redisDataTypeReader.readAllAsync(getAsyncCommands(), str));
    }

    /**
     * Checks whether a key exists, with retries.
     *
     * @param str the Redis key
     * @return {@code true} if the EXISTS command reports exactly one key
     */
    public boolean exists(String str) {
        Boolean found = doWithRetry(() -> {
            return getCommands().exists(str).longValue() == 1;
        });
        return found;
    }

    /**
     * Asynchronous variant of {@link #exists(String)}.
     *
     * @param str the Redis key
     * @return a stage completing with {@code true} if the EXISTS command reports exactly one key
     */
    public CompletionStage<Boolean> existsAsync(String str) {
        return doWithRetryAsync(getAsyncCommands().exists(str).thenApply(count -> count.longValue() == 1));
    }

    /**
     * Reads a string value, with retries.
     *
     * @param str the Redis key
     * @return the value returned by the GET command
     */
    public String get(String str) {
        return doWithRetry(() -> {
            return getCommands().get(str);
        });
    }

    /**
     * Writes a string value using the SET arguments of the TTL strategy, with retries.
     *
     * @param str the Redis key
     * @param str2 the value to store
     */
    public void set(String str, String str2) {
        doWithRetry(() -> {
            return getCommands().set(str, str2, this.ttlStrategy.getSetArgs());
        });
    }

    /**
     * Executes a write payload synchronously, with retries. Delete payloads issue a DEL on the
     * payload's key; all others are delegated to the data type writer.
     *
     * @param redisWriteCommandPayload the payload to apply
     * @param redisDataTypeWriter writer used for non-delete payloads
     */
    public <T, U> void writeRequest(RedisWriteCommandPayload<U> redisWriteCommandPayload, RedisDataTypeWriter<T, U> redisDataTypeWriter) {
        if (redisWriteCommandPayload.isDelete()) {
            doWithRetry(() -> {
                return getCommands().del(redisWriteCommandPayload.getRedisKey());
            });
            return;
        }
        // Block lambda without a return value: binds to the Runnable overload.
        doWithRetry(() -> {
            redisDataTypeWriter.write(getCommands(), redisWriteCommandPayload, this.ttlStrategy);
        });
    }

    /**
     * Executes a write payload through the async commands. Unlike {@link #writeRequest}, this
     * path does not go through the retry helper.
     *
     * @param redisWriteCommandPayload the payload to apply
     * @param redisDataTypeWriter writer used for non-delete payloads
     * @return a future for the issued command
     */
    public <T, U> CompletableFuture<?> asyncWriteRequest(RedisWriteCommandPayload<U> redisWriteCommandPayload, RedisDataTypeWriter<T, U> redisDataTypeWriter) {
        if (redisWriteCommandPayload.isDelete()) {
            return getAsyncCommands().del(redisWriteCommandPayload.getRedisKey()).toCompletableFuture();
        }
        return redisDataTypeWriter.writeAsync(getAsyncCommands(), redisWriteCommandPayload, this.ttlStrategy).toCompletableFuture();
    }

    /**
     * Adds a write payload to the command batch.
     *
     * <p>Uses the batch interface created by {@link #initBatchCommands()}.
     *
     * @param redisWriteCommandPayload the payload to apply
     * @param redisDataTypeWriter writer used for non-delete payloads
     * @param z {@code true} to pass {@link CommandBatching#flush()}, {@code false} to pass
     *     {@link CommandBatching#queue()}
     */
    public <T, U> void putWriteRequestToBatch(RedisWriteCommandPayload<U> redisWriteCommandPayload, RedisDataTypeWriter<T, U> redisDataTypeWriter, boolean z) {
        CommandBatching batching = z ? CommandBatching.flush() : CommandBatching.queue();
        if (redisWriteCommandPayload.isDelete()) {
            this.batchCommands.del(redisWriteCommandPayload.getRedisKey(), batching);
            return;
        }
        redisDataTypeWriter.writeToBatch(this.batchCommands, redisWriteCommandPayload, this.ttlStrategy, batching);
    }

    /** Flushes the command batch created by {@link #initBatchCommands()}. */
    public void flushBatch() {
        this.batchCommands.flush();
    }

    /** Flushes commands buffered on the async command interface. */
    public void flushAsyncBuffer() {
        getAsyncCommands().flushCommands();
    }

    /**
     * Shuts down the Redis client if one is set. A {@link NoSuchElementException} raised by the
     * shutdown is logged as a warning instead of being propagated.
     */
    @Override
    public void close() {
        if (this.redisClient == null) {
            return;
        }
        try {
            this.redisClient.shutdown();
        } catch (NoSuchElementException e) {
            LOG.warn("No resources to close Redis client.", e);
        }
    }

    /**
     * Collects all keys matching a pattern by iterating SCAN until the cursor is finished. Each
     * SCAN call is retried individually.
     *
     * @param str the match pattern
     * @param i the per-call limit passed to SCAN
     * @return all keys returned across the scan iterations
     */
    public List<String> scanAll(String str, int i) {
        List<String> keys = new ArrayList<>();
        ScanArgs scanArgs = new ScanArgs().match(str).limit(i);
        // The initial cursor is a plain ScanCursor; only the results are KeyScanCursor instances.
        ScanCursor cursor = ScanCursor.INITIAL;
        do {
            KeyScanCursor<String> page = scan(cursor, scanArgs);
            keys.addAll(page.getKeys());
            cursor = page;
        } while (!cursor.isFinished());
        return keys;
    }

    /** Runs one SCAN iteration from the given cursor, with retries. */
    private KeyScanCursor<String> scan(ScanCursor scanCursor, ScanArgs scanArgs) {
        return doWithRetry(() -> {
            return getCommands().scan(scanCursor, scanArgs);
        });
    }

    /**
     * Asynchronous variant of {@link #scanAll(String, int)}.
     *
     * @param str the match pattern
     * @param i the per-call limit passed to SCAN
     * @return a stage completing with all keys returned across the scan iterations
     */
    public CompletionStage<List<String>> scanAllAsync(String str, int i) {
        return scanAsync(ScanCursor.INITIAL, new ScanArgs().match(str).limit(i), new ArrayList<>());
    }

    /** Runs one async SCAN iteration, appends its keys to {@code list} and chains the next one until finished. */
    private CompletionStage<List<String>> scanAsync(ScanCursor scanCursor, ScanArgs scanArgs, List<String> list) {
        return doWithRetryAsync(getAsyncCommands().scan(scanCursor, scanArgs)).thenCompose(page -> {
            list.addAll(page.getKeys());
            if (page.isFinished()) {
                return CompletableFuture.completedFuture(list);
            }
            return scanAsync(page, scanArgs, list);
        });
    }

    /**
     * Runs an action, retrying on {@link RedisException} up to the configured number of attempts.
     * Before each retry, {@link #reconnect(Runnable, Throwable, int)} is invoked. Every failure
     * increments the fail counter (if set); a successful attempt records its duration in the
     * request-time histogram (if set).
     *
     * @param supplier the action to run
     * @return the action's result
     * @throws RedisException the failure of the last attempt, once attempts are exhausted
     */
    public <T> T doWithRetry(Supplier<T> supplier) {
        Runnable connectionOperation = getConnectionOperation();
        int lastAttempt = this.maxAttempts - 1;
        for (int attempt = 0; attempt < this.maxAttempts; attempt++) {
            try {
                long startedAt = System.currentTimeMillis();
                T result = supplier.get();
                if (this.requestTimeHist != null) {
                    this.requestTimeHist.update(System.currentTimeMillis() - startedAt);
                }
                return result;
            } catch (RedisException e) {
                if (this.failsCounter != null) {
                    this.failsCounter.inc();
                }
                if (attempt >= lastAttempt) {
                    throw e;
                }
                reconnect(connectionOperation, e, attempt);
            }
        }
        // Not reachable while maxAttempts > 0, which the constructor enforces.
        throw new IllegalStateException("Retry loop exited without a result or an exception.");
    }

    /**
     * Void variant of {@link #doWithRetry(Supplier)}.
     *
     * @param runnable the action to run
     */
    public void doWithRetry(Runnable runnable) {
        Supplier<Void> adapter = () -> {
            runnable.run();
            return null;
        };
        doWithRetry(adapter);
    }

    /**
     * Attaches retry/reconnect handling to an already started asynchronous operation.
     *
     * @param completionStage the stage of the issued command
     * @return a stage completing with the command's result, or exceptionally with a
     *     {@link RedisConnectorException} wrapping the failure
     */
    public <T> CompletionStage<T> doWithRetryAsync(CompletionStage<T> completionStage) {
        return doWithRetryAsync(completionStage, getConnectionOperation(), 0);
    }

    /**
     * Implementation of {@link #doWithRetryAsync(CompletionStage)} carrying the attempt index.
     *
     * <p>NOTE(review): each retry re-observes the same {@code completionStage} instance; the
     * Redis command itself is not re-issued, so a failed stage fails again on every retry and
     * only the reconnect side effect is gained. Re-issuing would require callers to pass a
     * supplier of stages.
     *
     * <p>NOTE(review): the bound {@code i >= maxAttempts} allows one more reconnect than the
     * synchronous path, which stops at {@code maxAttempts - 1}; confirm which is intended.
     */
    private <T> CompletionStage<T> doWithRetryAsync(CompletionStage<T> completionStage, Runnable runnable, int i) {
        long startedAt = System.currentTimeMillis();
        CompletionStage<CompletionStage<T>> nested = completionStage.handle((result, error) -> {
            // Dependent stages (thenApply etc.) report failures wrapped in CompletionException;
            // classify by the underlying cause so those are retried like direct failures.
            Throwable cause = unwrapCompletionException(error);
            boolean retryable = cause instanceof RedisException;
            if (i >= this.maxAttempts || !retryable) {
                // supplyAsync without an executor runs on the common pool, as in the original.
                return CompletableFuture.supplyAsync(() -> {
                    if (error != null) {
                        throw new RedisConnectorException(error);
                    }
                    if (this.requestTimeHist != null) {
                        this.requestTimeHist.update(System.currentTimeMillis() - startedAt);
                    }
                    return result;
                });
            }
            if (this.failsCounter != null) {
                this.failsCounter.inc();
            }
            // NOTE(review): reconnect sleeps, so this blocks whichever thread completes the stage.
            reconnect(runnable, error, i);
            return doWithRetryAsync(completionStage, runnable, i + 1);
        });
        return nested.thenCompose(stage -> stage);
    }

    /** Strips {@link CompletionException} wrappers that carry a cause; returns {@code null} for {@code null}. */
    private static Throwable unwrapCompletionException(Throwable throwable) {
        Throwable current = throwable;
        while (current instanceof CompletionException && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /**
     * Sleeps for the reconnect delay and then runs the given connection operation.
     *
     * @param runnable the connection operation to run after the delay
     * @param th the failure that triggered the reconnect; logged only
     * @param i zero-based attempt index; logged as {@code i + 1}
     * @throws RedisConnectionException if the thread is interrupted while sleeping; the thread's
     *     interrupt flag is restored before throwing
     */
    public void reconnect(Runnable runnable, Throwable th, int i) {
        try {
            LOG.error("Error occurred while executing Redis commands. Sleep for the {} ms to let Redis failover.", this.reconnectDelay, th);
            Thread.sleep(this.reconnectDelay);
            LOG.info("Trying to reconnect to Redis. Attempt {}.", i + 1);
            runnable.run();
            LOG.info("Connection successful.");
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RedisConnectionException("Thread was interrupted during reconnect delay sleep.", e);
        }
    }

    /**
     * Creates the manager implementation matching the configured Redis deploy mode.
     *
     * @param flinkRedisConf connector configuration
     * @return a standalone manager (with the master-replica flag set for
     *     {@code MASTER_REPLICA}) or a cluster manager
     * @throws RedisConnectorException if the deploy mode is not one of the handled values
     */
    public static RedisManager forDeployMode(FlinkRedisConf flinkRedisConf) {
        switch (flinkRedisConf.getRedisMode()) {
            case STANDALONE:
                return new StandaloneRedisManager(flinkRedisConf.getHosts(), flinkRedisConf, false);
            case MASTER_REPLICA:
                return new StandaloneRedisManager(flinkRedisConf.getHosts(), flinkRedisConf, true);
            case CLUSTER:
                return new ClusterRedisManager(flinkRedisConf);
            default:
                throw new RedisConnectorException("Unknown Redis deploy mode.");
        }
    }
}
