Introduction au traitement Spark Graph avec GraphFrames

1. Introduction

Le traitement graphique est utile pour de nombreuses applications, des réseaux sociaux aux publicités. Dans un scénario Big Data, nous avons besoin d'un outil pour répartir cette charge de traitement.

Dans ce didacticiel, nous allons charger et explorer les possibilités de graphes à l'aide d'Apache Spark en Java. Pour éviter les structures complexes, nous utiliserons une API graphique Apache Spark simple et de haut niveau: l'API GraphFrames.

2. Graphiques

Tout d'abord, définissons un graphe et ses composants. Un graphe est une structure de données comportant des arêtes et des sommets. Les arêtes portent des informations qui représentent les relations entre les sommets.

Les sommets sont des points dans un espace à n dimensions, et les arêtes connectent les sommets selon leurs relations:

Dans l'image ci-dessus, nous avons un exemple de réseau social. Nous pouvons voir les sommets représentés par des lettres et les arêtes portant quel type de relation existe entre les sommets.

3. Configuration de Maven

Maintenant, commençons le projet en configurant la configuration Maven.

Ajoutons spark-graphx 2.11, graphframes et spark-sql 2.11 :

 org.apache.spark spark-graphx_2.11 2.4.4   graphframes graphframes 0.7.0-spark2.4-s_2.11   org.apache.spark spark-sql_2.11 2.4.4 

Ces versions d'artefacts prennent en charge Scala 2.11.

De plus, il se trouve que GraphFrames n'est pas dans Maven Central. Alors, ajoutons également le référentiel Maven nécessaire:

  SparkPackagesRepo //dl.bintray.com/spark-packages/maven  

4. Configuration Spark

Afin de travailler avec GraphFrames, nous devons télécharger Hadoop et définir la variable d'environnement HADOOP_HOME .

Dans le cas de Windows comme système d'exploitation, nous téléchargerons également le fichier winutils.exe approprié dans le dossier HADOOP_HOME / bin .

Ensuite, commençons notre code en créant la configuration de base:

SparkConf sparkConf = new SparkConf() .setAppName("SparkGraphFrames") .setMaster("local[*]"); JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

Nous devrons également créer une SparkSession :

SparkSession session = SparkSession.builder() .appName("SparkGraphFrameSample") .config("spark.sql.warehouse.dir", "/file:C:/temp") .sparkContext(javaSparkContext.sc()) .master("local[*]") .getOrCreate();

5. Construction du graphique

Maintenant, nous sommes tous prêts à commencer avec notre code principal. Alors, définissons les entités pour nos sommets et arêtes, et créons l' instance GraphFrame .

Nous travaillerons sur les relations entre utilisateurs à partir d'un hypothétique réseau social.

5.1. Les données

Tout d'abord, pour cet exemple, définissons les deux entités comme utilisateur et relation :

public class User { private Long id; private String name; // constructor, getters and setters } public class Relationship implements Serializable { private String type; private String src; private String dst; private UUID id; public Relationship(String type, String src, String dst) { this.type = type; this.src = src; this.dst = dst; this.id = UUID.randomUUID(); } // getters and setters }

Ensuite, définissons quelques instances d' utilisateurs et de relations :

List users = new ArrayList(); users.add(new User(1L, "John")); users.add(new User(2L, "Martin")); users.add(new User(3L, "Peter")); users.add(new User(4L, "Alicia")); List relationships = new ArrayList(); relationships.add(new Relationship("Friend", "1", "2")); relationships.add(new Relationship("Following", "1", "4")); relationships.add(new Relationship("Friend", "2", "4")); relationships.add(new Relationship("Relative", "3", "1")); relationships.add(new Relationship("Relative", "3", "4"));

5.2. Instance GraphFrame

Maintenant, pour créer et manipuler notre graphique de relations, nous allons créer une instance de GraphFrame . Le constructeur GraphFrame attend deux instances de Dataset , la première représentant les sommets et la seconde, les arêtes:

Dataset userDataset = session.createDataFrame(users, User.class); Dataset relationshipDataset = session.createDataFrame(relationships, Relation.class); GraphFrame graph = new GraphFrame(userDataframe, relationshipDataframe);

Enfin, nous enregistrerons nos sommets et arêtes dans la console pour voir à quoi cela ressemble:

graph.vertices().show(); graph.edges().show();
+---+------+ | id| name| +---+------+ | 1| John| | 2|Martin| | 3| Peter| | 4|Alicia| +---+------+ +---+--------------------+---+---------+ |dst| id|src| type| +---+--------------------+---+---------+ | 2|622da83f-fb18-484...| 1| Friend| | 4|c6dde409-c89d-490...| 1|Following| | 4|360d06e1-4e9b-4ec...| 2| Friend| | 1|de5e738e-c958-4e0...| 3| Relative| | 4|d96b045a-6320-4a6...| 3| Relative| +---+--------------------+---+---------+

6. Opérateurs graphiques

Maintenant que nous avons une instance GraphFrame , voyons ce que nous pouvons en faire.

6.1. Filtre

GraphFrames nous permet de filtrer les arêtes et les sommets par une requête.

Ensuite, filtrons les sommets par la propriété name sur User :

graph.vertices().filter("name = 'Martin'").show();

Au niveau de la console, on peut voir le résultat:

+---+------+ | id| name| +---+------+ | 2|Martin| +---+------+

De plus, nous pouvons directement filtrer sur le graphique en appelant filterEdges ou filterVertices :

graph.filterEdges("type = 'Friend'") .dropIsolatedVertices().vertices().show();

Maintenant, puisque nous avons filtré les arêtes, il se peut que nous ayons encore des sommets isolés. Nous appellerons donc dropIsolatedVertices ().

En conséquence, nous avons un sous-graphe, toujours une instance de GraphFrame , avec uniquement les relations qui ont le statut «Friend»:

+---+------+ | id| name| +---+------+ | 1| John| | 2|Martin| | 4|Alicia| +---+------+

6.2. Degrés

Un autre ensemble de fonctionnalités intéressant est l' ensemble des degrés d'opérations. Ces opérations renvoient le nombre d'arêtes incidentes sur chaque sommet.

The degrees operation just returns the count of all edges of each vertex. On the other hand, inDegrees counts only incoming edges, and outDegrees counts only outgoing edges.

Let's count the incoming degrees of all vertices in our graph:

graph.inDegrees().show();

As a result, we have a GraphFrame that shows the number of incoming edges to each vertex, excluding those with none:

+---+--------+ | id|inDegree| +---+--------+ | 1| 1| | 4| 3| | 2| 1| +---+--------+

7. Graph Algorithms

GraphFrames also provides popular algorithms ready to use — let's take a look at some of them.

7.1. Page Rank

The Page Rank algorithm weighs the incoming edges to a vertex and transforms it into a score.

The idea is that each incoming edge represents an endorsement and makes the vertex more relevant in the given graph.

For example, in a social network, if a person is followed by various people, he or she will be ranked highly.

Running the page rank algorithm is quite straightforward:

graph.pageRank() .maxIter(20) .resetProbability(0.15) .run() .vertices() .show();

To configure this algorithm, we just need to provide:

  • maxIter – the number of iterations of page rank to run – 20 is recommended, too few will decrease the quality, and too many will degrade the performance
  • resetProbability – the random reset probability (alpha) – the lower it is, the bigger the score spread between the winners and losers will be – valid ranges are from 0 to 1. Usually, 0.15 is a good score

The response is a similar GraphFrame, though this time we see an additional column giving the page rank of each vertex:

+---+------+------------------+ | id| name| pagerank| +---+------+------------------+ | 4|Alicia|1.9393230468864597| | 3| Peter|0.4848822786454427| | 1| John|0.7272991738542318| | 2|Martin| 0.848495500613866| +---+------+------------------+

In our graph, Alicia is the most relevant vertex, followed by Martin and John.

7.2. Connected Components

The connected components algorithm finds isolated clusters or isolated sub-graphs. These clusters are sets of connected vertices in a graph where each vertex is reachable from any other vertex in the same set.

We can call the algorithm without any parameters via the connectedComponents() method:

graph.connectedComponents().run().show();

The algorithm returns a GraphFrame containing each vertex and the component to which each is connected:

+---+------+------------+ | id| name| component| +---+------+------------+ | 1| John|154618822656| | 2|Martin|154618822656| | 3| Peter|154618822656| | 4|Alicia|154618822656| +---+------+------------+

Our graph has only one component — this means that we do not have isolated sub-graphs. The component has an auto-generated id, which is 154618822656, in our case.

Although we have one more column here – the component id – our graph is still the same.

7.3. Triangle Counting

Triangle counting is commonly used as community detection and counting in a social network graph. A triangle is a set of three vertices, where each vertex has a relationship to the other two vertices in the triangle.

In a social network community, it's easy to find a considerable number of triangles connected to each other.

We can easily perform a triangle counting directly from our GraphFrame instance:

graph.triangleCount().run().show();

The algorithm also returns a GraphFrame with the number of triangles passing through each vertex.

+-----+---+------+ |count| id| name| +-----+---+------+ | 1| 3| Peter| | 2| 1| John| | 2| 4|Alicia| | 1| 2|Martin| +-----+---+------+

8. Conclusion

Apache Spark is a great tool for computing a relevant amount of data in an optimized and distributed way. And, the GraphFrames library allows us to easily distribute graph operations over Spark.

Comme toujours, le code source complet de l'exemple est disponible à l'adresse over sur GitHub.