写 Python 写了几年,你或许曾遇到过这样的困惑:明明给数据处理加了多线程,CPU 占用率飙上去了,速度却几乎没变。这不是你的代码有 bug,而是 GIL 在起作用。

本文从原理出发,用实验数据说话,帮你彻底搞清楚 GIL 的边界——以及在机器学习、深度学习、量化金融场景下如何系统性地绕开它。

实验环境:Apple M3 Pro,Python 3.11.13,numpy 2.2.3,pandas 2.3.3,torch 2.11.0

实验代码:https://github.com/Coldeye2020/gil_experiments


一、GIL 是什么,为什么存在?

一句话定义:GIL(Global Interpreter Lock,全局解释器锁)是 CPython 中的一把互斥锁,保证同一时刻只有一个线程在执行 Python 字节码。

从引用计数说起

CPython 用引用计数(reference counting)管理内存。每个 Python 对象都维护一个 ob_refcnt 字段,记录当前有多少个引用指向它;当计数归零时,对象被立即释放。

这个机制简洁高效,但存在一个问题:如果多个线程同时对同一个对象的引用计数做 +1-1,就会产生竞态条件(race condition)——两个线程同时读到旧值 n,各自写回 n+1,结果只加了一次而不是两次。这会导致内存泄漏,甚至在错误时机释放仍在使用的对象,引发崩溃。

理论上可以给每个对象的引用计数加一把细粒度锁,但这意味着几乎每次对象访问都要加锁解锁,开销极大。CPython 的设计者 Guido van Rossum 在 1990 年代初选择了一个更简单的方案:加一把大锁,锁住整个解释器。这把锁就是 GIL。

用代码验证引用计数

import sys

a = []
print(sys.getrefcount(a))   # 输出 2:a 本身持有 1 个引用,getrefcount 调用时临时持有 1 个

b = a
print(sys.getrefcount(a))   # 输出 3:b 也引用了同一个列表

del b
print(sys.getrefcount(a))   # 输出 2:b 被删除,引用数减 1

sys.getrefcount() 让我们直接观察引用计数的变化。正是这个机制的存在,让 GIL 成为 CPython 的"必要之恶"。

:PyPy、Jython、GraalPy 等其他 Python 实现不使用引用计数,因此没有 GIL。本文所讨论的 GIL 均指 CPython 实现。


二、用实验证明 GIL 真的在拖后腿

光说不练没说服力。我们用两个实验来直接量化 GIL 的影响。

实验1:CPU 密集型任务——多线程 vs 单线程

任务:用纯 Python 循环暴力求 [2, 50000) 内的质数个数,重复 4 次(模拟 4 个独立任务)。

import threading
import time


def count_primes(n: int) -> int:
    """暴力判断 [2, n) 内有多少个质数,纯 Python 循环,受 GIL 严格限制。"""
    count = 0
    for num in range(2, n):
        is_prime = True
        for divisor in range(2, int(num**0.5) + 1):
            if num % divisor == 0:
                is_prime = False
                break
        if is_prime:
            count += 1
    return count


N = 50_000
REPEAT = 4

# 单线程:顺序执行 4 次
t0 = time.perf_counter()
for _ in range(REPEAT):
    count_primes(N)
single_time = time.perf_counter() - t0

# 多线程:4 个线程同时执行
threads = [threading.Thread(target=count_primes, args=(N,)) for _ in range(REPEAT)]
t0 = time.perf_counter()
for t in threads:
    t.start()
for t in threads:
    t.join()
multi_time = time.perf_counter() - t0

print(f"单线程耗时:{single_time:.2f}s")
print(f"多线程耗时:{multi_time:.2f}s(4 个线程)")
print(f"加速比:{single_time / multi_time:.2f}x(理想应为 4x)")

实验结果

耗时 加速比
单线程(4次串行) 0.09s 1x(基准)
多线程(4线程并发) 0.08s 1.01x

实验结论:4 个线程同时计算质数,耗时与单线程几乎相同,加速比仅 1.01x 而非理想的 4x。GIL 每次只允许一个线程执行字节码,4 个线程实质上是串行轮流执行,额外付出了线程切换开销。

实验2:IO 密集型任务——多线程 vs 单线程

任务:向 httpbin.org/delay/1 发送 HTTP 请求(服务端固定等待 1 秒后返回),重复 4 次。

import urllib.request
import threading
import time


def fetch_url(url: str):
    """发起一次 HTTP GET 请求,等待返回。IO 等待期间 GIL 会被释放。"""
    with urllib.request.urlopen(url, timeout=10) as resp:
        resp.read()


URL = "https://httpbin.org/delay/1"
REPEAT = 4

# 单线程:顺序发 4 次请求,约 4 秒
t0 = time.perf_counter()
for _ in range(REPEAT):
    fetch_url(URL)
single_time = time.perf_counter() - t0

# 多线程:4 个线程并发发请求,约 1 秒
threads = [threading.Thread(target=fetch_url, args=(URL,)) for _ in range(REPEAT)]
t0 = time.perf_counter()
for t in threads:
    t.start()
for t in threads:
    t.join()
multi_time = time.perf_counter() - t0

print(f"单线程耗时:{single_time:.2f}s")
print(f"多线程耗时:{multi_time:.2f}s(4 个线程)")
print(f"加速比:{single_time / multi_time:.2f}x")

实验结果

耗时 加速比
单线程(4次串行) 11.76s 1x(基准)
多线程(4线程并发) 2.66s 4.41x

实验结论:IO 等待期间,操作系统内核接管了网络通信,Python 线程无需执行字节码,CPython 会主动释放 GIL(通过底层 select/epoll 系统调用)。因此 4 个线程可以真正并发等待,总时间约等于单次请求时间,加速比超过了理想的 4x(网络抖动导致单线程耗时略有偏差)。

小结

任务类型 多线程效果 原因
CPU 密集型(纯 Python 循环) 几乎无加速,甚至变慢 GIL 阻止多线程并行执行字节码
IO 密集型(网络/文件读写) 接近线性加速 IO 等待时 GIL 主动释放,其他线程可运行

三、GIL 影响哪些代码?

这是最容易被误解的部分。GIL 并不是说"多线程全部没用",而是有明确的边界。

受 GIL 影响的代码

以下操作每一步都在执行 Python 字节码,GIL 全程持有:

  • 纯 Python 循环for x in list: ...while 循环中的数值计算
  • pandas 字符串操作df['col'].str.replace().str.split() 等(底层走 Python 对象,非 C 扩展)
  • np.vectorize:虽然名字叫向量化,但本质是对 Python 函数逐元素调用,GIL 全程持有
  • object dtype 的 NumPy 数组:存储 Python 对象引用,每次访问都要操作引用计数
import numpy as np

# 陷阱:object dtype 数组不是真正的向量化
arr = np.array([1, "hello", 3.14], dtype=object)  # object dtype
result = np.vectorize(lambda x: str(x))(arr)       # 逐元素调用 Python 函数,受 GIL 限制

不受 GIL 影响的代码

以下操作在 C/C++ 层执行,进入时主动调用 Py_BEGIN_ALLOW_THREADS 释放 GIL:

  • NumPy 内置运算np.sum()np.dot()、矩阵运算等(底层 BLAS/LAPACK)
  • PyTorch / TensorFlow 张量运算:所有算子均在 C++/CUDA 层执行
  • 文件/网络 IOopen()socketurllibrequests
  • pandasrollinggroupby 等聚合操作:底层由 Cython 实现

关键原理:NumPy 等库在进入 C 层时调用的这两个宏:

// NumPy 源码中典型的 GIL 释放模式
Py_BEGIN_ALLOW_THREADS   // 释放 GIL,其他 Python 线程可以运行
/* ... 纯 C 计算,不操作任何 Python 对象 ... */
Py_END_ALLOW_THREADS     // 重新获取 GIL

实验3:受影响 vs 不受影响的直观对比

任务:对 500 万个元素的数组求和,分别用纯 Python 循环和 np.sum(),各用 4 个线程并发执行。

import threading
import time
import numpy as np

N = 5_000_000
THREADS = 4


def python_sum(arr: list) -> float:
    """用纯 Python 循环累加,每一步都在执行 Python 字节码,GIL 全程持有。"""
    total = 0.0
    for x in arr:
        total += x
    return total


def numpy_sum(arr: np.ndarray) -> float:
    """调用 np.sum(),计算在 C 层执行,进入时释放 GIL,多线程可并行。"""
    return np.sum(arr)


def run_single(func, arg):
    t0 = time.perf_counter()
    for _ in range(THREADS):
        func(arg)
    return time.perf_counter() - t0


def run_multi(func, arg):
    threads = [threading.Thread(target=func, args=(arg,)) for _ in range(THREADS)]
    t0 = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - t0


py_data = list(range(N))
np_data = np.arange(N, dtype=np.float64)

py_single = run_single(python_sum, py_data)
py_multi  = run_multi(python_sum, py_data)

np_single = run_single(numpy_sum, np_data)
np_multi  = run_multi(numpy_sum, np_data)

实验结果(数组大小 500 万,线程数 4):

方案 单线程耗时 多线程耗时 加速比
纯 Python 循环 0.29s 0.27s 1.07x
NumPy np.sum() 0.01s <0.01s 4.94x

实验结论:纯 Python 循环的多线程加速比仅 1.07x(GIL 使并发退化为串行),NumPy 向量化的多线程加速比达 4.94x(超过核数,源于硬件层的 SIMD 并行)。两者单线程性能差距就高达 29 倍(0.29s vs 0.01s)。这印证了"向量化不只是更快的循环,它让计算逃离了 GIL 的管辖范围"。


四、避免 GIL 影响的策略

4.1 通用场景

策略A:向量化(NumPy / Pandas)

原理:将 Python 循环替换为 NumPy/Pandas 内置运算。这些运算在 C 层执行时调用 Py_BEGIN_ALLOW_THREADS 释放 GIL,因此:①单线程下就比 Python 循环快几十倍(SIMD/BLAS 优化);②多线程环境中能真正并行。

最常见的陷阱是把 Python 函数传给 np.vectorize——这不是向量化,本质是个 for 循环的语法糖:

import numpy as np

# ❌ 陷阱:np.vectorize 是 for 循环的语法糖,受 GIL 限制
result = np.vectorize(lambda x: x ** 2)(arr)

# ✅ 正确:直接用 NumPy 广播,在 C 层执行
result = arr ** 2
import threading
import time
import numpy as np

N = 5_000_000
THREADS = 4


def python_sum(arr: list) -> float:
    total = 0.0
    for x in arr:
        total += x
    return total


def numpy_sum(arr: np.ndarray) -> float:
    return np.sum(arr)


def run_single(func, arg):
    t0 = time.perf_counter()
    for _ in range(THREADS):
        func(arg)
    return time.perf_counter() - t0


def run_multi(func, arg):
    threads = [threading.Thread(target=func, args=(arg,)) for _ in range(THREADS)]
    t0 = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - t0


if __name__ == "__main__":
    py_data = list(range(N))
    np_data = np.arange(N, dtype=np.float64)

    py_single = run_single(python_sum, py_data)
    py_multi  = run_multi(python_sum, py_data)
    print(f"Python 循环 - 单线程:{py_single:.2f}s,多线程:{py_multi:.2f}s,加速比:{py_single/py_multi:.2f}x")

    np_single = run_single(numpy_sum, np_data)
    np_multi  = run_multi(numpy_sum, np_data)
    print(f"NumPy sum  - 单线程:{np_single:.2f}s,多线程:{np_multi:.2f}s,加速比:{np_single/np_multi:.2f}x")

实验结论:见上方实验3结果。NumPy 向量化在多线程下实现 4.94x 加速比,而 Python 循环仅 1.07x。


策略B:multiprocessing(每进程独立 GIL)

原理multiprocessing 不是绕开 GIL,而是彻底逃离它。每个子进程有独立的 Python 解释器实例,因而有独立的 GIL、独立的内存空间。多个进程可以在多个 CPU 核上真正并行地执行 Python 字节码。

代价:进程启动开销(fork + import)约 100ms~几百ms;进程间传递数据需要 pickle 序列化,大对象有额外成本。

import threading
import multiprocessing
import time


def max_collatz_steps(limit: int) -> int:
    """计算 [1, limit) 中 Collatz 序列最长的步数。纯 Python 数值运算。"""
    def steps(n):
        count = 0
        while n != 1:
            n = n // 2 if n % 2 == 0 else 3 * n + 1
            count += 1
        return count
    return max(steps(n) for n in range(1, limit))


LIMIT = 200_000
WORKERS = 4


def run_single():
    t0 = time.perf_counter()
    for _ in range(WORKERS):
        max_collatz_steps(LIMIT)
    return time.perf_counter() - t0


def run_threads():
    threads = [threading.Thread(target=max_collatz_steps, args=(LIMIT,))
               for _ in range(WORKERS)]
    t0 = time.perf_counter()
    for t in threads: t.start()
    for t in threads: t.join()
    return time.perf_counter() - t0


def run_processes():
    with multiprocessing.Pool(processes=WORKERS) as pool:
        t0 = time.perf_counter()
        pool.starmap(max_collatz_steps, [(LIMIT,)] * WORKERS)
        elapsed = time.perf_counter() - t0
    return elapsed


if __name__ == "__main__":
    t_single  = run_single()
    t_thread  = run_threads()
    t_process = run_processes()

    print(f"单线程:{t_single:.2f}s")
    print(f"多线程:{t_thread:.2f}s,加速比:{t_single/t_thread:.2f}x")
    print(f"多进程:{t_process:.2f}s,加速比:{t_single/t_process:.2f}x")

实验结果(Collatz 序列,limit=200,000,重复 4 次):

方案 耗时 加速比
单线程 3.03s 1x(基准)
多线程(4线程) 2.93s 1.03x
多进程(4进程) 1.02s 2.97x

实验结论:多线程加速比仅 1.03x,多进程实现了 2.97x 接近线性的加速。与理想 4x 的差距来自进程启动和 IPC 开销,在任务规模足够大时这部分开销可以忽略不计。

使用建议:任务粒度 > 几百毫秒时用多进程;更小的任务优先用向量化;不要盲目用 multiprocessing 处理每个小任务,进程启动开销会淹没收益。


策略C:asyncio(IO 密集型最优解)

原理asyncio协作式单线程并发。所有协程运行在同一个线程,轮流通过 await 把控制权交还给事件循环。没有多线程的 GIL 竞争,也没有多进程的启动开销。

它不能加速 CPU 密集型任务(仍是单线程),但对于 IO 密集型任务(大量网络请求、数据库查询、文件读写)是最轻量的解决方案:

import asyncio
import aiohttp
import time


async def fetch(session: aiohttp.ClientSession, url: str) -> int:
    """协程:发起 HTTP 请求,await 时主动释放控制权给事件循环。"""
    async with session.get(url) as resp:
        content = await resp.read()
        return len(content)


async def main():
    URL = "https://httpbin.org/delay/1"
    N   = 10   # 并发请求数

    async with aiohttp.ClientSession() as session:
        t0 = time.perf_counter()
        tasks = [fetch(session, URL) for _ in range(N)]
        results = await asyncio.gather(*tasks)   # 并发等待所有请求
        elapsed = time.perf_counter() - t0

    print(f"并发 {N} 个请求(每个约 1 秒):耗时 {elapsed:.2f}s")
    print(f"加速比:{N / elapsed:.1f}x(理想为 {N}x)")


if __name__ == "__main__":
    asyncio.run(main())

对比三种 IO 并发方案:

方案 适用场景 GIL 问题 开销
threading IO 密集 IO 等待时自动释放 线程切换开销,有竞态风险
asyncio IO 密集 单线程,无 GIL 问题 极低,需要 async/await 语法
multiprocessing CPU 密集 独立 GIL 进程启动 + 序列化开销

结论:IO 密集型任务首选 asyncio,代码更简洁,资源消耗更低;CPU 密集型任务用 multiprocessing


4.2 深度学习场景

策略D:DataLoader num_workers

训练深度学习模型时,数据预处理(图像解码、增强、归一化)通常运行在 CPU 上,而 GPU 做前向/反向传播。如果数据加载跟不上 GPU 计算,训练会被卡住等数据——这就是"数据瓶颈"。

原理DataLoadernum_workers 参数启动的是子进程(不是线程),每个 worker 进程独立运行,有独立的 GIL,可在多核上并行执行预处理。

⚠️ 实验设计的关键:数据必须在磁盘上

如果数据集全在内存(如 numpy array),num_workers > 0 反而会更慢。 原因是子进程无法直接访问主进程的内存,数据必须通过 IPC(进程间通信)序列化传递, 这个开销在数据已在内存的情况下完全是额外负担。

真实场景中图像从磁盘读取,此时磁盘 IO + CPU 预处理才是瓶颈, 多进程并行预取才能让 GPU 不空等数据。

import os, time, shutil, tempfile
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader


def prepare_dataset_on_disk(size, image_hw, data_dir):
    """将随机图像保存为独立 .npy 文件,模拟真实磁盘数据集。"""
    os.makedirs(data_dir, exist_ok=True)
    for i in range(size):
        img = np.random.randint(0, 256, (3, image_hw, image_hw), dtype=np.uint8)
        np.save(os.path.join(data_dir, f"{i:05d}.npy"), img)


class DiskImageDataset(Dataset):
    """
    每次 __getitem__ 从磁盘读取一个 .npy 文件,再做 CPU 预处理。
    这才是 num_workers 有效的前提:
      - 磁盘读取是 IO 操作(GIL 在系统调用时会释放)
      - numpy 解码 + 数据增强是 CPU 计算(多进程可并行)
    """
    def __init__(self, data_dir):
        self.files = sorted([
            os.path.join(data_dir, f)
            for f in os.listdir(data_dir) if f.endswith(".npy")
        ])

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        img = np.load(self.files[idx]).astype(np.float32)   # 磁盘读取
        if np.random.rand() > 0.5:
            img = img[:, :, ::-1].copy()
        img = img / 255.0
        mean = np.array([0.485, 0.456, 0.406], dtype=np.float32).reshape(3, 1, 1)
        std  = np.array([0.229, 0.224, 0.225], dtype=np.float32).reshape(3, 1, 1)
        img = (img - mean) / std
        img = np.clip(img * 1.1 + np.random.normal(0, 0.01, img.shape).astype(np.float32), -3, 3)
        return torch.from_numpy(img.copy()), idx % 10


def benchmark_dataloader(num_workers, dataset, batch_size=32):
    loader = DataLoader(
        dataset, batch_size=batch_size, num_workers=num_workers,
        shuffle=True, pin_memory=False,
        prefetch_factor=2 if num_workers > 0 else None,
        persistent_workers=True if num_workers > 0 else False,
    )
    loader_iter = iter(loader)
    next(loader_iter)   # 预热:让 worker 进程完成 fork + import
    t0 = time.perf_counter()
    for batch_imgs, _ in loader_iter:
        _ = batch_imgs.mean()
    return time.perf_counter() - t0


if __name__ == "__main__":
    data_dir = os.path.join(tempfile.gettempdir(), "gil_fake_images")
    try:
        prepare_dataset_on_disk(1000, 128, data_dir)
        dataset = DiskImageDataset(data_dir)
        for nw in [0, 1, 2, 4]:
            t = benchmark_dataloader(nw, dataset)
            print(f"num_workers={nw}{t:.2f}s")
    finally:
        shutil.rmtree(data_dir)

实验结果(1000 张 128×128 图像,从磁盘读取,batch_size=32):

num_workers 耗时 加速比(vs 0) 说明
0 1.38s 1x(基准) 主进程串行,受 GIL 限制
1 0.82s 1.68x 1 个 worker 子进程,独立 GIL
2 0.48s 2.87x 2 个 worker 子进程
4 0.41s 3.36x 4 个 worker 子进程

内存数据集(旧版)对比num_workers=0 约 6.83s,num_workers=4 约 27.29s — workers 越多越慢(IPC 开销 > 并行收益)。

实验结论num_workers 的提速效果取决于数据在哪里。数据在磁盘时,worker 子进程并行执行「读盘 → 解码 → 增强」,主进程做 forward 时已在预取下一批,GPU 不会等数据;数据在内存时,IPC 序列化开销反而拖慢速度。注意加速比从 num_workers=2 到 4 趋于平缓(从 2.87x→3.36x),磁盘 IO 成为新的瓶颈。经验法则:num_workers = min(4, CPU核数 // 2),配合 persistent_workers=True 避免每个 epoch 重建进程。


策略E:大规模特征提取向量化

训练完模型后,对大量样本提取 embedding(图像检索、语义相似度)是常见需求。典型的错误写法是逐样本预处理然后推理,正确做法是向量化预处理后批量推理。

向量化的核心思路:把对单个样本的操作改写成对整个矩阵的广播运算。

# 串行版本:N_SAMPLES 次 Python 循环
for x in raw_data:
    feat = x / norm(x)   # 每次操作一个向量

# 向量化版本:一次操作整个矩阵
feat = raw_data / norm(raw_data, axis=1, keepdims=True)

NumPy 的广播操作在 C 层执行,GIL 释放,无 Python 循环。对于 50,000 × 2048 的矩阵,这个差距非常显著。

import time, threading, concurrent.futures
import numpy as np
import torch
import torch.nn as nn


class Encoder(nn.Module):
    def __init__(self, input_dim=2048, hidden_dim=2048, embed_dim=256):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, embed_dim),
        )

    def forward(self, x):
        return self.net(x)


INPUT_DIM  = 2048    # 模拟 ResNet 最后一层特征展平
EMBED_DIM  = 256
N_SAMPLES  = 50_000  # 足够大,才能体现向量化优势
BATCH_SIZE = 256
WORKERS    = 4


def preprocess_one(raw: np.ndarray) -> torch.Tensor:
    """对单个样本做预处理:L2 归一化 + 随机 dropout + clip。"""
    feat = raw.astype(np.float32)
    norm = np.linalg.norm(feat) + 1e-8
    feat = feat / norm
    mask = (np.random.rand(len(feat)) > 0.1).astype(np.float32)
    feat = feat * mask
    feat = np.clip(feat, -5.0, 5.0)
    return torch.from_numpy(feat)


def extract_serial(model, raw_data):
    """串行逐样本预处理:Python for 循环,GIL 全程持有。"""
    model.eval()
    embeddings = []
    t0 = time.perf_counter()
    with torch.no_grad():
        for i in range(0, N_SAMPLES, BATCH_SIZE):
            chunk = raw_data[i:i+BATCH_SIZE]
            batch = torch.stack([preprocess_one(x) for x in chunk])
            embeddings.append(model(batch))
    return time.perf_counter() - t0, torch.cat(embeddings)


def extract_vectorized(model, raw_data):
    """
    NumPy 向量化批量预处理:无任何 Python 循环。
    将 preprocess_one 的所有操作改写为对整个矩阵的广播运算。
    """
    model.eval()
    t0 = time.perf_counter()

    feat = raw_data.astype(np.float32)

    # L2 归一化(axis=1 广播,操作整个矩阵)
    norms = np.linalg.norm(feat, axis=1, keepdims=True) + 1e-8
    feat = feat / norms

    # 向量化 feature dropout(一次生成整个 mask 矩阵)
    mask = (np.random.rand(N_SAMPLES, INPUT_DIM) > 0.1).astype(np.float32)
    feat = feat * mask

    feat = np.clip(feat, -5.0, 5.0)

    all_tensors = torch.from_numpy(feat)
    embeddings = []
    with torch.no_grad():
        for i in range(0, N_SAMPLES, BATCH_SIZE):
            embeddings.append(model(all_tensors[i:i+BATCH_SIZE]))

    return time.perf_counter() - t0, torch.cat(embeddings)

实验结果(50,000 个样本,dim=2048,3层 MLP hidden=2048,提取 256d embedding):

策略 耗时 加速比
串行逐样本预处理(基准) 46.58s 1x
多线程预处理 19.79s 2.35x(GIL 限制 Python 循环,提升有限)
多进程预处理 8.79s 5.30x(绕开 GIL,但 IPC 传数组有开销)
NumPy 向量化批量 3.28s 14.19x

实验结论:向量化是这四种方案中唯一从根本上消除 Python 循环的方法——不是"更快的 Python 循环",而是将 50,000 次 Python 调用压缩为一次矩阵广播,由 C/NumPy 内核执行。

加速比拆解:

  • 预处理:Python 循环 50,000 次 → NumPy 矩阵广播 1 次,差距在 10~100x 量级
  • 推理:两者都是 batch PyTorch forward,速度相同
  • 多线程 2.35x:GIL 让线程无法真正并行,仅靠 NumPy 内部少量 GIL 释放获得微小收益
  • 多进程 5.30x:真正绕开 GIL,但 IPC 序列化 50K × 2048 float 数组有显著开销
  • 向量化 14.19x:零并发,零进程开销,纯靠消除 Python 循环

4.3 量化金融场景

策略G:Alpha 因子 joblib 并行

量化研究中,对数百支股票同时计算技术因子(动量、波动率、RSI 等)是最典型的 GIL 受害场景:每支股票的计算彼此独立,天然可并行,但如果用 Python 循环实现,多线程几乎没用。

import time
import threading
import numpy as np
from joblib import Parallel, delayed

N_STOCKS = 500
N_DAYS   = 252
WORKERS  = 4


def generate_price_data(n_stocks=N_STOCKS, n_days=N_DAYS) -> np.ndarray:
    returns = np.random.normal(0.0005, 0.02, (n_stocks, n_days))
    return 100 * np.exp(np.cumsum(returns, axis=1))


def compute_factors_for_one_stock(prices: np.ndarray) -> dict:
    """纯 Python 循环实现 5 个技术因子(动量、波动率、RSI 等)。"""
    n = len(prices)
    mom_20 = [prices[i] / prices[i-20] - 1 for i in range(20, n)]
    mom_60 = [prices[i] / prices[i-60] - 1 for i in range(60, n)]
    returns = [prices[i] / prices[i-1] - 1 for i in range(1, n)]
    vol_20 = []
    for i in range(20, len(returns)):
        w = returns[i-20:i]
        mean = sum(w) / 20
        vol_20.append((sum((x-mean)**2 for x in w) / 20) ** 0.5)
    return {
        "mom_20": np.mean(mom_20) if mom_20 else 0,
        "mom_60": np.mean(mom_60) if mom_60 else 0,
        "vol_20": np.mean(vol_20) if vol_20 else 0,
    }


def run_vectorized(prices: np.ndarray) -> float:
    """NumPy/Pandas 矩阵化计算所有股票的所有因子。"""
    import pandas as pd
    t0 = time.perf_counter()
    returns = prices[:, 1:] / prices[:, :-1] - 1
    mom_20 = (prices[:, 20:] / prices[:, :-20] - 1).mean(axis=1)
    mom_60 = (prices[:, 60:] / prices[:, :-60] - 1).mean(axis=1)
    df_returns = pd.DataFrame(returns.T)
    vol_20 = df_returns.rolling(20).std().mean().values
    return time.perf_counter() - t0


if __name__ == "__main__":
    prices = generate_price_data()

    t1 = time.perf_counter()
    [compute_factors_for_one_stock(prices[i]) for i in range(N_STOCKS)]
    t1 = time.perf_counter() - t1

    t2 = time.perf_counter()
    results = [None] * N_STOCKS
    def worker(i): results[i] = compute_factors_for_one_stock(prices[i])
    threads = [threading.Thread(target=worker, args=(i,)) for i in range(N_STOCKS)]
    for t in threads: t.start()
    for t in threads: t.join()
    t2 = time.perf_counter() - t2

    t3 = time.perf_counter()
    Parallel(n_jobs=WORKERS)(delayed(compute_factors_for_one_stock)(prices[i]) for i in range(N_STOCKS))
    t3 = time.perf_counter() - t3

    t4 = run_vectorized(prices)
    print(f"串行:{t1:.2f}s | 多线程:{t2:.2f}s | joblib:{t3:.2f}s | 向量化:{t4:.2f}s")

实验结果(500 支股票,252 天历史数据,5 个技术因子):

策略 耗时 加速比
串行(基准) 0.72s 1x
多线程(500 线程) 0.72s 1.00x
joblib 多进程(4 workers) 0.39s 1.86x
NumPy 向量化 0.03s 28.28x

实验结论:多线程 1.00x——GIL 完全抵消了并发收益。joblib 多进程实现 1.86x 加速(500 个任务分给 4 个进程,有调度开销)。NumPy 向量化以 28x 的优势碾压其他方案——将 500 支股票的逐股计算改写成 500×252 矩阵运算,同时获得了"消除 Python 循环"和"GIL 释放"两重收益。这是量化研究中最值得投入时间的重写方向。

joblib 使用技巧n_jobs=-1 使用所有 CPU 核;backend="loky" 是默认的进程后端;当任务非常多但单个任务很小时,可设置 batch_size='auto' 减少调度开销。


策略H:蒙特卡洛向量化定价

期权定价(Black-Scholes 模型下的蒙特卡洛方法)需要模拟大量股价路径,每条路径独立,是天然的并行场景。

import time
import math
import concurrent.futures
import numpy as np

S0, K, T, r, sigma = 100.0, 105.0, 1.0, 0.05, 0.2
N, M, WORKERS = 252, 100_000, 4


def simulate_one_path_python(seed: int) -> float:
    rng = np.random.default_rng(seed)
    dt = T / N
    S = S0
    for _ in range(N):
        z = rng.standard_normal()
        S *= math.exp((r - 0.5 * sigma**2) * dt + sigma * math.sqrt(dt) * z)
    return S


def run_serial():
    t0 = time.perf_counter()
    payoffs = [max(simulate_one_path_python(i) - K, 0) for i in range(M)]
    price = math.exp(-r * T) * sum(payoffs) / M
    return time.perf_counter() - t0, price


def run_vectorized():
    """精确终价公式:S_T = S0 * exp((r - 0.5σ²)T + σ√T·Z),Z~N(0,1)"""
    t0 = time.perf_counter()
    Z = np.random.standard_normal(M)
    S_T = S0 * np.exp((r - 0.5 * sigma**2) * T + sigma * math.sqrt(T) * Z)
    payoffs = np.maximum(S_T - K, 0.0)
    price = math.exp(-r * T) * payoffs.mean()
    return time.perf_counter() - t0, price

实验结果(欧式看涨期权,10 万条路径,S0=100, K=105, T=1y, σ=0.2):

BS 解析价格:8.0214

策略 耗时 期权价格 加速比
串行(Python 循环) 7.03s 8.0756 1x
多进程(4 workers) 1.88s 8.0756 3.74x
NumPy 向量化(精确终价) <0.01s 7.9821 ~3430x
NumPy 向量化(逐步路径 M×N) 0.32s 7.9756 22.30x

实验结论:四种方法的期权价格都接近 BS 解析解 8.0214(验证了正确性)。精确终价公式(方式A)以 ~3430x 的加速比彻底击败其他方案:利用 GBM 的解析解 $S_T = S_0 \cdot e^{(r - \frac{\sigma^2}{2})T + \sigma\sqrt{T}Z}$,一次生成 10 万个终价,无离散化误差,也无进程开销。对于欧式期权,这是生产环境的唯一正确选择;路径依赖期权(亚式、障碍期权)才需要逐步路径模拟。


策略I:向量化回测

回测是量化研究的核心。事件驱动回测(逐根 K 线循环)是最"自然"的写法,也是 GIL 的重灾区;向量化回测(Pandas rolling + NumPy 信号向量)则从根本上消除了 Python 循环。这个案例还揭示了一个有趣的反直觉结论:向量化让单次回测快到 ~0.001s,以至于多线程反而成为负担。

import time, threading, concurrent.futures
import numpy as np
import pandas as pd

N_DAYS   = 5000
N_STOCKS = 100
THREADS  = 4


def event_driven_backtest(df: pd.DataFrame) -> dict:
    """逐根 K 线执行双均线策略,纯 Python 循环。"""
    close = df["close"].values
    n = len(close)
    cash, shares, nav = 10000.0, 0, []

    for i in range(20, n):
        ma5  = sum(close[i-5:i])  / 5    # Python sum(),每步都是字节码
        ma20 = sum(close[i-20:i]) / 20
        prev_ma5  = sum(close[i-6:i-1])  / 5
        prev_ma20 = sum(close[i-21:i-1]) / 20
        price = close[i]
        if ma5 > ma20 and prev_ma5 <= prev_ma20 and cash >= price:
            n_buy = int(cash / price)
            shares += n_buy; cash -= n_buy * price
        elif ma5 < ma20 and prev_ma5 >= prev_ma20 and shares > 0:
            cash += shares * price; shares = 0
        nav.append(cash + shares * price)

    return {"total_return": nav[-1] / 10000.0 - 1 if nav else 0}


def vectorized_backtest(df: pd.DataFrame) -> dict:
    """用 Pandas rolling 计算均线,NumPy 向量化生成信号。"""
    close = df["close"]
    ma5, ma20 = close.rolling(5).mean(), close.rolling(20).mean()
    cross_up   = (ma5 > ma20) & (ma5.shift(1) <= ma20.shift(1))
    cross_down = (ma5 < ma20) & (ma5.shift(1) >= ma20.shift(1))
    signal = pd.Series(np.nan, index=df.index)
    signal[cross_up] = 1; signal[cross_down] = 0
    signal = signal.ffill().fillna(0)
    strategy_return = (signal.shift(1) * close.pct_change()).fillna(0)
    total_return = (1 + strategy_return).prod() - 1
    return {"total_return": float(total_return)}


def run_all_serial(backtest_fn, all_data):
    t0 = time.perf_counter()
    results = [backtest_fn(df) for df in all_data]
    return time.perf_counter() - t0, results


def run_all_threaded(backtest_fn, all_data):
    """使用 ThreadPoolExecutor(THREADS 个 worker)并行回测所有股票。"""
    t0 = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=THREADS) as executor:
        results = list(executor.map(backtest_fn, all_data))
    return time.perf_counter() - t0, results

实验结果(100 支股票,5000 根 K 线,双均线策略,4 workers):

策略 串行耗时 多线程耗时(4 workers) 多线程加速比
事件驱动(Python 循环) 1.70s 1.67s 1.02x
向量化(Pandas + NumPy) 0.08s 0.10s 0.82x

向量化 vs 事件驱动(串行):~20x 加速

实验结论

  • 事件驱动多线程加速比 ~1x:GIL 把并行收益完全抹平,100 支股票的 Python 循环无法真正并行
  • 向量化多线程 0.82x,反而更慢:每支股票的向量化回测只需 ~0.001s(0.08s ÷ 100),这比线程池的调度开销还小;此时并发反而是负担,串行就是最优解
  • 向量化串行(0.08s)比事件驱动多线程(1.67s)快 ~20x——单纯改写算法的收益远大于任何并发方案
  • 核心结论:向量化让每支股票的计算变得极其廉价,以至于多线程的收益空间消失了。并行加速的前提是单个任务足够重(通常 >10ms);当任务已经被向量化压缩到亚毫秒级,进一步并行得不偿失
  • 事件驱动的优势在于逻辑灵活(复杂仓位管理、滑点模型、条件单),不在性能

4.4 自定义算子加速

前面的策略有一个共同前提:计算可以被表达成向量化形式,或任务间彼此独立。但有一类运算天然打破这个前提——递推计算:每一步的结果依赖上一步,无法并行或广播。

最典型的例子是 EMA(指数加权移动平均),在量化金融中无处不在:

$$\text{EMA}[i] = \alpha \cdot x[i] + (1-\alpha) \cdot \text{EMA}[i-1]$$

这个递推无法向量化,但可以把它下沉到 C 层,并在 C 层主动释放 GIL。下面介绍三种实现路径,复杂度依次递增,性能相近。


策略J:Numba JIT(@jit nogil=True

原理:Numba 使用 LLVM 将带类型的 Python/NumPy 函数在首次调用时编译为机器码。加上 nogil=True 参数后,编译后的函数在执行期间会释放 GIL,多线程可真正并行。关键前提是必须同时设置 nopython=True——这保证函数内部完全不操作 Python 对象,才能安全地释放 GIL。

from numba import jit

# ❌ 不释放 GIL:单线程快,多线程仍串行
@jit(nopython=True, nogil=False)
def ema_numba(close, alpha): ...

# ✅ 释放 GIL:单线程一样快,多线程真正并行
@jit(nopython=True, nogil=True)
def ema_numba_nogil(close, alpha): ...
import threading
import time
import numpy as np
from numba import jit

N = 1_000_000
PERIOD = 20
THREADS = 4
alpha = 2.0 / (PERIOD + 1)


def ema_python(close: np.ndarray, alpha: float) -> np.ndarray:
    """纯 Python 递推,每步都是 Python 字节码,受 GIL 严格限制。"""
    result = np.empty(len(close))
    result[0] = close[0]
    for i in range(1, len(close)):
        result[i] = alpha * close[i] + (1 - alpha) * result[i - 1]
    return result


@jit(nopython=True, nogil=True)
def ema_numba_nogil(close: np.ndarray, alpha: float) -> np.ndarray:
    """
    nogil=True:函数执行期间释放 GIL。
    nopython=True 确保函数内部不操作任何 Python 对象,释放 GIL 才安全。
    多个线程可以真正并行地各自执行此函数。
    """
    result = np.empty(len(close))
    result[0] = close[0]
    for i in range(1, len(close)):
        result[i] = alpha * close[i] + (1 - alpha) * result[i - 1]
    return result


def run_single(func, data, repeat=THREADS):
    t0 = time.perf_counter()
    for _ in range(repeat):
        func(data, alpha)
    return time.perf_counter() - t0


def run_multi(func, data, n_threads=THREADS):
    threads = [threading.Thread(target=func, args=(data, alpha))
               for _ in range(n_threads)]
    t0 = time.perf_counter()
    for t in threads: t.start()
    for t in threads: t.join()
    return time.perf_counter() - t0


if __name__ == "__main__":
    close = np.random.rand(N).astype(np.float64) * 100 + 50

    # 预热:触发 JIT 编译(首次调用有延迟,不计入计时)
    ema_numba_nogil(close[:100], alpha)

    t_py_s = run_single(ema_python, close)
    t_nb_s = run_single(ema_numba_nogil, close)
    t_nb_m = run_multi(ema_numba_nogil, close)

    print(f"纯 Python 单线程:{t_py_s:.3f}s")
    print(f"Numba nogil 单线程:{t_nb_s:.3f}s(vs Python: {t_py_s/t_nb_s:.1f}x)")
    print(f"Numba nogil 多线程:{t_nb_m:.3f}s(多线程加速比: {t_nb_s*THREADS/t_nb_m:.2f}x)")

实验结果(EMA period=20,数组长度 100 万,重复 4 次):

方案 单线程耗时 vs Python 多线程耗时 多线程加速比
纯 Python 循环 0.702s 1x(基准) 0.681s 1.03x
Numba nogil=False 0.009s 76.5x 0.010s 0.96x
Numba nogil=True 0.009s 77.4x 0.003s 2.98x

实验结论:Numba JIT 单线程比 Python 快 ~77x(LLVM 机器码 vs 字节码解释)。nogil=False 多线程加速比仅 0.96x——编译后的代码虽快,GIL 依然把线程串行化;nogil=True 多线程加速比 2.98x(接近理论 4x,差距来自线程启动和内存带宽竞争)。注意:首次调用有 JIT 编译延迟(约 0.5~2s),适合长期运行的服务而非一次性脚本。


策略K:Cython(cdef + with nogil

原理:Cython 是 Python 的超集,通过添加 cdef 静态类型注解编译为 C 扩展(.so 文件)。with nogil: 块显式释放 GIL,编译器会做安全检查——如果块内有任何 Python 对象操作,编译时直接报错,而不是运行时崩溃。这是 pandas、scipy、scikit-learn 内部大量使用的方案,无运行时依赖。

# ema_cython.pyx
import numpy as np
cimport numpy as np


def fast_ema_nogil(np.ndarray[double, ndim=1] close, int period):
    """
    在 with nogil: 块中执行递推,显式释放 GIL。
    多线程场景下,多个线程可以同时执行此函数。

    注意:with nogil 块内不能有任何 Python 对象操作,
    否则 Cython 编译时会报错,这是一个编译期安全检查。
    """
    cdef double alpha = 2.0 / (period + 1)
    cdef double[:] result = np.empty(len(close), dtype=np.float64)
    cdef double[:] c = close   # 先拿到 memoryview,避免 nogil 块内访问 Python 对象
    cdef int i
    cdef int n = len(close)

    result[0] = c[0]

    # with nogil 块:释放 GIL,允许其他 Python 线程运行
    # 块内只能操作 C 变量和 memoryview,不能有任何 Python 对象
    with nogil:
        for i in range(1, n):
            result[i] = alpha * c[i] + (1 - alpha) * result[i - 1]

    return np.asarray(result)
# 编译:python setup_cython.py build_ext --inplace
from setuptools import setup
from Cython.Build import cythonize
import numpy as np

setup(
    ext_modules=cythonize(
        "ema_cython.pyx",
        compiler_directives={
            "language_level": "3",
            "boundscheck": False,   # 关闭越界检查,提升性能(生产慎用)
            "wraparound": False,    # 关闭负索引支持
        },
    ),
    include_dirs=[np.get_include()],
)

实验结果(相同任务):

方案 单线程耗时 vs Python 多线程耗时 多线程加速比
纯 Python 循环 0.679s 1x(基准) 0.698s 0.97x
Cython cdef(默认持有 GIL) 0.015s 46.8x 0.015s 0.94x
Cython with nogil 0.015s 45.5x 0.005s 3.22x

实验结论:Cython cdef 单线程比 Python 快 ~47x。默认 Cython 多线程加速比仅 0.94x——cdef 消除了 Python 对象,但 GIL 仍由调用方隐式持有;with nogil 显式释放后,多线程加速比 3.22xCython 的核心优势:编译产物是标准 .so 文件,无运行时依赖(无需安装 Numba),无首次编译延迟,是 pandas/scipy 的选择。代价是需要维护 .pyx 文件和编译流程。


策略L:pybind11 C++ 扩展(gil_scoped_release

原理:pybind11 是现代 C++/Python 绑定库。py::gil_scoped_release 采用 RAII 风格——构造时释放 GIL,析构时(离开作用域时)自动重新获取,无需手动管理。适合对接已有 C++ 代码库,或需要极致性能时直接操作 NumPy buffer。

#include <pybind11/pybind11.h>
#include <pybind11/numpy.h>

namespace py = pybind11;

py::array_t<double> fast_ema_nogil(py::array_t<double> close, int period) {
    auto buf = close.request();
    double* ptr = static_cast<double*>(buf.ptr);
    int n = static_cast<int>(buf.shape[0]);
    double alpha = 2.0 / (period + 1);

    py::array_t<double> result(n);
    auto res_buf = result.request();
    double* res = static_cast<double*>(res_buf.ptr);

    // 在释放 GIL 之前,确保所有 Python 对象操作已完成
    // gil_scoped_release:RAII 风格,构造时释放 GIL,析构时重新获取
    {
        py::gil_scoped_release release;

        // GIL 已释放:这里只操作原始 C 指针,不涉及任何 Python 对象
        // 多个线程可以同时执行到这里,真正并行
        res[0] = ptr[0];
        for (int i = 1; i < n; ++i) {
            res[i] = alpha * ptr[i] + (1.0 - alpha) * res[i - 1];
        }

    }   // GIL 在这里自动重新获取

    return result;
}

PYBIND11_MODULE(ema_cpp, m) {
    m.def("fast_ema_nogil", &fast_ema_nogil,
          py::arg("close"), py::arg("period"),
          "EMA 递推(释放 GIL,多线程可并行)");
}
import threading, time
import numpy as np
import ema_cpp   # 编译自 ema_cpp.cpp

N = 1_000_000
PERIOD = 20
THREADS = 4


def run_multi(func, data, period, n_threads=THREADS):
    threads = [threading.Thread(target=func, args=(data, period))
               for _ in range(n_threads)]
    t0 = time.perf_counter()
    for t in threads: t.start()
    for t in threads: t.join()
    return time.perf_counter() - t0


close = np.random.rand(N).astype(np.float64) * 100 + 50

t_pb_m = run_multi(ema_cpp.fast_ema_nogil, close, PERIOD)
print(f"pybind11 nogil 多线程({THREADS} 线程):{t_pb_m:.3f}s")

实验结果(相同任务):

方案 单线程耗时 vs Python 多线程耗时 多线程加速比
纯 Python 循环 0.684s 1x(基准) 0.676s 1.01x
pybind11 C++(持有 GIL) 0.010s 69.1x 0.010s 0.99x
pybind11 gil_scoped_release 0.010s 70.6x 0.003s 3.11x

实验结论:pybind11 C++ 单线程比 Python 快 ~70x,与 Numba/Cython 处于同一量级。持有 GIL 版本多线程 0.99x——C++ 代码本身不操作 Python 对象,但 GIL 由调用框架持有,线程仍被串行化;gil_scoped_release 释放后多线程加速比 3.11xpybind11 的核心优势:可无缝对接已有 C++ 库(交易所 SDK、数值计算库),无需将现有逻辑重写为 Cython。代价是需要 C++ 编译器。


三种方案横向对比

import time, threading
import numpy as np
from numba import jit
import ema_cython, ema_cpp

N, PERIOD, THREADS = 1_000_000, 20, 4


@jit(nopython=True, nogil=True)
def ema_numba(close, period):
    a = 2.0 / (period + 1)
    result = np.empty(len(close))
    result[0] = close[0]
    for i in range(1, len(close)):
        result[i] = a * close[i] + (1 - a) * result[i - 1]
    return result


def ema_python(close, period):
    a = 2.0 / (period + 1)
    result = np.empty(len(close))
    result[0] = close[0]
    for i in range(1, len(close)):
        result[i] = a * close[i] + (1 - a) * result[i - 1]
    return result


def bench(func, data, period, repeat=5):
    """多次运行取最小值,减少系统噪声。"""
    return min(
        (lambda: (t := time.perf_counter(), func(data, period), time.perf_counter() - t)[2])()
        for _ in range(repeat)
    )


def bench_multi(func, data, period, n_threads=THREADS):
    threads = [threading.Thread(target=func, args=(data, period))
               for _ in range(n_threads)]
    t0 = time.perf_counter()
    for t in threads: t.start()
    for t in threads: t.join()
    return time.perf_counter() - t0

汇总对比表(EMA period=20,数组长度 100 万):

方案 单线程耗时 vs Python 多线程耗时 多线程加速比
Python 循环 0.1709s 1.0x 0.7101s 1.0x
Numba nogil=True 0.0022s 76.4x 0.0029s 3.1x
Cython with nogil 0.0038s 44.5x 0.0047s 3.2x
pybind11 gil_scoped_release 0.0025s 68.2x 0.0032s 3.1x

选型建议

场景 推荐方案 理由
快速原型 / 研究脚本 Numba 代码改动最小(加两行装饰器),无需编译步骤
生产库 / 发布给用户 Cython 零运行时依赖,编译期安全检查,pandas/scipy 验证过
对接已有 C++ 代码库 pybind11 直接绑定 C++ 接口,无需重写逻辑
纯 Python 环境限制 multiprocessing 无需编译,但有进程启动开销

五、延伸:Python 3.13 no-GIL 模式(PEP 703)

Python 3.13 引入了实验性的 no-GIL 构建--disable-gil),这是社区多年来期待的重大改变。

核心变化

  • 细粒度引用计数(biased reference counting)替代 GIL,减少引用计数操作中的竞争
  • 多线程下 CPU 密集型任务可实现真正并行
  • 不需要修改应用层代码

当前限制与注意事项

  1. 性能回退:no-GIL 模式下,单线程程序速度会下降约 5%~15%(细粒度锁的开销)
  2. 生态兼容性:大量 C 扩展库(NumPy、Pandas、PyTorch 等)依赖 GIL 提供的线程安全保证,可能在 no-GIL 模式下出现竞态 bug
  3. 仍是实验阶段:CPython 团队明确表示 3.13 的 no-GIL 是"技术预览",不建议生产使用
  4. 需要显式启用:默认构建仍有 GIL,需要使用专门的 no-GIL 构建版本
# 检查当前 Python 是否为 no-GIL 构建
python -c "import sys; print(sys._is_gil_enabled())"
# 有 GIL 时输出 True,no-GIL 构建输出 False

实际建议:在 PEP 703 生态完全成熟之前(预计 Python 3.15~3.16),本文介绍的向量化、multiprocessing、asyncio 策略仍是生产环境的可靠选择。no-GIL 模式值得关注,但暂时不建议在生产环境部署。


六、总结与决策树

核心结论

  1. GIL 只影响 Python 字节码执行,不影响 C/C++ 层的计算(NumPy、PyTorch 算子、IO 等)
  2. 多线程对 CPU 密集型纯 Python 代码几乎无效;对 IO 密集型任务有效
  3. 向量化是第一优先级:既能加速单线程,又能释放 GIL,是深度学习和量化金融场景下的最优解
  4. multiprocessing / joblib 是"不改算法"前提下最简单的 CPU 密集型加速手段
  5. asyncio 是 IO 密集型任务的最优并发方案

决策树

你的瓶颈是什么?
│
├── IO 密集型(网络请求、文件读写、数据库查询)
│   └── 首选 asyncio(轻量、无竞态),其次 threading
│
└── CPU 密集型
    │
    ├── 能改写为向量化运算?(NumPy/Pandas/PyTorch)
    │   └── YES → 向量化(首选,速度最快,代码最简洁)
    │
    └── 不能向量化(逻辑复杂、依赖前序状态)
        │
        ├── 任务粒度 > ~100ms?
        │   └── YES → multiprocessing / joblib
        │
        └── 任务粒度很小 → 考虑批量合并后再并行化

各场景推荐方案速查

场景 推荐方案 备注
Pandas 数据清洗 向量化(避免 .apply + lambda) .apply 是隐藏的 Python 循环
机器学习特征工程 NumPy 广播 / Pandas rolling 彻底消除 Python for 循环
DL 训练数据加载 DataLoader num_workers ≥ 2(需磁盘IO) workers 是进程,独立 GIL;纯内存数据集不适用
DL 大规模特征提取 NumPy 向量化预处理 + batch 推理 50K样本 14x vs 串行;消除 Python 循环是关键
量化因子计算 NumPy 矩阵化 / joblib 500 支股票 → 500×252 矩阵,28x 加速
蒙特卡洛定价(欧式) NumPy 向量化终价公式 GBM 解析解,~3430x 加速
向量化回测 Pandas rolling + ffill 信号 vs 事件驱动串行:20x 加速
大量独立 HTTP 请求 asyncio + aiohttp 万级并发无压力