More changes to fit libev's model

This commit is contained in:
Přemysl Eric Janouch 2014-10-28 02:59:39 +01:00
parent 74965b0f66
commit 8838e88d91
1 changed files with 33 additions and 27 deletions

View File

@ -86,7 +86,8 @@ struct client
LIST_HEADER (client_t)
int fd; ///< Client connection
ev_io watcher; ///< Client connection watcher
ev_io read_watcher; ///< Client readability watcher
ev_io write_watcher; ///< Client writability watcher
struct msg_reader msg_reader; ///< Client message reader
write_queue_t write_queue; ///< Write queue
};
@ -106,7 +107,8 @@ struct app_context
// Client:
int server_fd; ///< Server connection
ev_io server_watcher; ///< Server connection watcher
ev_io server_read_watcher; ///< Server readability watcher
ev_io server_write_watcher; ///< Server writability watcher
struct msg_reader msg_reader; ///< Server message reader
write_queue_t write_queue; ///< Server write queue
@ -145,6 +147,8 @@ static void
app_init (app_context_t *self)
{
memset (self, 0, sizeof *self);
self->server_fd = -1;
self->listen_fd = -1;
msg_reader_init (&self->msg_reader);
write_queue_init (&self->write_queue);
}
@ -193,7 +197,6 @@ flush_queue (write_queue_t *queue, ev_io *watcher)
for (write_req_t *iter = queue->head; iter; iter = iter->next)
*vec_iter++ = iter->data;
int new_events = EV_READ;
ssize_t written;
again:
written = writev (watcher->fd, vec, N_ELEMENTS (vec));
@ -209,11 +212,9 @@ again:
write_queue_processed (queue, written);
skip:
if (!write_queue_is_empty (queue))
new_events |= EV_WRITE;
if (write_queue_is_empty (queue))
ev_io_stop (EV_DEFAULT_ watcher);
ev_io_set (watcher, watcher->fd, new_events);
else
ev_io_start (EV_DEFAULT_ watcher);
return true;
}
@ -230,18 +231,14 @@ static void
flush_writer_to_client (struct msg_writer *writer, client_t *client)
{
write_queue_add (&client->write_queue, flush_writer (writer));
ev_io_stop (EV_DEFAULT_ &client->watcher);
ev_io_set (&client->watcher, client->fd, EV_READ | EV_WRITE);
ev_io_start (EV_DEFAULT_ &client->watcher);
ev_io_start (EV_DEFAULT_ &client->write_watcher);
}
static void
flush_writer_to_server (struct msg_writer *writer, app_context_t *app)
{
write_queue_add (&app->write_queue, flush_writer (writer));
ev_io_stop (EV_DEFAULT_ &app->server_watcher);
ev_io_set (&app->server_watcher, app->server_fd, EV_READ | EV_WRITE);
ev_io_start (EV_DEFAULT_ &app->server_watcher);
ev_io_start (EV_DEFAULT_ &app->server_write_watcher);
}
static void
@ -983,10 +980,13 @@ typedef bool (*server_handler_fn) (app_context_t *, struct msg_unpacker *);
static void
on_server_disconnected (app_context_t *app)
{
// TODO: cancel any write requests?
// XXX: should we unref it?
write_queue_free (&app->write_queue);
write_queue_init (&app->write_queue);
ev_io_stop (EV_DEFAULT_ &app->server_read_watcher);
ev_io_stop (EV_DEFAULT_ &app->server_write_watcher);
xclose (app->server_fd);
ev_io_stop (EV_DEFAULT_ &app->server_watcher);
app->server_fd = -1;
display ("Disconnected!");
beep (); // Beep beep! Made a boo-boo.
@ -1128,13 +1128,12 @@ typedef bool (*client_handler_fn)
static void
remove_client (app_context_t *app, client_t *client)
{
// TODO: stop any watchers?
// TODO: cancel any write requests?
// XXX: should we unref it?
ev_io_stop (EV_DEFAULT_ &client->read_watcher);
ev_io_stop (EV_DEFAULT_ &client->write_watcher);
xclose (client->fd);
LIST_UNLINK (app->clients, client);
msg_reader_free (&client->msg_reader);
write_queue_free (&client->write_queue);
LIST_UNLINK (app->clients, client);
free (client);
}
@ -1269,9 +1268,13 @@ on_new_client (EV_P_ ev_io *watcher, int revents)
write_queue_init (&client->write_queue);
set_blocking (sock_fd, false);
ev_io_init (&client->watcher, on_client_ready, sock_fd, EV_READ);
client->watcher.data = client;
ev_io_start (EV_A_ &client->watcher);
ev_io_init (&client->read_watcher, on_client_ready, sock_fd, EV_READ);
ev_io_init (&client->write_watcher, on_client_ready, sock_fd, EV_WRITE);
client->read_watcher.data = client;
client->write_watcher.data = client;
// We're only interested in reading as the write queue is empty now
ev_io_start (EV_A_ &client->read_watcher);
LIST_PREPEND (app->clients, client);
}
@ -1460,8 +1463,11 @@ initialize_client (app_context_t *app, struct addrinfo *address)
set_blocking (sock_fd, false);
app->server_fd = sock_fd;
ev_io_init (&app->server_watcher, on_server_ready, sock_fd, EV_READ);
ev_io_start (EV_DEFAULT_ &app->server_watcher);
ev_io_init (&app->server_read_watcher, on_server_ready, sock_fd, EV_READ);
ev_io_init (&app->server_write_watcher, on_server_ready, sock_fd, EV_WRITE);
// We're only interested in reading as the write queue is empty now
ev_io_start (EV_DEFAULT_ &app->server_read_watcher);
send_hello_request (app);
send_get_bitmap_request (app);