package org.apache.flink.streaming.api.operators.dynamicconfig;

import java.io.IOException;
import java.lang.Thread;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.operators.coordination.ComponentClosingUtils;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.operators.dynamicconfig.util.ConfigUpdateCoordinationRequest;
import org.apache.flink.streaming.api.operators.dynamicconfig.util.ConfigUpdateCoordinationResponse;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/operators/dynamicconfig/DynamicConfigOperatorContext.class */
public class DynamicConfigOperatorContext implements AutoCloseable {
    /** Logger of this context; the nested {@link DynamicOperatorSocket} also logs through it. */
    private static final Logger LOG = LoggerFactory.getLogger(DynamicConfigOperatorContext.class);
    /** Timeout handed to every {@link DynamicOperatorSocket} created by {@code subtaskReady}. */
    private final int socketTimeout;
    /** Executor that runs the tasks submitted through this context. */
    private final ScheduledExecutorService coordinatorExecutor;
    /** Factory used to tell whether the calling thread is the coordinator thread. */
    private final CoordinatorExecutorThreadFactory coordinatorThreadFactory;
    /** Sockets keyed by subtask index; a value is {@code null} after {@code subtaskNotReady}. */
    private Map<Integer, DynamicOperatorSocket> subtaskSockets;
    /** Parallelism reported by the coordinator context when this object was constructed. */
    private final int operatorParallelism;

    /**
     * Thread factory for the coordinator executor.
     *
     * <p>Every thread it creates gets the configured name, context class loader and
     * uncaught-exception handler. The most recently created thread is remembered so that callers
     * can ask whether they are currently running on it.
     */
    @Internal
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/dynamicconfig/DynamicConfigOperatorContext$CoordinatorExecutorThreadFactory.class */
    public static class CoordinatorExecutorThreadFactory implements ThreadFactory {
        private static final Logger LOG = LoggerFactory.getLogger(CoordinatorExecutorThreadFactory.class);
        private final String coordinatorThreadName;
        private final ClassLoader classLoader;
        private final Thread.UncaughtExceptionHandler errorHandler;

        /** The thread most recently handed out by {@link #newThread(Runnable)}, if any. */
        @Nullable
        private Thread thread;

        /**
         * Creates a factory whose threads use the user-code class loader of {@code context} and
         * fail the job through {@code context} on an uncaught exception.
         *
         * @param str name given to every created thread
         * @param context coordinator context supplying the class loader and the job-failure hook
         */
        /* JADX INFO: Access modifiers changed from: package-private */
        public CoordinatorExecutorThreadFactory(String str, OperatorCoordinator.Context context) {
            this(str, context.getUserCodeClassloader(), getExceptionHandler(context));
        }

        /**
         * Creates a factory from explicitly supplied collaborators.
         *
         * @param str name given to every created thread
         * @param classLoader context class loader installed on every created thread
         * @param uncaughtExceptionHandler handler installed on every created thread
         */
        @VisibleForTesting
        CoordinatorExecutorThreadFactory(String str, ClassLoader classLoader, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
            this.coordinatorThreadName = str;
            this.classLoader = classLoader;
            this.errorHandler = uncaughtExceptionHandler;
        }

        /**
         * Creates a configured thread for {@code runnable} and records it as the coordinator thread.
         *
         * @param runnable the work the new thread will execute
         * @return the newly created, not yet started thread
         */
        @Override // java.util.concurrent.ThreadFactory
        public synchronized Thread newThread(@NotNull Runnable runnable) {
            final Thread created = new Thread(runnable, this.coordinatorThreadName);
            created.setContextClassLoader(this.classLoader);
            created.setUncaughtExceptionHandler(this.errorHandler);
            this.thread = created;
            return created;
        }

        /** Returns {@code true} when the caller runs on the thread this factory created last. */
        boolean isCurrentThreadCoordinatorThread() {
            return this.thread == Thread.currentThread();
        }

        /**
         * Reports {@code th} to the uncaught-exception handler on behalf of the coordinator thread.
         *
         * @param th the failure to report
         * @throws IllegalStateException if the caller is not the coordinator thread
         */
        void failCoordinatorThread(Throwable th) {
            if (isCurrentThreadCoordinatorThread()) {
                this.errorHandler.uncaughtException(this.thread, th);
                return;
            }
            throw new IllegalStateException("Current thread is not coordinator thread");
        }

        /** Builds a handler that logs the uncaught exception and then fails the job via {@code context}. */
        private static Thread.UncaughtExceptionHandler getExceptionHandler(OperatorCoordinator.Context context) {
            return (failedThread, cause) -> {
                LOG.error("Thread '{}' produced an uncaught exception. Failing the job.", failedThread.getName(), cause);
                context.failJob(cause);
            };
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/dynamicconfig/DynamicConfigOperatorContext$DynamicOperatorSocket.class */
    /**
     * Lazily connected client socket to one subtask's config-update socket server.
     *
     * <p>The connection is opened on the first {@link #send} call and reused afterwards. Any
     * failure while connecting, writing or reading closes the connection, so the next call
     * starts from a fresh socket. Instances carry no synchronization of their own.
     */
    public static class DynamicOperatorSocket {
        /** Timeout in milliseconds used when the caller does not supply one. */
        private static final int DEFAULT_SOCKET_TIMEOUT = (int) Duration.ofSeconds(30).toMillis();
        /** Timeout in milliseconds applied to both connecting and blocking reads. */
        private final int socketTimeout;
        private final InetSocketAddress address;
        /** The open connection, or {@code null} while disconnected. */
        private Socket socket;
        private DataInputViewStreamWrapper inStream;
        private DataOutputViewStreamWrapper outStream;

        /**
         * @param i socket timeout in milliseconds
         * @param inetSocketAddress address of the subtask's socket server
         */
        public DynamicOperatorSocket(int i, InetSocketAddress inetSocketAddress) {
            this.socketTimeout = i;
            this.address = inetSocketAddress;
        }

        /**
         * Creates a socket that uses the default timeout.
         *
         * @param inetSocketAddress address of the subtask's socket server
         */
        public DynamicOperatorSocket(InetSocketAddress inetSocketAddress) {
            this(DEFAULT_SOCKET_TIMEOUT, inetSocketAddress);
        }

        /**
         * Writes the request to the subtask and blocks until its response has been read.
         *
         * <p>The exchange is synchronous; the returned future is always already completed. It
         * never completes exceptionally: any failure is reported as an "operator unavailable"
         * response and the connection is closed.
         *
         * @param configUpdateCoordinationRequest the request to forward
         * @return a completed future holding the subtask's response or the unavailable response
         */
        public CompletableFuture<ConfigUpdateCoordinationResponse> send(ConfigUpdateCoordinationRequest configUpdateCoordinationRequest) {
            try {
                connectIfNeeded();
                DynamicConfigOperatorContext.LOG.debug("Forwarding request to operator socket server");
                configUpdateCoordinationRequest.serialize(this.outStream);
                return CompletableFuture.completedFuture(ConfigUpdateCoordinationResponse.successfulFromDataInputView(this.inStream));
            } catch (Exception e) {
                DynamicConfigOperatorContext.LOG.debug("Config update operator coordinator encounters an exception", e);
                // The stream may be mid-message after a failure; drop the connection so the next
                // request reconnects instead of reading a stale or partial response.
                close();
                return CompletableFuture.completedFuture(ConfigUpdateCoordinationResponse.operatorUnavailable());
            }
        }

        /**
         * Opens and configures the connection unless one is already open. The fields are only
         * populated once the connection is fully set up; on failure the new socket is closed.
         */
        private void connectIfNeeded() throws IOException {
            if (this.socket != null) {
                return;
            }
            final Socket newSocket = new Socket();
            try {
                newSocket.setSoTimeout(this.socketTimeout);
                newSocket.setKeepAlive(true);
                newSocket.setTcpNoDelay(true);
                // Bound the connect as well as the reads; without a timeout an unreachable
                // subtask would block the calling thread until the OS gives up.
                newSocket.connect(this.address, this.socketTimeout);
                final DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(newSocket.getInputStream());
                final DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(newSocket.getOutputStream());
                this.inStream = in;
                this.outStream = out;
                this.socket = newSocket;
            } catch (IOException | RuntimeException e) {
                try {
                    newSocket.close();
                } catch (IOException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
                throw e;
            }
            DynamicConfigOperatorContext.LOG.info("Operator connection established");
        }

        /** Closes the connection if one is open; safe to call repeatedly. */
        public void close() {
            if (this.socket != null) {
                try {
                    this.socket.close();
                } catch (IOException e) {
                    DynamicConfigOperatorContext.LOG.warn("Failed to close sink socket server connection", e);
                }
            }
            this.socket = null;
            // Closing the socket closes its streams; drop the wrappers so they cannot be reused.
            this.inStream = null;
            this.outStream = null;
        }
    }

    /**
     * Creates a context that runs its tasks on a scheduled pool of one thread obtained from
     * {@code coordinatorExecutorThreadFactory}.
     *
     * @param coordinatorExecutorThreadFactory factory for the coordinator thread
     * @param context coordinator context the parallelism is read from
     * @param i socket timeout in milliseconds for subtask connections
     */
    public DynamicConfigOperatorContext(CoordinatorExecutorThreadFactory coordinatorExecutorThreadFactory, OperatorCoordinator.Context context, int i) {
        this(
                Executors.newScheduledThreadPool(1, coordinatorExecutorThreadFactory),
                coordinatorExecutorThreadFactory,
                context,
                i);
    }

    /**
     * Creates a context on top of an externally supplied executor.
     *
     * @param scheduledExecutorService executor that runs the coordinator tasks
     * @param coordinatorExecutorThreadFactory factory used to recognise the coordinator thread
     * @param context coordinator context; its current parallelism is captured here
     * @param i socket timeout in milliseconds for subtask connections
     */
    public DynamicConfigOperatorContext(ScheduledExecutorService scheduledExecutorService, CoordinatorExecutorThreadFactory coordinatorExecutorThreadFactory, OperatorCoordinator.Context context, int i) {
        this.coordinatorExecutor = scheduledExecutorService;
        this.coordinatorThreadFactory = coordinatorExecutorThreadFactory;
        this.socketTimeout = i;
        this.operatorParallelism = context.currentParallelism();
        // Diamond instead of the raw type so the map is checked as Map<Integer, DynamicOperatorSocket>.
        this.subtaskSockets = new HashMap<>(this.operatorParallelism);
    }

    /**
     * Shuts the coordinator executor down and then closes every open subtask connection.
     *
     * <p>The sockets are closed in a {@code finally} block so they are released even when the
     * executor shutdown fails.
     */
    @Override // java.lang.AutoCloseable
    public void close() throws InterruptedException {
        try {
            // Wait without a practical upper bound for the executor to terminate.
            ComponentClosingUtils.shutdownExecutorForcefully(this.coordinatorExecutor, Duration.ofNanos(Long.MAX_VALUE));
        } finally {
            for (DynamicOperatorSocket subtaskSocket : this.subtaskSockets.values()) {
                // Entries are null for subtasks that were marked not ready.
                if (subtaskSocket != null) {
                    subtaskSocket.close();
                }
            }
        }
    }

    /**
     * Hands {@code runnable} to the coordinator executor without waiting for it to run.
     *
     * @param runnable the action to execute
     */
    public void runInCoordinatorThread(Runnable runnable) {
        final ScheduledExecutorService executor = this.coordinatorExecutor;
        executor.execute(runnable);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Registers the socket server address of a subtask.
     *
     * @param i index of the subtask
     * @param inetSocketAddress address the subtask's socket server listens on
     * @throws IllegalStateException if a socket is already registered for the subtask
     */
    public void subtaskReady(int i, InetSocketAddress inetSocketAddress) {
        final Integer subtaskIndex = Integer.valueOf(i);
        final DynamicOperatorSocket registered = this.subtaskSockets.get(subtaskIndex);
        if (registered == null) {
            this.subtaskSockets.put(subtaskIndex, new DynamicOperatorSocket(this.socketTimeout, inetSocketAddress));
            return;
        }
        throw new IllegalStateException("Already have a subtask socket for " + i);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Marks a subtask as not ready by replacing its socket with {@code null}.
     *
     * <p>The connection that was registered before, if any, is closed so it is not leaked.
     *
     * @param i index of the subtask
     */
    public void subtaskNotReady(int i) {
        final DynamicOperatorSocket previous = this.subtaskSockets.put(Integer.valueOf(i), null);
        if (previous != null) {
            previous.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the indices of all subtasks that have an entry in the socket map, including those
     * whose entry is {@code null}. The result is the map's own key-set view, not a copy.
     *
     * @return the key set of the subtask socket map
     */
    public Set<Integer> getSubtasks() {
        final Set<Integer> subtaskIndices = this.subtaskSockets.keySet();
        return subtaskIndices;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the socket server address of every subtask that currently has a socket.
     *
     * <p>Subtasks whose entry is {@code null} (marked not ready) are left out; dereferencing
     * them would otherwise throw a {@link NullPointerException}.
     *
     * @return a new map from subtask index to address
     */
    public Map<Integer, InetSocketAddress> getSubtaskAddresses() {
        final Map<Integer, InetSocketAddress> addresses = new HashMap<>(this.subtaskSockets.size());
        for (Map.Entry<Integer, DynamicOperatorSocket> entry : this.subtaskSockets.entrySet()) {
            final DynamicOperatorSocket subtaskSocket = entry.getValue();
            if (subtaskSocket != null) {
                addresses.put(entry.getKey(), subtaskSocket.address);
            }
        }
        return addresses;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Replaces all subtask sockets with fresh, unconnected sockets for the given addresses.
     *
     * <p>The new sockets use this context's configured timeout, matching the sockets created
     * by {@code subtaskReady}. Connections held by the replaced sockets are closed.
     *
     * @param map subtask index to socket server address
     */
    public void setSubtaskAddresses(Map<Integer, InetSocketAddress> map) {
        // Build into a HashMap explicitly: the map is mutated later, so it must be mutable.
        final Map<Integer, DynamicOperatorSocket> replacement = new HashMap<>(map.size());
        for (Map.Entry<Integer, InetSocketAddress> entry : map.entrySet()) {
            replacement.put(entry.getKey(), new DynamicOperatorSocket(this.socketTimeout, entry.getValue()));
        }
        final Map<Integer, DynamicOperatorSocket> replaced = this.subtaskSockets;
        this.subtaskSockets = replacement;
        if (replaced != null) {
            for (DynamicOperatorSocket stale : replaced.values()) {
                if (stale != null) {
                    stale.close();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Tells whether every subtask of the operator currently has a registered socket.
     *
     * <p>Only non-null entries count: a subtask marked not ready keeps its key in the map with
     * a {@code null} value and must not be treated as ready.
     *
     * @return {@code true} if the number of registered sockets equals the operator parallelism
     */
    public boolean isOperatorReady() {
        int readySubtasks = 0;
        for (DynamicOperatorSocket subtaskSocket : this.subtaskSockets.values()) {
            if (subtaskSocket != null) {
                readySubtasks++;
            }
        }
        return readySubtasks == this.operatorParallelism;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sends a config update request to one subtask from the coordinator thread and returns its
     * response.
     *
     * <p>If no socket is registered for the subtask, a warning is logged and an
     * "operator unavailable" response is returned instead.
     *
     * @param i index of the target subtask
     * @param configUpdateCoordinationRequest the request to send
     * @return a future holding the subtask's response or the unavailable response
     * @throws FlinkRuntimeException if running the send on the coordinator thread fails
     */
    public CompletableFuture<ConfigUpdateCoordinationResponse> sendUpdateRequestToOperator(int i, ConfigUpdateCoordinationRequest configUpdateCoordinationRequest) {
        final String errorMessage = String.format("Failed to send update config request %s to subtask %d", configUpdateCoordinationRequest, Integer.valueOf(i));
        final Callable<CompletableFuture<ConfigUpdateCoordinationResponse>> sendTask = () -> {
            final DynamicOperatorSocket subtaskSocket = this.subtaskSockets.get(Integer.valueOf(i));
            if (subtaskSocket == null) {
                LOG.warn("Subtask {} is not ready yet to receive events.", Integer.valueOf(i));
                return CompletableFuture.completedFuture(ConfigUpdateCoordinationResponse.operatorUnavailable());
            }
            return subtaskSocket.send(configUpdateCoordinationRequest);
        };
        return callInCoordinatorThread(sendTask, errorMessage);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Reports {@code th} through the coordinator thread's uncaught-exception handler and waits
     * until that has happened.
     *
     * <p>When the caller already is the coordinator thread the handler is invoked directly.
     * Submitting to the executor and blocking on the result from that thread would wait on a
     * task queued behind the caller itself.
     *
     * @param th the failure to report
     * @throws FlinkRuntimeException if waiting is interrupted or the submitted task fails
     */
    public void failCoordinator(Throwable th) {
        if (this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
            this.coordinatorThreadFactory.failCoordinatorThread(th);
            return;
        }
        try {
            this.coordinatorExecutor.submit(() -> {
                this.coordinatorThreadFactory.failCoordinatorThread(th);
            }).get();
        } catch (InterruptedException e) {
            // Preserve the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new FlinkRuntimeException("Error failing DynamicConfigOperatorCoordinator", e);
        } catch (ExecutionException e2) {
            throw new FlinkRuntimeException("Error failing DynamicConfigOperatorCoordinator", e2);
        }
    }

    /**
     * Runs {@code callable} on the coordinator thread and returns its result.
     *
     * <p>The callable is invoked directly when the caller already is the coordinator thread or
     * when the executor has been shut down; otherwise it is submitted to the executor and the
     * caller blocks until it finishes.
     *
     * @param callable the computation to run
     * @param str message of the {@link FlinkRuntimeException} thrown on failure
     * @return the value produced by {@code callable}
     * @throws FlinkRuntimeException if the callable fails or waiting for it is interrupted
     */
    private <V> V callInCoordinatorThread(Callable<V> callable, String str) {
        final boolean handOffToExecutor =
                !this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread()
                        && !this.coordinatorExecutor.isShutdown();
        if (!handOffToExecutor) {
            try {
                return callable.call();
            } catch (Exception failure) {
                throw new FlinkRuntimeException(str, failure);
            }
        }

        // Log inside the executor thread before the failure is wrapped by the future.
        final Callable<V> loggingCallable = () -> {
            try {
                return callable.call();
            } catch (Exception failure) {
                LOG.error("Uncaught Exception in DynamicConfigOperatorCoordinator Executor", failure);
                ExceptionUtils.rethrowException(failure);
                // Required by the compiler; presumably never reached because the call above throws.
                return null;
            }
        };

        try {
            return this.coordinatorExecutor.submit(loggingCallable).get();
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new FlinkRuntimeException(str, interrupted);
        } catch (ExecutionException executionFailure) {
            throw new FlinkRuntimeException(str, executionFailure);
        }
    }
}
