-
Notifications
You must be signed in to change notification settings - Fork 44
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: démarrer avec apache spark #1131
Changes from 8 commits
c976849
5e91d54
7f9c768
5b6418d
d0f2871
ec8bbde
1db0d52
0d350f8
ac835a8
8b3fd0a
17f47b5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,275 @@ | ||||||||||||||
--- | ||||||||||||||
contentType: article | ||||||||||||||
lang: fr | ||||||||||||||
date: '2024-07-12' | ||||||||||||||
slug: demarrer-apache-spark | ||||||||||||||
title: Démarrer avec Apache Spark | ||||||||||||||
excerpt: >- | ||||||||||||||
Le domaine de la data est présent au quotidient. La quantité de donnée est si grande que nous la nommons Big Data. | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||
Dans cet article, nous verrons comment traiter ce volume de données à l'aide du framework Apache Spark. | ||||||||||||||
categories: [] | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||
authors: | ||||||||||||||
- tthuon | ||||||||||||||
keywords: [] | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||
--- | ||||||||||||||
|
||||||||||||||
Lorsque l'on travaille dans l'univers de la data, nous effectuons principalements sur ces trois étapes : | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||
- extraire la données de la source | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||
- la transformer pour lui donner de la valeur | ||||||||||||||
- stocker le résultat | ||||||||||||||
|
||||||||||||||
Ces trois étapes décrivent un pipeline ETL : Extract, Transform, Load (Extraire, Transformer, Charger). | ||||||||||||||
|
||||||||||||||
Il existe une multitude de façon d'effectuer ce travail. Ici, nous utiliserons Apache Spark. | ||||||||||||||
|
||||||||||||||
Apache Spark est un framework qui permet de manipuler et transformer la données. Il s'appuie sur le framework Hadoop pour distribuer les calculs sur les différents noeuds du cluster. | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||
|
||||||||||||||
[https://spark.apache.org/](https://spark.apache.org/) | ||||||||||||||
|
||||||||||||||
Par simplicité, nous nommerons Spark pour désigner Apache Spark. | ||||||||||||||
|
||||||||||||||
## Mise en situation | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||
|
||||||||||||||
Rentrons dans le vif du sujet avec un cas concret. Je veux importer les données sur le nombre de passage de vélo sur un point données afin d'effectuer une étude sur l'usage du vélo en ville. | ||||||||||||||
|
||||||||||||||
Prenons par exemple la ville de Nantes qui met à disposition un jeu de données https://data.nantesmetropole.fr/explore/dataset/244400404_comptages-velo-nantes-metropole/information | ||||||||||||||
|
||||||||||||||
Nous prenons ce fichier et le déposons dans le dossier `source/244400404_comptages-velo-nantes-metropole.csv`. | ||||||||||||||
|
||||||||||||||
Voici un échantillon du fichier | ||||||||||||||
|
||||||||||||||
```csv | ||||||||||||||
Numéro de boucle;Libellé;Total;Probabilité de présence d'anomalies;Jour de la semaine;Boucle de comptage;Date formatée;Vacances | ||||||||||||||
0674;Pont Haudaudine vers Sud;657;;2;0674 - Pont Haudaudine vers Sud;2021-03-16;Hors Vacances | ||||||||||||||
0674;Pont Haudaudine vers Sud;689;;4;0674 - Pont Haudaudine vers Sud;2021-03-18;Hors Vacances | ||||||||||||||
0674;Pont Haudaudine vers Sud;589;;5;0674 - Pont Haudaudine vers Sud;2021-03-26;Hors Vacances | ||||||||||||||
``` | ||||||||||||||
|
||||||||||||||
## Installation d'Apache Spark | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||
|
||||||||||||||
Il existe plusieurs façon d'install Apache Spark : soit en prenant le binaire, soit avec Docker avec [Jupyter Docker Stacks](https://jupyter-docker-stacks.readthedocs.io/en/latest/). | ||||||||||||||
|
||||||||||||||
Nous allons effectuer l'installation avec le binaire Spark. Ce processus est plus long et complexe, mais il est intéressant car il permet de mieux comprendre les différents éléments. | ||||||||||||||
|
||||||||||||||
Il sera nécessaire d'avoir au moins Python 3.8 et Java 17. | ||||||||||||||
|
||||||||||||||
Pour installer Java Runtime sous Ubuntu | ||||||||||||||
|
||||||||||||||
```shell | ||||||||||||||
sudo apt install openjdk-17-jre-headless | ||||||||||||||
``` | ||||||||||||||
|
||||||||||||||
### Installer le binaire Apache Spark | ||||||||||||||
|
||||||||||||||
Aller sur la page de Téléchargement [https://spark.apache.org/downloads.html](https://spark.apache.org/downloads.html) et télécharger le package en tgz. Prenez la dernière version disponible (il s'agit de la 3.5.1 au moment de l'écriture de l'article). | ||||||||||||||
|
||||||||||||||
Une fois téléchargé, décomppresser dans un dossier, par exemple `~/Apps/spark`. | ||||||||||||||
|
||||||||||||||
Dans le fichier `.bashrc`, ajouter une variable d'environnement qui pointera vers le dossier du binaire Spark. | ||||||||||||||
|
||||||||||||||
```text | ||||||||||||||
# ~/.bashrc | ||||||||||||||
export SPARK_HOME=~/Apps/spark/spark-3.5.1-bin-hadoop3 | ||||||||||||||
``` | ||||||||||||||
|
||||||||||||||
Vous êtes prêt. Il restera à installer le package Python qui permet de manipuler Spark | ||||||||||||||
|
||||||||||||||
### Installation de PySpark | ||||||||||||||
|
||||||||||||||
PySpark est une bibliothèque en Python qui fait la correspondance vers les appels Java Spark. | ||||||||||||||
|
||||||||||||||
Dans notre dossier qui contient le projet, nous avons au préalable créé un environnement virtuel. | ||||||||||||||
|
||||||||||||||
```shell | ||||||||||||||
virtualenv venv | ||||||||||||||
. venv/bin/activate | ||||||||||||||
``` | ||||||||||||||
|
||||||||||||||
Une fois l'environnement virtual activé, nous pouvons installer pyspark. | ||||||||||||||
|
||||||||||||||
```shell | ||||||||||||||
pip install pyspark==3.5.1 | ||||||||||||||
``` | ||||||||||||||
|
||||||||||||||
PySpark est installé ! | ||||||||||||||
|
||||||||||||||
## Création de notre pipeline ETL avec Apache Spark | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||
|
||||||||||||||
Nous allons effectuer les 3 étapes de notre pipeline | ||||||||||||||
- extraire la données de la source | ||||||||||||||
- la transformer pour lui donner de la valeur | ||||||||||||||
- stocker le résultat | ||||||||||||||
|
||||||||||||||
### Lecture de la donnée source | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||
|
||||||||||||||
Pour lire la donnée source avec pyspark, il faut tout d'abord créer une session Spark. | ||||||||||||||
|
||||||||||||||
Cette session Spark va piloter les différents travaux à effectuer. | ||||||||||||||
|
||||||||||||||
```python | ||||||||||||||
from pyspark.sql import SparkSession, DataFrame | ||||||||||||||
|
||||||||||||||
spark = SparkSession.builder.appName("Bike calculation").getOrCreate() | ||||||||||||||
``` | ||||||||||||||
|
||||||||||||||
Ensuite, nous instruire le chargement du fichier CSV. | ||||||||||||||
|
||||||||||||||
Le fichier contient un en-tête et le caractère de séparation est un point-virgule. Il sera nécessaire de le spécifier lors du chargement. | ||||||||||||||
|
||||||||||||||
```python | ||||||||||||||
source_file = "source/244400404_comptages-velo-nantes-metropole.csv" | ||||||||||||||
df = spark.read.format("csv").option("delimiter", ";").option("header", True).load(source_file) | ||||||||||||||
``` | ||||||||||||||
|
||||||||||||||
<div class="admonition note" markdown="1"><p class="admonition-title">Note</p> | ||||||||||||||
La nom de la variable `df` signifie Dataframe. Avec Spark, une fois la donnée chargé, nous manipulons des dataframes. | ||||||||||||||
</div> | ||||||||||||||
|
||||||||||||||
On peut ajouter les instructions `.show()` et `.printSchema()` pour afficher les premiers éléments. | ||||||||||||||
|
||||||||||||||
```python | ||||||||||||||
df.printSchema() | ||||||||||||||
df.show() | ||||||||||||||
``` | ||||||||||||||
|
||||||||||||||
Vous pouvez dès à présent exécuter le fichier pour voir le résultat. | ||||||||||||||
|
||||||||||||||
```shell | ||||||||||||||
(venv) [thierry@travail:~/eleven/data] | ||||||||||||||
% python main.py | ||||||||||||||
(...) | ||||||||||||||
root | ||||||||||||||
|-- Numéro de boucle: string (nullable = true) | ||||||||||||||
|-- Libellé: string (nullable = true) | ||||||||||||||
|-- Total: string (nullable = true) | ||||||||||||||
|-- Probabilité de présence d'anomalies: string (nullable = true) | ||||||||||||||
|-- Jour de la semaine: string (nullable = true) | ||||||||||||||
|-- Boucle de comptage: string (nullable = true) | ||||||||||||||
|-- Date formatée: string (nullable = true) | ||||||||||||||
|-- Vacances: string (nullable = true) | ||||||||||||||
|
||||||||||||||
+----------------+--------------------+-----+-----------------------------------+------------------+--------------------+-------------+--------------------+ | ||||||||||||||
|Numéro de boucle| Libellé|Total|Probabilité de présence d'anomalies|Jour de la semaine| Boucle de comptage|Date formatée| Vacances| | ||||||||||||||
+----------------+--------------------+-----+-----------------------------------+------------------+--------------------+-------------+--------------------+ | ||||||||||||||
| 0674|Pont Haudaudine v...| 657| null| 2|0674 - Pont Hauda...| 2021-03-16| Hors Vacances| | ||||||||||||||
| 0674|Pont Haudaudine v...| 689| null| 4|0674 - Pont Hauda...| 2021-03-18| Hors Vacances| | ||||||||||||||
| 0674|Pont Haudaudine v...| 589| null| 5|0674 - Pont Hauda...| 2021-03-26| Hors Vacances| | ||||||||||||||
| 0674|Pont Haudaudine v...| 591| null| 4|0674 - Pont Hauda...| 2021-04-15| Hors Vacances| | ||||||||||||||
| 0674|Pont Haudaudine v...| 481| null| 2|0674 - Pont Hauda...| 2021-05-04|Vacances de print...| | ||||||||||||||
| 0674|Pont Haudaudine v...| 583| null| 1|0674 - Pont Hauda...| 2021-05-10| Hors Vacances| | ||||||||||||||
| 0674|Pont Haudaudine v...| 421| null| 6|0674 - Pont Hauda...| 2021-05-22| Hors Vacances| | ||||||||||||||
| 0674|Pont Haudaudine v...| 279| null| 7|0674 - Pont Hauda...| 2021-05-23| Hors Vacances| | ||||||||||||||
| 0674|Pont Haudaudine v...| 512| null| 6|0674 - Pont Hauda...| 2021-06-12| Hors Vacances| | ||||||||||||||
| 0674|Pont Haudaudine v...| 338| null| 7|0674 - Pont Hauda...| 2021-06-13| Hors Vacances| | ||||||||||||||
| 0674|Pont Haudaudine v...| 948| null| 2|0674 - Pont Hauda...| 2021-06-15| Hors Vacances| | ||||||||||||||
| 0674|Pont Haudaudine v...| 688| null| 1|0674 - Pont Hauda...| 2021-06-21| Hors Vacances| | ||||||||||||||
| 0674|Pont Haudaudine v...| 381| null| 6|0674 - Pont Hauda...| 2021-07-10| Vacances d'été| | ||||||||||||||
| 0674|Pont Haudaudine v...| 413| null| 6|0674 - Pont Hauda...| 2021-07-17| Vacances d'été| | ||||||||||||||
| 0674|Pont Haudaudine v...| 648| null| 5|0674 - Pont Hauda...| 2021-07-23| Vacances d'été| | ||||||||||||||
| 0675|Pont Haudaudine v...| 541| null| 3|0675 - Pont Hauda...| 2021-01-06| Hors Vacances| | ||||||||||||||
| 0675|Pont Haudaudine v...| 549| null| 4|0675 - Pont Hauda...| 2021-01-07| Hors Vacances| | ||||||||||||||
| 0675|Pont Haudaudine v...| 520| null| 1|0675 - Pont Hauda...| 2021-01-18| Hors Vacances| | ||||||||||||||
| 0675|Pont Haudaudine v...| 184| null| 7|0675 - Pont Hauda...| 2021-01-24| Hors Vacances| | ||||||||||||||
| 0675|Pont Haudaudine v...| 418| null| 1|0675 - Pont Hauda...| 2021-02-01| Hors Vacances| | ||||||||||||||
+----------------+--------------------+-----+-----------------------------------+------------------+--------------------+-------------+--------------------+ | ||||||||||||||
only showing top 20 rows | ||||||||||||||
``` | ||||||||||||||
|
||||||||||||||
La lecture s'est bien passé. Nous retrouvons toutes les colonnes attendue. Passons à la transformation. | ||||||||||||||
|
||||||||||||||
### Transformation de la données | ||||||||||||||
|
||||||||||||||
Avant de stocker notre donnée à un endroit, nous pouvons faire quelques transformation élémentaire. Dans une architecture orienté Datalake, la donnée est très peu transformé lorsqu'il est stocké dans le lac de donnée. Dans ce paradigme, ce sont les consommateurs qui vont transformer cette donnée pour donner de la valeur. Par exemple, effectuer une aggrégation pour ensuite la consommer sur un outil de Data Visualisation (aka DataViz). | ||||||||||||||
|
||||||||||||||
D'après les premières observations, | ||||||||||||||
- la colonne "Boucle de comptage" semble être une concaténation de "Numéro de boucle" et "Libellé" | ||||||||||||||
- la colonne "Date formatée" semble être une date | ||||||||||||||
- la colonne "Jour de la semaine" est extrapolé depuis la colonne "Date formatée" | ||||||||||||||
- la colonne "Probabilité de présence d'anomalies" m'indique si la ligne est de qualité ou non | ||||||||||||||
|
||||||||||||||
Je vais conserver les colonnes qui m'interesse et typer les colonnes. Je prévois de les stocker au format parquet car ça va me permettre de conserver le typage des colonnes, et d'effectuer un partitionnement par jour. | ||||||||||||||
|
||||||||||||||
<div class="admonition important" markdown="1"><p class="admonition-title">Important</p> | ||||||||||||||
Un Dataframe est un objet immutable. Lors de l'ajout d'instruction, une nouvelle instance de Dataframe est renvoyé. | ||||||||||||||
</div> | ||||||||||||||
|
||||||||||||||
Voici la transformation | ||||||||||||||
|
||||||||||||||
```python | ||||||||||||||
df_clean = ( | ||||||||||||||
df.select( | ||||||||||||||
col("Numéro de boucle").alias("loop_number"), | ||||||||||||||
col("Libellé").alias("label"), | ||||||||||||||
col("Total").cast(IntegerType()).alias("total"), | ||||||||||||||
col("Date formatée").cast(DateType()).alias("date"), | ||||||||||||||
col("Vacances").alias("holiday_name"), | ||||||||||||||
) | ||||||||||||||
.where(col("Probabilité de présence d'anomalies").isNull()) | ||||||||||||||
) | ||||||||||||||
df_clean.show() | ||||||||||||||
``` | ||||||||||||||
|
||||||||||||||
Et le résultat | ||||||||||||||
|
||||||||||||||
```shell | ||||||||||||||
(venv) [thierry@travail:~/eleven/data] | ||||||||||||||
% python main.py | ||||||||||||||
+-----------+--------------------+-----+----------+--------------------+ | ||||||||||||||
|loop_number| label|total| date| holiday_name| | ||||||||||||||
+-----------+--------------------+-----+----------+--------------------+ | ||||||||||||||
| 0674|Pont Haudaudine v...| 657|2021-03-16| Hors Vacances| | ||||||||||||||
| 0674|Pont Haudaudine v...| 689|2021-03-18| Hors Vacances| | ||||||||||||||
| 0674|Pont Haudaudine v...| 589|2021-03-26| Hors Vacances| | ||||||||||||||
| 0674|Pont Haudaudine v...| 591|2021-04-15| Hors Vacances| | ||||||||||||||
| 0674|Pont Haudaudine v...| 481|2021-05-04|Vacances de print...| | ||||||||||||||
| 0674|Pont Haudaudine v...| 583|2021-05-10| Hors Vacances| | ||||||||||||||
+-----------+--------------------+-----+----------+--------------------+ | ||||||||||||||
``` | ||||||||||||||
|
||||||||||||||
### Stockage du résultat en Parquet | ||||||||||||||
|
||||||||||||||
Nous allons stocker le résultat au format parquet. Ce format offre l'avantage de | ||||||||||||||
- stocker la données en colonne | ||||||||||||||
- conserver le typage des colonnes | ||||||||||||||
- partitionner les données pour optimiser les requêtes | ||||||||||||||
|
||||||||||||||
Pour cela, il faut faire appel au DataFrameWriter. Le support du format parquet est natif. | ||||||||||||||
|
||||||||||||||
```python | ||||||||||||||
df_clean.write.format("parquet").partitionBy("date").save("datalake/count-bike-nantes.parquet") | ||||||||||||||
``` | ||||||||||||||
|
||||||||||||||
Ainsi, dans l'arboresence, nous avons nos données partitionné par date. | ||||||||||||||
|
||||||||||||||
## Conclusion | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||
|
||||||||||||||
Bravo, vous venez de créer votre premier pipeline Spark. Un nouveau monde s'ouvre à vous. A travers cet article, nous avons vu l'installation de Spark et PySpark. Avec la création du pipeline, nous avons lu la source de données, effectuées quelques transformation, et enfin stocké la données à un endroit. Ce stockage permettra à d'autre corps de métier de la data de l'exploiter. | ||||||||||||||
|
||||||||||||||
## Références | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||
|
||||||||||||||
Code complet | ||||||||||||||
|
||||||||||||||
```python | ||||||||||||||
from pyspark.sql import SparkSession | ||||||||||||||
from pyspark.sql.functions import col | ||||||||||||||
from pyspark.sql.types import IntegerType, DateType | ||||||||||||||
|
||||||||||||||
spark = SparkSession.builder.appName("Bike calculation").getOrCreate() | ||||||||||||||
|
||||||||||||||
source_file = "source/244400404_comptages-velo-nantes-metropole.csv" | ||||||||||||||
df = spark.read.format("csv").option("delimiter", ";").option("header", True).load(source_file) | ||||||||||||||
|
||||||||||||||
df_clean = ( | ||||||||||||||
df.select( | ||||||||||||||
col("Numéro de boucle").alias("loop_number"), | ||||||||||||||
col("Libellé").alias("label"), | ||||||||||||||
col("Total").cast(IntegerType()).alias("total"), | ||||||||||||||
col("Date formatée").cast(DateType()).alias("date"), | ||||||||||||||
col("Vacances").alias("holiday_name"), | ||||||||||||||
) | ||||||||||||||
.where(col("Probabilité de présence d'anomalies").isNull()) | ||||||||||||||
) | ||||||||||||||
|
||||||||||||||
df_clean.write.format("parquet").partitionBy("date").save("datalake/count-bike-nantes.parquet") | ||||||||||||||
``` | ||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.