Nginx的线程池实现是其高效处理异步I/O操作的关键组件之一。以下是Nginx线程池的核心源码分析:
Nginx线程池实现主要位于以下文件中:
- src/core/ngx_thread_pool.c:线程池核心实现
- src/core/ngx_thread_pool.h:线程池头文件
/*
 * FIFO of pending tasks, kept as a singly linked list through
 * ngx_thread_task_t.next.
 *
 * Defined before ngx_thread_pool_ctx_t because that structure embeds the
 * queue by value and therefore needs the complete type.
 */
typedef struct {
    /* head of the list; NULL when the queue is empty */
    ngx_thread_task_t        *first;

    /*
     * address of the slot the next posted task is stored into:
     * &first while the queue is empty, otherwise &tail->next
     */
    ngx_thread_task_t       **last;
} ngx_thread_pool_queue_t;


/*
 * Thread pool context.
 *
 * NOTE(review): the functions shown alongside this definition operate on
 * ngx_thread_pool_t (and also read ->threads and ->terminate, which are not
 * members here); confirm how this type relates to ngx_thread_pool_t.
 */
typedef struct {
    ngx_thread_mutex_t        mtx;
    ngx_thread_cond_t         cond;
    ngx_thread_pool_queue_t   queue;

    /* not referenced by the code visible here; presumably a count of
     * queued tasks -- TODO confirm */
    ngx_int_t                 waiting;

    ngx_thread_pool_t        *pool;
    ngx_log_t                *log;

    /* not referenced by the code visible here */
    unsigned                  failed:1;
} ngx_thread_pool_ctx_t;
/*
 * One unit of work handed to the thread pool.
 *
 * A task is linked into the pool queue by ngx_thread_task_post() and
 * unlinked and executed by a worker in ngx_thread_pool_cycle().
 */
struct ngx_thread_task_s {
// Link to the next queued task; set to NULL when the task is posted.
ngx_thread_task_t *next;
// Not read or written by the pool code shown here.
ngx_uint_t id;
// Opaque pointer passed to handler as its first argument.
void *ctx;
// Function the worker thread calls to perform the work; it also receives
// the pool's log.
void (*handler)(void *data, ngx_log_t *log);
// Optional. When non-NULL, the worker sets complete = 1 and active = 0 on
// it after handler returns and then passes it to ngx_notify().
ngx_event_t *event;
};
/*
 * Prepare a thread pool and start its worker threads.
 *
 * tp    pool to initialise; tp->threads gives the number of workers to start
 * log   log used for diagnostics here and stored in tp->log for the workers
 * pool  memory pool; not used by this function
 *
 * Returns NGX_OK when all tp->threads workers were created, NGX_ERROR
 * otherwise.  On failure every resource acquired here is released, except
 * when some workers are already running: they keep using the mutex and the
 * condition variable, so those two are left intact.
 */
ngx_int_t
ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, ngx_pool_t *pool)
{
    int             err;
    pthread_t       tid;
    ngx_uint_t      n;
    pthread_attr_t  attr;

    /*
     * Workers read tp->log and tp->queue as soon as they start, and
     * ngx_thread_task_post() stores through tp->queue.last, so both must be
     * valid before the first thread is created.
     */
    tp->queue.first = NULL;
    tp->queue.last = &tp->queue.first;
    tp->log = log;

    /* synchronisation primitives shared by producers and workers */
    if (ngx_thread_mutex_create(&tp->mtx, log) != NGX_OK) {
        return NGX_ERROR;
    }

    if (ngx_thread_cond_create(&tp->cond, log) != NGX_OK) {
        (void) ngx_thread_mutex_destroy(&tp->mtx, log);
        return NGX_ERROR;
    }

    err = pthread_attr_init(&attr);
    if (err) {
        ngx_log_error(NGX_LOG_ALERT, log, err,
                      "pthread_attr_init() failed");
        goto failed;
    }

    /*
     * The thread ids are not kept, so the workers can never be joined;
     * create them detached so their resources are reclaimed on exit.
     */
    err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    if (err) {
        ngx_log_error(NGX_LOG_ALERT, log, err,
                      "pthread_attr_setdetachstate() failed");
        (void) pthread_attr_destroy(&attr);
        goto failed;
    }

    for (n = 0; n < tp->threads; n++) {
        err = pthread_create(&tid, &attr, ngx_thread_pool_cycle, tp);
        if (err) {
            ngx_log_error(NGX_LOG_ALERT, log, err,
                          "pthread_create() failed");
            (void) pthread_attr_destroy(&attr);

            if (n == 0) {
                /* no worker exists yet, full cleanup is safe */
                goto failed;
            }

            /* running workers still use mtx and cond; do not destroy them */
            return NGX_ERROR;
        }
    }

    (void) pthread_attr_destroy(&attr);

    return NGX_OK;

failed:

    (void) ngx_thread_cond_destroy(&tp->cond, log);
    (void) ngx_thread_mutex_destroy(&tp->mtx, log);

    return NGX_ERROR;
}
/*
 * Worker thread entry point.
 *
 * data is the ngx_thread_pool_t the worker belongs to.  The worker loops:
 * it takes the pool mutex, sleeps on the condition variable until a task is
 * queued or tp->terminate is set, pops the task at the head of the queue,
 * drops the mutex and runs the task handler.  If the task carries an event,
 * the event is flagged as finished and handed to ngx_notify().
 *
 * Always returns NULL; the thread exits on termination or as soon as a
 * mutex or condition variable operation fails.
 */
static void *
ngx_thread_pool_cycle(void *data)
{
    ngx_thread_pool_t  *pool;
    ngx_thread_task_t  *job;

    pool = data;

    for ( ;; ) {

        if (ngx_thread_mutex_lock(&pool->mtx, pool->log) != NGX_OK) {
            return NULL;
        }

        /* sleep until there is work to do or the pool is shutting down */
        for ( ;; ) {
            if (pool->queue.first != NULL || pool->terminate) {
                break;
            }

            if (ngx_thread_cond_wait(&pool->cond, &pool->mtx, pool->log)
                != NGX_OK)
            {
                (void) ngx_thread_mutex_unlock(&pool->mtx, pool->log);
                return NULL;
            }
        }

        /* termination takes precedence over tasks still in the queue */
        if (pool->terminate) {
            (void) ngx_thread_mutex_unlock(&pool->mtx, pool->log);
            break;
        }

        /* detach the head of the queue */
        job = pool->queue.first;
        pool->queue.first = job->next;

        if (job->next == NULL) {
            /* queue became empty: the tail slot is the head pointer again */
            pool->queue.last = &pool->queue.first;
        }

        if (ngx_thread_mutex_unlock(&pool->mtx, pool->log) != NGX_OK) {
            return NULL;
        }

        /* the handler runs without the pool mutex held */
        job->handler(job->ctx, pool->log);

        if (job->event == NULL) {
            continue;
        }

        /* report completion through the task's event */
        job->event->complete = 1;
        job->event->active = 0;

        if (ngx_notify(job->event) != NGX_OK) {
            ngx_log_error(NGX_LOG_ALERT, pool->log, ngx_errno,
                          "ngx_notify() failed");
        }
    }

    return NULL;
}
/*
 * Queue a task for execution by one of the pool's worker threads.
 *
 * tp    pool to post to
 * task  task to append to the tail of tp->queue
 *
 * Returns NGX_OK once the task is queued and the mutex released, NGX_ERROR
 * otherwise.  When locking or signalling fails the task is NOT queued, so
 * the caller remains its sole owner.
 */
ngx_int_t
ngx_thread_task_post(ngx_thread_pool_t *tp, ngx_thread_task_t *task)
{
    if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {
        return NGX_ERROR;
    }

    task->next = NULL;

    /*
     * Wake one worker before linking the task.  The mutex is held, so the
     * woken worker cannot look at the queue until we unlock; signalling
     * first means a failure here leaves the queue untouched instead of
     * returning NGX_ERROR with the task still linked.
     */
    if (ngx_thread_cond_signal(&tp->cond, tp->log) != NGX_OK) {
        (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
        return NGX_ERROR;
    }

    /* append at the tail */
    *tp->queue.last = task;
    tp->queue.last = &task->next;

    /*
     * An unlock failure is still reported as NGX_ERROR, as before; the task
     * is already linked at that point.
     */
    if (ngx_thread_mutex_unlock(&tp->mtx, tp->log) != NGX_OK) {
        return NGX_ERROR;
    }

    return NGX_OK;
}
在Nginx模块中使用线程池的典型代码:
/*
 * Example task handler.  Once installed as task->handler it is called by a
 * pool worker thread with task->ctx as data and the pool's log as log.
 */
static void
my_task_handler(void *data, ngx_log_t *log)
{
    /* the time-consuming work goes here */
    (void) data;
    (void) log;
}
// Submit a task: allocate it, fill in the handler and its context, then
// post it to the pool. This is an excerpt meant to sit inside a function
// that returns ngx_int_t; pool and thread_pool come from the surrounding
// code.
ngx_thread_task_t *task;
// NOTE(review): my_task_data is used here as a sizeof operand and below as
// the value stored in task->ctx -- confirm what it names. Presumably the
// second argument is the size of per-task context to reserve; verify
// against ngx_thread_task_alloc().
task = ngx_thread_task_alloc(pool, sizeof(my_task_data));
if (task == NULL) {
return NGX_ERROR;
}
// handler is what the worker thread calls; ctx is passed to it as data.
task->handler = my_task_handler;
task->ctx = my_task_data;
// NOTE(review): task->event is not set here; the worker only sends a
// completion notification when task->event is non-NULL.
if (ngx_thread_task_post(thread_pool, task) != NGX_OK) {
return NGX_ERROR;
}
Nginx的线程池实现简洁高效,是其支持高并发的重要基础组件之一。