From 572f4e2ea3bbfd6e0aa397b09e5924659fb2d4b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C5=99emysl=20Janouch?= Date: Tue, 1 Nov 2016 04:07:53 +0100 Subject: [PATCH] degesch: implement Lua coroutine async basics --- degesch.c | 339 +++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 333 insertions(+), 6 deletions(-) diff --git a/degesch.c b/degesch.c index c599f0e..d85c814 100644 --- a/degesch.c +++ b/degesch.c @@ -8300,7 +8300,7 @@ struct lua_plugin { struct plugin super; ///< The structure we're deriving struct app_context *ctx; ///< Application context - lua_State *L; ///< Lua state + lua_State *L; ///< Lua state for the main thread struct lua_schema_item *schemas; ///< Registered schema items }; @@ -8385,6 +8385,7 @@ static bool lua_plugin_call (struct lua_plugin *self, int n_params, int n_results, struct error **e) { + // FIXME: this may be called from a thread, then this is wrong lua_State *L = self->L; // We need to pop the error handler at the end @@ -8450,6 +8451,15 @@ lua_plugin_log_error error_free (error); } +static void +lua_plugin_pack (lua_State *L, int n) +{ + lua_createtable (L, n, 0); + lua_insert (L, -n - 1); + for (int i = n; i; i--) + lua_rawseti (L, -i - 1, i); +} + // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - static void @@ -8497,6 +8507,309 @@ lua_plugin_parse (lua_State *L) // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +// The script can create as many wait channels as wanted. They only actually +// do anything once they get yielded to the main lua_resume() call. + +/// Identifier for the Lua metatable +#define XLUA_WCHANNEL_METATABLE "wchannel" + +struct lua_wait_channel +{ + LIST_HEADER (struct lua_wait_channel) + + struct lua_task *task; ///< The task we're active in + + /// Check if the event is ready and eventually push values to the thread; + /// the channel then may release any resources + bool (*check) (struct lua_wait_channel *self); + + /// Release all resources held by the subclass + void (*cleanup) (struct lua_wait_channel *self); +}; + +static int +lua_wchannel_gc (lua_State *L) +{ + struct lua_wait_channel *self = + luaL_checkudata (L, 1, XLUA_WCHANNEL_METATABLE); + if (self->cleanup) + self->cleanup (self); + return 0; +} + +static luaL_Reg lua_wchannel_table[] = +{ + { "__gc", lua_wchannel_gc }, + { NULL, NULL } +}; + +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +// A task encapsulates a thread so that wait channels yielded from its main +// function get waited upon by the event loop + +#define XLUA_TASK_METATABLE "task" ///< Identifier for the Lua metatable + +struct lua_task +{ + LIST_HEADER (struct lua_task) + + struct lua_plugin *plugin; ///< The plugin we belong to + lua_State *thread; ///< Lua thread + struct lua_wait_channel *active; ///< Channels we're waiting on + struct poller_idle idle; ///< Idle job +}; + +static void +lua_task_unregister_channels (struct lua_task *self) +{ + LIST_FOR_EACH (struct lua_wait_channel, iter, self->active) + { + iter->task = NULL; + LIST_UNLINK (self->active, iter); + lua_cache_invalidate (self->plugin->L, iter); + } +} + +static void +lua_task_cancel_internal (struct lua_task *self) +{ + if (self->thread) + { + lua_cache_invalidate (self->plugin->L, self->thread); + self->thread = NULL; + } + lua_task_unregister_channels (self); + poller_idle_reset (&self->idle); + + // The task no longer has to stay alive + lua_cache_invalidate (self->plugin->L, self); +} + +static int +lua_task_cancel (lua_State *L) +{ + struct lua_task *self = luaL_checkudata (L, 1, XLUA_TASK_METATABLE); + // We could also yield and make lua_task_resume() check "self->thread", + // however the main issue here is that the script should just return + luaL_argcheck (L, L != self->thread, 1, + "cannot cancel task from within itself"); + lua_task_cancel_internal (self); + return 0; +} + +#define lua_task_wakeup(self) poller_idle_set (&(self)->idle) + +static bool +lua_task_schedule (struct lua_task *self, int n, struct error **e) +{ + lua_State *L = self->thread; + for (int i = -1; -i <= n; i--) + { + struct lua_wait_channel *channel = + luaL_testudata (L, i, XLUA_WCHANNEL_METATABLE); + if (!channel) + return error_set (e, "bad argument #%d to yield: %s", -i + n + 1, + "tasks can only yield wait channels"); + if (channel->task) + return error_set (e, "bad argument #%d to yield: %s", -i + n + 1, + "wait channels can only be active in one task at most"); + } + for (int i = -1; -i <= n; i--) + { + // Quietly ignore duplicate channels + struct lua_wait_channel *channel = lua_touserdata (L, i); + if (channel->task) + continue; + + // By going in reverse the list ends up in the right order + channel->task = self; + LIST_PREPEND (self->active, channel); + lua_cache_store (self->plugin->L, channel, i); + } + lua_pop (L, n); + + // There doesn't have to be a single channel + // We can also be waiting on a channel that is already ready + lua_task_wakeup (self); + return true; +} + +static void +lua_task_resume (struct lua_task *self, int index) +{ + lua_State *L = self->thread; + bool waiting_on_multiple = self->active && self->active->next; + + // Since we've ended the wait, we don't need to hold on to them anymore + lua_task_unregister_channels (self); + + // On the first run we also have the main function on the stack, + // before any initial arguments + int n = lua_gettop (L) - (lua_status (L) == LUA_OK); + + // Pack the values in a table and prepend the index of the channel, so that + // the caller doesn't need to care about the number of return values + if (waiting_on_multiple) + { + lua_plugin_pack (L, n); + lua_pushinteger (L, index); + lua_insert (L, -2); + n = 2; + } + + int res = lua_resume (L, NULL, n); + struct error *error = NULL; + if (res == LUA_YIELD) + { + // AFAIK we don't get any good error context information from here + if (lua_task_schedule (self, lua_gettop (L), &error)) + return; + } + // For simplicity ignore any results from successful returns + else if (res != LUA_OK) + { + luaL_traceback (L, L, lua_tostring (L, -1), 0 /* or 1? */); + lua_plugin_process_error (self->plugin, lua_tostring (L, -1), &error); + lua_pop (L, 2); + } + if (error) + lua_plugin_log_error (self->plugin, "task", error); + lua_task_cancel_internal (self); +} + +static void +lua_task_check (struct lua_task *self) +{ + poller_idle_reset (&self->idle); + + lua_Integer i = 0; + LIST_FOR_EACH (struct lua_wait_channel, iter, self->active) + { + i++; + if (iter->check (iter)) + { + lua_task_resume (self, i); + return; + } + } + if (!self->active) + lua_task_resume (self, i); +} + +// The task dies either when it finishes, it is cancelled, or at plugin unload +static luaL_Reg lua_task_table[] = +{ + { "cancel", lua_task_cancel }, + { "__gc", lua_task_cancel }, + { NULL, NULL } +}; + +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +struct lua_wait_timer +{ + struct lua_wait_channel super; ///< The structure we're deriving + struct poller_timer timer; ///< Timer event + bool expired; ///< Whether the timer has expired +}; + +static bool +lua_wait_timer_check (struct lua_wait_channel *wchannel) +{ + struct lua_wait_timer *self = + CONTAINER_OF (wchannel, struct lua_wait_timer, super); + return self->super.task && self->expired; +} + +static void +lua_wait_timer_cleanup (struct lua_wait_channel *wchannel) +{ + struct lua_wait_timer *self = + CONTAINER_OF (wchannel, struct lua_wait_timer, super); + poller_timer_reset (&self->timer); +} + +static void +lua_wait_timer_dispatch (struct lua_wait_timer *self) +{ + self->expired = true; + if (self->super.task) + lua_task_wakeup (self->super.task); +} + +static int +lua_plugin_push_wait_timer (struct lua_plugin *plugin, lua_State *L, + lua_Integer timeout) +{ + struct lua_wait_timer *self = lua_newuserdata (L, sizeof *self); + luaL_setmetatable (L, XLUA_WCHANNEL_METATABLE); + memset (self, 0, sizeof *self); + + self->super.check = lua_wait_timer_check; + self->super.cleanup = lua_wait_timer_cleanup; + + poller_timer_init (&self->timer, &plugin->ctx->poller); + self->timer.dispatcher = (poller_timer_fn) lua_wait_timer_dispatch; + self->timer.user_data = self; + + if (timeout) + poller_timer_set (&self->timer, timeout); + else + self->expired = true; + return 1; +} + +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +static int +lua_async_go (lua_State *L) +{ + struct lua_plugin *plugin = lua_touserdata (L, lua_upvalueindex (1)); + luaL_checktype (L, 1, LUA_TFUNCTION); + + lua_State *thread = lua_newthread (L); + lua_cache_store (L, thread, -1); + lua_pop (L, 1); + + // Move the main function w/ arguments to the thread + lua_xmove (L, thread, lua_gettop (L)); + + struct lua_task *task = lua_newuserdata (L, sizeof *task); + luaL_setmetatable (L, XLUA_TASK_METATABLE); + memset (task, 0, sizeof *task); + task->plugin = plugin; + task->thread = thread; + + poller_idle_init (&task->idle, &plugin->ctx->poller); + task->idle.dispatcher = (poller_idle_fn) lua_task_check; + task->idle.user_data = task; + poller_idle_set (&task->idle); + + // Make sure the task doesn't get garbage collected and return it + lua_cache_store (L, task, -1); + return 1; +} + +static int +lua_async_timer_ms (lua_State *L) +{ + struct lua_plugin *plugin = lua_touserdata (L, lua_upvalueindex (1)); + lua_Integer timeout = luaL_checkinteger (L, 1); + if (timeout < 0) + luaL_argerror (L, 1, "timeout mustn't be negative"); + return lua_plugin_push_wait_timer (plugin, L, timeout); +} + +static luaL_Reg lua_async_library[] = +{ + { "go", lua_async_go }, + { "timer_ms", lua_async_timer_ms }, + { NULL, NULL }, +}; + +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + // Lua code can use weakly referenced wrappers for internal objects. typedef struct weak_ref_link * @@ -8534,6 +8847,7 @@ static void lua_weak_push (struct lua_plugin *plugin, void *object, struct lua_weak_info *info) { + // FIXME: this may be called from a thread, then this is wrong lua_State *L = plugin->L; if (!object) { @@ -9014,6 +9328,7 @@ static struct lua_hook * lua_plugin_push_hook (struct lua_plugin *plugin, int callback_index, enum lua_hook_type type, int priority) { + // FIXME: this may be called from a thread, then this is wrong lua_State *L = plugin->L; luaL_checktype (L, callback_index, LUA_TFUNCTION); @@ -9261,6 +9576,7 @@ lua_plugin_add_config_schema (struct lua_plugin *plugin, struct config_item *subtree, const char *name) { struct config_item *item = str_map_find (&subtree->value.object, name); + // FIXME: this may be called from a thread, then this is wrong lua_State *L = plugin->L; // This should only ever happen because of a conflict with another plugin; @@ -9992,7 +10308,7 @@ lua_plugin_property_set (lua_State *L) } static void -lua_plugin_reg_finish (lua_State *L, struct lua_weak_info *info) +lua_plugin_add_accessors (lua_State *L, struct lua_weak_info *info) { // Emulate properties for convenience lua_pushlightuserdata (L, info); @@ -10000,7 +10316,6 @@ lua_plugin_reg_finish (lua_State *L, struct lua_weak_info *info) lua_setfield (L, -2, "__index"); lua_pushcfunction (L, lua_plugin_property_set); lua_setfield (L, -2, "__newindex"); - lua_pop (L, 1); } static void @@ -10008,7 +10323,8 @@ lua_plugin_reg_meta (lua_State *L, const char *name, luaL_Reg *fns) { luaL_newmetatable (L, name); luaL_setfuncs (L, fns, 0); - lua_plugin_reg_finish (L, NULL); + lua_plugin_add_accessors (L, NULL); + lua_pop (L, 1); } static void @@ -10020,7 +10336,8 @@ lua_plugin_reg_weak (lua_State *L, struct lua_weak_info *info, luaL_Reg *fns) luaL_newmetatable (L, info->name); luaL_setfuncs (L, fns, 0); - lua_plugin_reg_finish (L, info); + lua_plugin_add_accessors (L, info); + lua_pop (L, 1); } static struct plugin * @@ -10050,7 +10367,14 @@ lua_plugin_load (struct app_context *ctx, const char *filename, luaL_newmetatable (L, lua_ctx_info.name); lua_pushlightuserdata (L, plugin); luaL_setfuncs (L, lua_plugin_library, 1); - lua_plugin_reg_finish (L, &lua_ctx_info); + lua_plugin_add_accessors (L, &lua_ctx_info); + + // Add the asynchronous library underneath + lua_newtable (L); + lua_pushlightuserdata (L, plugin); + luaL_setfuncs (L, lua_async_library, 1); + lua_setfield (L, -2, "async"); + lua_pop (L, 1); lua_weak_push (plugin, ctx, &lua_ctx_info); lua_setglobal (L, lua_ctx_info.name); @@ -10065,6 +10389,9 @@ lua_plugin_load (struct app_context *ctx, const char *filename, lua_plugin_reg_meta (L, XLUA_CONNECTION_METATABLE, lua_connection_table); lua_plugin_reg_meta (L, XLUA_CONNECTOR_METATABLE, lua_connector_table); + lua_plugin_reg_meta (L, XLUA_TASK_METATABLE, lua_task_table); + lua_plugin_reg_meta (L, XLUA_WCHANNEL_METATABLE, lua_wchannel_table); + struct error *error = NULL; if (luaL_loadfile (L, filename)) error_set (e, "%s: %s", "Lua", lua_tostring (L, -1));