← Retour au blog

Observabilité des données 2.0 lineage actif

Lucian BLETAN

Observer la data, c’est connecter qualité, fraîcheur et dépendances. Un lineage actif permet d’anticiper l’impact d’un incident, de prioriser la correction et de communiquer vite aux bons propriétaires. L’objectif: moins de bruit, plus d’actions concrètes, des SLO lisibles, et des preuves (journaux, diffs, changelog) reliées au graphe.

prérequis

  • un catalogue avec propriétaires (owner, on-call) par dataset et tableau de bord.
  • un stockage des métriques (fraîcheur, volumes, erreurs) par dataset.
  • une table de lineage (arêtes source -> cible) alimentée en continu.
  • un canal incident et un runbook clair, testé.

aperçu rapide

  • mesurer ce qui compte: fraîcheur, volume attendu vs observé, schéma, coût par requête.
  • relier chaque dataset à ses dépendances et exposer l’impact potentiel.
  • alerter avec contexte: diff de schéma, changelog, SLO cassé, blast radius.
  • suivre un runbook simple: détection -> qualification -> owner -> correctif -> rétro.

métriques essentielles

  • fraîcheur par dataset
  • volume attendu vs observé
  • schéma conforme
  • coût par requête
-- fraîcheur: âge depuis la dernière mise à jour
SELECT dataset, NOW() - MAX(updated_at) AS age
FROM updates
GROUP BY dataset
ORDER BY age DESC;

-- volume attendu vs observé (alerte si écart > 30 %)
WITH stats AS (
  SELECT dataset,
         DATE_TRUNC('day', ts) AS d,
         COUNT(*) AS n
  FROM loads
  GROUP BY 1,2
),
baseline AS (
  SELECT dataset, AVG(n) AS n_avg
  FROM stats
  WHERE d < CURRENT_DATE - INTERVAL '7 day'
  GROUP BY 1
)
SELECT s.dataset, s.d, s.n, b.n_avg,
       ROUND(100.0 * (s.n - b.n_avg) / NULLIF(b.n_avg,0), 1) AS pct_diff
FROM stats s JOIN baseline b USING(dataset)
WHERE s.d = CURRENT_DATE - INTERVAL '1 day'
  AND ABS(s.n - b.n_avg) > 0.3 * b.n_avg
ORDER BY pct_diff DESC;

-- schéma conforme (diff avec information_schema)
WITH current AS (
  SELECT table_schema || '.' || table_name AS dataset,
         column_name, data_type
  FROM information_schema.columns
  WHERE table_schema NOT IN ('information_schema','pg_catalog')
),
expected AS (
  SELECT dataset, column_name, data_type
  FROM governance.schema_expected
)
SELECT COALESCE(c.dataset,e.dataset) AS dataset,
       c.column_name AS col_cur, e.column_name AS col_exp,
       c.data_type   AS type_cur, e.data_type   AS type_exp
FROM current c
FULL OUTER JOIN expected e USING(dataset, column_name)
WHERE c.data_type IS DISTINCT FROM e.data_type
   OR c.column_name IS NULL
   OR e.column_name IS NULL
ORDER BY dataset;

-- coût par requête (logs de facturation unifiés)
SELECT user_email, query_id, dataset,
       ROUND(cost_eur, 2) AS cost_eur,
       duration_ms
FROM finops.query_log
WHERE ts >= NOW() - INTERVAL '1 day'
ORDER BY cost_eur DESC
LIMIT 50;

lineage actif (exploitable)

  • lier chaque vue à ses sources
  • exposer l’impact potentiel: tableaux de bord touchés, owners, SLO
  • alerter avec contexte (diff, changelog, lien vers runbook)
-- table edges: from_dataset -> to_dataset (DAG)
-- table dashboards: dashboard_id -> dataset -> owner
-- table slo_status: dataset -> slo -> ok/ko -> details

-- impact: tous les descendants d'une source cassée
WITH RECURSIVE g AS (
  SELECT from_dataset, to_dataset FROM lineage.edges
),
seed AS (
  SELECT UNNEST(ARRAY['raw.orders','raw.customers']) AS broken_ds  -- seed incident
),
reach(ds) AS (
  SELECT broken_ds FROM seed
  UNION
  SELECT g.to_dataset
  FROM g JOIN reach ON g.from_dataset = reach.ds
)
SELECT r.ds AS impacted_dataset,
       d.dashboard_id,
       d.owner,
       s.slo,
       s.status,
       s.details
FROM reach r
LEFT JOIN lineage.dashboards d ON d.dataset = r.ds
LEFT JOIN lineage.slo_status s ON s.dataset = r.ds
ORDER BY impacted_dataset, dashboard_id;

cartographie lisible (verticale)

raw.orders

dim.orders

raw.customers

dim.customers

mart.sales_daily

bi.revenue_dashboard

bi.ops_dashboard

SLO par dataset (modèle)

  • fraîcheur maximale
  • complétude minimale
  • stabilité du schéma
slo:
  dataset: "mart.sales_daily"
  freshness_max: "PT2H"          # âge max autorisé
  completeness_min_rows: 100000  # lignes attendues min
  schema_mode: "compatible"      # compatible | strict
  page_on_call: "@data-oncall"
-- évaluer un SLO de fraîcheur
WITH last AS (
  SELECT MAX(updated_at) AS ts FROM updates WHERE dataset = 'mart.sales_daily'
)
SELECT CASE
  WHEN NOW() - (SELECT ts FROM last) <= INTERVAL '2 hour' THEN 'ok'
  ELSE 'ko'
END AS freshness_status;

alertes avec contexte

  • inclure le diff de schéma (colonnes ajoutées/supprimées, types modifiés)
  • lier au changelog (qui a déployé quoi, quand, message)
  • joindre l’impact (descendants, dashboards, owners)
-- diff schéma vs hier
WITH cur AS (
  SELECT column_name, data_type
  FROM information_schema.columns
  WHERE table_schema = 'mart' AND table_name = 'sales_daily'
),
yday AS (
  SELECT column_name, data_type
  FROM governance.schema_snapshot
  WHERE dataset = 'mart.sales_daily' AND snapshot_day = CURRENT_DATE - 1
)
SELECT 'added' AS change, c.column_name, c.data_type FROM cur c
LEFT JOIN yday y USING(column_name)
WHERE y.column_name IS NULL
UNION ALL
SELECT 'removed', y.column_name, y.data_type FROM yday y
LEFT JOIN cur c USING(column_name)
WHERE c.column_name IS NULL
UNION ALL
SELECT 'type_change', c.column_name, c.data_type || ' <- ' || y.data_type
FROM cur c JOIN yday y USING(column_name)
WHERE c.data_type <> y.data_type;

runbook incident

  • détection -> qualification -> owner
  • communication vers consommateurs
  • correctif + test de non-régression
  • rétro et action préventive
ConsumersOwnerOnCallMonitorConsumersOwnerOnCallMonitoralerte SLO ko + impact + diffassigner incidentdiagnostic + ETA correctifmessage impact + contournementcorrectif déployé + tests okclose + post mortem planifié

gabarit (minimal) d’incident

incident:
  id: "inc-2024-08-04-001"
  trigger: "freshness_slo_ko"
  dataset: "mart.sales_daily"
  impact:
    dashboards: ["bi.revenue_dashboard"]
    owners_ping: ["@bi", "@sales"]
  findings:
    cause: "retrait colonne country sans vue compat"
    fix: "vue de compat + migration dashboards"
  actions:
    - "ajouter test de schema compatible"
    - "documenter migration dans changelog"

instrumentation: stocker, relier, prouver

  • stocker les métriques dans une table stable.
  • relier chaque alerte à un change_id (commit, déploiement).
  • prouver via exports et journaux (audit).
CREATE TABLE IF NOT EXISTS lineage.metrics (
  ts TIMESTAMP NOT NULL,
  dataset TEXT NOT NULL,
  freshness_s TEXT,
  rows BIGINT,
  errors BIGINT,
  cost_eur DOUBLE PRECISION,
  change_id TEXT
);

CREATE TABLE IF NOT EXISTS lineage.changelog (
  change_id TEXT PRIMARY KEY,
  dataset TEXT,
  author TEXT,
  message TEXT,
  deployed_at TIMESTAMP
);

exemples complets

cas 1: prioriser ce qui casse vraiment

-- top incidents par blast radius (nombre de dashboards touchés)
WITH impact AS (
  SELECT i.incident_id, COUNT(DISTINCT d.dashboard_id) AS blast
  FROM incidents i
  JOIN lineage_edges le ON le.from_dataset = i.dataset
  JOIN dashboards d ON d.dataset = le.to_dataset
  WHERE i.created_at >= CURRENT_DATE - INTERVAL '30 day'
  GROUP BY 1
)
SELECT incident_id, blast
FROM impact
ORDER BY blast DESC
LIMIT 10;

explications: concentrez les efforts sur les incidents avec le plus grand périmètre d’impact.

cas 2: déclencher une alerte “volume vs baseline”

INSERT INTO alerts(dataset, kind, details, created_at)
SELECT s.dataset, 'volume_drift',
       JSON_BUILD_OBJECT('n', s.n, 'baseline', b.n_avg, 'pct', ROUND(100.0*(s.n-b.n_avg)/b.n_avg,1)),
       NOW()
FROM stats s JOIN baseline b USING(dataset)
WHERE s.d = CURRENT_DATE - INTERVAL '1 day'
  AND ABS(s.n - b.n_avg) > 0.3 * b.n_avg;

explications: alerte quand l’écart dépasse 30 %, avec contexte chiffré.

bonnes pratiques

  • une métrique = un propriétaire; publier les SLO dans le catalogue.
  • un graphe utile > un graphe “joli”: actions et owners visibles d’abord.
  • alerter peu mais bien: inclure impact, diff et lien runbook.
  • tester les runbooks comme des sauvegardes: régulièrement, réellement.
  • versionner les vues critiques et documenter les migrations.

pièges

  • graphe ornemental -> inutile -> actions et owners visibles, impact chiffré.
  • alertes sans contexte -> bruit -> ajouter diff, SLO, blast radius.
  • métriques opaques -> incompréhension -> noms clairs, unités, liens vers la définition.
  • schéma instable -> casse en cascade -> compatibilité déclarée, déprécation guidée.
  • coûts invisibles -> surprise -> journaliser le coût par requête et l’exposer.

faq

  • Comment démarrer si je n’ai pas de lineage outillé ? Commencez par une table edges simple (source -> cible) alimentée par vos orchestrateurs et vues. Même partielle, elle suffit pour un premier impact.

  • Faut-il des SLO pour tous les datasets ? Non. Ciblez d’abord les produits de données exposés (marts, APIs, exports) et les sources critiques.

  • Comment éviter les fausses alertes de volume ? Calculez une baseline robuste (médiane glissante) et ajoutez des jours spéciaux (jours fériés, fins de mois) dans vos règles.

  • Qui doit être on-call ? L’équipe propriétaire du produit de données impacté, avec un relais plateforme pour les incidents transverses.

  • Comment relier alertes et changements ? Exigez un change_id dès qu’un déploiement modifie un dataset et joignez-le aux métriques et aux alertes.