Processamento de Dados Massivos/Projeto e implementação de aplicações Big Data/Processamento de streams de tweets

Origem: Wikilivros, livros abertos por um mundo aberto.

Geeser - A tool for processing raw tweet streams[editar | editar código-fonte]

Application Description[editar | editar código-fonte]

Definition[editar | editar código-fonte]

Geeser Project intends to give a toolbox for data analyists to work with massive tweet streams in a distributed and scalable environment. For that, Geeser is based on Storm, an open-source fault-tolerant stream processing system. Geeser implements natural language processing activities and simple statistics, such as word counting and trending topics.

In this first stage of the project (that is shown here), we are proposing a distributed way to preprocess tweet streams to extract main textual features. More than that, we are showing practical use of Storm stream processing framework in a challenging context. It is import to stress that this project do not aim do optimize or speedup any specific algorithm. My only concern is to make it feasible to run some algorithms in massive streams.

Context[editar | editar código-fonte]

Processing tweet streams is a great challenge for data analysts and for Twitter's infrastructure as well. Storm was developed specially to tackle the problem of processing such streams. In this context, the amount of messages may vary through time. Consequently, the load of the system is not constant.

At this stage it is possible to process each message individually. Then it is quite easy to distribute this algorithm. The challenge here is to minimize bandwidth usage. Another goal is to create a system that is robust enough to hold the unbalanced load through time.

Storm Framework[editar | editar código-fonte]

Storm is a distributed, fault-tolerant realtime computation system. It was originaly cocieved by Twitter on a successfull attempt to distribute it's processing workload.

Main Concepts[editar | editar código-fonte]

Storm is mainly composed by 2 types of processing units: Bolts and Spouts. Spouts generate streams, while Bolts consume and produce new streams in it's output. It's model is similar to the traditional producer-consumer approach seen on early networks applications. The simplicity of producer-consumer aproach works out to be one storm greatest strenghts.

Example of storm topology
Example of storm topology


The graph generated by the conection of bolts and spouts is called topology. It maps what the cluster will do. It is a very simple abstraction that helps software engeneers to work the modules spearatelly. The comunication protocol between the processing units is based on tuples, similarlly to JSON. Therefore, in CAP Theorem, Storm attends to the Avalability and Partition Tolerance requirement. Because tuples might get out of order in the the process.

Storm permits multiple processes in each Spout and Bolt allowing data paralelism inside the processing unit. It is a good idea to make tuples as indepent as possible in order to use the system full parallelism capacity.

Components[editar | editar código-fonte]

Storm's requires 3 main software components to work: Nimbus, Zookeeper, and Workers. Nimbus is the component responsible for code deployment on the worker nodes. Tha Apache Zookeeper is a software for load control on the nodes. Zookeeper load is quite low since its only function is to choose which node will process the next tuple. If fault tolerance is a requirement, the number of Zookeerper processes should be increased, for most cases, only one running is enough. For details on how to install such requirements, check on the Install section.

Storm Components
Storm Components

The system bootstrap works as follows. All worker nodes report to the Zookeeper as soon as the code is submited to Nimbus. Then the binary code is submited to each worker node. When the worker nodes are ready to take a job, Zookeeper sends each node a tuple to be processed. And this is done until a spout sends a terminate signal.

Development tests[editar | editar código-fonte]

Storm has a mode for software testing in a single machine. This can be achieved by setting a single working node or with the code submissions method. Unit testing is also quite easy since we can use Java for that.

Multilanguage[editar | editar código-fonte]

Is easy to implement Bolts and Spouts in other languages than Java. The simplest way to do that is implementing ShellBolt class (I am explaning that on the API section).

Storm's API[editar | editar código-fonte]

Storm has a mode for software testing in a single machine. This can be achieved by setting a single working node or with the code submissions method

Code Submission methods[editar | editar código-fonte]

Apache Maven

Maven is a software project management tool that uses a project oriented model (POM), which is written in XML. It helps to build a jar package that is easy deliverable to nimbus. To use it we use the following command to compile it:

Leiningen

A alternative to maven is the Leiningen, which automates project similarly to Maven but it is written to work with Clojure methods. That way it is possible to write topologies em Clojure and deploy them using:

Major drawbacks[editar | editar código-fonte]

Network Every big data system has to worry with two major bottle necks: Network and processing. Storms deal fine with processing bottlenecks. The data parallelism and Zookeeper garantees a good load ddistribution. However, the system doesn't optimize the network usage. The tuples may have different sizes as the comunication between nodes may have different latencies and bandwidth.

I believe that there is room for optimizing the network bottleneck by dynamically adjusting the communication based on the link latency and on the network saturation. For instance, if the comunication between two bolts is very intensive in a moment perhabs it is best that both run on the same worker machine.

Dependency The system depends on a bunch of software that may loose compatibility in a long range support. This may be critical to storm project. There are some big data systems that are more stable because they depend on fewer third party software.

Examples of Storm Usage[editar | editar código-fonte]

In this section I am covering simple usages of storm that help to didacally understand how storm work. In here we are working with examples ssen in the storm-starter.

Exclamation Topology[editar | editar código-fonte]

The exclamation topology is a very simple topology that has only one objective: Add exclamation marks at the end of random words. In this example, we have two instances of the same object ExclamationBolt. The tuple in this case is just a simple string. One interesting fact in this example is that the order is not important so we can create a superscalar topology.

Exclamation Topology
Exclamation Topology

In this case, we have 10 processes for the spout, 3 for Exclamation Bolt 1 and 2 for exclamation Bolt 2. The run is quite fast and the overhead is only done. We can see a sample of the output in the figure below

Output from the Explamation Topology run

Word Count Topology[editar | editar código-fonte]

The Word Count topology is another simple topology that is used to count words in sentences. For that, a spout randomly emits a set of 5 different sentences. Then, there is a bolt implemented in Python to split the sentences. Finally, a bolt to count word frequencies:

WordCountTopology
WordCountTopology

Maybe a good improvement in this Topology would be adding some persistency in the word count structure. That way it would be possible for another bolt to consult a certain word frequency. I am showing this on the following sections. For now, I am focusing on the sytax and the results and the implementation of this. Bellow, an example of the output:

Output from the Word Count Topology run

Single Join Topology[editar | editar código-fonte]

This is the most complex example in this section. In here it is necessary to implement methods that do the join considering that the tuples came unordered. For that, the communication buffer is used to wait until the correct example. Also, timeouts are used to solve starvation problems in this approach. Because of that Joins in Storm's topologies might introduce bottle necks that should be avoided at all costs

SingleJoinTopology
SingleJoinTopology

Output from the Single Join Topology

Requirements[editar | editar código-fonte]

As it was said in previously, the main requirement in our system is scalability. It is necessary that we can process a high volume of tweets in the same time. To make matters even worse, we have to deal with a highly dinamic load.

Other important feature of this system is that it need to be fault tolerant. We can't allow to loose a stream due to a machine crash or a network failure.

Opportunities[editar | editar código-fonte]

The main parallelism opportunities in this project is the possibility to include other activities running while the preprocessing run. And this is quite easy to be done using Storm framework. In here, I am showing a simple application that

Project[editar | editar código-fonte]

Stages[editar | editar código-fonte]

The Geeser Project propose several stages. In each stage, a different activity is added to work on the stream parallely. For instance, word count and trending topics on second stage and entity disambiguation on the third stage.

For each stage, I propose to write a set of spouts and bolts that are necessary to reach the corresponding objective. That way, developers can mount a topology according their necessity:

  1. Stage 1: Basic Spouts and Raw Tweet textual processing
  2. Stage 2: Word Counting and Trending Topics Bolts
  3. Stage 3: Entity Disambiguation Bolt

Communication patterns[editar | editar código-fonte]

The communication will be fully detailed in the project final API. It is based on the non-structured database JSON. This is a standard communication on Storm's protocol, which is based only on tuples.

This is a simple protocol and ideal to work on databases which tends to have several processing nodes that are independent.

Proposed topologies on Geeser's first stage[editar | editar código-fonte]

Basically, the main topology proposed on the first stage is composed of a single spout which reads the tweet stream from a file to simulate a connection to Twitter's API. This was the only option since the virtual machines had only 10 GB of space.

The following bolts, projects the full JSON tweet to comunicate only the text of the message to the next bolt. This following bolt will process those texts.

Proposed Tweet Processing Topology
Proposed Tweet Processing Topology

Implementation[editar | editar código-fonte]

The Storm nodes need to implement some functions to work:

Code Examples[editar | editar código-fonte]

Each spout and bolt is a class. Multilang bolts may be implemented using classes that inherits ShelBolt, which runs the bolt in a virtual shell.

The comunication is implemented by the mother class. Usually, we use BaseBasicBolt and BaseRichSpout for implementing bolts and spouts. In those cases, we have to implement some functions that concerns the activity done by the bolt or spout.

Bolt Interface[editar | editar código-fonte]

void prepare()

This funcion is called before any tuple is transmited to a certain bolt.

int execute(Tuple tuple, BasicOutputCollector collector)

This funcion is called when a new tuple gets to the bolt. It should call other funcions important to processing. And it is done call collector.emit(Values) to send another tuple to the next bolt

void fail(Object id);
void ack(Object id);

Bolts may also add ack to be sure that a tuple was delivered to the next bolt.

void declareOutputFields(OutputFieldsDeclarer declarer)

Declare the Output Filed on the outgoing tuple.

Map<String, Object> getComponentConfiguration()

Get the configuration of the bolt set by the conf class.

Spout Interface[editar | editar código-fonte]

void open()

Execute before the first tuple is sent. It can be used, for instance, to establish connections to Twitter's sample API

void nextTuple()

Iterates over each tuple generated. Emiting it using emit function.

void close()

It is called when nextTuple() returns. Used for finishing connections

Map<String, Object> getComponentConfiguration()

Get the configuration of the spout set by the conf class.

void fail(Object id) 
void ack(Object id)

Dealing with communication fails.

void declareOutputFields(OutputFieldsDeclarer declarer)

Declare the Output Filed on the outgoing tuple.

Topology Builder[editar | editar código-fonte]

Here is the example of the topology builder:

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("spout", new TwitterFileSpout(), 1);
        builder.setBolt("project", new FilterTweet(), 5)
                 .shuffleGrouping("spout");
        builder.setBolt("filter", new FilterTweet(), 5)
                 .shuffleGrouping("project");

        builder.setBolt("print", new PrinterBolt(), 1)
                 .shuffleGrouping("filter");

        Config conf = new Config();
        conf.setDebug(false);

        conf.setNumWorkers(3);
        if(args!=null && args.length > 0) {
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());
            cluster.shutdown();
        }
    }

Communication[editar | editar código-fonte]

Communication process is completely abstracted from the developer. We only need to minimize the consuption of network bandwith since Storm does not manage it well.

Python Bolts[editar | editar código-fonte]

We have python libraries that implements Storm's protocol. It is quite easy to use such libraries. I am showing the code used of the language cleaner bolt. In that code, tup is the incoming tuple. and emit sends the outgoing tuple.

import storm
class StreamFilterBolt(storm.BasicBolt):
    def process(self, tup):
        try:
                text = cleanText(tup.values[0])
        except:
                text = ''
        storm.emit([text])

StreamFilterBolt().run()

Avaliation[editar | editar código-fonte]

Considering the requirements of this project which focus scalability and fault tolerance over latency. I evaluated the Storm capabilities of distributing process and making a massive stream scalable to be processed. The main results of this project is to build the foundation to many bolts and spouts designed specially to the Web Observatory project. It also helps the software development processes because each bolt is a black box that may be implemented by many different people in different languages.

Load on Bolts[editar | editar código-fonte]

For simple Twitter jobs, Storm managed to distribute jobs in a fair way. None worker node recived more jobs than another. This is a good result that shows that our system scales well enough for a huge twitter load.

Load on Bolts
Load on Bolts

Nimbus Logs[editar | editar código-fonte]

Now I am showing the final output of the proposed topology. It process raw tweet to return trigrams without hashtags, char repetition, links, trigrams generations and other details.

Tweet Processing Topology log
Tweet Processing Topology log

Conclusion[editar | editar código-fonte]

In this project, it is shown that it is possible to create a complex distributed system for processing massive tweet streams. This system is scalable and very flexible. Topologies can be modified for several different purposes making it ideal to the Web Observatory and Data Science research projects. Even though, speedup gains may be not aplicable (since we should be able to serially process big data in a single computer), there is an enourmous gain in scalability.

The main contribution here is to check the viability and add knowledge to Web Observatory of Storm framework. This framework helps not only software development but make it possible to deal with massive streams. The gain for this project is imensurable in many terms.