package org.apache.storm.starter.bolt;

import java.util.HashMap;
import java.util.Map;
import org.apache.log4j.Logger;
import org.apache.storm.starter.tools.NthLastModifiedTimeTracker;
import org.apache.storm.starter.tools.SlidingWindowCounter;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.TupleUtils;
import org.nustaq.serialization.coders.FSTJsonEncoder;

/* loaded from: input_file:org/apache/storm/starter/bolt/RollingCountBolt.class */
public class RollingCountBolt extends BaseRichBolt {
    private static final long serialVersionUID = 5537727428628598519L;
    private static final Logger LOG = Logger.getLogger(RollingCountBolt.class);
    private static final int NUM_WINDOW_CHUNKS = 5;
    private static final int DEFAULT_SLIDING_WINDOW_IN_SECONDS = 300;
    private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = 60;
    private static final String WINDOW_LENGTH_WARNING_TEMPLATE = "Actual window length is %d seconds when it should be %d seconds (you can safely ignore this warning during the startup phase)";
    private final SlidingWindowCounter<Object> counter;
    private final int windowLengthInSeconds;
    private final int emitFrequencyInSeconds;
    private OutputCollector collector;
    private NthLastModifiedTimeTracker lastModifiedTracker;

    public RollingCountBolt() {
        this(300, 60);
    }

    public RollingCountBolt(int i, int i2) {
        this.windowLengthInSeconds = i;
        this.emitFrequencyInSeconds = i2;
        this.counter = new SlidingWindowCounter<>(deriveNumWindowChunksFrom(this.windowLengthInSeconds, this.emitFrequencyInSeconds));
    }

    private int deriveNumWindowChunksFrom(int i, int i2) {
        return i / i2;
    }

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.lastModifiedTracker = new NthLastModifiedTimeTracker(deriveNumWindowChunksFrom(this.windowLengthInSeconds, this.emitFrequencyInSeconds));
    }

    public void execute(Tuple tuple) {
        if (!TupleUtils.isTick(tuple)) {
            countObjAndAck(tuple);
        } else {
            LOG.debug("Received tick tuple, triggering emit of current window counts");
            emitCurrentWindowCounts();
        }
    }

    private void emitCurrentWindowCounts() {
        Map<Object, Long> countsThenAdvanceWindow = this.counter.getCountsThenAdvanceWindow();
        int secondsSinceOldestModification = this.lastModifiedTracker.secondsSinceOldestModification();
        this.lastModifiedTracker.markAsModified();
        if (secondsSinceOldestModification != this.windowLengthInSeconds) {
            LOG.warn(String.format(WINDOW_LENGTH_WARNING_TEMPLATE, Integer.valueOf(secondsSinceOldestModification), Integer.valueOf(this.windowLengthInSeconds)));
        }
        emit(countsThenAdvanceWindow, secondsSinceOldestModification);
    }

    private void emit(Map<Object, Long> map, int i) {
        for (Map.Entry<Object, Long> entry : map.entrySet()) {
            this.collector.emit(new Values(new Object[]{entry.getKey(), entry.getValue(), Integer.valueOf(i)}));
        }
    }

    private void countObjAndAck(Tuple tuple) {
        this.counter.incrementCount(tuple.getValue(0));
        this.collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields(new String[]{FSTJsonEncoder.OBJ, "count", "actualWindowLengthInSeconds"}));
    }

    public Map<String, Object> getComponentConfiguration() {
        HashMap hashMap = new HashMap();
        hashMap.put("topology.tick.tuple.freq.secs", Integer.valueOf(this.emitFrequencyInSeconds));
        return hashMap;
    }
}
