Add an async job manager to the poller
This commit is contained in:
parent
ee40af0031
commit
455f2cec82
120
liberty.c
120
liberty.c
@ -1245,7 +1245,6 @@ async_run (struct async *self)
|
|||||||
// to instead use those tested and proven libraries but we don't need much
|
// to instead use those tested and proven libraries but we don't need much
|
||||||
// and it's interesting to implement.
|
// and it's interesting to implement.
|
||||||
|
|
||||||
// Actually it mustn't be totally shitty as scanning exercises it quite a bit.
|
|
||||||
// We sacrifice some memory to allow for O(1) and O(log n) operations.
|
// We sacrifice some memory to allow for O(1) and O(log n) operations.
|
||||||
|
|
||||||
typedef void (*poller_fd_fn) (const struct pollfd *, void *);
|
typedef void (*poller_fd_fn) (const struct pollfd *, void *);
|
||||||
@ -1448,6 +1447,23 @@ poller_idle_dispatch (struct poller_idle *list)
|
|||||||
|
|
||||||
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
||||||
|
|
||||||
|
struct poller_common
|
||||||
|
{
|
||||||
|
struct poller_timers timers; ///< Timeouts
|
||||||
|
struct poller_idle *idle; ///< Idle events
|
||||||
|
#ifdef LIBERTY_WANT_ASYNC
|
||||||
|
struct async_manager async; ///< Asynchronous jobs
|
||||||
|
struct poller_fd async_event; ///< Asynchronous jobs have finished
|
||||||
|
#endif // LIBERTY_WANT_ASYNC
|
||||||
|
};
|
||||||
|
|
||||||
|
static void poller_common_init (struct poller_common *, struct poller *);
|
||||||
|
static void poller_common_free (struct poller_common *);
|
||||||
|
static int poller_common_get_timeout (struct poller_common *);
|
||||||
|
static void poller_common_dispatch (struct poller_common *);
|
||||||
|
|
||||||
|
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
||||||
|
|
||||||
#ifdef __linux__
|
#ifdef __linux__
|
||||||
#include <sys/epoll.h>
|
#include <sys/epoll.h>
|
||||||
|
|
||||||
@ -1459,10 +1475,7 @@ struct poller
|
|||||||
struct epoll_event *revents; ///< Output array for epoll_wait()
|
struct epoll_event *revents; ///< Output array for epoll_wait()
|
||||||
size_t len; ///< Number of polled descriptors
|
size_t len; ///< Number of polled descriptors
|
||||||
size_t alloc; ///< Number of entries allocated
|
size_t alloc; ///< Number of entries allocated
|
||||||
|
struct poller_common common; ///< Stuff common to all backends
|
||||||
struct poller_timers timers; ///< Timeouts
|
|
||||||
struct poller_idle *idle; ///< Idle events
|
|
||||||
|
|
||||||
int revents_len; ///< Number of entries in `revents'
|
int revents_len; ///< Number of entries in `revents'
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -1480,8 +1493,7 @@ poller_init (struct poller *self)
|
|||||||
self->revents = xcalloc (self->alloc, sizeof *self->revents);
|
self->revents = xcalloc (self->alloc, sizeof *self->revents);
|
||||||
self->revents_len = 0;
|
self->revents_len = 0;
|
||||||
|
|
||||||
poller_timers_init (&self->timers);
|
poller_common_init (&self->common, self);
|
||||||
self->idle = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
@ -1494,7 +1506,7 @@ poller_free (struct poller *self)
|
|||||||
EPOLL_CTL_DEL, fd->fd, (void *) "") != -1);
|
EPOLL_CTL_DEL, fd->fd, (void *) "") != -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
poller_timers_free (&self->timers);
|
poller_common_free (&self->common);
|
||||||
|
|
||||||
xclose (self->epoll_fd);
|
xclose (self->epoll_fd);
|
||||||
free (self->fds);
|
free (self->fds);
|
||||||
@ -1618,7 +1630,7 @@ poller_run (struct poller *self)
|
|||||||
int n_fds;
|
int n_fds;
|
||||||
do
|
do
|
||||||
n_fds = epoll_wait (self->epoll_fd, self->revents, self->alloc,
|
n_fds = epoll_wait (self->epoll_fd, self->revents, self->alloc,
|
||||||
self->idle ? 0 : poller_timers_get_poll_timeout (&self->timers));
|
poller_common_get_timeout (&self->common));
|
||||||
while (n_fds == -1 && errno == EINTR);
|
while (n_fds == -1 && errno == EINTR);
|
||||||
|
|
||||||
if (n_fds == -1)
|
if (n_fds == -1)
|
||||||
@ -1628,8 +1640,7 @@ poller_run (struct poller *self)
|
|||||||
qsort (self->revents, n_fds, sizeof *self->revents, poller_compare_fds);
|
qsort (self->revents, n_fds, sizeof *self->revents, poller_compare_fds);
|
||||||
self->revents_len = n_fds;
|
self->revents_len = n_fds;
|
||||||
|
|
||||||
poller_timers_dispatch (&self->timers);
|
poller_common_dispatch (&self->common);
|
||||||
poller_idle_dispatch (self->idle);
|
|
||||||
|
|
||||||
for (int i = 0; i < n_fds; i++)
|
for (int i = 0; i < n_fds; i++)
|
||||||
{
|
{
|
||||||
@ -1667,10 +1678,7 @@ struct poller
|
|||||||
struct kevent *revents; ///< Output array for kevent()
|
struct kevent *revents; ///< Output array for kevent()
|
||||||
size_t len; ///< Number of polled descriptors
|
size_t len; ///< Number of polled descriptors
|
||||||
size_t alloc; ///< Number of entries allocated
|
size_t alloc; ///< Number of entries allocated
|
||||||
|
struct poller_common common; ///< Stuff common to all backends
|
||||||
struct poller_timers timers; ///< Timeouts
|
|
||||||
struct poller_idle *idle; ///< Idle events
|
|
||||||
|
|
||||||
int revents_len; ///< Number of entries in `revents'
|
int revents_len; ///< Number of entries in `revents'
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -1686,19 +1694,16 @@ poller_init (struct poller *self)
|
|||||||
self->fds = xcalloc (self->alloc, sizeof *self->fds);
|
self->fds = xcalloc (self->alloc, sizeof *self->fds);
|
||||||
self->revents = xcalloc (self->alloc, sizeof *self->revents);
|
self->revents = xcalloc (self->alloc, sizeof *self->revents);
|
||||||
self->revents_len = 0;
|
self->revents_len = 0;
|
||||||
|
poller_common_init (&self->common, self);
|
||||||
poller_timers_init (&self->timers);
|
|
||||||
self->idle = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
poller_free (struct poller *self)
|
poller_free (struct poller *self)
|
||||||
{
|
{
|
||||||
poller_timers_free (&self->timers);
|
|
||||||
|
|
||||||
xclose (self->kqueue_fd);
|
xclose (self->kqueue_fd);
|
||||||
free (self->fds);
|
free (self->fds);
|
||||||
free (self->revents);
|
free (self->revents);
|
||||||
|
poller_common_free (&self->common);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
@ -1841,7 +1846,7 @@ poller_run (struct poller *self)
|
|||||||
do
|
do
|
||||||
{
|
{
|
||||||
struct timespec ts = poller_timeout_to_timespec
|
struct timespec ts = poller_timeout_to_timespec
|
||||||
(self->idle ? 0 : poller_timers_get_poll_timeout (&self->timers));
|
(poller_common_get_timeout (&self->common));
|
||||||
n_fds = kevent (self->kqueue_fd,
|
n_fds = kevent (self->kqueue_fd,
|
||||||
NULL, 0, self->revents, self->len, &ts);
|
NULL, 0, self->revents, self->len, &ts);
|
||||||
}
|
}
|
||||||
@ -1854,8 +1859,7 @@ poller_run (struct poller *self)
|
|||||||
qsort (self->revents, n_fds, sizeof *self->revents, poller_compare_fds);
|
qsort (self->revents, n_fds, sizeof *self->revents, poller_compare_fds);
|
||||||
self->revents_len = n_fds;
|
self->revents_len = n_fds;
|
||||||
|
|
||||||
poller_timers_dispatch (&self->timers);
|
poller_common_dispatch (&self->common);
|
||||||
poller_idle_dispatch (self->idle);
|
|
||||||
|
|
||||||
for (int i = 0; i < n_fds; i++)
|
for (int i = 0; i < n_fds; i++)
|
||||||
{
|
{
|
||||||
@ -1894,9 +1898,7 @@ struct poller
|
|||||||
struct poller_fd **fds_data; ///< Additional information for each FD
|
struct poller_fd **fds_data; ///< Additional information for each FD
|
||||||
size_t len; ///< Number of polled descriptors
|
size_t len; ///< Number of polled descriptors
|
||||||
size_t alloc; ///< Number of entries allocated
|
size_t alloc; ///< Number of entries allocated
|
||||||
|
struct poller_common common; ///< Stuff common to all backends
|
||||||
struct poller_timers timers; ///< Timers
|
|
||||||
struct poller_idle *idle; ///< Idle events
|
|
||||||
int dispatch_next; ///< The next dispatched FD or -1
|
int dispatch_next; ///< The next dispatched FD or -1
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -1907,7 +1909,7 @@ poller_init (struct poller *self)
|
|||||||
self->len = 0;
|
self->len = 0;
|
||||||
self->fds = xcalloc (self->alloc, sizeof *self->fds);
|
self->fds = xcalloc (self->alloc, sizeof *self->fds);
|
||||||
self->fds_data = xcalloc (self->alloc, sizeof *self->fds_data);
|
self->fds_data = xcalloc (self->alloc, sizeof *self->fds_data);
|
||||||
poller_timers_init (&self->timers);
|
poller_common_init (&self->common, self);
|
||||||
self->dispatch_next = -1;
|
self->dispatch_next = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1916,7 +1918,7 @@ poller_free (struct poller *self)
|
|||||||
{
|
{
|
||||||
free (self->fds);
|
free (self->fds);
|
||||||
free (self->fds_data);
|
free (self->fds_data);
|
||||||
poller_timers_free (&self->timers);
|
poller_common_free (&self->common);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
@ -1986,14 +1988,13 @@ poller_run (struct poller *self)
|
|||||||
int result;
|
int result;
|
||||||
do
|
do
|
||||||
result = poll (self->fds, self->len,
|
result = poll (self->fds, self->len,
|
||||||
self->idle ? 0 : poller_timers_get_poll_timeout (&self->timers));
|
poller_common_get_timeout (&self->common));
|
||||||
while (result == -1 && errno == EINTR);
|
while (result == -1 && errno == EINTR);
|
||||||
|
|
||||||
if (result == -1)
|
if (result == -1)
|
||||||
exit_fatal ("%s: %s", "poll", strerror (errno));
|
exit_fatal ("%s: %s", "poll", strerror (errno));
|
||||||
|
|
||||||
poller_timers_dispatch (&self->timers);
|
poller_common_dispatch (&self->common);
|
||||||
poller_idle_dispatch (self->idle);
|
|
||||||
|
|
||||||
for (int i = 0; i < (int) self->len; )
|
for (int i = 0; i < (int) self->len; )
|
||||||
{
|
{
|
||||||
@ -2016,7 +2017,7 @@ static void
|
|||||||
poller_timer_init (struct poller_timer *self, struct poller *poller)
|
poller_timer_init (struct poller_timer *self, struct poller *poller)
|
||||||
{
|
{
|
||||||
memset (self, 0, sizeof *self);
|
memset (self, 0, sizeof *self);
|
||||||
self->timers = &poller->timers;
|
self->timers = &poller->common.timers;
|
||||||
self->index = -1;
|
self->index = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2055,7 +2056,7 @@ poller_idle_set (struct poller_idle *self)
|
|||||||
if (self->active)
|
if (self->active)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
LIST_PREPEND (self->poller->idle, self);
|
LIST_PREPEND (self->poller->common.idle, self);
|
||||||
self->active = true;
|
self->active = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2065,7 +2066,7 @@ poller_idle_reset (struct poller_idle *self)
|
|||||||
if (!self->active)
|
if (!self->active)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
LIST_UNLINK (self->poller->idle, self);
|
LIST_UNLINK (self->poller->common.idle, self);
|
||||||
self->prev = NULL;
|
self->prev = NULL;
|
||||||
self->next = NULL;
|
self->next = NULL;
|
||||||
self->active = false;
|
self->active = false;
|
||||||
@ -2096,6 +2097,57 @@ poller_fd_reset (struct poller_fd *self)
|
|||||||
poller_remove_at_index (self->poller, self->index);
|
poller_remove_at_index (self->poller, self->index);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
||||||
|
|
||||||
|
static void
|
||||||
|
poller_common_dummy_dispatcher (const struct pollfd *pfd, void *user_data)
|
||||||
|
{
|
||||||
|
(void) pfd;
|
||||||
|
(void) user_data;
|
||||||
|
|
||||||
|
// The async manager will empty the pipe when we invoke dispatch
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
poller_common_init (struct poller_common *self, struct poller *poller)
|
||||||
|
{
|
||||||
|
poller_timers_init (&self->timers);
|
||||||
|
self->idle = NULL;
|
||||||
|
#ifdef LIBERTY_WANT_ASYNC
|
||||||
|
async_manager_init (&self->async);
|
||||||
|
|
||||||
|
poller_fd_init (&self->async_event, poller, self->async.finished_pipe[0]);
|
||||||
|
poller_fd_set (&self->async_event, POLLIN);
|
||||||
|
self->async_event.dispatcher = poller_common_dummy_dispatcher;
|
||||||
|
self->async_event.user_data = self;
|
||||||
|
#endif // LIBERTY_WANT_ASYNC
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
poller_common_free (struct poller_common *self)
|
||||||
|
{
|
||||||
|
poller_timers_free (&self->timers);
|
||||||
|
#ifdef LIBERTY_WANT_ASYNC
|
||||||
|
async_manager_free (&self->async);
|
||||||
|
#endif // LIBERTY_WANT_ASYNC
|
||||||
|
}
|
||||||
|
|
||||||
|
static int
|
||||||
|
poller_common_get_timeout (struct poller_common *self)
|
||||||
|
{
|
||||||
|
return self->idle ? 0 : poller_timers_get_poll_timeout (&self->timers);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
poller_common_dispatch (struct poller_common *self)
|
||||||
|
{
|
||||||
|
poller_timers_dispatch (&self->timers);
|
||||||
|
poller_idle_dispatch (self->idle);
|
||||||
|
#ifdef LIBERTY_WANT_ASYNC
|
||||||
|
async_manager_dispatch (&self->async);
|
||||||
|
#endif // LIBERTY_WANT_ASYNC
|
||||||
|
}
|
||||||
|
|
||||||
#endif // LIBERTY_WANT_POLLER
|
#endif // LIBERTY_WANT_POLLER
|
||||||
|
|
||||||
// --- libuv-style write adaptor -----------------------------------------------
|
// --- libuv-style write adaptor -----------------------------------------------
|
||||||
|
Loading…
Reference in New Issue
Block a user