Shuffle code
This commit is contained in:
parent
38d105dede
commit
ff046ea596
180
liberty.c
180
liberty.c
@ -1081,17 +1081,6 @@ struct async
|
|||||||
async_fn destroy; ///< Destroys the whole object
|
async_fn destroy; ///< Destroys the whole object
|
||||||
};
|
};
|
||||||
|
|
||||||
static void
|
|
||||||
async_init (struct async *self, struct async_manager *manager)
|
|
||||||
{
|
|
||||||
memset (self, 0, sizeof *self);
|
|
||||||
self->manager = manager;
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool async_run (struct async *self);
|
|
||||||
|
|
||||||
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
|
||||||
|
|
||||||
struct async_manager
|
struct async_manager
|
||||||
{
|
{
|
||||||
pthread_mutex_t lock; ///< Lock for the queues
|
pthread_mutex_t lock; ///< Lock for the queues
|
||||||
@ -1110,19 +1099,87 @@ struct async_manager
|
|||||||
int finished_pipe[2]; ///< Signals that a task has finished
|
int finished_pipe[2]; ///< Signals that a task has finished
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
||||||
|
|
||||||
static void
|
static void
|
||||||
async_manager_init (struct async_manager *self)
|
async_init (struct async *self, struct async_manager *manager)
|
||||||
{
|
{
|
||||||
memset (self, 0, sizeof *self);
|
memset (self, 0, sizeof *self);
|
||||||
hard_assert (!pthread_mutex_init (&self->lock, NULL));
|
self->manager = manager;
|
||||||
hard_assert (!pthread_cond_init (&self->finished_cond, NULL));
|
|
||||||
|
|
||||||
hard_assert (!pipe (self->finished_pipe));
|
|
||||||
hard_assert (set_blocking (self->finished_pipe[0], false));
|
|
||||||
set_cloexec (self->finished_pipe[0]);
|
|
||||||
set_cloexec (self->finished_pipe[1]);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Only allowed from the main thread once the job has been started but before
|
||||||
|
/// the results have been dispatched
|
||||||
|
static void
|
||||||
|
async_cancel (struct async *self)
|
||||||
|
{
|
||||||
|
if (self->started)
|
||||||
|
soft_assert (!pthread_cancel (self->worker));
|
||||||
|
self->cancelled = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
async_cleanup (void *user_data)
|
||||||
|
{
|
||||||
|
struct async *self = user_data;
|
||||||
|
|
||||||
|
hard_assert (!pthread_mutex_lock (&self->manager->lock));
|
||||||
|
LIST_UNLINK (self->manager->running, self);
|
||||||
|
LIST_PREPEND (self->manager->finished, self);
|
||||||
|
hard_assert (!pthread_mutex_unlock (&self->manager->lock));
|
||||||
|
|
||||||
|
hard_assert (!pthread_cond_broadcast (&self->manager->finished_cond));
|
||||||
|
hard_assert (write (self->manager->finished_pipe[1], "", 1) > 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void *
|
||||||
|
async_routine (void *user_data)
|
||||||
|
{
|
||||||
|
// Beware that we mustn't trigger any cancellation point before we set up
|
||||||
|
// the cleanup handler, otherwise we'd need to disable it first
|
||||||
|
struct async *self = user_data;
|
||||||
|
pthread_cleanup_push (async_cleanup, self);
|
||||||
|
|
||||||
|
self->execute (self);
|
||||||
|
|
||||||
|
pthread_cleanup_pop (true);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool
|
||||||
|
async_run (struct async *self)
|
||||||
|
{
|
||||||
|
hard_assert (!pthread_mutex_lock (&self->manager->lock));
|
||||||
|
LIST_PREPEND (self->manager->running, self);
|
||||||
|
hard_assert (!pthread_mutex_unlock (&self->manager->lock));
|
||||||
|
|
||||||
|
// Block all signals so that the new thread doesn't receive any (inherited)
|
||||||
|
sigset_t all_blocked, old_blocked;
|
||||||
|
hard_assert (!sigfillset (&all_blocked));
|
||||||
|
hard_assert (!pthread_sigmask (SIG_SETMASK, &all_blocked, &old_blocked));
|
||||||
|
|
||||||
|
int error = pthread_create (&self->worker, NULL, async_routine, self);
|
||||||
|
|
||||||
|
// Now that we've created the thread, resume signal processing as usual
|
||||||
|
hard_assert (!pthread_sigmask (SIG_SETMASK, &old_blocked, NULL));
|
||||||
|
|
||||||
|
if (error)
|
||||||
|
{
|
||||||
|
hard_assert (error == EAGAIN);
|
||||||
|
|
||||||
|
hard_assert (!pthread_mutex_lock (&self->manager->lock));
|
||||||
|
LIST_UNLINK (self->manager->running, self);
|
||||||
|
hard_assert (!pthread_mutex_unlock (&self->manager->lock));
|
||||||
|
|
||||||
|
// FIXME: we probably want to have some kind of a limit on the queue
|
||||||
|
LIST_PREPEND (self->manager->delayed, self);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return (self->started = true);
|
||||||
|
}
|
||||||
|
|
||||||
|
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
||||||
|
|
||||||
static bool
|
static bool
|
||||||
async_manager_retry (struct async_manager *self, struct async *async)
|
async_manager_retry (struct async_manager *self, struct async *async)
|
||||||
{
|
{
|
||||||
@ -1199,6 +1256,19 @@ async_manager_cancel_all (struct async_manager *self)
|
|||||||
async_manager_dispatch (self);
|
async_manager_dispatch (self);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
async_manager_init (struct async_manager *self)
|
||||||
|
{
|
||||||
|
memset (self, 0, sizeof *self);
|
||||||
|
hard_assert (!pthread_mutex_init (&self->lock, NULL));
|
||||||
|
hard_assert (!pthread_cond_init (&self->finished_cond, NULL));
|
||||||
|
|
||||||
|
hard_assert (!pipe (self->finished_pipe));
|
||||||
|
hard_assert (set_blocking (self->finished_pipe[0], false));
|
||||||
|
set_cloexec (self->finished_pipe[0]);
|
||||||
|
set_cloexec (self->finished_pipe[1]);
|
||||||
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
async_manager_free (struct async_manager *self)
|
async_manager_free (struct async_manager *self)
|
||||||
{
|
{
|
||||||
@ -1210,78 +1280,6 @@ async_manager_free (struct async_manager *self)
|
|||||||
xclose (self->finished_pipe[1]);
|
xclose (self->finished_pipe[1]);
|
||||||
}
|
}
|
||||||
|
|
||||||
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
|
||||||
|
|
||||||
/// Only allowed from the main thread once the job has been started but before
|
|
||||||
/// the results have been dispatched
|
|
||||||
static void
|
|
||||||
async_cancel (struct async *self)
|
|
||||||
{
|
|
||||||
if (self->started)
|
|
||||||
soft_assert (!pthread_cancel (self->worker));
|
|
||||||
self->cancelled = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void
|
|
||||||
async_cleanup (void *user_data)
|
|
||||||
{
|
|
||||||
struct async *self = user_data;
|
|
||||||
|
|
||||||
hard_assert (!pthread_mutex_lock (&self->manager->lock));
|
|
||||||
LIST_UNLINK (self->manager->running, self);
|
|
||||||
LIST_PREPEND (self->manager->finished, self);
|
|
||||||
hard_assert (!pthread_mutex_unlock (&self->manager->lock));
|
|
||||||
|
|
||||||
hard_assert (!pthread_cond_broadcast (&self->manager->finished_cond));
|
|
||||||
hard_assert (write (self->manager->finished_pipe[1], "", 1) > 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void *
|
|
||||||
async_routine (void *user_data)
|
|
||||||
{
|
|
||||||
// Beware that we mustn't trigger any cancellation point before we set up
|
|
||||||
// the cleanup handler, otherwise we'd need to disable it first
|
|
||||||
struct async *self = user_data;
|
|
||||||
pthread_cleanup_push (async_cleanup, self);
|
|
||||||
|
|
||||||
self->execute (self);
|
|
||||||
|
|
||||||
pthread_cleanup_pop (true);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool
|
|
||||||
async_run (struct async *self)
|
|
||||||
{
|
|
||||||
hard_assert (!pthread_mutex_lock (&self->manager->lock));
|
|
||||||
LIST_PREPEND (self->manager->running, self);
|
|
||||||
hard_assert (!pthread_mutex_unlock (&self->manager->lock));
|
|
||||||
|
|
||||||
// Block all signals so that the new thread doesn't receive any (inherited)
|
|
||||||
sigset_t all_blocked, old_blocked;
|
|
||||||
hard_assert (!sigfillset (&all_blocked));
|
|
||||||
hard_assert (!pthread_sigmask (SIG_SETMASK, &all_blocked, &old_blocked));
|
|
||||||
|
|
||||||
int error = pthread_create (&self->worker, NULL, async_routine, self);
|
|
||||||
|
|
||||||
// Now that we've created the thread, resume signal processing as usual
|
|
||||||
hard_assert (!pthread_sigmask (SIG_SETMASK, &old_blocked, NULL));
|
|
||||||
|
|
||||||
if (error)
|
|
||||||
{
|
|
||||||
hard_assert (error == EAGAIN);
|
|
||||||
|
|
||||||
hard_assert (!pthread_mutex_lock (&self->manager->lock));
|
|
||||||
LIST_UNLINK (self->manager->running, self);
|
|
||||||
hard_assert (!pthread_mutex_unlock (&self->manager->lock));
|
|
||||||
|
|
||||||
// FIXME: we probably want to have some kind of a limit on the queue
|
|
||||||
LIST_PREPEND (self->manager->delayed, self);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return (self->started = true);
|
|
||||||
}
|
|
||||||
|
|
||||||
#endif // LIBERTY_WANT_ASYNC
|
#endif // LIBERTY_WANT_ASYNC
|
||||||
|
|
||||||
// --- Event loop --------------------------------------------------------------
|
// --- Event loop --------------------------------------------------------------
|
||||||
|
Loading…
Reference in New Issue
Block a user