package org.apache.flink.cep.dynamic.coordinator;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.cep.dynamic.DynamicCEPOptions;
import org.apache.flink.cep.dynamic.processor.PatternProcessor;
import org.apache.flink.cep.dynamic.processor.PatternProcessorDiscoverer;
import org.apache.flink.cep.dynamic.processor.PatternProcessorDiscovererFactory;
import org.apache.flink.cep.dynamic.processor.PatternProcessorManager;
import org.apache.flink.cep.dynamic.serializer.PatternProcessorSerializer;
import org.apache.flink.cep.event.UpdatePatternProcessorEvent;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Operator coordinator for dynamic CEP. It asks a {@link PatternProcessorDiscoverer} to discover
 * pattern processor updates, registers itself as the {@link PatternProcessorManager} receiving
 * them, and forwards the resulting list to the subtasks as {@link UpdatePatternProcessorEvent}s.
 *
 * <p>The most recently accepted list of pattern processors is kept in memory, written out by
 * {@link #checkpointCoordinator(long, CompletableFuture)} and read back by
 * {@link #resetToCheckpoint(long, byte[])}.
 *
 * @param <T> type parameter shared with the pattern processors handled by this coordinator
 */
@Internal
public class DynamicCepOperatorCoordinator<T> implements OperatorCoordinator, PatternProcessorManager<T> {
    private static final Logger LOG = LoggerFactory.getLogger(DynamicCepOperatorCoordinator.class);

    /** Name used in log and error messages to identify the operator. */
    private final String operatorName;
    /** Factory used to create the {@link #discoverer} in {@link #start()} and on restore. */
    private final PatternProcessorDiscovererFactory<T> discovererFactory;
    private final DynamicCepOperatorCoordinatorContext context;
    private final DynamicCEPOptions<T> options;
    /** Serializer used for the checkpointed list of pattern processors. */
    private final PatternProcessorSerializer<T> patternProcessorSerializer = new PatternProcessorSerializer<>();

    /** Created in {@link #start()} or {@link #resetToCheckpoint(long, byte[])}; {@code null} until then. */
    private PatternProcessorDiscoverer<T> discoverer;
    /** Set by {@link #start()}, cleared by {@link #close()}. */
    private boolean started;
    /** The list most recently accepted by {@link #onPatternProcessorsUpdated(List)} or restored from a checkpoint. */
    private List<PatternProcessor<T>> currentPatternProcessors = new ArrayList<>();

    /**
     * Creates the coordinator. No discoverer is created here; that happens in {@link #start()}.
     *
     * @param operatorName name used in log and error messages
     * @param discovererFactory factory for the pattern processor discoverer
     * @param context coordinator context used to run actions, send events and report failures
     * @param options dynamic CEP options (additional discoverer options, empty-list behaviour)
     */
    public DynamicCepOperatorCoordinator(
            String operatorName,
            PatternProcessorDiscovererFactory<T> discovererFactory,
            DynamicCepOperatorCoordinatorContext context,
            DynamicCEPOptions<T> options) {
        this.operatorName = operatorName;
        this.discovererFactory = discovererFactory;
        this.context = context;
        this.options = options;
    }

    /**
     * Marks the coordinator as started, creates the discoverer if none exists yet, and schedules
     * the discovery of pattern processor updates with this coordinator as the receiving manager.
     *
     * <p>If the discoverer cannot be created, the coordinator is failed through the context and
     * discovery is not scheduled.
     */
    public void start() throws Exception {
        LOG.info("Starting pattern processor discoverer for {}", this.operatorName);
        this.started = true;
        if (this.discoverer == null) {
            try {
                this.discoverer =
                        this.discovererFactory.createPatternProcessorDiscoverer(
                                this.context.getUserCodeClassloader(), this.options.getAdditionalOptions());
            } catch (Throwable t) {
                ExceptionUtils.rethrowIfFatalError(t);
                LOG.error("Failed to create PatternProcessorDiscoverer for pattern processor {}", this.operatorName, t);
                this.context.failCoordinator(t);
                return;
            }
        }
        runInEventLoop(
                () -> this.discoverer.discoverPatternProcessorUpdates(this),
                "discovering the PatternProcessor updates.");
    }

    /**
     * Closes the context and the discoverer if the coordinator was started, then marks it as not
     * started.
     */
    public void close() throws Exception {
        LOG.info("Closing DynamicCepOperatorCoordinator for pattern processor {}.", this.operatorName);
        if (this.started) {
            // The discoverer may still be null here if its creation failed in start().
            IOUtils.closeAll(this.context, this.discoverer);
        }
        this.started = false;
        LOG.info("DynamicCepOperatorCoordinator for pattern processor {} closed.", this.operatorName);
    }

    /** Events coming from the operator are ignored by this coordinator. */
    public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) throws Exception {
    }

    /**
     * Serializes the current pattern processors and completes {@code resultFuture} with the bytes,
     * or completes it exceptionally if serialization fails.
     *
     * <p>NOTE(review): the work is submitted through {@code runInEventLoop}, which skips the
     * action when no discoverer exists; in that case this method never completes the future.
     *
     * @param checkpointId id of the checkpoint being taken
     * @param resultFuture future to complete with the serialized coordinator state
     */
    public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) {
        runInEventLoop(
                () -> {
                    LOG.debug("Taking a state snapshot on operator {} for checkpoint {}", this.operatorName, checkpointId);
                    try {
                        resultFuture.complete(toBytes());
                    } catch (Throwable t) {
                        ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
                        resultFuture.completeExceptionally(
                                new CompletionException(
                                        String.format(
                                                "Failed to checkpoint the current PatternProcessor for pattern processor %s",
                                                this.operatorName),
                                        t));
                    }
                },
                "taking checkpoint %d",
                checkpointId);
    }

    /** Only logs the completion; no state is changed. */
    public void notifyCheckpointComplete(long checkpointId) {
        LOG.info("Marking checkpoint {} as completed for pattern processor {}.", checkpointId, this.operatorName);
    }

    /** Only logs the abort; no state is changed. */
    public void notifyCheckpointAborted(long checkpointId) {
        LOG.info("Marking checkpoint {} as aborted for pattern processor {}.", checkpointId, this.operatorName);
    }

    /**
     * Restores the current pattern processors from {@code checkpointData} and creates a new
     * discoverer. Does nothing when {@code checkpointData} is {@code null}.
     *
     * @param checkpointId id of the checkpoint being restored (not used by this implementation)
     * @param checkpointData bytes previously produced by the checkpoint, or {@code null}
     */
    public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception {
        if (checkpointData == null) {
            return;
        }
        LOG.info("Restoring PatternProcessorDiscoverer of pattern processor {} from checkpoint.", this.operatorName);
        this.currentPatternProcessors = deserializeCheckpoint(checkpointData);
        // NOTE(review): an already existing discoverer is replaced without being closed —
        // confirm this method is only invoked before start().
        this.discoverer =
                this.discovererFactory.createPatternProcessorDiscoverer(
                        this.context.getUserCodeClassloader(), this.options.getAdditionalOptions());
    }

    /** Re-sends the current pattern processors to the subtask that was reset. */
    public void subtaskReset(int subtask, long checkpointId) {
        LOG.info(
                "Recovering subtask {} to checkpoint {} for pattern processor {} to checkpoint.",
                subtask,
                checkpointId,
                this.operatorName);
        runInEventLoop(
                () -> this.context.sendEventToOperator(
                        subtask, new UpdatePatternProcessorEvent(this.currentPatternProcessors)),
                "making event gateway to subtask %d available",
                subtask);
    }

    /** Tells the context that the given subtask is no longer ready. */
    public void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable reason) {
        runInEventLoop(
                () -> {
                    LOG.info("Removing itself after failure for subtask {} of pattern processor {}.", subtask, this.operatorName);
                    this.context.subtaskNotReady(subtask);
                },
                "handling subtask %d failure",
                subtask);
    }

    /**
     * Registers the gateway of a subtask that became ready and sends it the current pattern
     * processors.
     */
    public void executionAttemptReady(int subtask, int attemptNumber, OperatorCoordinator.SubtaskGateway gateway) {
        assert subtask == gateway.getSubtask();
        LOG.debug("Subtask {} of pattern processor {} is ready.", subtask, this.operatorName);
        runInEventLoop(
                () -> {
                    this.context.subtaskReady(gateway);
                    this.context.sendEventToOperator(
                            subtask, new UpdatePatternProcessorEvent(this.currentPatternProcessors));
                },
                "making event gateway to subtask %d available",
                subtask);
    }

    /**
     * Validates the given pattern processors, sends them to every subtask known to the context
     * and, if all sends succeed, stores them as the current pattern processors.
     *
     * <p>An empty list is never applied: it is either ignored or, when the options request
     * {@code FAIL_JOB}, fails the coordinator. Any validation or send failure fails the
     * coordinator through the context and leaves the current pattern processors unchanged.
     *
     * @param patternProcessors the new pattern processors; must not be or contain {@code null}
     */
    @Override // org.apache.flink.cep.dynamic.processor.PatternProcessorManager
    public void onPatternProcessorsUpdated(List<PatternProcessor<T>> patternProcessors) {
        try {
            checkPatternProcessors(patternProcessors);
            if (patternProcessors.isEmpty()) {
                LOG.warn("The list of patternProcessors received by the coordination is empty. No update will be applied.");
                if (this.options.getEmptyProcessorsBehaviour() == DynamicCEPOptions.EmptyProcessorsBehaviour.FAIL_JOB) {
                    // Caught by the outer handler below, which fails the coordinator.
                    throw new IllegalArgumentException("The list of patternProcessors received by the coordination is empty. No update will be applied.");
                }
                return;
            }
            for (int subtask : this.context.getSubtasks()) {
                try {
                    this.context.sendEventToOperator(subtask, new UpdatePatternProcessorEvent(patternProcessors));
                } catch (IOException e) {
                    // Pass the exception to the logger so the stack trace is not lost.
                    LOG.error("Failed to send UpdatePatternProcessorEvent to pattern processor for operator: {}", this.operatorName, e);
                    this.context.failCoordinator(e);
                    return;
                }
            }
            this.currentPatternProcessors = patternProcessors;
            LOG.info("PatternProcessors have been updated.");
            LOG.debug("new PatternProcessors: {}", patternProcessors);
        } catch (Exception e) {
            LOG.error("Failed to send UpdatePatternProcessorEvent to operator: {}", this.operatorName, e);
            this.context.failCoordinator(e);
        }
    }

    @VisibleForTesting
    PatternProcessorDiscovererFactory<T> getPatternProcessorDiscovererFactory() {
        return this.discovererFactory;
    }

    @VisibleForTesting
    PatternProcessorDiscoverer<T> getDiscoverer() {
        return this.discoverer;
    }

    @VisibleForTesting
    List<PatternProcessor<T>> getCurrentPatternProcessors() {
        return this.currentPatternProcessors;
    }

    @VisibleForTesting
    DynamicCepOperatorCoordinatorContext getContext() {
        return this.context;
    }

    /**
     * Submits {@code action} to the context's coordinator thread. Any non-fatal throwable raised
     * by the action is logged and reported via {@code failCoordinator}.
     *
     * <p>The action is silently skipped when no discoverer exists (for example because its
     * creation failed in {@link #start()}).
     *
     * @param action the work to run
     * @param actionDescription {@link String#format} pattern describing the action for the error log
     * @param descriptionArgs arguments for {@code actionDescription}
     * @throws IllegalStateException if the coordinator has not been started
     */
    private void runInEventLoop(ThrowingRunnable<Throwable> action, String actionDescription, Object... descriptionArgs) {
        ensureStarted();
        if (this.discoverer == null) {
            return;
        }
        this.context.runInCoordinatorThread(
                () -> {
                    try {
                        action.run();
                    } catch (Throwable t) {
                        ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
                        LOG.error(
                                "Uncaught exception in the PatternProcessorDiscover for PatternProcessor {} while {}. Triggering job failover.",
                                this.operatorName,
                                String.format(actionDescription, descriptionArgs),
                                t);
                        this.context.failCoordinator(t);
                    }
                });
    }

    /**
     * Serializes the coordinator state. Written in order: the coordinator serde version, the
     * pattern processor serializer version, the payload length, and the serialized payload.
     *
     * @return the serialized state
     */
    private byte[] toBytes() throws Exception {
        try (ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
                DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(byteStream)) {
            DynamicCepOperatorCoordinatorSerdeUtils.writeCoordinatorSerdeVersion(out);
            out.writeInt(this.patternProcessorSerializer.getVersion());
            byte[] serialized = this.patternProcessorSerializer.serialize(this.currentPatternProcessors);
            out.writeInt(serialized.length);
            out.write(serialized);
            out.flush();
            return byteStream.toByteArray();
        }
    }

    /**
     * Reads back state written by {@link #toBytes()}: verifies the coordinator serde version, then
     * reads the serializer version, the payload length and the payload, and deserializes it.
     *
     * @param checkpointData the serialized coordinator state
     * @return the restored pattern processors
     */
    private List<PatternProcessor<T>> deserializeCheckpoint(byte[] checkpointData) throws Exception {
        try (ByteArrayInputStream byteStream = new ByteArrayInputStream(checkpointData);
                DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(byteStream)) {
            DynamicCepOperatorCoordinatorSerdeUtils.verifyCoordinatorSerdeVersion(in);
            // Read order matters: serializer version first, then payload length.
            int serializerVersion = in.readInt();
            int payloadLength = in.readInt();
            byte[] payload = DynamicCepOperatorCoordinatorSerdeUtils.readBytes(in, payloadLength);
            // NOTE(review): the "m13" prefix looks like a tool-assigned name — confirm the
            // serializer's actual method name before renaming.
            return this.patternProcessorSerializer.m13deserialize(serializerVersion, payload);
        }
    }

    /**
     * Validates a list of pattern processors.
     *
     * @throws NullPointerException if the list, one of its elements, or an element's pattern
     *     process function is {@code null}
     * @throws IllegalStateException if a pattern process function implements both
     *     {@link CheckpointedFunction} and {@link ListCheckpointed}
     */
    private void checkPatternProcessors(List<PatternProcessor<T>> patternProcessors) {
        Preconditions.checkNotNull(patternProcessors, "Pattern processor cannot be null.");
        for (PatternProcessor<T> patternProcessor : patternProcessors) {
            Preconditions.checkNotNull(patternProcessor, "Pattern processor cannot be null.");
            PatternProcessFunction<T, ?> processFunction = patternProcessor.getPatternProcessFunction();
            if (processFunction == null) {
                throw new NullPointerException(
                        String.format(
                                "Patten process function of the pattern processor {id={%s}, version={%d}} is null",
                                patternProcessor.getId(), patternProcessor.getVersion()));
            }
            if ((processFunction instanceof CheckpointedFunction) && (processFunction instanceof ListCheckpointed)) {
                throw new IllegalStateException(
                        String.format(
                                "Pattern process function of the pattern processor {id={%s}, version={%d}} is not allowed to implement CheckpointedFunction AND ListCheckpointed.",
                                patternProcessor.getId(), patternProcessor.getVersion()));
            }
        }
    }

    /**
     * @throws IllegalStateException if {@link #start()} has not been called, or the coordinator
     *     has been closed since
     */
    private void ensureStarted() {
        if (!this.started) {
            throw new IllegalStateException("The coordinator has not started yet.");
        }
    }
}
