package org.apache.flink.cep.operator;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.Event;
import org.apache.flink.cep.SubEvent;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OperatorSnapshotUtil;
import org.apache.flink.streaming.util.migration.MigrationTestUtil;
import org.apache.flink.streaming.util.migration.MigrationVersion;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/cep/operator/CEPMigrationTest.class */
public class CEPMigrationTest {
    // Version tag concatenated into the snapshot file names built by the @Ignore'd
    // write*Snapshot methods. It is null here, so the concatenation would produce a name
    // containing "flinknull-snapshot".
    // NOTE(review): presumably meant to be set to the Flink version being captured before
    // running those methods by hand — confirm.
    private final MigrationVersion flinkGenerateSavepointVersion = null;
    // Snapshot version under test: assigned in the constructor from the Parameterized
    // argument and used by the restore tests to pick the snapshot resource file.
    private final MigrationVersion migrateVersion;

    /* loaded from: input_file:org/apache/flink/cep/operator/CEPMigrationTest$EndFilter.class */
    private static class EndFilter extends SimpleCondition<Event> {
        private static final long serialVersionUID = 7056763917392056548L;

        private EndFilter() {
        }

        public boolean filter(Event event) throws Exception {
            return event.getName().equals("end");
        }
    }

    /* loaded from: input_file:org/apache/flink/cep/operator/CEPMigrationTest$MiddleFilter.class */
    private static class MiddleFilter extends SimpleCondition<SubEvent> {
        private static final long serialVersionUID = 6215754202506583964L;

        private MiddleFilter() {
        }

        public boolean filter(SubEvent subEvent) throws Exception {
            return subEvent.getVolume() > 5.0d;
        }
    }

    /* loaded from: input_file:org/apache/flink/cep/operator/CEPMigrationTest$NFAComplexConditionsFactory.class */
    private static class NFAComplexConditionsFactory implements NFACompiler.NFAFactory<Event> {
        private static final long serialVersionUID = 1173020762472766713L;
        private final boolean handleTimeout;

        private NFAComplexConditionsFactory() {
            this(false);
        }

        private NFAComplexConditionsFactory(boolean z) {
            this.handleTimeout = z;
        }

        public NFA<Event> createNFA() {
            return NFACompiler.compileFactory(Pattern.begin("start").subtype(SubEvent.class).where(new MiddleFilter()).or(new SubEventEndFilter()).times(2).within(Time.milliseconds(10L)), this.handleTimeout).createNFA();
        }
    }

    /* loaded from: input_file:org/apache/flink/cep/operator/CEPMigrationTest$NFAFactory.class */
    private static class NFAFactory implements NFACompiler.NFAFactory<Event> {
        private static final long serialVersionUID = 1173020762472766713L;
        private final boolean handleTimeout;

        private NFAFactory() {
            this(false);
        }

        private NFAFactory(boolean z) {
            this.handleTimeout = z;
        }

        public NFA<Event> createNFA() {
            return NFACompiler.compileFactory(Pattern.begin("start").where(new StartFilter()).followedByAny("middle").subtype(SubEvent.class).where(new MiddleFilter()).followedByAny("end").where(new EndFilter()).within(Time.milliseconds(10L)), this.handleTimeout).createNFA();
        }
    }

    /* loaded from: input_file:org/apache/flink/cep/operator/CEPMigrationTest$SinglePatternNFAFactory.class */
    private static class SinglePatternNFAFactory implements NFACompiler.NFAFactory<Event> {
        private static final long serialVersionUID = 1173020762472766713L;
        private final boolean handleTimeout;

        private SinglePatternNFAFactory() {
            this(false);
        }

        private SinglePatternNFAFactory(boolean z) {
            this.handleTimeout = z;
        }

        public NFA<Event> createNFA() {
            return NFACompiler.compileFactory(Pattern.begin("start").where(new StartFilter()).within(Time.milliseconds(10L)), this.handleTimeout).createNFA();
        }
    }

    /* loaded from: input_file:org/apache/flink/cep/operator/CEPMigrationTest$StartFilter.class */
    private static class StartFilter extends SimpleCondition<Event> {
        private static final long serialVersionUID = 5726188262756267490L;

        private StartFilter() {
        }

        public boolean filter(Event event) throws Exception {
            return event.getName().equals("start");
        }
    }

    /* loaded from: input_file:org/apache/flink/cep/operator/CEPMigrationTest$SubEventEndFilter.class */
    private static class SubEventEndFilter extends SimpleCondition<SubEvent> {
        private static final long serialVersionUID = 7056763917392056548L;

        private SubEventEndFilter() {
        }

        public boolean filter(SubEvent subEvent) throws Exception {
            return subEvent.getName().equals("end");
        }
    }

    /**
     * Supplies the savepoint versions the parameterized runner instantiates this test with.
     *
     * @return a fixed-size list of the versions v1_3, v1_4, v1_5 and v1_6, in that order
     */
    @Parameterized.Parameters(name = "Migration Savepoint: {0}")
    public static Collection<MigrationVersion> parameters() {
        final MigrationVersion[] savepointVersions = {
            MigrationVersion.v1_3,
            MigrationVersion.v1_4,
            MigrationVersion.v1_5,
            MigrationVersion.v1_6
        };
        return Arrays.asList(savepointVersions);
    }

    /**
     * Creates a test instance bound to one savepoint version.
     *
     * @param migrationVersion the version stored in {@code migrateVersion} and used by the
     *     restore tests to select the snapshot resource
     */
    public CEPMigrationTest(MigrationVersion migrationVersion) {
        migrateVersion = migrationVersion;
    }

    /**
     * Ignored by default. Feeds a start event, two non-matching events and two middle events
     * into a CEP operator built from {@link NFAFactory}, advances the watermark to 5 and writes
     * the operator snapshot to
     * {@code src/test/resources/cep-migration-after-branching-flink<version>-snapshot}, where
     * {@code <version>} is {@code flinkGenerateSavepointVersion}.
     */
    @Test
    @Ignore
    public void writeAfterBranchingPatternSnapshot() throws Exception {
        KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
            private static final long serialVersionUID = -4873366487571254798L;

            @Override
            public Integer getKey(Event event) throws Exception {
                return Integer.valueOf(event.getId());
            }
        };

        final Event startEvent = new Event(42, "start", 1.0d);
        final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0d, 10.0d);
        final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0d, 10.0d);

        KeyedOneInputStreamOperatorTestHarness harness = new KeyedOneInputStreamOperatorTestHarness(
                CepOperatorTestUtilities.getKeyedCepOpearator(false, new NFAFactory()),
                keySelector,
                BasicTypeInfo.INT_TYPE_INFO);
        try {
            harness.setup();
            harness.open();

            // Timestamps are intentionally left exactly as recorded (including the
            // out-of-order 2 and 3 after 3); they determine the snapshot contents.
            harness.processElement(new StreamRecord(startEvent, 1L));
            harness.processElement(new StreamRecord(new Event(42, "foobar", 1.0d), 2L));
            harness.processElement(new StreamRecord(new SubEvent(42, "barfoo", 1.0d, 5.0d), 3L));
            harness.processElement(new StreamRecord(middleEvent1, 2L));
            harness.processElement(new StreamRecord(middleEvent2, 3L));
            harness.processWatermark(new Watermark(5L));

            OperatorSnapshotUtil.writeStateHandle(
                    harness.snapshot(0L, 0L),
                    "src/test/resources/cep-migration-after-branching-flink" + this.flinkGenerateSavepointVersion + "-snapshot");
        } finally {
            // Single close on every path; the previous catch/rethrow form closed the harness
            // a second time when the trailing close() itself failed.
            harness.close();
        }
    }

    /**
     * Restores a CEP operator built from {@link NFAFactory} from the
     * {@code cep-migration-after-branching-flink<version>-snapshot} resource, feeds a start and
     * an end event and checks the first two emitted matches. It then starts a new match,
     * snapshots, restores that snapshot into a fresh harness and checks the match completed
     * there.
     */
    @Test
    public void testRestoreAfterBranchingPattern() throws Exception {
        KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
            private static final long serialVersionUID = -4873366487571254798L;

            @Override
            public Integer getKey(Event event) throws Exception {
                return Integer.valueOf(event.getId());
            }
        };

        final Event startEvent = new Event(42, "start", 1.0d);
        final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0d, 10.0d);
        final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0d, 10.0d);
        final Event endEvent = new Event(42, "end", 1.0d);

        OneInputStreamOperatorTestHarness harness = new KeyedOneInputStreamOperatorTestHarness(
                CepOperatorTestUtilities.getKeyedCepOpearator(false, new NFAFactory()),
                keySelector,
                BasicTypeInfo.INT_TYPE_INFO);
        try {
            harness.setup();
            MigrationTestUtil.restoreFromSnapshot(
                    harness,
                    OperatorSnapshotUtil.getResourceFilename("cep-migration-after-branching-flink" + this.migrateVersion + "-snapshot"),
                    this.migrateVersion);
            harness.open();

            harness.processElement(new StreamRecord(new Event(42, "start", 1.0d), 4L));
            harness.processElement(new StreamRecord(endEvent, 5L));
            harness.processWatermark(new Watermark(20L));

            ConcurrentLinkedQueue output = harness.getOutput();
            // Three elements are expected; only the first two are inspected below
            // (presumably the third is the forwarded watermark — not asserted here).
            Assert.assertEquals(3L, output.size());

            Object firstResult = output.poll();
            Assert.assertTrue(firstResult instanceof StreamRecord);
            StreamRecord firstRecord = (StreamRecord) firstResult;
            Assert.assertTrue(firstRecord.getValue() instanceof Map);

            Object secondResult = output.poll();
            Assert.assertTrue(secondResult instanceof StreamRecord);
            StreamRecord secondRecord = (StreamRecord) secondResult;
            Assert.assertTrue(secondRecord.getValue() instanceof Map);

            Map firstMatch = (Map) firstRecord.getValue();
            Assert.assertEquals(startEvent, ((List) firstMatch.get("start")).get(0));
            Assert.assertEquals(middleEvent1, ((List) firstMatch.get("middle")).get(0));
            Assert.assertEquals(endEvent, ((List) firstMatch.get("end")).get(0));

            Map secondMatch = (Map) secondRecord.getValue();
            Assert.assertEquals(startEvent, ((List) secondMatch.get("start")).get(0));
            Assert.assertEquals(middleEvent2, ((List) secondMatch.get("middle")).get(0));
            Assert.assertEquals(endEvent, ((List) secondMatch.get("end")).get(0));

            // Begin a new match, snapshot mid-pattern, and finish it in a fresh harness.
            final Event startEvent2 = new Event(42, "start", 2.0d);
            final SubEvent middleEvent3 = new SubEvent(42, "foo", 1.0d, 11.0d);
            final Event endEvent2 = new Event(42, "end", 2.0d);

            harness.processElement(new StreamRecord(startEvent2, 21L));
            harness.processElement(new StreamRecord(middleEvent3, 23L));

            OperatorSubtaskState snapshot = harness.snapshot(1L, 1L);
            harness.close();

            harness = new KeyedOneInputStreamOperatorTestHarness(
                    CepOperatorTestUtilities.getKeyedCepOpearator(false, new NFAFactory()),
                    keySelector,
                    BasicTypeInfo.INT_TYPE_INFO);
            harness.setup();
            harness.initializeState(snapshot);
            harness.open();

            harness.processElement(new StreamRecord(endEvent2, 25L));
            harness.processWatermark(new Watermark(50L));

            ConcurrentLinkedQueue outputAfterRestore = harness.getOutput();
            Assert.assertEquals(2L, outputAfterRestore.size());

            Object thirdResult = outputAfterRestore.poll();
            Assert.assertTrue(thirdResult instanceof StreamRecord);
            StreamRecord thirdRecord = (StreamRecord) thirdResult;
            Assert.assertTrue(thirdRecord.getValue() instanceof Map);

            Map thirdMatch = (Map) thirdRecord.getValue();
            Assert.assertEquals(startEvent2, ((List) thirdMatch.get("start")).get(0));
            Assert.assertEquals(middleEvent3, ((List) thirdMatch.get("middle")).get(0));
            Assert.assertEquals(endEvent2, ((List) thirdMatch.get("end")).get(0));
        } finally {
            // Closes whichever harness is current; replaces the catch/rethrow form that
            // closed the harness again when the trailing close() itself failed.
            harness.close();
        }
    }

    /**
     * Ignored by default. Feeds a start event, two non-matching events and one middle event
     * into a CEP operator built from {@link NFAFactory}, advances the watermark to 5 and writes
     * the operator snapshot to
     * {@code src/test/resources/cep-migration-starting-new-pattern-flink<version>-snapshot},
     * where {@code <version>} is {@code flinkGenerateSavepointVersion}.
     */
    @Test
    @Ignore
    public void writeStartingNewPatternAfterMigrationSnapshot() throws Exception {
        KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
            private static final long serialVersionUID = -4873366487571254798L;

            @Override
            public Integer getKey(Event event) throws Exception {
                return Integer.valueOf(event.getId());
            }
        };

        final Event startEvent = new Event(42, "start", 1.0d);
        final SubEvent middleEvent = new SubEvent(42, "foo1", 1.0d, 10.0d);

        KeyedOneInputStreamOperatorTestHarness harness = new KeyedOneInputStreamOperatorTestHarness(
                CepOperatorTestUtilities.getKeyedCepOpearator(false, new NFAFactory()),
                keySelector,
                BasicTypeInfo.INT_TYPE_INFO);
        try {
            harness.setup();
            harness.open();

            // Timestamps are intentionally left exactly as recorded; they determine the
            // snapshot contents.
            harness.processElement(new StreamRecord(startEvent, 1L));
            harness.processElement(new StreamRecord(new Event(42, "foobar", 1.0d), 2L));
            harness.processElement(new StreamRecord(new SubEvent(42, "barfoo", 1.0d, 5.0d), 3L));
            harness.processElement(new StreamRecord(middleEvent, 2L));
            harness.processWatermark(new Watermark(5L));

            OperatorSnapshotUtil.writeStateHandle(
                    harness.snapshot(0L, 0L),
                    "src/test/resources/cep-migration-starting-new-pattern-flink" + this.flinkGenerateSavepointVersion + "-snapshot");
        } finally {
            // Single close on every path; the previous catch/rethrow form closed the harness
            // a second time when the trailing close() itself failed.
            harness.close();
        }
    }

    /**
     * Restores a CEP operator built from {@link NFAFactory} from the
     * {@code cep-migration-starting-new-pattern-flink<version>-snapshot} resource, feeds a new
     * start, a middle and an end event and checks the first three emitted matches. It then
     * starts another match, snapshots, restores that snapshot into a fresh harness and checks
     * the match completed there.
     */
    @Test
    public void testRestoreStartingNewPatternAfterMigration() throws Exception {
        KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
            private static final long serialVersionUID = -4873366487571254798L;

            @Override
            public Integer getKey(Event event) throws Exception {
                return Integer.valueOf(event.getId());
            }
        };

        final Event startEvent1 = new Event(42, "start", 1.0d);
        final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0d, 10.0d);
        final Event startEvent2 = new Event(42, "start", 5.0d);
        final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0d, 10.0d);
        final Event endEvent = new Event(42, "end", 1.0d);

        OneInputStreamOperatorTestHarness harness = new KeyedOneInputStreamOperatorTestHarness(
                CepOperatorTestUtilities.getKeyedCepOpearator(false, new NFAFactory()),
                keySelector,
                BasicTypeInfo.INT_TYPE_INFO);
        try {
            harness.setup();
            MigrationTestUtil.restoreFromSnapshot(
                    harness,
                    OperatorSnapshotUtil.getResourceFilename("cep-migration-starting-new-pattern-flink" + this.migrateVersion + "-snapshot"),
                    this.migrateVersion);
            harness.open();

            harness.processElement(new StreamRecord(startEvent2, 5L));
            harness.processElement(new StreamRecord(middleEvent2, 6L));
            harness.processElement(new StreamRecord(endEvent, 7L));
            harness.processWatermark(new Watermark(20L));

            ConcurrentLinkedQueue output = harness.getOutput();
            // Four elements are expected; only the first three are inspected below
            // (presumably the fourth is the forwarded watermark — not asserted here).
            Assert.assertEquals(4L, output.size());

            Object firstResult = output.poll();
            Assert.assertTrue(firstResult instanceof StreamRecord);
            StreamRecord firstRecord = (StreamRecord) firstResult;
            Assert.assertTrue(firstRecord.getValue() instanceof Map);

            Object secondResult = output.poll();
            Assert.assertTrue(secondResult instanceof StreamRecord);
            StreamRecord secondRecord = (StreamRecord) secondResult;
            Assert.assertTrue(secondRecord.getValue() instanceof Map);

            Object thirdResult = output.poll();
            Assert.assertTrue(thirdResult instanceof StreamRecord);
            StreamRecord thirdRecord = (StreamRecord) thirdResult;
            Assert.assertTrue(thirdRecord.getValue() instanceof Map);

            Map firstMatch = (Map) firstRecord.getValue();
            Assert.assertEquals(startEvent1, ((List) firstMatch.get("start")).get(0));
            Assert.assertEquals(middleEvent1, ((List) firstMatch.get("middle")).get(0));
            Assert.assertEquals(endEvent, ((List) firstMatch.get("end")).get(0));

            Map secondMatch = (Map) secondRecord.getValue();
            Assert.assertEquals(startEvent1, ((List) secondMatch.get("start")).get(0));
            Assert.assertEquals(middleEvent2, ((List) secondMatch.get("middle")).get(0));
            Assert.assertEquals(endEvent, ((List) secondMatch.get("end")).get(0));

            Map thirdMatch = (Map) thirdRecord.getValue();
            Assert.assertEquals(startEvent2, ((List) thirdMatch.get("start")).get(0));
            Assert.assertEquals(middleEvent2, ((List) thirdMatch.get("middle")).get(0));
            Assert.assertEquals(endEvent, ((List) thirdMatch.get("end")).get(0));

            // Begin a new match, snapshot mid-pattern, and finish it in a fresh harness.
            final Event startEvent3 = new Event(42, "start", 2.0d);
            final SubEvent middleEvent3 = new SubEvent(42, "foo", 1.0d, 11.0d);
            final Event endEvent2 = new Event(42, "end", 2.0d);

            harness.processElement(new StreamRecord(startEvent3, 21L));
            harness.processElement(new StreamRecord(middleEvent3, 23L));

            OperatorSubtaskState snapshot = harness.snapshot(1L, 1L);
            harness.close();

            harness = new KeyedOneInputStreamOperatorTestHarness(
                    CepOperatorTestUtilities.getKeyedCepOpearator(false, new NFAFactory()),
                    keySelector,
                    BasicTypeInfo.INT_TYPE_INFO);
            harness.setup();
            harness.initializeState(snapshot);
            harness.open();

            harness.processElement(new StreamRecord(endEvent2, 25L));
            harness.processWatermark(new Watermark(50L));

            ConcurrentLinkedQueue outputAfterRestore = harness.getOutput();
            Assert.assertEquals(2L, outputAfterRestore.size());

            Object fourthResult = outputAfterRestore.poll();
            Assert.assertTrue(fourthResult instanceof StreamRecord);
            StreamRecord fourthRecord = (StreamRecord) fourthResult;
            Assert.assertTrue(fourthRecord.getValue() instanceof Map);

            Map fourthMatch = (Map) fourthRecord.getValue();
            Assert.assertEquals(startEvent3, ((List) fourthMatch.get("start")).get(0));
            Assert.assertEquals(middleEvent3, ((List) fourthMatch.get("middle")).get(0));
            Assert.assertEquals(endEvent2, ((List) fourthMatch.get("end")).get(0));
        } finally {
            // Closes whichever harness is current; replaces the catch/rethrow form that
            // closed the harness again when the trailing close() itself failed.
            harness.close();
        }
    }

    /**
     * Ignored by default. Opens a CEP operator built from {@link SinglePatternNFAFactory},
     * advances the watermark to 5 without processing any element and writes the operator
     * snapshot to
     * {@code src/test/resources/cep-migration-single-pattern-afterwards-flink<version>-snapshot},
     * where {@code <version>} is {@code flinkGenerateSavepointVersion}.
     */
    @Test
    @Ignore
    public void writeSinglePatternAfterMigrationSnapshot() throws Exception {
        KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
            private static final long serialVersionUID = -4873366487571254798L;

            @Override
            public Integer getKey(Event event) throws Exception {
                return Integer.valueOf(event.getId());
            }
        };

        // The Event that used to be constructed and immediately discarded here has been
        // removed; it was never passed to the harness.

        KeyedOneInputStreamOperatorTestHarness harness = new KeyedOneInputStreamOperatorTestHarness(
                CepOperatorTestUtilities.getKeyedCepOpearator(false, new SinglePatternNFAFactory()),
                keySelector,
                BasicTypeInfo.INT_TYPE_INFO);
        try {
            harness.setup();
            harness.open();
            harness.processWatermark(new Watermark(5L));

            OperatorSnapshotUtil.writeStateHandle(
                    harness.snapshot(0L, 0L),
                    "src/test/resources/cep-migration-single-pattern-afterwards-flink" + this.flinkGenerateSavepointVersion + "-snapshot");
        } finally {
            // Single close on every path; the previous catch/rethrow form closed the harness
            // a second time when the trailing close() itself failed.
            harness.close();
        }
    }

    /**
     * Restores a CEP operator built from {@link SinglePatternNFAFactory} from the
     * {@code cep-migration-single-pattern-afterwards-flink<version>-snapshot} resource, feeds a
     * single start event and checks that the first emitted record is a match containing it
     * under {@code "start"}.
     */
    @Test
    public void testSinglePatternAfterMigration() throws Exception {
        KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
            private static final long serialVersionUID = -4873366487571254798L;

            @Override
            public Integer getKey(Event event) throws Exception {
                return Integer.valueOf(event.getId());
            }
        };

        final Event startEvent = new Event(42, "start", 1.0d);

        KeyedOneInputStreamOperatorTestHarness harness = new KeyedOneInputStreamOperatorTestHarness(
                CepOperatorTestUtilities.getKeyedCepOpearator(false, new SinglePatternNFAFactory()),
                keySelector,
                BasicTypeInfo.INT_TYPE_INFO);
        try {
            harness.setup();
            MigrationTestUtil.restoreFromSnapshot(
                    harness,
                    OperatorSnapshotUtil.getResourceFilename("cep-migration-single-pattern-afterwards-flink" + this.migrateVersion + "-snapshot"),
                    this.migrateVersion);
            harness.open();

            harness.processElement(new StreamRecord(startEvent, 5L));
            harness.processWatermark(new Watermark(20L));

            ConcurrentLinkedQueue output = harness.getOutput();
            // Two elements are expected; only the first is inspected below
            // (presumably the second is the forwarded watermark — not asserted here).
            Assert.assertEquals(2L, output.size());

            Object result = output.poll();
            Assert.assertTrue(result instanceof StreamRecord);
            StreamRecord record = (StreamRecord) result;
            Assert.assertTrue(record.getValue() instanceof Map);

            Map match = (Map) record.getValue();
            Assert.assertEquals(startEvent, ((List) match.get("start")).get(0));
        } finally {
            // Single close on every path; the previous catch/rethrow form closed the harness
            // a second time when the trailing close() itself failed.
            harness.close();
        }
    }

    /**
     * Ignored by default. Feeds one {@link SubEvent} (name {@code "start"}, volume 6.0) into a
     * CEP operator built from {@link NFAComplexConditionsFactory}, advances the watermark to 6
     * and writes the operator snapshot to
     * {@code src/test/resources/cep-migration-conditions-flink<version>-snapshot}, where
     * {@code <version>} is {@code flinkGenerateSavepointVersion}.
     */
    @Test
    @Ignore
    public void writeAndOrSubtypConditionsPatternAfterMigrationSnapshot() throws Exception {
        KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
            private static final long serialVersionUID = -4873366487571254798L;

            @Override
            public Integer getKey(Event event) throws Exception {
                return Integer.valueOf(event.getId());
            }
        };

        final SubEvent startEvent = new SubEvent(42, "start", 1.0d, 6.0d);

        KeyedOneInputStreamOperatorTestHarness harness = new KeyedOneInputStreamOperatorTestHarness(
                CepOperatorTestUtilities.getKeyedCepOpearator(false, new NFAComplexConditionsFactory()),
                keySelector,
                BasicTypeInfo.INT_TYPE_INFO);
        try {
            harness.setup();
            harness.open();

            harness.processElement(new StreamRecord(startEvent, 5L));
            harness.processWatermark(new Watermark(6L));

            OperatorSnapshotUtil.writeStateHandle(
                    harness.snapshot(0L, 0L),
                    "src/test/resources/cep-migration-conditions-flink" + this.flinkGenerateSavepointVersion + "-snapshot");
        } finally {
            // Single close on every path; the previous catch/rethrow form closed the harness
            // a second time when the trailing close() itself failed.
            harness.close();
        }
    }

    /**
     * Restores a CEP operator built from {@link NFAComplexConditionsFactory} from the
     * {@code cep-migration-conditions-flink<version>-snapshot} resource, feeds one
     * {@link SubEvent} named {@code "end"} and checks that the first emitted record is a match
     * whose {@code "start"} list holds the pre-snapshot event followed by the new one.
     */
    @Test
    public void testAndOrSubtypeConditionsAfterMigration() throws Exception {
        KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
            private static final long serialVersionUID = -4873366487571254798L;

            @Override
            public Integer getKey(Event event) throws Exception {
                return Integer.valueOf(event.getId());
            }
        };

        // Equal to the event the corresponding write method feeds before snapshotting.
        final SubEvent startEvent = new SubEvent(42, "start", 1.0d, 6.0d);

        KeyedOneInputStreamOperatorTestHarness harness = new KeyedOneInputStreamOperatorTestHarness(
                CepOperatorTestUtilities.getKeyedCepOpearator(false, new NFAComplexConditionsFactory()),
                keySelector,
                BasicTypeInfo.INT_TYPE_INFO);
        try {
            harness.setup();
            MigrationTestUtil.restoreFromSnapshot(
                    harness,
                    OperatorSnapshotUtil.getResourceFilename("cep-migration-conditions-flink" + this.migrateVersion + "-snapshot"),
                    this.migrateVersion);
            harness.open();

            final SubEvent endEvent = new SubEvent(42, "end", 1.0d, 2.0d);
            harness.processElement(new StreamRecord(endEvent, 9L));
            harness.processWatermark(new Watermark(20L));

            ConcurrentLinkedQueue output = harness.getOutput();
            // Two elements are expected; only the first is inspected below
            // (presumably the second is the forwarded watermark — not asserted here).
            Assert.assertEquals(2L, output.size());

            Object result = output.poll();
            Assert.assertTrue(result instanceof StreamRecord);
            StreamRecord record = (StreamRecord) result;
            Assert.assertTrue(record.getValue() instanceof Map);

            Map match = (Map) record.getValue();
            Assert.assertEquals(startEvent, ((List) match.get("start")).get(0));
            Assert.assertEquals(endEvent, ((List) match.get("start")).get(1));
        } finally {
            // Single close on every path; the previous catch/rethrow form closed the harness
            // a second time when the trailing close() itself failed.
            harness.close();
        }
    }
}
