state thread是一个开源的c语言网络协程库,它在用户空间实现了协程调度
st最初是由网景(Netscape)公司的MSPR(Netscape Portable Runtime library)项目中剥离出来,后由SGI(Silicon Graphic Inc)和Yahoo!公司(前者是主力)共同开发维护。
2001年发布v1.0以来一直到2009年v1.9稳定版后未再变动
State Threads:回调终结者(必读)
https://blog.csdn.net/caoshangpa/article/details/79565411
st-1.9.tar.gz 是原版, http://state-threads.sourceforge.net/
state-threads-1.9.1.tar.gz 是srs修改版, https://github.com/ossrs/state-threads
st源码编译
tar zxvf st-1.9.tar.gz
cd st-1.9
make linux-debug // make命令可以查看支持的编译选项
obj目录有编译生成的文件st.h, lib*.so,lib*.a
examples目录有几个例子lookupdns,proxy,server
需要的知识点
1 汇编语言(非必需)
2 线程的栈管理(非必需)
3 线程的调度和同步(必须)。线程不同步的测试代码thread.c
4 setjmp/longjmp的使用(必须)。测试代码setjmp.c
5 epoll原理和使用(必须)。测试代码epoll_server.c 和 epoll_client.c
分析state_thread源码的目的,是为了正确的使用它
st中thread其实是协程的概念
st_xxx分为 io类 和 延迟类
一些重要的数据结构
_st_vp_t _st_this_vp; virtual processor 虚拟处理器
_st_thread_t *_st_this_thread;
_st_clist_t run_q, io_q, zombie_q, thread_q
_st_thread_t *idle_thread, *sleep_q
代码分析
st库自带的example业务逻辑较为复杂,有兴趣可以看下。
为了简化问题,编写了测试代码st-1.9/examples/st_epoll.c,依据此代码提出问题分析问题。
st_init()做了什么?
_st_idle_thread_start()做了什么?
st_thread_create()做了什么?
st_thread_exit()做了什么?
st_usleep()做了什么?
主业务逻辑(无限循环)协程是如何调度的?
监听的文件描述符是如何调度的?
协程如何正常退出?
1 没有设置终止条件变量(不可以被join)的协程直接return即可退出;
2 设置了终止条件变量(可以被join)的协程退出时,先把自己加入到zombie_q中,然后通知等待的协程,等待的协程退出后,自己在退出。
协程的join(连接)是什么意思?
1 创建协程a的时候 st_thread_create(handle_cycle, NULL, 1, 0) 要设置为1, 表示该协程可以被join
2 协程b代码里要掉用st_thread_join(thread, retvalp),表示我要join到协程a上
3 join的意思是 协程a和协程b 有一定关联行,在协程退出时,要先退出协程b 才能退出协程a
4 st中一个协程只能被另一个协程join,不能被多个协程join
5 可以被join的协程a,在没有其他协程join时,协程a无法正常退出
st里的mutex有什么用?
通常情况下st的多协程是不需要加锁的,但是在有些情况下需要锁来保证原子操作,下面会详细说明。
st_mutex_new(void); 创建锁
st_mutex_destroy(st_mutex_t lock); 等待队列必须为空才能销毁锁
st_mutex_lock(st_mutex_t lock); 第一次掉用能获得锁,以后掉用会加入锁的等待队列中(FIFO)
st_mutex_unlock(st_mutex_t lock); 释放锁并激活等待队列的协程
st_mutex_trylock(st_mutex_t lock); 尝试获得锁不会加入到等待队列
st里的cond有什么用?
通常情况下st的多协程是不需要条件变量的,但是有些情况下需要条件变量来保证协程执行的先后顺序,比如:协程a要先于协程b执行
st_cond_new(void); 创建条件变量
st_cond_destroy(st_cond_t cvar); 等待队列必须为空才能销毁条件变量
st_cond_timedwait(st_cond_t cvar, st_utime_t timeout); 限时等待条件变量,会加入条件变量的等待队列中(FIFO),并加入到sleep_q队列中(可能先于FIFO的顺序被调度到)
st_cond_wait(st_cond_t cvar); 阻塞等待条件变量,会加入条件变量的等待队列中(FIFO)
st_cond_signal(st_cond_t cvar); 唤醒阻塞在条件变量上的一个协程
st_cond_broadcast(st_cond_t cvar); 唤醒阻塞在条件变量上的全部协程
st中与调度有关的函数
st的setjmp
#define _ST_SWITCH_CONTEXT(_thread) \ 协程切换的两个宏函数之一,停止当前协程并运行其他协程
ST_BEGIN_MACRO \
ST_SWITCH_OUT_CB(_thread); \ 协程切走时调用的函数,一般不管用
if (!MD_SETJMP((_thread)->context)) \ 汇编语言实现 应该跟setjmp()一样 首次掉用返回0
{ \
_st_vp_schedule(); \ 核心调度函数
} \
ST_DEBUG_ITERATE_THREADS(); \
ST_SWITCH_IN_CB(_thread); \ 协程切回时调用的函数,一般不管用
ST_END_MACRO
st的longjmp
#define _ST_RESTORE_CONTEXT(_thread) \ 协程切换的两个宏函数之一,恢复线程运行
ST_BEGIN_MACRO \
_ST_SET_CURRENT_THREAD(_thread); \ 设置全局变量 _st_this_thread = _thread
MD_LONGJMP((_thread)->context, 1); \ 汇编语言实现 应该跟longjmp()一样, 返回值永远为1
ST_END_MACRO
MD_SETJMP的时候,会使用汇编把所有寄存器的信息保留下来,而MD_LONGJMP则会把所有的寄存器信息重新加载出来。两者配合使用的时候,可以完成函数间的跳转。
st的核心调度函数
void _st_vp_schedule(void)
{
_st_thread_t *thread;
printf("in _st_vp_schedule\n");
printf("_st_active_count = %d\n", _st_active_count);
if (_ST_RUNQ.next != &_ST_RUNQ)
{
printf("use runq\n");
/* Pull thread off of the run queue */
thread = _ST_THREAD_PTR(_ST_RUNQ.next);
_ST_DEL_RUNQ(thread);
}
else
{
printf("use idle\n");
/* If there are no threads to run, switch to the idle thread */
thread = _st_this_vp.idle_thread;
}
ST_ASSERT(thread->state == _ST_ST_RUNNABLE);
/* Resume the thread */
thread->state = _ST_ST_RUNNING;
_ST_RESTORE_CONTEXT(thread);
}
st辅助调度函数
void *_st_idle_thread_start(void *arg)
{
printf("i'm in _st_idle_thread_start()\n");
_st_thread_t *me = _ST_CURRENT_THREAD();
while (_st_active_count > 0)
{
/* Idle vp till I/O is ready or the smallest timeout expired */
printf("call _st_epoll_dispatch()\n");
_ST_VP_IDLE(); 处理io类事件
/* Check sleep queue for expired threads */
_st_vp_check_clock(); 处理延时类事件
me->state = _ST_ST_RUNNABLE;
_ST_SWITCH_CONTEXT(me); 从这里恢复运行,然后判断_st_active_count的值
}
/* No more threads */
exit(0); 整个程序退出
/* NOTREACHED */
return NULL;
}
会触发协程切换的函数有哪些?
sched.c:86: _ST_SWITCH_CONTEXT(me); 59 int st_poll(struct pollfd *pds, int npds, st_utime_t timeout)
sched.c:234: _ST_SWITCH_CONTEXT(me); 221 void *_st_idle_thread_start(void *arg)
sched.c:261: _ST_SWITCH_CONTEXT(thread); 244 void st_thread_exit(void *retval)
sched.c:276: _ST_SWITCH_CONTEXT(thread); 244 void st_thread_exit(void *retval)
sync.c:131: _ST_SWITCH_CONTEXT(me); 115 int st_usleep(st_utime_t usecs)
sync.c:198: _ST_SWITCH_CONTEXT(me); 180 int st_cond_timedwait(_st_cond_t *cvar, st_utime_t timeout)
sync.c:315: _ST_SWITCH_CONTEXT(me); 290 int st_mutex_lock(_st_mutex_t *lock)
sched.c:134: _ST_RESTORE_CONTEXT(thread); 115 void _st_vp_schedule(void)
st的优缺点
优点:
1 用户空间实现协程调度,降低了用户空间和内核空间的切换,一定程度上提高了程序效率。
2 由于是在单核上的单线程多协程,同一时间只会有一个协程在运行,所以对于全局变量也不需要做协程同步。
共享资源释放函数只需做到可重入就行,所谓的可重入就是释放之前先判断是否为空值,释放后要赋空值。
3 协程使用完,直接return即可,st会回收协程资源并做协程切换。
4 可以通过向run_q链表头部加入协程,来实现优先调度。
5 st支持多个操作系统,比如 AIX,CYGWIN,DARWIN,FREEBSD,HPUX,IRIX,LINUX,NETBSD,OPENBSD,SOLARIS
缺点:
1 所有I/O操作必须使用st提供的API,只有这样协程才能被调度器管理。
2 所有协程里不能使用sleep(),sleep()会造成整个线程sleep。
3 被调度到的协程不会被限制运行时长,如果有协程是cpu密集型或死循环,就会严重阻碍其他协程运行。
4 单进程单线程,只能使用单核,想要通过多个cpu提高并发能力,只能开多个程序(进程),多进程通信较麻烦。
补充知识点
1 线程为什么要同步?
线程由内核自动调度
同一个进程上的线程共享该进程的整个虚拟地址空间
同一个进程上的线程代码区是共享的,即不同的线程可以执行同样的函数
所以在并发环境中,多个线程同时对同一个内存地址进行写入,由于CPU寄存器时间调度上的问题,写入数据会被多次的覆盖,会造成共享数据损坏,所以就要使线程同步。
2 什么情况下需要线程同步?
线程同步指的是 不同时发生,就是线程要排队
1 多核,单进程多线程,不同线程会对全局变量读写,这种情况才需要对线程做同步控制
2 单核,单进程多线程,不同线程会对全局变量读写,这种情况不需要对线程做同步控制
3 多核,单进程多线程,不同线程不对全局变量读写,这种情况不需要对线程做同步控制
4 多核,单进程多线程,不同线程会对全局变量读,这种情况不需要对线程做同步控制
问题:对于第2条,这个应该是有点儿片面,有依赖有优先级抢占也是要同步,除非像abcde这种几个线程干一摸一样的事情,而且项目之间不依赖。
有依赖 可以理解为 生产消费关系,虽然是 单核 单进程 多线程 但是同一时间只能有一个线程在运行,也就是说 生产和消费不会同时发生,同样多个生产也不会同时发生,所以不需要锁。
线程是有优先级控制,但是不管怎么控制,只要保证同一时间只能有一个线程在运行,就不需要锁了。
问题:对于第2条,这个应该是有点儿片面,有原子操作且原子操作过程中有线程切换,这种是需要锁的。
比如,线程a 第一次读取全局变量x并做处理,然后发生线程切换(线程由内和自动调度)后切回,然后第二次读取全局变量x并做处理,我们想确保两次读取的x值相同,但是发生了线程切换x值可能被改变。
如何 确保 第一次读取并处理和第二次读取并处理是原子操作呢? 使用st_mutex_t
3 accept()序列化
亦称惊群效应,亦亦称Zeeg难题
https://uwsgi-docs-zh.readthedocs.io/zh_CN/latest/articles/SerializingAccept.html
在多次fork自己之后,每个进程一般将会开始阻塞在 accept() 上
每当socket上尝试进行一个连接,阻塞在 accept() 上的每个进程的 accept() 都会被唤醒。
只有其中一个进程能够真正接收到这个连接,而剩余的进程将会获得一个无聊的 EAGAIN 这导致了大量的CPU周期浪费,实际解决方法是把一个锁放在 accept() 调用之前,来序列化它的使用
4 Internet Applications网络程序架构
多进程架构 Multi-Process
一个进程服务一个连接,要解决数据共享问题
单进程多线程架构 Multi-Threaded
一个线程服务一个连接,要解决数据同步问题
事件驱动的状态机架构 Event-Driven State Machine
事件触发回调函数(缺点是嵌套) 或 用户空间实现协程调度
实际上 EDSM架构 用很复杂的方式模拟了多线程
st提供的就是EDSM机制,它在用户空间实现协程调度
https://blog.csdn.net/caoshangpa/article/details/53282330