一、为什么要写一个线程池
在多线程编程中,如果每来一个任务就 pthread_create 一个线程,做完再 pthread_join 销毁,频繁的创建/销毁会带来巨大的系统开销。线程池的核心思想很简单:预先创建一组工作线程,它们从共享任务队列中取任务执行,做完不销毁,继续等下一个任务。 这个项目从零实现了一个 C 语言线程池,完整覆盖了生产者-消费者模型、条件变量、自旋锁、原子操作等并发编程的核心知识点。
二、整体架构
Plaintext
┌─────────────────────────────┐
│ 生产者 (main / 任意线程) │
│ pool_add_task() │
└──────────┬──────────────────┘
│
▼
┌─────────────────────────────┐
│ 任务队列 task_queue_t │
│ ┌───────────────────────┐ │
│ │ spinlock (fast path) │───│── 保护 head/tail 指针
│ │ mutex (slow path) │───│── cond_wait / cond_signal
│ │ atomic_int task_count │───│── 完全无锁读写
│ │ not_empty / not_full │ │
│ └───────────────────────┘ │
└──────────┬──────────────────┘
│
┌─────┼─────┐
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Worker 1 │ │ Worker 2 │ │ Worker N │
│ 取任务 │ │ 取任务 │ │ 取任务 │
│ 执行 │ │ 执行 │ │ 执行 │
└──────────┘ └──────────┘ └──────────┘
线程池结构体很简单,就是一个线程数组 + 一个任务队列:
C
typedef struct threadpool {
pthread_t *threads; // 工作线程 ID 数组
int thread_count; // 线程数量
int stop; // 关闭标志:0=工作中,1=准备下班
task_queue_t *queue; // 共享任务队列
} threadpool_t;
三、核心优化:快慢路径分流
3.1 初版的问题 — 一把大锁串行化所有操作
最初的 worker 函数使用单一 pthread_mutex_t 保护所有队列操作和线程睡眠。这意味着同一时刻只有一个线程能取任务,其他线程全在排队等锁:
C
// 初版 worker — 性能瓶颈
void *worker(void *thread_pool) {
while (1) {
pthread_mutex_lock(&pool->queue->mutex); // 全局串行化
while (task_count == 0 && !pool->stop)
pthread_cond_wait(&pool->queue->not_empty, &pool->queue->mutex);
if (pool->stop && task_count == 0) {
pthread_mutex_unlock(&pool->queue->mutex);
break;
}
task = queue_pop(pool->queue);
pthread_mutex_unlock(&pool->queue->mutex);
task->func(task->arg);
free(task);
}
return NULL;
}
问题很明显:持锁时间太长。pthread_mutex_t 是睡眠锁 — 拿不到锁的线程会被 OS 挂起,等锁释放后再被唤醒,上下文切换开销很大。而队列的 head/tail 操作其实只需要几条指针赋值,拿 mutex 去保护这种纳秒级操作纯粹是"大炮打蚊子"。
3.2 优化方案:三把"锁"各司其职
将锁的职责拆分成三层:
| 组件 | 职责 | 持锁时间 |
|---|---|---|
spinlock | 保护 head/tail 指针操作 | 纳秒级 |
mutex + condvar | 线程的睡眠和唤醒 | 只在没任务时使用 |
atomic_int | 保护 task_count 读写 | 完全无锁 |
优化后的 worker 函数是这样工作的:
C
void *worker(void *thread_pool) {
threadpool_t *pool = (threadpool_t *)thread_pool;
while (1) {
task_t *task = NULL;
// ============ fast path ============
// 原子读 task_count,有任务就直接抢
if (atomic_load(&pool->queue->task_count) > 0) {
task = queue_pop(pool->queue); // spinlock 保护,纳秒级
if (task != NULL) { // 可能被其他线程抢走了
pthread_cond_signal(&pool->queue->not_full);
task->func(task->arg);
free(task);
continue; // 继续 fast path,不碰 mutex
}
}
// ============ slow path ============
// 真的没任务了,才加 mutex 进入睡眠
pthread_mutex_lock(&pool->queue->mutex);
while (atomic_load(&pool->queue->task_count) == 0 && !pool->stop) {
pthread_cond_wait(&pool->queue->not_empty, &pool->queue->mutex);
}
if (pool->stop && atomic_load(&pool->queue->task_count) == 0) {
pthread_mutex_unlock(&pool->queue->mutex);
break; // 下班
}
pthread_mutex_unlock(&pool->queue->mutex);
}
return NULL;
}
核心思路:99% 的时间走 fast path。只有当队列确实空的时候,线程才进入 slow path 加 mutex 休眠。mutex 不再参与高频的取任务竞争,只负责"线程该不该睡觉"这件事。
四、自旋锁实现 — 基于 C11 atomic_flag
spinlock.h 是整个项目最小的文件,但它是性能优化的关键:
C
typedef struct spinlock {
atomic_flag lock; // C11 原子标志位:0=未锁,1=已锁
} spinlock_t;
// 初始化为未锁状态
static inline void spin_init(spinlock_t *s) {
atomic_flag_clear(&s->lock);
}
// 加锁:Test-And-Set 自旋
static inline void spin_lock(spinlock_t *s) {
while (atomic_flag_test_and_set(&s->lock)) {
// 空转等待
}
}
// 解锁
static inline void spin_unlock(spinlock_t *s) {
atomic_flag_clear(&s->lock);
}
几个关键点:
inline 关键字:自旋锁函数体很小(几条指令),函数调用本身的开销(压栈、跳转、恢复现场)可能比函数体还大。inline 告诉编译器直接展开到调用处,消除调用开销。
atomic_flag_test_and_set:这是 C11 标准的原子操作,做了两件事:
返回 lock 的旧值
将 lock 设为 1(上锁)
所以线程 A执行时 lock=0,while(0) 不进入循环,同时 lock 变为 1。线程 B 再来执行时 lock=1,while(1) 原地空转,直到 A 释放锁。
为什么用自旋锁而不是 mutex:mutex 是睡眠锁,拿不到锁的线程会被 OS 挂起然后唤醒,两次上下文切换的成本是微秒级。而队列的 push/pop 只是几条指针赋值,纳秒级完成。用自旋锁空转几个 CPU 周期比上下文切换快一个数量级。
但自旋锁不适合临界区长的场景 — 如果持锁时间几毫秒,空转的 CPU 就全浪费了。
五、任务队列 — 分离关注点
任务队列被单独封装为 task_queue_t,与线程池逻辑解耦:
C
typedef struct task {
task_func func; // 函数指针:线程池不关心任务是什么
void *arg; // 参数,多参数时封装成结构体传入
struct task *next; // 链表串起任务
} task_t;
typedef struct task_queue {
task_t *head; // 队列头
task_t *tail; // 队列尾(尾插法,O(1) 入队)
pthread_mutex_t mutex; // slow path:保护 cond_wait
spinlock_t lock; // fast path:保护 head/tail 指针
pthread_cond_t not_empty; // 队列空 → 消费者等待
pthread_cond_t not_full; // 队列满 → 生产者等待
atomic_int task_count; // 原子变量,完全无锁
int max_tasks; // 容量上限,防止内存无限膨胀
} task_queue_t;
入队和出队操作非常简洁,全程只有 spinlock 保护:
C
void queue_push(task_queue_t *queue, task_t *task) {
spin_lock(&queue->lock);
if (queue->head == NULL) {
queue->head = task;
queue->tail = task;
} else {
queue->tail->next = task; // 尾插法
queue->tail = task;
}
atomic_fetch_add(&queue->task_count, 1); // 原子自增
spin_unlock(&queue->lock);
}
task_t *queue_pop(task_queue_t *queue) {
spin_lock(&queue->lock);
if (queue->head == NULL) {
spin_unlock(&queue->lock);
return NULL;
}
task_t *task = queue->head;
queue->head = task->next;
if (queue->head == NULL)
queue->tail = NULL;
atomic_fetch_sub(&queue->task_count, 1); // 原子自减
spin_unlock(&queue->lock);
return task;
}
注意 task_count 使用的是 atomic_fetch_add / atomic_fetch_sub — 完全不需要锁,CPU 保证原子性。
六、三个关键 API
6.1 pool_create — 创建线程池
C
threadpool_t *pool_create(int n) {
threadpool_t *pool = malloc(sizeof(threadpool_t));
pool->thread_count = n;
pool->stop = 0;
pool->queue = malloc(sizeof(task_queue_t));
queue_init(pool->queue, 10000); // 队列容量 10000
pool->threads = malloc(sizeof(pthread_t) * n);
for (int i = 0; i < n; i++) {
pthread_create(&pool->threads[i], NULL, worker, pool);
}
return pool;
}
pthread_create 之后,每个线程立即进入 worker() 函数开始循环。由于此时队列为空,它们会直接走 slow path 进入 cond_wait 休眠。
6.2 pool_add_task — 提交任务
C
void pool_add_task(threadpool_t *pool, task_func func, void *arg) {
task_t *task = malloc(sizeof(task_t));
task->func = func;
task->arg = arg;
task->next = NULL;
pthread_mutex_lock(&pool->queue->mutex);
// 队列满 → 生产者等待(背压)
while (atomic_load(&pool->queue->task_count) >= pool->queue->max_tasks) {
pthread_cond_wait(&pool->queue->not_full, &pool->queue->mutex);
}
queue_push(pool->queue, task); // spinlock 保护
pthread_cond_signal(&pool->queue->not_empty); // 唤醒一个消费者
pthread_mutex_unlock(&pool->queue->mutex);
}
这里加 mutex 而不是 spinlock 的原因是:生产者可能需要在 not_full 条件变量上等待,而条件变量必须与 mutex 配对使用。入队时 queue_push 内部仍然使用 spinlock 保护指针。
6.3 pool_destroy — 优雅关闭
C
void pool_destroy(threadpool_t *pool) {
pthread_mutex_lock(&pool->queue->mutex);
pool->stop = 1; // 通知:准备下班
pthread_cond_broadcast(&pool->queue->not_empty); // 广播唤醒所有睡眠线程
pthread_mutex_unlock(&pool->queue->mutex);
for (int i = 0; i < pool->thread_count; i++) {
pthread_join(pool->threads[i], NULL); // 等待每个线程退出
}
free(pool->threads);
queue_destory(pool->queue);
free(pool->queue);
free(pool);
}
关闭流程的要点:
设置 stop = 1
pthread_cond_broadcast 广播唤醒所有在 not_empty 上睡眠的线程
被唤醒的线程检查 while(atomic_load(&pool->queue->task_count) == 0 && !pool->stop) 条件不再满足,退出循环
接着命中 if (pool->stop && task_count == 0) 分支,break 退出 worker 函数
pthread_join 确保所有线程都退出后再释放内存
七、条件变量的正确用法 — 必须用 while 而非 if
一个容易犯的错误是用 if 检查条件:
C
// 错误写法
if (task_count == 0 && !stop)
pthread_cond_wait(¬_empty, &mutex);
必须使用 while,原因有两个:
虚假唤醒(spurious wakeup):POSIX 允许 pthread_cond_wait 在没有线程调用 signal/broadcast 的情况下返回。如果用 if,虚假唤醒后会直接跳出判断去取任务,可能操作空队列。
惊群效应:假设 4 个线程都在等 not_empty,但只有一个任务到来。signal 只唤醒一个线程,但极端情况下多个线程可能同时被唤醒。第一个拿到锁的线程取走了唯一的任务,其他线程醒来后发现 task_count 又变成 0 了。while 会让它们重新检查条件并继续睡眠,if 则会导致它们去操作空指针。
C
// 正确写法
while (atomic_load(&pool->queue->task_count) == 0 && !pool->stop) {
pthread_cond_wait(&pool->queue->not_empty, &pool->queue->mutex);
}
八、性能基准测试
benchmark.c 设计了三个场景来衡量线程池的性能表现:
测试 1:空任务(仅原子自增)
测量线程池的纯调度开销。任务本身几乎不做任何事,瓶颈完全在线程池的锁竞争和调度上。
C
void empty_task(void *arg) {
atomic_fetch_add(&g_done, 1); // 就这一行
(void)arg;
}
分别在 1/2/4/8 个 worker 下提交 200 万个任务。
测试 2:多生产者并发提交
4 个生产者线程同时向同一个线程池提交任务,测量自旋锁在多线程并发写入下的争抢表现。
C
void *producer_thread(void *arg) {
producer_arg_t *pa = (producer_arg_t *)arg;
for (long i = 0; i < pa->count; i++) {
pool_add_task(pa->pool, pa->task_fn, NULL);
}
return NULL;
}
测试 3:轻量任务(100 次整数运算 + 原子自增)
任务本身有少量耗时,观察增加 worker 数量后的加速效果。
C
void light_task(void *arg) {
volatile int x = 0;
for (int i = 0; i < 100; i++) x += i;
atomic_fetch_add(&g_done, 1);
(void)arg;
(void)x;
}
每个测试输出三个时间指标:
| 指标 | 含义 |
|---|---|
post | 提交所有任务耗时 |
exec | 消费者执行完所有任务耗时 |
total | 总耗时 |
thrpt | 吞吐量(tasks/second) |
九、使用示例
C
#include "simple_pool.h"
typedef struct {
int id;
int work_time;
char desc[32];
} task_info_t;
void timed_task(void *arg) {
task_info_t *info = (task_info_t *)arg;
printf("[线程%lu] 开始: %s (预计 %ds)\n",
pthread_self(), info->desc, info->work_time);
sleep(info->work_time);
printf("[线程%lu] 完成: %s\n", pthread_self(), info->desc);
free(arg);
}
int main() {
threadpool_t *pool = pool_create(4); // 4 个工作线程
// 提交 8 个耗时各异的任务
for (int i = 0; i < 8; i++) {
task_info_t *info = malloc(sizeof(task_info_t));
info->id = i;
info->work_time = (i % 3) + 1;
snprintf(info->desc, 32, "任务#%d", i);
pool_add_task(pool, timed_task, info);
}
sleep(20); // 等待任务执行完
pool_destroy(pool); // 优雅关闭
return 0;
}
编译运行:
Bash
gcc -pthread -std=c11 -O2 -o app main.c simple_pool.c task_queue.c
./app
输出中可以看到不同线程 ID 交替出现,证明任务真正在并发执行。
十、总结
这个线程池项目虽然代码量不大(总共约 300 行),但覆盖了并发编程中几个非常核心的概念:
自旋锁 vs 互斥锁:批判区短用 spinlock(空转等),临界区长用 mutex(睡眠等)。关键不是"用什么锁",而是"临界区有多长"。
快慢路径分流:高频操作走轻量路径,低频操作走重量路径。不要让慢路径拖累快路径。
原子操作:atomic_int + atomic_fetch_add 替代带锁的计数器,完全消除锁开销。
条件变量最佳实践:while 不是可选的,是必须的。虚假唤醒和惊群效应是真实存在的问题。
优雅关闭:stop 标志 + broadcast 唤醒 + pthread_join 等待,确保无内存泄漏。
如果你正在学习 C 语言多线程编程,建议按这个顺序阅读源码:
spinlock.h — 最短的文件,理解 TAS 自旋锁
task_queue.c — 理解队列的 push/pop 和原子操作
simple_pool.c 中的 worker() — 理解快慢路径分流
simple_pool.c 中的 pool_destroy() — 理解优雅关闭
main.c — 看完整的调用流程
benchmark.c — 运行压测,观察不同配置下的吞吐量变化
完整代码见 GitHub(https://github.com/yodragon666/threadpool-c)。