Rewrite connector to use asynchronous getaddrinfo

This commit is contained in:
Přemysl Eric Janouch 2016-01-01 18:11:10 +01:00
parent 733de7bae2
commit 66340e08d7
1 changed files with 93 additions and 54 deletions

147
liberty.c
View File

@ -3916,7 +3916,7 @@ test_run (struct test *self)
// --- Connector --------------------------------------------------------------- // --- Connector ---------------------------------------------------------------
#ifdef LIBERTY_WANT_POLLER #if defined LIBERTY_WANT_POLLER && defined LIBERTY_WANT_ASYNC
// This is a helper that tries to establish a connection with any address on // This is a helper that tries to establish a connection with any address on
// a given list. Sadly it also introduces a bit of a callback hell. // a given list. Sadly it also introduces a bit of a callback hell.
@ -3924,10 +3924,14 @@ test_run (struct test *self)
struct connector_target struct connector_target
{ {
LIST_HEADER (struct connector_target) LIST_HEADER (struct connector_target)
struct connector *connector; ///< Parent connector
char *hostname; ///< Target hostname or address char *hostname; ///< Target hostname or address
char *service; ///< Target service name or port char *service; ///< Target service name or port
struct async *getaddrinfo_event; ///< Address resolution
struct error *getaddrinfo_error; ///< Address resolution error
struct addrinfo *results; ///< Resolved target struct addrinfo *results; ///< Resolved target
struct addrinfo *iter; ///< Current endpoint struct addrinfo *iter; ///< Current endpoint
}; };
@ -3935,13 +3939,18 @@ struct connector_target
static struct connector_target * static struct connector_target *
connector_target_new (void) connector_target_new (void)
{ {
struct connector_target *self = xmalloc (sizeof *self); struct connector_target *self = xcalloc (1, sizeof *self);
return self; return self;
} }
static void static void
connector_target_destroy (struct connector_target *self) connector_target_destroy (struct connector_target *self)
{ {
if (self->getaddrinfo_event)
async_cancel (self->getaddrinfo_event);
if (self->getaddrinfo_error)
error_free (self->getaddrinfo_error);
free (self->hostname); free (self->hostname);
free (self->service); free (self->service);
freeaddrinfo (self->results); freeaddrinfo (self->results);
@ -3952,6 +3961,7 @@ connector_target_destroy (struct connector_target *self)
struct connector struct connector
{ {
struct poller *poller; ///< Poller
int socket; ///< Socket FD for the connection int socket; ///< Socket FD for the connection
struct poller_fd connected_event; ///< We've connected or failed struct poller_fd connected_event; ///< We've connected or failed
struct connector_target *targets; ///< Targets struct connector_target *targets; ///< Targets
@ -3983,28 +3993,24 @@ connector_notify_connecting (struct connector *self,
return; return;
const char *real_host = target->hostname; const char *real_host = target->hostname;
// We don't really need this, so we can let it quietly fail
char buf[NI_MAXHOST]; char buf[NI_MAXHOST];
int err = getnameinfo (gai_iter->ai_addr, gai_iter->ai_addrlen,
buf, sizeof buf, NULL, 0, NI_NUMERICHOST); if (gai_iter)
if (err) {
LOG_FUNC_FAILURE ("getnameinfo", gai_strerror (err)); // We don't really need this, so we can let it quietly fail
else int err = getnameinfo (gai_iter->ai_addr, gai_iter->ai_addrlen,
real_host = buf; buf, sizeof buf, NULL, 0, NI_NUMERICHOST);
if (err)
LOG_FUNC_FAILURE ("getnameinfo", gai_strerror (err));
else
real_host = buf;
}
char *address = format_host_port_pair (real_host, target->service); char *address = format_host_port_pair (real_host, target->service);
self->on_connecting (self->user_data, address); self->on_connecting (self->user_data, address);
free (address); free (address);
} }
static void
connector_notify_error (struct connector *self, const char *error)
{
if (self->on_error)
self->on_error (self->user_data, error);
}
static void static void
connector_notify_connected (struct connector *self, int fd) connector_notify_connected (struct connector *self, int fd)
{ {
@ -4016,23 +4022,38 @@ static void
connector_prepare_next (struct connector *self) connector_prepare_next (struct connector *self)
{ {
struct connector_target *target = self->targets; struct connector_target *target = self->targets;
if (!(target->iter = target->iter->ai_next)) if (!target->iter || !(target->iter = target->iter->ai_next))
{ {
LIST_UNLINK_WITH_TAIL (self->targets, self->targets_t, target); LIST_UNLINK_WITH_TAIL (self->targets, self->targets_t, target);
connector_target_destroy (target); connector_target_destroy (target);
} }
} }
static void connector_handle_error (struct connector *self, const char *error);
/// See if there's any target remaining at all -- it can however either still
/// be waiting for address resolution to finish, or have already failed
static bool
connector_check_target (struct connector *self, struct connector_target *target)
{
if (!target)
self->on_failure (self->user_data);
else if (target->getaddrinfo_error)
{
connector_notify_connecting (self, target, NULL);
connector_handle_error (self, target->getaddrinfo_error->message);
}
else if (target->results)
return true;
return false;
}
static void static void
connector_step (struct connector *self) connector_step (struct connector *self)
{ {
struct connector_target *target = self->targets; struct connector_target *target = self->targets;
if (!target) if (!connector_check_target (self, target))
{
// Total failure, none of the targets has succeeded
self->on_failure (self->user_data);
return; return;
}
struct addrinfo *gai_iter = target->iter; struct addrinfo *gai_iter = target->iter;
hard_assert (gai_iter != NULL); hard_assert (gai_iter != NULL);
@ -4043,10 +4064,7 @@ connector_step (struct connector *self)
gai_iter->ai_socktype, gai_iter->ai_protocol); gai_iter->ai_socktype, gai_iter->ai_protocol);
if (fd == -1) if (fd == -1)
{ {
connector_notify_error (self, strerror (errno)); connector_handle_error (self, strerror (errno));
connector_prepare_next (self);
connector_step (self);
return; return;
} }
@ -4066,14 +4084,21 @@ connector_step (struct connector *self)
} }
else else
{ {
connector_notify_error (self, strerror (errno)); connector_handle_error (self, strerror (errno));
xclose (fd); xclose (fd);
connector_prepare_next (self);
connector_step (self);
} }
} }
static void
connector_handle_error (struct connector *self, const char *error)
{
if (self->on_error)
self->on_error (self->user_data, error);
connector_prepare_next (self);
connector_step (self);
}
static void static void
connector_on_ready (const struct pollfd *pfd, struct connector *self) connector_on_ready (const struct pollfd *pfd, struct connector *self)
{ {
@ -4087,14 +4112,11 @@ connector_on_ready (const struct pollfd *pfd, struct connector *self)
if (error) if (error)
{ {
connector_notify_error (self, strerror (error));
poller_fd_reset (&self->connected_event); poller_fd_reset (&self->connected_event);
xclose (self->socket); xclose (self->socket);
self->socket = -1; self->socket = -1;
connector_prepare_next (self); connector_handle_error (self, strerror (error));
connector_step (self);
} }
else else
{ {
@ -4108,6 +4130,7 @@ static void
connector_init (struct connector *self, struct poller *poller) connector_init (struct connector *self, struct poller *poller)
{ {
memset (self, 0, sizeof *self); memset (self, 0, sizeof *self);
self->poller = poller;
self->socket = -1; self->socket = -1;
poller_fd_init (&self->connected_event, poller, self->socket); poller_fd_init (&self->connected_event, poller, self->socket);
self->connected_event.user_data = self; self->connected_event.user_data = self;
@ -4125,34 +4148,50 @@ connector_free (struct connector *self)
connector_target_destroy (iter); connector_target_destroy (iter);
} }
static bool static void
connector_add_target (struct connector *self, connector_on_getaddrinfo (int err, struct addrinfo *results, void *user_data)
const char *hostname, const char *service, struct error **e)
{ {
struct addrinfo hints, *results; struct connector_target *self = user_data;
if (err)
{
error_set (&self->getaddrinfo_error,
"%s: %s", "getaddrinfo", gai_strerror (err));
}
self->results = self->iter = results;
self->getaddrinfo_event = NULL;
// We've been waiting for this address to be resolved
if (self == self->connector->targets)
connector_step (self->connector);
}
/// Connection will be attempted asynchronously once you add any target
static void
connector_add_target (struct connector *self,
const char *hostname, const char *service)
{
struct connector_target *target = connector_target_new ();
target->connector = self;
target->hostname = xstrdup (hostname);
target->service = xstrdup (service);
struct addrinfo hints;
memset (&hints, 0, sizeof hints); memset (&hints, 0, sizeof hints);
hints.ai_socktype = SOCK_STREAM; hints.ai_socktype = SOCK_STREAM;
// TODO: even this should be done asynchronously, most likely in struct async_getaddrinfo *gai = async_getaddrinfo
// a thread pool, similarly to how libuv does it (&self->poller->common.async, hostname, service, &hints);
int err = getaddrinfo (hostname, service, &hints, &results);
if (err)
{
error_set (e, "%s: %s", "getaddrinfo", gai_strerror (err));
return false;
}
struct connector_target *target = connector_target_new (); gai->dispatcher = connector_on_getaddrinfo;
target->hostname = xstrdup (hostname); gai->user_data = target;
target->service = xstrdup (service); target->getaddrinfo_event = &gai->async;
target->results = results;
target->iter = target->results;
LIST_APPEND_WITH_TAIL (self->targets, self->targets_t, target); LIST_APPEND_WITH_TAIL (self->targets, self->targets_t, target);
return true;
} }
#endif // LIBERTY_WANT_POLLER #endif // defined LIBERTY_WANT_POLLER && defined LIBERTY_WANT_ASYNC
// --- Advanced configuration -------------------------------------------------- // --- Advanced configuration --------------------------------------------------