package org.apache.flink.streaming.tests.queryablestate;

import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Random;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.tests.queryablestate.LabelSurrogate;
import org.apache.flink.util.Collector;

/* loaded from: input_file:org/apache/flink/streaming/tests/queryablestate/QsStateProducer.class */
public class QsStateProducer {

    /* loaded from: input_file:org/apache/flink/streaming/tests/queryablestate/QsStateProducer$EmailSource.class */
    private static class EmailSource extends RichSourceFunction<Email> {
        private static final long serialVersionUID = -7286937645300388040L;
        private volatile transient boolean isRunning;
        private transient Random random;

        private EmailSource() {
        }

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            this.random = new Random();
            this.isRunning = true;
        }

        public void run(SourceFunction.SourceContext<Email> sourceContext) throws Exception {
            Thread.sleep(10000L);
            int length = LabelSurrogate.Type.values().length;
            while (this.isRunning) {
                int nextInt = this.random.nextInt(100);
                EmailId emailId = new EmailId(Integer.toString(this.random.nextInt()));
                Instant minus = Instant.now().minus((TemporalAmount) Duration.ofDays(1L));
                String format = String.format("foo #%d", Integer.valueOf(nextInt));
                LabelSurrogate labelSurrogate = new LabelSurrogate(LabelSurrogate.Type.values()[nextInt % length], "bar");
                synchronized (sourceContext.getCheckpointLock()) {
                    sourceContext.collect(new Email(emailId, minus, format, labelSurrogate));
                }
                Thread.sleep(30L);
            }
        }

        public void cancel() {
            this.isRunning = false;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/tests/queryablestate/QsStateProducer$TestFlatMap.class */
    private static class TestFlatMap extends RichFlatMapFunction<Email, Object> implements CheckpointedFunction {
        private static final long serialVersionUID = 7821128115999005941L;
        private transient MapState<EmailId, EmailInformation> state;
        private transient int count;

        private TestFlatMap() {
        }

        public void open(Configuration configuration) {
            MapStateDescriptor mapStateDescriptor = new MapStateDescriptor("state", TypeInformation.of(new TypeHint<EmailId>() { // from class: org.apache.flink.streaming.tests.queryablestate.QsStateProducer.TestFlatMap.1
            }), TypeInformation.of(new TypeHint<EmailInformation>() { // from class: org.apache.flink.streaming.tests.queryablestate.QsStateProducer.TestFlatMap.2
            }));
            mapStateDescriptor.setQueryable("state");
            this.state = getRuntimeContext().getMapState(mapStateDescriptor);
            this.count = -1;
        }

        public void flatMap(Email email, Collector<Object> collector) throws Exception {
            this.state.put(email.getEmailId(), new EmailInformation(email));
            this.count = Iterables.size(this.state.keys());
        }

        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
            System.out.println("Count on snapshot: " + this.count);
        }

        public void initializeState(FunctionInitializationContext functionInitializationContext) {
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((Email) obj, (Collector<Object>) collector);
        }
    }

    /**
     * Builds and runs the job: an {@code EmailSource} keyed by a single constant key and fed
     * into a {@code TestFlatMap}, with checkpointing enabled at a 1000 ms interval.
     *
     * <p>Two arguments are required, read through {@link ParameterTool#fromArgs}:
     * <ul>
     *   <li>{@code tmp-dir}: passed to the RocksDB and file-system state backend constructors</li>
     *   <li>{@code state-backend}: one of {@code rocksdb}, {@code fs} or {@code memory}</li>
     * </ul>
     *
     * @param strArr command-line arguments
     * @throws RuntimeException if {@code state-backend} names an unsupported backend
     * @throws Exception if a required argument is missing, the backend cannot be created, or
     *                   job execution fails
     */
    public static void main(String[] strArr) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        ParameterTool parameters = ParameterTool.fromArgs(strArr);
        String tmpDir = parameters.getRequired("tmp-dir");
        String backendName = parameters.getRequired("state-backend");

        // Typed as the shared interface so that any of the three backends can be assigned.
        StateBackend stateBackend;
        switch (backendName) {
            case "rocksdb":
                stateBackend = new RocksDBStateBackend(tmpDir);
                break;
            case "fs":
                stateBackend = new FsStateBackend(tmpDir);
                break;
            case "memory":
                stateBackend = new MemoryStateBackend();
                break;
            default:
                throw new RuntimeException("Unsupported state backend " + backendName);
        }

        executionEnvironment.setStateBackend(stateBackend);
        executionEnvironment.enableCheckpointing(1000L);
        executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        executionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(0L);

        executionEnvironment
                .addSource(new EmailSource())
                // Every record maps to the same key, so all e-mails share one keyed-state scope.
                .keyBy(new KeySelector<Email, String>() {
                    private static final long serialVersionUID = -1480525724620425363L;

                    @Override
                    public String getKey(Email email) throws Exception {
                        return QsConstants.KEY;
                    }
                })
                .flatMap(new TestFlatMap());

        executionEnvironment.execute();
    }
}
