Construire un pipeline ETL, c’est le quotidien du data engineer. Trop souvent, on tombe dans le piège de la complexité : Spark pour 500 Mo de CSV, Airflow pour un job qui tourne en 30 secondes, Pandas qui explose en mémoire sur un fichier un peu trop large. En 2026, Polars a changé la donne : un moteur DataFrame en Rust, accessible en Python, qui écrase tout en performance avec une API expressive. Voici comment monter un pipeline complet en 30 minutes.

Pourquoi Polars plutôt que Pandas ou Spark ?

Un rapide comparatif avant de coder :

Critère Pandas Spark Polars
Démarrage Instantané 10-30s (JVM) Instantané
Mémoire Élevée (tout en RAM) Distribuée Faible (exécution lazy + streaming)
Performance 10M lignes ~8s (groupby) ~2s (8 workers) ~0.3s (single-thread)
Courbe d’apprentissage Faible Élevée Moyenne (API chaînée)
Idéal pour Exploration, prototypage Clusters, To+ de données ETL mono-machine, 100 Mo – 100 Go

Polars est le choix optimal pour 90 % des pipelines ETL qui ne justifient pas un cluster distribué. Avec son mode lazy, il optimise automatiquement l’ordre des opérations et ne lit que les colonnes nécessaires.

Architecture de notre pipeline


[CSV source] + [API REST] → Polars (extract) → Polars (transform)
  → Validation (data quality) → PostgreSQL (load)

On va :

  1. Extraire des ventes depuis un CSV et enrichir avec une API externe (taux de change)
  2. Transformer : nettoyage, jointures, agrégations fenêtrées
  3. Valider la qualité des données avant chargement
  4. Charger dans PostgreSQL avec upsert (INSERT ON CONFLICT)

Étape 1 : Extraction depuis CSV et API

import polars as pl
import requests
from datetime import datetime

# 1. Extraction CSV
ventes = pl.read_csv(
    "data/ventes_brutes.csv",
    schema_overrides={
        "date_vente": pl.Date,
        "montant": pl.Float64,
        "quantite": pl.Int64
    }
)

# 2. Enrichissement via API externe (taux de change EUR→USD)
def get_taux_change(date_vente: str) -> float:
    """Récupère le taux EUR/USD pour une date donnée."""
    url = f"https://api.frankfurter.app/{date_vente}"
    resp = requests.get(url, timeout=10)
    resp.raise_for_status()
    return resp.json()["rates"]["USD"]

# Application du taux ligne par ligne (pour une API simple)
taux_map = {
    d: get_taux_change(str(d))
    for d in ventes.select("date_vente").unique().to_series().to_list()
}

ventes = ventes.with_columns(
    pl.col("date_vente")
      .replace_strict(taux_map, default=1.08)
      .alias("taux_usd")
)

ventes = ventes.with_columns(
    (pl.col("montant") * pl.col("taux_usd")).alias("montant_usd")
)

print(f"✓ {ventes.height} lignes extraites et enrichies")
print(ventes.head(3))

Étape 2 : Transformation avec Polars lazy

Le mode lazy est le super-pouvoir de Polars. Au lieu d’exécuter chaque opération immédiatement, il construit un plan d’exécution optimisé :

# Pipeline lazy — rien n'est exécuté avant .collect()
pipeline = (
    ventes.lazy()
    # Nettoyage
    .filter(pl.col("montant").is_not_null() & (pl.col("montant") > 0))
    .filter(pl.col("quantite").is_not_null() & (pl.col("quantite") > 0))
    .with_columns(
        pl.col("categorie").fill_null("Non classé"),
        pl.col("pays").str.to_uppercase()
    )
    # Agrégation : total par pays et mois
    .with_columns(
        pl.col("date_vente").dt.truncate("1mo").alias("mois")
    )
    .group_by(["mois", "pays", "categorie"])
    .agg([
        pl.col("montant").sum().alias("ca_total"),
        pl.col("montant_usd").sum().alias("ca_total_usd"),
        pl.col("quantite").sum().alias("unites_vendues"),
        pl.len().alias("nb_transactions")
    ])
    .sort(["mois", "ca_total"], descending=[False, True])
)

# Exécution optimisée
resultat = pipeline.collect()
print(f"✓ Agrégation terminée : {resultat.height} lignes")

La magie du lazy : Polars pousse les filtres avant les agrégations, ne lit que les colonnes utilisées, et peut streamer les données si elles dépassent la RAM.

Étape 3 : Validation qualité des données

Avant d’injecter dans PostgreSQL, on vérifie la cohérence. Pas besoin de Great Expectations pour un pipeline simple : une fonction de validation suffit :

from typing import List
import sys

class DataQualityError(Exception):
    pass

def valider_donnees(df: pl.DataFrame) -> List[str]:
    """Valide le DataFrame et retourne les alertes."""
    alertes = []
    row_count = df.height

    # 1. Volume minimal
    if row_count < 10:
        raise DataQualityError(f"Volume insuffisant : {row_count} lignes")
    if row_count < 100:
        alertes.append(f"⚠️ Volume faible : {row_count} lignes")

    # 2. Pas de valeurs nulles dans les colonnes critiques
    for col in ["mois", "pays", "ca_total"]:
        null_count = df.select(pl.col(col).null_count()).item()
        if null_count > 0:
            raise DataQualityError(f"{null_count} NULL dans {col}")

    # 3. Montants positifs
    negatifs = df.filter(pl.col("ca_total") < 0).height
    if negatifs > 0:
        raise DataQualityError(f"{negatifs} CA négatifs détectés")

    # 4. Distribution suspecte
    moyenne_ca = df.select(pl.col("ca_total").mean()).item()
    ecart_type = df.select(pl.col("ca_total").std()).item()
    outliers = df.filter(
        pl.col("ca_total") > moyenne_ca + 5 * ecart_type
    )
    if outliers.height > 0:
        alertes.append(
            f"⚠️ {outliers.height} outliers (>5σ) : "
            f"{outliers.select('ca_total').to_series().to_list()}"
        )

    return alertes

# Exécution de la validation
alertes = valider_donnees(resultat)
for a in alertes:
    print(a, file=sys.stderr)
print(f"✓ Validation OK : {resultat.height} lignes prêtes pour PostgreSQL")

Étape 4 : Chargement dans PostgreSQL avec upsert

On utilise psycopg2 avec INSERT ON CONFLICT pour un chargement incrémental (pas de doublons) :

import psycopg2
from psycopg2.extras import execute_values

# Connexion
conn = psycopg2.connect(
    host="localhost", port=5432,
    dbname="analytics", user="etl_user", password="secure_pass"
)
cur = conn.cursor()

# Création de la table si elle n'existe pas
cur.execute("""
    CREATE TABLE IF NOT EXISTS analytics.ca_mensuel (
        mois DATE NOT NULL,
        pays VARCHAR(10) NOT NULL,
        categorie VARCHAR(100) NOT NULL,
        ca_total NUMERIC(12,2),
        ca_total_usd NUMERIC(12,2),
        unites_vendues INTEGER,
        nb_transactions INTEGER,
        updated_at TIMESTAMP DEFAULT NOW(),
        PRIMARY KEY (mois, pays, categorie)
    );
""")

# Upsert : INSERT … ON CONFLICT DO UPDATE
lignes = resultat.rows()
colonnes = resultat.columns

execute_values(
    cur,
    f"""
    INSERT INTO analytics.ca_mensuel
        ({', '.join(colonnes)})
    VALUES %s
    ON CONFLICT (mois, pays, categorie)
    DO UPDATE SET
        ca_total = EXCLUDED.ca_total,
        ca_total_usd = EXCLUDED.ca_total_usd,
        unites_vendues = EXCLUDED.unites_vendues,
        nb_transactions = EXCLUDED.nb_transactions,
        updated_at = NOW()
    """,
    lignes,
    page_size=1000
)

conn.commit()
print(f"✓ {len(lignes)} lignes chargées dans PostgreSQL")

# Log d'exécution
cur.execute(
    "INSERT INTO analytics.etl_logs (pipeline, lignes, statut) VALUES (%s, %s, %s)",
    ("ca_mensuel", len(lignes), "success")
)
conn.commit()

cur.close()
conn.close()

Le script complet orchestré

#!/usr/bin/env python3
"""Pipeline ETL CA Mensuel — Polars → PostgreSQL
Usage : python etl_ca_mensuel.py [--date YYYY-MM-DD]
"""
import argparse
import logging
import sys
from datetime import datetime

logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger(__name__)

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--date", help="Date d'exécution (YYYY-MM-DD)")
    args = parser.parse_args()

    date_exec = args.date or datetime.now().strftime("%Y-%m-%d")
    logger.info(f"Démarrage pipeline ETL — date: {date_exec}")

    try:
        # 1. Extract
        logger.info("Extraction des données…")
        ventes = extract_ventes()
        logger.info(f"Extraction OK : {ventes.height} lignes")

        # 2. Transform
        logger.info("Transformation…")
        resultat = transform_ventes(ventes)
        logger.info(f"Transformation OK : {resultat.height} lignes agrégées")

        # 3. Validate
        logger.info("Validation qualité…")
        alertes = valider_donnees(resultat)
        for a in alertes:
            logger.warning(a)
        logger.info("Validation OK")

        # 4. Load
        logger.info("Chargement PostgreSQL…")
        charger_postgresql(resultat)
        logger.info("Chargement terminé avec succès")

    except Exception as e:
        logger.error(f"Pipeline échoué : {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()

Ordonnancer avec cron ou systemd timer

Pour un pipeline quotidien, un simple cron suffit :

# /etc/cron.d/etl-analytics
# Tous les jours à 6h UTC
0 6 * * * etl_user /usr/bin/python3 /opt/etl/etl_ca_mensuel.py >> /var/log/etl/ca_mensuel.log 2>&1

Pour des dépendances plus complexes (attendre que le CSV soit livré, notifier Slack en cas d’échec), passez à Apache Airflow ou Prefect. Le script Python reste le même — seul l’orchestrateur change.

Performance et bonnes pratiques

  • Toujours utiliser le mode lazy (.lazy()) pour les pipelines : Polars optimise l’ordre des opérations et réduit la mémoire utilisée.
  • Streaming pour les gros fichiers : .collect(streaming=True) traite les données par blocs sans tout charger en RAM.
  • Scan, pas read : pl.scan_csv() au lieu de pl.read_csv() en lazy pour ne pas lire le fichier avant le .collect().
  • Index PostgreSQL : créez un index sur la clé d’upsert (mois, pays, categorie) — c’est la PK, donc automatique, mais vérifiez sur les grosses tables.
  • Variables d’environnement pour les credentials : jamais de mot de passe en dur dans le script.

Conclusion

Avec Python + Polars + PostgreSQL, vous avez une stack ETL complète, performante et maintenable, en moins de 200 lignes de code. Pas de JVM, pas de cluster YARN, pas de vendor lock-in. Juste un script Python que vous pouvez versionner, tester et déployer en CI/CD.

En 2026, cette stack est le standard pour tout pipeline qui traite entre 100 Mo et 100 Go de données. Au-delà, Spark et les formats de table ouverts (Iceberg, Delta Lake) prennent le relais. Mais pour 90 % des besoins data d’une PME ou d’une scale-up, Polars fait le job — et le fait vite.

Sources : Polars Documentation, PostgreSQL INSERT ON CONFLICT, Psycopg2 Documentation, Frankfurter API (taux de change)

W
WP Admin Lab

Architecte web full-stack. WordPress, performance, data et sécurité. Notes de terrain, tests reproductibles et retours d'expérience.