← retour aux snippets

queue.Queue: producteur/consommateur avec sentinelle

Communiquer entre threads avec une sentinelle d'arrêt propre.

objectif

Communiquer entre threads avec une sentinelle d’arrêt propre.

code minimal

import threading, queue
q: queue.Queue[int | None] = queue.Queue()
out: list[int] = []

def producer():
    for i in range(3): q.put(i)
    q.put(None)  # sentinelle

def consumer():
    while True:
        item = q.get()
        if item is None: break
        out.append(item*2)
        q.task_done()

t1 = threading.Thread(target=producer); t2 = threading.Thread(target=consumer)
t1.start(); t2.start(); t1.join(); t2.join()
print(out == [0,2,4])  # attendu: True

utilisation

import queue
q = queue.Queue(maxsize=2)
q.put(1); q.put(2)
print(q.full())

variante(s) utile(s)

import queue
q = queue.SimpleQueue()
q.put("x")
print(q.get() == "x")

notes

  • Utilisez une sentinelle None pour signaler la fin.
  • task_done/join synchronisent la complétion des éléments.