package org.apache.flink.streaming.examples.statemachine;

import java.lang.invoke.SerializedLambda;
import java.util.Properties;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.examples.statemachine.dfa.State;
import org.apache.flink.streaming.examples.statemachine.event.Alert;
import org.apache.flink.streaming.examples.statemachine.event.Event;
import org.apache.flink.streaming.examples.statemachine.generator.EventsGeneratorSource;
import org.apache.flink.streaming.examples.statemachine.kafka.EventDeSerializer;
import org.apache.flink.streaming.examples.statemachine.kafka.KafkaStandaloneGenerator;
import org.apache.flink.util.Collector;

/* loaded from: input_file:org/apache/flink/streaming/examples/statemachine/StateMachineExample.class */
public class StateMachineExample {

    /* loaded from: input_file:org/apache/flink/streaming/examples/statemachine/StateMachineExample$StateMachineMapper.class */
    static class StateMachineMapper extends RichFlatMapFunction<Event, Alert> {
        private ValueState<State> currentState;

        StateMachineMapper() {
        }

        public void open(Configuration configuration) {
            this.currentState = getRuntimeContext().getState(new ValueStateDescriptor("state", State.class));
        }

        public void flatMap(Event event, Collector<Alert> collector) throws Exception {
            State state = (State) this.currentState.value();
            if (state == null) {
                state = State.Initial;
            }
            State transition = state.transition(event.type());
            if (transition == State.InvalidTransition) {
                collector.collect(new Alert(event.sourceAddress(), state, event.type()));
            } else if (transition.isTerminal()) {
                this.currentState.clear();
            } else {
                this.currentState.update(transition);
            }
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((Event) obj, (Collector<Alert>) collector);
        }
    }

    /**
     * Builds and runs the state machine job.
     *
     * <p>Prints the usage text, then reads events either from Kafka (when {@code --kafka-topic}
     * is given) or from the built-in {@link EventsGeneratorSource}. The events are keyed by
     * source address and passed through {@link StateMachineMapper}; the resulting alerts are
     * printed, or written to the file given by {@code --output}.
     *
     * <p>Checkpointing is enabled with an interval of 2000 ms. A state backend is only set
     * explicitly for {@code --backend file} and {@code --backend rocks}; any other value leaves
     * the environment's default backend in place.
     *
     * @param strArr command line arguments, parsed with {@link ParameterTool#fromArgs(String[])}
     * @throws Exception if the job cannot be set up or its execution fails
     */
    public static void main(String[] strArr) throws Exception {
        System.out.println("Usage with built-in data generator: StateMachineExample [--error-rate <probability-of-invalid-transition>] [--sleep <sleep-per-record-in-ms>]");
        System.out.println("Usage with Kafka: StateMachineExample --kafka-topic <topic> [--brokers <brokers>]");
        System.out.println("Options for both the above setups: ");
        System.out.println("\t[--backend <file|rocks>]");
        System.out.println("\t[--checkpoint-dir <filepath>]");
        System.out.println("\t[--async-checkpoints <true|false>]");
        System.out.println("\t[--incremental-checkpoints <true|false>]");
        System.out.println("\t[--output <filepath> OR null for stdout]");
        System.out.println();

        final ParameterTool params = ParameterTool.fromArgs(strArr);

        // ---- choose the event source -------------------------------------------------------
        final SourceFunction<Event> source;
        if (params.has("kafka-topic")) {
            final String kafkaTopic = params.get("kafka-topic");
            final String brokers = params.get("brokers", KafkaStandaloneGenerator.BROKER_ADDRESS);
            System.out.printf("Reading from kafka topic %s @ %s\n", kafkaTopic, brokers);
            System.out.println();

            final Properties kafkaProps = new Properties();
            kafkaProps.setProperty("bootstrap.servers", brokers);

            // Keep the concrete consumer type here: the start-position and offset-commit
            // setters are declared on the consumer, not on SourceFunction.
            final FlinkKafkaConsumer<Event> kafkaConsumer =
                    new FlinkKafkaConsumer<>(kafkaTopic, new EventDeSerializer(), kafkaProps);
            kafkaConsumer.setStartFromLatest();
            kafkaConsumer.setCommitOffsetsOnCheckpoints(false);
            source = kafkaConsumer;
        } else {
            final double errorRate = params.getDouble("error-rate", 0.0d);
            final int sleepMillis = params.getInt("sleep", 1);
            System.out.printf("Using standalone source with error rate %f and sleep delay %s millis\n", errorRate, sleepMillis);
            System.out.println();
            source = new EventsGeneratorSource(errorRate, sleepMillis);
        }

        // ---- configure the environment -----------------------------------------------------
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(2000L);

        final String backend = params.get("backend", "memory");
        if ("file".equals(backend)) {
            // getRequired reports the missing option by name instead of handing null
            // to the backend constructor.
            final String checkpointDir = params.getRequired("checkpoint-dir");
            final boolean asyncCheckpoints = params.getBoolean("async-checkpoints", false);
            env.setStateBackend(new FsStateBackend(checkpointDir, asyncCheckpoints));
        } else if ("rocks".equals(backend)) {
            final String checkpointDir = params.getRequired("checkpoint-dir");
            final boolean incrementalCheckpoints = params.getBoolean("incremental-checkpoints", false);
            env.setStateBackend(new RocksDBStateBackend(checkpointDir, incrementalCheckpoints));
        }

        final String outputFile = params.get("output");
        env.getConfig().setGlobalJobParameters(params);

        // ---- build and run the pipeline ----------------------------------------------------
        final SingleOutputStreamOperator<Alert> alerts = env.addSource(source)
                .keyBy(Event::sourceAddress)
                .flatMap(new StateMachineMapper());

        if (outputFile == null) {
            alerts.print();
        } else {
            // A single writer task is used for the text output.
            alerts.writeAsText(outputFile, FileSystem.WriteMode.OVERWRITE).setParallelism(1);
        }

        env.execute("State machine job");
    }

    /**
     * Deserialization hook for the serializable key selector used in {@code main}.
     *
     * <p>Recreates the {@code Event::sourceAddress} key selector when the serialized form
     * describes exactly that method reference, and rejects everything else.
     *
     * <p>NOTE(review): the {@code synthetic} marker indicates this method was generated by the
     * compiler and only surfaced through decompilation. javac normally emits it on its own for
     * classes containing serializable lambdas, so the hand-written copy may need to be removed
     * when rebuilding from source. Please confirm.
     *
     * @param serializedLambda serialized form of the lambda to restore
     * @return a key selector equivalent to {@code Event::sourceAddress}
     * @throws IllegalArgumentException if the serialized form does not describe that key selector
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        // All parts of the serialized description must match before a lambda is handed back.
        boolean isSourceAddressSelector =
                "sourceAddress".equals(serializedLambda.getImplMethodName())
                        && serializedLambda.getImplMethodKind() == java.lang.invoke.MethodHandleInfo.REF_invokeVirtual
                        && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector")
                        && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey")
                        && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")
                        && serializedLambda.getImplClass().equals("org/apache/flink/streaming/examples/statemachine/event/Event")
                        && serializedLambda.getImplMethodSignature().equals("()I");

        if (isSourceAddressSelector) {
            // The cast supplies the functional-interface target type, which a bare lambda
            // returned as Object would lack.
            return (org.apache.flink.api.java.functions.KeySelector<Event, Integer>) Event::sourceAddress;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
