文前导读
skynet 是一个由云风所写的轻量级在线游戏服务器框架。本文为 skynet 框架源码剖析系列的第一篇文章,主要探讨 skynet 的消息调度机制,包含了以下内容:
- 基本数据结构:包括消息队列以及消息本身的数据结构
- 消息调度的过程:主要包括了消息是如何生产,又如何消费的
- 多线程模型:skynet 的线程池设计,如何为进程设置合理的优先级使其拥有较高的 CPU 利用率,如何做线程监控设计(monitor) 以及如何高效地实现线程安全。
基本数据结构之消息队列
skynet 采用了二级消息队列模式,其中顶层消息队列为 global_queue
,而底层的消息队列为 message_queue
,它们的具体定义如下:
1 | //skynet_mq.c |
skynet 的消息队列形式如下:
基本数据结构之消息
skynet 中一共支持两种不同的消息,一种为本地消息skynet_message
,另一种则为远程消息 remote_message
。其中,skynet_message
和 remote_message
如下:
1 | //skynet_mq.h |
这里解释一下上述消息定义中的 session
和 type
字段。**session
主要用来匹配一对请求和响应。当一个服务向另一个服务提起请求时,会生成一个 session,并跟随请求包一并发送出去。接收端接收到包并处理完毕后,再将同样的 session 返回。这样,编写服务的人只需要在服务的 callback 函数中记录下所有发送出去的 session 就可以在收到每个消息后调用正确的处理函数。而 **type
主要是用来区分不同的消息包类型。type 的定义如下:
1 |
谁生产,谁消费?
在 skynet 中,每个服务都拥有自己的一个次级消息队列。一个服务给另一个服务发送消息的过程,本质上就是将一个 skynet_message 压入到目标服务的次级消息队列当中。当一个服务的次级消息队列非空时,skynet 会将其push 到全局消息队列当中。而消息的消费,则是由线程池中的 worker 线程来完成,其大致的框图如下:
消息消费的过程
在 skynet 启动的时候,会根据配置文件的 thread
字段初始化线程池。其中线程池中的前三个线程是 monitor
, timer
和 socket
线程。其中,monitor 线程主要负责检查每个服务是否陷入了死循环,socket 线程负责网络相关操作,timer 线程则负责定时器。对应代码如下:
1 | //skynet_start.c |
在上述代码中,我们可以看出 skynet 创建线程池的流程,先创建好 monitor、socket 和 timer 这三个线程,然后创建相应数量的 worker 线程,而每个 worker 线程最终会调用 skynet_context_message_dispatch
函数从全局消息队列中获取消息。skynet_context_message_dispatch
的定义如下:
1 | // skynet_start.c |
结合 strat
和 skynet_context_message_dispatch
,我们可以知道 skynet 的消息调度机制的全貌:当 skynet 启动时会初始化线程池,其中线程池内总共包含 4 种线程:monitor
、timer
、socket
和 worker
,其中worker
具有不同的权重值。每个 worker
会不断从全局消息队列中取出某个服务的次级消息队列,并根据权重值的不同从消息队列中取出若干个消息,然后调用服务所属的 callback 函数消费消息。权重与取出的消息个数的关系如下:
-1 :从次级消息队列取出一个消息进行处理
0 :从次级消息队列中取出所有消息进行处理
1 :从次级消息队列中取出一半的消息进行处理
2 :从次级消息队列中取出四分之一的消息进行处理
3 :从次级消息队列中取出八分之一的消息进行处理
这种分配优先级的做法,使得 CPU 的运转效率尽可能的高。当线程足够多时,如果每次都只处理一个消息,虽然可以避免一些服务饿死,但却可能会使得消息队列中出现大量消息堆积。如果每次都处理一整个消息队列中的消息,则可能会使一些服务中的消息长时间得不到相应,从而导致服务饿死。为线程配置权重的做法是一个非常好的折中方案
消息生产的过程
skynet 中不同的服务运行在不同的上下文当中,彼此之间的交互只能通过消息队列进行转发。不同服务之间转发消息的接口为 skynet_send
,其定义如下:
1 | //skynet_server.c |
从上述代码中,skynet_send
使用了 source
和 destination
来标记消息的发送端和接收端,这两个参数的本质就是能够在全网范围内唯一标识一个服务的 handle。handle 为一个 32 位无符号整数,其中高 8 位为 harbor id,用来表示服务所属的 skynet 节点,而剩余的 24 位则用于表示该 skynet 内的唯一一个服务。不管最终调用的函数是 skynet_harbor_send
还是 skynet_context_push
,最后都会回归到 skynet_mq_push
这个函数中。因此,skynet 中发送消息的本质就是往目标服务的次级消息队列中压入消息。
监工机制 —— monitor 线程的工作
说完了 skynet 消息调度中消息的生产与消费,我们来稍微看一看 monitor 线程(监工) 是如何监管 worker 线程的工作的。在这之前我们先看看 monitor 的定义:
1 | //skynet_start.c |
如前面所提到的,当 skynet 启动线程池时,第一个创建的线程便是 monitor 线程,它的运行函数如下:
1 | //skynet_start.c |
monitor 的监管逻辑非常简单,每隔 5 s 便为每个 worker 线程执行一次 skynet_monitor_check
函数。
我们再来看看 skynet_monitor_trigger
函数的实现:
1 | // skynet_start.c |
从上述代码中,我们可以看出 monitor 线程的工作原理。我们来还原一下 monitor 的工作场景:
- 当一个 worker 线程(记为w)从消息队列中取出一个次级消费队列进行消费。在执行
dispatch_message(ctx, &msg);
之前会先调用skynet_monitor_trigger
函数,此时对应的 skynet_monitor(记为w_sm) 有w_sm->version = 1
,w_sm->check_version = 0
成立。随后 w 进入了消息消费过程。- 此时 monitor 刚好对 w_sm 执行了
skynet_monitor_check
函数,使得有w_sm->version == w_sm->check_version == 1
成立。- 当 w 在消费过程中陷入了死循环并超过第二步 5 s 的时间后,monitor 再一次对 w_sm 执行
skynet_monitor_check
函数。这一次 monitor 发现条件w_sm->version == w_sm->check_version
成立,于是向用户返回一条错误日志。- 若 w 在第二步 5 s 的时间内完成了消息消费的过程,则会将
w_sm->source
和w_sm->destination
都设置为 0。 这样即使 monitor 即使检测到w_sm->version == w_sm->check_version
也不会产生错误日志。
如何实现线程安全
在 skynet 的消息调度机制中,可能涉及到竞态问题的地方主要有往全局消息队列中执行push
和pop
操作、往次级消息队列中执行 push
和 pop
操作以及消息的消费过程
1 | struct message_queue * skynet_globalmq_pop() { |
skynet 的全局消息队列会被很多的线程访问,而且同一个服务可以同时接收多个不同服务所发送来的信息,因此这两个队列的访问频率都较高,而且对这两个队列的压入和弹出操作都非常快,使用自旋锁回避互斥锁更加经济。服务的 callback 不必是线程安全的,因为每次 worker 都会从全局消息队列中将整个次级消息队列取出,因此其他线程无法同时访问到同一个次级消息队列,自然也就不会面临竞态问题。
参考资料
[1]. Skynet 设计综述 —— 云风
[2].skynet源码赏析
- 本文作者: Phoenix
- 本文链接: http://hacker-cube.com/2020/11/04/skynet-源码阅读笔记-——-消息调度机制/
- 版权声明: 本博客所有文章除特别声明外,均采用 MIT 许可协议。转载请注明出处!