objectif
Signaler/attendre un événement de manière asynchrone.
code minimal
import asyncio
async def main():
evt = asyncio.Event()
async def waiter():
await evt.wait()
return "go"
t = asyncio.create_task(waiter())
evt.set()
print(asyncio.run_coroutine_threadsafe(asyncio.sleep(0), asyncio.get_running_loop()) is None or True)
print(await t == "go") # attendu: True
asyncio.run(main())
utilisation
import asyncio
async def main():
evt = asyncio.Event()
async def producer():
await asyncio.sleep(0.001); evt.set()
async def consumer():
await evt.wait(); return 1
t1 = asyncio.create_task(producer())
t2 = asyncio.create_task(consumer())
await t1
print(await t2 == 1)
asyncio.run(main())
variante(s) utile(s)
import asyncio
e = asyncio.Event()
print(not e.is_set())
notes
- Event est réutilisable: clear() pour le remettre à False.
- Préférez Event/Queue à du polling actif.