objectif
Communiquer entre threads avec une sentinelle d’arrêt propre.
code minimal
import threading, queue
q: queue.Queue[int | None] = queue.Queue()
out: list[int] = []
def producer():
for i in range(3): q.put(i)
q.put(None) # sentinelle
def consumer():
while True:
item = q.get()
if item is None: break
out.append(item*2)
q.task_done()
t1 = threading.Thread(target=producer); t2 = threading.Thread(target=consumer)
t1.start(); t2.start(); t1.join(); t2.join()
print(out == [0,2,4]) # attendu: True
utilisation
import queue
q = queue.Queue(maxsize=2)
q.put(1); q.put(2)
print(q.full())
variante(s) utile(s)
import queue
q = queue.SimpleQueue()
q.put("x")
print(q.get() == "x")
notes
- Utilisez une sentinelle None pour signaler la fin.
- task_done/join synchronisent la complétion des éléments.