package org.apache.flink.runtime.operators.coordination;

import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.class */
public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) RecreateOnResetOperatorCoordinator.class);
    private static final long CLOSING_TIMEOUT_MS = 60000;
    private final Provider provider;
    private final long closingTimeoutMs;
    private final OperatorCoordinator.Context context;
    private DeferrableCoordinator coordinator;
    private boolean started;
    private volatile boolean closed;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator$DeferrableCoordinator.class */
    public static class DeferrableCoordinator {
        private final OperatorID operatorId;
        private final BlockingQueue<NamedCall> pendingCalls;
        private QuiesceableContext internalQuiesceableContext;
        private OperatorCoordinator internalCoordinator;
        private boolean hasCaughtUp;
        private boolean closed;
        private volatile boolean failed;

        private DeferrableCoordinator(OperatorID operatorID) {
            this.operatorId = operatorID;
            this.pendingCalls = new LinkedBlockingQueue();
            this.hasCaughtUp = false;
            this.closed = false;
            this.failed = false;
        }

        synchronized <T extends Exception> void applyCall(String str, ThrowingConsumer<OperatorCoordinator, T> throwingConsumer) throws Exception {
            if (this.hasCaughtUp) {
                throwingConsumer.accept(this.internalCoordinator);
            } else {
                this.pendingCalls.add(new NamedCall(str, throwingConsumer));
            }
        }

        synchronized void createNewInternalCoordinator(OperatorCoordinator.Context context, Provider provider) {
            if (this.closed) {
                return;
            }
            try {
                this.internalQuiesceableContext = new QuiesceableContext(context);
                this.internalCoordinator = provider.getCoordinator(this.internalQuiesceableContext);
            } catch (Exception e) {
                RecreateOnResetOperatorCoordinator.LOG.error("Failed to create new internal coordinator due to ", (Throwable) e);
                cleanAndFailJob(e);
            }
        }

        synchronized CompletableFuture<Void> closeAsync(long j) {
            this.closed = true;
            if (this.internalCoordinator == null) {
                return CompletableFuture.completedFuture(null);
            }
            this.internalQuiesceableContext.quiesce();
            this.pendingCalls.clear();
            String str = "SourceCoordinator for " + this.operatorId;
            OperatorCoordinator operatorCoordinator = this.internalCoordinator;
            operatorCoordinator.getClass();
            return ComponentClosingUtils.closeAsyncWithTimeout(str, (ThrowingRunnable<Exception>) operatorCoordinator::close, Duration.ofMillis(j)).exceptionally(th -> {
                cleanAndFailJob(th);
                return null;
            });
        }

        void processPendingCalls() {
            if (this.failed || this.closed || this.internalCoordinator == null) {
                return;
            }
            String str = "Unknown Call Name";
            while (!this.hasCaughtUp) {
                try {
                    while (!this.pendingCalls.isEmpty()) {
                        NamedCall poll = this.pendingCalls.poll();
                        if (poll != null) {
                            str = poll.name;
                            poll.getConsumer().accept(this.internalCoordinator);
                        }
                    }
                    synchronized (this) {
                        if (this.pendingCalls.isEmpty()) {
                            this.hasCaughtUp = true;
                        }
                    }
                } catch (Throwable th) {
                    RecreateOnResetOperatorCoordinator.LOG.error("Failed to process pending calls {} on coordinator.", str, th);
                    cleanAndFailJob(th);
                    return;
                }
            }
        }

        void start() throws Exception {
            this.internalCoordinator.start();
        }

        void resetAndStart(long j, @Nullable byte[] bArr, boolean z) {
            if (this.failed || this.closed || this.internalCoordinator == null) {
                return;
            }
            try {
                this.internalCoordinator.resetToCheckpoint(j, bArr);
                if (z) {
                    this.internalCoordinator.start();
                }
            } catch (Exception e) {
                RecreateOnResetOperatorCoordinator.LOG.error("Failed to reset the coordinator to checkpoint and start.", (Throwable) e);
                cleanAndFailJob(e);
            }
        }

        private void cleanAndFailJob(Throwable th) {
            if (this.failed) {
                return;
            }
            this.failed = true;
            this.internalQuiesceableContext.getContext().failJob(th);
            this.pendingCalls.clear();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator$NamedCall.class */
    public static class NamedCall {
        private final String name;
        private final ThrowingConsumer<OperatorCoordinator, ?> consumer;

        private NamedCall(String str, ThrowingConsumer<OperatorCoordinator, ?> throwingConsumer) {
            this.name = str;
            this.consumer = throwingConsumer;
        }

        public String getName() {
            return this.name;
        }

        public ThrowingConsumer<OperatorCoordinator, ?> getConsumer() {
            return this.consumer;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator$Provider.class */
    public static abstract class Provider implements OperatorCoordinator.Provider {
        private static final long serialVersionUID = 3002837631612629071L;
        private final OperatorID operatorID;

        public Provider(OperatorID operatorID) {
            this.operatorID = operatorID;
        }

        @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator.Provider
        public OperatorID getOperatorId() {
            return this.operatorID;
        }

        @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator.Provider
        public OperatorCoordinator create(OperatorCoordinator.Context context) throws Exception {
            return create(context, 60000L);
        }

        @VisibleForTesting
        protected OperatorCoordinator create(OperatorCoordinator.Context context, long j) throws Exception {
            return new RecreateOnResetOperatorCoordinator(context, this, j);
        }

        protected abstract OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) throws Exception;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator$QuiesceableContext.class */
    public static class QuiesceableContext implements OperatorCoordinator.Context {
        private final OperatorCoordinator.Context context;
        private volatile boolean quiesced = false;

        QuiesceableContext(OperatorCoordinator.Context context) {
            this.context = context;
        }

        @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator.Context
        public OperatorID getOperatorId() {
            return this.context.getOperatorId();
        }

        @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator.Context
        public synchronized void failJob(Throwable th) {
            if (this.quiesced) {
                return;
            }
            this.context.failJob(th);
        }

        @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator.Context
        public int currentParallelism() {
            return this.context.currentParallelism();
        }

        @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator.Context
        public ClassLoader getUserCodeClassloader() {
            return this.context.getUserCodeClassloader();
        }

        @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator.Context
        public CoordinatorStore getCoordinatorStore() {
            return this.context.getCoordinatorStore();
        }

        @VisibleForTesting
        synchronized void quiesce() {
            this.quiesced = true;
        }

        @VisibleForTesting
        boolean isQuiesced() {
            return this.quiesced;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public OperatorCoordinator.Context getContext() {
            return this.context;
        }
    }

    private RecreateOnResetOperatorCoordinator(OperatorCoordinator.Context context, Provider provider, long j) throws Exception {
        this.context = context;
        this.provider = provider;
        this.coordinator = new DeferrableCoordinator(context.getOperatorId());
        this.coordinator.createNewInternalCoordinator(context, provider);
        this.coordinator.processPendingCalls();
        this.closingTimeoutMs = j;
        this.started = false;
        this.closed = false;
    }

    @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator
    public void start() throws Exception {
        Preconditions.checkState(!this.started, "coordinator already started");
        this.started = true;
        this.coordinator.applyCall("start", (v0) -> {
            v0.start();
        });
    }

    @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator, java.lang.AutoCloseable
    public void close() throws Exception {
        this.closed = true;
        this.coordinator.closeAsync(this.closingTimeoutMs);
    }

    @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator
    public void handleEventFromOperator(int i, OperatorEvent operatorEvent) throws Exception {
        this.coordinator.applyCall("handleEventFromOperator", operatorCoordinator -> {
            operatorCoordinator.handleEventFromOperator(i, operatorEvent);
        });
    }

    @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator
    public void subtaskFailed(int i, @Nullable Throwable th) {
        this.coordinator.applyCall("subtaskFailed", operatorCoordinator -> {
            operatorCoordinator.subtaskFailed(i, th);
        });
    }

    @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator
    public void subtaskReset(int i, long j) {
        this.coordinator.applyCall("subtaskReset", operatorCoordinator -> {
            operatorCoordinator.subtaskReset(i, j);
        });
    }

    @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator
    public void subtaskReady(int i, OperatorCoordinator.SubtaskGateway subtaskGateway) {
        this.coordinator.applyCall("subtaskReady", operatorCoordinator -> {
            operatorCoordinator.subtaskReady(i, subtaskGateway);
        });
    }

    @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator
    public void checkpointCoordinator(long j, CompletableFuture<byte[]> completableFuture) throws Exception {
        this.coordinator.applyCall("checkpointCoordinator", operatorCoordinator -> {
            operatorCoordinator.checkpointCoordinator(j, completableFuture);
        });
    }

    @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator, org.apache.flink.api.common.state.CheckpointListener
    public void notifyCheckpointComplete(long j) {
        this.coordinator.applyCall("checkpointComplete", operatorCoordinator -> {
            operatorCoordinator.notifyCheckpointComplete(j);
        });
    }

    @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator
    public void resetToCheckpoint(long j, @Nullable byte[] bArr) {
        LOG.info("Resetting coordinator to checkpoint.");
        DeferrableCoordinator deferrableCoordinator = this.coordinator;
        DeferrableCoordinator deferrableCoordinator2 = new DeferrableCoordinator(this.context.getOperatorId());
        this.coordinator = deferrableCoordinator2;
        CompletableFuture<Void> closeAsync = deferrableCoordinator.closeAsync(this.closingTimeoutMs);
        boolean z = this.started;
        closeAsync.whenComplete((r14, th) -> {
            if (th != null) {
                LOG.warn(String.format("Received exception when closing operator coordinator for %s.", deferrableCoordinator.operatorId), th);
            }
            if (this.closed) {
                return;
            }
            deferrableCoordinator2.createNewInternalCoordinator(this.context, this.provider);
            deferrableCoordinator2.resetAndStart(j, bArr, z);
            deferrableCoordinator2.processPendingCalls();
        });
    }

    @VisibleForTesting
    public OperatorCoordinator getInternalCoordinator() throws Exception {
        waitForAllAsyncCallsFinish();
        return this.coordinator.internalCoordinator;
    }

    @VisibleForTesting
    QuiesceableContext getQuiesceableContext() throws Exception {
        waitForAllAsyncCallsFinish();
        return this.coordinator.internalQuiesceableContext;
    }

    @VisibleForTesting
    void waitForAllAsyncCallsFinish() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        this.coordinator.applyCall("waitForAllAsyncCallsFinish", operatorCoordinator -> {
            completableFuture.complete(null);
        });
        completableFuture.get();
    }
}
