徒手写一个 C 语言线程池 — 从 mutex 串行化到快慢路径分流的性能优化之路

一、为什么要写一个线程池

在多线程编程中,如果每来一个任务就 pthread_create 一个线程,做完再 pthread_join 销毁,频繁的创建/销毁会带来巨大的系统开销。线程池的核心思想很简单:预先创建一组工作线程,它们从共享任务队列中取任务执行,做完不销毁,继续等下一个任务。 这个项目从零实现了一个 C 语言线程池,完整覆盖了生产者-消费者模型、条件变量、自旋锁、原子操作等并发编程的核心知识点。

二、整体架构

Plaintext

┌─────────────────────────────┐
│ 生产者 (main / 任意线程)     │
│ pool_add_task()             │
└──────────┬──────────────────┘
           │
           ▼
┌─────────────────────────────┐
│ 任务队列 task_queue_t        │
│ ┌───────────────────────┐   │
│ │ spinlock (fast path)  │───│── 保护 head/tail 指针
│ │ mutex (slow path)     │───│── cond_wait / cond_signal
│ │ atomic_int task_count │───│── 完全无锁读写
│ │ not_empty / not_full  │   │
│ └───────────────────────┘   │
└──────────┬──────────────────┘
           │
     ┌─────┼─────┐
     ▼     ▼     ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Worker 1 │ │ Worker 2 │ │ Worker N │
│  取任务  │ │  取任务  │ │  取任务  │
│   执行   │ │   执行   │ │   执行   │
└──────────┘ └──────────┘ └──────────┘

线程池结构体很简单,就是一个线程数组 + 一个任务队列:

C

typedef struct threadpool {
    pthread_t *threads; // 工作线程 ID 数组
    int thread_count; // 线程数量
    int stop; // 关闭标志:0=工作中,1=准备下班
    task_queue_t *queue; // 共享任务队列
} threadpool_t;

三、核心优化:快慢路径分流

3.1 初版的问题 — 一把大锁串行化所有操作

最初的 worker 函数使用单一 pthread_mutex_t 保护所有队列操作和线程睡眠。这意味着同一时刻只有一个线程能取任务,其他线程全在排队等锁:

C

// 初版 worker — 性能瓶颈
void *worker(void *thread_pool) {
    while (1) {
        pthread_mutex_lock(&pool->queue->mutex); // 全局串行化
        while (task_count == 0 && !pool->stop)
            pthread_cond_wait(&pool->queue->not_empty, &pool->queue->mutex);
        if (pool->stop && task_count == 0) {
            pthread_mutex_unlock(&pool->queue->mutex);
            break;
        }
        task = queue_pop(pool->queue);
        pthread_mutex_unlock(&pool->queue->mutex);
        task->func(task->arg);
        free(task);
    }
    return NULL;
}

问题很明显:持锁时间太长pthread_mutex_t 是睡眠锁 — 拿不到锁的线程会被 OS 挂起,等锁释放后再被唤醒,上下文切换开销很大。而队列的 head/tail 操作其实只需要几条指针赋值,拿 mutex 去保护这种纳秒级操作纯粹是"大炮打蚊子"。

3.2 优化方案:三把"锁"各司其职

将锁的职责拆分成三层:

组件职责持锁时间
spinlock保护 head/tail 指针操作纳秒级
mutex + condvar线程的睡眠和唤醒只在没任务时使用
atomic_int保护 task_count 读写完全无锁

优化后的 worker 函数是这样工作的:

C

void *worker(void *thread_pool) {
    threadpool_t *pool = (threadpool_t *)thread_pool;
    while (1) {
        task_t *task = NULL;
        // ============ fast path ============
        // 原子读 task_count,有任务就直接抢
        if (atomic_load(&pool->queue->task_count) > 0) {
            task = queue_pop(pool->queue); // spinlock 保护,纳秒级
            if (task != NULL) { // 可能被其他线程抢走了
                pthread_cond_signal(&pool->queue->not_full);
                task->func(task->arg);
                free(task);
                continue; // 继续 fast path,不碰 mutex
            }
        }
        // ============ slow path ============
        // 真的没任务了,才加 mutex 进入睡眠
        pthread_mutex_lock(&pool->queue->mutex);
        while (atomic_load(&pool->queue->task_count) == 0 && !pool->stop) {
            pthread_cond_wait(&pool->queue->not_empty, &pool->queue->mutex);
        }
        if (pool->stop && atomic_load(&pool->queue->task_count) == 0) {
            pthread_mutex_unlock(&pool->queue->mutex);
            break; // 下班
        }
        pthread_mutex_unlock(&pool->queue->mutex);
    }
    return NULL;
}

核心思路:99% 的时间走 fast path。只有当队列确实空的时候,线程才进入 slow path 加 mutex 休眠。mutex 不再参与高频的取任务竞争,只负责"线程该不该睡觉"这件事。

四、自旋锁实现 — 基于 C11 atomic_flag

spinlock.h 是整个项目最小的文件,但它是性能优化的关键:

C

typedef struct spinlock {
    atomic_flag lock; // C11 原子标志位:0=未锁,1=已锁
} spinlock_t;
// 初始化为未锁状态
static inline void spin_init(spinlock_t *s) {
    atomic_flag_clear(&s->lock);
}
// 加锁:Test-And-Set 自旋
static inline void spin_lock(spinlock_t *s) {
    while (atomic_flag_test_and_set(&s->lock)) {
        // 空转等待
    }
}
// 解锁
static inline void spin_unlock(spinlock_t *s) {
    atomic_flag_clear(&s->lock);
}

几个关键点:

inline 关键字:自旋锁函数体很小(几条指令),函数调用本身的开销(压栈、跳转、恢复现场)可能比函数体还大。inline 告诉编译器直接展开到调用处,消除调用开销。

atomic_flag_test_and_set:这是 C11 标准的原子操作,做了两件事:

返回 lock旧值

lock 设为 1(上锁)

所以线程 A执行时 lock=0while(0) 不进入循环,同时 lock 变为 1。线程 B 再来执行时 lock=1while(1) 原地空转,直到 A 释放锁。

为什么用自旋锁而不是 mutex:mutex 是睡眠锁,拿不到锁的线程会被 OS 挂起然后唤醒,两次上下文切换的成本是微秒级。而队列的 push/pop 只是几条指针赋值,纳秒级完成。用自旋锁空转几个 CPU 周期比上下文切换快一个数量级。

但自旋锁不适合临界区长的场景 — 如果持锁时间几毫秒,空转的 CPU 就全浪费了。

五、任务队列 — 分离关注点

任务队列被单独封装为 task_queue_t,与线程池逻辑解耦:

C

typedef struct task {
    task_func func; // 函数指针:线程池不关心任务是什么
    void *arg; // 参数,多参数时封装成结构体传入
    struct task *next; // 链表串起任务
} task_t;
typedef struct task_queue {
    task_t *head; // 队列头
    task_t *tail; // 队列尾(尾插法,O(1) 入队)
    pthread_mutex_t mutex; // slow path:保护 cond_wait
    spinlock_t lock; // fast path:保护 head/tail 指针
    pthread_cond_t not_empty; // 队列空 → 消费者等待
    pthread_cond_t not_full; // 队列满 → 生产者等待
    atomic_int task_count; // 原子变量,完全无锁
    int max_tasks; // 容量上限,防止内存无限膨胀
} task_queue_t;

入队和出队操作非常简洁,全程只有 spinlock 保护:

C

void queue_push(task_queue_t *queue, task_t *task) {
    spin_lock(&queue->lock);
    if (queue->head == NULL) {
        queue->head = task;
        queue->tail = task;
    } else {
        queue->tail->next = task; // 尾插法
        queue->tail = task;
    }
    atomic_fetch_add(&queue->task_count, 1); // 原子自增
    spin_unlock(&queue->lock);
}
task_t *queue_pop(task_queue_t *queue) {
    spin_lock(&queue->lock);
    if (queue->head == NULL) {
        spin_unlock(&queue->lock);
        return NULL;
    }
    task_t *task = queue->head;
    queue->head = task->next;
    if (queue->head == NULL)
        queue->tail = NULL;
    atomic_fetch_sub(&queue->task_count, 1); // 原子自减
    spin_unlock(&queue->lock);
    return task;
}

注意 task_count 使用的是 atomic_fetch_add / atomic_fetch_sub — 完全不需要锁,CPU 保证原子性。

六、三个关键 API

6.1 pool_create — 创建线程池

C

threadpool_t *pool_create(int n) {
    threadpool_t *pool = malloc(sizeof(threadpool_t));
    pool->thread_count = n;
    pool->stop = 0;
    pool->queue = malloc(sizeof(task_queue_t));
    queue_init(pool->queue, 10000); // 队列容量 10000
    pool->threads = malloc(sizeof(pthread_t) * n);
    for (int i = 0; i < n; i++) {
        pthread_create(&pool->threads[i], NULL, worker, pool);
    }
    return pool;
}

pthread_create 之后,每个线程立即进入 worker() 函数开始循环。由于此时队列为空,它们会直接走 slow path 进入 cond_wait 休眠。

6.2 pool_add_task — 提交任务

C

void pool_add_task(threadpool_t *pool, task_func func, void *arg) {
    task_t *task = malloc(sizeof(task_t));
    task->func = func;
    task->arg = arg;
    task->next = NULL;
    pthread_mutex_lock(&pool->queue->mutex);
    // 队列满 → 生产者等待(背压)
    while (atomic_load(&pool->queue->task_count) >= pool->queue->max_tasks) {
        pthread_cond_wait(&pool->queue->not_full, &pool->queue->mutex);
    }
    queue_push(pool->queue, task); // spinlock 保护
    pthread_cond_signal(&pool->queue->not_empty); // 唤醒一个消费者
    pthread_mutex_unlock(&pool->queue->mutex);
}

这里加 mutex 而不是 spinlock 的原因是:生产者可能需要在 not_full 条件变量上等待,而条件变量必须与 mutex 配对使用。入队时 queue_push 内部仍然使用 spinlock 保护指针。

6.3 pool_destroy — 优雅关闭

C

void pool_destroy(threadpool_t *pool) {
    pthread_mutex_lock(&pool->queue->mutex);
    pool->stop = 1; // 通知:准备下班
    pthread_cond_broadcast(&pool->queue->not_empty); // 广播唤醒所有睡眠线程
    pthread_mutex_unlock(&pool->queue->mutex);
    for (int i = 0; i < pool->thread_count; i++) {
        pthread_join(pool->threads[i], NULL); // 等待每个线程退出
    }
    free(pool->threads);
    queue_destory(pool->queue);
    free(pool->queue);
    free(pool);
}

关闭流程的要点:

设置 stop = 1

pthread_cond_broadcast 广播唤醒所有在 not_empty 上睡眠的线程

被唤醒的线程检查 while(atomic_load(&pool->queue->task_count) == 0 && !pool->stop) 条件不再满足,退出循环

接着命中 if (pool->stop && task_count == 0) 分支,break 退出 worker 函数

pthread_join 确保所有线程都退出后再释放内存

七、条件变量的正确用法 — 必须用 while 而非 if

一个容易犯的错误是用 if 检查条件:

C

// 错误写法
if (task_count == 0 && !stop)
    pthread_cond_wait(&not_empty, &mutex);

必须使用 while,原因有两个:

虚假唤醒(spurious wakeup):POSIX 允许 pthread_cond_wait 在没有线程调用 signal/broadcast 的情况下返回。如果用 if,虚假唤醒后会直接跳出判断去取任务,可能操作空队列。

惊群效应:假设 4 个线程都在等 not_empty,但只有一个任务到来。signal 只唤醒一个线程,但极端情况下多个线程可能同时被唤醒。第一个拿到锁的线程取走了唯一的任务,其他线程醒来后发现 task_count 又变成 0 了。while 会让它们重新检查条件并继续睡眠,if 则会导致它们去操作空指针。

C

// 正确写法
while (atomic_load(&pool->queue->task_count) == 0 && !pool->stop) {
    pthread_cond_wait(&pool->queue->not_empty, &pool->queue->mutex);
}

八、性能基准测试

benchmark.c 设计了三个场景来衡量线程池的性能表现:

测试 1:空任务(仅原子自增)

测量线程池的纯调度开销。任务本身几乎不做任何事,瓶颈完全在线程池的锁竞争和调度上。

C

void empty_task(void *arg) {
    atomic_fetch_add(&g_done, 1); // 就这一行
    (void)arg;
}

分别在 1/2/4/8 个 worker 下提交 200 万个任务。

测试 2:多生产者并发提交

4 个生产者线程同时向同一个线程池提交任务,测量自旋锁在多线程并发写入下的争抢表现。

C

void *producer_thread(void *arg) {
    producer_arg_t *pa = (producer_arg_t *)arg;
    for (long i = 0; i < pa->count; i++) {
        pool_add_task(pa->pool, pa->task_fn, NULL);
    }
    return NULL;
}

测试 3:轻量任务(100 次整数运算 + 原子自增)

任务本身有少量耗时,观察增加 worker 数量后的加速效果。

C

void light_task(void *arg) {
    volatile int x = 0;
    for (int i = 0; i < 100; i++) x += i;
    atomic_fetch_add(&g_done, 1);
    (void)arg;
    (void)x;
}

每个测试输出三个时间指标:

指标含义
post提交所有任务耗时
exec消费者执行完所有任务耗时
total总耗时
thrpt吞吐量(tasks/second)

九、使用示例

C

#include "simple_pool.h"

typedef struct {
    int id;
    int work_time;
    char desc[32];
} task_info_t;

void timed_task(void *arg) {
    task_info_t *info = (task_info_t *)arg;
    printf("[线程%lu] 开始: %s (预计 %ds)\n",
        pthread_self(), info->desc, info->work_time);
    sleep(info->work_time);
    printf("[线程%lu] 完成: %s\n", pthread_self(), info->desc);
    free(arg);
}

int main() {
    threadpool_t *pool = pool_create(4); // 4 个工作线程
    // 提交 8 个耗时各异的任务
    for (int i = 0; i < 8; i++) {
        task_info_t *info = malloc(sizeof(task_info_t));
        info->id = i;
        info->work_time = (i % 3) + 1;
        snprintf(info->desc, 32, "任务#%d", i);
        pool_add_task(pool, timed_task, info);
    }
    sleep(20); // 等待任务执行完
    pool_destroy(pool); // 优雅关闭
    return 0;
}

编译运行:

Bash

gcc -pthread -std=c11 -O2 -o app main.c simple_pool.c task_queue.c
./app

输出中可以看到不同线程 ID 交替出现,证明任务真正在并发执行。

十、总结

这个线程池项目虽然代码量不大(总共约 300 行),但覆盖了并发编程中几个非常核心的概念:

自旋锁 vs 互斥锁:批判区短用 spinlock(空转等),临界区长用 mutex(睡眠等)。关键不是"用什么锁",而是"临界区有多长"。

快慢路径分流:高频操作走轻量路径,低频操作走重量路径。不要让慢路径拖累快路径。

原子操作atomic_int + atomic_fetch_add 替代带锁的计数器,完全消除锁开销。

条件变量最佳实践while 不是可选的,是必须的。虚假唤醒和惊群效应是真实存在的问题。

优雅关闭stop 标志 + broadcast 唤醒 + pthread_join 等待,确保无内存泄漏。

如果你正在学习 C 语言多线程编程,建议按这个顺序阅读源码:

spinlock.h — 最短的文件,理解 TAS 自旋锁

task_queue.c — 理解队列的 push/pop 和原子操作

simple_pool.c 中的 worker() — 理解快慢路径分流

simple_pool.c 中的 pool_destroy() — 理解优雅关闭

main.c — 看完整的调用流程

benchmark.c — 运行压测,观察不同配置下的吞吐量变化

完整代码见 GitHub(https://github.com/yodragon666/threadpool-c)。

搜索

文章归档

广告位招租