插件窝 干货文章 nginx线程池源码是什么

nginx线程池源码是什么

ngx thread 线程 task 64    来源:    2025-04-21

Nginx线程池源码分析

Nginx的线程池实现是其高效处理异步I/O操作的关键组件之一。以下是Nginx线程池的核心源码分析:

主要源码文件

Nginx线程池实现主要位于以下文件中: - src/os/unix/ngx_thread_pool.c - 线程池核心实现 - src/os/unix/ngx_thread_pool.h - 线程池头文件

核心数据结构

1. 线程池结构体

typedef struct {
    ngx_thread_mutex_t        mtx;
    ngx_thread_cond_t         cond;
    ngx_thread_pool_queue_t   queue;
    ngx_int_t                waiting;
    ngx_thread_pool_t        *pool;

    ngx_log_t                *log;

    unsigned                 failed:1;
} ngx_thread_pool_ctx_t;

2. 任务队列结构体

typedef struct {
    ngx_thread_task_t        *first;
    ngx_thread_task_t       **last;
} ngx_thread_pool_queue_t;

3. 任务结构体

struct ngx_thread_task_s {
    ngx_thread_task_t        *next;
    ngx_uint_t                id;
    void                     *ctx;
    void                    (*handler)(void *data, ngx_log_t *log);
    ngx_event_t              *event;
};

核心函数实现

1. 线程池初始化

ngx_int_t
ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, ngx_pool_t *pool)
{
    int             err;
    pthread_t       tid;
    ngx_uint_t      n;
    pthread_attr_t  attr;

    // 初始化互斥锁和条件变量
    if (ngx_thread_mutex_create(&tp->mtx, log) != NGX_OK) {
        return NGX_ERROR;
    }

    if (ngx_thread_cond_create(&tp->cond, log) != NGX_OK) {
        (void) ngx_thread_mutex_destroy(&tp->mtx, log);
        return NGX_ERROR;
    }

    // 创建工作线程
    err = pthread_attr_init(&attr);
    if (err) {
        ngx_log_error(NGX_LOG_ALERT, log, err,
                      "pthread_attr_init() failed");
        return NGX_ERROR;
    }

    for (n = 0; n < tp->threads; n++) {
        err = pthread_create(&tid, &attr, ngx_thread_pool_cycle, tp);
        if (err) {
            ngx_log_error(NGX_LOG_ALERT, log, err,
                          "pthread_create() failed");
            return NGX_ERROR;
        }
    }

    (void) pthread_attr_destroy(&attr);

    return NGX_OK;
}

2. 工作线程主循环

static void *
ngx_thread_pool_cycle(void *data)
{
    ngx_thread_pool_t *tp = data;
    ngx_thread_task_t  *task;

    for ( ;; ) {
        // 获取互斥锁
        if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {
            return NULL;
        }

        // 等待任务
        while (tp->queue.first == NULL && !tp->terminate) {
            if (ngx_thread_cond_wait(&tp->cond, &tp->mtx, tp->log)
                != NGX_OK)
            {
                (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
                return NULL;
            }
        }

        // 检查终止标志
        if (tp->terminate) {
            if (ngx_thread_mutex_unlock(&tp->mtx, tp->log) != NGX_OK) {
                return NULL;
            }
            break;
        }

        // 获取任务
        task = tp->queue.first;
        tp->queue.first = task->next;

        if (tp->queue.first == NULL) {
            tp->queue.last = &tp->queue.first;
        }

        // 释放互斥锁
        if (ngx_thread_mutex_unlock(&tp->mtx, tp->log) != NGX_OK) {
            return NULL;
        }

        // 执行任务
        task->handler(task->ctx, tp->log);

        // 通知任务完成
        if (task->event) {
            task->event->complete = 1;
            task->event->active = 0;

            if (ngx_notify(task->event) != NGX_OK) {
                ngx_log_error(NGX_LOG_ALERT, tp->log, ngx_errno,
                              "ngx_notify() failed");
            }
        }
    }

    return NULL;
}

3. 提交任务到线程池

ngx_int_t
ngx_thread_task_post(ngx_thread_pool_t *tp, ngx_thread_task_t *task)
{
    if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {
        return NGX_ERROR;
    }

    // 将任务添加到队列
    *tp->queue.last = task;
    tp->queue.last = &task->next;
    task->next = NULL;

    // 唤醒一个工作线程
    if (ngx_thread_cond_signal(&tp->cond, tp->log) != NGX_OK) {
        (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
        return NGX_ERROR;
    }

    if (ngx_thread_mutex_unlock(&tp->mtx, tp->log) != NGX_OK) {
        return NGX_ERROR;
    }

    return NGX_OK;
}

关键设计特点

  1. 任务队列:使用链表实现的FIFO队列管理待处理任务
  2. 线程同步:使用互斥锁(pthread_mutex_t)和条件变量(pthread_cond_t)实现线程间同步
  3. 事件通知:任务完成后通过事件机制通知主线程
  4. 线程管理:固定数量的工作线程,避免频繁创建销毁线程的开销
  5. 无锁设计:任务提交和获取都通过互斥锁保护,保证线程安全

使用示例

在Nginx模块中使用线程池的典型代码:

// 定义任务处理函数
static void my_task_handler(void *data, ngx_log_t *log)
{
    // 执行耗时操作
}

// 提交任务
ngx_thread_task_t *task;

task = ngx_thread_task_alloc(pool, sizeof(my_task_data));
if (task == NULL) {
    return NGX_ERROR;
}

task->handler = my_task_handler;
task->ctx = my_task_data;

if (ngx_thread_task_post(thread_pool, task) != NGX_OK) {
    return NGX_ERROR;
}

Nginx的线程池实现简洁高效,是其支持高并发的重要基础组件之一。