package org.apache.flink.connector.kafka.dynamic.source.enumerator;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
import org.apache.flink.connector.kafka.dynamic.source.split.DynamicKafkaSourceSplit;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.kafka.common.KafkaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/connector/kafka/dynamic/source/enumerator/StoppableKafkaEnumContextProxy.class */
public class StoppableKafkaEnumContextProxy implements SplitEnumeratorContext<KafkaPartitionSplit>, AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(StoppableKafkaEnumContextProxy.class);
    private final String kafkaClusterId;
    private final KafkaMetadataService kafkaMetadataService;
    private final SplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext;
    private final ScheduledExecutorService subEnumeratorWorker;
    private final Runnable signalNoMoreSplitsCallback;
    private boolean noMoreSplits = false;
    private volatile boolean isClosing = false;

    @Internal
    /* loaded from: input_file:org/apache/flink/connector/kafka/dynamic/source/enumerator/StoppableKafkaEnumContextProxy$HandledFlinkKafkaException.class */
    public static class HandledFlinkKafkaException extends RuntimeException {
        private static final String ERROR_MESSAGE = "An error occurred with %s";
        private final String kafkaClusterId;

        public HandledFlinkKafkaException(Throwable th, String str) {
            super(th);
            this.kafkaClusterId = str;
        }

        @Override // java.lang.Throwable
        public String getMessage() {
            return String.format(ERROR_MESSAGE, this.kafkaClusterId);
        }
    }

    @Internal
    /* loaded from: input_file:org/apache/flink/connector/kafka/dynamic/source/enumerator/StoppableKafkaEnumContextProxy$StoppableKafkaEnumContextProxyFactory.class */
    public interface StoppableKafkaEnumContextProxyFactory {
        StoppableKafkaEnumContextProxy create(SplitEnumeratorContext<DynamicKafkaSourceSplit> splitEnumeratorContext, String str, KafkaMetadataService kafkaMetadataService, Runnable runnable);

        static StoppableKafkaEnumContextProxyFactory getDefaultFactory() {
            return (splitEnumeratorContext, str, kafkaMetadataService, runnable) -> {
                return new StoppableKafkaEnumContextProxy(str, kafkaMetadataService, splitEnumeratorContext, runnable);
            };
        }
    }

    public StoppableKafkaEnumContextProxy(String str, KafkaMetadataService kafkaMetadataService, SplitEnumeratorContext<DynamicKafkaSourceSplit> splitEnumeratorContext, @Nullable Runnable runnable) {
        this.kafkaClusterId = str;
        this.kafkaMetadataService = kafkaMetadataService;
        this.enumContext = splitEnumeratorContext;
        this.subEnumeratorWorker = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory(str + "-enum-worker"));
        this.signalNoMoreSplitsCallback = runnable;
    }

    public SplitEnumeratorMetricGroup metricGroup() {
        return this.enumContext.metricGroup();
    }

    public void sendEventToSourceReader(int i, SourceEvent sourceEvent) {
        this.enumContext.sendEventToSourceReader(i, sourceEvent);
    }

    public int currentParallelism() {
        return this.enumContext.currentParallelism();
    }

    public Map<Integer, ReaderInfo> registeredReaders() {
        return this.enumContext.registeredReaders();
    }

    public void assignSplits(SplitsAssignment<KafkaPartitionSplit> splitsAssignment) {
        if (logger.isInfoEnabled()) {
            logger.info("Assigning {} splits for cluster {}: {}", new Object[]{Long.valueOf(splitsAssignment.assignment().values().stream().mapToLong((v0) -> {
                return v0.size();
            }).sum()), this.kafkaClusterId, splitsAssignment});
        }
        HashMap hashMap = new HashMap();
        splitsAssignment.assignment().forEach((num, list) -> {
            hashMap.put(num, (List) list.stream().map(kafkaPartitionSplit -> {
                return new DynamicKafkaSourceSplit(this.kafkaClusterId, kafkaPartitionSplit);
            }).collect(Collectors.toList()));
        });
        if (hashMap.isEmpty()) {
            return;
        }
        this.enumContext.assignSplits(new SplitsAssignment(hashMap));
    }

    public void signalNoMoreSplits(int i) {
        this.noMoreSplits = true;
        if (this.signalNoMoreSplitsCallback != null) {
            this.signalNoMoreSplitsCallback.run();
        }
    }

    public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> biConsumer) {
        this.enumContext.callAsync(wrapCallAsyncCallable(callable), wrapCallAsyncCallableHandler(biConsumer));
    }

    public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> biConsumer, long j, long j2) {
        this.subEnumeratorWorker.scheduleAtFixedRate(() -> {
            callAsync(callable, biConsumer);
        }, j, j2, TimeUnit.MILLISECONDS);
    }

    public void runInCoordinatorThread(Runnable runnable) {
        this.enumContext.runInCoordinatorThread(runnable);
    }

    public void signalBacklogIsOver(int i) {
        this.enumContext.signalBacklogIsOver(i);
    }

    public boolean isNoMoreSplits() {
        return this.noMoreSplits;
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        logger.info("Closing enum context for {}", this.kafkaClusterId);
        if (this.subEnumeratorWorker != null) {
            this.isClosing = true;
            this.subEnumeratorWorker.shutdown();
            this.subEnumeratorWorker.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        }
    }

    protected <T> Callable<T> wrapCallAsyncCallable(Callable<T> callable) {
        return () -> {
            try {
                return callable.call();
            } catch (Exception e) {
                if (this.isClosing) {
                    throw new HandledFlinkKafkaException(e, this.kafkaClusterId);
                }
                Optional findThrowable = ExceptionUtils.findThrowable(e, KafkaException.class);
                if (!findThrowable.isPresent() || this.kafkaMetadataService.isClusterActive(this.kafkaClusterId)) {
                    throw e;
                }
                throw new HandledFlinkKafkaException((Throwable) findThrowable.get(), this.kafkaClusterId);
            }
        };
    }

    protected <T> BiConsumer<T, Throwable> wrapCallAsyncCallableHandler(BiConsumer<T, Throwable> biConsumer) {
        return (obj, th) -> {
            Optional findThrowable = ExceptionUtils.findThrowable(th, HandledFlinkKafkaException.class);
            if (findThrowable.isPresent()) {
                logger.warn("Swallowed handled exception for {}.", this.kafkaClusterId, findThrowable.get());
            } else {
                biConsumer.accept(obj, th);
            }
        };
    }
}
