objectif
Lancer plusieurs tâches en parallèle avec asyncio.TaskGroup ; si l’une échoue, les autres sont annulées (Python 3.11+).
code minimal
import asyncio
async def sq(i):
    """Return the square of ``i`` after a short asynchronous pause.

    The 5 ms sleep stands in for real I/O so that several calls can overlap
    when they are scheduled as concurrent tasks.
    """
    await asyncio.sleep(0.005)
    return i * i
async def main():
    """Square 0..4 concurrently and print whether the sum equals 30.

    On Python 3.11+ the coroutines run inside an ``asyncio.TaskGroup``.
    On older interpreters, where ``asyncio.TaskGroup`` does not exist, the
    same coroutines are awaited through ``asyncio.gather`` instead.
    """
    # Only the attribute lookup is guarded, so an AttributeError raised by
    # anything else is not mistaken for "TaskGroup is unavailable".
    try:
        task_group_cls = asyncio.TaskGroup
    except AttributeError:
        # TaskGroup is missing (<3.11) -> simple fallback. Unlike TaskGroup,
        # gather() does not cancel the remaining awaitables when one fails.
        total = sum(await asyncio.gather(*(sq(i) for i in range(5))))
    else:
        async with task_group_cls() as tg:
            tasks = [tg.create_task(sq(i)) for i in range(5)]
        # Leaving the ``async with`` block waits for every task, so calling
        # result() here cannot raise InvalidStateError.
        total = sum(t.result() for t in tasks)
    print(total == 30)  # expected: True
# Script entry point: run main() to completion on a new event loop.
asyncio.run(main())
utilisation
import asyncio
async def f():
    """Minimal coroutine whose result is the constant 1."""
    return 1


# asyncio.run() drives the coroutine to completion and returns its result.
print(asyncio.run(f()) == 1)
variante(s) utile(s)
import asyncio
async def err():
    """Coroutine that fails immediately with an argument-less RuntimeError."""
    raise RuntimeError
async def demo():
    """Run a short sleep and a failing coroutine in the same TaskGroup.

    The failure of ``err()`` makes the group cancel the sleeping task and
    re-raise the error, wrapped in an exception group, when the
    ``async with`` block exits.
    """
    async with asyncio.TaskGroup() as tg:
        tg.create_task(asyncio.sleep(0.001))
        tg.create_task(err())


# Only the call that can raise is guarded; the definition above cannot fail.
try:
    asyncio.run(demo())
except Exception:
    # On 3.11+ this is the ExceptionGroup raised by the TaskGroup. On older
    # versions the missing asyncio.TaskGroup (AttributeError) also lands here.
    print(True)
notes
- TaskGroup annule automatiquement les tâches sœurs en cas d’échec.
- Fournit une structure plus sûre que asyncio.gather pour gérer de grands ensembles de tâches : aucune tâche ne survit à la sortie du bloc `async with`.