Massive Data Processing/Design and Implementation of Big Data Applications/Processing Tweet Streams

Source: Wikibooks, open books for an open world.

Revision as of 14:13, 14 February 2013

Geeser - A tool for processing raw tweet streams

Application Description

Definition

The Geeser project aims to provide a toolbox for data analysts to work with massive tweet streams in a distributed and scalable environment. To that end, Geeser is based on Storm, an open-source, fault-tolerant stream processing system. Geeser implements natural language processing activities and simple statistics, such as word counting and trending topics.

In the first stage of the project, which is the one shown here, we propose a distributed way to preprocess tweet streams and extract their main textual features. Beyond that, we show a practical use of the Storm stream processing framework in a challenging context.

Context

Processing tweet streams is a great challenge for data analysts and for Twitter's infrastructure as well. Storm was developed specifically to tackle the problem of processing such streams. In this context, the number of messages varies over time, so the load on the system is not constant.

At this stage each message can be processed individually, which makes the algorithm quite easy to distribute. The challenge here is to minimize bandwidth usage. Another goal is to create a system robust enough to withstand a load that fluctuates over time.

Storm Framework

Storm is a distributed, fault-tolerant real-time computation system. It was originally created at BackType and was open-sourced by Twitter after Twitter acquired the company, as part of a successful effort to distribute its processing workload.

Main Concepts

Storm is built mainly from two types of processing units: spouts and bolts. Spouts generate streams, while bolts consume streams and may produce new streams as output. The model is similar to the traditional producer-consumer approach seen in early network applications, and the simplicity of that approach turns out to be one of Storm's greatest strengths.

{fig 1}

The graph formed by connecting spouts and bolts is called a topology. It describes what the cluster will do. It is a very simple abstraction that lets software engineers work on the modules separately. Communication between the processing units is based on tuples (named lists of values), similar to JSON. Because tuples may arrive out of order during processing, Storm can be seen, in terms of the CAP theorem, as favoring availability and partition tolerance.

Storm allows each spout and bolt to run as multiple parallel processes, which gives data parallelism inside the processing unit. It is a good idea to make tuples as independent as possible in order to use the system's full parallel capacity.
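The ideas above can be illustrated with a small model in plain Python. This is only a sketch and does not use Storm: the names word_spout, upper_bolt and run_topology are hypothetical, and the round-robin distribution is an assumption standing in for whatever scheduling a real cluster applies. It shows a spout feeding a bolt that runs as several parallel tasks, which works because each tuple is independent of the others.

```python
from collections import defaultdict
from itertools import cycle


def word_spout(words):
    """Spout: the source of the stream; yields one single-field tuple per word."""
    for word in words:
        yield (word,)


def upper_bolt(tup):
    """Bolt: consumes one tuple and returns the list of tuples it emits."""
    (word,) = tup
    return [(word.upper(),)]


def run_topology(spout, bolt, parallelism):
    """Hand each spout tuple to one of `parallelism` bolt tasks, round-robin.

    Tuples are independent, so any task can process any tuple.
    Returns a dict mapping task index to the tuples that task emitted.
    """
    emitted = defaultdict(list)
    tasks = cycle(range(parallelism))
    for tup in spout:
        emitted[next(tasks)].extend(bolt(tup))
    return dict(emitted)


result = run_topology(
    word_spout(["storm", "bolt", "spout", "tuple"]), upper_bolt, parallelism=2
)
print(result)
```

If the bolt needed to see tuples in a fixed order, this kind of free distribution across tasks would no longer be safe, which is why independent tuples are recommended.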

Components

Storm requires three main software components to work: Nimbus, ZooKeeper, and the workers. Nimbus is the component responsible for deploying code to the worker nodes and assigning work to them. Apache ZooKeeper is a coordination service: Nimbus and the worker nodes keep the cluster state in it and coordinate through it. The load on ZooKeeper is quite low, since it is used for coordination only and tuples do not pass through it. If fault tolerance is a requirement, the number of ZooKeeper processes should be increased; for most cases, a single running instance is enough. For details on how to install these requirements, see the Install section.

{fig 2}

The system bootstrap works as follows. When the code is submitted to Nimbus, Nimbus records the task assignments in ZooKeeper, where the worker nodes pick them up. The binary code is then delivered to each assigned worker node. Once the worker nodes are ready, the spouts start emitting tuples, which travel directly between the worker processes to the bolts. This continues until the topology is explicitly killed.

Development tests

Storm has a local mode for testing software on a single machine. This can be done either by setting up a single worker node or through the code submission methods. Unit testing is also quite easy, since the usual Java tools can be used for it.

Multilanguage

It is easy to implement bolts and spouts in languages other than Java. The simplest way to do that is to extend the ShellBolt class (explained in the API section).

Storm's API


Code Submission methods

Apache Maven

Maven is a software project management tool built around the Project Object Model (POM), which is written in XML. It helps to build a jar package that is easy to deliver to Nimbus. The project is compiled and packaged by running Maven from the command line.

Leiningen

An alternative to Maven is Leiningen, which automates projects in a similar way but is designed to work with Clojure. With it, topologies can be written in Clojure and packaged for deployment from the command line.

Major drawbacks

Network

Every big data system has to worry about two major bottlenecks: network and processing. Storm deals well with processing bottlenecks. Data parallelism and the coordination done through ZooKeeper give a good load distribution. However, the system does not optimize network usage. Tuples may have different sizes, and the links between nodes may have different latencies and bandwidths.

I believe that there is room for optimizing the network bottleneck by dynamically adjusting the communication based on link latency and network saturation. For instance, if the communication between two bolts is very intensive at a given moment, perhaps it is best that both run on the same worker machine.

Dependency

The system depends on a number of software packages that may lose compatibility with each other over long-term support. This may be critical to the Storm project. Some big data systems are more stable because they depend on less third-party software.

Examples of Storm Usage

This section covers simple uses of Storm that help to understand, in a didactic way, how Storm works. The examples are the ones found in the storm-starter project.

Exclamation Topology

The exclamation topology is a very simple topology with a single objective: adding exclamation marks to the end of random words. In this example there are two instances of the same class, ExclamationBolt. The tuple in this case is just a simple string. One interesting fact about this example is that the order of the tuples does not matter, so the topology can be parallelized freely.

{fig 3}

In this case, there are 10 processes for the spout, 3 for Exclamation Bolt 1 and 2 for Exclamation Bolt 2. The run is quite fast.
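The data flow of this topology can be sketched in plain Python, without Storm. The sketch assumes that each bolt appends three exclamation marks, as I recall the storm-starter ExclamationBolt doing, and uses a hypothetical WORDS list for the random words; the function names are illustrative only.

```python
import random

# Hypothetical sample vocabulary for the spout (an assumption for this sketch).
WORDS = ["nathan", "mike", "jackson", "golda", "bertels"]


def word_spout(count, rng):
    """Spout: emit `count` words chosen at random from WORDS."""
    for _ in range(count):
        yield rng.choice(WORDS)


def exclamation_bolt(word):
    """Bolt: append exclamation marks to the incoming word."""
    return word + "!!!"


def run(count, seed=0):
    """Chain spout -> bolt 1 -> bolt 2, as in the two-bolt topology."""
    rng = random.Random(seed)
    return [exclamation_bolt(exclamation_bolt(w)) for w in word_spout(count, rng)]


for line in run(3):
    print(line)
```

Since each word is handled on its own, the order in which words pass through the two bolts does not change any individual result.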

Word Count Topology

The Word Count topology is another simple topology, used to count the words in sentences. A spout emits sentences chosen at random from a set of five. Then a bolt implemented in Python splits the sentences into words. Finally, another bolt counts the word frequencies:

{fig 4}

A good improvement to this topology might be adding some persistence to the word count structure. That way another bolt could look up the frequency of a given word. This is shown in the following sections. For now, the focus is on the syntax, the results and the implementation.
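The split and count steps can be modelled in a few lines of plain Python. This is a sketch under assumptions, not the storm-starter code: split_bolt, CountBolt and word_count are hypothetical names, and the in-memory Counter only hints at the persistence idea by letting other code query a word's frequency.

```python
from collections import Counter


def split_bolt(sentence):
    """First bolt: split a sentence into words on whitespace."""
    return sentence.split()


class CountBolt:
    """Second bolt: keep a running count per word."""

    def __init__(self):
        self.counts = Counter()

    def execute(self, word):
        """Update the count and emit a (word, running_count) tuple."""
        self.counts[word] += 1
        return (word, self.counts[word])

    def frequency(self, word):
        """Let another component look up the current count of a word."""
        return self.counts[word]


def word_count(sentences):
    """Run every sentence through split_bolt and then CountBolt."""
    counter = CountBolt()
    emitted = [counter.execute(w) for s in sentences for w in split_bolt(s)]
    return emitted, counter


emitted, counter = word_count(["the cow jumped", "the man"])
print(emitted)
print(counter.frequency("the"))
```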

Single Join Topology

This is the most complex example in this section. Here it is necessary to implement methods that perform the join while taking into account that the tuples arrive unordered. For that, a communication buffer holds each tuple until its matching tuple arrives. Timeouts are also used to solve starvation problems in this approach. Because of that, joins in Storm topologies may introduce bottlenecks, which should be avoided at all costs.

{fig 5}
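The buffering and timeout logic of such a join can be sketched as follows. This is a simplified model in plain Python, not the storm-starter implementation: the SingleJoin class, its method names and the explicit `now` argument (used instead of a real clock so the behavior is reproducible) are all assumptions made for illustration.

```python
class SingleJoin:
    """Join tuples from several streams on a shared key, tolerating any order."""

    def __init__(self, sides, timeout):
        self.sides = tuple(sides)   # names of the streams to join
        self.timeout = timeout      # how long a partial match may wait
        self.pending = {}           # key -> (first_arrival_time, {side: value})

    def expire(self, now):
        """Drop partial matches that have waited `timeout` or longer."""
        stale = [k for k, (t, _) in self.pending.items() if now - t >= self.timeout]
        for key in stale:
            del self.pending[key]
        return stale

    def receive(self, side, key, value, now):
        """Buffer one tuple; return the joined tuple once every side has arrived."""
        self.expire(now)
        first_seen, fields = self.pending.pop(key, (now, {}))
        fields[side] = value
        if all(s in fields for s in self.sides):
            return (key,) + tuple(fields[s] for s in self.sides)
        self.pending[key] = (first_seen, fields)
        return None


join = SingleJoin(("gender", "age"), timeout=10)
print(join.receive("age", 7, 30, now=0))          # still waiting for "gender"
print(join.receive("gender", 7, "female", now=3))  # both sides present
```

The pending dictionary is exactly the kind of state that grows when one stream lags behind, which is why the timeout is needed and why joins can become a bottleneck.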

Requirements

As stated previously, the main requirement of our system is scalability. It must be able to process a high volume of tweets at the same time. To make matters worse, it has to deal with a highly dynamic load.

Another important feature of this system is that it needs to be fault tolerant. We cannot afford to lose a stream because of a machine crash or a network failure.

Opportunities

The main parallelism opportunity in this project is the possibility of running other activities while the preprocessing runs. This is quite easy to do with the Storm framework. In here, I am showing a simple application that

Project

Stages

The Geeser project proposes several stages. In each stage, a different activity is added to work on the stream in parallel. For instance, word count and trending topics are added in the second stage and entity disambiguation in the third.

For each stage, I propose to write the set of spouts and bolts needed to reach the corresponding objective. That way, developers can assemble a topology according to their needs.

Communication patterns

The communication will be fully detailed in the project's final API. It is based on JSON, a schema-less data format. This fits Storm's standard communication, which is based only on tuples.

This is a simple protocol, well suited to systems that tend to have several independent processing nodes.

Proposed topologies on Geeser's first stage

The main topology proposed for the first stage consists of a single spout that reads the tweet stream from a file, simulating a connection to Twitter's API. This was the only option, since the virtual machines had only 10 GB of space.

The next bolt projects the full JSON tweet so that only the text of the message is passed on to the following bolt. That following bolt then processes those texts.
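The projection step can be sketched in plain Python as below. It assumes one JSON tweet per line of the input file and that the message body is stored under the "text" key; project_text and text_stream are hypothetical names, and the sample tweet is made up for the example.

```python
import json


def project_text(raw_tweet):
    """Return only the text of a JSON tweet, or None if it is missing or malformed."""
    try:
        tweet = json.loads(raw_tweet)
    except ValueError:
        return None
    if not isinstance(tweet, dict):
        return None
    text = tweet.get("text")
    return text if isinstance(text, str) else None


def text_stream(lines):
    """File-backed spout plus projection bolt: yield the text of each valid tweet."""
    for line in lines:
        text = project_text(line.strip())
        if text is not None:
            yield text


# Made-up sample tweet, only to compare the sizes before and after projection.
sample = json.dumps(
    {"id": 1, "text": "hello #storm", "user": {"screen_name": "someone"}}
)
print(len(sample), len(project_text(sample)))
```

Sending only the projected text forward means every later bolt receives a much smaller tuple, which is in line with the goal of minimizing bandwidth usage.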

Implementation

The Storm components (spouts and bolts) need to implement a few functions to work, as described below.

Code Examples

Each spout and bolt is a class. Multilang bolts may be implemented with classes that inherit from ShellBolt, which runs the bolt as a separate process.

The communication is implemented by the base class. Usually, BaseBasicBolt and BaseRichSpout are used for implementing bolts and spouts. In those cases, we have to implement a few functions that define the activity performed by the bolt or spout.

Bolt Interface

prepare() This function is called once, before any tuple is delivered to the bolt.

execute(Tuple tuple, BasicOutputCollector collector) This function is called whenever a new tuple reaches the bolt. It should call the other functions needed for processing, and it calls collector.emit(Values) to send a new tuple to the next bolt.

ack() Bolts may also acknowledge a tuple to signal that it has been fully processed. With BaseBasicBolt the acknowledgement is sent automatically when execute returns.

declareOutputFields(OutputFieldsDeclarer declarer) Declares the output fields of the outgoing tuple.

getComponentConfiguration() Returns the configuration that is specific to this component.
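The order in which these functions are called can be illustrated with a plain-Python model. This is not Storm's API: Collector, HashtagBolt and drive are hypothetical names, the snake_case methods only mirror the Java ones, and hashtag extraction is just an assumed example of a textual feature.

```python
import re


class Collector:
    """Stand-in for the output collector: records every emitted tuple."""

    def __init__(self):
        self.emitted = []

    def emit(self, values):
        self.emitted.append(list(values))


class HashtagBolt:
    """Example bolt: emit one tuple per hashtag found in the incoming text."""

    def prepare(self, conf):
        # Called once, before any tuple arrives.
        self.lowercase = conf.get("lowercase", False)

    def execute(self, tup, collector):
        # Called once per incoming tuple.
        for tag in re.findall(r"#\w+", tup[0]):
            collector.emit([tag.lower() if self.lowercase else tag])

    def declare_output_fields(self):
        # Names of the fields in each outgoing tuple.
        return ["hashtag"]


def drive(bolt, conf, tuples):
    """Call prepare once, then execute per tuple, and name the emitted fields."""
    collector = Collector()
    bolt.prepare(conf)
    for tup in tuples:
        bolt.execute(tup, collector)
    fields = bolt.declare_output_fields()
    return [dict(zip(fields, values)) for values in collector.emitted]


print(drive(HashtagBolt(), {"lowercase": True}, [("Learning #Storm and #BigData",)]))
```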

Communication

The communication process is completely abstracted from the developer. We only need to minimize the consumption of this resource since, as stated previously, Storm does not optimize it.


Python Bolts
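A bolt written in Python talks to its parent ShellBolt through standard input and output. The sketch below shows the message framing as I understand it from Storm's multilang protocol: each message is a JSON document followed by a line containing only "end". It is a simplified assumption-laden model that skips the initial handshake and any task-id replies, and the function names are hypothetical. In-memory streams replace stdin and stdout so that it can be run on its own.

```python
import io
import json


def read_message(stream):
    """Read lines up to the 'end' marker and decode them as one JSON message."""
    lines = []
    for line in stream:
        line = line.rstrip("\n")
        if line == "end":
            return json.loads("\n".join(lines))
        lines.append(line)
    return None  # the stream closed before a complete message arrived


def write_message(stream, message):
    """Encode one message as JSON followed by the 'end' marker line."""
    stream.write(json.dumps(message) + "\nend\n")


def split_sentence(tuple_message):
    """Build the replies for one tuple: an emit per word, then an ack."""
    tuple_id = tuple_message["id"]
    sentence = tuple_message["tuple"][0]
    replies = [
        {"command": "emit", "anchors": [tuple_id], "tuple": [word]}
        for word in sentence.split()
    ]
    replies.append({"command": "ack", "id": tuple_id})
    return replies


def run(stdin, stdout):
    """Main loop of the bolt: read tuples until the input stream ends."""
    while True:
        message = read_message(stdin)
        if message is None:
            break
        for reply in split_sentence(message):
            write_message(stdout, reply)


# Demonstration with in-memory streams instead of the real stdin and stdout.
incoming = io.StringIO(
    '{"id": "42", "comp": "1", "stream": "default", "task": 3, "tuple": ["the cow"]}\nend\n'
)
outgoing = io.StringIO()
run(incoming, outgoing)
print(outgoing.getvalue())
```

In a real deployment the helper module shipped with Storm for Python bolts would normally handle this framing, so a bolt author only writes the equivalent of split_sentence.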

Evaluation

Load on Bolts

Nimbus Logs