Add a framework for asynchronous jobs
This commit is contained in:
parent
80815519b3
commit
ee40af0031
191
liberty.c
191
liberty.c
@ -49,6 +49,7 @@
|
|||||||
#include <fnmatch.h>
|
#include <fnmatch.h>
|
||||||
#include <iconv.h>
|
#include <iconv.h>
|
||||||
#include <pwd.h>
|
#include <pwd.h>
|
||||||
|
#include <pthread.h>
|
||||||
|
|
||||||
#include <sys/socket.h>
|
#include <sys/socket.h>
|
||||||
#include <sys/types.h>
|
#include <sys/types.h>
|
||||||
@ -1046,6 +1047,196 @@ str_map_unset_iter_free (struct str_map_unset_iter *self)
|
|||||||
str_map_shrink (map);
|
str_map_shrink (map);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// --- Asynchronous jobs -------------------------------------------------------
|
||||||
|
|
||||||
|
// For operations that can block execution but can be run independently on the
|
||||||
|
// rest of the program, such as getaddrinfo(), read(), write(), fsync().
|
||||||
|
//
|
||||||
|
// The async structure is meant to be extended for the various usages with
|
||||||
|
// new fields and provide an appropriate callback for its destruction.
|
||||||
|
//
|
||||||
|
// This is designed so that it can be used in other event loops than poller.
|
||||||
|
|
||||||
|
#ifdef LIBERTY_WANT_ASYNC
|
||||||
|
|
||||||
|
struct async;
|
||||||
|
typedef void (*async_fn) (struct async *);
|
||||||
|
|
||||||
|
struct async
|
||||||
|
{
|
||||||
|
LIST_HEADER (struct async)
|
||||||
|
struct async_manager *manager; ///< Our manager object
|
||||||
|
|
||||||
|
// "cancelled" may not be accesed or modified by the worker thread
|
||||||
|
|
||||||
|
pthread_t worker; ///< Worker thread ID
|
||||||
|
bool cancelled; ///< Task has been cancelled
|
||||||
|
|
||||||
|
async_fn execute; ///< Worker main function
|
||||||
|
async_fn dispatcher; ///< Main thread result dispatcher
|
||||||
|
async_fn destroy; ///< Destroys the whole object
|
||||||
|
};
|
||||||
|
|
||||||
|
static void
|
||||||
|
async_init (struct async *self, struct async_manager *manager)
|
||||||
|
{
|
||||||
|
memset (self, 0, sizeof *self);
|
||||||
|
self->manager = manager;
|
||||||
|
}
|
||||||
|
|
||||||
|
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
||||||
|
|
||||||
|
struct async_manager
|
||||||
|
{
|
||||||
|
pthread_mutex_t lock; ///< Lock for the queues
|
||||||
|
struct async *running; ///< Queue of running jobs
|
||||||
|
struct async *finished; ///< Queue of completed/cancelled jobs
|
||||||
|
|
||||||
|
// We need the pipe in order to abort polling (instead of using EINTR)
|
||||||
|
|
||||||
|
pthread_cond_t finished_cond; ///< Signals that a task has finished
|
||||||
|
int finished_pipe[2]; ///< Signals that a task has finished
|
||||||
|
};
|
||||||
|
|
||||||
|
static void
|
||||||
|
async_manager_init (struct async_manager *self)
|
||||||
|
{
|
||||||
|
memset (self, 0, sizeof *self);
|
||||||
|
hard_assert (!pthread_mutex_init (&self->lock, NULL));
|
||||||
|
hard_assert (!pthread_cond_init (&self->finished_cond, NULL));
|
||||||
|
|
||||||
|
hard_assert (!pipe (self->finished_pipe));
|
||||||
|
hard_assert (set_blocking (self->finished_pipe[0], false));
|
||||||
|
set_cloexec (self->finished_pipe[0]);
|
||||||
|
set_cloexec (self->finished_pipe[1]);
|
||||||
|
}
|
||||||
|
|
||||||
|
static struct async *
|
||||||
|
async_manager_dispatch_fetch (struct async_manager *self)
|
||||||
|
{
|
||||||
|
// We don't want to hold the mutex for too long, mainly to prevent
|
||||||
|
// a deadlock when trying to add an async job while dispatching another
|
||||||
|
hard_assert (!pthread_mutex_lock (&self->lock));
|
||||||
|
struct async *result;
|
||||||
|
if ((result = self->finished))
|
||||||
|
LIST_UNLINK (self->finished, result);
|
||||||
|
hard_assert (!pthread_mutex_unlock (&self->lock));
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
async_manager_dispatch (struct async_manager *self)
|
||||||
|
{
|
||||||
|
char dummy;
|
||||||
|
while (read (self->finished_pipe[0], &dummy, 1) > 0)
|
||||||
|
; // Just emptying the signalling pipe
|
||||||
|
|
||||||
|
struct async *iter;
|
||||||
|
while ((iter = async_manager_dispatch_fetch (self)))
|
||||||
|
{
|
||||||
|
// The thread has finished execution already
|
||||||
|
soft_assert (!pthread_join (iter->worker, NULL));
|
||||||
|
|
||||||
|
if (iter->dispatcher && !iter->cancelled)
|
||||||
|
iter->dispatcher (iter);
|
||||||
|
if (iter->destroy)
|
||||||
|
iter->destroy (iter);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
async_manager_cancel_all (struct async_manager *self)
|
||||||
|
{
|
||||||
|
hard_assert (!pthread_mutex_lock (&self->lock));
|
||||||
|
|
||||||
|
// Cancel all running jobs
|
||||||
|
LIST_FOR_EACH (struct async, iter, self->running)
|
||||||
|
soft_assert (!pthread_cancel (iter->worker));
|
||||||
|
|
||||||
|
// Wait until no jobs are running anymore (we need to release the lock
|
||||||
|
// here so that worker threads can move their jobs to the finished queue)
|
||||||
|
while (self->running)
|
||||||
|
hard_assert (!pthread_cond_wait (&self->finished_cond, &self->lock));
|
||||||
|
|
||||||
|
// Mark everything cancelled so that it's not actually dispatched
|
||||||
|
LIST_FOR_EACH (struct async, iter, self->finished)
|
||||||
|
iter->cancelled = true;
|
||||||
|
|
||||||
|
hard_assert (!pthread_mutex_unlock (&self->lock));
|
||||||
|
async_manager_dispatch (self);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
async_manager_free (struct async_manager *self)
|
||||||
|
{
|
||||||
|
async_manager_cancel_all (self);
|
||||||
|
hard_assert (!pthread_cond_destroy (&self->finished_cond));
|
||||||
|
hard_assert (!pthread_mutex_destroy (&self->lock));
|
||||||
|
|
||||||
|
xclose (self->finished_pipe[0]);
|
||||||
|
xclose (self->finished_pipe[1]);
|
||||||
|
}
|
||||||
|
|
||||||
|
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
||||||
|
|
||||||
|
/// Only allowed from the main thread once the job has been started but before
|
||||||
|
/// the results have been dispatched
|
||||||
|
static void
|
||||||
|
async_cancel (struct async *self)
|
||||||
|
{
|
||||||
|
soft_assert (!pthread_cancel (self->worker));
|
||||||
|
self->cancelled = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
async_cleanup (void *user_data)
|
||||||
|
{
|
||||||
|
struct async *self = user_data;
|
||||||
|
|
||||||
|
hard_assert (!pthread_mutex_lock (&self->manager->lock));
|
||||||
|
LIST_UNLINK (self->manager->running, self);
|
||||||
|
LIST_PREPEND (self->manager->finished, self);
|
||||||
|
hard_assert (!pthread_mutex_unlock (&self->manager->lock));
|
||||||
|
|
||||||
|
hard_assert (!pthread_cond_broadcast (&self->manager->finished_cond));
|
||||||
|
hard_assert (write (self->manager->finished_pipe[1], "", 1) > 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void *
|
||||||
|
async_routine (void *user_data)
|
||||||
|
{
|
||||||
|
// Beware that we mustn't trigger any cancellation point before we set up
|
||||||
|
// the cleanup handler, otherwise we'd need to disable it first
|
||||||
|
struct async *self = user_data;
|
||||||
|
pthread_cleanup_push (async_cleanup, self);
|
||||||
|
|
||||||
|
self->execute (self);
|
||||||
|
|
||||||
|
pthread_cleanup_pop (true);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
async_run (struct async *self)
|
||||||
|
{
|
||||||
|
hard_assert (!pthread_mutex_lock (&self->manager->lock));
|
||||||
|
LIST_PREPEND (self->manager->running, self);
|
||||||
|
hard_assert (!pthread_mutex_unlock (&self->manager->lock));
|
||||||
|
|
||||||
|
// Block all signals so that the new thread doesn't receive any (inherited)
|
||||||
|
sigset_t all_blocked, old_blocked;
|
||||||
|
hard_assert (!sigfillset (&all_blocked));
|
||||||
|
hard_assert (!pthread_sigmask (SIG_SETMASK, &all_blocked, &old_blocked));
|
||||||
|
|
||||||
|
hard_assert (!pthread_create (&self->worker, NULL,
|
||||||
|
async_routine, self));
|
||||||
|
|
||||||
|
// Now that we've created the thread, resume signal processing as usual
|
||||||
|
hard_assert (!pthread_sigmask (SIG_SETMASK, &old_blocked, NULL));
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif // LIBERTY_WANT_ASYNC
|
||||||
|
|
||||||
// --- Event loop --------------------------------------------------------------
|
// --- Event loop --------------------------------------------------------------
|
||||||
|
|
||||||
#ifdef LIBERTY_WANT_POLLER
|
#ifdef LIBERTY_WANT_POLLER
|
||||||
|
Loading…
Reference in New Issue
Block a user