package com.huawei.streaming.operator.inputstream;

import com.huawei.streaming.common.RandomValueGen;
import com.huawei.streaming.config.StreamingConfig;
import com.huawei.streaming.event.Attribute;
import com.huawei.streaming.event.IEventType;
import com.huawei.streaming.exception.StreamingException;
import com.huawei.streaming.operator.IEmitter;
import com.huawei.streaming.operator.IInputStreamOperator;
import com.huawei.streaming.serde.StreamSerDe;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/streaming/operator/inputstream/HeadStreamSourceOp.class */
public class HeadStreamSourceOp implements IInputStreamOperator {
    private static final long serialVersionUID = 4660203255085836607L;
    private static final Logger LOG = LoggerFactory.getLogger(HeadStreamSourceOp.class);
    private static final int MAX_TIME = 1000000000;
    private transient HeadStream headStream;
    private transient ScheduledExecutorService scheduler;
    private IEmitter emitter;
    private StreamSerDe serde;
    private StreamingConfig config;
    private TimeUnit timeUnit = TimeUnit.SECONDS;
    private int period = 1;
    private int eventNumPerPeriod = 1;
    private boolean isSchedue = false;
    private long interval = 0;
    private int totalNum = 0;
    private long delay = 0;
    private int counts = 0;

    /* loaded from: input_file:com/huawei/streaming/operator/inputstream/HeadStreamSourceOp$HeadStream.class */
    public static class HeadStream {
        private IEventType eventType;
        private RandomValueGen randomGen = new RandomValueGen();

        public HeadStream(IEventType iEventType) {
            if (null == iEventType) {
                HeadStreamSourceOp.LOG.error("The output eventType of HeadStream is empty.");
                throw new RuntimeException("The output eventType of HeadStream is empty.");
            }
            this.eventType = iEventType;
        }

        public Object[] getOutput() {
            Object[] objArr = new Object[this.eventType.getSize()];
            Attribute[] allAttributes = this.eventType.getAllAttributes();
            for (int i = 0; i < allAttributes.length; i++) {
                objArr[i] = genRandomValue(allAttributes[i].getAttDataType());
            }
            return objArr;
        }

        private Object genRandomValue(Class cls) {
            return cls.getName().endsWith("Integer") ? Integer.valueOf(this.randomGen.getInteger(-1, false)) : cls.getName().endsWith("Double") ? Double.valueOf(this.randomGen.getDouble(-1, false)) : cls.getName().endsWith("Float") ? Double.valueOf(this.randomGen.getFloat(-1, false)) : cls.getName().endsWith("Boolean") ? Boolean.valueOf(this.randomGen.getBoolean()) : cls.getName().endsWith("String") ? this.randomGen.getString("generated", true) : Integer.valueOf(this.randomGen.getInteger(-1, false));
        }
    }

    /* loaded from: input_file:com/huawei/streaming/operator/inputstream/HeadStreamSourceOp$HeadStreamThreadFactory.class */
    private static class HeadStreamThreadFactory implements ThreadFactory {
        private HeadStreamThreadFactory() {
        }

        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable, HeadStreamSourceOp.class.getName());
            thread.setDaemon(true);
            return thread;
        }
    }

    /* loaded from: input_file:com/huawei/streaming/operator/inputstream/HeadStreamSourceOp$ScheduleRunner.class */
    public class ScheduleRunner implements Runnable {
        private IEmitter emitObject;

        public ScheduleRunner() {
        }

        public ScheduleRunner setEmitter(IEmitter iEmitter) {
            this.emitObject = iEmitter;
            return this;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                if (HeadStreamSourceOp.this.totalNum <= 0) {
                    this.emitObject.emit(HeadStreamSourceOp.this.headStream.getOutput());
                } else if (HeadStreamSourceOp.this.counts < HeadStreamSourceOp.this.totalNum) {
                    this.emitObject.emit(HeadStreamSourceOp.this.headStream.getOutput());
                    HeadStreamSourceOp.access$308(HeadStreamSourceOp.this);
                }
            } catch (StreamingException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override // com.huawei.streaming.operator.IStreamOperator
    public void setConfig(StreamingConfig streamingConfig) throws StreamingException {
        this.config = streamingConfig;
        initParameters(streamingConfig);
    }

    @Override // com.huawei.streaming.operator.IStreamOperator
    public StreamingConfig getConfig() {
        return this.config;
    }

    @Override // com.huawei.streaming.operator.IStreamOperator
    public void initialize() throws StreamingException {
        LOG.info("Init HeadStreamSource Operator...");
        this.headStream = new HeadStream(this.serde.getSchema());
        if (this.delay > 0) {
            LOG.info("Preparing send... delay={}", Long.valueOf(this.delay));
            try {
                Thread.sleep(this.delay);
            } catch (InterruptedException e) {
                LOG.error("HeadStream operator delay error", e);
                throw new RuntimeException(e);
            }
        }
    }

    @Override // com.huawei.streaming.operator.IInputStreamOperator
    public void execute() throws StreamingException {
        if (this.isSchedue && this.scheduler == null) {
            LOG.info("Init scheduler...");
            this.scheduler = Executors.newSingleThreadScheduledExecutor(new HeadStreamThreadFactory());
            ScheduleRunner scheduleRunner = new ScheduleRunner();
            scheduleRunner.setEmitter(this.emitter);
            if (this.eventNumPerPeriod == this.period) {
                this.interval = 1L;
            } else if (this.eventNumPerPeriod <= this.period) {
                this.interval = this.period / this.eventNumPerPeriod;
            } else {
                if (this.period * MAX_TIME < this.eventNumPerPeriod) {
                    LOG.error("Too huge event num, not supported. event num={}", Integer.valueOf(this.eventNumPerPeriod));
                    throw new RuntimeException("Too huge event num, not supported.");
                }
                this.interval = (this.period * 1000000000) / this.eventNumPerPeriod;
                this.timeUnit = TimeUnit.NANOSECONDS;
            }
            LOG.info("Launch scheduler... fixed Interval={}, delay={}", Long.valueOf(this.interval), Long.valueOf(this.delay));
            this.scheduler.scheduleAtFixedRate(scheduleRunner, this.delay, this.interval, this.timeUnit);
        }
        if (!this.isSchedue || this.scheduler == null) {
            if (this.totalNum == 0) {
                this.emitter.emit(this.headStream.getOutput());
            } else if (this.counts < this.totalNum) {
                this.emitter.emit(this.headStream.getOutput());
                this.counts++;
            }
        }
    }

    @Override // com.huawei.streaming.operator.IStreamOperator
    public void destroy() throws StreamingException {
        if (null != this.scheduler) {
            this.scheduler.shutdown();
        }
    }

    @Override // com.huawei.streaming.operator.IInputStreamOperator
    public void setEmitter(IEmitter iEmitter) {
        this.emitter = iEmitter;
    }

    @Override // com.huawei.streaming.operator.IInputStreamOperator
    public void setSerDe(StreamSerDe streamSerDe) {
        this.serde = streamSerDe;
    }

    @Override // com.huawei.streaming.operator.IInputStreamOperator
    public StreamSerDe getSerDe() {
        return this.serde;
    }

    private void initParameters(StreamingConfig streamingConfig) throws StreamingException {
        if (streamingConfig.containsKey(StreamingConfig.OPERATOR_HEADSTREAM_TIMEUNIT)) {
            this.timeUnit = TimeUnit.valueOf(streamingConfig.getStringValue(StreamingConfig.OPERATOR_HEADSTREAM_TIMEUNIT));
        }
        if (streamingConfig.containsKey(StreamingConfig.OPERATOR_HEADSTREAM_PERIOD)) {
            this.period = streamingConfig.getIntValue(StreamingConfig.OPERATOR_HEADSTREAM_PERIOD);
        }
        if (streamingConfig.containsKey(StreamingConfig.OPERATOR_HEADSTREAM_EVENTNUMPERPERIOD)) {
            this.eventNumPerPeriod = streamingConfig.getIntValue(StreamingConfig.OPERATOR_HEADSTREAM_EVENTNUMPERPERIOD);
        }
        if (streamingConfig.containsKey(StreamingConfig.OPERATOR_HEADSTREAM_ISSCHEDULE)) {
            this.isSchedue = streamingConfig.getBooleanValue(StreamingConfig.OPERATOR_HEADSTREAM_ISSCHEDULE);
        }
        if (streamingConfig.containsKey(StreamingConfig.OPERATOR_HEADSTREAM_TOTALNUMBER)) {
            this.totalNum = streamingConfig.getIntValue(StreamingConfig.OPERATOR_HEADSTREAM_TOTALNUMBER);
        }
        if (streamingConfig.containsKey(StreamingConfig.OPERATOR_HEADSTREAM_DELAYTIME)) {
            this.delay = streamingConfig.getLongValue(StreamingConfig.OPERATOR_HEADSTREAM_DELAYTIME);
        }
    }

    static /* synthetic */ int access$308(HeadStreamSourceOp headStreamSourceOp) {
        int i = headStreamSourceOp.counts;
        headStreamSourceOp.counts = i + 1;
        return i;
    }
}
