Apache Spark: différences entre les Dataframes, les Datasets et les RDD

1. Vue d'ensemble

Apache Spark est un système de traitement de données rapide et distribué. Il traite les données en mémoire et utilise la mise en cache en mémoire et une exécution optimisée pour des performances rapides. Il fournit des API de haut niveau pour les langages de programmation populaires tels que Scala, Python, Java et R.

Dans ce tutoriel rapide, nous allons passer en revue trois des concepts de base de Spark: les dataframes, les ensembles de données et les RDD.

2. DataFrame

Spark SQL a introduit une abstraction de données tabulaire appelée DataFrame depuis Spark 1.3. Depuis lors, il est devenu l'une des fonctionnalités les plus importantes de Spark. Cette API est utile lorsque nous voulons gérer des données distribuées structurées et semi-structurées.

Dans la section 3, nous discuterons des ensembles de données distribués résilients (RDD). Les DataFrames stockent les données de manière plus efficace que les RDD, car ils utilisent les capacités immuables, en mémoire, résilientes, distribuées et parallèles des RDD, mais ils appliquent également un schéma aux données. DataFrames traduit également le code SQL en opérations RDD de bas niveau optimisées.

Nous pouvons créer des DataFrames de trois manières:

  • Conversion de RDD existants
  • Exécution de requêtes SQL
  • Chargement de données externes

L'équipe Spark a introduit SparkSession dans la version 2.0, elle unifie tous les différents contextes, garantissant que les développeurs n'auront pas à se soucier de créer différents contextes:

SparkSession session = SparkSession.builder() .appName("TouristDataFrameExample") .master("local[*]") .getOrCreate(); DataFrameReader dataFrameReader = session.read();

Nous analyserons le fichier Tourist.csv :

Dataset data = dataFrameReader.option("header", "true") .csv("data/Tourist.csv");

Depuis Spark 2.0 DataFrame est devenu un Dataset de type Row , nous pouvons donc utiliser un DataFrame comme alias pour un Dataset .

Nous pouvons sélectionner des colonnes spécifiques qui nous intéressent. Nous pouvons également filtrer et regrouper par colonne donnée:

data.select(col("country"), col("year"), col("value")) .show(); data.filter(col("country").equalTo("Mexico")) .show(); data.groupBy(col("country")) .count() .show();

3. Ensembles de données

Un ensemble de données est un ensemble de données structurées fortement typées . Ils fournissent le style de programmation orienté objet familier ainsi que les avantages de la sécurité de type puisque les ensembles de données peuvent vérifier la syntaxe et détecter les erreurs au moment de la compilation.

Dataset est une extension de DataFrame, nous pouvons donc considérer un DataFrame comme une vue non typée d'un ensemble de données.

L'équipe Spark a publié l' API Dataset dans Spark 1.6 et, comme ils l'ont mentionné: «l'objectif de Spark Datasets est de fournir une API qui permet aux utilisateurs d'exprimer facilement des transformations sur des domaines d'objets, tout en offrant les avantages de performance et de robustesse de l'exécution Spark SQL moteur".

Tout d'abord, nous devons créer une classe de type TouristData :

public class TouristData { private String region; private String country; private String year; private String series; private Double value; private String footnotes; private String source; // ... getters and setters }

Pour mapper chacun de nos enregistrements au type spécifié, nous devrons utiliser un encodeur. Les encodeurs traduisent entre les objets Java et le format binaire interne de Spark :

// SparkSession initialization and data load Dataset responseWithSelectedColumns = data.select(col("region"), col("country"), col("year"), col("series"), col("value").cast("double"), col("footnotes"), col("source")); Dataset typedDataset = responseWithSelectedColumns .as(Encoders.bean(TouristData.class));

Comme avec DataFrame, nous pouvons filtrer et regrouper par colonnes spécifiques:

typedDataset.filter((FilterFunction) record -> record.getCountry() .equals("Norway")) .show(); typedDataset.groupBy(typedDataset.col("country")) .count() .show();

Nous pouvons également faire des opérations comme filtrer par colonne correspondant à une certaine plage ou calculer la somme d'une colonne spécifique, pour en obtenir la valeur totale:

typedDataset.filter((FilterFunction) record -> record.getYear() != null && (Long.valueOf(record.getYear()) > 2010 && Long.valueOf(record.getYear())  record.getValue() != null && record.getSeries() .contains("expenditure")) .groupBy("country") .agg(sum("value")) .show();

4. RDD

Le jeu de données distribué résilient ou RDD est l'abstraction de programmation principale de Spark. Il représente une collection d'éléments qui est: immuable, résilient et distribué .

Un RDD encapsule un grand ensemble de données, Spark distribuera automatiquement les données contenues dans les RDD à travers notre cluster et parallélisera les opérations que nous effectuons sur eux .

Nous ne pouvons créer des RDD que par des opérations de données dans un stockage stable ou des opérations sur d'autres RDD.

La tolérance aux pannes est essentielle lorsque nous traitons de grands ensembles de données et que les données sont distribuées sur des machines en cluster. Les RDD sont résilients en raison de la mécanique de récupération de panne intégrée de Spark. Spark repose sur le fait que les RDD mémorisent comment ils ont été créés afin que nous puissions facilement retracer la lignée pour restaurer la partition .

There are two types of operations we can do on RDDs: Transformations and Actions.

4.1. Transformations

We can apply Transformations to an RDD to manipulate its data. After this manipulation is performed, we'll get a brand-new RDD, since RDDs are immutable objects.

We'll check how to implement Map and Filter, two of the most common transformations.

First, we need to create a JavaSparkContext and load the data as an RDD from the Tourist.csv file:

SparkConf conf = new SparkConf().setAppName("uppercaseCountries") .setMaster("local[*]"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD tourists = sc.textFile("data/Tourist.csv");

Next, let's apply the map function to get the name of the country from each record and convert the name to uppercase. We can save this newly generated dataset as a text file on disk:

JavaRDD upperCaseCountries = tourists.map(line -> { String[] columns = line.split(COMMA_DELIMITER); return columns[1].toUpperCase(); }).distinct(); upperCaseCountries.saveAsTextFile("data/output/uppercase.txt");

If we want to select only a specific country, we can apply the filter function on our original tourists RDD:

JavaRDD touristsInMexico = tourists .filter(line -> line.split(COMMA_DELIMITER)[1].equals("Mexico")); touristsInMexico.saveAsTextFile("data/output/touristInMexico.txt");

4.2. Actions

Actions will return a final value or save the results to disc, after doing some computation on the data.

Two of the recurrently used actions in Spark are Count and Reduce.

Let's count the total countries on our CSV file:

// Spark Context initialization and data load JavaRDD countries = tourists.map(line -> { String[] columns = line.split(COMMA_DELIMITER); return columns[1]; }).distinct(); Long numberOfCountries = countries.count();

Now, we'll calculate the total expenditure by country. We'll need to filter the records containing expenditure in their description.

Instead of using a JavaRDD, we'll use a JavaPairRDD. A pair of RDD is a type of RDD that can store key-value pairs. Let's check it next:

JavaRDD touristsExpenditure = tourists .filter(line -> line.split(COMMA_DELIMITER)[3].contains("expenditure")); JavaPairRDD expenditurePairRdd = touristsExpenditure .mapToPair(line -> { String[] columns = line.split(COMMA_DELIMITER); return new Tuple2(columns[1], Double.valueOf(columns[6])); }); List
    
      totalByCountry = expenditurePairRdd .reduceByKey((x, y) -> x + y) .collect();
    

5. Conclusion

Pour résumer, nous devons utiliser des DataFrames ou des ensembles de données lorsque nous avons besoin d'API spécifiques à un domaine, nous avons besoin d'expressions de haut niveau telles que l'agrégation, la somme ou les requêtes SQL. Ou lorsque nous voulons une sécurité de type au moment de la compilation.

D'un autre côté, nous devrions utiliser les RDD lorsque les données ne sont pas structurées et que nous n'avons pas besoin d'implémenter un schéma spécifique ou lorsque nous avons besoin de transformations et d'actions de bas niveau.

Comme toujours, tous les exemples de code sont disponibles à l'adresse over sur GitHub.