python 的并发编程能力已经显着发展,为开发人员提供了编写高效、并行代码的强大工具。我花了相当多的时间探索这些先进技术,很高兴与您分享我的见解。
使用 asyncio 进行异步编程是 i/o 密集型任务的游戏规则改变者。它允许我们编写非阻塞代码,可以同时处理多个操作,而无需线程开销。下面是一个简单的示例,说明如何使用 asyncio 同时从多个 url 获取数据:
import asyncioimport aiohttpasync def fetch_url(session, url): async with session.get(url) as response: return await response.text()async def main(): urls = ['', '', ''] async with aiohttp.clientsession() as session: tasks = [fetch_url(session, url) for url in urls] results = await asyncio.gather(*tasks) for url, result in zip(urls, results): print(f"content length of {url}: {len(result)}")
这段代码演示了我们如何创建多个协程来同时从不同的 url 获取数据。 asyncio.gather() 函数允许我们等待所有协程完成并收集它们的结果。
虽然 asyncio 非常适合 i/o 密集型任务,但它不适合 cpu 密集型操作。为此,我们转向concurrent.futures模块,它提供了threadpoolexecutor和processpoolexecutor。 threadpoolexecutor 非常适合不释放 gil 的 i/o 密集型任务,而 processpoolexecutor 非常适合 cpu 密集型任务。
下面是使用 threadpoolexecutor 并发下载多个文件的示例:
import concurrent.futuresimport requestsdef download_file(url): response = requests.get(url) filename = url.split('/')[-1] with open(filename, 'wb') as f: f.write(response.content) return f"downloaded {filename}"urls = [ '', '', '']with concurrent.futures.threadpoolexecutor(max_workers=3) as executor: future_to_url = {executor.submit(download_file, url): url for url in urls} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result() except exception as exc: print(f"{url} generated an exception: {exc}") else: print(data)
此代码创建一个包含三个工作线程的线程池,并为每个 url 提交一个下载任务。 as_completed() 函数允许我们在结果可用时对其进行处理,而不是等待所有任务完成。
对于 cpu 密集型任务,我们可以使用 processpoolexecutor 来利用多个 cpu 核心。这是并行计算素数的示例:
import concurrent.futuresimport mathdef is_prime(n): if n < 2: return false for i in range(2, int(math.sqrt(n)) + 1): if n % i == 0: return false return truedef find_primes(start, end): return [n for n in range(start, end) if is_prime(n)]ranges = [(1, 25000), (25001, 50000), (50001, 75000), (75001, 100000)]with concurrent.futures.processpoolexecutor() as executor: results = r: find_primes(*r), ranges)all_primes = [prime for sublist in results for prime in sublist]print(f"found {len(all_primes)} prime numbers")
此代码将查找素数的任务分为四个范围,并使用单独的 python 进程并行处理它们。 map() 函数将 find_primes() 函数应用于每个范围并收集结果。
from multiprocessing import process, arrayimport numpy as npdef worker(shared_array, start, end): for i in range(start, end): shared_array[i] = i * iif __name__ == '__main__': size = 10000000 shared_array = array('d', size) # create 4 processes processes = [] chunk_size = size // 4 for i in range(4): start = i * chunk_size end = start + chunk_size if i < 3 else size p = process(target=worker, args=(shared_array, start, end)) processes.append(p) p.start() # wait for all processes to finish for p in processes: p.join() # convert shared array to numpy array for easy manipulation np_array = np.frombuffer(shared_array.get_obj()) print(f"sum of squares: {np_array.sum()}")
from threading import lock, threadclass counter: def __init__(self): self.count = 0 self.lock = lock() def increment(self): with self.lock: self.count += 1def worker(counter, num_increments): for _ in range(num_increments): counter.increment()counter = counter()threads = []for _ in range(10): t = thread(target=worker, args=(counter, 100000)) threads.append(t) t.start()for t in threads: t.join()print(f"final count: {counter.count}")
import asyncioimport aiohttpfrom asyncio import semaphoreasync def fetch_url(url, semaphore): async with semaphore: async with aiohttp.clientsession() as session: async with session.get(url) as response: return await response.text()async def main(): urls = [f'{i}' for i in range(100)] semaphore = semaphore(10) # limit to 10 concurrent connections tasks = [fetch_url(url, semaphore) for url in urls] results = await asyncio.gather(*tasks) print(f"fetched {len(results)} urls")
此代码使用信号量将并发网络连接数限制为 10,防止网络或服务器不堪重负。
使用并发代码时,正确处理异常也很重要。 asyncio 模块为 asyncio.gather() 函数提供了一个 return_exceptions 参数,该参数对此很有用:
import asyncioasync def risky_operation(i): if i % 2 == 0: raise valueerror(f"even number not allowed: {i}") await asyncio.sleep(1) return iasync def main(): tasks = [risky_operation(i) for i in range(10)] results = await asyncio.gather(*tasks, return_exceptions=true) for result in results: if isinstance(result, exception): print(f"got an exception: {result}") else: print(f"got a result: {result}")
import asyncioasync def fetch_data(url): print(f"fetching data from {url}") await asyncio.sleep(2) # simulate network delay return f"data from {url}"async def process_data(data): print(f"processing {data}") await asyncio.sleep(1) # simulate processing time return f"processed {data}"async def save_result(result): print(f"saving {result}") await asyncio.sleep(0.5) # simulate saving delay return f"saved {result}"async def fetch_process_save(url): data = await fetch_data(url) processed = await process_data(data) return await save_result(processed)async def main(): urls = ['', '', ''] tasks = [fetch_process_save(url) for url in urls] results = await asyncio.gather(*tasks) for result in results: print(result)
此代码链接了三个协程(fetch_data、process_data 和 save_result),为每个 url 创建一个管道。然后 asyncio.gather() 函数同时运行这些管道。
import asyncioasync def long_running_task(n): print(f"Starting long task {n}") try: await asyncio.sleep(10) print(f"Task {n} completed") return n except asyncio.CancelledError: print(f"Task {n} was cancelled") raiseasync def main(): tasks = [long_running_task(i) for i in range(5)] try: results = await asyncio.wait_for(asyncio.gather(*tasks), timeout=5) except asyncio.TimeoutError: print("Operation timed out, cancelling remaining tasks") for task in tasks: task.cancel() # Wait for all tasks to finish (they'll raise CancelledError) await asyncio.gather(*tasks, return_exceptions=True) else: print(f"All tasks completed successfully: {results}")
此代码启动五个长时间运行的任务,但设置所有任务的超时时间为 5 秒才能完成。如果达到超时,则会取消所有剩余任务。
总之,python 的并发编程功能为编写高效的并行代码提供了广泛的工具和技术。从使用 asyncio 的异步编程到 cpu 密集型任务的多处理,这些先进技术可以显着提高应用程序的性能。然而,了解基本概念、为每项任务选择正确的工具以及仔细管理共享资源和潜在的竞争条件至关重要。通过实践和精心设计,我们可以利用 python 中并发编程的全部功能来构建快速、可扩展且响应迅速的应用程序。
