Construire un pipeline ETL, c’est le quotidien du data engineer. Trop souvent, on tombe dans le piège de la complexité : Spark pour 500 Mo de CSV, Airflow pour un job qui tourne en 30 secondes, Pandas qui explose en mémoire sur un fichier un peu trop large. En 2026, Polars a changé la donne : un moteur DataFrame en Rust, accessible en Python, qui écrase tout en performance avec une API expressive. Voici comment monter un pipeline complet en 30 minutes.
Pourquoi Polars plutôt que Pandas ou Spark ?
Un rapide comparatif avant de coder :
| Critère | Pandas | Spark | Polars |
|---|---|---|---|
| Démarrage | Instantané | 10-30s (JVM) | Instantané |
| Mémoire | Élevée (tout en RAM) | Distribuée | Faible (exécution lazy + streaming) |
| Performance 10M lignes | ~8s (groupby) | ~2s (8 workers) | ~0.3s (single-thread) |
| Courbe d’apprentissage | Faible | Élevée | Moyenne (API chaînée) |
| Idéal pour | Exploration, prototypage | Clusters, To+ de données | ETL mono-machine, 100 Mo – 100 Go |
Polars est le choix optimal pour 90 % des pipelines ETL qui ne justifient pas un cluster distribué. Avec son mode lazy, il optimise automatiquement l’ordre des opérations et ne lit que les colonnes nécessaires.
Architecture de notre pipeline
[CSV source] + [API REST] → Polars (extract) → Polars (transform)
→ Validation (data quality) → PostgreSQL (load)
On va :
- Extraire des ventes depuis un CSV et enrichir avec une API externe (taux de change)
- Transformer : nettoyage, jointures, agrégations fenêtrées
- Valider la qualité des données avant chargement
- Charger dans PostgreSQL avec upsert (INSERT ON CONFLICT)
Étape 1 : Extraction depuis CSV et API
import polars as pl
import requests
from datetime import datetime
# 1. Extraction CSV
ventes = pl.read_csv(
"data/ventes_brutes.csv",
schema_overrides={
"date_vente": pl.Date,
"montant": pl.Float64,
"quantite": pl.Int64
}
)
# 2. Enrichissement via API externe (taux de change EUR→USD)
def get_taux_change(date_vente: str) -> float:
"""Récupère le taux EUR/USD pour une date donnée."""
url = f"https://api.frankfurter.app/{date_vente}"
resp = requests.get(url, timeout=10)
resp.raise_for_status()
return resp.json()["rates"]["USD"]
# Application du taux ligne par ligne (pour une API simple)
taux_map = {
d: get_taux_change(str(d))
for d in ventes.select("date_vente").unique().to_series().to_list()
}
ventes = ventes.with_columns(
pl.col("date_vente")
.replace_strict(taux_map, default=1.08)
.alias("taux_usd")
)
ventes = ventes.with_columns(
(pl.col("montant") * pl.col("taux_usd")).alias("montant_usd")
)
print(f"✓ {ventes.height} lignes extraites et enrichies")
print(ventes.head(3))
Étape 2 : Transformation avec Polars lazy
Le mode lazy est le super-pouvoir de Polars. Au lieu d’exécuter chaque opération immédiatement, il construit un plan d’exécution optimisé :
# Pipeline lazy — rien n'est exécuté avant .collect()
pipeline = (
ventes.lazy()
# Nettoyage
.filter(pl.col("montant").is_not_null() & (pl.col("montant") > 0))
.filter(pl.col("quantite").is_not_null() & (pl.col("quantite") > 0))
.with_columns(
pl.col("categorie").fill_null("Non classé"),
pl.col("pays").str.to_uppercase()
)
# Agrégation : total par pays et mois
.with_columns(
pl.col("date_vente").dt.truncate("1mo").alias("mois")
)
.group_by(["mois", "pays", "categorie"])
.agg([
pl.col("montant").sum().alias("ca_total"),
pl.col("montant_usd").sum().alias("ca_total_usd"),
pl.col("quantite").sum().alias("unites_vendues"),
pl.len().alias("nb_transactions")
])
.sort(["mois", "ca_total"], descending=[False, True])
)
# Exécution optimisée
resultat = pipeline.collect()
print(f"✓ Agrégation terminée : {resultat.height} lignes")
La magie du lazy : Polars pousse les filtres avant les agrégations, ne lit que les colonnes utilisées, et peut streamer les données si elles dépassent la RAM.
Étape 3 : Validation qualité des données
Avant d’injecter dans PostgreSQL, on vérifie la cohérence. Pas besoin de Great Expectations pour un pipeline simple : une fonction de validation suffit :
from typing import List
import sys
class DataQualityError(Exception):
pass
def valider_donnees(df: pl.DataFrame) -> List[str]:
"""Valide le DataFrame et retourne les alertes."""
alertes = []
row_count = df.height
# 1. Volume minimal
if row_count < 10:
raise DataQualityError(f"Volume insuffisant : {row_count} lignes")
if row_count < 100:
alertes.append(f"⚠️ Volume faible : {row_count} lignes")
# 2. Pas de valeurs nulles dans les colonnes critiques
for col in ["mois", "pays", "ca_total"]:
null_count = df.select(pl.col(col).null_count()).item()
if null_count > 0:
raise DataQualityError(f"{null_count} NULL dans {col}")
# 3. Montants positifs
negatifs = df.filter(pl.col("ca_total") < 0).height
if negatifs > 0:
raise DataQualityError(f"{negatifs} CA négatifs détectés")
# 4. Distribution suspecte
moyenne_ca = df.select(pl.col("ca_total").mean()).item()
ecart_type = df.select(pl.col("ca_total").std()).item()
outliers = df.filter(
pl.col("ca_total") > moyenne_ca + 5 * ecart_type
)
if outliers.height > 0:
alertes.append(
f"⚠️ {outliers.height} outliers (>5σ) : "
f"{outliers.select('ca_total').to_series().to_list()}"
)
return alertes
# Exécution de la validation
alertes = valider_donnees(resultat)
for a in alertes:
print(a, file=sys.stderr)
print(f"✓ Validation OK : {resultat.height} lignes prêtes pour PostgreSQL")
Étape 4 : Chargement dans PostgreSQL avec upsert
On utilise psycopg2 avec INSERT ON CONFLICT pour un chargement incrémental (pas de doublons) :
import psycopg2
from psycopg2.extras import execute_values
# Connexion
conn = psycopg2.connect(
host="localhost", port=5432,
dbname="analytics", user="etl_user", password="secure_pass"
)
cur = conn.cursor()
# Création de la table si elle n'existe pas
cur.execute("""
CREATE TABLE IF NOT EXISTS analytics.ca_mensuel (
mois DATE NOT NULL,
pays VARCHAR(10) NOT NULL,
categorie VARCHAR(100) NOT NULL,
ca_total NUMERIC(12,2),
ca_total_usd NUMERIC(12,2),
unites_vendues INTEGER,
nb_transactions INTEGER,
updated_at TIMESTAMP DEFAULT NOW(),
PRIMARY KEY (mois, pays, categorie)
);
""")
# Upsert : INSERT … ON CONFLICT DO UPDATE
lignes = resultat.rows()
colonnes = resultat.columns
execute_values(
cur,
f"""
INSERT INTO analytics.ca_mensuel
({', '.join(colonnes)})
VALUES %s
ON CONFLICT (mois, pays, categorie)
DO UPDATE SET
ca_total = EXCLUDED.ca_total,
ca_total_usd = EXCLUDED.ca_total_usd,
unites_vendues = EXCLUDED.unites_vendues,
nb_transactions = EXCLUDED.nb_transactions,
updated_at = NOW()
""",
lignes,
page_size=1000
)
conn.commit()
print(f"✓ {len(lignes)} lignes chargées dans PostgreSQL")
# Log d'exécution
cur.execute(
"INSERT INTO analytics.etl_logs (pipeline, lignes, statut) VALUES (%s, %s, %s)",
("ca_mensuel", len(lignes), "success")
)
conn.commit()
cur.close()
conn.close()
Le script complet orchestré
#!/usr/bin/env python3
"""Pipeline ETL CA Mensuel — Polars → PostgreSQL
Usage : python etl_ca_mensuel.py [--date YYYY-MM-DD]
"""
import argparse
import logging
import sys
from datetime import datetime
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger(__name__)
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--date", help="Date d'exécution (YYYY-MM-DD)")
args = parser.parse_args()
date_exec = args.date or datetime.now().strftime("%Y-%m-%d")
logger.info(f"Démarrage pipeline ETL — date: {date_exec}")
try:
# 1. Extract
logger.info("Extraction des données…")
ventes = extract_ventes()
logger.info(f"Extraction OK : {ventes.height} lignes")
# 2. Transform
logger.info("Transformation…")
resultat = transform_ventes(ventes)
logger.info(f"Transformation OK : {resultat.height} lignes agrégées")
# 3. Validate
logger.info("Validation qualité…")
alertes = valider_donnees(resultat)
for a in alertes:
logger.warning(a)
logger.info("Validation OK")
# 4. Load
logger.info("Chargement PostgreSQL…")
charger_postgresql(resultat)
logger.info("Chargement terminé avec succès")
except Exception as e:
logger.error(f"Pipeline échoué : {e}")
sys.exit(1)
if __name__ == "__main__":
main()
Ordonnancer avec cron ou systemd timer
Pour un pipeline quotidien, un simple cron suffit :
# /etc/cron.d/etl-analytics
# Tous les jours à 6h UTC
0 6 * * * etl_user /usr/bin/python3 /opt/etl/etl_ca_mensuel.py >> /var/log/etl/ca_mensuel.log 2>&1
Pour des dépendances plus complexes (attendre que le CSV soit livré, notifier Slack en cas d’échec), passez à Apache Airflow ou Prefect. Le script Python reste le même — seul l’orchestrateur change.
Performance et bonnes pratiques
- Toujours utiliser le mode lazy (
.lazy()) pour les pipelines : Polars optimise l’ordre des opérations et réduit la mémoire utilisée. - Streaming pour les gros fichiers :
.collect(streaming=True)traite les données par blocs sans tout charger en RAM. - Scan, pas read :
pl.scan_csv()au lieu depl.read_csv()en lazy pour ne pas lire le fichier avant le.collect(). - Index PostgreSQL : créez un index sur la clé d’upsert
(mois, pays, categorie)— c’est la PK, donc automatique, mais vérifiez sur les grosses tables. - Variables d’environnement pour les credentials : jamais de mot de passe en dur dans le script.
Conclusion
Avec Python + Polars + PostgreSQL, vous avez une stack ETL complète, performante et maintenable, en moins de 200 lignes de code. Pas de JVM, pas de cluster YARN, pas de vendor lock-in. Juste un script Python que vous pouvez versionner, tester et déployer en CI/CD.
En 2026, cette stack est le standard pour tout pipeline qui traite entre 100 Mo et 100 Go de données. Au-delà, Spark et les formats de table ouverts (Iceberg, Delta Lake) prennent le relais. Mais pour 90 % des besoins data d’une PME ou d’une scale-up, Polars fait le job — et le fait vite.
Sources : Polars Documentation, PostgreSQL INSERT ON CONFLICT, Psycopg2 Documentation, Frankfurter API (taux de change)
Commentaires (0)
Laisser un commentaire
Les commentaires sont modérés. Questions WordPress, cybersécurité ou dev web bienvenues.