degesch: implement Lua coroutine async basics

This commit is contained in:
Přemysl Eric Janouch 2016-11-01 04:07:53 +01:00
parent 50599e09bd
commit 572f4e2ea3
Signed by: p
GPG Key ID: B715679E3A361BE6

339
degesch.c
View File

@ -8300,7 +8300,7 @@ struct lua_plugin
{ {
struct plugin super; ///< The structure we're deriving struct plugin super; ///< The structure we're deriving
struct app_context *ctx; ///< Application context struct app_context *ctx; ///< Application context
lua_State *L; ///< Lua state lua_State *L; ///< Lua state for the main thread
struct lua_schema_item *schemas; ///< Registered schema items struct lua_schema_item *schemas; ///< Registered schema items
}; };
@ -8385,6 +8385,7 @@ static bool
lua_plugin_call (struct lua_plugin *self, lua_plugin_call (struct lua_plugin *self,
int n_params, int n_results, struct error **e) int n_params, int n_results, struct error **e)
{ {
// FIXME: this may be called from a thread, then this is wrong
lua_State *L = self->L; lua_State *L = self->L;
// We need to pop the error handler at the end // We need to pop the error handler at the end
@ -8450,6 +8451,15 @@ lua_plugin_log_error
error_free (error); error_free (error);
} }
static void
lua_plugin_pack (lua_State *L, int n)
{
lua_createtable (L, n, 0);
lua_insert (L, -n - 1);
for (int i = n; i; i--)
lua_rawseti (L, -i - 1, i);
}
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
static void static void
@ -8497,6 +8507,309 @@ lua_plugin_parse (lua_State *L)
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
// The script can create as many wait channels as wanted. They only actually
// do anything once they get yielded to the main lua_resume() call.
/// Identifier for the Lua metatable
#define XLUA_WCHANNEL_METATABLE "wchannel"
struct lua_wait_channel
{
LIST_HEADER (struct lua_wait_channel)
struct lua_task *task; ///< The task we're active in
/// Check if the event is ready and eventually push values to the thread;
/// the channel then may release any resources
bool (*check) (struct lua_wait_channel *self);
/// Release all resources held by the subclass
void (*cleanup) (struct lua_wait_channel *self);
};
static int
lua_wchannel_gc (lua_State *L)
{
struct lua_wait_channel *self =
luaL_checkudata (L, 1, XLUA_WCHANNEL_METATABLE);
if (self->cleanup)
self->cleanup (self);
return 0;
}
static luaL_Reg lua_wchannel_table[] =
{
{ "__gc", lua_wchannel_gc },
{ NULL, NULL }
};
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
// A task encapsulates a thread so that wait channels yielded from its main
// function get waited upon by the event loop
#define XLUA_TASK_METATABLE "task" ///< Identifier for the Lua metatable
struct lua_task
{
LIST_HEADER (struct lua_task)
struct lua_plugin *plugin; ///< The plugin we belong to
lua_State *thread; ///< Lua thread
struct lua_wait_channel *active; ///< Channels we're waiting on
struct poller_idle idle; ///< Idle job
};
static void
lua_task_unregister_channels (struct lua_task *self)
{
LIST_FOR_EACH (struct lua_wait_channel, iter, self->active)
{
iter->task = NULL;
LIST_UNLINK (self->active, iter);
lua_cache_invalidate (self->plugin->L, iter);
}
}
static void
lua_task_cancel_internal (struct lua_task *self)
{
if (self->thread)
{
lua_cache_invalidate (self->plugin->L, self->thread);
self->thread = NULL;
}
lua_task_unregister_channels (self);
poller_idle_reset (&self->idle);
// The task no longer has to stay alive
lua_cache_invalidate (self->plugin->L, self);
}
static int
lua_task_cancel (lua_State *L)
{
struct lua_task *self = luaL_checkudata (L, 1, XLUA_TASK_METATABLE);
// We could also yield and make lua_task_resume() check "self->thread",
// however the main issue here is that the script should just return
luaL_argcheck (L, L != self->thread, 1,
"cannot cancel task from within itself");
lua_task_cancel_internal (self);
return 0;
}
#define lua_task_wakeup(self) poller_idle_set (&(self)->idle)
static bool
lua_task_schedule (struct lua_task *self, int n, struct error **e)
{
lua_State *L = self->thread;
for (int i = -1; -i <= n; i--)
{
struct lua_wait_channel *channel =
luaL_testudata (L, i, XLUA_WCHANNEL_METATABLE);
if (!channel)
return error_set (e, "bad argument #%d to yield: %s", -i + n + 1,
"tasks can only yield wait channels");
if (channel->task)
return error_set (e, "bad argument #%d to yield: %s", -i + n + 1,
"wait channels can only be active in one task at most");
}
for (int i = -1; -i <= n; i--)
{
// Quietly ignore duplicate channels
struct lua_wait_channel *channel = lua_touserdata (L, i);
if (channel->task)
continue;
// By going in reverse the list ends up in the right order
channel->task = self;
LIST_PREPEND (self->active, channel);
lua_cache_store (self->plugin->L, channel, i);
}
lua_pop (L, n);
// There doesn't have to be a single channel
// We can also be waiting on a channel that is already ready
lua_task_wakeup (self);
return true;
}
static void
lua_task_resume (struct lua_task *self, int index)
{
lua_State *L = self->thread;
bool waiting_on_multiple = self->active && self->active->next;
// Since we've ended the wait, we don't need to hold on to them anymore
lua_task_unregister_channels (self);
// On the first run we also have the main function on the stack,
// before any initial arguments
int n = lua_gettop (L) - (lua_status (L) == LUA_OK);
// Pack the values in a table and prepend the index of the channel, so that
// the caller doesn't need to care about the number of return values
if (waiting_on_multiple)
{
lua_plugin_pack (L, n);
lua_pushinteger (L, index);
lua_insert (L, -2);
n = 2;
}
int res = lua_resume (L, NULL, n);
struct error *error = NULL;
if (res == LUA_YIELD)
{
// AFAIK we don't get any good error context information from here
if (lua_task_schedule (self, lua_gettop (L), &error))
return;
}
// For simplicity ignore any results from successful returns
else if (res != LUA_OK)
{
luaL_traceback (L, L, lua_tostring (L, -1), 0 /* or 1? */);
lua_plugin_process_error (self->plugin, lua_tostring (L, -1), &error);
lua_pop (L, 2);
}
if (error)
lua_plugin_log_error (self->plugin, "task", error);
lua_task_cancel_internal (self);
}
static void
lua_task_check (struct lua_task *self)
{
poller_idle_reset (&self->idle);
lua_Integer i = 0;
LIST_FOR_EACH (struct lua_wait_channel, iter, self->active)
{
i++;
if (iter->check (iter))
{
lua_task_resume (self, i);
return;
}
}
if (!self->active)
lua_task_resume (self, i);
}
// The task dies either when it finishes, it is cancelled, or at plugin unload
static luaL_Reg lua_task_table[] =
{
{ "cancel", lua_task_cancel },
{ "__gc", lua_task_cancel },
{ NULL, NULL }
};
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
struct lua_wait_timer
{
struct lua_wait_channel super; ///< The structure we're deriving
struct poller_timer timer; ///< Timer event
bool expired; ///< Whether the timer has expired
};
static bool
lua_wait_timer_check (struct lua_wait_channel *wchannel)
{
struct lua_wait_timer *self =
CONTAINER_OF (wchannel, struct lua_wait_timer, super);
return self->super.task && self->expired;
}
static void
lua_wait_timer_cleanup (struct lua_wait_channel *wchannel)
{
struct lua_wait_timer *self =
CONTAINER_OF (wchannel, struct lua_wait_timer, super);
poller_timer_reset (&self->timer);
}
static void
lua_wait_timer_dispatch (struct lua_wait_timer *self)
{
self->expired = true;
if (self->super.task)
lua_task_wakeup (self->super.task);
}
static int
lua_plugin_push_wait_timer (struct lua_plugin *plugin, lua_State *L,
lua_Integer timeout)
{
struct lua_wait_timer *self = lua_newuserdata (L, sizeof *self);
luaL_setmetatable (L, XLUA_WCHANNEL_METATABLE);
memset (self, 0, sizeof *self);
self->super.check = lua_wait_timer_check;
self->super.cleanup = lua_wait_timer_cleanup;
poller_timer_init (&self->timer, &plugin->ctx->poller);
self->timer.dispatcher = (poller_timer_fn) lua_wait_timer_dispatch;
self->timer.user_data = self;
if (timeout)
poller_timer_set (&self->timer, timeout);
else
self->expired = true;
return 1;
}
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
static int
lua_async_go (lua_State *L)
{
struct lua_plugin *plugin = lua_touserdata (L, lua_upvalueindex (1));
luaL_checktype (L, 1, LUA_TFUNCTION);
lua_State *thread = lua_newthread (L);
lua_cache_store (L, thread, -1);
lua_pop (L, 1);
// Move the main function w/ arguments to the thread
lua_xmove (L, thread, lua_gettop (L));
struct lua_task *task = lua_newuserdata (L, sizeof *task);
luaL_setmetatable (L, XLUA_TASK_METATABLE);
memset (task, 0, sizeof *task);
task->plugin = plugin;
task->thread = thread;
poller_idle_init (&task->idle, &plugin->ctx->poller);
task->idle.dispatcher = (poller_idle_fn) lua_task_check;
task->idle.user_data = task;
poller_idle_set (&task->idle);
// Make sure the task doesn't get garbage collected and return it
lua_cache_store (L, task, -1);
return 1;
}
static int
lua_async_timer_ms (lua_State *L)
{
struct lua_plugin *plugin = lua_touserdata (L, lua_upvalueindex (1));
lua_Integer timeout = luaL_checkinteger (L, 1);
if (timeout < 0)
luaL_argerror (L, 1, "timeout mustn't be negative");
return lua_plugin_push_wait_timer (plugin, L, timeout);
}
static luaL_Reg lua_async_library[] =
{
{ "go", lua_async_go },
{ "timer_ms", lua_async_timer_ms },
{ NULL, NULL },
};
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
// Lua code can use weakly referenced wrappers for internal objects. // Lua code can use weakly referenced wrappers for internal objects.
typedef struct weak_ref_link * typedef struct weak_ref_link *
@ -8534,6 +8847,7 @@ static void
lua_weak_push (struct lua_plugin *plugin, void *object, lua_weak_push (struct lua_plugin *plugin, void *object,
struct lua_weak_info *info) struct lua_weak_info *info)
{ {
// FIXME: this may be called from a thread, then this is wrong
lua_State *L = plugin->L; lua_State *L = plugin->L;
if (!object) if (!object)
{ {
@ -9014,6 +9328,7 @@ static struct lua_hook *
lua_plugin_push_hook (struct lua_plugin *plugin, int callback_index, lua_plugin_push_hook (struct lua_plugin *plugin, int callback_index,
enum lua_hook_type type, int priority) enum lua_hook_type type, int priority)
{ {
// FIXME: this may be called from a thread, then this is wrong
lua_State *L = plugin->L; lua_State *L = plugin->L;
luaL_checktype (L, callback_index, LUA_TFUNCTION); luaL_checktype (L, callback_index, LUA_TFUNCTION);
@ -9261,6 +9576,7 @@ lua_plugin_add_config_schema (struct lua_plugin *plugin,
struct config_item *subtree, const char *name) struct config_item *subtree, const char *name)
{ {
struct config_item *item = str_map_find (&subtree->value.object, name); struct config_item *item = str_map_find (&subtree->value.object, name);
// FIXME: this may be called from a thread, then this is wrong
lua_State *L = plugin->L; lua_State *L = plugin->L;
// This should only ever happen because of a conflict with another plugin; // This should only ever happen because of a conflict with another plugin;
@ -9992,7 +10308,7 @@ lua_plugin_property_set (lua_State *L)
} }
static void static void
lua_plugin_reg_finish (lua_State *L, struct lua_weak_info *info) lua_plugin_add_accessors (lua_State *L, struct lua_weak_info *info)
{ {
// Emulate properties for convenience // Emulate properties for convenience
lua_pushlightuserdata (L, info); lua_pushlightuserdata (L, info);
@ -10000,7 +10316,6 @@ lua_plugin_reg_finish (lua_State *L, struct lua_weak_info *info)
lua_setfield (L, -2, "__index"); lua_setfield (L, -2, "__index");
lua_pushcfunction (L, lua_plugin_property_set); lua_pushcfunction (L, lua_plugin_property_set);
lua_setfield (L, -2, "__newindex"); lua_setfield (L, -2, "__newindex");
lua_pop (L, 1);
} }
static void static void
@ -10008,7 +10323,8 @@ lua_plugin_reg_meta (lua_State *L, const char *name, luaL_Reg *fns)
{ {
luaL_newmetatable (L, name); luaL_newmetatable (L, name);
luaL_setfuncs (L, fns, 0); luaL_setfuncs (L, fns, 0);
lua_plugin_reg_finish (L, NULL); lua_plugin_add_accessors (L, NULL);
lua_pop (L, 1);
} }
static void static void
@ -10020,7 +10336,8 @@ lua_plugin_reg_weak (lua_State *L, struct lua_weak_info *info, luaL_Reg *fns)
luaL_newmetatable (L, info->name); luaL_newmetatable (L, info->name);
luaL_setfuncs (L, fns, 0); luaL_setfuncs (L, fns, 0);
lua_plugin_reg_finish (L, info); lua_plugin_add_accessors (L, info);
lua_pop (L, 1);
} }
static struct plugin * static struct plugin *
@ -10050,7 +10367,14 @@ lua_plugin_load (struct app_context *ctx, const char *filename,
luaL_newmetatable (L, lua_ctx_info.name); luaL_newmetatable (L, lua_ctx_info.name);
lua_pushlightuserdata (L, plugin); lua_pushlightuserdata (L, plugin);
luaL_setfuncs (L, lua_plugin_library, 1); luaL_setfuncs (L, lua_plugin_library, 1);
lua_plugin_reg_finish (L, &lua_ctx_info); lua_plugin_add_accessors (L, &lua_ctx_info);
// Add the asynchronous library underneath
lua_newtable (L);
lua_pushlightuserdata (L, plugin);
luaL_setfuncs (L, lua_async_library, 1);
lua_setfield (L, -2, "async");
lua_pop (L, 1);
lua_weak_push (plugin, ctx, &lua_ctx_info); lua_weak_push (plugin, ctx, &lua_ctx_info);
lua_setglobal (L, lua_ctx_info.name); lua_setglobal (L, lua_ctx_info.name);
@ -10065,6 +10389,9 @@ lua_plugin_load (struct app_context *ctx, const char *filename,
lua_plugin_reg_meta (L, XLUA_CONNECTION_METATABLE, lua_connection_table); lua_plugin_reg_meta (L, XLUA_CONNECTION_METATABLE, lua_connection_table);
lua_plugin_reg_meta (L, XLUA_CONNECTOR_METATABLE, lua_connector_table); lua_plugin_reg_meta (L, XLUA_CONNECTOR_METATABLE, lua_connector_table);
lua_plugin_reg_meta (L, XLUA_TASK_METATABLE, lua_task_table);
lua_plugin_reg_meta (L, XLUA_WCHANNEL_METATABLE, lua_wchannel_table);
struct error *error = NULL; struct error *error = NULL;
if (luaL_loadfile (L, filename)) if (luaL_loadfile (L, filename))
error_set (e, "%s: %s", "Lua", lua_tostring (L, -1)); error_set (e, "%s: %s", "Lua", lua_tostring (L, -1));