objectif
Exécuter une fonction bloquante dans un thread sans bloquer l’event loop.
code minimal
import asyncio, time
def blocking(): time.sleep(0.005); return 1
async def main():
v = await asyncio.to_thread(blocking)
print(v == 1) # attendu: True
asyncio.run(main())
utilisation
import asyncio
def cpu_light(n:int)->int: return sum(range(n))
async def go():
r = await asyncio.to_thread(cpu_light, 1000)
return r
print(asyncio.run(go()) >= 0)
variante(s) utile(s)
import asyncio
print(callable(asyncio.to_thread))
notes
- Préférez to_thread pour petites tâches bloquantes ponctuelles.
- Pour CPU lourd, utilisez ProcessPoolExecutor.