goal
Wait until sockets or other file objects (on Windows, sockets only) are ready for reading or writing, without blocking on any single one.
minimal code
import selectors, socket
sel = selectors.DefaultSelector()
a, b = socket.socketpair()  # two connected local sockets
try:
    a.setblocking(False)
    b.setblocking(False)
    sel.register(a, selectors.EVENT_READ)  # watch `a` for incoming data
    b.send(b"x")  # makes `a` readable
    events = sel.select(timeout=0.1)  # list of (key, mask) pairs
    print(any(key.fileobj is a for key, _ in events))  # expected: True
finally:
    sel.close()
    a.close()
    b.close()
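The goal mentions writability as well as readability, which the minimal code does not exercise. The sketch below registers one socket for both events and inspects the returned mask; the variable names `readable` and `writable` are illustrative only. It assumes a freshly created socket pair, whose send buffer is empty and which has no pending data.

```python
import selectors
import socket

sel = selectors.DefaultSelector()
a, b = socket.socketpair()
try:
    a.setblocking(False)
    # Watch the same socket for both readiness kinds with a combined mask.
    sel.register(a, selectors.EVENT_READ | selectors.EVENT_WRITE)
    events = sel.select(timeout=0.1)
    mask = events[0][1] if events else 0
    writable = bool(mask & selectors.EVENT_WRITE)  # send buffer has room
    readable = bool(mask & selectors.EVENT_READ)   # nothing was sent to `a`
    print(writable, readable)
finally:
    sel.close()
    a.close()
    b.close()
```

Because a socket is writable almost all the time, registering EVENT_WRITE permanently tends to make `select()` return at once; a common approach is to register it only while there is pending output.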
usage
import selectors, socket
sel = selectors.DefaultSelector()
a, b = socket.socketpair()
try:
    # `data` is an opaque value returned unchanged in the SelectorKey
    sel.register(a, selectors.EVENT_READ, data="tag")
    b.send(b"ok")
    ready = sel.select(0.05)  # timeout in seconds
    print(len(ready) == 1 and ready[0][0].data == "tag")
finally:
    sel.close()
    a.close()
    b.close()
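Since `data` can hold any object, one possible pattern is to store a callback there and let the loop dispatch on it. This is a minimal sketch, not the only way to structure an event loop; `on_readable` and `received` are hypothetical names introduced for the example.

```python
import selectors
import socket

sel = selectors.DefaultSelector()
a, b = socket.socketpair()
received = []

def on_readable(sock):
    # Hypothetical callback: read what is available and record it.
    received.append(sock.recv(1024))

try:
    a.setblocking(False)
    # Store the callback in `data` so the loop needs no separate lookup table.
    sel.register(a, selectors.EVENT_READ, data=on_readable)
    b.send(b"ok")
    for key, mask in sel.select(timeout=1.0):
        if mask & selectors.EVENT_READ:
            key.data(key.fileobj)  # dispatch to the stored callback
finally:
    sel.close()
    a.close()
    b.close()

print(received)  # prints [b'ok']
```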
useful variant(s)
import selectors
print(hasattr(selectors, "DefaultSelector"))
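A slightly more informative variant, offered as a sketch: list which backend classes the `selectors` module defines on the current platform and show which one `DefaultSelector` is an alias for. The `names` and `available` variables are illustrative; the output depends on the platform, so none is shown.

```python
import selectors

# Candidate backend class names; not every platform defines all of them.
names = [
    "EpollSelector",
    "KqueueSelector",
    "DevpollSelector",
    "PollSelector",
    "SelectSelector",
]
available = [n for n in names if hasattr(selectors, n)]
print(available)

# DefaultSelector is an alias for one of the classes above.
print(selectors.DefaultSelector.__name__)
```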
notes
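- DefaultSelector picks the most efficient implementation available on the platform (epoll, kqueue, devpoll or poll, falling back to select).
- Call setblocking(False) on sockets so that recv/send never block, even if a socket reported as ready turns out not to be.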
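To illustrate the second note, here is a small sketch of what non-blocking mode changes: a `recv` on a socket with no pending data raises `BlockingIOError` instead of waiting. The `would_block` flag is a name made up for this example.

```python
import socket

a, b = socket.socketpair()
try:
    a.setblocking(False)
    try:
        a.recv(1024)  # nothing has been sent to `a` yet
        would_block = False
    except BlockingIOError:
        # Non-blocking sockets raise instead of waiting for data.
        would_block = True
    print(would_block)  # prints True
finally:
    a.close()
    b.close()
```

In a selector-driven loop, catching `BlockingIOError` around `recv` and `send` and simply returning to `select()` is a reasonable way to handle readiness notifications that turn out to be stale.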