← retour aux snippets

asyncio: gather avec timeouts et erreurs isolées

Lancer des tâches concurrentes avec timeout et gestion des échecs par tâche.

objectif

Lancer des tâches concurrentes avec timeout et gestion des échecs par tâche.

code minimal

import asyncio

async def work(n: int, delay: float):
    await asyncio.sleep(delay)
    return n

async def main():
    tasks = [asyncio.wait_for(work(1, 0.1), timeout=0.2),
             asyncio.wait_for(work(2, 0.5), timeout=0.2)]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # attendu: [1, TimeoutError(...)] -> True si un élément est Exception
    print([isinstance(r, Exception) or r for r in results][0] == 1)

if __name__ == "__main__":
    asyncio.run(main())

utilisation

import asyncio
async def ok(): return 42
print(asyncio.run(ok()))

variante(s) utile(s)

import asyncio
async def worker(i): await asyncio.sleep(0.01); return i*i
async def run():
    res = await asyncio.gather(*[worker(i) for i in range(3)])
    print(sum(res))  # 0^2+1^2+2^2 = 5
asyncio.run(run())

notes

  • return_exceptions=True évite d’annuler toutes les tâches au premier échec.
  • Préférez wait_for pour forcer un timeout par tâche.