package org.apache.flink.sql.tests;

import java.io.PrintStream;
import java.lang.invoke.SerializedLambda;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.sources.DefinedFieldMapping;
import org.apache.flink.table.sources.DefinedRowtimeAttributes;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;
import org.apache.flink.types.Row;

/* loaded from: input_file:org/apache/flink/sql/tests/StreamSQLTestProgram.class */
public class StreamSQLTestProgram {

    /* loaded from: input_file:org/apache/flink/sql/tests/StreamSQLTestProgram$Generator.class */
    /**
     * Checkpointed source that emits synthetic rows of the shape
     * {@code (INT key, LONG timestamp, STRING payload)}.
     *
     * <p>Each loop iteration emits one row per key (keys {@code 0 .. numKeys-1}) under the
     * checkpoint lock, advances the internal clock {@code ms} by {@code sleepMs}, and then sleeps
     * for {@code sleepMs} milliseconds. The loop ends when {@code ms} reaches {@code durationMs}
     * or when {@link #cancel()} has been called. The clock is stored in operator list state so a
     * restored instance continues from the snapshotted value.
     */
    public static class Generator implements SourceFunction<Row>, ResultTypeQueryable<Row>, CheckpointedFunction {
        private final int numKeys;
        private final int offsetSeconds;
        private final int sleepMs;
        // Kept as long so that large durations (in seconds) do not overflow when converted to ms.
        private final long durationMs;
        private long ms = 0;
        private ListState<Long> state = null;
        // Flipped by cancel(), which may be invoked from a different thread than run().
        private volatile boolean running = true;

        /**
         * @param i number of distinct keys emitted per iteration
         * @param f records per key and second; the pause between iterations is
         *     {@code (int) (1000 / f)} milliseconds. Note that a rate above 1000 yields a pause of
         *     0 ms, in which case the internal clock never advances.
         * @param i2 duration in seconds; converted to milliseconds
         * @param i3 offset in seconds; {@code run} adds {@code i3 * 2000} to every emitted timestamp
         */
        public Generator(int i, float f, int i2, int i3) {
            this.numKeys = i;
            // Multiply in long arithmetic to avoid int overflow for large durations.
            this.durationMs = i2 * 1000L;
            this.offsetSeconds = i3;
            this.sleepMs = (int) (1000.0f / f);
        }

        /**
         * Emits rows until the configured duration is reached or the source is cancelled.
         *
         * @param sourceContext context used to emit rows and to obtain the checkpoint lock
         * @throws Exception if emitting fails or the sleeping thread is interrupted
         */
        public void run(SourceFunction.SourceContext<Row> sourceContext) throws Exception {
            // Multiply in long arithmetic; the int product could overflow for large offsets.
            final long offsetMs = this.offsetSeconds * 2000L;
            while (this.running && this.ms < this.durationMs) {
                // Emission and clock update happen atomically with respect to checkpoints.
                synchronized (sourceContext.getCheckpointLock()) {
                    for (int key = 0; key < this.numKeys; key++) {
                        sourceContext.collect(Row.of(key, this.ms + offsetMs, "Some payload..."));
                    }
                    this.ms += this.sleepMs;
                }
                Thread.sleep(this.sleepMs);
            }
        }

        /** Requests termination of the emit loop in {@link #run}. */
        public void cancel() {
            this.running = false;
        }

        /** @return the row type {@code (INT, LONG, STRING)} produced by this source */
        public TypeInformation<Row> getProducedType() {
            return Types.ROW(Types.INT, Types.LONG, Types.STRING);
        }

        /**
         * Registers the list state named {@code "state"} and adds every restored value to the
         * internal clock.
         */
        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
            this.state = functionInitializationContext
                    .getOperatorStateStore()
                    .getListState(new ListStateDescriptor<Long>("state", LongSerializer.INSTANCE));
            for (Long restored : this.state.get()) {
                this.ms += restored;
            }
        }

        /** Replaces the state content with the current value of the internal clock. */
        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
            this.state.clear();
            this.state.add(this.ms);
        }
    }

    /* loaded from: input_file:org/apache/flink/sql/tests/StreamSQLTestProgram$GeneratorTableSource.class */
    /**
     * Table source backed by {@link Generator}.
     *
     * <p>The physical rows are {@code (INT, LONG, STRING)}; the table schema exposes them as
     * {@code key}, {@code rowtime} and {@code payload}, with {@code rowtime} declared as a rowtime
     * attribute taken from the existing field {@code ts}.
     */
    public static class GeneratorTableSource implements StreamTableSource<Row>, DefinedRowtimeAttributes, DefinedFieldMapping {
        private final int numKeys;
        private final float recordsPerKeyAndSecond;
        private final int durationSeconds;
        private final int offsetSeconds;

        /**
         * @param i number of keys, forwarded to {@link Generator}
         * @param f records per key and second, forwarded to {@link Generator}
         * @param i2 duration in seconds, forwarded to {@link Generator}
         * @param i3 offset in seconds, forwarded to {@link Generator}
         */
        public GeneratorTableSource(int i, float f, int i2, int i3) {
            this.numKeys = i;
            this.recordsPerKeyAndSecond = f;
            this.durationSeconds = i2;
            this.offsetSeconds = i3;
        }

        /** Adds a new {@link Generator} with this source's settings to the given environment. */
        public DataStream<Row> getDataStream(StreamExecutionEnvironment streamExecutionEnvironment) {
            Generator generator =
                    new Generator(this.numKeys, this.recordsPerKeyAndSecond, this.durationSeconds, this.offsetSeconds);
            return streamExecutionEnvironment.addSource(generator);
        }

        /** @return the physical row type {@code (INT, LONG, STRING)} */
        public TypeInformation<Row> getReturnType() {
            return Types.ROW(Types.INT, Types.LONG, Types.STRING);
        }

        /** @return the schema {@code key INT, rowtime SQL_TIMESTAMP, payload STRING} */
        public TableSchema getTableSchema() {
            String[] fieldNames = {"key", "rowtime", "payload"};
            TypeInformation<?>[] fieldTypes = {Types.INT, Types.SQL_TIMESTAMP, Types.STRING};
            return new TableSchema(fieldNames, fieldTypes);
        }

        /** @return the fixed description {@code "GeneratorTableSource"} */
        public String explainSource() {
            return "GeneratorTableSource";
        }

        /**
         * @return a single descriptor that declares {@code rowtime}, extracted from the existing
         *     field {@code ts}, with a {@code BoundedOutOfOrderTimestamps(100L)} watermark strategy
         */
        public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
            RowtimeAttributeDescriptor rowtimeDescriptor =
                    new RowtimeAttributeDescriptor(
                            "rowtime", new ExistingField("ts"), new BoundedOutOfOrderTimestamps(100L));
            return Collections.singletonList(rowtimeDescriptor);
        }

        /** @return the mapping {@code key -> f0}, {@code ts -> f1}, {@code payload -> f2} */
        public Map<String, String> getFieldMapping() {
            Map<String, String> mapping = new HashMap<>();
            mapping.put("key", "f0");
            mapping.put("ts", "f1");
            mapping.put("payload", "f2");
            return mapping;
        }
    }

    /* loaded from: input_file:org/apache/flink/sql/tests/StreamSQLTestProgram$KeyBucketAssigner.class */
    /** Bucket assigner that uses the string form of a row's first field as the bucket id. */
    public static final class KeyBucketAssigner implements BucketAssigner<Row, String> {
        private static final long serialVersionUID = 987325769970523326L;

        /**
         * @param row the row being written
         * @param context assigner context (not used)
         * @return {@code String.valueOf} of the field at position 0
         */
        public String getBucketId(Row row, BucketAssigner.Context context) {
            final Object firstField = row.getField(0);
            return String.valueOf(firstField);
        }

        /** @return the shared {@link SimpleVersionedStringSerializer} instance */
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }

    /* loaded from: input_file:org/apache/flink/sql/tests/StreamSQLTestProgram$KillMapper.class */
    /**
     * Identity mapper that deliberately fails the job once.
     *
     * <p>Two counters are incremented for every forwarded row: {@code saveRecordCnt}, which is
     * written to and restored from operator state, and {@code lostRecordCnt}, which is not. The
     * mapper throws when both counters equal 1 at the time a row arrives.
     */
    public static class KillMapper implements MapFunction<Row, Row>, CheckpointedFunction, ResultTypeQueryable {
        // Checkpointed counter: snapshotted in snapshotState and restored in initializeState.
        private int saveRecordCnt = 0;
        // Non-checkpointed counter: starts from 0 in every new instance.
        private int lostRecordCnt = 0;
        private ListState<Integer> state = null;

        /**
         * Forwards the row unchanged after updating both counters.
         *
         * @throws RuntimeException if both counters are exactly 1 when the row arrives
         */
        public Row map(Row row) {
            boolean bothCountersAtOne = this.saveRecordCnt == 1 && this.lostRecordCnt == 1;
            if (bothCountersAtOne) {
                throw new RuntimeException("Kill this Job!");
            }
            this.saveRecordCnt += 1;
            this.lostRecordCnt += 1;
            return row;
        }

        /** @return the row type {@code (INT, SQL_TIMESTAMP)} */
        public TypeInformation getProducedType() {
            return Types.ROW(Types.INT, Types.SQL_TIMESTAMP);
        }

        /**
         * Registers the list state named {@code "state"} and adds every restored value to
         * {@code saveRecordCnt}.
         */
        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
            ListStateDescriptor<Integer> descriptor =
                    new ListStateDescriptor<Integer>("state", IntSerializer.INSTANCE);
            this.state = functionInitializationContext.getOperatorStateStore().getListState(descriptor);
            for (Integer restored : this.state.get()) {
                this.saveRecordCnt += restored;
            }
        }

        /** Replaces the state content with the current value of {@code saveRecordCnt}. */
        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
            this.state.clear();
            this.state.add(this.saveRecordCnt);
        }
    }

    /**
     * Builds and executes the streaming SQL test job.
     *
     * <p>Recognised arguments (parsed with {@link ParameterTool}):
     * <ul>
     *   <li>{@code outputPath} (required) - base path of the file sink</li>
     *   <li>{@code planner} (default {@code "blink"}) - {@code "old"} or {@code "blink"}; any
     *       other value leaves the settings builder at its default planner</li>
     * </ul>
     *
     * <p>The job registers two generator tables, runs a query composed of an OVER window, a
     * tumbling window, a time-bounded join and a final tumbling aggregation, pipes the result
     * through {@link KillMapper} and writes it with a {@link StreamingFileSink}.
     *
     * @param strArr command-line arguments
     * @throws Exception if the job cannot be built or its execution fails
     */
    public static void main(String[] strArr) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(strArr);
        String outputPath = params.getRequired("outputPath");
        String planner = params.get("planner", "blink");

        EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings.newInstance();
        settingsBuilder.inStreamingMode();
        if (planner.equals("old")) {
            settingsBuilder.useOldPlanner();
        } else if (planner.equals("blink")) {
            settingsBuilder.useBlinkPlanner();
        }
        EnvironmentSettings settings = settingsBuilder.build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10L, TimeUnit.SECONDS)));
        env.enableCheckpointing(4000L);
        env.getConfig().setAutoWatermarkInterval(1000L);

        // toAppendStream is declared on StreamTableEnvironment, while registerTableSourceInternal
        // is declared on TableEnvironmentInternal; keep the former type and cast for the latter.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        TableEnvironmentInternal internalTableEnv = (TableEnvironmentInternal) tableEnv;
        internalTableEnv.registerTableSourceInternal("table1", new GeneratorTableSource(10, 100.0f, 60, 0));
        internalTableEnv.registerTableSourceInternal("table2", new GeneratorTableSource(5, 0.2f, 60, 5));

        int overWindowSizeSeconds = 1;
        int tumbleWindowSizeSeconds = 10;

        // Per key: number of rows within the preceding OVER range.
        String overQuery = String.format(
                "SELECT   key,   rowtime,   COUNT(*) OVER (PARTITION BY key ORDER BY rowtime RANGE BETWEEN INTERVAL '%d' SECOND PRECEDING AND CURRENT ROW) AS cnt FROM table1",
                overWindowSizeSeconds);

        // Per key and tumbling window: 1 if the average count equals 101, otherwise 99.
        String tumbleQuery = String.format(
                "SELECT   key,   CASE SUM(cnt) / COUNT(*) WHEN 101 THEN 1 ELSE 99 END AS correct,   TUMBLE_START(rowtime, INTERVAL '%d' SECOND) AS wStart,   TUMBLE_ROWTIME(rowtime, INTERVAL '%d' SECOND) AS rowtime FROM (%s) WHERE rowtime > TIMESTAMP '1970-01-01 00:00:01' GROUP BY key, TUMBLE(rowtime, INTERVAL '%d' SECOND)",
                tumbleWindowSizeSeconds, tumbleWindowSizeSeconds, overQuery, tumbleWindowSizeSeconds);

        // Time-bounded join of table2 with the windowed result on key.
        String joinQuery = String.format(
                "SELECT   t1.key,   t2.rowtime AS rowtime,   t2.correct,  t2.wStart FROM table2 t1, (%s) t2 WHERE   t1.key = t2.key AND   t1.rowtime BETWEEN t2.rowtime AND t2.rowtime + INTERVAL '%d' SECOND",
                tumbleQuery, tumbleWindowSizeSeconds);

        // Final 20-second tumbling aggregation over the join result.
        String finalAggQuery = String.format(
                "SELECT   SUM(correct) AS correct,   TUMBLE_START(rowtime, INTERVAL '20' SECOND) AS rowtime FROM (%s) GROUP BY TUMBLE(rowtime, INTERVAL '20' SECOND)",
                joinQuery);

        DataStream<Row> resultStream =
                tableEnv.toAppendStream(tableEnv.sqlQuery(finalAggQuery), Types.ROW(Types.INT, Types.SQL_TIMESTAMP));

        // The explicit <Row> witness fixes the encoder's element type; without it the chained
        // builder call gives the lambda no target type to infer Row from.
        StreamingFileSink<Row> sink = StreamingFileSink
                .<Row>forRowFormat(new Path(outputPath), (row, outputStream) -> {
                    new PrintStream(outputStream).println(row.toString());
                })
                .withBucketAssigner(new KeyBucketAssigner())
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .build();

        resultStream
                .map(new KillMapper())
                .setParallelism(1)
                .addSink(sink)
                .setParallelism(1);

        env.execute();
    }

    /*
     * Lambda deserialization hook as emitted by the decompiler (note the "synthetic" marker).
     *
     * Given a SerializedLambda, it checks the implementation method name and then the functional
     * interface / implementation signatures; on a full match it returns a lambda with the same
     * body as the encoder lambda used in main, otherwise it throws IllegalArgumentException.
     *
     * NOTE(review): this looks like the compiler-generated counterpart of the serializable encoder
     * lambda in main rather than hand-written code. If that is confirmed, it should be dropped
     * from the source, since the compiler would presumably emit it again on its own.
     * NOTE(review): as decompiled this is not valid Java: a boolean is initialised with -1 and
     * used as a switch selector, and the returned lambda has no functional-interface target type.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Decompiler rendering of a string switch: first stage maps the name to a selector.
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1213540006:
                if (implMethodName.equals("lambda$main$67876c2d$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second stage: dispatch on the selector and validate the serialized form.
        switch (z) {
            case false:
                // Every descriptor must match: method kind 6, Encoder.encode, and the static
                // implementation method in this class taking (Row, OutputStream).
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/serialization/Encoder") && serializedLambda.getFunctionalInterfaceMethodName().equals("encode") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/io/OutputStream;)V") && serializedLambda.getImplClass().equals("org/apache/flink/sql/tests/StreamSQLTestProgram") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/types/Row;Ljava/io/OutputStream;)V")) {
                    return (row, outputStream) -> {
                        new PrintStream(outputStream).println(row.toString());
                    };
                }
                break;
        }
        // No known lambda matched the serialized description.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
