Processamento de Dados Massivos/Projeto e implementação de aplicações Big Data/Mineração de Itemsets Frequentes: diferenças entre revisões

Origem: Wikilivros, livros abertos por um mundo aberto.
[edição não verificada][edição não verificada]
Conteúdo apagado Conteúdo adicionado
Linha 18: Linha 18:


=== Motivação ===
=== Motivação ===

É fato que, atualmente, uma absurda quantidade de dados é gerada a cada dia. A análise desses dados comumente revela informações de grande importância. Uma abordagem padrão para se fazer esse tipo de análise é fazer uso de estratégias de mineração de dados, que muitas vezes são fortemente dependentes da geração de itemsets frequentes. Nesse sentido, o investimento em estratégias eficientes para mineração de itemsets frequentes em grandes volumes de dados é de grande importância.


=== Algoritmo ===
=== Algoritmo ===

Revisão das 05h24min de 16 de fevereiro de 2013

Descrição da aplicação

Esta seção apresenta uma discussão acerca do problema de mineração de itemsets frequentes no contexto de ambientes de dados massivos (big data).

Denominação

Mineração de Itemsets Frequentes (MIFs)

Dado um conjunto de transações , o objetivo da Mineração de Itemsets Frequentes é encontrar todos os conjuntos de itens (itemsets) tais que o suporte seja maior ou igual a um suporte mínimo previamente estabelecido. Define-se suporte de um itemset como sendo a porcentagem de transações onde este itemset aparece.

Formalmente, podemos definir a tarefa de minerar itemsets frequentes como:

Seja um conjunto de itens (o conjunto de todos os artigos de um supermercado, por exemplo). Seja uma base de dados de transações, isto é, uma tabela de duas colunas, a primeira corresponde ao TID (identificador da transação) e o segundo corresponde à transação propriamente dita, ou seja, um conjunto de itens (por exemplo, os itens comprados por um cliente). Os elementos de são chamados de transações. Um itemset é um subconjunto não vazio de . Diz-se que uma transação suporta um itemset se .

Contexto

Motivação

É fato que, atualmente, uma absurda quantidade de dados é gerada a cada dia. A análise desses dados comumente revela informações de grande importância. Uma abordagem padrão para se fazer esse tipo de análise é fazer uso de estratégias de mineração de dados, que muitas vezes são fortemente dependentes da geração de itemsets frequentes. Nesse sentido, o investimento em estratégias eficientes para mineração de itemsets frequentes em grandes volumes de dados é de grande importância.

Algoritmo

Exemplo de entrada e saída de qualquer algoritmo de mineração de itemsets frequentes

Exemplo de funcionamento

Exemplo de funcionamento do algoritmo SON[1]. Processa uma partição por vez. Objetivo caber na RAM.

Requisitos

Nessa seção, apresentam-se os requisitos de escalabilidade, armazenamento, latência e tolerância a falhas considerados no projeto.

Escalabilidade

Ao duplicar o número de processadores disponíveis para processamento, é desejável que o processamento execute duas vezes mais rápido. Isto é, almeja-se obter um speed-up próximo ao linear. De fato, esse resultado foi alcançado pela abordagem aqui apresentada.

Armazenamento

Em muitos cenários reais o volume de dados é tal que não é possível armazená-lo em apenas uma máquina. Sendo necessário um conjunto de máquinas para comportar esse volume. Esse é o cenário ideal para análise da aplicação aqui desenvolvida.

Latência

Consideramos nesse projeto que a identificação de itemsets frequentes seja um processo offline e portanto não assumimos nenhum requisito de latência. Deseja-se porém obter speed-up próximo ao linear.

Tolerância a falhas

Dado o grande volume de dados e o tempo necessário para processamento, é comum que aplicações de big-data possuam tolerancia a falhas. Embora na presente aplicação tal quesito também seja importante, não entraremos no mérito de propor estratégias de tolerancia a falhas nesse trabalho. Diversos frameworks implementam tolerancia a falhas, evitando que o programador tenha tais preocupaçoes. Portanto, deixaremos isso por conta do framework a ser utilizado. Além disso, a existencia ou não de tolerancia a falhas não está intrinsecamente relacionada ao algoritmo. Assim, para o contexto em questão assumimos que isso não é crítico.

Projeto

Oportunidade de paralelização

Conforme já mencionado em seções anteriores, é comum que aplicações de mineração de dados dependam da extração de itemsets frequentes em grandes volumes de dados. Muitas vezes esses volumes não podem ser armazenado em uma única máquina. Nesse sentido, uma abordagem natural é particionar a base de dados de forma que cada nó seja responsável por armazenar e processar um subconjunto da totalidade.

A oportunidade de paralelização explorada nesse trabalho é o particionamento da base de dados por transações. Dessa forma, cada nó é responsável por um subconjunto das transações. Vale observar que o particionamento por trasações é mais indicado em casos onde a quantidade de transações é muito maior que a quantidade de itens presentes em cada transação. Caso contrário, o particionamento por itens poderia ser mais mais indicado.

A estratégia adotada nesse trabalho é semelhante à oportunidade de paralelização explorada pelo algoritmo Parallel SON (PSON)[2], um algoritmo paralelo para extração itemsets frequentes. Grosso modo, o PSON é uma versão paralela do algoritmo SON. Assim como no algoritmo SON, no PSON a dependência de dados proveniente do princípio Apriori também não é problema. Na infraestrutura de computação, a qual pode ser descrita como um grafo onde cada nó é uma máquina, a dependência maior é intra nó.

O algoritmo SON, e por consequência o PSON, se beneficia do seguinte princípio:

Um itemset frequente global, certamente é frequente local em pelo menos uma das partições.

A primeira fase do algoritmo PSON consiste divisão da base de dados por transações. Em seguida, extrai-se o conjunto de itemsets frequentes maximais (IFMs) em cada partição. Note que esse passo que pode ser feito paralelamente. A próxima etapa é a união dos conjuntos de IFMs gerados, tal união consiste então no conjunto de candidatos ao título de itemsets frequentes globais. O último passo é a contagem de cada elemento do conjunto potência de cada candidato em todas as partições e posterior soma do número de ocorrências e filtragem dos itemsets frequentes globais.

Padrões de acesso aos dados

Padrão de acesso aos dados

Na abordagem aqui apresentada, os dados são particionados por transações. Cada nó de processamento atua em um subconjunto das transações existentes na base original. Especificamente, cada partição é gravada em um arquivo distinto.

Considerando que a base já esteja particionada, cada partição é acessada em dois momentos durante o processamento. Em cada acesso, as transações correspondentes à cada partição são lidas para a memória principal e é feito o processamento. Portanto, para que o processamento ocorra de forma eficiente o particionamento da base de dados deve considerar um parâmetro importante, o tamanho da memória principal. Assim, nenhuma partição deve ser maior que a memória principal, caso contrário o overhead de causado pelo gerenciamento da memória virtual (e consequentemente uso da memória secundária) poderia reduzir drasticamente a eficiência do processamento.

O primeiro acesso refere-se à geração dos itemsets frequentes em cada partição. O segundo, ocorre após a união dos itemsets frequentes maximais gerados na primeira etapa. Nesse momento, cada partição é novamente lida novamente do disco. A figura à direita dessa página apresenta um diagrama temporal que representa visualmente o padrão de acesso aos dados adotado nesse trabalho. Nessa figura, cubos representam processos e cada cilindro representa uma partições dos dados.

Vale observar que, conceitualmente, apenas uma leitura é necessária, visto que as mesmas partições são lidas duas vezes. Porém, do ponto de vista de projeto há beneficios em separar as leituras. Tais benefícios ficaram claros ao longo do texto. Grosso modo, a base apenas precisa estar na memória principal durante a montagem da estrutura de dados requisitada pelo algoritmo de extração de itemsets frequentes a ser usado em cada partição e durante a contagem dos itens na última fase; portanto, remover esses dados da memória principal libera espaço para a própria extração de itemsets frequentes locais e outros processamentos. Além disso, não há perdas significatívas de desempenho.

Outro ponto importante é, dependendo de como a presente estratégia é implementada, haverá mais ou menos acessos a disco. Caso a implementação seja feita em Hadoop, por exemplo, haveria mais acessos visto que grande parte da comunicação ocorre por meio de arquivos.

Padrões de comunicação

Padrão de comunicação

Para tornar mais clara a explicação do método faz-se necessária uma distinção clara entre o nó central e os nós de processamento de transações. O nó central é responsável por gerenciar a computação dos nós que processam transações (ou doravante, nós de processamento). A figura à direita dessa página apresenta um diagrama que representa os padrões de comunicação da abodagem proposta.

No contexto de trocas de mensagens, o processamento pode ser descrito como segue. O nó central envia uma mensagem para cada nó de processamento, o objetivo é enviar os parâmetros necessários e iniciar a identificação dos itemsets frequentes em cada partição. Tão logo cada nó de processamento termine esse passo, uma mensagem contendo o conjunto de itemsets frequentes maximais é enviada ao nó central, que faz a união dos conjuntos recebidos e envia uma nova mensagem a cada nó de processamento contendo os itemsets frequentes maximais gerados. Nesse momento, os nós de processamento fazem o desmembramento de cada conjunto maximal, por meio da geração do conjunto potência, e efetua a contagem de cada elemento em sua respectiva partição. Ao fim dessa contagem, cada nó de processamento envia uma mensagem ao nó central, que por sua vez efetua a totalização e faz a filtragem deixando como resultado apenas os itemsets frequentes maximais globais.

Em suma, o processamento envolve quatro momentos de troca de mensagem:

  1. Nó central envia mensagem aos nós de processamento para iniciar a computação dos IFMs locais
  2. Nós de processamento enviam mensagem ao nó central contendo os IFMs locais
  3. Nó central envia mensagem aos nós de processamento contendo a união dos IFMs locais
  4. Nós de processamento enviam ao nó central mensagens contendo as contagens parciais

Note que o nó central sempre envia mensagens idênticas aos nós de processamento (salvo o nome dos arquivos na mensagem de inicialização), que por sua vez enviam sempre mesagens (potencialmente) distintas ao nó central.

Linha do tempo integrada

A figura abaixo apresenta uma linha do tempo integrada que exemplifica o comportamento do método em termos de acesso aos dados, processamento executado e trocas de mensagens.

Linha do tempo integrada

Implementação

Nesta seção, são discutidos os principais aspectos de implementação do algoritmo.

Estratégia de paralelização

Algumas ferramentas e abstrações podem auxiliar o projeto e desenvolvimento de aplicações no contexto de processamento massivo e big-data. Encontrar aquela que mais naturalmente se adequa ao problema em questão é uma etapa importante do projeto. Dentre as ferramentas e abstrações disponíveis, destacam-se:

  1. Ferramentas para processamento de streams;
  2. Processamento descrito em forma de fluxo em grafos;
  3. Ferramentas para processamento de grafos grandes;
  4. Abstração MapReduce.

A oportunidade de paralelização aqui descrita pode ser implementada por meio de duas chamadas MapReduce[3], conforme descrito a seguir.

Etapa Objetivo
1o mapeamento Extração dos itemsets frequentes maximais (IFMs) locais de cada partição
1a redução União dos IFMs
2o mapeamento Contagens locais baseadas na união dos IFMs
2a redução Totalização e filtragem pelo suporte

A estratégia supracidata foi implementada em C++ usando o framework MapReduce++, descrito nas próximas seção. As quatro caixas abaixo apresentam o código das funções de mapeamento e redução nas duas chamadas do MapReduce. O código é apresentado apenas a título de curiosidade, o intuito é mostrar como tais funções são implementadas. O leitor pode pular esses códigos sem prejuízo de intrepretação e entendimento da abodagem adotata para implementar a estratégia proposta.

class FMIMapper : public MaPI_StrMapper {
public:
	FMIMapper(MaPI_StrMapReduce * mr, MaPI_StrSerializer * s):MaPI_StrMapper(mr,s){};
	virtual vector< pair<string,string> > map( pair<string,string> a ) 
	{ 
		vector< pair<string,string> > mapped; 
		// ... some parts are omited
		vector<Transaction> transactions = transactionReader.read(filename); // Building transaction set
		ItemsetMiner * miner = new Eclat(vocabulary, minsupport, maxrulesize); // Calling Eclat
		InvList * invlist = miner->mine(transactions);
		for (pair< set<unsigned>,set<unsigned> > mfi : (*invlist) )
		{
			stringstream str; str << mfi.second.size() << ' ';
			vector<unsigned> termids(mfi.first.begin(),mfi.first.end());
			for (unsigned termid : termids) str << vocabulary.terms[termid] << ' ';
			mapped.push_back(make_pair(a.second,str.str()));
		}
		return mapped; 
	};
};
class FMIReducer : public MaPI_StrReducer {
public:
	FMIReducer(MaPI_StrMapReduce * mr, MaPI_StrSerializer * s):MaPI_StrReducer(mr,s){};
	virtual pair<string,string> reduce( pair<string, vector<string> > bs ) 
	{ 
		stringstream reduced;
		for (unsigned i = 0 ; i < bs.second.size() ; ++i) reduced << bs.second[i] << '\n';
		return pair<string,string>(bs.first,reduced.str()); 
	};
};
class SubsetCountMapper : public MaPI_StrMapper {
public:
	SubsetCountMapper(MaPI_StrMapReduce * mr, MaPI_StrSerializer * s):MaPI_StrMapper(mr,s){};
	virtual vector< pair<string,string> > map( pair<string,string> a ) 
	{ 
		vector< pair<string,string> > mapped; 
		string filename = a.first;
		string filename_subsets = a.second;
		SubsetHandler subsetHandler;
		vector<pair<string, unsigned> > subset_counting = subsetHandler.countsubsets(filename, filename_subsets);
		for (pair<string,unsigned> itemset : subset_counting)
		{
			stringstream str; str << itemset.second << ' ';
			mapped.push_back(make_pair(itemset.first,str.str()));
		}
		return mapped; 
	};
};
class SubsetCountReducer : public MaPI_StrReducer {
public:
	SubsetCountReducer(MaPI_StrMapReduce * mr, MaPI_StrSerializer * s):MaPI_StrReducer(mr,s){};
	virtual pair<string,string> reduce( pair<string, vector<string> > bs ) 
	{ 
		unsigned sum(0);
		for (unsigned i = 0 ; i < bs.second.size() ; ++i) sum += atoi(bs.second[i].c_str());
		stringstream reduced; reduced << sum;
		return pair<string,string>(bs.first,reduced.str()); 
	};
};

Estratégia de armazenamento

Cada partição da base de dados é armazenada em um arquivo transações. Trata-se de um arquivo de caracteres (textual), porém nada impede o uso de arquivos binários de bloco ou mesmo gerenciadores de banco de dados.

Estratégia de comunicação

A implementação feita nesse trabalho baseia-se no framework MaPI, que integra o projeto MapReduce++ (Conforme discute-se na próxima seção). Esse framework é uma camada de abstração sobre MPI, ou Message Passing Interface. Portanto, a comunicação é feita por trocas de mensagens.


Execução

Plataforma e ferramentas

A estratégia aqui apresentada foi implementada usando MapReduce++.

MapReduce++

MapReduce++[4] é um projeto open-source cujo objetivo é disponibilizar diferentes implementações da abstração MapReduce na linguagem de programação C++. Esse projeto define uma interface padrão para desenvolvimento de bibliotecas MapReduce, que serve como base para suas implementações. Atualmente, MapReduce++ contém duas implementações paralelas do modelo MapReduce:

  • MapMP - uma biblioteca MapReduce para multi-processadores (memória compartilhada) baseada em OpenMP
  • MaPI[5] - um framework MapReduce para multi-computadores (memória distribuída) baseada em Message Passing Interface (MPI)

Há também uma terceira versão, chamada SeqMR, que tem como objetivo auxiliar o usuário durante a etapa de desenvolvimento. Trata-se de uma versão sequencial para desenvolvimento. Portanto, caso o usuário não possua o ambiente de execução, este pode desenvolver a aplicação MapReduce usando SeqMR que não requer nenhum recurso além do gcc/g++. Finalizada a etapa de desenvovimento, o usuário deve então fazer pequenas adaptações no código para explorar o paralelismo intrinseco às tarefas de mapeamento e redução. São alterações como mudar a herança das classes de SeqMR para MapMP, no caso de memória compartilhada, ou para MaPI no caso de memória distribuída. Nesse último caso, faz-se necessária também a criação de serializadores e a inicialização dos servidores de mapeamento e redução.

Uma boa forma começar o desenvolvimento de aplicações usando implementações baseadas em MapReduce++ (alternativa àquela apresentada no paragrafo anterior) é baseando nos exemplos disponíveis no projeto. Visto que os dados da aplicação aqui discutida podem não caber em uma só máquina, a implementação MaPI é a mais indicada para o nosso propósito. A seguir, é apresentado um passo-a-passo desde o download até a execussão de um programa MaPI.

MaPI: Download

Baixe a versão mais atual do MapReduce++ no SourceForge

http://sourceforge.net/projects/mapreducepp/

MaPI: Instalação no Ubuntu Linux

Para desenvolver e executar programas MaPI, é necessário ter compilador g++ e suporte para desenvolvimento e execução de programas baseados em MPI.

Instale o g++ com o seguinte comando no terminal:

$ sudo apt-get install g++

Para suporte a MPI, pode-se usar a implementação OpenMPI (recomendada):

$ sudo apt-get install openmpi-bin libopenmpi-dev

ou MPICH2, se preferir:

$ sudo apt-get install mpich2

MaPI: Exemplo de aplicação

A seguir apresentamos um exemplo de aplicação, onde o MapReduce é descrito pela herança de duas classes abstratas. Especificamente, são implementadas as funções map de MaPI_Mapper e reduce de MaPI_Reducer. Neste exemplo, um vetor contendo três elementos (chave,valor) de tipo (int,char) é mapeado por uma função que apenas transmite a entrada para a saída (isto é, não faz nada) e reduzido por uma função que agrupa elementos de mesma chave. O código é apresentado abaixo (nota: algumas partes são omitidas, o códito completo está disponível no SourceForge).

#include "../MaPI/MaPI.h"
#include <iostream>
#define MRType int,char,int,char,string
class MyMapper : public MaPI_Mapper<MRType> {
public:
  virtual vector< pair<int,char> > map( pair<int,char> a )
  {
    vector< pair<int,char> > m(1,a);
    cout << "\tMapping..\n";
    return m;
  };
};
class MyReducer : public MaPI_Reducer<MRType> {
public:
  virtual pair<int,string> reduce( pair<int, vector<char> > bs )
  {
    string reduced;
    for(unsigned i=0;i<bs.second.size();i++) reduced += bs.second[i];
    cout << "\tReducing..\n";
    return make_pair(bs.first,reduced);
  };
};
int main(int argc,char** argv)
{
  MaPI_MapReduce<MRType> mapReduce;
  MySerializer serializer;
  MyMapper mapper(&mapReduce,&serializer);
  MyReducer reducer(&mapReduce,&serializer);
  mapReduce.initServers(argc,argv);
  vector< pair<int,char> > input;
  input.push_back(make_pair(1,'a'));
  input.push_back(make_pair(2,'b'));
  input.push_back(make_pair(1,'c'));
  vector< pair<int,string> > output = mapReduce.run(mapper,reducer,input);
  cout << output << endl;
  return 0;
}

O usuário deve especificar os seguintes cinco tipos:

  • da chave e do valor de entrada
  • da chave e do valor intermediário
  • do valor de redução

Para compilar e executar o programa o usuário deve usar a seguinte linha de comando, onde "-np 3" significa que mpirun usará três processos:

$ mpiCC ex01.cpp -o program && mpirun -np 3 ex01

Ao executar o comando acima, a aplicação de exemplo produzirá a seguinte saída, onde a última linha é o resultado produzido pelo MapReduce.

Mapping..
Mapping..
Mapping..
Reducing..
Reducing..
vector(2) [pair(1 , "ac") , pair(2 , "b")]

MaPI: Configuração do cluster

Até então, os testes foram executados em uma máquina apenas. Para usar um cluster, é necessária a instalação do ssh client e server.

$ sudo apt-get install openssh-server openssh-client

Como iniciar e parar um servidor ssh:

$ sudo /etc/init.d/ssh start
$ sudo /etc/init.d/ssh stop

Avaliação

Nesta Seção serão apresentados os experimentos computacionais e uma discussão analítica acerca dos resultados.

Carga de trabalho

Para execução dos experimentos computacionais, foram utilizados várias bases de dados de diversos tamanhos. As bases de dados se dividem em duas categorias. Na primeira categoria variou-se o número de transações (N) e fixou-se o número de itens por transação (D). Enquanto na segunda categoria ocorreu o oposto, ou seja, o número de transações (N) foi fixado e o número de itens por transação (D) foi variado. A seguir a tabela apresenta as bases de dados utilizadas e seus respectivos tamanhos (MB):

Categoria 1 Categoria 2
N = 100.000 e D = 10 (2.7 MB) N = 300.000 e D = 10 (8.4 MB)
N = 300.000 e D = 10 (8.4 MB) N = 300.000 e D = 20 (14.2 MB)
N = 500.000 e D = 10 (13.7 MB ) N = 300.000 e D = 50 (31 MB)
N = 1.000.000 e D = 10 (27,9 MB) ---
N = 3.000.000 e D = 10 (82,7 MB) ---

As duas categorias de base de dados de testes expressam a dimensionalidade dos problemas de mineração de itemsets frequetnes em dados massivos, ou seja, no mundo real as grandes bases de dados crescem em termos de número de transação e em número de itens por transação. Por isso foram utilizadas as duas variações.

Avaliação experimental

Planejamento experimental

O objetivo deste trabalho é apresentar uma solução paralela distribuída que melhore a performance, em termos de tempo de execução, a tarefa de mineração de itemsets frequentes. Para tanto, será apresentado resultados de testes variando-se o número de transações de uma base de dados e também o número de itens por transação, conforme detalhado na Seção anterior.

Para cada base de teste, foi executado a solução apresentada utilizando-se 1, 2, 4 e 8 nós de processamento e foi recolhido o seu tempo de processamento total. Não foi considerado o tempo para transmitir as bases de dados para cada nó do cluster, ou seja, todas as bases de testes foram replicadas em todos os nós de processamento.

Ambiente Computacional

Para execução dos testes foi utilizado um cluster com oito nós de processamento. Cada nó do cluster é composto de uma VCPU (Virtual CPU) com 2 GB de memória RAM e 10 GB de Disco. O sistema operacional utilizado em cada nó foi o Ubuntu 12.04.

Análise dos resultados

A seguir são apresentados os gráficos demonstrando o SpeedUp e tempo de execução utilizando-se as bases de dados de testes da primeira categoria (onde variou-se o número de transações).

Speedup obtido para diferentes números de transações
Tempo de Execução obtido para diferentes números de transações

Também foi coletado o tempo de execução e SpeedUp para a segunda categoria de bases de testes (onde variou-se o número de itens por transação), os resultados são apresentados nos gráficos a seguir:

Speedup obtido para diferentes valores de itens por transação
Tempo de Execucação obtido para diferentes valores de itens por transação

Escalabilidade

Tendências

Identificação de gargalos

Análise de projeto e implementação

Referências

  1. A. Savasere, E. Omiecinski, and S.B. Navathe, An efficient algorithm for mining association rules in large databases. Intl. Conf. on Very Large Databases, pp. 432–444, 1995.
  2. Tao Xiao; Chunfeng Yuan; Yihua Huang; PSON: A Parallelized SON Algorithm with MapReduce for Mining Frequent Sets. Parallel Architectures, Algorithms and Programming (PAAP), 2011.
  3. Dean, J., and Ghemawat, S. Mapreduce: simplified data processing on large clusters. In Proceedings of OSDI'04: Sixth Symposium on Operating System Design and Implementation, 2004.
  4. MapReduce++ no SourceForge
  5. Ribas, S; Perché, M; Coelho, IM; Munhoz, PA; Souza, MJF; Aquino, ALL; A Framework for Developing Parallel Optimization Algorithms. Informatics. ACTA Press, 2010.