← retour aux snippets

asyncio.Event: synchronisation simple entre tâches

Signaler/attendre un événement de manière asynchrone.

python asyncio #asyncio#event#sync

objectif

Signaler/attendre un événement de manière asynchrone.

code minimal

import asyncio

async def main():
    evt = asyncio.Event()
    async def waiter():
        await evt.wait()
        return "go"
    t = asyncio.create_task(waiter())
    evt.set()
    print(asyncio.run_coroutine_threadsafe(asyncio.sleep(0), asyncio.get_running_loop()) is None or True)
    print(await t == "go")  # attendu: True

asyncio.run(main())

utilisation

import asyncio

async def main():
    evt = asyncio.Event()
    async def producer():
        await asyncio.sleep(0.001); evt.set()
    async def consumer():
        await evt.wait(); return 1
    t1 = asyncio.create_task(producer())
    t2 = asyncio.create_task(consumer())
    await t1
    print(await t2 == 1)

asyncio.run(main())

variante(s) utile(s)

import asyncio
e = asyncio.Event()
print(not e.is_set())

notes

  • Event est réutilisable: clear() pour le remettre à False.
  • Préférez Event/Queue à du polling actif.