Don't crash when new threads can't be created
This commit is contained in:
parent
a90aeaf0d9
commit
38d105dede
64
liberty.c
64
liberty.c
@ -1073,6 +1073,7 @@ struct async
|
|||||||
// "cancelled" may not be accesed or modified by the worker thread
|
// "cancelled" may not be accesed or modified by the worker thread
|
||||||
|
|
||||||
pthread_t worker; ///< Worker thread ID
|
pthread_t worker; ///< Worker thread ID
|
||||||
|
bool started; ///< Worker thread ID is valid
|
||||||
bool cancelled; ///< Task has been cancelled
|
bool cancelled; ///< Task has been cancelled
|
||||||
|
|
||||||
async_fn execute; ///< Worker main function
|
async_fn execute; ///< Worker main function
|
||||||
@ -1087,6 +1088,8 @@ async_init (struct async *self, struct async_manager *manager)
|
|||||||
self->manager = manager;
|
self->manager = manager;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool async_run (struct async *self);
|
||||||
|
|
||||||
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
||||||
|
|
||||||
struct async_manager
|
struct async_manager
|
||||||
@ -1095,6 +1098,12 @@ struct async_manager
|
|||||||
struct async *running; ///< Queue of running jobs
|
struct async *running; ///< Queue of running jobs
|
||||||
struct async *finished; ///< Queue of completed/cancelled jobs
|
struct async *finished; ///< Queue of completed/cancelled jobs
|
||||||
|
|
||||||
|
// It's upon the user to call async_manager_dispatch() to retry the delayed.
|
||||||
|
// It's somewhat questionable if this feature is of any use. Possibly if we
|
||||||
|
// provide a means of actively limiting the amount of running async jobs.
|
||||||
|
|
||||||
|
struct async *delayed; ///< Resource exhaustion queue
|
||||||
|
|
||||||
// We need the pipe in order to abort polling (instead of using EINTR)
|
// We need the pipe in order to abort polling (instead of using EINTR)
|
||||||
|
|
||||||
pthread_cond_t finished_cond; ///< Signals that a task has finished
|
pthread_cond_t finished_cond; ///< Signals that a task has finished
|
||||||
@ -1114,6 +1123,18 @@ async_manager_init (struct async_manager *self)
|
|||||||
set_cloexec (self->finished_pipe[1]);
|
set_cloexec (self->finished_pipe[1]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool
|
||||||
|
async_manager_retry (struct async_manager *self, struct async *async)
|
||||||
|
{
|
||||||
|
if (async->cancelled)
|
||||||
|
{
|
||||||
|
if (async->destroy)
|
||||||
|
async->destroy (async);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return async_run (async);
|
||||||
|
}
|
||||||
|
|
||||||
static struct async *
|
static struct async *
|
||||||
async_manager_dispatch_fetch (struct async_manager *self)
|
async_manager_dispatch_fetch (struct async_manager *self)
|
||||||
{
|
{
|
||||||
@ -1145,6 +1166,13 @@ async_manager_dispatch (struct async_manager *self)
|
|||||||
if (iter->destroy)
|
if (iter->destroy)
|
||||||
iter->destroy (iter);
|
iter->destroy (iter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIST_FOR_EACH (struct async, iter, self->delayed)
|
||||||
|
{
|
||||||
|
LIST_UNLINK (self->delayed, iter);
|
||||||
|
if (!async_manager_retry (self, iter))
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
@ -1164,6 +1192,8 @@ async_manager_cancel_all (struct async_manager *self)
|
|||||||
// Mark everything cancelled so that it's not actually dispatched
|
// Mark everything cancelled so that it's not actually dispatched
|
||||||
LIST_FOR_EACH (struct async, iter, self->finished)
|
LIST_FOR_EACH (struct async, iter, self->finished)
|
||||||
iter->cancelled = true;
|
iter->cancelled = true;
|
||||||
|
LIST_FOR_EACH (struct async, iter, self->delayed)
|
||||||
|
iter->cancelled = true;
|
||||||
|
|
||||||
hard_assert (!pthread_mutex_unlock (&self->lock));
|
hard_assert (!pthread_mutex_unlock (&self->lock));
|
||||||
async_manager_dispatch (self);
|
async_manager_dispatch (self);
|
||||||
@ -1187,7 +1217,8 @@ async_manager_free (struct async_manager *self)
|
|||||||
static void
|
static void
|
||||||
async_cancel (struct async *self)
|
async_cancel (struct async *self)
|
||||||
{
|
{
|
||||||
soft_assert (!pthread_cancel (self->worker));
|
if (self->started)
|
||||||
|
soft_assert (!pthread_cancel (self->worker));
|
||||||
self->cancelled = true;
|
self->cancelled = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1219,7 +1250,7 @@ async_routine (void *user_data)
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static bool
|
||||||
async_run (struct async *self)
|
async_run (struct async *self)
|
||||||
{
|
{
|
||||||
hard_assert (!pthread_mutex_lock (&self->manager->lock));
|
hard_assert (!pthread_mutex_lock (&self->manager->lock));
|
||||||
@ -1231,11 +1262,24 @@ async_run (struct async *self)
|
|||||||
hard_assert (!sigfillset (&all_blocked));
|
hard_assert (!sigfillset (&all_blocked));
|
||||||
hard_assert (!pthread_sigmask (SIG_SETMASK, &all_blocked, &old_blocked));
|
hard_assert (!pthread_sigmask (SIG_SETMASK, &all_blocked, &old_blocked));
|
||||||
|
|
||||||
hard_assert (!pthread_create (&self->worker, NULL,
|
int error = pthread_create (&self->worker, NULL, async_routine, self);
|
||||||
async_routine, self));
|
|
||||||
|
|
||||||
// Now that we've created the thread, resume signal processing as usual
|
// Now that we've created the thread, resume signal processing as usual
|
||||||
hard_assert (!pthread_sigmask (SIG_SETMASK, &old_blocked, NULL));
|
hard_assert (!pthread_sigmask (SIG_SETMASK, &old_blocked, NULL));
|
||||||
|
|
||||||
|
if (error)
|
||||||
|
{
|
||||||
|
hard_assert (error == EAGAIN);
|
||||||
|
|
||||||
|
hard_assert (!pthread_mutex_lock (&self->manager->lock));
|
||||||
|
LIST_UNLINK (self->manager->running, self);
|
||||||
|
hard_assert (!pthread_mutex_unlock (&self->manager->lock));
|
||||||
|
|
||||||
|
// FIXME: we probably want to have some kind of a limit on the queue
|
||||||
|
LIST_PREPEND (self->manager->delayed, self);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return (self->started = true);
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif // LIBERTY_WANT_ASYNC
|
#endif // LIBERTY_WANT_ASYNC
|
||||||
@ -2138,7 +2182,17 @@ poller_common_free (struct poller_common *self)
|
|||||||
static int
|
static int
|
||||||
poller_common_get_timeout (struct poller_common *self)
|
poller_common_get_timeout (struct poller_common *self)
|
||||||
{
|
{
|
||||||
return self->idle ? 0 : poller_timers_get_poll_timeout (&self->timers);
|
if (self->idle)
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
int timeout = poller_timers_get_poll_timeout (&self->timers);
|
||||||
|
#ifdef LIBERTY_WANT_ASYNC
|
||||||
|
// This is completely arbitrary, in general we have no idea when to retry,
|
||||||
|
// however one second doesn't sound like a particularly bad number
|
||||||
|
if (self->async.delayed)
|
||||||
|
timeout = MIN (timeout, 1000);
|
||||||
|
#endif // LIBERTY_WANT_ASYNC
|
||||||
|
return timeout;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
|
Loading…
Reference in New Issue
Block a user