Python中常用的线程通信方式主要有共享变量、Queue、Event、Condition、Semaphore和Barrier,这些方法都通过锁机制保证线程安全,用于实现多线程之间的通信和数据交换
一般来说,大部分遇到的多线程,只要能各自完成好各自的任务即可。少数情况下,不同线程可能需要在线程安全的情况下,进行通信和数据交换。Python 中常用的线程通信有以下方法。
共享变量是最简单的线程通信方式,比如进行计数更新等操作,但需要配合锁来保证线程安全。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | import threading# 共享变量shared_data = 0lock = threading.Lock()def worker(): global shared_data with lock: # 使用锁保证线程安全 shared_data += 1threads = []for i in range(5): t = threading.Thread(target=worker) threads.append(t) t.start()for t in threads: t.join()print(f"最终结果: {shared_data}") # 应该是5 |
最常用的线程安全通信方式,是生产者-消费者模式的理想选择。比如使用优先级队列优先消费高优先级的数据(序号越低,优先级越高,越优先消费)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | from time import sleepfrom random import random, randintfrom threading import Threadfrom queue import PriorityQueuequeue = PriorityQueue()def producer(queue): print('Producer: Running') for i in range(5): # create item with priority value = random() priority = randint(0, 5) item = (priority, value) queue.put(item) # wait for all items to be processed queue.join() queue.put(None) print('Producer: Done')def consumer(queue): print('Consumer: Running') while True: # get a unit of work item = queue.get() if item is None: break sleep(item[1]) print(item) queue.task_done() print('Consumer: Done')producer = Thread(target=producer, args=(queue,))producer.start()consumer = Thread(target=consumer, args=(queue,))consumer.start()producer.join()consumer.join() |
1 2 3 4 5 6 7 8 9 | Producer: RunningConsumer: Running(0, 0.9945246262101098)(2, 0.35853829355476663)(2, 0.4794139132317813)(3, 0.8460111545035349)(5, 0.6047655828611674)Producer: DoneConsumer: Done |
线程模提供了 Event 用于线程间的简单信号传递。Event 对象管理内部标志的状态。标志初始为False,并通过 set() 方法变为 True,通过 clear() 方法重新设置为 False。wait() 方法会阻塞,直到标志变为 True。
比如下面使用 Event 通知,模拟交通信号灯周期性变化及车辆通行之间的协同运行。车辆根据信号灯的状态判断是否通行还是等待;车辆通行完毕以后,只剩下信号灯周期性变化。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | from threading import *import timedef signal_state(): while True: time.sleep(5) print("Traffic Police Giving GREEN Signal") event.set() time.sleep(10) print("Traffic Police Giving RED Signal") event.clear()def traffic_flow(): num = 0 while num < 10: print("Waiting for GREEN Signal") event.wait() print("GREEN Signal ... Traffic can move") while event.is_set(): num = num + 1 print("Vehicle No:", num, " Crossing the Signal") time.sleep(2) print("RED Signal ... Traffic has to wait")event = Event()t1 = Thread(target=signal_state)t2 = Thread(target=traffic_flow)t1.start()t2.start() |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | Waiting for GREEN SignalTraffic Police Giving GREEN SignalGREEN Signal ... Traffic can moveVehicle No: 1 Crossing the SignalVehicle No: 2 Crossing the SignalVehicle No: 3 Crossing the SignalVehicle No: 4 Crossing the SignalVehicle No: 5 Crossing the SignalTraffic Police Giving RED SignalRED Signal ... Traffic has to waitWaiting for GREEN SignalTraffic Police Giving GREEN SignalGREEN Signal ... Traffic can moveVehicle No: 6 Crossing the SignalVehicle No: 7 Crossing the SignalVehicle No: 8 Crossing the SignalVehicle No: 9 Crossing the SignalVehicle No: 10 Crossing the SignalTraffic Police Giving RED SignalRED Signal ... Traffic has to waitTraffic Police Giving GREEN SignalTraffic Police Giving RED SignalTraffic Police Giving GREEN SignalTraffic Police Giving RED Signal... |
线程模块中的 Condition 类实现了条件变量对象。条件对象会强制一个或多个线程等待,直到被另一个线程通知。Condition 用于更复杂的线程同步,允许线程等待特定条件。比如上面的 Event 的实现,其内部也是在使用 Condition。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | import threadingimport time# 共享资源buffer = []MAX_ITEMS = 5condition = threading.Condition()def producer(): """生产者""" for i in range(10): time.sleep(0.2) with condition: while len(buffer) >= MAX_ITEMS: print("Buffer full,wait...") condition.wait() # 等待缓冲区有空位 item = f"item-{i}" buffer.append(item) print(f"Producer: {item}, Buffer: {len(buffer)}") condition.notify_all() # 通知消费者def consumer(): """消费者""" for i in range(10): time.sleep(0.8) with condition: while len(buffer) == 0: print("Buffer empty,wait...") condition.wait() # 等待缓冲区有数据 item = buffer.pop(0) print(f"Consumer: {item}, Buffer: {len(buffer)}") condition.notify_all() # 通知生产者# 创建线程prod = threading.Thread(target=producer)cons = threading.Thread(target=consumer)prod.start()cons.start()prod.join()cons.join() |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | Producer: item-0, Buffer: 1Producer: item-1, Buffer: 2Producer: item-2, Buffer: 3Consumer: item-0, Buffer: 2Producer: item-3, Buffer: 3Producer: item-4, Buffer: 4Producer: item-5, Buffer: 5Buffer full,wait...Consumer: item-1, Buffer: 4Producer: item-6, Buffer: 5Buffer full,wait...Consumer: item-2, Buffer: 4Producer: item-7, Buffer: 5Buffer full,wait...Consumer: item-3, Buffer: 4Producer: item-8, Buffer: 5Buffer full,wait...Consumer: item-4, Buffer: 4Producer: item-9, Buffer: 5Consumer: item-5, Buffer: 4Consumer: item-6, Buffer: 3Consumer: item-7, Buffer: 2Consumer: item-8, Buffer: 1Consumer: item-9, Buffer: 0 |
Semaphore 信号量控制对共享资源的访问数量。信号量的基本概念是使用一个内部计数器,每个 acquire() 调用将其递减,每个 release() 调用将其递增。计数器永远不能低于零;当 acquire() 发现计数器为零时,它会阻塞,直到某个其他线程调用 release()。当然,从源码看,信号量也是通过 Condition 条件对象来进行实现的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | import threadingimport time# 信号量,限制最多3个线程同时访问semaphore = threading.Semaphore(3)def access_resource(thread_id): """访问共享资源""" with semaphore: print(f"Thread {thread_id} acquire\n", end="") time.sleep(2) # 模拟资源访问 print(f"Thread {thread_id} release\n", end="")# 创建10个线程threads = []for i in range(10): t = threading.Thread(target=access_resource, args=(i,)) threads.append(t) t.start()for t in threads: t.join() |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | Thread 0 acquireThread 1 acquireThread 2 acquireThread 0 releaseThread 3 acquireThread 1 releaseThread 2 releaseThread 4 acquireThread 5 acquireThread 3 releaseThread 6 acquireThread 4 releaseThread 7 acquireThread 5 releaseThread 8 acquireThread 6 releaseThread 9 acquireThread 7 releaseThread 8 releaseThread 9 release |
Barrier 使多个线程等待,直到指定数目的线程都到达某个点,这些线程才会被同时唤醒,然后继续往后执行(需要注意的是:如果没有设置 timeout,且总的线程数无法整除给定的线程数 parties 时,会导致线程阻塞,形成死锁)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | import threadingimport time# 创建屏障,等待3个线程(注意:如果总的线程数无法整除3,则会导致线程阻塞)barrier = threading.Barrier(3)def worker(worker_id): """工作线程""" print(f"Worker {worker_id} start") time.sleep(worker_id) # 模拟不同工作速度 print(f"Worker {worker_id} arrive") barrier.wait() # 等待所有线程到达 print(f"Worker {worker_id} continue")# 创建3个线程threads = []for i in range(6): t = threading.Thread(target=worker, args=(i,)) threads.append(t) t.start()for t in threads: t.join() |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | Worker 0 startWorker 0 arriveWorker 1 startWorker 2 startWorker 3 startWorker 4 startWorker 5 startWorker 1 arriveWorker 2 arriveWorker 2 continueWorker 0 continueWorker 1 continueWorker 3 arriveWorker 4 arriveWorker 5 arriveWorker 5 continueWorker 3 continueWorker 4 continue |
不管以什么样的方式进行线程通信,最重要的当属线程安全,线程通信的各种设计,也是建立在通过锁的机制保证线程安全的情况下来实现各种功能的。