PIL.33Lua的线程和状态机

11k 词

Lua并不支持真正的多线程,也就是说,抢先的线程间共享内存。缺乏这个支持有两个原因。一是ISO C不提供,所以没有可移植的方式来实现。第二个原因就是我们不认为多线程对Lua是一件好事。

多线程被开发来针对底层的编程。像信号量和监视器这样的同步算法在操作系统的上下文中被提出。找出并修正与多线程相关的bug是非常困难的,某几个这样的bug还会导致安全问题。而且,在某些非常挑剔的情况下同步会导致巨大的性能下降,如内存分配。

多线程的问题是由于对内存的抢占使用而生,所以我们可以使用非抢占式的线程或者不共享内存来避免这些问题。Lua两者都支持。Lua线程(经常被叫做协程)是合作性的,因此会避免未知的线程切换所带来的问题。Lua状态机不共享内存,所以在Lua为并行提供了一个好的基础。接下来我们就会介绍

多线程

Lua中,线程的本制就是一个协程, coroutine。我们可以把一个协程看做一个线程,加上一个好的接口;或者我们可以把线程就作一个底层的API。

从C API的观点来看,你会发觉把线程看成一个 会非常有用————从实现的观点来看,Lua线程就是一个栈。每个 栈 保留了一个线程所有挂起的调用,已经每个调用 的参数和本地变量。换句话说,一个栈拥有一个函数继续运行的所有信息。所以,多线程就意味着多个不同的栈。

绝大多数 Lua的C API在一个 特定的 栈上操作。Lua怎么知道用哪一个栈呢?当我们调用 lua_pushnumber的时候,我们怎么告诉它把数字压到哪里呢?秘密就是 类型 lua_State,这些函数的第一个参数,不仅代表了一个 Lua状态机,而且包括里面的一个线程。(很多人认为这个类型应该叫lua_thread。可能他们是正确的)

每当我们建立一个Lua State时,Lua会自动的在state内建立一个主线程,并返回一个 lua_State值来代表这个线程。 这个主线程绝不会被回收。它只会在我们调用lua_close时和state一起释放。程序一点都不用担心主线程里运行的东西。

可以在一个state内调用 lua_newthread来建立一个新线程:

lua_State *lua_newthread(lua_State *L);

这个函数会被线程压入到栈上,值类型是thread,被返回一个lua_State来代表这个新线程。更具体点,看下面:

L1 = lua_newthread(L);

在运行了这段代码后,我们有了两个线程,L1, L,两个都内部地指向同一个Lua State。每个线程有它自己的栈。线程L1以一个空栈开始;老的线程L在其栈顶对 新线程有一个引用:

printf("%dn", lua_gettop(L1));	--> 0
printf("%sn", luaL_typename(L, -1)); --> thread

除了主线程外,所有的线程都遵从垃圾回收,就跟其他的Lua对象一样。每当我们建立一个新线程,压入到栈的值保证这个线程不会被回收。我们绝不能使用一个没有在State内正确锁定的线程。(主线程会被Lua内部锚定,所以不用担心这个问题)。任何对Lua API的调用都可能会回收一个未锚定的线程,及时调用一个使用这个线程的函数。具体点说,看看下面的代码片段:

lua_State *L1 = lua_newthread(L);
lua_pop(L, 1);
lua_pushstring(L1, "hello")

调用lua_pushstring将会触发垃圾回收器,并回收L1,使这个程序崩溃,尽管 L1是在被使用。为了避免这样,总是保持一个对你使用的线程的引用,具体点,要么是在一个锚定线程的栈上,或者注册表,或者Lua变量中。

一旦有了一个新线程,我们就跟使用主线程一样使用它。我们可以向它的栈上压入或者弹出元素,用它来调用函数等等。下面的代码在新线程内调用f(5),然后把结构移动到老线程内:

lua_getglobal(L1, "f");	/* assume a global function 'f' */
lua_pushinteger(L1, 5);
lua_call(L1, 1, 1);
lua_xmove(L1, L, 1);

lua_xmove会在同一State的两个栈间移动值。一个类似lua_xmove(F, T, n)的调用会从 栈F上弹出 n个元素,然后把n个元素压入到T

在这样的情况时,我们并不真的需要一个新的线程;我们可以仅仅使用主线程。使用多线程的主要目标是为了实现协程,这样我们就可以挂起执行,后面又可以恢复。这时,我们就需要 lua_resume函数了:

int lua_resume(lua_State *L, lua_State *from, int narg);

为了运行一个协程,我们像使用lua_pcall一样使用lua_resume:压入 待执行函数(协程主体),压入参数,以nargs为参数个数调用lua_resume。(from参数一进行此调用的线程或者NULL)。这个的行为有点类似lua_pcall,但有三点不同。

  • lua_resume没有返回的结果个数;其总将函数的所有值返回。
  • 没有消息处理器;一个错误不会解开栈,因此我们可以在错误发生后在检查栈。
  • 如果运行的函数让出了时间片,lua_resume会返回LUA_YIELD,但线程在此State保留,且能进行恢复。

lua_resume返回LUA_YIELD时,线程的栈只有传递给yield函数的值可见。调用lua_gettop会返回 让出时间片时传递的值。可以用lua_xmove来把这些值转移到其他线程。

为了恢复一个挂起的线程,我们可以继续使用lua_resume。在这个调用时,Lua假设所有栈上的值都会被yield返回。

更具体点,如果在一个lua_resume和下一个恢复间不访问线程的栈,yield函数会返回其让出时间片时确切的值。

典型情况,我们会以一个函数体作为协程的开始。这个Lua函数可以调用其他函数,其他任何函数也可能会发生让出时间片,因此会中断 lua_resume的调用。看看下面代码:

function  (x) coroutine.yield(10, x) end
function foo1 (x) foo(x + 1); return 3 end

现在我们来运行下面的C代码:

lua_State *L1 = lua_newthread(L);
lua_getglobal(L1, "foo1");
lua_pushinteger(L1, 20);
lua_resume(L1, L, 1);

调用lua_resume将会返回LUA_YIELD,以此来表明线程让出了时间片。在这个时候,L1拥有传递给yield的值:

printf("%dn", lua_gettop(L1)); --->2
printf("%lldn", lua_tointeger(L1, 1)); --->10
printf("%lldn", lua_tointeger(L1, 2)); --->21

当我们继续恢复这个线程,它将会从其停止的地方继续运行(调用yield的位置)。在这里,foo函数,返回到了foo1,然后就按序返回到了 lua_resume

lua_resume(L1, L, 0);
printf("%dn", lua_gettop(L1)); ---> 1
printf("lldn", lua_tointeger(L1, 1)); ---> 3

第二次调用lua_resume会返回LUA_OK,表示一个正常的返回。

一个协程也可以调用C函数,这个C函数又能调用其他Lua函数。我们已经讨论了如何使用接续函数来运行这样Lua函数的让出时间片。一个C函数也可以让出时间片。在这样的情况下,其必须提供一个接续函数来在线程恢复的时候调用。为了让出时间片,一个C函数必须调用下面的函数:

int lua_yieldk (lua_State *L, int nresults, int ctx, lua_CFuntion k);

我们在一个返回声明中应该总是使用这个函数:

static int myCfunction (lua_State *L) {
...
return lua_yieldK(L, nresults, ctx, k):
}

这个调用会立即挂起运行的协程。nresults是将用从栈上返回到lua_resume的值个数;ctx 是要传递给接续函数的上下文; k就是接续函数。当协程恢复,控制直接转到k。在让出时间片后,myCfunction什么都不能做;它只能把他应该做的工作全部交给接续函数k

我们来看一个典型的例子。假如我们想写一个读一些数据的函数,在数据不可用时就让出时间片。

int readK (lua_State *L, int status, lua_KContext ctx) {
(void) status; (void) ctx; /* unused parameters */
if (something_to_read()) {
lua_pushstring(L, read_some_data());
return 1;
}
else
return lua_yieldk(L, 0, 0, &readK);
}

int prim_read (lua_State *L) {
return readK(L, 0, 0);
}

例子中,prim_read不需要任何初始化,所以其会直接调用 接续函数readK。如果有数据需要读,readK读取并返回这些数据。否则的话,让出时间片。当线程恢复,继续调用接续函数,这将继续尝试读取一些数据。

如果一个C函数在让出时间片后没什么需要做的,可以不传递接续函数参数给lua_yieldk或者调用lua_yield宏:

return lua_yield(L, nres);

在这个调用后,线程恢复时,控制转到调用myCfunction的函数。

Lua States

调用lua_newstate, luaL_newstate都会建立一个新的Lua State。每个Lua State都是相互独立的,不会共享任何数据。这就是说无论一个Lua State内发生了什么都不会影响其他的State;这也意味着不同的Lua State间不能直接通信;我们必须使用一些中间的C 代码。

比如,有两个State L1, L2,下面的命令会把L1栈顶的字符压入到L2去:

lua_pushstring(L2, lua_tostring(L1, -1));

因为数据必须通过C传递,Lua States就只能交换C能描述的数据,如 字符串和数字。其他类型,比如表,必须序列化来传输。

在支持多线程的系统中,一个有趣的设计就是为每个系统线程建立一个独立的Lua State。这样设计的结果就是得到了类似 POSIX进程的线程,我们不需要共享内存而获得并发。在这节中,我们会通过这种方式来开发一个类似实现的原型。我打算使用 POSIX 线程(pthread)来实现。想把代码移植到其他多线程系统并不困难,只是使用了一些基础的特性而已。

我们要开发的系统很简单。主要目的是展示在多线程的上下文中使用多个Lua States。在我们跑起来后,我们可以在其上添加一些高级特性。把我们的库叫做:lproc,只是包含了四个函数:

  • lproc.start(chunk) 启动一个新过程来运行给定的chunk(一个字符串)。库通过一个C 线程加上关联的Lua State来实现 Lua 进程。
  • lproc.send(channel, var1, var2, ...) 把所有给定的值(字符串)发送到被其名字所标记的通道,这也是一个字符串标记。(练习会要求你支持其他类型)
  • lproc.receive(channle) 接收发送到给定通道的值。
  • lproc.exit() 结束一个进程。只有主进程需要这个函数。如果进程不以调用lpro.exit而结束,整个程序都会终止,而不等待其他进程的结束。

库通过字符串来标记通道,并用他们来匹配发出者和接收者。一个发送操作可以发送任何数量的自负串值,然后会被对应的接收操作所返回。所有的通信都是同步的:一个发送消息的进程会屏蔽,直到有一个进程从此通道接收(就跟 管道一样),反之一样。

和接口一样,lproc的实现也很简单。使用了两个双向链表,一个给等待发送消息的进程用,一个给等待接受消息的进程用。使用了一个互斥量来控制对链表的访问。每个进程都有一个相关的 条件变量。

当一个进程想要发送一个消息到通道,其会遍历接收链表来找到一个等待对应通道消息的进程。如果找到,就会从这个接收链表内移除对应的进程,把消息的值从其自身移动到找到的进程,并信号通知其他进程。如果找不到等待的进程,就会把自己放到发送链表内,等待条件变量的唤醒。如果想要接收消息的,也会做这样一个对称类似的操作。

在这个实现内的一个主要元素就是代表一个进程的结构:



#include "lua.h"

#include "lauxlib.h"

typedef struct Proc {
lua_State *L;
pthread_t thread;
pthread_cond_t cond;
const char *channel;
strunct Proc *previous, *next;
} Proc;

开头的两个字段分别代表了 这个进程使用的Lua State和 在系统级别上跑这个进程的 C 线程。第三个字段,是用来等待线程唤醒一个 发送/接收操作的条件变量。第四个字段指的是当前进程等待的通道。
接下来的代码声明了两个等待链表和关联的互斥量:

static Proc *waitsend = NULL
static Proc *waitreceive = NULL:
static pthread_mutex_t kernel_access = PTHREAD_MUTEX_INITIALIZER;

每个进程都需要一个Proc结构,当其需要send/receive时也需要访问这个结构。所有这些函数需要的参数只有进程的Lua State;因此,每个进程需要将它自己的Proc结构保存在Lua State内。在我们的实现中,每个State在 注册表内 保存其对应的Proc结构为完全用户数据,以键_SELF进行关联。 辅助函数 getself会获得与一个指定 State 关联的 Proc结构:

static Proc *getself (lua_State *L) {
Proc *p;
lua_getfield(L, LUA_REGISTRYINDEX, "_SELF");
p = (Proc *)lua_touserdate(L, -1);
lua_pop(L, 1);
return p;
}

下一个函数,movevalues,会从一个发送进程移动数据到接收进程:

static void movevalues (lua_State *send, lua_State *rec) {
int n = lua_gettop(send);
int i;
luaL_checkstack(rec, n, "too many results");
for (i = 2; i <= n; i ++)
lua_pushstring(rec, lua_tostring(send, i);
}

此函数会将发送者栈内除第一个元素外的所有值移动到接收者,第一个元素代表的是通道。注意,因为我们要压入任意数量的元素,所以必须检查栈的空间。

接下来的函数会遍历链表来找到一个匹配的,等待对应通道的进程。

static Proc *searchmatch (const char *channel, Proc **List) {
Proc *node;
for (node = *List; node != NULL: node = node->next) {
if (strcmp(channel, node->channel) == 0) {
if (*list == node)
*list = (node->next == node) ? NULL : node->next;
node->previous->next = node->next;
node->next->previous = node->previous;
return node;
}
return NULL;
}

找到一个的话就把对应的结构从链表中移除并返回,如果找不到就返回NULL。

下面的函数是找不到对应的进程时,把自己添加到链表中:

static void waitonlist (lua_State *L, const char*channel, Proc **list) {
Proc *p = getself(L);
if (*list == NULL) {
*list = p;
p->previous = p->next = p;
} else {
p->previous = (*list)->previous;
p->next = *list;
p->previous->next = p->next->previous = p
}

p->channel = channel;
do {
pthread_cond_wait(&p->cond, &kernel_access);
} while (p->channel);
}
```

在这样的情况下,进程会把其自身放在等待链表的尾部,并等待条件变量的唤醒。当一个进程唤醒其他进程时,会被其唤醒进程的字段*channel*设置为NULL。所以,如果`p->channel`不是NULL,说明没有唤醒过,会一直保持等待。


接下来我们编写的是发送和接收函数:

```c
static int ll_send (lua_State *L) {
Proc *p;
const char *channel = luaL_checkstring(L, 1);
pthread_mutex_lock(&kernel_access);
p = searchmatch(channel, &waitreceive);

if (p) {
movevalues(L, p->L);
p->channel = NULL;
pthread_cond_signal(&p-cond);
} else {
waitonlist(L, channel, &waisend):

pthread_mutex_unlock(&kernel_access);
return 0;
}

static int ll_receive (lua_State *L) {
Proc *p;
const char *channel = luaL_checkstring(L, 1);
lua_settop(L, 1);
pthread_mutex_lock(&kernel_access);
p = searchmath(channel, &waisend);

if (p) {
movevalues(p->L, L);
p->channel = NULL;
pthread_cond_signal(&p->cond):
} else {
waitonlist(L, channel, &waitreceive);

pthread_mutex_unlock(&kernel_access);

return lua_gettop(L) - 1;
}
```


接下来我们看看如何建立一个新进程呢。一个新进程需要一个新的POSIX线程,并且需要一个线程运行入口(函数)。下面是一个原型:
static void *ll_thread (void *arg);
	
为了建立后运行一个进程,系统必须建立一个新的Lua State,开始一个新线程,编译给定的chunk,调用chunk,然后释放资源。原始的线程做了开始的三个任务,新线程做剩下的。(为了简化错误处理,系统只是在成功编译给定的chunk后启动一个新的线程)。

```c
// 建立一个新进程
static int ll_start (lua_State *L) {
pthread_t thread;
const char *chunk = luaL_checkstring(L, 1);
lua_State *L1 = luaL_newstate();

if (L1 == NULL)
luaL_error(L, "unable to create new thread");

if (luaL_loadstring(L1, chunk) != 0)
luaL_error(L, "error in thread body:%s", lua_tostring(L1, -1));

if (pthread_create(&thread, NULL, ll_thread, L1) !== 0)
luaL_error(L, "unable to create new thread")

pthread_detach(thread);
return 0;
}

这个函数先建立新的Lua State L1,然后在其内编译给定的chunk。出错的话,就会在原来的 state L内进行处理。接着,建立一个新线程,执行ll_thread,并以新的State L1 作为参数。pthread_detach告诉系统此进程不需要任何结束恢复。

每个线程的主体在函数 ll_thread中。

int luaopen_lproc(lua_State *L);

static void *ll_thread (void *arg) {
lua_State *L = (lua_State) *arg;
Proc *self;

openlibs(L);

luaL_requiref(L, "lproc", luaopen_lproc, 1);
lua_pop(L, 1);

self = (Proc *)lua_newuserdata(L, sizeof(Proc));
lua_setfield(L, LUA_REGISTRYINDEX, "_SELF");

self->L = L;
self->thread = pthread_self();
self->channel = NULL;

pthread_cond_init(&self->cond, NULL);

if (lua_pcall(L, 0, 0, 0) != 0)
fprintf(stderr, "thread error: %s", lua_tostring(L, -1));

pthread_cond_destory(&getself(L)->cond);
lua_close(L):
return NULL;
}

首先,打开标准Lua库和lproc库。 然后,建立和初始化其自身的控制块。接着,调用主要的chunk。最后,销毁条件变量和Lua State。

注意这里使用luaL_requiref来打开lproc库。这函数和 require类似,不过其使用给定的函数(luaopen_lproc,我们例子中)来打开库,而不是寻找一个加载器。在调用这个函数后,luaL_requiref会将结果注册到pacage.loaded表中,后面调用这个库函数就不用再次加载了。如果函数的最后一个参数是 true,其也会在对应的全局变量内注册这个库。

下面是最后几个函数:

static int ll_exit (lua_State *L) {
pthread_exit(NULL):
return 0;
}

static const struct luaL_reg ll_funcs[] = {
{"start", ll_start},
{"send", ll_send},
{"receive", ll_receive},
{"exit", ll_exit},
{NULL, NULL}
};

int luaopen_lproc (lua_State *L) {
luaL_newlib(L, ll_func);
return 1;
}

都很简单。 ll_exit应该只能被主进程结束时调用,以避免程序的立刻结束。luaopen_lproc是打开这个库的标准函数。

和早先说的一样,在Lua内进程的实现是很简单的。我们可以无止境的进行提升。这里就简单的说一下。

一个很明显的提高就是改变对于匹配通道 线性搜索。更好的办法是使用hash表,并对每个通道使用独立的等待链表。

另外一个关于效率的就在进程的创建。建立Lua State是很轻量的,但是,打开标准库就不那么轻量了,而且很多State并不一定需要全部的标准库。我们可以通过使用 预注册的库来避免这个开销。就和我们在require 函数一节说的一样。通过这种方式,不再需要为每个标准库调用luaL_requiref函数,我们只是将库打开函数放在 package.preload表中。如果进程调用 require "lib",然后(也只有 require会)调用这个关联的函数来打开这个库。下面来看看:

static void registerlib (lua_State *L, const char *name, lua_CFunction f) {
lua_getglobal(L, "package");
lua_getfield(L, -1, "preload");
pua_pushcfunction(L, f);
lua_setfield(L, -2, name);
lua_pop(L, 2);
}

static void openlibs (lua_State *L) {
luaL_requiref(L, "_G", luaopen_base, 1);
luaL_requiref(L, "package", luaoepn_package, 1);
lua_pop(L, 2); /* remove results from previous calls */
registerlib(L, "coroutine", luaopen_coroutine);
registerlib(L, "table", luaopen_table);
registerlib(L, "io", luaopen_io);
registerlib(L, "os", luaopen_os);
registerlib(L, "string", luaopen_string);
registerlib(L, "math", luaopen_math);
registerlib(L, "utf8", luaopen_utf8);
registerlib(L, "debug", luaopen_debug);”
}

打开基本库总是没有错的。我们也会需要 包 这个库;否则,我们就没有require函数来打开其他库。其他库都是可选的。我们可以调用我们自己的openlibs,而不用调用luaL_openlibs。当一个进程需要其中一个库时,就可以显示指定库名,require会调用对应的luaopen_*函数。