objectif
Empêcher les exécutions concurrentes avec un fichier verrou.
code minimal
import os, tempfile, pathlib
def acquire_lock(path: pathlib.Path):
flags = os.O_CREAT | os.O_EXCL | os.O_WRONLY
fd = os.open(path, flags, 0o600)
return fd
def release_lock(fd: int, path: pathlib.Path):
os.close(fd)
os.unlink(path)
with tempfile.TemporaryDirectory() as tmp:
lock = pathlib.Path(tmp) / "app.lock"
fd = acquire_lock(lock)
try:
try:
acquire_lock(lock)
except FileExistsError:
print(True) # attendu: True (déjà verrouillé)
finally:
release_lock(fd, lock)
utilisation
print(True)
variante(s) utile(s)
import os, tempfile, pathlib
with tempfile.TemporaryDirectory() as tmp:
p = pathlib.Path(tmp) / "x.lock"
fd = os.open(p, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o600); os.close(fd); os.unlink(p)
print(True)
notes
- O_EXCL assure l’exclusivité (non atomique via NFS).
- Écrivez le PID dans le lock pour debug si besoin.