objectif
Lancer un processus de façon asynchrone et lire stdout/stderr.
code minimal
import asyncio, sys
async def main():
p = await asyncio.create_subprocess_exec(sys.executable, "-c", "print('ok')",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
out, err = await p.communicate()
print(out.decode().strip() == "ok" and p.returncode == 0) # attendu: True
asyncio.run(main())
utilisation
import asyncio, sys
async def once():
p = await asyncio.create_subprocess_exec(sys.executable, "-c", "print('x')", stdout=asyncio.subprocess.PIPE)
out, _ = await p.communicate()
return out.decode().strip()
print(asyncio.run(once()) == "x")
variante(s) utile(s)
import asyncio, sys
async def code(status: int):
p = await asyncio.create_subprocess_exec(sys.executable, "-c", f"import sys;sys.exit({status})")
await p.wait()
return p.returncode
print(asyncio.run(code(3)) == 3)
notes
- Idéal pour lancer N commandes en parallèle avec asyncio.gather.
- Évitez shell=True; passez une liste d’arguments.