diff --git a/liberty.c b/liberty.c index fd9ccc7..de57812 100644 --- a/liberty.c +++ b/liberty.c @@ -1081,17 +1081,6 @@ struct async async_fn destroy; ///< Destroys the whole object }; -static void -async_init (struct async *self, struct async_manager *manager) -{ - memset (self, 0, sizeof *self); - self->manager = manager; -} - -static bool async_run (struct async *self); - -// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - struct async_manager { pthread_mutex_t lock; ///< Lock for the queues @@ -1110,19 +1099,87 @@ struct async_manager int finished_pipe[2]; ///< Signals that a task has finished }; +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + static void -async_manager_init (struct async_manager *self) +async_init (struct async *self, struct async_manager *manager) { memset (self, 0, sizeof *self); - hard_assert (!pthread_mutex_init (&self->lock, NULL)); - hard_assert (!pthread_cond_init (&self->finished_cond, NULL)); - - hard_assert (!pipe (self->finished_pipe)); - hard_assert (set_blocking (self->finished_pipe[0], false)); - set_cloexec (self->finished_pipe[0]); - set_cloexec (self->finished_pipe[1]); + self->manager = manager; } +/// Only allowed from the main thread once the job has been started but before +/// the results have been dispatched +static void +async_cancel (struct async *self) +{ + if (self->started) + soft_assert (!pthread_cancel (self->worker)); + self->cancelled = true; +} + +static void +async_cleanup (void *user_data) +{ + struct async *self = user_data; + + hard_assert (!pthread_mutex_lock (&self->manager->lock)); + LIST_UNLINK (self->manager->running, self); + LIST_PREPEND (self->manager->finished, self); + hard_assert (!pthread_mutex_unlock (&self->manager->lock)); + + hard_assert (!pthread_cond_broadcast (&self->manager->finished_cond)); + hard_assert (write (self->manager->finished_pipe[1], "", 1) > 0); +} + +static void * +async_routine (void *user_data) +{ + // Beware that we mustn't trigger any cancellation point before we set up + // the cleanup handler, otherwise we'd need to disable it first + struct async *self = user_data; + pthread_cleanup_push (async_cleanup, self); + + self->execute (self); + + pthread_cleanup_pop (true); + return NULL; +} + +static bool +async_run (struct async *self) +{ + hard_assert (!pthread_mutex_lock (&self->manager->lock)); + LIST_PREPEND (self->manager->running, self); + hard_assert (!pthread_mutex_unlock (&self->manager->lock)); + + // Block all signals so that the new thread doesn't receive any (inherited) + sigset_t all_blocked, old_blocked; + hard_assert (!sigfillset (&all_blocked)); + hard_assert (!pthread_sigmask (SIG_SETMASK, &all_blocked, &old_blocked)); + + int error = pthread_create (&self->worker, NULL, async_routine, self); + + // Now that we've created the thread, resume signal processing as usual + hard_assert (!pthread_sigmask (SIG_SETMASK, &old_blocked, NULL)); + + if (error) + { + hard_assert (error == EAGAIN); + + hard_assert (!pthread_mutex_lock (&self->manager->lock)); + LIST_UNLINK (self->manager->running, self); + hard_assert (!pthread_mutex_unlock (&self->manager->lock)); + + // FIXME: we probably want to have some kind of a limit on the queue + LIST_PREPEND (self->manager->delayed, self); + return false; + } + return (self->started = true); +} + +// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + static bool async_manager_retry (struct async_manager *self, struct async *async) { @@ -1199,6 +1256,19 @@ async_manager_cancel_all (struct async_manager *self) async_manager_dispatch (self); } +static void +async_manager_init (struct async_manager *self) +{ + memset (self, 0, sizeof *self); + hard_assert (!pthread_mutex_init (&self->lock, NULL)); + hard_assert (!pthread_cond_init (&self->finished_cond, NULL)); + + hard_assert (!pipe (self->finished_pipe)); + hard_assert (set_blocking (self->finished_pipe[0], false)); + set_cloexec (self->finished_pipe[0]); + set_cloexec (self->finished_pipe[1]); +} + static void async_manager_free (struct async_manager *self) { @@ -1210,78 +1280,6 @@ async_manager_free (struct async_manager *self) xclose (self->finished_pipe[1]); } -// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -/// Only allowed from the main thread once the job has been started but before -/// the results have been dispatched -static void -async_cancel (struct async *self) -{ - if (self->started) - soft_assert (!pthread_cancel (self->worker)); - self->cancelled = true; -} - -static void -async_cleanup (void *user_data) -{ - struct async *self = user_data; - - hard_assert (!pthread_mutex_lock (&self->manager->lock)); - LIST_UNLINK (self->manager->running, self); - LIST_PREPEND (self->manager->finished, self); - hard_assert (!pthread_mutex_unlock (&self->manager->lock)); - - hard_assert (!pthread_cond_broadcast (&self->manager->finished_cond)); - hard_assert (write (self->manager->finished_pipe[1], "", 1) > 0); -} - -static void * -async_routine (void *user_data) -{ - // Beware that we mustn't trigger any cancellation point before we set up - // the cleanup handler, otherwise we'd need to disable it first - struct async *self = user_data; - pthread_cleanup_push (async_cleanup, self); - - self->execute (self); - - pthread_cleanup_pop (true); - return NULL; -} - -static bool -async_run (struct async *self) -{ - hard_assert (!pthread_mutex_lock (&self->manager->lock)); - LIST_PREPEND (self->manager->running, self); - hard_assert (!pthread_mutex_unlock (&self->manager->lock)); - - // Block all signals so that the new thread doesn't receive any (inherited) - sigset_t all_blocked, old_blocked; - hard_assert (!sigfillset (&all_blocked)); - hard_assert (!pthread_sigmask (SIG_SETMASK, &all_blocked, &old_blocked)); - - int error = pthread_create (&self->worker, NULL, async_routine, self); - - // Now that we've created the thread, resume signal processing as usual - hard_assert (!pthread_sigmask (SIG_SETMASK, &old_blocked, NULL)); - - if (error) - { - hard_assert (error == EAGAIN); - - hard_assert (!pthread_mutex_lock (&self->manager->lock)); - LIST_UNLINK (self->manager->running, self); - hard_assert (!pthread_mutex_unlock (&self->manager->lock)); - - // FIXME: we probably want to have some kind of a limit on the queue - LIST_PREPEND (self->manager->delayed, self); - return false; - } - return (self->started = true); -} - #endif // LIBERTY_WANT_ASYNC // --- Event loop --------------------------------------------------------------