并发编程

Python中实现并发编程的三种方案:多线程、多进程和异步I/O。并发编程的好处在于可以提升程序的执行效率以及改善用户体验;坏处在于并发的程序不容易开发和调试,同时对其他程序来说它并不友好。

  • 多线程:Python中提供了Thread类并辅以LockConditionEventSemaphoreBarrier。Python中有GIL来防止多个线程同时执行本地字节码,这个锁对于CPython是必须的,因为CPython的内存管理并不是线程安全的,因为GIL的存在多线程并不能发挥CPU的多核特性。

    ```Python “”” 面试题:进程和线程的区别和联系? 进程 - 操作系统分配内存的基本单位 - 一个进程可以包含一个或多个线程 线程 - 操作系统分配CPU的基本单位 并发编程(concurrent programming)

    1. 提升执行性能 - 让程序中没有因果关系的部分可以并发的执行
    2. 改善用户体验 - 让耗时间的操作不会造成程序的假死 “”” import glob import os import threading

    from PIL import Image

    PREFIX = ‘thumbnails’

def generatethumbnail(infile, size, format=’PNG’): “””生成指定图片文件的缩略图””” file, ext = os.path.splitext(infile) file = file[file.rfind(‘/‘) + 1:] outfile = f’{PREFIX}/{file}{size[0]}_{size[1]}.{ext}’ img = Image.open(infile) img.thumbnail(size, Image.ANTIALIAS) img.save(outfile, format)

def main(): “””主函数””” if not os.path.exists(PREFIX): os.mkdir(PREFIX) for infile in glob.glob(‘images/*.png’): for size in (32, 64, 128):

  1. # 创建并启动线程
  2. threading.Thread(
  3. target=generate_thumbnail,
  4. args=(infile, (size, size))
  5. ).start()

if name == ‘main‘: main()

  1. 多个线程竞争资源的情况。
  2. ```Python
  3. “””
  4. 多线程程序如果没有竞争资源处理起来通常也比较简单
  5. 当多个线程竞争临界资源的时候如果缺乏必要的保护措施就会导致数据错乱
  6. 说明:临界资源就是被多个线程竞争的资源
  7. “””
  8. import time
  9. import threading
  10. from concurrent.futures import ThreadPoolExecutor
  11. class Account(object):
  12. “””银行账户”””
  13. def init(self):
  14. self.balance = 0.0
  15. self.lock = threading.Lock()
  16. def deposit(self, money):
  17. # 通过锁保护临界资源
  18. with self.lock:
  19. newbalance = self.balance + money
  20. time.sleep(0.001)
  21. self.balance = new_balance
  22. def main():
  23. “””主函数”””
  24. account = Account()
  25. # 创建线程池
  26. pool = ThreadPoolExecutor(max_workers=10)
  27. futures = []
  28. for in range(100):
  29. future = pool.submit(account.deposit, 1)
  30. futures.append(future)
  31. # 关闭线程池
  32. pool.shutdown()
  33. for future in futures:
  34. future.result()
  35. print(account.balance)
  36. if name == ‘main‘:
  37. main()

修改上面的程序,启动5个线程向账户中存钱,5个线程从账户中取钱,取钱时如果余额不足就暂停线程进行等待。为了达到上述目标,需要对存钱和取钱的线程进行调度,在余额不足时取钱的线程暂停并释放锁,而存钱的线程将钱存入后要通知取钱的线程,使其从暂停状态被唤醒。可以使用threading模块的Condition来实现线程调度,该对象也是基于锁来创建的,代码如下所示:

  1. “””
  2. 多个线程竞争一个资源 - 保护临界资源 - 锁(Lock/RLock)
  3. 多个线程竞争多个资源(线程数>资源数) - 信号量(Semaphore)
  4. 多个线程的调度 - 暂停线程执行/唤醒等待中的线程 - Condition
  5. “””
  6. from concurrent.futures import ThreadPoolExecutor
  7. from random import randint
  8. from time import sleep
  9. import threading
  10. class Account:
  11. “””银行账户”””
  12. def init(self, balance=0):
  13. self.balance = balance
  14. lock = threading.RLock()
  15. self.condition = threading.Condition(lock)
  16. def withdraw(self, money):
  17. “””取钱”””
  18. with self.condition:
  19. while money > self.balance:
  20. self.condition.wait()
  21. newbalance = self.balance - money
  22. sleep(0.001)
  23. self.balance = new_balance
  24. def deposit(self, money):
  25. “””存钱”””
  26. with self.condition:
  27. new_balance = self.balance + money
  28. sleep(0.001)
  29. self.balance = new_balance
  30. self.condition.notify_all()
  31. def add_money(account):
  32. while True:
  33. money = randint(5, 10)
  34. account.deposit(money)
  35. print(threading.current_thread().name,
  36. ‘:’, money, ‘====>’, account.balance)
  37. sleep(0.5)
  38. def sub_money(account):
  39. while True:
  40. money = randint(10, 30)
  41. account.withdraw(money)
  42. print(threading.current_thread().name,
  43. ‘:’, money, ‘<====’, account.balance)
  44. sleep(1)
  45. def main():
  46. account = Account()
  47. with ThreadPoolExecutor(max_workers=15) as pool:
  48. for in range(5):
  49. pool.submit(addmoney, account)
  50. for in range(10):
  51. pool.submit(submoney, account)
  52. if name == _main:
  53. main()

  • 多进程:多进程可以有效的解决GIL的问题,实现多进程主要的类是Process,其他辅助的类跟threading模块中的类似,进程间共享数据可以使用管道、套接字等,在multiprocessing模块中有一个Queue类,它基于管道和锁机制提供了多个进程共享的队列。下面是官方文档上关于多进程和进程池的一个示例。

    ```Python “”” 多进程和进程池的使用 多线程因为GIL的存在不能够发挥CPU的多核特性 对于计算密集型任务应该考虑使用多进程 time python3 example22.py real 0m11.512s user 0m39.319s sys 0m0.169s 使用多进程后实际执行时间为11.512秒,而用户时间39.319秒约为实际执行时间的4倍 这就证明我们的程序通过多进程使用了CPU的多核特性,而且这台计算机配置了4核的CPU “”” import concurrent.futures import math

    PRIMES = [

    1. 1116281,
    2. 1297337,
    3. 104395303,
    4. 472882027,
    5. 533000389,
    6. 817504243,
    7. 982451653,
    8. 112272535095293,
    9. 112582705942171,
    10. 112272535095293,
    11. 115280095190773,
    12. 115797848077099,
    13. 1099726899285419

    ] * 5

def is_prime(n): “””判断素数””” if n % 2 == 0: return False

  1. sqrt_n = int(math.floor(math.sqrt(n)))
  2. for i in range(3, sqrt_n + 1, 2):
  3. if n % i == 0:
  4. return False
  5. return True

def main(): “””主函数””” with concurrent.futures.ProcessPoolExecutor() as executor: for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)): print(‘%d is prime: %s’ % (number, prime))

if name == ‘main‘: main()

  1. > 重点多线程和多进程的比较
  2. >
  3. > 以下情况需要使用多线程:
  4. >
  5. > 1. 程序需要维护许多共享的状态(尤其是可变状态),Python中的列表、字典、集合都是线程安全的,所以使用线程而不是进程维护共享状态的代价相对较小。
  6. > 2. 程序会花费大量时间在I/O操作上,没有太多并行计算的需求且不需占用太多的内存。
  7. >
  8. > 以下情况需要使用多进程:
  9. >
  10. > 1. 程序执行计算密集型任务(如:字节码操作、数据处理、科学计算)。
  11. > 2. 程序的输入可以并行的分成块,并且可以将运算结果合并。
  12. > 3. 程序在内存使用方面没有任何限制且不强依赖于I/O操作(如:读写文件、套接字等)。
  13. - 异步处理:从调度程序的任务队列中挑选任务,该调度程序以交叉的形式执行这些任务,我们并不能保证任务将以某种顺序去执行,因为执行顺序取决于队列中的一项任务是否愿意将CPU处理时间让位给另一项任务。异步任务通常通过多任务协作处理的方式来实现,由于执行时间和顺序的不确定,因此需要通过回调式编程或者future对象来获取任务执行的结果。Python 3通过asyncio模块和awaitasync关键字(在Python 3.7中正式被列为关键字)来支持异步处理。
  14. ```Python
  15. “””
  16. 异步I/O - async / await
  17. “””
  18. import asyncio
  19. def numgenerator(m, n):
  20. “””指定范围的数字生成器”””
  21. yield from range(m, n + 1)
  22. async def primefilter(m, n):
  23. “””素数过滤器”””
  24. primes = []
  25. for i in numgenerator(m, n):
  26. flag = True
  27. for j in range(2, int(i * 0.5 + 1)):
  28. if i % j == 0:
  29. flag = False
  30. break
  31. if flag:
  32. print(‘Prime =>’, i)
  33. primes.append(i)
  34. await asyncio.sleep(0.001)
  35. return tuple(primes)
  36. async def square_mapper(m, n):
  37. “””平方映射器”””
  38. squares = []
  39. for i in num_generator(m, n):
  40. print(‘Square =>’, i i)
  41. squares.append(i * i)
  42. await asyncio.sleep(0.001)
  43. return squares
  44. def main():
  45. “””主函数”””
  46. loop = asyncio.getevent_loop()
  47. future = asyncio.gather(prime_filter(2, 100), square_mapper(1, 100))
  48. future.add_done_callback(lambda x: print(x.result()))
  49. loop.run_until_complete(future)
  50. loop.close()
  51. if __name == ‘__main‘:
  52. main()

说明:上面的代码使用get_event_loop函数获得系统默认的事件循环,通过gather函数可以获得一个future对象,future对象的add_done_callback可以添加执行完成时的回调函数,loop对象的run_until_complete方法可以等待通过future对象获得协程执行结果。

Python中有一个名为aiohttp的三方库,它提供了异步的HTTP客户端和服务器,这个三方库可以跟asyncio模块一起工作,并提供了对Future对象的支持。Python 3.6中引入了asyncawait来定义异步执行的函数以及创建异步上下文,在Python 3.7中它们正式成为了关键字。下面的代码异步的从5个URL中获取页面并通过正则表达式的命名捕获组提取了网站的标题。

  1. import asyncio
  2. import re
  3. import aiohttp
  4. PATTERN = re.compile(r‘\<title\>(?P<title>.*)\<\/title\>’)
  5. async def fetchpage(session, url):
  6. async with session.get(url, ssl=False) as resp:
  7. return await resp.text()
  8. async def showtitle(url):
  9. async with aiohttp.ClientSession() as session:
  10. html = await fetchpage(session, url)
  11. print(PATTERN.search(html).group(‘title’))
  12. def main():
  13. urls = (https://www.python.org/,
  14. https://git-scm.com/,
  15. https://www.jd.com/,
  16. https://www.taobao.com/,
  17. https://www.douban.com/)
  18. loop = asyncio.getevent_loop()
  19. cos = [show_title(url) for url in urls]
  20. loop.run_until_complete(asyncio.wait(cos))
  21. loop.close()
  22. if __name == ‘__main:
  23. main()

重点异步I/O与多进程的比较

当程序不需要真正的并发性或并行性,而是更多的依赖于异步处理和回调时,asyncio就是一种很好的选择。如果程序中有大量的等待与休眠时,也应该考虑asyncio,它很适合编写没有实时数据处理需求的Web应用服务器。

Python还有很多用于处理并行任务的三方库,例如:joblibPyMP等。实际开发中,要提升系统的可扩展性和并发性通常有垂直扩展(增加单个节点的处理能力)和水平扩展(将单个节点变成多个节点)两种做法。可以通过消息队列来实现应用程序的解耦合,消息队列相当于是多线程同步队列的扩展版本,不同机器上的应用程序相当于就是线程,而共享的分布式消息队列就是原来程序中的Queue。消息队列(面向消息的中间件)的最流行和最标准化的实现是AMQP(高级消息队列协议),AMQP源于金融行业,提供了排队、路由、可靠传输、安全等功能,最著名的实现包括:Apache的ActiveMQ、RabbitMQ等。

要实现任务的异步化,可以使用名为Celery的三方库。Celery是Python编写的分布式任务队列,它使用分布式消息进行工作,可以基于RabbitMQ或Redis来作为后端的消息代理。