package org.apache.flink.runtime.source.coordinator;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.apache.flink.runtime.source.event.RequestSplitEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TemporaryClassLoaderContext;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link OperatorCoordinator} implementation that owns the {@link SplitEnumerator} of a {@link
 * Source} and forwards coordinator callbacks to it.
 *
 * <p>All interaction with the enumerator after {@link #start()} is submitted as a task to the
 * coordinator executor passed to the constructor (see {@code runInEventLoop}). Any exception
 * thrown by such a task is logged and reported through {@link
 * SourceCoordinatorContext#failJob(Throwable)}.
 *
 * <p>The enumerator is obtained in one of two ways: {@link #resetToCheckpoint(long, byte[])}
 * restores it from checkpoint bytes before the coordinator is started, or {@link #start()} creates
 * a fresh one if none exists yet.
 *
 * @param <SplitT> the type of the splits handled by the source
 * @param <EnumChkT> the type of the enumerator checkpoint produced by {@link
 *     SplitEnumerator#snapshotState()}
 */
@Internal
public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements OperatorCoordinator {

    private static final Logger LOG = LoggerFactory.getLogger(SourceCoordinator.class);

    /** Name of the source operator; only used in log and error messages. */
    private final String operatorName;

    /** Executor on which all enumerator interactions are run. */
    private final ExecutorService coordinatorExecutor;

    /** The source that creates or restores the enumerator. */
    private final Source<?, SplitT, EnumChkT> source;

    /** Serializer for the enumerator checkpoint, obtained from the source. */
    private final SimpleVersionedSerializer<EnumChkT> enumCheckpointSerializer;

    /** Serializer for splits, obtained from the source. */
    private final SimpleVersionedSerializer<SplitT> splitSerializer;

    /** Context handed to the enumerator; also tracks readers and split assignments. */
    private final SourceCoordinatorContext<SplitT> context;

    /** The enumerator; {@code null} until it is restored or created. */
    private SplitEnumerator<SplitT, EnumChkT> enumerator;

    /** Set to {@code true} by {@link #start()} once the enumerator exists. */
    private boolean started;

    /**
     * Creates a coordinator for the given source.
     *
     * @param str the operator name used in log messages
     * @param executorService the executor that runs all enumerator interactions
     * @param source the source providing the enumerator and the serializers
     * @param sourceCoordinatorContext the context passed to the enumerator
     */
    public SourceCoordinator(
            String str,
            ExecutorService executorService,
            Source<?, SplitT, EnumChkT> source,
            SourceCoordinatorContext<SplitT> sourceCoordinatorContext) {
        this.operatorName = str;
        this.coordinatorExecutor = executorService;
        this.source = source;
        this.enumCheckpointSerializer = source.getEnumeratorCheckpointSerializer();
        this.splitSerializer = source.getSplitSerializer();
        this.context = sourceCoordinatorContext;
    }

    /**
     * Creates the enumerator if it was not restored from a checkpoint, marks the coordinator as
     * started and schedules {@link SplitEnumerator#start()} on the coordinator executor.
     *
     * @throws Exception if creating the enumerator fails
     */
    @Override
    public void start() throws Exception {
        LOG.info("Starting split enumerator for source {}.", operatorName);
        if (enumerator == null) {
            // The enumerator is user code, so it is created with the user-code class loader as
            // the context class loader; try-with-resources restores the previous one.
            try (TemporaryClassLoaderContext ignored =
                    TemporaryClassLoaderContext.of(
                            context.getCoordinatorContext().getUserCodeClassloader())) {
                enumerator = source.createEnumerator(context);
            }
        }
        started = true;
        runInEventLoop(() -> enumerator.start(), "starting the SplitEnumerator.");
    }

    /**
     * Closes the context and the enumerator (only if the coordinator was started), then shuts
     * down the coordinator executor and waits for it to terminate.
     *
     * @throws Exception if closing the context or enumerator fails, or if the wait for executor
     *     termination is interrupted
     */
    @Override
    public void close() throws Exception {
        LOG.info("Closing SourceCoordinator for source {}.", operatorName);
        try {
            if (started) {
                context.close();
                if (enumerator != null) {
                    enumerator.close();
                }
            }
        } finally {
            // The executor is always shut down, even if closing the context or enumerator failed.
            coordinatorExecutor.shutdownNow();
            coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
        }
        // Logged only after the executor has terminated, so "closed" really means closed.
        LOG.info("Source coordinator for source {} closed.", operatorName);
    }

    /**
     * Dispatches an event sent by a source reader subtask to the enumerator.
     *
     * <p>Recognized events are {@link RequestSplitEvent}, {@link SourceEventWrapper} and {@link
     * ReaderRegistrationEvent}. Any other event type raises a {@link FlinkException} inside the
     * event loop task, which results in {@link SourceCoordinatorContext#failJob(Throwable)}.
     *
     * @param i the index of the subtask that sent the event
     * @param operatorEvent the received event
     * @throws IllegalStateException if the coordinator has not been started
     */
    @Override
    public void handleEventFromOperator(int i, OperatorEvent operatorEvent) {
        runInEventLoop(
                () -> {
                    LOG.debug(
                            "Handling event from subtask {} of source {}: {}",
                            i,
                            operatorName,
                            operatorEvent);
                    if (operatorEvent instanceof RequestSplitEvent) {
                        enumerator.handleSplitRequest(
                                i, ((RequestSplitEvent) operatorEvent).hostName());
                    } else if (operatorEvent instanceof SourceEventWrapper) {
                        enumerator.handleSourceEvent(
                                i, ((SourceEventWrapper) operatorEvent).getSourceEvent());
                    } else if (operatorEvent instanceof ReaderRegistrationEvent) {
                        handleReaderRegistrationEvent((ReaderRegistrationEvent) operatorEvent);
                    } else {
                        throw new FlinkException("Unrecognized Operator Event: " + operatorEvent);
                    }
                },
                "handling operator event %s from subtask %d",
                operatorEvent,
                i);
    }

    /**
     * Unregisters the reader of a failed subtask from the context.
     *
     * @param i the index of the failed subtask
     * @param th the failure cause, may be {@code null}; not used by this implementation
     * @throws IllegalStateException if the coordinator has not been started
     */
    @Override
    public void subtaskFailed(int i, @Nullable Throwable th) {
        runInEventLoop(
                () -> {
                    LOG.info(
                            "Removing registered reader after failure for subtask {} of source {}.",
                            i,
                            operatorName);
                    context.unregisterSourceReader(i);
                },
                "handling subtask %d failure",
                i);
    }

    /**
     * Takes the splits that the context reports as assigned to the subtask but not covered by
     * the given checkpoint, and hands them back to the enumerator.
     *
     * @param i the index of the subtask being reset
     * @param j the ID of the checkpoint the subtask is reset to
     * @throws IllegalStateException if the coordinator has not been started
     */
    @Override
    public void subtaskReset(int i, long j) {
        runInEventLoop(
                () -> {
                    LOG.info(
                            "Recovering subtask {} to checkpoint {} for source {} to checkpoint.",
                            i,
                            j,
                            operatorName);
                    List<SplitT> splitsToAddBack =
                            context.getAndRemoveUncheckpointedAssignment(i, j);
                    LOG.debug(
                            "Adding splits back to the split enumerator of source {}: {}",
                            operatorName,
                            splitsToAddBack);
                    enumerator.addSplitsBack(splitsToAddBack, i);
                },
                "handling subtask %d recovery to checkpoint %d",
                i,
                j);
    }

    /**
     * Snapshots the enumerator and context state and completes the given future with the
     * serialized bytes.
     *
     * <p>If the snapshot fails with a non-fatal error, the future is completed exceptionally with
     * a {@link CompletionException} wrapping the cause.
     *
     * @param j the ID of the checkpoint being taken
     * @param completableFuture the future that receives the checkpoint bytes or the failure
     * @throws IllegalStateException if the coordinator has not been started
     */
    @Override
    public void checkpointCoordinator(long j, CompletableFuture<byte[]> completableFuture) {
        runInEventLoop(
                () -> {
                    LOG.debug(
                            "Taking a state snapshot on operator {} for checkpoint {}",
                            operatorName,
                            j);
                    try {
                        completableFuture.complete(toBytes(j));
                    } catch (Throwable t) {
                        ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
                        completableFuture.completeExceptionally(
                                new CompletionException(
                                        String.format(
                                                "Failed to checkpoint SplitEnumerator for source %s",
                                                operatorName),
                                        t));
                    }
                },
                "taking checkpoint %d",
                j);
    }

    /**
     * Notifies the context and then the enumerator that a checkpoint completed.
     *
     * @param j the ID of the completed checkpoint
     * @throws IllegalStateException if the coordinator has not been started
     */
    @Override
    public void notifyCheckpointComplete(long j) {
        runInEventLoop(
                () -> {
                    LOG.info(
                            "Marking checkpoint {} as completed for source {}.", j, operatorName);
                    context.onCheckpointComplete(j);
                    enumerator.notifyCheckpointComplete(j);
                },
                "notifying the enumerator of completion of checkpoint %d",
                j);
    }

    /**
     * Notifies the enumerator that a checkpoint was aborted.
     *
     * @param j the ID of the aborted checkpoint
     * @throws IllegalStateException if the coordinator has not been started
     */
    @Override
    public void notifyCheckpointAborted(long j) {
        runInEventLoop(
                () -> {
                    LOG.info("Marking checkpoint {} as aborted for source {}.", j, operatorName);
                    enumerator.notifyCheckpointAborted(j);
                },
                "calling notifyCheckpointAborted()");
    }

    /**
     * Restores the context state and the enumerator from checkpoint bytes. Must be called before
     * {@link #start()}; does nothing if {@code bArr} is {@code null}.
     *
     * @param j the ID of the checkpoint being restored; not used by this implementation
     * @param bArr the checkpoint bytes as produced by {@link #checkpointCoordinator(long,
     *     CompletableFuture)}, or {@code null} if there is no state to restore
     * @throws IllegalStateException if the coordinator was already started
     * @throws Exception if deserialization or enumerator restoration fails
     */
    @Override
    public void resetToCheckpoint(long j, @Nullable byte[] bArr) throws Exception {
        Preconditions.checkState(
                !started, "The coordinator can only be reset if it was not yet started");
        assert enumerator == null;

        if (bArr == null) {
            return;
        }

        LOG.info("Restoring SplitEnumerator of source {} from checkpoint.", operatorName);
        // Deserialization and restoration run user code, hence the user-code class loader.
        try (TemporaryClassLoaderContext ignored =
                TemporaryClassLoaderContext.of(
                        context.getCoordinatorContext().getUserCodeClassloader())) {
            enumerator =
                    source.restoreEnumerator(context, deserializeCheckpointAndRestoreContext(bArr));
        }
    }

    /**
     * Submits an action to the coordinator executor. A non-fatal exception thrown by the action
     * is logged and passed to {@link SourceCoordinatorContext#failJob(Throwable)}.
     *
     * @param action the action to run
     * @param actionName a {@link String#format(String, Object...)} pattern describing the action,
     *     used only in the error log message
     * @param actionNameFormatParameters the arguments for {@code actionName}
     * @throws IllegalStateException if the coordinator has not been started
     */
    private void runInEventLoop(
            ThrowingRunnable<Throwable> action,
            String actionName,
            Object... actionNameFormatParameters) {
        ensureStarted();
        coordinatorExecutor.execute(
                () -> {
                    try {
                        action.run();
                    } catch (Throwable t) {
                        ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
                        // The description is formatted lazily, only when an error is reported.
                        LOG.error(
                                "Uncaught exception in the SplitEnumerator for Source {} while {}. Triggering job failover.",
                                operatorName,
                                String.format(actionName, actionNameFormatParameters),
                                t);
                        context.failJob(t);
                    }
                });
    }

    /** Returns the enumerator, or {@code null} if it has not been created or restored yet. */
    @VisibleForTesting
    SplitEnumerator<SplitT, EnumChkT> getEnumerator() {
        return enumerator;
    }

    /** Returns the coordinator context passed to the constructor. */
    @VisibleForTesting
    SourceCoordinatorContext<SplitT> getContext() {
        return context;
    }

    /**
     * Serializes the current enumerator snapshot and context state for the given checkpoint.
     *
     * @param checkpointId the ID of the checkpoint being taken
     * @return the serialized coordinator state
     * @throws Exception if snapshotting or serialization fails
     */
    private byte[] toBytes(long checkpointId) throws Exception {
        return writeCheckpointBytes(
                checkpointId,
                enumerator.snapshotState(),
                context,
                enumCheckpointSerializer,
                splitSerializer);
    }

    /**
     * Serializes an enumerator checkpoint together with the context state.
     *
     * <p>Written in order: the coordinator serde version (via {@link
     * SourceCoordinatorSerdeUtils}), the enumerator checkpoint serializer version, the length of
     * the serialized enumerator checkpoint, the serialized enumerator checkpoint, and finally
     * whatever the context writes in {@code snapshotState}.
     *
     * @param j the ID of the checkpoint being taken
     * @param enumchkt the enumerator checkpoint to serialize
     * @param sourceCoordinatorContext the context whose state is appended
     * @param simpleVersionedSerializer the serializer for the enumerator checkpoint
     * @param simpleVersionedSerializer2 the serializer for splits, passed on to the context
     * @return the serialized bytes
     * @throws Exception if serialization fails
     */
    static <SplitT extends SourceSplit, EnumChkT> byte[] writeCheckpointBytes(
            long j,
            EnumChkT enumchkt,
            SourceCoordinatorContext<SplitT> sourceCoordinatorContext,
            SimpleVersionedSerializer<EnumChkT> simpleVersionedSerializer,
            SimpleVersionedSerializer<SplitT> simpleVersionedSerializer2)
            throws Exception {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos)) {
            SourceCoordinatorSerdeUtils.writeCoordinatorSerdeVersion(out);
            out.writeInt(simpleVersionedSerializer.getVersion());
            byte[] serializedEnumChkpt = simpleVersionedSerializer.serialize(enumchkt);
            out.writeInt(serializedEnumChkpt.length);
            out.write(serializedEnumChkpt);
            sourceCoordinatorContext.snapshotState(j, simpleVersionedSerializer2, out);
            // Flush the wrapper before reading the backing array.
            out.flush();
            return baos.toByteArray();
        }
    }

    /**
     * Reads checkpoint bytes in the layout produced by {@code writeCheckpointBytes}, restores the
     * context state from them and returns the deserialized enumerator checkpoint.
     *
     * @param bytes the serialized coordinator state
     * @return the deserialized enumerator checkpoint
     * @throws Exception if the bytes cannot be read or deserialized
     */
    private EnumChkT deserializeCheckpointAndRestoreContext(byte[] bytes) throws Exception {
        try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
                DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais)) {
            SourceCoordinatorSerdeUtils.readAndVerifyCoordinatorSerdeVersion(in);
            int enumSerializerVersion = in.readInt();
            int serializedEnumChkptSize = in.readInt();
            byte[] serializedEnumChkpt =
                    SourceCoordinatorSerdeUtils.readBytes(in, serializedEnumChkptSize);
            // The context state follows the enumerator checkpoint in the stream, so it has to
            // be consumed here even though the enumerator checkpoint is deserialized afterwards.
            context.restoreState(splitSerializer, in);
            return enumCheckpointSerializer.deserialize(enumSerializerVersion, serializedEnumChkpt);
        }
    }

    /**
     * Registers a new reader with the context and informs the enumerator about it.
     *
     * @param event the registration event sent by the reader
     */
    private void handleReaderRegistrationEvent(ReaderRegistrationEvent event) {
        context.registerSourceReader(new ReaderInfo(event.subtaskId(), event.location()));
        enumerator.addReader(event.subtaskId());
    }

    /**
     * Verifies that {@link #start()} has been called.
     *
     * @throws IllegalStateException if the coordinator has not been started
     */
    private void ensureStarted() {
        if (!started) {
            throw new IllegalStateException("The coordinator has not started yet.");
        }
        assert enumerator != null;
    }
}
