Import optimized event loop from ponymap

This commit is contained in:
Přemysl Eric Janouch 2015-02-12 01:53:03 +01:00
parent cd1a55a0d1
commit e8fe0dad81
3 changed files with 392 additions and 246 deletions

450
common.c
View File

@ -813,42 +813,61 @@ xclose (int fd)
break; break;
} }
// --- Polling ----------------------------------------------------------------- // --- Event loop --------------------------------------------------------------
// Basically the poor man's GMainLoop/libev/libuv. It might make some sense // Basically the poor man's GMainLoop/libev/libuv. It might make some sense
// to instead use those tested and proven libraries but we don't need much // to instead use those tested and proven libraries but we don't need much
// and it's interesting to implement. // and it's interesting to implement.
// At the moment the FD's are stored in an unsorted array. This is not ideal // Actually it mustn't be totally shitty as scanning exercises it quite a bit.
// complexity-wise but I don't think I have much of a choice with poll(), // We sacrifice some memory to allow for O(1) and O(log n) operations.
// and neither with epoll for that matter.
//
// unsorted array sorted array
// search O(n) O(log n) [O(log log n)]
// insert by fd O(n) O(n)
// delete by fd O(n) O(n)
//
// Insertion in the unsorted array can be reduced to O(1) if I maintain a
// bitmap of present FD's but that's still not a huge win.
//
// I don't expect this to be much of an issue, as there are typically not going
// to be that many FD's to watch, and the linear approach is cache-friendly.
typedef void (*poller_dispatcher_fn) (const struct pollfd *, void *); typedef void (*poller_fd_fn) (const struct pollfd *, void *);
typedef void (*poller_timer_fn) (void *); typedef void (*poller_timer_fn) (void *);
typedef void (*poller_idle_fn) (void *);
#define POLLER_MIN_ALLOC 16 #define POLLER_MIN_ALLOC 16
struct poller_timer_info struct poller_timer
{ {
struct poller_timers *timers; ///< The timers part of our poller
ssize_t index; ///< Where we are in the array, or -1
int64_t when; ///< When is the timer to expire int64_t when; ///< When is the timer to expire
poller_timer_fn dispatcher; ///< Event dispatcher poller_timer_fn dispatcher; ///< Event dispatcher
void *user_data; ///< User data void *user_data; ///< User data
}; };
struct poller_fd
{
struct poller *poller; ///< Our poller
ssize_t index; ///< Where we are in the array, or -1
int fd; ///< Our file descriptor
short events; ///< The poll() events we registered for
bool closed; ///< Whether fd has been closed already
poller_fd_fn dispatcher; ///< Event dispatcher
void *user_data; ///< User data
};
struct poller_idle
{
LIST_HEADER (poller_idle)
struct poller *poller; ///< Our poller
bool active; ///< Whether we're on the list
poller_idle_fn dispatcher; ///< Event dispatcher
void *user_data; ///< User data
};
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
struct poller_timers struct poller_timers
{ {
struct poller_timer_info *info; ///< Min-heap of timers struct poller_timer **heap; ///< Min-heap of timers
size_t len; ///< Number of scheduled timers size_t len; ///< Number of scheduled timers
size_t alloc; ///< Number of timers allocated size_t alloc; ///< Number of timers allocated
}; };
@ -858,13 +877,13 @@ poller_timers_init (struct poller_timers *self)
{ {
self->alloc = POLLER_MIN_ALLOC; self->alloc = POLLER_MIN_ALLOC;
self->len = 0; self->len = 0;
self->info = xmalloc (self->alloc * sizeof *self->info); self->heap = xmalloc (self->alloc * sizeof *self->heap);
} }
static void static void
poller_timers_free (struct poller_timers *self) poller_timers_free (struct poller_timers *self)
{ {
free (self->info); free (self->heap);
} }
static int64_t static int64_t
@ -884,28 +903,30 @@ poller_timers_get_current_time (void)
static void static void
poller_timers_heapify_down (struct poller_timers *self, size_t index) poller_timers_heapify_down (struct poller_timers *self, size_t index)
{ {
typedef struct poller_timer_info info_t; typedef struct poller_timer *timer_t;
info_t *end = self->info + self->len; timer_t *end = self->heap + self->len;
while (true) while (true)
{ {
info_t *parent = self->info + index; timer_t *parent = self->heap + index;
info_t *left = self->info + 2 * index + 1; timer_t *left = self->heap + 2 * index + 1;
info_t *right = self->info + 2 * index + 2; timer_t *right = self->heap + 2 * index + 2;
info_t *lowest = parent; timer_t *lowest = parent;
if (left < end && left->when < lowest->when) if (left < end && (*left) ->when < (*lowest)->when)
lowest = left; lowest = left;
if (right < end && right->when < lowest->when) if (right < end && (*right)->when < (*lowest)->when)
lowest = right; lowest = right;
if (parent == lowest) if (parent == lowest)
break; break;
info_t tmp = *parent; timer_t tmp = *parent;
*parent = *lowest; *parent = *lowest;
*lowest = tmp; *lowest = tmp;
index = lowest - self->info; (*parent)->index = parent - self->heap;
(*lowest)->index = lowest - self->heap;
index = lowest - self->heap;
} }
} }
@ -913,10 +934,12 @@ static void
poller_timers_remove_at_index (struct poller_timers *self, size_t index) poller_timers_remove_at_index (struct poller_timers *self, size_t index)
{ {
hard_assert (index < self->len); hard_assert (index < self->len);
self->heap[index]->index = -1;
if (index == --self->len) if (index == --self->len)
return; return;
self->info[index] = self->info[self->len]; self->heap[index] = self->heap[self->len];
self->heap[index]->index = index;
poller_timers_heapify_down (self, index); poller_timers_heapify_down (self, index);
} }
@ -924,11 +947,11 @@ static void
poller_timers_dispatch (struct poller_timers *self) poller_timers_dispatch (struct poller_timers *self)
{ {
int64_t now = poller_timers_get_current_time (); int64_t now = poller_timers_get_current_time ();
while (self->len && self->info->when <= now) while (self->len && self->heap[0]->when <= now)
{ {
struct poller_timer_info info = *self->info; struct poller_timer *timer = self->heap[0];
poller_timers_remove_at_index (self, 0); poller_timers_remove_at_index (self, 0);
info.dispatcher (info.user_data); timer->dispatcher (timer->user_data);
} }
} }
@ -938,49 +961,35 @@ poller_timers_heapify_up (struct poller_timers *self, size_t index)
while (index != 0) while (index != 0)
{ {
size_t parent = (index - 1) / 2; size_t parent = (index - 1) / 2;
if (self->info[parent].when <= self->info[index].when) if (self->heap[parent]->when <= self->heap[index]->when)
break; break;
struct poller_timer_info tmp = self->info[parent]; struct poller_timer *tmp = self->heap[parent];
self->info[parent] = self->info[index]; self->heap[parent] = self->heap[index];
self->info[index] = tmp; self->heap[index] = tmp;
self->heap[parent]->index = parent;
self->heap[index] ->index = index;
index = parent; index = parent;
} }
} }
static ssize_t
poller_timers_find (struct poller_timers *self,
poller_timer_fn dispatcher, void *data)
{
// NOTE: there may be duplicates.
for (size_t i = 0; i < self->len; i++)
if (self->info[i].dispatcher == dispatcher
&& self->info[i].user_data == data)
return i;
return -1;
}
static ssize_t
poller_timers_find_by_data (struct poller_timers *self, void *data)
{
for (size_t i = 0; i < self->len; i++)
if (self->info[i].user_data == data)
return i;
return -1;
}
static void static void
poller_timers_add (struct poller_timers *self, poller_timers_set (struct poller_timers *self, struct poller_timer *timer)
poller_timer_fn dispatcher, void *data, int timeout_ms)
{ {
if (self->len == self->alloc) hard_assert (timer->timers == self);
self->info = xreallocarray (self->info, if (timer->index != -1)
self->alloc <<= 1, sizeof *self->info); {
poller_timers_heapify_down (self, timer->index);
poller_timers_heapify_up (self, timer->index);
return;
}
self->info[self->len] = (struct poller_timer_info) { if (self->len == self->alloc)
.when = poller_timers_get_current_time () + timeout_ms, self->heap = xreallocarray (self->heap,
.dispatcher = dispatcher, .user_data = data }; self->alloc <<= 1, sizeof *self->heap);
self->heap[self->len] = timer;
timer->index = self->len;
poller_timers_heapify_up (self, self->len++); poller_timers_heapify_up (self, self->len++);
} }
@ -990,7 +999,7 @@ poller_timers_get_poll_timeout (struct poller_timers *self)
if (!self->len) if (!self->len)
return -1; return -1;
int64_t timeout = self->info->when - poller_timers_get_current_time (); int64_t timeout = self->heap[0]->when - poller_timers_get_current_time ();
if (timeout <= 0) if (timeout <= 0)
return 0; return 0;
if (timeout > INT_MAX) if (timeout > INT_MAX)
@ -998,35 +1007,37 @@ poller_timers_get_poll_timeout (struct poller_timers *self)
return timeout; return timeout;
} }
#ifdef __linux__ // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
// I don't really need this, I've basically implemented this just because I can. static void
poller_idle_dispatch (struct poller_idle *list)
#include <sys/epoll.h>
struct poller_info
{ {
int fd; ///< Our file descriptor struct poller_idle *iter, *next;
short events; ///< The poll() events we registered for for (iter = list; iter; iter = next)
poller_dispatcher_fn dispatcher; ///< Event dispatcher {
void *user_data; ///< User data next = iter->next;
}; iter->dispatcher (iter->user_data);
}
}
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
#ifdef __linux__
#include <sys/epoll.h>
struct poller struct poller
{ {
int epoll_fd; ///< The epoll FD int epoll_fd; ///< The epoll FD
struct poller_info **info; ///< Information associated with each FD struct poller_fd **fds; ///< Information associated with each FD
int *dummy; ///< For poller_remove_from_dispatch()
struct epoll_event *revents; ///< Output array for epoll_wait() struct epoll_event *revents; ///< Output array for epoll_wait()
size_t len; ///< Number of polled descriptors size_t len; ///< Number of polled descriptors
size_t alloc; ///< Number of entries allocated size_t alloc; ///< Number of entries allocated
struct poller_timers timers; ///< Timeouts struct poller_timers timers; ///< Timeouts
struct poller_idle *idle; ///< Idle events
/// Index of the element in `revents' that's about to be dispatched next. int revents_len; ///< Number of entries in `revents'
int dispatch_next;
/// The total number of entries stored in `revents' by epoll_wait().
int dispatch_total;
}; };
static void static void
@ -1038,13 +1049,13 @@ poller_init (struct poller *self)
self->len = 0; self->len = 0;
self->alloc = POLLER_MIN_ALLOC; self->alloc = POLLER_MIN_ALLOC;
self->info = xcalloc (self->alloc, sizeof *self->info); self->fds = xcalloc (self->alloc, sizeof *self->fds);
self->dummy = xcalloc (self->alloc, sizeof *self->dummy);
self->revents = xcalloc (self->alloc, sizeof *self->revents); self->revents = xcalloc (self->alloc, sizeof *self->revents);
self->revents_len = 0;
poller_timers_init (&self->timers); poller_timers_init (&self->timers);
self->idle = NULL;
self->dispatch_next = 0;
self->dispatch_total = 0;
} }
static void static void
@ -1052,28 +1063,19 @@ poller_free (struct poller *self)
{ {
for (size_t i = 0; i < self->len; i++) for (size_t i = 0; i < self->len; i++)
{ {
struct poller_info *info = self->info[i]; struct poller_fd *fd = self->fds[i];
hard_assert (epoll_ctl (self->epoll_fd, hard_assert (epoll_ctl (self->epoll_fd,
EPOLL_CTL_DEL, info->fd, (void *) "") != -1); EPOLL_CTL_DEL, fd->fd, (void *) "") != -1);
free (info);
} }
poller_timers_free (&self->timers); poller_timers_free (&self->timers);
xclose (self->epoll_fd); xclose (self->epoll_fd);
free (self->info); free (self->fds);
free (self->dummy);
free (self->revents); free (self->revents);
} }
static ssize_t
poller_find_by_fd (struct poller *self, int fd)
{
for (size_t i = 0; i < self->len; i++)
if (self->info[i]->fd == fd)
return i;
return -1;
}
static void static void
poller_ensure_space (struct poller *self) poller_ensure_space (struct poller *self)
{ {
@ -1085,8 +1087,10 @@ poller_ensure_space (struct poller *self)
self->revents = xreallocarray self->revents = xreallocarray
(self->revents, sizeof *self->revents, self->alloc); (self->revents, sizeof *self->revents, self->alloc);
self->info = xreallocarray self->fds = xreallocarray
(self->info, sizeof *self->info, self->alloc); (self->fds, sizeof *self->fds, self->alloc);
self->dummy = xreallocarray
(self->dummy, sizeof *self->dummy, self->alloc);
} }
static short static short
@ -1114,118 +1118,122 @@ poller_poll_to_epoll_events (short events)
} }
static void static void
poller_set (struct poller *self, int fd, short events, poller_set (struct poller *self, struct poller_fd *fd)
poller_dispatcher_fn dispatcher, void *data)
{ {
ssize_t index = poller_find_by_fd (self, fd); hard_assert (fd->poller == self);
bool modifying = true; bool modifying = true;
if (index == -1) if (fd->index == -1)
{ {
poller_ensure_space (self); poller_ensure_space (self);
self->info[index = self->len++] = xcalloc (1, sizeof **self->info); self->fds[fd->index = self->len++] = fd;
modifying = false; modifying = false;
} }
struct poller_info *info = self->info[index];
info->fd = fd;
info->events = events;
info->dispatcher = dispatcher;
info->user_data = data;
struct epoll_event event; struct epoll_event event;
event.events = poller_poll_to_epoll_events (events); event.events = poller_poll_to_epoll_events (fd->events);
event.data.ptr = info; event.data.ptr = fd;
hard_assert (epoll_ctl (self->epoll_fd, hard_assert (epoll_ctl (self->epoll_fd,
modifying ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &event) != -1); modifying ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd->fd, &event) != -1);
}
static int
poller_compare_fds (const void *ax, const void *bx)
{
const struct epoll_event *ay = ax, *by = bx;
struct poller_fd *a = ay->data.ptr, *b = by->data.ptr;
return a->fd - b->fd;
} }
static void static void
poller_remove_from_dispatch (struct poller *self, poller_remove_from_dispatch (struct poller *self, const struct poller_fd *fd)
const struct poller_info *info)
{ {
if (!self->dispatch_total) if (!self->revents_len)
return; return;
int i; struct epoll_event key = { .data.ptr = (void *) fd }, *fd_event;
for (i = self->dispatch_next; i < self->dispatch_total; i++) if ((fd_event = bsearch (&key, self->revents,
if (self->revents[i].data.ptr == info) self->revents_len, sizeof *self->revents, poller_compare_fds)))
break; {
if (i == self->dispatch_total) fd_event->events = -1;
return;
if (i != --self->dispatch_total) // Don't let any further bsearch()'s touch possibly freed memory
self->revents[i] = self->revents[self->dispatch_total]; int *dummy = self->dummy + (fd_event - self->revents);
*dummy = fd->fd;
fd_event->data.ptr =
(uint8_t *) dummy - offsetof (struct poller_fd, fd);
}
} }
static void static void
poller_remove_at_index (struct poller *self, size_t index) poller_remove_at_index (struct poller *self, size_t index)
{ {
hard_assert (index < self->len); hard_assert (index < self->len);
struct poller_info *info = self->info[index]; struct poller_fd *fd = self->fds[index];
fd->index = -1;
poller_remove_from_dispatch (self, info); poller_remove_from_dispatch (self, fd);
if (!fd->closed)
hard_assert (epoll_ctl (self->epoll_fd, hard_assert (epoll_ctl (self->epoll_fd,
EPOLL_CTL_DEL, info->fd, (void *) "") != -1); EPOLL_CTL_DEL, fd->fd, (void *) "") != -1);
free (info);
if (index != --self->len) if (index != --self->len)
self->info[index] = self->info[self->len]; {
self->fds[index] = self->fds[self->len];
self->fds[index]->index = index;
}
} }
static void static void
poller_run (struct poller *self) poller_run (struct poller *self)
{ {
// Not reentrant // Not reentrant
hard_assert (!self->dispatch_total); hard_assert (!self->revents_len);
int n_fds; int n_fds;
do do
n_fds = epoll_wait (self->epoll_fd, self->revents, self->len, n_fds = epoll_wait (self->epoll_fd, self->revents, self->len,
poller_timers_get_poll_timeout (&self->timers)); self->idle ? 0 : poller_timers_get_poll_timeout (&self->timers));
while (n_fds == -1 && errno == EINTR); while (n_fds == -1 && errno == EINTR);
if (n_fds == -1) if (n_fds == -1)
exit_fatal ("%s: %s", "epoll", strerror (errno)); exit_fatal ("%s: %s", "epoll", strerror (errno));
self->dispatch_next = 0; // Sort them by file descriptor number for binary search
self->dispatch_total = n_fds; qsort (self->revents, n_fds, sizeof *self->revents, poller_compare_fds);
self->revents_len = n_fds;
poller_timers_dispatch (&self->timers); poller_timers_dispatch (&self->timers);
poller_idle_dispatch (self->idle);
while (self->dispatch_next < self->dispatch_total) for (int i = 0; i < n_fds; i++)
{ {
struct epoll_event *revents = self->revents + self->dispatch_next; struct epoll_event *revents = self->revents + i;
struct poller_info *info = revents->data.ptr; if (revents->events == (uint32_t) -1)
continue;
struct poller_fd *fd = revents->data.ptr;
struct pollfd pfd; struct pollfd pfd;
pfd.fd = info->fd; pfd.fd = fd->fd;
pfd.revents = poller_epoll_to_poll_events (revents->events); pfd.revents = poller_epoll_to_poll_events (revents->events);
pfd.events = info->events; pfd.events = fd->events;
self->dispatch_next++; fd->dispatcher (&pfd, fd->user_data);
info->dispatcher (&pfd, info->user_data);
} }
self->dispatch_next = 0; self->revents_len = 0;
self->dispatch_total = 0;
} }
#else // !__linux__ #else // !__linux__
struct poller_info
{
poller_dispatcher_func dispatcher; ///< Event dispatcher
void *user_data; ///< User data
};
struct poller struct poller
{ {
struct pollfd *fds; ///< Polled descriptors struct pollfd *fds; ///< Polled descriptors
struct poller_info *fds_info; ///< Additional information for each FD struct poller_fd **fds_data; ///< Additional information for each FD
size_t len; ///< Number of polled descriptors size_t len; ///< Number of polled descriptors
size_t alloc; ///< Number of entries allocated size_t alloc; ///< Number of entries allocated
struct poller_timers timers; ///< Timers struct poller_timers timers; ///< Timers
struct poller_idle *idle; ///< Idle events
int dispatch_next; ///< The next dispatched FD or -1 int dispatch_next; ///< The next dispatched FD or -1
}; };
@ -1235,7 +1243,7 @@ poller_init (struct poller *self)
self->alloc = POLLER_MIN_ALLOC; self->alloc = POLLER_MIN_ALLOC;
self->len = 0; self->len = 0;
self->fds = xcalloc (self->alloc, sizeof *self->fds); self->fds = xcalloc (self->alloc, sizeof *self->fds);
self->fds_info = xcalloc (self->alloc, sizeof *self->fds_info); self->fds_data = xcalloc (self->alloc, sizeof *self->fds_data);
poller_timers_init (&self->timers); poller_timers_init (&self->timers);
self->dispatch_next = -1; self->dispatch_next = -1;
} }
@ -1244,19 +1252,10 @@ static void
poller_free (struct poller *self) poller_free (struct poller *self)
{ {
free (self->fds); free (self->fds);
free (self->fds_info); free (self->fds_data);
poller_timers_free (&self->timers); poller_timers_free (&self->timers);
} }
static ssize_t
poller_find_by_fd (struct poller *self, int fd)
{
for (size_t i = 0; i < self->len; i++)
if (self->fds[i].fd == fd)
return i;
return -1;
}
static void static void
poller_ensure_space (struct poller *self) poller_ensure_space (struct poller *self)
{ {
@ -1265,33 +1264,33 @@ poller_ensure_space (struct poller *self)
self->alloc <<= 1; self->alloc <<= 1;
self->fds = xreallocarray (self->fds, sizeof *self->fds, self->alloc); self->fds = xreallocarray (self->fds, sizeof *self->fds, self->alloc);
self->fds_info = xreallocarray self->fds_data = xreallocarray
(self->fds_info, sizeof *self->fds_info, self->alloc); (self->fds_data, sizeof *self->fds_data, self->alloc);
} }
static void static void
poller_set (struct poller *self, int fd, short events, poller_set (struct poller *self, struct poller_fd *fd)
poller_dispatcher_func dispatcher, void *data)
{ {
ssize_t index = poller_find_by_fd (self, fd); hard_assert (fd->poller == self);
if (index == -1) if (fd->index == -1)
{ {
poller_ensure_space (self); poller_ensure_space (self);
index = self->len++; self->fds_data[fd->index = self->len++] = fd;
} }
struct pollfd *new_entry = self->fds + index; struct pollfd *new_entry = self->fds + fd->index;
memset (new_entry, 0, sizeof *new_entry); memset (new_entry, 0, sizeof *new_entry);
new_entry->fd = fd; new_entry->fd = fd->fd;
new_entry->events = events; new_entry->events = fd->events;
self->fds_info[index] = (struct poller_info) { dispatcher, data };
} }
static void static void
poller_remove_at_index (struct poller *self, size_t index) poller_remove_at_index (struct poller *self, size_t index)
{ {
hard_assert (index < self->len); hard_assert (index < self->len);
struct poller_fd *fd = self->fds_data[index];
fd->index = -1;
if (index == --self->len) if (index == --self->len)
return; return;
@ -1300,14 +1299,18 @@ poller_remove_at_index (struct poller *self, size_t index)
{ {
memmove (self->fds + index, self->fds + index + 1, memmove (self->fds + index, self->fds + index + 1,
(self->len - index) * sizeof *self->fds); (self->len - index) * sizeof *self->fds);
memmove (self->fds_info + index, self->fds_info + index + 1, memmove (self->fds_data + index, self->fds_data + index + 1,
(self->len - index) * sizeof *self->fds_info); (self->len - index) * sizeof *self->fds_data);
for (size_t i = index; i < self->len; i++)
self->fds_data[i]->index = i;
self->dispatch_next--; self->dispatch_next--;
} }
else else
{ {
self->fds[index] = self->fds [self->len]; self->fds[index] = self->fds [self->len];
self->fds_info[index] = self->fds_info[self->len]; self->fds_data[index] = self->fds_data[self->len];
self->fds_data[index]->index = index;
} }
} }
@ -1320,21 +1323,22 @@ poller_run (struct poller *self)
int result; int result;
do do
result = poll (self->fds, self->len, result = poll (self->fds, self->len,
poller_timers_get_poll_timeout (&self->timers)); self->idle ? 0 : poller_timers_get_poll_timeout (&self->timers));
while (result == -1 && errno == EINTR); while (result == -1 && errno == EINTR);
if (result == -1) if (result == -1)
exit_fatal ("%s: %s", "poll", strerror (errno)); exit_fatal ("%s: %s", "poll", strerror (errno));
poller_timers_dispatch (&self->timers); poller_timers_dispatch (&self->timers);
poller_idle_dispatch (self->idle);
for (int i = 0; i < (int) self->len; ) for (int i = 0; i < (int) self->len; )
{ {
struct pollfd pfd = self->fds[i]; struct pollfd pfd = self->fds[i];
struct poller_info *info = self->fds_info + i; struct poller_fd *fd = self->fds_data[i];
self->dispatch_next = ++i; self->dispatch_next = ++i;
if (pfd.revents) if (pfd.revents)
info->dispatcher (&pfd, info->user_data); fd->dispatcher (&pfd, fd->user_data);
i = self->dispatch_next; i = self->dispatch_next;
} }
@ -1343,6 +1347,86 @@ poller_run (struct poller *self)
#endif // !__linux__ #endif // !__linux__
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
static void
poller_timer_init (struct poller_timer *self, struct poller *poller)
{
memset (self, 0, sizeof *self);
self->timers = &poller->timers;
self->index = -1;
}
static void
poller_timer_set (struct poller_timer *self, int timeout_ms)
{
self->when = poller_timers_get_current_time () + timeout_ms;
poller_timers_set (self->timers, self);
}
static void
poller_timer_reset (struct poller_timer *self)
{
if (self->index != -1)
poller_timers_remove_at_index (self->timers, self->index);
}
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
static void
poller_idle_init (struct poller_idle *self, struct poller *poller)
{
memset (self, 0, sizeof *self);
self->poller = poller;
}
static void
poller_idle_set (struct poller_idle *self)
{
if (self->active)
return;
LIST_PREPEND (self->poller->idle, self);
self->active = true;
}
static void
poller_idle_reset (struct poller_idle *self)
{
if (!self->active)
return;
LIST_UNLINK (self->poller->idle, self);
self->prev = NULL;
self->next = NULL;
self->active = false;
}
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
static void
poller_fd_init (struct poller_fd *self, struct poller *poller, int fd)
{
memset (self, 0, sizeof *self);
self->poller = poller;
self->index = -1;
self->fd = fd;
}
static void
poller_fd_set (struct poller_fd *self, short events)
{
self->events = events;
poller_set (self->poller, self);
}
static void
poller_fd_reset (struct poller_fd *self)
{
if (self->index != -1)
poller_remove_at_index (self->poller, self->index);
}
// --- Utilities --------------------------------------------------------------- // --- Utilities ---------------------------------------------------------------
static void static void

83
kike.c
View File

@ -303,6 +303,11 @@ struct client
struct str read_buffer; ///< Unprocessed input struct str read_buffer; ///< Unprocessed input
struct str write_buffer; ///< Output yet to be sent out struct str write_buffer; ///< Output yet to be sent out
struct poller_fd socket_event; ///< The socket can be read/written to
struct poller_timer ping_timer; ///< We should send a ping
struct poller_timer timeout_timer; ///< Connection seems to be dead
struct poller_timer kill_timer; ///< Hard kill timeout
bool initialized; ///< Has any data been received yet? bool initialized; ///< Has any data been received yet?
bool registered; ///< The user has registered bool registered; ///< The user has registered
bool closing_link; ///< Closing link bool closing_link; ///< Closing link
@ -512,6 +517,7 @@ channel_user_count (const struct channel *chan)
struct server_context struct server_context
{ {
int *listen_fds; ///< Listening socket FD's int *listen_fds; ///< Listening socket FD's
struct poller_fd *listen_events; ///< New connections available
size_t n_listen_fds; ///< Number of listening sockets size_t n_listen_fds; ///< Number of listening sockets
SSL_CTX *ssl_ctx; ///< SSL context SSL_CTX *ssl_ctx; ///< SSL context
@ -526,6 +532,8 @@ struct server_context
bool quitting; ///< User requested quitting bool quitting; ///< User requested quitting
bool polling; ///< The event loop is running bool polling; ///< The event loop is running
struct poller_fd signal_event; ///< Got a signal
struct str_map config; ///< Server configuration struct str_map config; ///< Server configuration
char *server_name; ///< Our server name char *server_name; ///< Our server name
unsigned ping_interval; ///< Ping interval in seconds unsigned ping_interval; ///< Ping interval in seconds
@ -539,6 +547,7 @@ static void
server_context_init (struct server_context *self) server_context_init (struct server_context *self)
{ {
self->listen_fds = NULL; self->listen_fds = NULL;
self->listen_events = NULL;
self->n_listen_fds = 0; self->n_listen_fds = 0;
self->clients = NULL; self->clients = NULL;
self->n_clients = 0; self->n_clients = 0;
@ -555,6 +564,8 @@ server_context_init (struct server_context *self)
self->quitting = false; self->quitting = false;
self->polling = false; self->polling = false;
memset (&self->signal_event, 0, sizeof self->signal_event);
str_map_init (&self->config); str_map_init (&self->config);
self->config.free = free; self->config.free = free;
load_config_defaults (&self->config, g_config_table); load_config_defaults (&self->config, g_config_table);
@ -575,8 +586,13 @@ server_context_free (struct server_context *self)
str_map_free (&self->config); str_map_free (&self->config);
for (size_t i = 0; i < self->n_listen_fds; i++) for (size_t i = 0; i < self->n_listen_fds; i++)
{
xclose (self->listen_fds[i]); xclose (self->listen_fds[i]);
self->listen_events[i].closed = true;
poller_fd_reset (&self->listen_events[i]);
}
free (self->listen_fds); free (self->listen_fds);
free (self->listen_events);
if (self->ssl_ctx) if (self->ssl_ctx)
SSL_CTX_free (self->ssl_ctx); SSL_CTX_free (self->ssl_ctx);
@ -628,10 +644,9 @@ irc_initiate_quit (struct server_context *ctx)
for (size_t i = 0; i < ctx->n_listen_fds; i++) for (size_t i = 0; i < ctx->n_listen_fds; i++)
{ {
ssize_t index = poller_find_by_fd (&ctx->poller, ctx->listen_fds[i]);
if (soft_assert (index != -1))
poller_remove_at_index (&ctx->poller, index);
xclose (ctx->listen_fds[i]); xclose (ctx->listen_fds[i]);
ctx->listen_events[i].closed = true;
poller_fd_reset (&ctx->listen_events[i]);
} }
ctx->n_listen_fds = 0; ctx->n_listen_fds = 0;
@ -770,15 +785,15 @@ client_kill (struct client *c, const char *reason)
client_unregister (c, reason ? reason : "Client exited"); client_unregister (c, reason ? reason : "Client exited");
struct server_context *ctx = c->ctx; struct server_context *ctx = c->ctx;
ssize_t i = poller_find_by_fd (&ctx->poller, c->socket_fd);
if (i != -1)
poller_remove_at_index (&ctx->poller, i);
client_cancel_timers (c);
if (c->ssl) if (c->ssl)
(void) SSL_shutdown (c->ssl); (void) SSL_shutdown (c->ssl);
xclose (c->socket_fd); xclose (c->socket_fd);
c->socket_event.closed = true;
poller_fd_reset (&c->socket_event);
client_cancel_timers (c);
print_debug ("closed connection to %s (%s)", print_debug ("closed connection to %s (%s)",
c->address, reason ? reason : "Reason omitted"); c->address, reason ? reason : "Reason omitted");
@ -858,17 +873,17 @@ client_get_ssl_cert_fingerprint (struct client *c)
static void static void
client_cancel_timers (struct client *c) client_cancel_timers (struct client *c)
{ {
ssize_t i; poller_timer_reset (&c->kill_timer);
struct poller_timers *timers = &c->ctx->poller.timers; poller_timer_reset (&c->timeout_timer);
while ((i = poller_timers_find_by_data (timers, c)) != -1) poller_timer_reset (&c->ping_timer);
poller_timers_remove_at_index (timers, i);
} }
static void static void
client_set_timer (struct client *c, poller_timer_fn fn, unsigned interval) client_set_timer (struct client *c,
struct poller_timer *timer, unsigned interval)
{ {
client_cancel_timers (c); client_cancel_timers (c);
poller_timers_add (&c->ctx->poller.timers, fn, c, interval * 1000); poller_timer_set (timer, interval * 1000);
} }
static void static void
@ -882,7 +897,7 @@ on_client_kill_timer (void *user_data)
static void static void
client_set_kill_timer (struct client *c) client_set_kill_timer (struct client *c)
{ {
client_set_timer (c, on_client_kill_timer, c->ctx->ping_interval); client_set_timer (c, &c->kill_timer, c->ctx->ping_interval);
} }
static void static void
@ -901,13 +916,13 @@ on_client_ping_timer (void *user_data)
struct client *c = user_data; struct client *c = user_data;
hard_assert (!c->closing_link); hard_assert (!c->closing_link);
client_send (c, "PING :%s", c->ctx->server_name); client_send (c, "PING :%s", c->ctx->server_name);
client_set_timer (c, on_client_timeout_timer, c->ctx->ping_interval); client_set_timer (c, &c->timeout_timer, c->ctx->ping_interval);
} }
static void static void
client_set_ping_timer (struct client *c) client_set_ping_timer (struct client *c)
{ {
client_set_timer (c, on_client_ping_timer, c->ctx->ping_interval); client_set_timer (c, &c->ping_timer, c->ctx->ping_interval);
} }
// --- IRC command handling ---------------------------------------------------- // --- IRC command handling ----------------------------------------------------
@ -2627,8 +2642,7 @@ client_update_poller (struct client *c, const struct pollfd *pfd)
hard_assert (new_events != 0); hard_assert (new_events != 0);
if (!pfd || pfd->events != new_events) if (!pfd || pfd->events != new_events)
poller_set (&c->ctx->poller, c->socket_fd, new_events, poller_fd_set (&c->socket_event, new_events);
(poller_dispatcher_fn) on_client_ready, c);
} }
static void static void
@ -2686,6 +2700,22 @@ on_irc_client_available (const struct pollfd *pfd, void *user_data)
LIST_PREPEND (ctx->clients, c); LIST_PREPEND (ctx->clients, c);
ctx->n_clients++; ctx->n_clients++;
poller_fd_init (&c->socket_event, &c->ctx->poller, c->socket_fd);
c->socket_event.dispatcher = (poller_fd_fn) on_client_ready;
c->socket_event.user_data = c;
poller_timer_init (&c->kill_timer, &c->ctx->poller);
c->kill_timer.dispatcher = on_client_kill_timer;
c->kill_timer.user_data = c;
poller_timer_init (&c->timeout_timer, &c->ctx->poller);
c->timeout_timer.dispatcher = on_client_timeout_timer;
c->timeout_timer.user_data = c;
poller_timer_init (&c->ping_timer, &c->ctx->poller);
c->ping_timer.dispatcher = on_client_ping_timer;
c->ping_timer.user_data = c;
set_blocking (fd, false); set_blocking (fd, false);
client_update_poller (c, NULL); client_update_poller (c, NULL);
client_set_kill_timer (c); client_set_kill_timer (c);
@ -2985,11 +3015,15 @@ irc_listen_resolve (struct server_context *ctx,
{ {
if ((fd = irc_listen (gai_iter)) == -1) if ((fd = irc_listen (gai_iter)) == -1)
continue; continue;
set_blocking (fd, false);
struct poller_fd *event = &ctx->listen_events[ctx->n_listen_fds];
poller_fd_init (event, &ctx->poller, fd);
event->dispatcher = (poller_fd_fn) on_irc_client_available;
event->user_data = ctx;
ctx->listen_fds[ctx->n_listen_fds++] = fd; ctx->listen_fds[ctx->n_listen_fds++] = fd;
set_blocking (fd, false); poller_fd_set (event, POLLIN);
poller_set (&ctx->poller, fd, POLLIN,
(poller_dispatcher_fn) on_irc_client_available, ctx);
break; break;
} }
freeaddrinfo (gai_result); freeaddrinfo (gai_result);
@ -3012,6 +3046,7 @@ irc_setup_listen_fds (struct server_context *ctx, struct error **e)
str_vector_init (&ports); str_vector_init (&ports);
split_str_ignore_empty (bind_port, ',', &ports); split_str_ignore_empty (bind_port, ',', &ports);
ctx->listen_fds = xcalloc (ports.len, sizeof *ctx->listen_fds); ctx->listen_fds = xcalloc (ports.len, sizeof *ctx->listen_fds);
ctx->listen_events = xcalloc (ports.len, sizeof *ctx->listen_events);
for (size_t i = 0; i < ports.len; i++) for (size_t i = 0; i < ports.len; i++)
irc_listen_resolve (ctx, bind_host, ports.vector[i], &gai_hints); irc_listen_resolve (ctx, bind_host, ports.vector[i], &gai_hints);
str_vector_free (&ports); str_vector_free (&ports);
@ -3134,8 +3169,10 @@ main (int argc, char *argv[])
exit (EXIT_FAILURE); exit (EXIT_FAILURE);
} }
poller_set (&ctx.poller, g_signal_pipe[0], POLLIN, poller_fd_init (&ctx.signal_event, &ctx.poller, g_signal_pipe[0]);
(poller_dispatcher_fn) on_signal_pipe_readable, &ctx); ctx.signal_event.dispatcher = (poller_fd_fn) on_signal_pipe_readable;
ctx.signal_event.user_data = &ctx;
poller_fd_set (&ctx.signal_event, POLLIN);
if (!irc_initialize_ssl (&ctx, &e) if (!irc_initialize_ssl (&ctx, &e)
|| !irc_initialize_server_name (&ctx, &e) || !irc_initialize_server_name (&ctx, &e)

101
zyklonb.c
View File

@ -73,6 +73,9 @@ struct plugin_data
int read_fd; ///< The read end of the comm. pipe int read_fd; ///< The read end of the comm. pipe
int write_fd; ///< The write end of the comm. pipe int write_fd; ///< The write end of the comm. pipe
struct poller_fd read_event; ///< Read FD event
struct poller_fd write_event; ///< Write FD event
struct str read_buffer; ///< Unprocessed input struct str read_buffer; ///< Unprocessed input
struct str write_buffer; ///< Output yet to be sent out struct str write_buffer; ///< Output yet to be sent out
}; };
@ -118,8 +121,14 @@ struct bot_context
int irc_fd; ///< Socket FD of the server int irc_fd; ///< Socket FD of the server
struct str read_buffer; ///< Input yet to be processed struct str read_buffer; ///< Input yet to be processed
struct poller_fd irc_event; ///< IRC FD event
bool irc_ready; ///< Whether we may send messages now bool irc_ready; ///< Whether we may send messages now
struct poller_fd signal_event; ///< Signal FD event
struct poller_timer ping_tmr; ///< We should send a ping
struct poller_timer timeout_tmr; ///< Connection seems to be dead
struct poller_timer reconnect_tmr; ///< We should reconnect now
SSL_CTX *ssl_ctx; ///< SSL context SSL_CTX *ssl_ctx; ///< SSL context
SSL *ssl; ///< SSL connection SSL *ssl; ///< SSL connection
@ -131,6 +140,10 @@ struct bot_context
bool polling; ///< The event loop is running bool polling; ///< The event loop is running
}; };
static void on_irc_ping_timeout (void *user_data);
static void on_irc_timeout (void *user_data);
static void on_irc_reconnect_timeout (void *user_data);
static void static void
bot_context_init (struct bot_context *self) bot_context_init (struct bot_context *self)
{ {
@ -152,6 +165,18 @@ bot_context_init (struct bot_context *self)
poller_init (&self->poller); poller_init (&self->poller);
self->quitting = false; self->quitting = false;
self->polling = false; self->polling = false;
poller_timer_init (&self->timeout_tmr, &self->poller);
self->timeout_tmr.dispatcher = on_irc_timeout;
self->timeout_tmr.user_data = self;
poller_timer_init (&self->ping_tmr, &self->poller);
self->ping_tmr.dispatcher = on_irc_ping_timeout;
self->ping_tmr.user_data = self;
poller_timer_init (&self->reconnect_tmr, &self->poller);
self->reconnect_tmr.dispatcher = on_irc_reconnect_timeout;
self->reconnect_tmr.user_data = self;
} }
static void static void
@ -172,7 +197,10 @@ bot_context_free (struct bot_context *self)
} }
if (self->irc_fd != -1) if (self->irc_fd != -1)
{
xclose (self->irc_fd); xclose (self->irc_fd);
poller_fd_reset (&self->irc_event);
}
if (self->ssl) if (self->ssl)
SSL_free (self->ssl); SSL_free (self->ssl);
if (self->ssl_ctx) if (self->ssl_ctx)
@ -1060,10 +1088,7 @@ plugin_zombify (struct plugin_data *plugin)
// empty before closing it... and then on EOF check if `pid == -1' and // empty before closing it... and then on EOF check if `pid == -1' and
// only then dispose of it (it'd be best to simulate that both of these // only then dispose of it (it'd be best to simulate that both of these
// cases may happen). // cases may happen).
ssize_t poller_idx = poller_fd_reset (&plugin->write_event);
poller_find_by_fd (&plugin->ctx->poller, plugin->write_fd);
if (poller_idx != -1)
poller_remove_at_index (&plugin->ctx->poller, poller_idx);
// TODO: try to flush the write buffer (non-blocking)? // TODO: try to flush the write buffer (non-blocking)?
@ -1083,7 +1108,6 @@ plugin_zombify (struct plugin_data *plugin)
static void static void
on_plugin_writable (const struct pollfd *fd, struct plugin_data *plugin) on_plugin_writable (const struct pollfd *fd, struct plugin_data *plugin)
{ {
struct bot_context *ctx = plugin->ctx;
struct str *buf = &plugin->write_buffer; struct str *buf = &plugin->write_buffer;
size_t written_total = 0; size_t written_total = 0;
@ -1124,12 +1148,8 @@ on_plugin_writable (const struct pollfd *fd, struct plugin_data *plugin)
str_remove_slice (buf, 0, written_total); str_remove_slice (buf, 0, written_total);
if (buf->len == 0) if (buf->len == 0)
{
// Everything has been written, there's no need to end up in here again // Everything has been written, there's no need to end up in here again
ssize_t index = poller_find_by_fd (&ctx->poller, fd->fd); poller_fd_reset (&plugin->write_event);
if (index != -1)
poller_remove_at_index (&ctx->poller, index);
}
} }
static void static void
@ -1148,9 +1168,7 @@ plugin_queue_write (struct plugin_data *plugin)
plugin_zombify (plugin); plugin_zombify (plugin);
return; return;
} }
poller_fd_set (&plugin->write_event, POLLOUT);
poller_set (&plugin->ctx->poller, plugin->write_fd, POLLOUT,
(poller_dispatcher_fn) on_plugin_writable, plugin);
} }
static void static void
@ -1412,11 +1430,18 @@ plugin_load (struct bot_context *ctx, const char *name, struct error **e)
plugin->read_fd = stdout_pipe[0]; plugin->read_fd = stdout_pipe[0];
plugin->write_fd = stdin_pipe[1]; plugin->write_fd = stdin_pipe[1];
poller_fd_init (&plugin->read_event, &ctx->poller, plugin->read_fd);
plugin->read_event.dispatcher = (poller_fd_fn) on_plugin_readable;
plugin->read_event.user_data = plugin;
poller_fd_init (&plugin->write_event, &ctx->poller, plugin->write_fd);
plugin->write_event.dispatcher = (poller_fd_fn) on_plugin_writable;
plugin->write_event.user_data = plugin;
LIST_PREPEND (ctx->plugins, plugin); LIST_PREPEND (ctx->plugins, plugin);
str_map_set (&ctx->plugins_by_name, name, plugin); str_map_set (&ctx->plugins_by_name, name, plugin);
poller_set (&ctx->poller, stdout_pipe[0], POLLIN, poller_fd_set (&plugin->read_event, POLLIN);
(poller_dispatcher_fn) on_plugin_readable, plugin);
return true; return true;
fail_3: fail_3:
@ -1818,14 +1843,13 @@ static void irc_queue_reconnect (struct bot_context *);
static void static void
irc_cancel_timers (struct bot_context *ctx) irc_cancel_timers (struct bot_context *ctx)
{ {
ssize_t i; poller_timer_reset (&ctx->timeout_tmr);
struct poller_timers *timers = &ctx->poller.timers; poller_timer_reset (&ctx->ping_tmr);
while ((i = poller_timers_find_by_data (timers, ctx)) != -1) poller_timer_reset (&ctx->reconnect_tmr);
poller_timers_remove_at_index (timers, i);
} }
static void static void
irc_on_reconnect_timeout (void *user_data) on_irc_reconnect_timeout (void *user_data)
{ {
struct bot_context *ctx = user_data; struct bot_context *ctx = user_data;
@ -1847,8 +1871,7 @@ irc_queue_reconnect (struct bot_context *ctx)
hard_assert (ctx->irc_fd == -1); hard_assert (ctx->irc_fd == -1);
print_status ("trying to reconnect in %ld seconds...", print_status ("trying to reconnect in %ld seconds...",
ctx->reconnect_delay); ctx->reconnect_delay);
poller_timers_add (&ctx->poller.timers, poller_timer_set (&ctx->reconnect_tmr, ctx->reconnect_delay * 1000);
irc_on_reconnect_timeout, ctx, ctx->reconnect_delay * 1000);
} }
static void static void
@ -1863,14 +1886,13 @@ on_irc_disconnected (struct bot_context *ctx)
ctx->ssl_ctx = NULL; ctx->ssl_ctx = NULL;
} }
ssize_t i = poller_find_by_fd (&ctx->poller, ctx->irc_fd);
if (i != -1)
poller_remove_at_index (&ctx->poller, i);
xclose (ctx->irc_fd); xclose (ctx->irc_fd);
ctx->irc_fd = -1; ctx->irc_fd = -1;
ctx->irc_ready = false; ctx->irc_ready = false;
ctx->irc_event.closed = true;
poller_fd_reset (&ctx->irc_event);
// TODO: inform plugins about the disconnect event // TODO: inform plugins about the disconnect event
// All of our timers have lost their meaning now // All of our timers have lost their meaning now
@ -1905,10 +1927,8 @@ static void
irc_reset_connection_timeouts (struct bot_context *ctx) irc_reset_connection_timeouts (struct bot_context *ctx)
{ {
irc_cancel_timers (ctx); irc_cancel_timers (ctx);
poller_timers_add (&ctx->poller.timers, poller_timer_set (&ctx->timeout_tmr, 3 * 60 * 1000);
on_irc_timeout, ctx, 3 * 60 * 1000); poller_timer_set (&ctx->ping_tmr, (3 * 60 + 30) * 1000);
poller_timers_add (&ctx->poller.timers,
on_irc_ping_timeout, ctx, (3 * 60 + 30) * 1000);
} }
static void static void
@ -2026,11 +2046,14 @@ irc_connect (struct bot_context *ctx, struct error **e)
} }
print_status ("connection established"); print_status ("connection established");
poller_fd_init (&ctx->irc_event, &ctx->poller, ctx->irc_fd);
ctx->irc_event.dispatcher = (poller_fd_fn) on_irc_readable;
ctx->irc_event.user_data = ctx;
// TODO: in exec try: 1/ set blocking, 2/ setsockopt() SO_LINGER, // TODO: in exec try: 1/ set blocking, 2/ setsockopt() SO_LINGER,
// (struct linger) { .l_onoff = true; .l_linger = 1 /* 1s should do */; } // (struct linger) { .l_onoff = true; .l_linger = 1 /* 1s should do */; }
// 3/ /* O_CLOEXEC */ But only if the QUIT message proves unreliable. // 3/ /* O_CLOEXEC */ But only if the QUIT message proves unreliable.
poller_set (&ctx->poller, ctx->irc_fd, POLLIN, poller_fd_set (&ctx->irc_event, POLLIN);
(poller_dispatcher_fn) on_irc_readable, ctx);
irc_reset_connection_timeouts (ctx); irc_reset_connection_timeouts (ctx);
irc_send (ctx, "NICK %s", nickname); irc_send (ctx, "NICK %s", nickname);
@ -2134,13 +2157,12 @@ on_signal_pipe_readable (const struct pollfd *fd, struct bot_context *ctx)
plugin->pid = -1; plugin->pid = -1;
ssize_t poller_idx = poller_find_by_fd (&ctx->poller, plugin->read_fd);
if (poller_idx != -1)
poller_remove_at_index (&ctx->poller, poller_idx);
xclose (plugin->read_fd); xclose (plugin->read_fd);
plugin->read_fd = -1; plugin->read_fd = -1;
plugin->read_event.closed = true;
poller_fd_reset (&plugin->read_event);
LIST_UNLINK (ctx->plugins, plugin); LIST_UNLINK (ctx->plugins, plugin);
plugin_data_free (plugin); plugin_data_free (plugin);
free (plugin); free (plugin);
@ -2215,8 +2237,11 @@ main (int argc, char *argv[])
} }
setup_recovery_handler (&ctx); setup_recovery_handler (&ctx);
poller_set (&ctx.poller, g_signal_pipe[0], POLLIN,
(poller_dispatcher_fn) on_signal_pipe_readable, &ctx); poller_fd_init (&ctx.signal_event, &ctx.poller, g_signal_pipe[0]);
ctx.signal_event.dispatcher = (poller_fd_fn) on_signal_pipe_readable;
ctx.signal_event.user_data = &ctx;
poller_fd_set (&ctx.signal_event, POLLIN);
plugin_load_all_from_config (&ctx); plugin_load_all_from_config (&ctx);
if (!parse_config (&ctx, &e) if (!parse_config (&ctx, &e)