并发

线程

进程与线程

  • 一个进程是操作系统中运行的一个任务
    • 进程拥有独立的 CPU 和内存资源
    • 多进程是支持并发的
  • 一个线程是一个进程下运行的一个任务
    • 线程之间共享进程的 CPU 和内存资源

创建线程

  • 使用threading模块中的Thread类创建线程
    • start()方法:启动线程
    • join()方法: 等待线程结束
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def task():
    """Print the integers 0 through 4, one per line."""
    for value in range(5):
        print(value)


# Run the same task on two threads, then wait for each to finish in turn.
thread_1 = Thread(target=task)
thread_2 = Thread(target=task)
thread_1.start()
thread_2.start()

thread_1.join()
print("thread_1 end")

thread_2.join()
# fixed: the original used typographic curly quotes, which is a SyntaxError
print("thread_2 end")
- 带参数的方法调用使用`args`传入参数
1
2
3
4
5
6
7
8
9
def task(num: int):
    """Print the integers 0 .. num-1, one per line.

    num -- number of lines to print.
    """
    for value in range(num):
        print(value)


# Positional arguments for the target callable are passed via `args`.
thread_1 = Thread(target=task, args=(5,))
thread_2 = Thread(target=task, args=(10,))
for worker in (thread_1, thread_2):
    worker.start()
  • 通过继承Thread类来创建线程类
    • setName()方法可以给线程设置名字(自 Python 3.10 起已弃用,推荐直接读写 name 属性)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class MyThread(Thread):
    """Thread subclass that prints `num` numbered lines tagged with its name.

    Each iteration sleeps one second, so the thread runs for roughly
    `num` seconds in total.
    """

    def __init__(self, name: str, num: int):
        super().__init__()
        # fixed: setName()/getName() are deprecated since Python 3.10;
        # read/write the `name` attribute directly instead.
        self.name = name
        self.num = num

    def run(self) -> None:
        for i in range(self.num):
            print(f"{self.name}: {i}")
            time.sleep(1)

# Start two named threads; "A" prints 5 lines, "B" prints 10.
thread_1 = MyThread("A", 5)
thread_2 = MyThread("B", 10)

for worker in (thread_1, thread_2):
    worker.start()

守护线程

  • 当主线程结束时,守护线程自动结束
  • 当进程中还存在非守护线程时,主线程会等待这些非守护线程执行完才结束
  • 守护线程一般用于日志等 非关键线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# Approach 1: mark the thread as a daemon via the constructor argument.
def task(num: int):
    """Print the integers 0 .. num-1, one per line."""
    for value in range(num):
        print(value)


thread_1 = Thread(target=task, args=(5,), daemon=True)
thread_1.start()

# Approach 2: set the daemon flag inside a Thread subclass.
class MyThread(Thread):
    """Daemon thread that prints `num` numbered lines tagged with its name.

    Being a daemon, it is killed automatically when the main thread exits.
    """

    def __init__(self, name: str, num: int):
        super().__init__()
        # fixed: setName()/setDaemon() are deprecated since Python 3.10;
        # assign the `name`/`daemon` attributes directly instead.
        self.name = name
        self.daemon = True
        self.num = num

    def run(self) -> None:
        for i in range(self.num):
            print(f"{self.name}: {i}")
            time.sleep(1)

thread_2 = MyThread("A", 5)
thread_2.start()

线程安全队列

queue 模块中 Queue 类提供了线程安全队列

  • queue.put(item, block=True)
    • block 为 True 当 queue 满了的时候线程会等待,直到有空间 put
    • block 为 False 当 queue 满了的时候会抛出异常
  • queue.put(item, timeout=5)
    • timeout 当 queue 满了的时候等待 timeout 时长,若还是满着抛出异常,若空闲了则 put
  • queue.get(block=True)
  • queue.get(timeout=5)
  • queue.qsize()
  • queue.empty()
  • queue.full()

通过一个生产者、消费者模型进行演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class Producer(Thread):
    """Producer thread: feeds `num` labeled items into the shared queue.

    put(block=True) makes the thread wait whenever the queue is full.
    """

    def __init__(self, name: str, num: int, queue: Queue):
        super().__init__()
        # fixed: setName() is deprecated since Python 3.10; assign `name`.
        self.name = name
        self.num = num
        self.queue = queue

    def run(self) -> None:
        for i in range(self.num):
            item = f"{self.name}: {i}"
            # fixed: the original called the module-level global `queue`
            # instead of the queue handed to this instance.
            self.queue.put(item, block=True)

class Consumer(Thread):
    """Daemon thread that drains the shared queue forever, printing items.

    Marked as a daemon so it dies with the main thread instead of
    blocking process exit in its infinite loop.
    """

    def __init__(self, name: str, queue: Queue):
        super().__init__()
        # fixed: setName()/setDaemon() are deprecated since Python 3.10;
        # assign the `name`/`daemon` attributes directly instead.
        self.name = name
        self.queue = queue
        self.daemon = True

    def run(self) -> None:
        while True:
            # block=True: sleep until a producer puts something
            item = self.queue.get(block=True)
            # a single print call keeps the output line atomic across threads
            print(f"{self.name}->{item}\n", end="")


# Shared bounded queue: capacity 3 forces producers to block and wait.
queue = Queue(3)
threads = []

threads.append(Producer("P1", 5, queue))
threads.append(Producer("P2", 5, queue))
threads.append(Producer("P3", 5, queue))

threads.append(Consumer("C1", queue))
# fixed: both consumers were named "C1", making their output indistinguishable
threads.append(Consumer("C2", queue))

for thread in threads:
    thread.start()

线程锁

多个线程同一时刻访问同一资源时,会出现竞争的现象,容易造成数据的丢失、覆盖等问题

1
2
3
4
5
6
7
8
9
10
11
12
13
def task(name: str):
    """Print three numbered lines tagged with *name*.

    `end=""` with an explicit "\n" keeps each line a single atomic write,
    so concurrent threads do not interleave within a line.
    """
    for step in (1, 2, 3):
        print(f"{name}: {step}\n", end="")


# Three threads race on stdout; their lines may interleave arbitrarily.
thread_1 = Thread(target=task, args=("A",))
thread_2 = Thread(target=task, args=("B",))
thread_3 = Thread(target=task, args=("C",))

threads = [thread_1, thread_2, thread_3]
for worker in threads:
    worker.start()

可以使用锁来解决这一问题

  • Lock
    • lock.acquire()获取锁;当锁已被某个线程持有时,其他尝试获取该锁的线程会阻塞等待
    • lock.release()释放锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# Module-level lock shared by every worker thread.
lock = Lock()


def task(name: str):
    """Print three numbered lines atomically under the module lock.

    name -- label prefixed to each printed line.
    """
    # fixed: `global lock` was unnecessary (the name is only read), and the
    # `with` statement guarantees the lock is released even if print raises.
    with lock:
        print(f"{name}: 1\n", end="")
        print(f"{name}: 2\n", end="")
        print(f"{name}: 3\n", end="")


# With the lock held inside task(), each thread's three lines stay together.
thread_1 = Thread(target=task, args=("A",))
thread_2 = Thread(target=task, args=("B",))
thread_3 = Thread(target=task, args=("C",))

threads = [thread_1, thread_2, thread_3]
for worker in threads:
    worker.start()
  • Condition
    • acquire()
    • release()
    • wait() 释放锁并阻塞等待,直到被其他线程通过 notify/notify_all 唤醒后重新获取锁
    • notify_all() 唤醒所有正在 wait() 的线程

自己实现一个线程安全队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
class MySafeQueue():
    """A bounded, thread-safe FIFO queue built on a Condition variable."""

    def __init__(self, size: int):
        self.__queue = []          # underlying FIFO storage
        self.size = size           # maximum number of queued items
        self.__lock = Condition()  # guards __queue and supports waiting

    def put(self, item):
        """Append *item*, blocking while the queue is full."""
        self.__lock.acquire()
        try:
            # loop (not `if`): another producer may refill the queue
            # between our wakeup and re-acquiring the lock
            while len(self.__queue) >= self.size:
                self.__lock.wait()
            self.__queue.append(item)
            self.__lock.notify_all()
        finally:
            # fixed: release in a finally block so the lock is never
            # leaked if wait() or append raises
            self.__lock.release()

    def get(self):
        """Remove and return the oldest item, blocking while empty."""
        self.__lock.acquire()
        try:
            while len(self.__queue) == 0:
                self.__lock.wait()
            item = self.__queue.pop(0)
            self.__lock.notify_all()
        finally:
            self.__lock.release()
        return item


class Producer(Thread):
    """Producer thread: feeds `num` labeled items into a MySafeQueue."""

    def __init__(self, name: str, num: int, queue: MySafeQueue):
        super().__init__()
        # fixed: setName() is deprecated since Python 3.10; assign `name`.
        self.name = name
        self.num = num
        self.queue = queue

    def run(self) -> None:
        for i in range(self.num):
            item = f"{self.name}: {i}"
            # fixed: the original called the module-level global `queue`
            # instead of the queue handed to this instance.
            self.queue.put(item)


class Consumer(Thread):
    """Daemon thread that drains a MySafeQueue forever, printing items.

    Marked as a daemon so its infinite loop does not block process exit.
    """

    def __init__(self, name: str, queue: MySafeQueue):
        super().__init__()
        # fixed: setName()/setDaemon() are deprecated since Python 3.10;
        # assign the `name`/`daemon` attributes directly instead.
        self.name = name
        self.queue = queue
        self.daemon = True

    def run(self) -> None:
        while True:
            item = self.queue.get()
            # a single print call keeps the output line atomic across threads
            print(f"{self.name}->{item}\n", end="")


# Shared bounded queue: capacity 3 forces producers to block and wait.
queue = MySafeQueue(3)
threads = []

threads.append(Producer("P1", 5, queue))
threads.append(Producer("P2", 5, queue))
threads.append(Producer("P3", 5, queue))

threads.append(Consumer("C1", queue))
# fixed: both consumers were named "C1", making their output indistinguishable
threads.append(Consumer("C2", queue))

for thread in threads:
    thread.start()

锁支持上下文管理器,因此可以改成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class MySafeQueue():
    """Bounded thread-safe FIFO queue; the Condition doubles as the lock.

    A Condition supports the context-manager protocol, so `with` replaces
    the explicit acquire()/release() pair.
    """

    def __init__(self, size: int):
        self.__queue = []          # underlying FIFO storage
        self.size = size           # maximum number of queued items
        self.__lock = Condition()  # guards __queue and supports waiting

    def put(self, item):
        """Append *item*, blocking while the queue is full."""
        with self.__lock:
            # re-check after every wakeup: another producer may have
            # refilled the queue first
            while len(self.__queue) >= self.size:
                self.__lock.wait()
            self.__queue.append(item)
            self.__lock.notify_all()

    def get(self):
        """Pop and return the oldest item, blocking while the queue is empty."""
        with self.__lock:
            while not self.__queue:
                self.__lock.wait()
            head = self.__queue.pop(0)
            self.__lock.notify_all()
            return head

线程池

  • 线程的创建和销毁昂贵
  • 频繁操作(创建与销毁)造成性能低
  • 线程池的出现使得便于对 Python 的线程进行管理、提高性能

ThreadPoolExecutor

  • submit()告诉线程池执行者,启动一个任务,返回值为 future 对象,包含执行结果或者异常
    • result()
    • exception()
  • map()执行多个任务
  • shutdown()关闭线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def task(name: str):
    """Simulate one second of work and return a completion message.

    name -- label used in the progress output and the return value.
    Returns "<name> finished".
    """
    print(f'{name} started')
    time.sleep(1)  # stand-in for real work
    print(f'{name} processed')
    result = f"{name} finished"
    return result


# submit() returns a Future per task; result() blocks until that task is done.
with ThreadPoolExecutor() as executor:
    result_1 = executor.submit(task, 'task1')
    result_2 = executor.submit(task, 'task2')
    for future in (result_1, result_2):
        print(future.result())

# map() fans one callable out over many inputs and yields results in order.
with ThreadPoolExecutor() as executor:
    result_1 = executor.map(task, ['task3', 'task4'])
    print(list(result_1))

多进程

  • multiprocessing模块用于提供多进程的实现
    • multiprocessing.Process用于创建进程
    • start()用于启动进程
    • join()用于等待进程结束
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def task(name: str, num: int):
    """Sum the integers 0 .. num-1, printing start and end markers.

    name -- label used in the progress output.
    num  -- upper bound (exclusive) of the summed range.
    """
    print(f'{name}:{num}->start process\n', end="")
    # sum(range(n)) is the builtin equivalent of the accumulator loop
    res = sum(range(num))
    print(f'{name}:{num}->end process, result:{res}\n', end="")


def process_1():
    """Run `task` once in a single child process and wait for it."""
    worker = multiprocessing.Process(target=task, args=("task1", 10))
    worker.start()
    worker.join()
    print("process_1 end")


def process_2():
    """Run `task` in three child processes concurrently, then wait for all."""
    task_process_args = [("task2", 10), ("task3", 10), ("task4", 10)]
    workers = [
        multiprocessing.Process(target=task, args=arg)
        for arg in task_process_args
    ]
    # start everything first so the processes actually overlap...
    for worker in workers:
        worker.start()
    # ...then join them all before reporting completion
    for worker in workers:
        worker.join()
    print("process_2 end")


if __name__ == '__main__':
    # Guard is required on spawn-based platforms so child processes
    # can re-import this module without re-running the demos.
    for demo in (process_1, process_2):
        demo()

可以使用ps -ef | grep python来查看是否有这么多进程

进程池

  • 进程的创建与销毁同样是昂贵的
  • 频繁地操作(创建与销毁)对性能影响很大

ProcessPoolExecutor

  • submit()告诉进程池执行者,启动一个任务,返回值为 future 对象,包含执行结果或者异常
    • result()
    • exception()
  • map()执行多个任务
  • shutdown()关闭进程池
1
2
3
4
5
6
7
8
9
10
11
def task(name: str):
    """Simulate one second of work and return a completion message.

    name -- label used in the progress output and the return value.
    Returns "<name> finished".
    """
    print(f'{name} started')
    time.sleep(1)  # stand-in for real work
    print(f'{name} processed')
    result = f"{name} finished"
    return result


if __name__ == '__main__':
    # Guard is mandatory for ProcessPoolExecutor on spawn-based platforms.
    with ProcessPoolExecutor() as executor:
        result_1 = executor.map(task, ['task3', 'task4'])
        print(list(result_1))

更新: 2024-05-16 13:18:58
原文: https://www.yuque.com/zacharyblock/cx2om6/man8qrk3tugq2v5t