package org.apache.storm.starter;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.StringTokenizer;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.blobstore.AtomicOutputStream;
import org.apache.storm.blobstore.BlobStoreAclHandler;
import org.apache.storm.blobstore.ClientBlobStore;
import org.apache.storm.generated.AccessControl;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.KeyAlreadyExistsException;
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.generated.SettableBlobMeta;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.ShellBolt;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.apache.tools.ant.taskdefs.WaitFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/starter/BlobStoreAPIWordCountTopology.class */
public class BlobStoreAPIWordCountTopology {
    private static ClientBlobStore store;
    private static String key = "key";
    private static String fileName = "blacklist.txt";
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) BlobStoreAPIWordCountTopology.class);

    /* loaded from: input_file:org/apache/storm/starter/BlobStoreAPIWordCountTopology$FilterWords.class */
    public static class FilterWords extends BaseBasicBolt {
        boolean poll = false;
        long pollTime;
        Set<String> wordSet;

        public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
            String string = tuple.getString(0);
            try {
                if (!this.poll) {
                    this.wordSet = BlobStoreAPIWordCountTopology.parseFile(BlobStoreAPIWordCountTopology.fileName);
                    this.pollTime = System.currentTimeMillis();
                    this.poll = true;
                } else if (System.currentTimeMillis() - this.pollTime > 5000) {
                    this.wordSet = BlobStoreAPIWordCountTopology.parseFile(BlobStoreAPIWordCountTopology.fileName);
                    this.pollTime = System.currentTimeMillis();
                }
                if (this.wordSet == null || this.wordSet.contains(string)) {
                    return;
                }
                basicOutputCollector.emit(new Values(new Object[]{string}));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields(new String[]{"word"}));
        }
    }

    /* loaded from: input_file:org/apache/storm/starter/BlobStoreAPIWordCountTopology$RandomSentenceSpout.class */
    public static class RandomSentenceSpout extends BaseRichSpout {
        SpoutOutputCollector _collector;

        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
            this._collector = spoutOutputCollector;
        }

        public void nextTuple() {
            Utils.sleep(100L);
            this._collector.emit(new Values(new Object[]{BlobStoreAPIWordCountTopology.access$000()}));
        }

        public void ack(Object obj) {
        }

        public void fail(Object obj) {
        }

        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields(new String[]{"sentence"}));
        }
    }

    /* loaded from: input_file:org/apache/storm/starter/BlobStoreAPIWordCountTopology$SplitSentence.class */
    public static class SplitSentence extends ShellBolt implements IRichBolt {
        public SplitSentence() {
            super(new String[]{"python", "splitsentence.py"});
        }

        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields(new String[]{"word"}));
        }

        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }

    public static void prepare() {
        Config config = new Config();
        config.putAll(Utils.readStormConfig());
        store = Utils.getClientBlobStore(config);
    }

    public void buildAndLaunchWordCountTopology(String[] strArr) {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("spout", new RandomSentenceSpout(), 5);
        topologyBuilder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
        topologyBuilder.setBolt("filter", new FilterWords(), 6).shuffleGrouping("split");
        Config config = new Config();
        config.setDebug(true);
        try {
            config.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(strArr[0], config, topologyBuilder.createTopology());
        } catch (InvalidTopologyException | AuthorizationException | AlreadyAliveException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    private static void createBlobWithContent(String str, ClientBlobStore clientBlobStore, File file) throws AuthorizationException, KeyAlreadyExistsException, IOException, KeyNotFoundException {
        AccessControl parseAccessControl = BlobStoreAclHandler.parseAccessControl("o::rwa");
        LinkedList linkedList = new LinkedList();
        linkedList.add(parseAccessControl);
        AtomicOutputStream createBlob = clientBlobStore.createBlob(str, new SettableBlobMeta(linkedList));
        createBlob.write(readFile(file).toString().getBytes());
        createBlob.close();
    }

    private static void updateBlobWithContent(String str, ClientBlobStore clientBlobStore, File file) throws KeyNotFoundException, AuthorizationException, IOException {
        AtomicOutputStream updateBlob = clientBlobStore.updateBlob(str);
        updateBlob.write(readFile(file).toString().getBytes());
        updateBlob.close();
    }

    private static String getRandomSentence() {
        String[] strArr = {"the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature"};
        return strArr[new Random().nextInt(strArr.length)];
    }

    private static Set<String> getRandomWordSet() {
        HashSet hashSet = new HashSet();
        Random random = new Random();
        String[] strArr = {"cow", "jumped", "over", "the", "moon", "apple", WaitFor.Unit.DAY, "doctor", "away", "four", "seven", "ago", "snow", "white", "seven", "dwarfs", "nature", "two"};
        for (int i = 0; i < 5; i++) {
            hashSet.add(strArr[random.nextInt(strArr.length)]);
        }
        return hashSet;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Set<String> parseFile(String str) throws IOException {
        File file = new File(str);
        HashSet hashSet = new HashSet();
        if (!file.exists()) {
            return hashSet;
        }
        StringTokenizer stringTokenizer = new StringTokenizer(readFile(file).toString(), "\r\n");
        while (stringTokenizer.hasMoreElements()) {
            hashSet.add(stringTokenizer.nextToken());
        }
        LOG.debug("parseFile {}", hashSet);
        return hashSet;
    }

    private static StringBuilder readFile(File file) throws IOException {
        StringBuilder sb = new StringBuilder();
        BufferedReader bufferedReader = new BufferedReader(new FileReader(file));
        while (true) {
            String readLine = bufferedReader.readLine();
            if (readLine == null) {
                return sb;
            }
            sb.append(readLine);
            sb.append(System.lineSeparator());
        }
    }

    public static File createFile(String str) throws IOException {
        File file = new File(str);
        if (!file.exists()) {
            file.createNewFile();
        }
        writeToFile(file, getRandomWordSet());
        return file;
    }

    public static File updateFile(File file) throws IOException {
        writeToFile(file, getRandomWordSet());
        return file;
    }

    public static void writeToFile(File file, Set<String> set) throws IOException {
        BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(file, false));
        Iterator<String> it = set.iterator();
        while (it.hasNext()) {
            bufferedWriter.write(it.next());
            bufferedWriter.write(System.lineSeparator());
        }
        bufferedWriter.close();
    }

    public static void main(String[] strArr) {
        prepare();
        BlobStoreAPIWordCountTopology blobStoreAPIWordCountTopology = new BlobStoreAPIWordCountTopology();
        try {
            File createFile = createFile(fileName);
            createBlobWithContent(key, store, createFile);
            blobStoreAPIWordCountTopology.buildAndLaunchWordCountTopology(strArr);
            for (int i = 0; i < 10; i++) {
                updateBlobWithContent(key, store, updateFile(createFile));
                Utils.sleep(5000L);
            }
        } catch (KeyAlreadyExistsException e) {
            LOG.info("Key already exists {}", e);
        } catch (AuthorizationException | KeyNotFoundException | IOException e2) {
            throw new RuntimeException((Throwable) e2);
        }
    }

    static /* synthetic */ String access$000() {
        return getRandomSentence();
    }
}
