package storm.starter;

import java.util.Map;
import org.apache.derby.iapi.sql.compile.TypeCompiler;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.starter.spout.RandomIntegerSpout;
import org.apache.storm.state.KeyValueState;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.base.BaseStatefulBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:storm/starter/StatefulTopology.class */
public class StatefulTopology {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) StatefulTopology.class);

    /* loaded from: input_file:storm/starter/StatefulTopology$PrinterBolt.class */
    public static class PrinterBolt extends BaseBasicBolt {
        public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
            System.out.println(tuple);
            StatefulTopology.LOG.debug("Got tuple {}", tuple);
            basicOutputCollector.emit(tuple.getValues());
        }

        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields(new String[]{"value"}));
        }
    }

    /* loaded from: input_file:storm/starter/StatefulTopology$StatefulSumBolt.class */
    private static class StatefulSumBolt extends BaseStatefulBolt<KeyValueState<String, Long>> {
        String name;
        KeyValueState<String, Long> kvState;
        long sum;
        private OutputCollector collector;

        StatefulSumBolt(String str) {
            this.name = str;
        }

        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            this.collector = outputCollector;
        }

        public void execute(Tuple tuple) {
            this.sum += ((Number) tuple.getValueByField("value")).longValue();
            StatefulTopology.LOG.debug("{} sum = {}", this.name, Long.valueOf(this.sum));
            this.kvState.put(TypeCompiler.SUM_OP, Long.valueOf(this.sum));
            this.collector.emit(tuple, new Values(new Object[]{Long.valueOf(this.sum)}));
            this.collector.ack(tuple);
        }

        public void initState(KeyValueState<String, Long> keyValueState) {
            this.kvState = keyValueState;
            this.sum = ((Long) this.kvState.get(TypeCompiler.SUM_OP, 0L)).longValue();
            StatefulTopology.LOG.debug("Initstate, sum from saved state = {} ", Long.valueOf(this.sum));
        }

        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields(new String[]{"value"}));
        }
    }

    public static void main(String[] strArr) throws Exception {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("spout", new RandomIntegerSpout());
        topologyBuilder.setBolt("partialsum", new StatefulSumBolt("partial"), 1).shuffleGrouping("spout");
        topologyBuilder.setBolt("printer", new PrinterBolt(), 2).shuffleGrouping("partialsum");
        topologyBuilder.setBolt("total", new StatefulSumBolt("total"), 1).shuffleGrouping("printer");
        Config config = new Config();
        config.setDebug(false);
        if (strArr != null && strArr.length > 0) {
            config.setNumWorkers(1);
            StormSubmitter.submitTopologyWithProgressBar(strArr[0], config, topologyBuilder.createTopology());
            return;
        }
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("test", config, topologyBuilder.createTopology());
        Utils.sleep(40000L);
        localCluster.killTopology("test");
        localCluster.shutdown();
    }
}
