package org.apache.flink.streaming.tests;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/* loaded from: input_file:org/apache/flink/streaming/tests/SlidingWindowCheckMapper.class */
/**
 * Flat-map function that checks the contents of successive sliding windows and emits a
 * human-readable alert string for every inconsistency it detects.
 *
 * <p>Each input is a {@code Tuple2} whose {@code f1} field holds the events of one window
 * ({@code f0} is not read by this class). For every window the mapper:
 *
 * <ol>
 *   <li>sorts the events by sequence number and alerts if two neighbours do not differ by
 *       exactly one;
 *   <li>alerts for every event remembered from earlier windows that is missing from the
 *       current window;
 *   <li>alerts for every event that is not remembered but whose sequence number is not greater
 *       than the last recorded sequence number;
 *   <li>alerts if the highest sequence number of the window is lower than the last recorded one.
 * </ol>
 *
 * <p>An event is remembered together with the number of windows it has been seen in and is
 * forgotten once that count reaches {@code slideFactor}.
 */
public class SlidingWindowCheckMapper extends RichFlatMapFunction<Tuple2<Integer, List<Event>>, String> {
    private static final long serialVersionUID = -744070793650644485L;

    /** Events seen in earlier windows, each paired with the number of windows it appeared in. */
    private transient ValueState<List<Tuple2<Event, Integer>>> eventsSeenSoFar;

    /** Highest "last event of a window" sequence number recorded so far; empty until first set. */
    private transient ValueState<Long> lastSequenceNumber;

    /** Number of windows an event is expected to appear in before it is forgotten. */
    private final int slideFactor;

    /**
     * Creates the mapper.
     *
     * <p>NOTE(review): the decompiler reported that this constructor was originally
     * package-private; it is kept {@code public} so that existing callers keep compiling.
     *
     * @param slideFactor number of windows after which a tracked event is dropped from state
     */
    public SlidingWindowCheckMapper(int slideFactor) {
        this.slideFactor = slideFactor;
    }

    /**
     * Registers the two value states used by this function.
     *
     * @param configuration not used
     */
    @Override
    public void open(Configuration configuration) {
        // Fully parameterised type information avoids the raw types / unchecked conversions.
        TypeInformation<Tuple2<Event, Integer>> seenEntryType =
                new TupleTypeInfo<>(TypeInformation.of(Event.class), BasicTypeInfo.INT_TYPE_INFO);
        ValueStateDescriptor<List<Tuple2<Event, Integer>>> eventsSeenDescriptor =
                new ValueStateDescriptor<>("eventsSeenSoFar", new ListTypeInfo<>(seenEntryType));
        ValueStateDescriptor<Long> lastSequenceNumberDescriptor =
                new ValueStateDescriptor<>("lastSequenceNumber", BasicTypeInfo.LONG_TYPE_INFO);

        this.eventsSeenSoFar = getRuntimeContext().getState(eventsSeenDescriptor);
        this.lastSequenceNumber = getRuntimeContext().getState(lastSequenceNumberDescriptor);
    }

    /**
     * Verifies one window and updates both states.
     *
     * @param value window record; only {@code f1} (the window's events) is read
     * @param out receives one alert string per detected inconsistency
     * @throws Exception if reading or updating state fails
     */
    @Override
    public void flatMap(Tuple2<Integer, List<Event>> value, Collector<String> out) throws Exception {
        // No stored list yet is treated as "nothing seen so far".
        List<Tuple2<Event, Integer>> previouslySeen =
                Optional.ofNullable(this.eventsSeenSoFar.value()).orElseGet(Collections::emptyList);
        List<Event> windowEvents = value.f1;

        // Contiguity is checked first so its alerts are emitted before the occurrence alerts.
        Optional<Event> lastEventInWindow = verifyWindowContiguity(windowEvents, out);

        // Read before any update: the occurrence check compares against the previous value.
        Long lastSeenSequenceNumber = this.lastSequenceNumber.value();
        List<Tuple2<Event, Integer>> stillTracked =
                verifyPreviousOccurences(previouslySeen, windowEvents, lastSeenSequenceNumber, out);

        // Plain isPresent/get because the update may throw a checked IOException.
        if (lastEventInWindow.isPresent()) {
            updateLastSeenSequenceNumber(lastEventInWindow.get(), lastSeenSequenceNumber, out);
        }
        this.eventsSeenSoFar.update(stillTracked);
    }

    /**
     * Stores the sequence number of {@code lastEvent} if it is higher than the recorded one (or
     * nothing is recorded yet), and alerts if it is lower. Equal values leave the state untouched.
     *
     * @param lastEvent event with the highest sequence number in the current window
     * @param lastSeenSequenceNumber previously recorded value, or {@code null} if none
     * @param out alert sink
     * @throws IOException if the state update fails
     */
    private void updateLastSeenSequenceNumber(Event lastEvent, Long lastSeenSequenceNumber, Collector<String> out)
            throws IOException {
        long sequenceNumber = lastEvent.getSequenceNumber();
        if (lastSeenSequenceNumber == null || sequenceNumber > lastSeenSequenceNumber) {
            this.lastSequenceNumber.update(sequenceNumber);
        } else if (sequenceNumber < lastSeenSequenceNumber) {
            failWithSequenceNumberDecreased(lastEvent, lastSeenSequenceNumber, out);
        }
    }

    /** Emits the alert for a window whose last event is older than one already recorded. */
    private void failWithSequenceNumberDecreased(Event lastEvent, Long lastSeenSequenceNumber, Collector<String> out) {
        out.collect(String.format(
                "Last event in current window (%s) has sequence number lower than seen so far (%d)",
                lastEvent, lastSeenSequenceNumber));
    }

    /**
     * Compares the remembered events with the current window and builds the list to remember next.
     *
     * <p>A remembered event that is present in the window has its count increased (and is dropped
     * once the count reaches {@code slideFactor}); a remembered event that is absent triggers an
     * alert and is not carried over. Window events that were not remembered are handled by
     * {@link #addNotSeenValues}.
     *
     * @param previouslySeen remembered events with their occurrence counts
     * @param windowEvents events of the current window
     * @param lastSeenSequenceNumber previously recorded sequence number, or {@code null}
     * @param out alert sink
     * @return the events (with updated counts) to store for the next window
     */
    private List<Tuple2<Event, Integer>> verifyPreviousOccurences(
            List<Tuple2<Event, Integer>> previouslySeen,
            List<Event> windowEvents,
            Long lastSeenSequenceNumber,
            Collector<String> out) {
        List<Tuple2<Event, Integer>> stillTracked = new ArrayList<>();
        List<Event> matchedEvents = new ArrayList<>();
        for (Tuple2<Event, Integer> seenEntry : previouslySeen) {
            if (windowEvents.contains(seenEntry.f0)) {
                matchedEvents.add(seenEntry.f0);
                preserveOrDiscardIfSeenSlideFactorTimes(stillTracked, seenEntry);
            } else {
                failWithEventNotSeenAlertMessage(seenEntry, windowEvents, out);
            }
        }
        addNotSeenValues(stillTracked, windowEvents, matchedEvents, lastSeenSequenceNumber, out);
        return stillTracked;
    }

    /**
     * Handles window events that were not matched against the remembered ones: an event newer
     * than {@code lastSeenSequenceNumber} (or any event if that is {@code null}) starts being
     * tracked with a count of 1; any other event triggers a "seen too many times" alert.
     *
     * @param stillTracked list that receives the newly tracked events
     * @param windowEvents events of the current window
     * @param matchedEvents window events that were already matched against remembered ones
     * @param lastSeenSequenceNumber previously recorded sequence number, or {@code null}
     * @param out alert sink
     */
    private void addNotSeenValues(
            List<Tuple2<Event, Integer>> stillTracked,
            List<Event> windowEvents,
            List<Event> matchedEvents,
            Long lastSeenSequenceNumber,
            Collector<String> out) {
        for (Event event : windowEvents) {
            if (matchedEvents.contains(event)) {
                continue;
            }
            if (lastSeenSequenceNumber == null || event.getSequenceNumber() > lastSeenSequenceNumber) {
                stillTracked.add(Tuple2.of(event, 1));
            } else {
                failWithEventSeenTooManyTimesMessage(event, out);
            }
        }
    }

    /** Emits the alert for an untracked event that is not newer than the recorded sequence number. */
    private void failWithEventSeenTooManyTimesMessage(Event event, Collector<String> out) {
        out.collect(String.format("Alert: event %s seen more than %d times", event, this.slideFactor));
    }

    /**
     * Adds {@code seenEntry} with its count increased by one to {@code stillTracked}, unless the
     * increased count equals {@code slideFactor}, in which case the event is dropped.
     */
    private void preserveOrDiscardIfSeenSlideFactorTimes(
            List<Tuple2<Event, Integer>> stillTracked, Tuple2<Event, Integer> seenEntry) {
        int timesSeen = seenEntry.f1 + 1;
        if (timesSeen != this.slideFactor) {
            stillTracked.add(Tuple2.of(seenEntry.f0, timesSeen));
        }
    }

    /** Emits the alert for a remembered event that is missing from the current window. */
    private void failWithEventNotSeenAlertMessage(
            Tuple2<Event, Integer> seenEntry, List<Event> windowEvents, Collector<String> out) {
        // Message text is kept verbatim; anything matching on the alert output must keep working.
        out.collect(String.format(
                "Alert: event %s did not belong to %d consecutive windows. Event seen so far %d times.Current window: %s",
                seenEntry.f0, this.slideFactor, seenEntry.f1, windowEvents));
    }

    /**
     * Sorts the window's events by sequence number and emits an alert for every pair of
     * neighbours whose sequence numbers do not differ by exactly one.
     *
     * @param windowEvents events of the current window
     * @param out alert sink
     * @return the event with the highest sequence number, or empty if the window has no events
     */
    private Optional<Event> verifyWindowContiguity(List<Event> windowEvents, Collector<String> out) {
        return windowEvents.stream()
                .sorted(Comparator.comparingLong(Event::getSequenceNumber))
                .reduce((previous, current) -> {
                    if (current.getSequenceNumber() - 1 != previous.getSequenceNumber()) {
                        // Message text (including its spelling) is kept verbatim on purpose.
                        out.collect("Alert: events in window out ouf order!");
                    }
                    return current;
                });
    }
}
