package org.apache.flink.streaming.api.operators.dynamicconfig;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.streaming.api.operators.dynamicconfig.DynamicConfigOperatorContext;
import org.apache.flink.streaming.api.operators.dynamicconfig.util.ConfigUpdateCoordinationRequest;
import org.apache.flink.streaming.api.operators.dynamicconfig.util.ConfigUpdateCoordinationResponse;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/operators/dynamicconfig/DynamicConfigOperatorCoordinator.class */
public class DynamicConfigOperatorCoordinator implements OperatorCoordinator, CoordinationRequestHandler {
    private static final Logger LOG = LoggerFactory.getLogger(DynamicConfigOperatorCoordinator.class);
    private final String operatorName;
    private final DynamicConfigOperatorContext context;
    private boolean started;

    @Internal
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/dynamicconfig/DynamicConfigOperatorCoordinator$Provider.class */
    public static class Provider implements OperatorCoordinator.Provider {
        private final OperatorID operatorId;
        private final String operatorName;
        private final int socketTimeout;

        public Provider(OperatorID operatorID, String str, int i) {
            this.operatorId = operatorID;
            this.operatorName = str;
            this.socketTimeout = i;
        }

        public OperatorID getOperatorId() {
            return this.operatorId;
        }

        public OperatorCoordinator create(OperatorCoordinator.Context context) {
            return new DynamicConfigOperatorCoordinator(new DynamicConfigOperatorContext(new DynamicConfigOperatorContext.CoordinatorExecutorThreadFactory("DynamicConfigOperatorCoordinator-" + this.operatorName, context), context, this.socketTimeout), this.operatorName);
        }
    }

    public DynamicConfigOperatorCoordinator(DynamicConfigOperatorContext dynamicConfigOperatorContext, String str) {
        this.context = dynamicConfigOperatorContext;
        this.operatorName = str;
    }

    public void start() throws Exception {
        LOG.info("Start DynamicConfigOperatorCoordinator");
        this.started = true;
    }

    public void close() throws Exception {
        closeConnection();
    }

    private void closeConnection() throws InterruptedException {
        LOG.info("Closing DynamicConfigOperatorCoordinator for {}.", this.operatorName);
        if (this.started) {
            this.context.close();
        }
        this.started = false;
        LOG.info("DynamicConfigOperatorCoordinator for {} closed.", this.operatorName);
    }

    public CompletableFuture<CoordinationResponse> handleCoordinationRequest(CoordinationRequest coordinationRequest) {
        Preconditions.checkArgument(coordinationRequest instanceof ConfigUpdateCoordinationRequest, "Coordination request must be a ConfigUpdateCoordinationRequest");
        if (!this.context.isOperatorReady()) {
            return CompletableFuture.completedFuture(ConfigUpdateCoordinationResponse.operatorUnavailable());
        }
        ConfigUpdateCoordinationRequest configUpdateCoordinationRequest = (ConfigUpdateCoordinationRequest) coordinationRequest;
        List list = (List) this.context.getSubtasks().stream().map(num -> {
            return this.context.sendUpdateRequestToOperator(num.intValue(), configUpdateCoordinationRequest);
        }).collect(Collectors.toList());
        return CompletableFuture.allOf((CompletableFuture[]) list.toArray(new CompletableFuture[0])).thenApply(r4 -> {
            return (CoordinationResponse) ((CompletableFuture) list.get(0)).join();
        }).exceptionally((Function<Throwable, ? extends U>) th -> {
            LOG.error("Failed applied config {} for operator {}", configUpdateCoordinationRequest.getConfig(), this.operatorName);
            return ConfigUpdateCoordinationResponse.operatorUnavailable();
        });
    }

    public void handleEventFromOperator(int i, int i2, OperatorEvent operatorEvent) {
        Preconditions.checkArgument(operatorEvent instanceof DynamicConfigOperatorAddressEvent, "Operator event must be a DynamicConfigOperatorAddressEvent");
        InetSocketAddress address = ((DynamicConfigOperatorAddressEvent) operatorEvent).getAddress();
        this.context.subtaskReady(i, address);
        LOG.info("Received operator socket server address: {}", address);
    }

    public void executionAttemptFailed(int i, int i2, @Nullable Throwable th) {
        runInEventLoop(() -> {
            LOG.info("Removing itself after failure for subtask {} of operator {}.", Integer.valueOf(i), this.operatorName);
            this.context.subtaskNotReady(i);
        }, "handling subtask %d failure", Integer.valueOf(i));
    }

    public void checkpointCoordinator(long j, CompletableFuture<byte[]> completableFuture) {
        runInEventLoop(() -> {
            try {
                ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                try {
                    ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
                    try {
                        objectOutputStream.writeObject(this.context.getSubtaskAddresses());
                        completableFuture.complete(byteArrayOutputStream.toByteArray());
                        objectOutputStream.close();
                        byteArrayOutputStream.close();
                    } catch (Throwable th) {
                        try {
                            objectOutputStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                        throw th;
                    }
                } finally {
                }
            } catch (Exception e) {
                completableFuture.completeExceptionally(new CompletionException(String.format("Failed to checkpoint the DynamicConfigOperatorCoordinator for operator %s", this.operatorName), e));
            }
        }, "taking checkpoint %d", Long.valueOf(j));
    }

    public void resetToCheckpoint(long j, @Nullable byte[] bArr) throws Exception {
        if (bArr == null) {
            close();
        } else {
            this.context.setSubtaskAddresses((Map) new ObjectInputStream(new ByteArrayInputStream(bArr)).readObject());
        }
    }

    public void subtaskReset(int i, long j) {
    }

    public void executionAttemptReady(int i, int i2, OperatorCoordinator.SubtaskGateway subtaskGateway) {
    }

    public void notifyCheckpointComplete(long j) {
    }

    private void runInEventLoop(ThrowingRunnable<Throwable> throwingRunnable, String str, Object... objArr) {
        ensureStarted();
        this.context.runInCoordinatorThread(() -> {
            try {
                throwingRunnable.run();
            } catch (Throwable th) {
                ExceptionUtils.rethrowIfFatalErrorOrOOM(th);
                LOG.error("Uncaught exception in the DynamicConfigOperatorCoordinator for operator {} while {}. Triggering job failover.", new Object[]{this.operatorName, String.format(str, objArr), th});
                this.context.failCoordinator(th);
            }
        });
    }

    private void ensureStarted() {
        if (!this.started) {
            throw new IllegalStateException("The coordinator has not started yet.");
        }
    }
}
