Saltar para o conteúdo

Processamento de Dados Massivos/Projeto e implementação de aplicações Big Data/Avaliação do algoritmo PageRank

Origem: Wikilivros, livros abertos por um mundo aberto.

Descrição da Aplicação

[editar | editar código-fonte]

Denominação

[editar | editar código-fonte]

Algoritmo PageRank.

A busca de informação não é uma tarefa trivial. A área da recuperação de informação vem lidando com esse problema há décadas, antes mesmo da criação da Web, desenvolvendo sistemas para busca de jornais, artigos científicos e patentes. A busca na Web se torna mais difícil ainda, devido a sua natureza evolutiva de constante crescimento e modificação.

Uma solução proposta para o problema de busca de páginas na Web que é utilizado pelo Google é o algoritmo PageRank. Ele tenta medir a importância de cada página levando em consideração a topologia da rede formada pelas ligações entre as páginas através dos hyperlinks. Mesmo sendo proposto inicialmente para páginas da Web, ele pode ser aplicado em qualquer rede, onde o resultado final do algoritmo é um valor numérico atribuído a cada nodo, medindo sua respectiva influência naquela rede.

Numa rede com n nodos, atribui-se inicialmente o valor de 1/n para cada um. Depois disso, realizamos uma regra de atualização um determinado número k de vezes. A regra de atualização é a seguinte: cada nodo compartilha seu valor atual de PageRank igualmente com todos os nodos que segue, e cada nodo atualiza seu valor para ser a soma das partes que recebeu. A teoria do PageRank diz que até mesmo uma pessoa que clica em links aleatoriamente vai, eventualmente, parar de clicar. A probabilidade, a qualquer passo, que uma pessoa continue clicando é dado por um fator de amortecimento (damping factor) d, que geralmente é configurado para ter o valor 0,85. O fator de amortecimento é subtraído de 1 e o resultado é adicionado ao produto do fator de amortecimento pela soma das partes que o nodo recebeu.

1  para cada vertice :
2      
3      
4  para cada iteracao , enquanto :
5      para cada vertice :
6          
7      

Exemplo de Funcionamento

[editar | editar código-fonte]

Na figura abaixo é possível visualizar os nodos de um grafo com seus respectivos valores de PageRank. Podemos observar que o vértice B é o que tem o maior valor, e pode ser considerado o mais relevante, de acordo com o conceito do algoritmo. Observe também que o vértice E, mesmo tendo muitas arestas de entrada, não tem um PageRank alto, o que indica que os nodos que estão ligados a ele não tem tanta relevância. Da mesma forma, o vértice C tem um PageRank alto não por ter muitas arestas, mas justamente por ser referenciado pelo vértice B.

Fase "Apply" do GraphLab
  • Escalabilidade: A escalabilidade é necessária devido ao fato da rede ser dinâmica, ou seja, novos vértices e ligações podem ser criados ou removidos a qualquer instante, e do grande volume de dados que chega na faixa dos milhões de vértices e bilhões de arestas.
  • Armazenamento: A aplicação não demanda muito armazenamento em memória principal devido para o cálculo em si, devido às propriedades comutativa e associativa do cálculo do valor PageRank, não sendo necessário armazenar num mesmo instante todos os valores utilizados. Entretanto, devido a magnitude dos grafos trabalhados na aplicação, é necessário uma quantidade significativa de espaço em disco para armazenar o grafo. Já o resultado (saída) do algoritmo demanda pouco espaço, pois é necessário apenas os IDs dos vértices e seus respectivos valores de PageRank finais.
  • Latência: O dinamismos da rede torna o cálculo do PageRank necessário constantemente. A taxa de atualização depende do cenário de aplicação, podendo ser semanal, diário ou até mesmo horário. Nesse caso, o cálculo do valor de PageRank de todos os vértices deve ser feito em tempo hábil.
  • Tolerância a falhas: O algoritmo necessita que os valores de PageRank dos vértices estejam sincronizados em relação às iterações, ou seja, somente valores de uma mesma iteração devem ser utilizados para atualização. Como para o cálculo de um vértice é necessário os valores de todos os vizinhos, é necessário que haja uma estabilidade dos nodos de computação.

Paralelizações existentes

[editar | editar código-fonte]

As primeiras propostas de paralelização do algoritmo PageRank foram desenvolvidas implementando a versão matricial do algoritmo em resolvedores de sistema linear paralelos[1].

Outra estratégia utilizada é o esquema "URL Host", que aproveita a estrutura hierárquica da WEB para fazer a parelização. Basicamente o que ele faz é agrupar os vértices de página de um mesmo domínio (host) e fazer a computação distribuída. Apesar de eficiente, essa estratégia fica limitada a grafos que representem páginas de internet, sendo inadequada para uma abordagem mais genérica[2].

Existem algumas versões de PageRank que fornecem valores aproximados, que na maioria das vezes é justificado pela eficiência no tempo de execução, e uma vertente desses algoritmos faz uma abordagem paralela. Uma dessas abordagens utiliza um esquema MapReduce que utiliza um modelo probabilístico e utiliza o método de Monte Carlo para calcular os valores. [3]

Há ainda tentativas de implementação do algoritmo em placas gráficas (GPUs), que são arquiteturas fortemente paralelas e com enorme poder de processamento[4]. Apesar de eficientes, a maioria dessas abordagens fica limitada pela memória local limitada, então para aplicações práticas de alto porte, é necessária uma estratégia auxiliar.

Oportunidades de paralelização

[editar | editar código-fonte]

Como o cálculo do rank de um nodo em uma iteração depende do rank na iteração anterior dos nodos que apontam para este nodo, este passo de calcular o novo rank pode ser executado paralelamente para cada nodo.

O valor do rank anterior não pode ser mudado enquanto esse passo é executado e isso pode ser evitado. Ao terminar esse passo, é necessário sincronizar as linhas de execução para que a próxima iteração leia valores de ranks atualizados.

A paralelização pode ser ampliada se os somatórios de ranks forem divididos. A lista de nodos apontadores seria dividida e o rank parcial seria calculado separadamente, já que soma em ordem diferente tem o mesmo resultado. Entretanto isso cria uma etapa adicional para combinar os resultados parciais.

Entretanto essa oportunidade paralelização não garante balanceamento de carga entre cada thread, porque enquanto um nodo pode ser apontado por apenas um nodo, outro pode ser apontado por inúmeros. O balanceamento pode ser melhorado fazendo as threads somarem números aproximados de nodos; os nodos que tem mais apontadores seriam divididos entre mais threads, enquanto uma thread pode fazer o somatório para mais de um nodo.

Padrões de acesso aos dados

[editar | editar código-fonte]

Cada nodo tem como dados o cojunto de nodos para o qual ele aponta e o valor de page rank atual. É necessário que o nodo em cada iteração veja um grafo invertido, que tenha a lista de nodos que apontam para ele. E ele precisará saber o valor do rank atual desses nodos junto com o número de nodos para os quais eles apontam ou apenas a divisão entre esses dois valores.

Quando um algoritmo paralelo executa no mesmo computador, as threads podem ter memória compartilhada e isso permite que cada thread leia todas as informações que precisar sobre o grafo e o custo disso é relativamente baixo.

Quando o volume de dados é muito grande, o processamento teria tempo de execução longo e os dados podem não caber em um só disco rígido. A solução seria fazer processamento é distribuído. Isso requer que cada computador do cluster receba os dados necessários dos nodos que ele precisará e ao final de cada iteração o nodo precisará saber o novo valor de rank de cada nodo que ele precisa e isso gera muitas mensagens de comunicação na rede.

Padrões de comunicação

[editar | editar código-fonte]

É necessário fazer comunicação para sincronizar as threads no final de cada iteração para que o novo valor de rank seja atualizado no momento certo para não prejudicar as outras threads que ainda estejam executando e também para começar a próxima iteração lendo valores de rank atualizados. Essa sincronização pode ser vista como duas barreiras, em que a primeira espera todos os somatórios terminar e a segunda espera todos os ranks serem atualizados.

Em processamento distribuído, o novo valor de rank deve ser enviado para todas as threads que usarão esse valor na próxima iteração. Assim, o número de mensagens trocadas na rede em cada iteração seria igual ao número de arestas do grafo, e esse grande número de mensagens pode sobrecarregar a rede.

Linha do tempo integrada

[editar | editar código-fonte]

Quando há uma thread por nodo, cada thread processa de maneira independente sua lista de apontadores e no final é necessário esperar todas as outras threads terminar para poder atualizar o rank do nodo e receber novos valores de rank. Além da comunicação para sincronização, em implementação distribuída, é necessário comunicação para enviar e receber valores do rank calculados durante a iteração.

Desenvolvimento

[editar | editar código-fonte]

A implementação do PageRank usando Hadoop executa um número de iterações definido como argumento do programa. Cada iteração tem uma fase de "map" e "reduce", além das fases de leitura de dados da iteração anterior e escrita para o próxima iteração. O programa começa lendo um arquivo de entrada contendo o grafo e carrega os dados do grafo no Hadoop.

O "damping factor" é um argumento opcional do programa. O valor padrão é 0.85 e para desativar basta definir o valor como 1.0.

Estratégias de paralelização

[editar | editar código-fonte]

A etapa "map" é executada paralelamente para cada dado. Ela tem como entrada a chave sendo o identificador do nodo e como valor o rank atual do nó. Cada nodo passa no "map" uma vez e tem como saída cada nodo apontado por ele e o valor de rank dividido pelo número de nodos apontados.

    public void map(
        Text uid,
        DoubleWritable data,
        OutputCollector<Text, DoubleWritable> output,
        Reporter reporter
    ) throws IOException {
        List<Text> followees = followees_list.get(uid);
        DoubleWritable dw = new DoubleWritable(data.get() / followees.size() );
        Text followee = new Text();
        for (Text f : followees  ){
            followee.set( f );
            output.collect(followee, dw);
        }
        output.collect(uid, new DoubleWritable(0)); //Para manter no grafo
    }

A etapa "reduce" tem como entrada a chave sendo o identificador do nodo e uma lista de valores de rank/número de nodos apontados já calculados na etapa anterior. Essa etapa vai somar os valores dessa lista e retornar o novo valor de rank para o nodo identificado pela chave.

    public void reduce(
        Text key, 
        Iterator<DoubleWritable> values, 
        OutputCollector<Text, DoubleWritable> output, 
        Reporter reporter
    ) throws IOException {
        double pagerank = 0;
        while (values.hasNext()){
            DoubleWritable dw = values.next();
            pagerank += dw.get();
        }
        pagerank *= damping_facotr;
        pagerank += first_term; // first = (1-damping_facotr)/number_of_nodes
        output.collect(key, new DoubleWritable(pagerank));
    }

Como foi falado, essa estratégia não garante balanceamento de carga na etapa "reduce", porque o tempo de execução para nodos com mais apontadores seria mais longo por ter mais valores no iterador para somar.

Estratégias de armazenamento

[editar | editar código-fonte]

Ao terminar uma iteração os dados são salvos no sistema de arquivos virtual distribuído do Hadoop (HDFS) para serem lidos no início da próxima iteração. Esse sistema de arquivos é abstraído para o programador.

Estratégias de comunicação

[editar | editar código-fonte]

A gerência da execução é feita pela função principal do programa que inicia cada iteração de map-reduce. Assim, o Hadoop é encarregado de criar as threads e dividir a carga, sincronizar e transportar dados nas etapas map e reduce, e escrever a saída da iteração.

Foi implementada a versão não normalizada do PageRank, ou seja, ao somar os valores de PageRank de todos os vértices o resultado é igual ao número de vértices, e não igual a 1. Observe que caso existam vértices "dangling" (não têm aresta de saída) a soma não sera totalizada, mas a ordenação relativa dos vértices será a mesma.

Estratégias de paralelização

[editar | editar código-fonte]

O GraphLab provê um paradigma de programação ideal para se trabalhar com grafos. O principal elemento da plataforma é o "Vertex Program", que é executado em cada vértice do grafo. O "Vertex Program" tem 3 fases de execução: (1) gather, que é executada nas arestas adjacentes ao vértice e retorna um valor, (2) apply, que agrega os valores retornados pela fase anterior e (3) scatter, que novamente é executado nas arestas adjacentes do vértice.

Utilizando esse pardigma, o algoritmo PageRank será feito por paralelização de dados a nível de vértice. Como para o cálculo do valor de PageRank de um vértice só é necessário valores de vértices adjacentes, temos uma ferramenta que se adequa muito bem para a resolução do problema. Cada vértice armazenará apenas seu respectivo valor de PageRank e a atuzalização será feita através de um "Vertex Program", que também armazena um valor auxíliar de erro.

Na fase "gather" apenas as arestas de entrada são utilizadas, das quais o vértice recebe os valores de PageRank para atualizar seu próprio valor. Cada aresta (u, v) ativa na fase "gather" vai retornar o valor PR(u)/O(u), que corresponde a um valor parcial da soma de atualização.

Fase "Gather" do GraphLab no algoritmo PageRank

A fase "apply" vai receber os valores parciais já somados, então basta atualizar o valor de acordo com o dumping factor. Além disso, ainda na fase "apply", vamos armazenar o valor de error calculando a diferença entre o novo e o antigo valor de PageRank, que será utilizado para decidir quais vértices deverão ser atualizados.

Fase "Apply" do GraphLab no algoritmo PageRank

A fase "scatter" é utilizada para ativar vértices que deverão ter seus valores atualizados na próxima iteração do algoritmo. Portanto, caso o valor de erro do vértice seja menor do que o valor máximo de tolerância, nenhuma aresta será ativa. Caso o valor de erro seja maior, serão ativas as arestas de saída. Cada aresta (u, v) ativa na fase "scatter" vai assinalar o vértice v para execução na próxima iteração. Ou seja, um vértice só será convocado para execução na próxima iteração se pelo menos um de seus vizinhos de entrada ainda não tiver convergido, pois são justamente os vértices utilizados para o cálculo de seu PageRank.

Fase "Scatter" do GraphLab no algoritmo PageRank

Estratégias de armazenamento

[editar | editar código-fonte]

O armazenamento é feito através do protocolo NFS, de forma que o acesso à base de dados seja transparente é igual entre os vértices. A base de dados pode ser dividida em arquivos, sendo uma opção de balanceamento explícito, entretanto como o protoclo já NFS provê o acesso aos vértices, optamos por deixar o balanceamento a carga do framework.

Estratégias de comunicação

[editar | editar código-fonte]

A comunicação será feita sempre entre vértices e de forma transparente, devido a utilização do paradigma de programação do GraphLab. É interessante observar que, como a versão do algoritmo PageRank implementada é a não normalizada, não é necessário o compartilhamento de valores globais entre os vértices, como o número totais de vértices, que seria necessário para a versão normalizada.

Implementação

[editar | editar código-fonte]

Plataformas e ferramentas

[editar | editar código-fonte]

Utilizamos um ambiente virtual com Ubuntu OpenStack com quatro servidores virtuais com Hadoop e GraphLab configurados para executar os programas distribuídos. Cada máquina virtual tem um processador de dois núcleos, 4GB de Ram e 10GB de HD, rodando Ubuntu 12.04.

Integração de plataformas e ferramentas

[editar | editar código-fonte]

Fizemos uma implementação em Apache Hadoop 1.0.4, usando linguagem Java. O Hadoop tem o recurso HDFS (Hadoop Distributes File System) para armazenamento de dados.

A versão do GraphLab utilizada foi a 2.1.4434, e a versão do MPI foi o MPICH2 versão 1.2.1. Além disso foi utilizado o daemon NFS do Linux para compartilhar as bases de dados.


Detalhes de implementação

[editar | editar código-fonte]

Fizemos uma implementação em Apache Hadoop 1.0.4, usando linguagem Java.

O Hadoop tem o recurso HDFS (Hadoop Distributes File System) para armazenamento de dados.

A implementação foi feita em linguagem C++, com o objetivo de ser mais genérica possível, podendo executar em qualquer tipo de grafo.

#include <string>
#include <fstream>

#include <graphlab.hpp>

#define DAMPING_FACTOR 0.85  
#define MAX_ERROR 1e-6

// Estruturas de Dados
typedef double Vertice;
typedef graphlab::empty Aresta;
typedef graphlab::distributed_graph<Vertice, Aresta> Grafo;

// Algoritmo
class PageRank :
    public graphlab::ivertex_program<Grafo, double>,
    public graphlab::IS_POD_TYPE 
{
    private:
        double error; 
    public:
        edge_dir_type gather_edges(icontext_type &context, const vertex_type &vertex) const {
            return graphlab::IN_EDGES;
        }
        gather_type gather(icontext_type &context, const vertex_type &vertex, edge_type &edge) const {
            return edge.source().data() / edge.source().num_out_edges();
        }
        void apply(icontext_type &context, vertex_type &vertex, const gather_type &total) {
            double valor_novo = (1 - DAMPING_FACTOR) + DAMPING_FACTOR*total;
            error = std::fabs(vertex.data() - valor_novo);
            vertex.data() = valor_novo;
        }
        edge_dir_type scatter_edges(icontext_type &context, const vertex_type &vertex) const { 
            if (error > MAX_ERROR) {
                return graphlab::OUT_EDGES;
            }
            else {
                return graphlab::NO_EDGES;
            }
        }
        void scatter(icontext_type &context, const vertex_type &vertex, edge_type &edge) const {
            context.signal(edge.target());
        }
};

void pagerank_valor_inicial(Grafo::vertex_type& vertice) {
    vertice.data() = 1.0; 
}

// Saida
class pagerank_writer {
    public:
        std::string save_vertex(Grafo::vertex_type v) {
            std::stringstream saida_linha;
            saida_linha << v.id() << " " << v.data() << std::endl;
            return saida_linha.str();
        }
        std::string save_edge(Grafo::edge_type e) { 
            return ""; 
        }
};

// Main
int main(int argc, char** argv) {

   // Inicializa GraphLab
   graphlab::mpi_tools::init(argc, argv);
   graphlab::distributed_control dc;

   // Parametros de Entrada
   graphlab::command_line_options clopts("Algoritmo PageRank.");
   std::string arquivo_entrada_grafo = "/shared/data/grafo.adj";
   std::string arquivo_saida = "/shared/results/temp";
   std::string formato_grafo = "adj";
   clopts.attach_option("entrada", arquivo_entrada_grafo, "Arquivo de entrada do grafo");
   clopts.attach_option("formato", formato_grafo, "Formato do arquivo do grafo: 'tsv' ou 'adj'");
   clopts.attach_option("saida", arquivo_saida, "Arquivo de saida com os valores de PageRank calculados para cada vertice");
   if(!clopts.parse(argc, argv)) {
       dc.cout() << "ERRO: Argumentos de linha de comando invalidos." << std::endl;
       return EXIT_FAILURE;
   }

   // Carrega o grafo
   Grafo grafo(dc);
   grafo.load_format(arquivo_entrada_grafo, formato_grafo);
   grafo.finalize();

   // Executa algoritmo
   grafo.transform_vertices(pagerank_valor_inicial);
   graphlab::omni_engine<PageRank> engine(dc, grafo, "synchronous");
   engine.signal_all();
   engine.start();
   const float tempo = engine.elapsed_seconds();
   dc.cout() << "Terminou de executar em " << tempo << " segundos." << std::endl;

   // Saida
   grafo.save(arquivo_saida, pagerank_writer(), false, true, false);

   // Finaliza GraphLab
   graphlab::mpi_tools::finalize();

   return EXIT_SUCCESS;

}

Carga de trabalho

[editar | editar código-fonte]

Foi utilizado bases de dados gerada aleatoriamente utilizando algoritmo de geração de grafos aleatórios [5]. Os números de vértices utilizados foram variados de 100 mil até 900 mil, com a probabilidade de formação de aresta fixa em 0.0001.

Também foi utilizada uma base de dados real coleatada da rede social do Google+, mas por motivos de tempo não foi possível a avaliação da execução.

Avaliação experimental

[editar | editar código-fonte]
Tamanho da entrada
[editar | editar código-fonte]

Nesta análise medimos o tempo de execução do algoritmo variando o tamanho da entrada, ou seja, o número de vértices no Grafo.

No grafo abaixo fizemos a avaliação para diversas configurações de processamento, variando de 1 nodo de processamento até 8. Podemos observar que o crescimento quadrático está presente em todas as configurações, contunde com a complexidade do algoritmo.

Tempo de execução variando tamanho da entrada

Neste gráfico está representado o valor de speedup em relação a configuração de 1 nodo de processamento e 8 nodos de processamento (4 máquinas virtuais com 2 núcleos cada). É interessante observar que para valores abaixo de 500 mil nodos o uso de processamento paralelo não é vantajoso (speedup < 1), devido ao overhead causado pelo framework.

Speedup variando tamanho da entrada
Escalabilidade
[editar | editar código-fonte]

Nesta análisa avaliamos o comportamento do algoritmo quando adicionamos diferentes números de núcleos de processamento. Podemos observar na figura abaixo que o algoritmo é razoavelmente escalável, indicando que o aumento de nodos para para processamento paralelo vale a pena e pode ser realizado facilmente para melhorar o desempenho do algoritmo.

Tempo de execução para escalabilidade

Análise de resultados

[editar | editar código-fonte]

Nesta análisase contabilizamos o número de iterações necessárias para a conversão do algoritmo. Podemos observar que para valores de entrada maiores o número de iteraçoes é menor, indicando uma conversão mais rápida em termos do número de iterações, apesar do tempo de execução ser maior.

Número de iterações em relação ao tamanho da entrada

Análise Crítica

[editar | editar código-fonte]

A utilização do framework GraphLab teve resultados satisfatórios. A programação é relativamente fácil, apesar da configuração e instalação no sistema demandar tempo. O Hadoop, apesar da implementação do PageRank ser possível, não é tão trivial. Se levarmos em conta a facilidade de programação e o desempenho do algoritmo, escolheríamos o framework GraphLab ao invés do Hadoop. Isso mostra que o Hadoop não é bom em todas as situações.

  1. Atish Das Sarma, Anisur Rahaman Molla, Gopal Pandurangan, Eli Upfal, "Fast Distributed PageRank Computation", CoRR, 2012
  2. Christian Kohlschütter , Ru Chirita , Wolfgang Nejdl, "Efficient parallel computation of PageRank", ECIR, 2006
  3. Bahman Bahmani, Kaushik Chakrabarti, Dong Xin, "Fast Personalized PageRank on MapReduce"
  4. WU, T. et al., "Efficient pagerank and spmv computation on amd gpus". ICPP, 2010
  5. Vladimir Batagelj and Ulrik Brandes, "Efficient generation of large random networks", Phys. Rev. E, 71, 036113, 2005.