Gunicorn 的 Arbiter 是其核心组件,负责管理 worker 进程的生命周期。下面我将对 Arbiter 的源码进行详细解析。
Arbiter 是 Gunicorn 的主控进程,主要职责包括:
- 启动、监控和重启 worker 进程
- 处理信号
- 维护 worker 进程池
- 实现优雅重启和热更新
Arbiter 类位于 gunicorn/arbiter.py 文件中,主要包含以下重要方法:
def __init__(self, app):
    """Set up the arbiter's initial state for the given application.

    Args:
        app: Application object. It is stored on the arbiter and its
            ``cfg`` attribute becomes the arbiter's configuration.
    """
    # The application being served and its configuration.
    self.app = app
    self.cfg = app.cfg

    # Process bookkeeping: the pid of the process creating the arbiter and
    # the worker table (spawn_worker stores workers in it keyed by pid).
    self.pid = os.getpid()
    self.workers = {}

    # Placeholders that are not populated at construction time.
    self.log = None
    self._num_workers = None
    self._last_logged_active_worker_count = None
    # ... remaining initialisation code omitted
def run(self):
    """Run the master's main loop.

    Starts the arbiter, sets the process title, then loops forever:
    each iteration takes the oldest entry of ``SIG_QUEUE`` and dispatches
    it to the matching ``handle_<name>`` method. When the queue is empty
    the loop calls ``sleep()``, ``murder_workers()`` and
    ``manage_workers()`` instead.

    Raises:
        SystemExit: re-raised untouched when something in the loop exits,
            and raised with status -1 after an unexpected exception.
    """
    self.start()
    util._setproctitle("master [%s]" % self.proc_name)

    try:
        while True:
            self.maybe_promote_master()

            # Oldest queued signal first; None means nothing is pending.
            sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None
            if sig is None:
                self.sleep()
                self.murder_workers()
                self.manage_workers()
                continue

            # Dispatch the signal to its handler method.
            if sig not in self.SIG_NAMES:
                self.log.info("Ignoring unknown signal: %s", sig)
                continue

            signame = self.SIG_NAMES.get(sig)
            handler = getattr(self, "handle_%s" % signame, None)
            if not handler:
                self.log.error("Unhandled signal: %s", signame)
                continue
            handler()
    except (StopIteration, KeyboardInterrupt):
        self.halt()
    except HaltServer as inst:
        self.halt(reason=inst.reason, exit_status=inst.exit_status)
    except SystemExit:
        raise
    except Exception:
        # An unexpected failure of the main loop is an error condition,
        # so it is logged at error level (with traceback), not info.
        self.log.error("Unhandled exception in main loop", exc_info=True)
        self.stop(False)
        # NOTE(review): __init__ sets self.pid from os.getpid(), so this
        # branch only fires if pid is reset to 0 elsewhere -- confirm.
        if self.pid == 0:
            return
        self.log.info("Shutting down")
        sys.exit(-1)
def manage_workers(self):
    """Bring the number of workers in line with ``num_workers``.

    If there are fewer workers than requested, ``spawn_workers()`` is
    called. If there are more, the surplus workers are sent SIGTERM,
    starting with the ones that have the smallest ``age`` value.
    """
    if len(self.workers) < self.num_workers:
        self.spawn_workers()

    # Snapshot of (pid, worker) pairs ordered by ascending age.
    by_age = sorted(self.workers.items(), key=lambda entry: entry[1].age)

    remaining = len(by_age)
    for pid, _worker in by_age:
        if remaining <= self.num_workers:
            break
        self.kill_worker(pid, signal.SIGTERM)
        remaining -= 1
def spawn_worker(self):
    """Fork one new worker process.

    Returns:
        In the parent process, the pid of the newly forked worker. In the
        child, every path ends in ``sys.exit`` and the call does not
        return normally.

    Raises:
        SystemExit: always, in the child process (status 0 after a normal
            run, ``APP_LOAD_ERROR`` or ``WORKER_BOOT_ERROR`` on failure).
    """
    self.worker_age += 1
    worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
                               self.app, self.timeout / 2.0,
                               self.cfg, self.log)
    pid = os.fork()
    if pid != 0:
        # Parent: record the child and hand its pid back to the caller.
        worker.pid = pid
        self.workers[pid] = worker
        return pid

    # Child (worker) process from here on.
    worker_pid = os.getpid()
    try:
        util._setproctitle("worker [%s]" % self.proc_name)
        self.log.info("Booting worker with pid: %s", worker_pid)
        self.cfg.post_fork(self, worker)
        worker.init_process()
        sys.exit(0)
    except SystemExit:
        raise
    except AppImportError as e:
        self.log.debug("Exception while loading the application", exc_info=True)
        print("%s" % e, file=sys.stderr)
        sys.stderr.flush()
        sys.exit(self.APP_LOAD_ERROR)
    except BaseException:
        # Deliberately as broad as possible: nothing may propagate out of
        # this function in the child, otherwise the child would carry on
        # executing the caller's (master) code path.
        self.log.exception("Exception in worker process")
        sys.exit(self.WORKER_BOOT_ERROR)
    finally:
        self.log.info("Worker exiting (pid: %s)", worker_pid)
        try:
            worker.tmp.close()
        except BaseException:
            # Best-effort cleanup: a failure here must not replace the
            # pending SystemExit, but it is reported instead of being
            # silently discarded.
            self.log.warning("Exception during worker exit", exc_info=True)
Arbiter 通过信号接收控制指令并管理 worker 进程,init_signals 负责为主进程注册各信号的处理器:
def init_signals(self):
    """Reset all signal dispositions, then install the arbiter's handlers.

    Every signal whose disposition can be changed is first restored to
    ``signal.SIG_DFL``; afterwards the arbiter's ``handle_*`` methods are
    registered for QUIT, TERM, INT, WINCH, USR1, USR2, TTIN, TTOU and CHLD.
    """
    # Drop any previously installed handlers. Some signal numbers cannot be
    # changed (SIGKILL and SIGSTOP, plus any the platform reserves) and make
    # signal.signal() raise; skip those instead of aborting the whole setup.
    for signum in range(1, signal.NSIG):
        try:
            signal.signal(signum, signal.SIG_DFL)
        except (OSError, ValueError):
            continue

    # NOTE(review): Python invokes signal handlers as handler(signum, frame),
    # whereas run() calls handle_* methods with no arguments and handle_usr2
    # is defined without those parameters. Confirm the handlers accept both
    # call forms, or register a callback that appends to SIG_QUEUE instead.
    handlers = (
        (signal.SIGQUIT, self.handle_quit),
        (signal.SIGTERM, self.handle_exit),
        (signal.SIGINT, self.handle_int),
        (signal.SIGWINCH, self.handle_winch),
        (signal.SIGUSR1, self.handle_usr1),
        (signal.SIGUSR2, self.handle_usr2),
        (signal.SIGTTIN, self.handle_ttin),
        (signal.SIGTTOU, self.handle_ttou),
        (signal.SIGCHLD, self.handle_chld),
    )
    for signum, handler in handlers:
        signal.signal(signum, handler)
Gunicorn 的热升级(在不中断服务的情况下用新的主进程替换旧的主进程,即上文所说的“优雅重启”)是通过 USR2 信号实现的:
def handle_usr2(self):
    """React to SIGUSR2 by re-executing the master via ``reexec()``."""
    self.reexec()
def reexec(self):
    """Fork a child that re-executes the program as a new master.

    Does nothing except log a warning when ``reexec_pid`` is non-zero,
    i.e. a previously forked child has already been recorded. Otherwise
    the process forks: the parent stores the child's pid in ``reexec_pid``
    and returns, while the child exports the parent's pid as
    ``GUNICORN_PID``, calls the ``pre_exec`` hook and replaces itself
    with a fresh run of ``sys.argv``.
    """
    if self.reexec_pid != 0:
        self.log.warning("USR2 signal ignored. Child exists.")
        return

    # Taken before the fork so the child can tell the new master who
    # its parent is.
    parent_pid = os.getpid()
    self.reexec_pid = os.fork()

    if self.reexec_pid == 0:
        # Child: becomes the new master process.
        os.environ['GUNICORN_PID'] = str(parent_pid)
        self.cfg.pre_exec(self)
        # Replace the process image with a fresh run of the same command.
        program = sys.argv[0]
        os.execvpe(program, sys.argv, os.environ)
- 多进程模型:Arbiter 采用 master-worker 模型,master 负责管理 worker 进程
- 信号队列:使用 SIG_QUEUE 来处理异步信号,避免信号处理函数的限制
- Worker 管理:通过 workers 字典跟踪所有 worker 进程
- 热更新:收到 USR2 信号后调用 reexec(),fork 出子进程并重新执行程序,由其作为新的主进程
日志分析:设置 --log-level debug 查看详细运行日志
信号追踪:使用 strace -p <pid> 跟踪信号处理
源码调试:
import pdb; pdb.set_trace() # 在关键位置插入断点
进程树查看:
pstree -p <master_pid>
- Worker 频繁重启:检查 max_requests 和 max_requests_jitter 参数
- 信号不响应:确认信号发送给了 master 进程(而不是某个 worker),并检查主循环是否被阻塞、无法处理 SIG_QUEUE 中的信号
- 内存泄漏:使用 --preload 减少内存占用

通过深入理解 Arbiter 的源码,可以更好地优化 Gunicorn 的性能和稳定性,以及解决生产环境中遇到的各种问题。