# Concurrency

## Threads

### Processes and Threads
- A process is a single task running in the operating system.
- Each process owns its own CPU time slices and an independent memory space.
- Multiple processes can run concurrently.
- A thread is a single task running inside a process.
### Creating Threads

- Use the `Thread` class from the `threading` module to create a thread.
- `start()`: starts the thread.
- `join()`: blocks until the thread finishes.
```python
from threading import Thread

def task():
    for i in range(5):
        print(i)

thread_1 = Thread(target=task)
thread_2 = Thread(target=task)
thread_1.start()
thread_2.start()
thread_1.join()
print("thread_1 end")
thread_2.join()
print("thread_2 end")
```
- When the target function takes parameters, pass them as a tuple via `args`.
```python
from threading import Thread

def task(num: int):
    for i in range(num):
        print(i)

thread_1 = Thread(target=task, args=(5,))
thread_2 = Thread(target=task, args=(10,))
thread_1.start()
thread_2.start()
```
Alternatively, subclass `Thread` and override `run()`:

```python
import time
from threading import Thread

class MyThread(Thread):
    def __init__(self, name: str, num: int):
        # setName() is deprecated since Python 3.10; pass name to the base class
        super().__init__(name=name)
        self.num = num

    def run(self) -> None:
        for i in range(self.num):
            print(f"{self.name}: {i}")  # getName() is deprecated; use .name
            time.sleep(1)

thread_1 = MyThread("A", 5)
thread_2 = MyThread("B", 10)
thread_1.start()
thread_2.start()
```
### Daemon Threads

- A daemon thread is terminated automatically when the main thread exits.
- If non-daemon threads are still running, the program waits for them to finish before exiting.
- Daemon threads are typically used for non-critical background work such as logging.
```python
import time
from threading import Thread

def task(num: int):
    for i in range(num):
        print(i)

thread_1 = Thread(target=task, args=(5,), daemon=True)
thread_1.start()

class MyThread(Thread):
    def __init__(self, name: str, num: int):
        # setName()/setDaemon() are deprecated; use the constructor arguments
        super().__init__(name=name, daemon=True)
        self.num = num

    def run(self) -> None:
        for i in range(self.num):
            print(f"{self.name}: {i}")
            time.sleep(1)

thread_2 = MyThread("A", 5)
thread_2.start()
```
## Thread-Safe Queues

The `Queue` class in the `queue` module provides a thread-safe FIFO queue.

- `queue.put(item, block=True)`
  - with `block=True`, the calling thread waits on a full queue until space frees up;
  - with `block=False`, a full queue raises `queue.Full` immediately.
- `queue.put(item, timeout=5)`: waits up to `timeout` seconds on a full queue; puts the item if space frees up, otherwise raises `queue.Full`.
- `queue.get(block=True)`: waits on an empty queue until an item arrives.
- `queue.get(timeout=5)`: waits up to `timeout` seconds on an empty queue, then raises `queue.Empty`.
- `queue.qsize()`: the approximate number of items in the queue.
- `queue.empty()`: whether the queue is empty.
- `queue.full()`: whether the queue is full.
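The blocking and exception behavior above can be checked directly in a small sketch (the item values here are arbitrary):

```python
import queue

q = queue.Queue(2)           # a bounded, thread-safe queue with capacity 2
q.put("a")
q.put("b")
is_full = q.full()           # True: no space left

try:
    q.put("c", block=False)  # non-blocking put on a full queue
    overflow = None
except queue.Full:
    overflow = "raised queue.Full"

first = q.get()              # "a": items come back in FIFO order
second = q.get()             # "b": the queue is now empty

try:
    q.get(timeout=0.1)       # waits 0.1 s on the empty queue, then raises
    underflow = None
except queue.Empty:
    underflow = "raised queue.Empty"

print(is_full, overflow, first, second, underflow)
```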
A producer/consumer model demonstrates the queue:
```python
from queue import Queue
from threading import Thread

class Producer(Thread):
    def __init__(self, name: str, num: int, queue: Queue):
        super().__init__(name=name)
        self.num = num
        self.queue = queue

    def run(self) -> None:
        for i in range(self.num):
            item = f"{self.name}: {i}"
            self.queue.put(item, block=True)

class Consumer(Thread):
    def __init__(self, name: str, queue: Queue):
        # daemon consumers exit with the main thread once the producers finish
        super().__init__(name=name, daemon=True)
        self.queue = queue

    def run(self) -> None:
        while True:
            item = self.queue.get(block=True)
            print(f"{self.name} -> {item}\n", end="")

queue = Queue(3)
threads = []
threads.append(Producer("P1", 5, queue))
threads.append(Producer("P2", 5, queue))
threads.append(Producer("P3", 5, queue))
threads.append(Consumer("C1", queue))
threads.append(Consumer("C2", queue))
for thread in threads:
    thread.start()
```
## Thread Locks

When multiple threads access the same resource at the same time, they race against each other, which can easily lose or overwrite data.
```python
from threading import Thread

def task(name: str):
    print(f"{name}: 1\n", end="")
    print(f"{name}: 2\n", end="")
    print(f"{name}: 3\n", end="")

thread_1 = Thread(target=task, args=("A",))
thread_2 = Thread(target=task, args=("B",))
thread_3 = Thread(target=task, args=("C",))
threads = [thread_1, thread_2, thread_3]
for thread in threads:
    thread.start()
```
A lock can be used to solve this problem.

### Lock

- `lock.acquire()`: acquires the lock; while one thread holds it, any other thread that tries to acquire it must wait.
- `lock.release()`: releases the lock.
```python
from threading import Lock, Thread

lock = Lock()

def task(name: str):
    lock.acquire()
    print(f"{name}: 1\n", end="")
    print(f"{name}: 2\n", end="")
    print(f"{name}: 3\n", end="")
    lock.release()

thread_1 = Thread(target=task, args=("A",))
thread_2 = Thread(target=task, args=("B",))
thread_3 = Thread(target=task, args=("C",))
threads = [thread_1, thread_2, thread_3]
for thread in threads:
    thread.start()
```
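The print demo only scrambles output; the more dangerous consequence of a race is lost updates on shared data. A minimal sketch (the counter name and iteration counts are arbitrary, not from the original):

```python
from threading import Lock, Thread

counter = 0
counter_lock = Lock()

def increment(n: int):
    global counter
    for _ in range(n):
        # without the lock, the read-modify-write of `counter += 1`
        # can interleave between threads and lose updates
        with counter_lock:
            counter += 1

threads = [Thread(target=increment, args=(100_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # always 400000: every increment is serialized by the lock
```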
### Condition

- `acquire()`: acquires the underlying lock.
- `release()`: releases the underlying lock.
- `wait()`: releases the lock and blocks until another thread calls notify.
- `notify_all()`: wakes up all threads waiting on this condition.
With these, we can implement a thread-safe queue ourselves:
```python
from threading import Condition, Thread

class MySafeQueue:
    def __init__(self, size: int):
        self.__queue = []
        self.size = size
        self.__lock = Condition()

    def put(self, item):
        self.__lock.acquire()
        while len(self.__queue) >= self.size:
            self.__lock.wait()
        self.__queue.append(item)
        self.__lock.notify_all()
        self.__lock.release()

    def get(self):
        self.__lock.acquire()
        while len(self.__queue) == 0:
            self.__lock.wait()
        item = self.__queue.pop(0)
        self.__lock.notify_all()
        self.__lock.release()
        return item

class Producer(Thread):
    def __init__(self, name: str, num: int, queue: MySafeQueue):
        super().__init__(name=name)
        self.num = num
        self.queue = queue

    def run(self) -> None:
        for i in range(self.num):
            item = f"{self.name}: {i}"
            self.queue.put(item)

class Consumer(Thread):
    def __init__(self, name: str, queue: MySafeQueue):
        super().__init__(name=name, daemon=True)
        self.queue = queue

    def run(self) -> None:
        while True:
            item = self.queue.get()
            print(f"{self.name} -> {item}\n", end="")

queue = MySafeQueue(3)
threads = []
threads.append(Producer("P1", 5, queue))
threads.append(Producer("P2", 5, queue))
threads.append(Producer("P3", 5, queue))
threads.append(Consumer("C1", queue))
threads.append(Consumer("C2", queue))
for thread in threads:
    thread.start()
```
Locks support the context-manager protocol, so the queue can be rewritten as:
```python
from threading import Condition

class MySafeQueue:
    def __init__(self, size: int):
        self.__queue = []
        self.size = size
        self.__lock = Condition()

    def put(self, item):
        with self.__lock:
            while len(self.__queue) >= self.size:
                self.__lock.wait()
            self.__queue.append(item)
            self.__lock.notify_all()

    def get(self):
        with self.__lock:
            while len(self.__queue) == 0:
                self.__lock.wait()
            item = self.__queue.pop(0)
            self.__lock.notify_all()
            return item
```
## Thread Pools

- Creating and destroying threads is expensive.
- Doing so frequently hurts performance.
- A thread pool manages a set of reusable threads, making Python threads easier to manage and improving performance.

### ThreadPoolExecutor

- `submit()`: schedules a task on the pool and returns a `Future` object that will hold the result or the exception.
- `map()`: runs a function over multiple inputs.
- `shutdown()`: shuts down the pool (called automatically when the pool is used as a context manager).
```python
import time
from concurrent.futures import ThreadPoolExecutor

def task(name: str):
    print(f"{name} started")
    time.sleep(1)
    print(f"{name} processed")
    return f"{name} finished"

with ThreadPoolExecutor() as executor:
    result_1 = executor.submit(task, 'task1')
    result_2 = executor.submit(task, 'task2')
    print(result_1.result())
    print(result_2.result())

with ThreadPoolExecutor() as executor:
    results = executor.map(task, ['task3', 'task4'])
    print(list(results))
```
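The `Future` returned by `submit()` carries either the task's return value or the exception it raised. A small sketch of both paths (the `may_fail` function is illustrative, not from the original):

```python
from concurrent.futures import ThreadPoolExecutor

def may_fail(x: int) -> int:
    if x < 0:
        raise ValueError("negative input")
    return x * 2

with ThreadPoolExecutor() as executor:
    ok = executor.submit(may_fail, 21)
    bad = executor.submit(may_fail, -1)

    result = ok.result()     # the task's return value
    error = bad.exception()  # the ValueError, without re-raising it
    # calling bad.result() instead would re-raise the ValueError here

print(result, error)
```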
## Multiprocessing

- The `multiprocessing` module provides multi-process support.
- `multiprocessing.Process` creates a process.
- `start()`: starts the process.
- `join()`: waits for the process to finish.
```python
import multiprocessing

def task(name: str, num: int):
    print(f"{name}:{num} -> start process\n", end="")
    res = 0
    for i in range(num):
        res += i
    print(f"{name}:{num} -> end process, result: {res}\n", end="")

def process_1():
    task_process = multiprocessing.Process(target=task, args=("task1", 10))
    task_process.start()
    task_process.join()
    print("process_1 end")

def process_2():
    task_process_args = [("task2", 10), ("task3", 10), ("task4", 10)]
    task_processes = [multiprocessing.Process(target=task, args=arg)
                      for arg in task_process_args]
    for task_process in task_processes:
        task_process.start()
    for task_process in task_processes:
        task_process.join()
    print("process_2 end")

if __name__ == '__main__':
    process_1()
    process_2()
```
While this runs, `ps -ef | grep python` shows the extra Python processes.
## Process Pools

- Creating and destroying processes is just as expensive.
- Doing so frequently has a large performance cost.

### ProcessPoolExecutor

- `submit()`: schedules a task on the pool and returns a `Future` object that will hold the result or the exception.
- `map()`: runs a function over multiple inputs across the worker processes.
- `shutdown()`: shuts down the pool.
```python
import time
from concurrent.futures import ProcessPoolExecutor

def task(name: str):
    print(f"{name} started")
    time.sleep(1)
    print(f"{name} processed")
    return f"{name} finished"

if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        results = executor.map(task, ['task3', 'task4'])
        print(list(results))
```
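`submit()` works the same way with a process pool. A sketch (the `cube` function is illustrative; note that arguments and return values must be picklable, and the pool must be created under the `__main__` guard):

```python
from concurrent.futures import ProcessPoolExecutor

def cube(n: int) -> int:
    return n ** 3

if __name__ == '__main__':
    # each submit() runs cube() in one of the worker processes
    with ProcessPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(cube, n) for n in range(5)]
        results = [f.result() for f in futures]
    print(results)  # [0, 1, 8, 27, 64]
```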
Updated: 2024-05-16 13:18:58 · Original: https://www.yuque.com/zacharyblock/cx2om6/man8qrk3tugq2v5t
Author: Zachary Block
Permalink: http://blockzachary.cn/blog/2636183998/
License: Copyright (c) 2019 CC-BY-NC-4.0 LICENSE