目录

notos:一个类RTOS的无栈协程库

这篇文章介绍 notos,一个基于 C 实现的提供了类 RTOS 调用接口的无栈协程库,源码在 github 开源。

nos is not os

notos不是os,只有一两百行代码,勉强算作一个库,核心实现也不依赖底层平台。notos不提供实时性保证,不区分优先级,只提供一种差不多能用的伪多任务并发协作机制。

notos最终要达到用起来类似os的效果,也就是能这样使用:

1
2
3
4
5
6
7
int main() {
    os_init();
    os_task_create(task1, task_func1);
    os_task_create(task2, task_func1);
    os_start();
    // unreachable
}

每个任务函数要能这样去写,看上去都是一个无限的循环:

1
2
3
4
5
6
7
8
9
TASK(test_sleep) {
    int data = 0;
    while (true) {
        printf("%d\n", ++data);
        task_sleep(500);
        if (data == 10)
            task_exit();
    }
}

当然实际上,如果不利用底层平台的一些特性,无法实现真正的上下文切换(像常规OS那样)。C函数不可能在函数体运行过程中突然跳出,转而从另一个函数中间的某个位置开始运行,后面又在某个时刻切回当前位置继续运行。

根据Coroutines in C中描述的一种trick,利用这几行代码,就可以实现一种近似的效果:

1
2
3
4
#define tco_init(bp)                  bp = 0;
#define tco_begin(bp)                 switch(bp) { case 0:
#define tco_yeild(bp)                 bp = __LINE__; return; case __LINE__:
#define tco_end()                     }

可以写出这样的函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
void trick_func() {
    static int bp;
    static int data;
    tco_begin(bp);
    data = 0;
    while (true) {
        printf("out %d\n", ++data);
        tco_yeild(bp);
    }
    tco_end();
}

连续调用trick_func,就会打印:

1
2
3
4
out 1
out 2
out 3
out 4

implement nos task

Not os,也就没有真正意义上的task,有的只有一些利用这样特殊方法写成的C函数。所谓的调度,不过是不断重复去调用该函数。

因此nos_task的核心数据结构只有:

1
2
3
4
5
6
struct nos_task {
    int               bp;   // coroutine break point
    async_func_t      func;
    void*             arg;
    util_queue_node_t qnode;
};

bp自不用说,要在别处去调用该函数,就需要把函数指针和参数保存下来,qnode负责将task串成列表,以供nos遍历访问。

为了更像那么回事,通过一些宏,最终可以这样创建一个task:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
async_function(test_sleep) {
    struct _local {
        int i;
    }*  local     = (struct _local*)async_localdata(sizeof(struct _local));
    int period_ms = async_getarg(int);

    async_function_start();
    local->i = 0;
    while (true) {
        local->i += 1;

        printf("%s.%s -> %d\n", async_taskname(), __func__, local->i);
        async_sleep(period_ms);

        if (local->i >= 10) {
            printf("%s.%s finish.\n", async_taskname(), __func__);
            async_return();
        }
    }
    async_function_end();
}

int main() {
    int period_ms = 500;
    nos_task_create(task_sleep, test_sleep, period_ms);
    while (nos_schedule()) ;
}

无论看起来像怎么回事,任务函数本质上仍然只是一个普通的非阻塞的C函数,每次重新调用,其中的局部变量都会重置,不能像真正的task那样用局部变量保存状态。

因此提供了一个localdata来实现这种需求,模拟“任务中的局部变量”。该数据内存为动态申请,但又不能每次调用都重新申请一个,而且将来也需要被释放,于是task的数据结构中需要多一个指针成员来保存该数据地址。

1
2
3
4
5
6
7
8
9
struct nos_task {
    const char*       name;
    int               bp;
    async_func_t      func;
    void*             localdata;
    void*             arg;
    uint32_t          ts;
    util_queue_node_t qnode;
};

还多了一个name,便于标识task;还有一个ts,用来保存时间值,用来实现task_sleep接口的功能。

async call

nos的任务函数是一个所谓async function,而我想在这样一个async函数中调用其它的async函数。

比如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
async_function(test_acall_base) {
    struct _local {
        int i;
    }* local = (struct _local*)async_localdata(sizeof(struct _local));

    async_function_start();
    local->i = 0;
    while (true) {
        local->i += 1; printf("%d\n", local->i);
        async_yeild();

        async_call(test_acall_sub, nullptr);

        local->i += 2; printf("%d\n", local->i);
        if (local->i >= 5) {
            async_return();
        } else {
            async_yeild();
        }
    }
    async_function_end();
}

由于它本质上还是普通的C函数,因此并不会真正如看上去的那般,只在async_return的时候才返回。而且就算返回,也不可能return到它的“调用者”那里。以上面的test_acall_sub为例,它真正的调用者并不是test_acall_base,而是nos_schedule

nos_schedule的所谓调度,本质只是不断调用每个task的func函数。如果task中调用了其他async函数,那func这个task的“前台函数”就会变成新的async函数。同时,在该async函数“返回”,也应能恢复到上一级函数。

task的数据结构变为:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
struct nos_task {
    const char*       name;
    nos_context_t*    context;
    uint32_t          ts;
    util_queue_node_t qnode;
};
struct nos_context {
    int            bp;   // coroutine break point
    async_func_t   func;
    void*          localdata;
    void*          arg;
    nos_context_t* prev;
};

调用async_call,即创建一个新的nos_context,并更新task当前的context。

这里有一个地方特殊,async_call需要先更新task的context,再保存当前breakpoint。然而task的context已然切换,再调用async_yeild,断点位置就保存到新函数的上下文了。

于是添加了一个有些丑陋的接口。action可能有副作用,必须放在保存bp之后;而等再回到这里时,不能再执行它,必须放在恢复点之前:

1
#define tco_yeild_after(bp, action)   bp = __LINE__; action; return; case __LINE__:

async_return时无需再通过async_yeild保存位置,真正return就可。在此之前,需要将当前的context销毁,并让task恢复成上一级的context。

sleep and timer

nos的主函数本质是一个busy-loop,不像常规os那般,由systick驱动。因此sleep也好、timer定时器也好,都是在nos_schedule中查询当前系统时间戳,唤醒超时的task和定时器。除了一个nos_gettimestamp,对外界没有任何依赖。

这种做法看上去不够聪明,没有事情可做的时候,一直在查询时间,遍历各种列表。但如果需要更“聪明”的解决方案,为何不直接用真正的os。再说,没有任务的时候,cpu本来也不可能真正闲下来。

sleep和timer的实现过于简单,没有专门说明的必要。而且timer本质上和nos的其他部分完全独立。

按说延时task和timer均应进行排序,避免每次都遍历整个列表。不过为了省市,这一块并没有处理。

demo中nos_gettimestamp的实现非常别扭,但在MCU中则可以非常简单。只需一个随定时器中断不段增长的全局时间,该函数只需返回此变量值即可。

一般以ms为时间单位,从系统启动开始计时,uint32足够几十天时间,因此不考虑时间值溢出回卷问题。

semaphore

所有的nos task、timer都和main函数在同一个线程内运行,不存在抢占,因此无需考虑常规多线程环境下的数据竞争问题。不过,除了task自身的sleep和yield外,task之间也需要一种机制来彼此协作。

nos中实现了一个极简单的sem。等待信号量的task会将自己从任务队列中移除,链接到该sem的队列上。除非有别的task释放信号量,再次将之移入任务队列,否则将不会被调度。

由于不需要真正的锁,基于sem很容易即可实现mutex和cond。

sem的出现引入了两个小问题。

nos_schedule中需要遍历task队列,而task可能主动休眠,会将自己从队列中移出,util_queue_foreach_safe确保这种操作是安全的。但有了sem,则task不仅可能将自己从队列移出,还可能移入别的被唤醒的task。

假设taskA通过信号量“唤醒”了taskB,那taskB起码应该在taskA再下一次被调度之前调度。简单的util_queue_insert只是将新节点挂在整个链表的末尾,如果当前已遍历到最后一个node,使用util_queue_foreach_safe,最后新添加的node将无法在本轮循环中被遍历到。

这个问题可以通过把被唤醒的task添加到当前task之前来解决。

此外,如果当前试图通过信号量唤醒某个task的并不是某个task,而是main函数,或某个timer定时器回调函数,此时根本不存在什么“当前task”,无法将被唤醒的task插入到一个合适的位置。

最终的解决办法是使用两个队列。当前循环正在调度中的task放在一个队列,被激活需要在下一个循环被首先调度的task放在另一个队列。

usage of nos

nos当然主要是图个乐,使用一种非常规的C语言技巧实现一些比较神奇的效果。使用也不算方便,在编写代码时需要遵循许多约定和限制。如果不遵守这些规定,编译器往往不能检查出来。

不过它也有一些独到的优势。

与裸机应用相比,使用nos,可以一种多线程的思维去编写代码,无需手动处理任务调度、协作、延时的问题。而相比rtos,nos的各task不占用单独的栈空间,任务调度的时间开销也很小。同时,完全不需要考虑访问共享数据可能面临的竞争问题。

nos可用作裸机应用,但裸机应用时app和isr之间共享数据的同步问题,这里自然也会遇到。解决办法当然也非常简单,这里不再多言。

nos可与os结合,则整个nos将运行在一个线程里。此时可在每轮调度之后添加任务休眠,避免死循环忙等待。nos线程和其他线程之间的数据同步,利用该os所提供的机制进行保护。

不要在nos所在线程之外调用任何nos提供的接口。nos及各nos task中不应调用任何阻塞函数,否则会将整个nos阻塞住。

nos与外部线程之间的数据交互,建议在main函数schedule间隙或定时器回调函数中处理。