← retour aux snippets

asyncio: exécuter un sous-processus et capturer

Lancer un processus de façon asynchrone et lire stdout/stderr.

objectif

Lancer un processus de façon asynchrone et lire stdout/stderr.

code minimal

import asyncio, sys

async def main():
    p = await asyncio.create_subprocess_exec(sys.executable, "-c", "print('ok')",
                                             stdout=asyncio.subprocess.PIPE,
                                             stderr=asyncio.subprocess.PIPE)
    out, err = await p.communicate()
    print(out.decode().strip() == "ok" and p.returncode == 0)  # attendu: True

asyncio.run(main())

utilisation

import asyncio, sys
async def once():
    p = await asyncio.create_subprocess_exec(sys.executable, "-c", "print('x')", stdout=asyncio.subprocess.PIPE)
    out, _ = await p.communicate()
    return out.decode().strip()
print(asyncio.run(once()) == "x")

variante(s) utile(s)

import asyncio, sys
async def code(status: int):
    p = await asyncio.create_subprocess_exec(sys.executable, "-c", f"import sys;sys.exit({status})")
    await p.wait()
    return p.returncode
print(asyncio.run(code(3)) == 3)

notes

  • Idéal pour lancer N commandes en parallèle avec asyncio.gather.
  • Évitez shell=True; passez une liste d’arguments.