diff --git a/liberty.c b/liberty.c index e004464..fd9ccc7 100644 --- a/liberty.c +++ b/liberty.c @@ -1073,6 +1073,7 @@ struct async // "cancelled" may not be accesed or modified by the worker thread pthread_t worker; ///< Worker thread ID + bool started; ///< Worker thread ID is valid bool cancelled; ///< Task has been cancelled async_fn execute; ///< Worker main function @@ -1087,6 +1088,8 @@ async_init (struct async *self, struct async_manager *manager) self->manager = manager; } +static bool async_run (struct async *self); + // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - struct async_manager @@ -1095,6 +1098,12 @@ struct async_manager struct async *running; ///< Queue of running jobs struct async *finished; ///< Queue of completed/cancelled jobs + // It's upon the user to call async_manager_dispatch() to retry the delayed. + // It's somewhat questionable if this feature is of any use. Possibly if we + // provide a means of actively limiting the amount of running async jobs. + + struct async *delayed; ///< Resource exhaustion queue + // We need the pipe in order to abort polling (instead of using EINTR) pthread_cond_t finished_cond; ///< Signals that a task has finished @@ -1114,6 +1123,18 @@ async_manager_init (struct async_manager *self) set_cloexec (self->finished_pipe[1]); } +static bool +async_manager_retry (struct async_manager *self, struct async *async) +{ + if (async->cancelled) + { + if (async->destroy) + async->destroy (async); + return true; + } + return async_run (async); +} + static struct async * async_manager_dispatch_fetch (struct async_manager *self) { @@ -1145,6 +1166,13 @@ async_manager_dispatch (struct async_manager *self) if (iter->destroy) iter->destroy (iter); } + + LIST_FOR_EACH (struct async, iter, self->delayed) + { + LIST_UNLINK (self->delayed, iter); + if (!async_manager_retry (self, iter)) + break; + } } static void @@ -1164,6 +1192,8 @@ async_manager_cancel_all (struct async_manager *self) // Mark everything cancelled so that it's not actually dispatched LIST_FOR_EACH (struct async, iter, self->finished) iter->cancelled = true; + LIST_FOR_EACH (struct async, iter, self->delayed) + iter->cancelled = true; hard_assert (!pthread_mutex_unlock (&self->lock)); async_manager_dispatch (self); @@ -1187,7 +1217,8 @@ async_manager_free (struct async_manager *self) static void async_cancel (struct async *self) { - soft_assert (!pthread_cancel (self->worker)); + if (self->started) + soft_assert (!pthread_cancel (self->worker)); self->cancelled = true; } @@ -1219,7 +1250,7 @@ async_routine (void *user_data) return NULL; } -static void +static bool async_run (struct async *self) { hard_assert (!pthread_mutex_lock (&self->manager->lock)); @@ -1231,11 +1262,24 @@ async_run (struct async *self) hard_assert (!sigfillset (&all_blocked)); hard_assert (!pthread_sigmask (SIG_SETMASK, &all_blocked, &old_blocked)); - hard_assert (!pthread_create (&self->worker, NULL, - async_routine, self)); + int error = pthread_create (&self->worker, NULL, async_routine, self); // Now that we've created the thread, resume signal processing as usual hard_assert (!pthread_sigmask (SIG_SETMASK, &old_blocked, NULL)); + + if (error) + { + hard_assert (error == EAGAIN); + + hard_assert (!pthread_mutex_lock (&self->manager->lock)); + LIST_UNLINK (self->manager->running, self); + hard_assert (!pthread_mutex_unlock (&self->manager->lock)); + + // FIXME: we probably want to have some kind of a limit on the queue + LIST_PREPEND (self->manager->delayed, self); + return false; + } + return (self->started = true); } #endif // LIBERTY_WANT_ASYNC @@ -2138,7 +2182,17 @@ poller_common_free (struct poller_common *self) static int poller_common_get_timeout (struct poller_common *self) { - return self->idle ? 0 : poller_timers_get_poll_timeout (&self->timers); + if (self->idle) + return 0; + + int timeout = poller_timers_get_poll_timeout (&self->timers); +#ifdef LIBERTY_WANT_ASYNC + // This is completely arbitrary, in general we have no idea when to retry, + // however one second doesn't sound like a particularly bad number + if (self->async.delayed) + timeout = MIN (timeout, 1000); +#endif // LIBERTY_WANT_ASYNC + return timeout; } static void