objectif
Lancer des tâches concurrentes avec timeout et gestion des échecs par tâche.
code minimal
import asyncio
async def work(n: int, delay: float):
await asyncio.sleep(delay)
return n
async def main():
tasks = [asyncio.wait_for(work(1, 0.1), timeout=0.2),
asyncio.wait_for(work(2, 0.5), timeout=0.2)]
results = await asyncio.gather(*tasks, return_exceptions=True)
# attendu: [1, TimeoutError(...)] -> True si un élément est Exception
print([isinstance(r, Exception) or r for r in results][0] == 1)
if __name__ == "__main__":
asyncio.run(main())
utilisation
import asyncio
async def ok(): return 42
print(asyncio.run(ok()))
variante(s) utile(s)
import asyncio
async def worker(i): await asyncio.sleep(0.01); return i*i
async def run():
res = await asyncio.gather(*[worker(i) for i in range(3)])
print(sum(res)) # 0^2+1^2+2^2 = 5
asyncio.run(run())
notes
- return_exceptions=True évite d’annuler toutes les tâches au premier échec.
- Préférez wait_for pour forcer un timeout par tâche.