xC: reorganize relay code, improve logging
Even with one forward function declaration down, it was possible to move most code to a more convenient location. Most logging has thus been fixed to go to buffers.
This commit is contained in:
parent
126105fa4f
commit
840b646700
452
xC.c
452
xC.c
|
@ -1756,7 +1756,6 @@ client_destroy (struct client *self)
|
||||||
}
|
}
|
||||||
|
|
||||||
static void client_kill (struct client *c);
|
static void client_kill (struct client *c);
|
||||||
static bool client_process_buffer (struct client *c);
|
|
||||||
|
|
||||||
// ~~~ Server ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
// ~~~ Server ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
|
||||||
|
@ -2808,7 +2807,7 @@ serialize_configuration (struct config_item *root, struct str *output)
|
||||||
config_item_write (root, true, output);
|
config_item_write (root, true, output);
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- Relay plumbing ----------------------------------------------------------
|
// --- Relay output ------------------------------------------------------------
|
||||||
|
|
||||||
static void
|
static void
|
||||||
client_kill (struct client *c)
|
client_kill (struct client *c)
|
||||||
|
@ -2822,59 +2821,6 @@ client_kill (struct client *c)
|
||||||
client_destroy (c);
|
client_destroy (c);
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool
|
|
||||||
client_try_read (struct client *c)
|
|
||||||
{
|
|
||||||
struct str *buf = &c->read_buffer;
|
|
||||||
ssize_t n_read;
|
|
||||||
|
|
||||||
while ((n_read = read (c->socket_fd, buf->str + buf->len,
|
|
||||||
buf->alloc - buf->len - 1 /* null byte */)) > 0)
|
|
||||||
{
|
|
||||||
buf->len += n_read;
|
|
||||||
if (!client_process_buffer (c))
|
|
||||||
break;
|
|
||||||
str_reserve (buf, 512);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (n_read < 0)
|
|
||||||
{
|
|
||||||
if (errno == EAGAIN || errno == EINTR)
|
|
||||||
return true;
|
|
||||||
|
|
||||||
print_debug ("%s: %s: %s", __func__, "read", strerror (errno));
|
|
||||||
}
|
|
||||||
|
|
||||||
client_kill (c);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool
|
|
||||||
client_try_write (struct client *c)
|
|
||||||
{
|
|
||||||
struct str *buf = &c->write_buffer;
|
|
||||||
ssize_t n_written;
|
|
||||||
|
|
||||||
while (buf->len)
|
|
||||||
{
|
|
||||||
n_written = write (c->socket_fd, buf->str, buf->len);
|
|
||||||
if (n_written >= 0)
|
|
||||||
{
|
|
||||||
str_remove_slice (buf, 0, n_written);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (errno == EAGAIN || errno == EINTR)
|
|
||||||
return true;
|
|
||||||
|
|
||||||
print_debug ("%s: %s: %s", __func__, "write", strerror (errno));
|
|
||||||
client_kill (c);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
client_update_poller (struct client *c, const struct pollfd *pfd)
|
client_update_poller (struct client *c, const struct pollfd *pfd)
|
||||||
{
|
{
|
||||||
|
@ -2887,177 +2833,8 @@ client_update_poller (struct client *c, const struct pollfd *pfd)
|
||||||
poller_fd_set (&c->socket_event, new_events);
|
poller_fd_set (&c->socket_event, new_events);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
|
||||||
on_client_ready (const struct pollfd *pfd, void *user_data)
|
|
||||||
{
|
|
||||||
struct client *c = user_data;
|
|
||||||
if (client_try_read (c) && client_try_write (c))
|
|
||||||
client_update_poller (c, pfd);
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool
|
|
||||||
relay_try_fetch_client (struct app_context *ctx, int listen_fd)
|
|
||||||
{
|
|
||||||
// XXX: `struct sockaddr_storage' is not the most portable thing
|
|
||||||
struct sockaddr_storage peer;
|
|
||||||
socklen_t peer_len = sizeof peer;
|
|
||||||
|
|
||||||
int fd = accept (listen_fd, (struct sockaddr *) &peer, &peer_len);
|
|
||||||
if (fd == -1)
|
|
||||||
{
|
|
||||||
if (errno == EAGAIN || errno == EWOULDBLOCK)
|
|
||||||
return false;
|
|
||||||
if (errno == EINTR)
|
|
||||||
return true;
|
|
||||||
|
|
||||||
// TODO: Try to make sure these find their way to the global buffer.
|
|
||||||
if (accept_error_is_transient (errno))
|
|
||||||
{
|
|
||||||
print_warning ("%s: %s", "accept", strerror (errno));
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
print_error ("%s: %s", "accept", strerror (errno));
|
|
||||||
app_context_relay_stop (ctx);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
hard_assert (peer_len <= sizeof peer);
|
|
||||||
set_blocking (fd, false);
|
|
||||||
set_cloexec (fd);
|
|
||||||
|
|
||||||
// We already buffer our output, so reduce latencies.
|
|
||||||
int yes = 1;
|
|
||||||
soft_assert (setsockopt (fd, IPPROTO_TCP, TCP_NODELAY,
|
|
||||||
&yes, sizeof yes) != -1);
|
|
||||||
|
|
||||||
struct client *c = client_new ();
|
|
||||||
c->ctx = ctx;
|
|
||||||
c->socket_fd = fd;
|
|
||||||
LIST_PREPEND (ctx->clients, c);
|
|
||||||
|
|
||||||
c->socket_event = poller_fd_make (&c->ctx->poller, c->socket_fd);
|
|
||||||
c->socket_event.dispatcher = (poller_fd_fn) on_client_ready;
|
|
||||||
c->socket_event.user_data = c;
|
|
||||||
|
|
||||||
client_update_poller (c, NULL);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void
|
|
||||||
on_relay_client_available (const struct pollfd *pfd, void *user_data)
|
|
||||||
{
|
|
||||||
struct app_context *ctx = user_data;
|
|
||||||
while (relay_try_fetch_client (ctx, pfd->fd))
|
|
||||||
;
|
|
||||||
}
|
|
||||||
|
|
||||||
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
||||||
|
|
||||||
static int
|
|
||||||
relay_listen (struct addrinfo *ai, struct error **e)
|
|
||||||
{
|
|
||||||
int fd = socket (ai->ai_family, ai->ai_socktype, ai->ai_protocol);
|
|
||||||
if (fd == -1)
|
|
||||||
{
|
|
||||||
error_set (e, "socket: %s", strerror (errno));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
set_cloexec (fd);
|
|
||||||
|
|
||||||
int yes = 1;
|
|
||||||
soft_assert (setsockopt (fd, SOL_SOCKET, SO_KEEPALIVE,
|
|
||||||
&yes, sizeof yes) != -1);
|
|
||||||
soft_assert (setsockopt (fd, SOL_SOCKET, SO_REUSEADDR,
|
|
||||||
&yes, sizeof yes) != -1);
|
|
||||||
|
|
||||||
if (bind (fd, ai->ai_addr, ai->ai_addrlen))
|
|
||||||
error_set (e, "bind: %s", strerror (errno));
|
|
||||||
else if (listen (fd, 16 /* arbitrary number */))
|
|
||||||
error_set (e, "listen: %s", strerror (errno));
|
|
||||||
else
|
|
||||||
return fd;
|
|
||||||
|
|
||||||
xclose (fd);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int
|
|
||||||
relay_listen_with_context (struct addrinfo *ai, struct error **e)
|
|
||||||
{
|
|
||||||
char *address = gai_reconstruct_address (ai);
|
|
||||||
print_debug ("binding to `%s'", address);
|
|
||||||
|
|
||||||
struct error *error = NULL;
|
|
||||||
int fd = relay_listen (ai, &error);
|
|
||||||
if (fd == -1)
|
|
||||||
{
|
|
||||||
error_set (e, "binding to `%s' failed: %s", address, error->message);
|
|
||||||
error_free (error);
|
|
||||||
}
|
|
||||||
free (address);
|
|
||||||
return fd;
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool
|
|
||||||
relay_start (struct app_context *ctx, char *address, struct error **e)
|
|
||||||
{
|
|
||||||
const char *port = NULL, *host = tokenize_host_port (address, &port);
|
|
||||||
if (!port || !*port)
|
|
||||||
return error_set (e, "missing port");
|
|
||||||
|
|
||||||
struct addrinfo hints = {}, *result = NULL;
|
|
||||||
hints.ai_socktype = SOCK_STREAM;
|
|
||||||
hints.ai_flags = AI_PASSIVE;
|
|
||||||
|
|
||||||
int err = getaddrinfo (*host ? host : NULL, port, &hints, &result);
|
|
||||||
if (err)
|
|
||||||
{
|
|
||||||
return error_set (e, "failed to resolve `%s', port `%s': %s: %s",
|
|
||||||
host, port, "getaddrinfo", gai_strerror (err));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Just try the first one, disregarding IPv4/IPv6 ordering.
|
|
||||||
int fd = relay_listen_with_context (result, e);
|
|
||||||
freeaddrinfo (result);
|
|
||||||
if (fd == -1)
|
|
||||||
return false;
|
|
||||||
|
|
||||||
set_blocking (fd, false);
|
|
||||||
|
|
||||||
struct poller_fd *event = &ctx->relay_event;
|
|
||||||
*event = poller_fd_make (&ctx->poller, fd);
|
|
||||||
event->dispatcher = (poller_fd_fn) on_relay_client_available;
|
|
||||||
event->user_data = ctx;
|
|
||||||
|
|
||||||
ctx->relay_fd = fd;
|
|
||||||
poller_fd_set (event, POLLIN);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void
|
|
||||||
on_config_relay_bind_change (struct config_item *item)
|
|
||||||
{
|
|
||||||
struct app_context *ctx = item->user_data;
|
|
||||||
char *value = item->value.string.str;
|
|
||||||
app_context_relay_stop (ctx);
|
|
||||||
if (!value)
|
|
||||||
return;
|
|
||||||
|
|
||||||
struct error *e = NULL;
|
|
||||||
char *address = xstrdup (value);
|
|
||||||
if (!relay_start (ctx, address, &e))
|
|
||||||
{
|
|
||||||
// TODO: Try to make sure this finds its way to the global buffer.
|
|
||||||
print_error ("%s: %s", item->schema->name, e->message);
|
|
||||||
error_free (e);
|
|
||||||
}
|
|
||||||
free (address);
|
|
||||||
}
|
|
||||||
|
|
||||||
// --- Relay output ------------------------------------------------------------
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
relay_send (struct client *c)
|
relay_send (struct client *c)
|
||||||
{
|
{
|
||||||
|
@ -15637,6 +15414,233 @@ client_process_buffer (struct client *c)
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// --- Relay plumbing ----------------------------------------------------------
|
||||||
|
|
||||||
|
static bool
|
||||||
|
client_try_read (struct client *c)
|
||||||
|
{
|
||||||
|
struct str *buf = &c->read_buffer;
|
||||||
|
ssize_t n_read;
|
||||||
|
|
||||||
|
while ((n_read = read (c->socket_fd, buf->str + buf->len,
|
||||||
|
buf->alloc - buf->len - 1 /* null byte */)) > 0)
|
||||||
|
{
|
||||||
|
buf->len += n_read;
|
||||||
|
if (!client_process_buffer (c))
|
||||||
|
break;
|
||||||
|
str_reserve (buf, 512);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (n_read < 0)
|
||||||
|
{
|
||||||
|
if (errno == EAGAIN || errno == EINTR)
|
||||||
|
return true;
|
||||||
|
|
||||||
|
log_global_debug (c->ctx,
|
||||||
|
"#s: #s: #l", __func__, "read", strerror (errno));
|
||||||
|
}
|
||||||
|
|
||||||
|
client_kill (c);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool
|
||||||
|
client_try_write (struct client *c)
|
||||||
|
{
|
||||||
|
struct str *buf = &c->write_buffer;
|
||||||
|
ssize_t n_written;
|
||||||
|
|
||||||
|
while (buf->len)
|
||||||
|
{
|
||||||
|
n_written = write (c->socket_fd, buf->str, buf->len);
|
||||||
|
if (n_written >= 0)
|
||||||
|
{
|
||||||
|
str_remove_slice (buf, 0, n_written);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (errno == EAGAIN || errno == EINTR)
|
||||||
|
return true;
|
||||||
|
|
||||||
|
log_global_debug (c->ctx,
|
||||||
|
"#s: #s: #l", __func__, "write", strerror (errno));
|
||||||
|
client_kill (c);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
on_client_ready (const struct pollfd *pfd, void *user_data)
|
||||||
|
{
|
||||||
|
struct client *c = user_data;
|
||||||
|
if (client_try_read (c) && client_try_write (c))
|
||||||
|
client_update_poller (c, pfd);
|
||||||
|
}
|
||||||
|
|
||||||
|
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
||||||
|
|
||||||
|
static bool
|
||||||
|
relay_try_fetch_client (struct app_context *ctx, int listen_fd)
|
||||||
|
{
|
||||||
|
// XXX: `struct sockaddr_storage' is not the most portable thing
|
||||||
|
struct sockaddr_storage peer;
|
||||||
|
socklen_t peer_len = sizeof peer;
|
||||||
|
|
||||||
|
int fd = accept (listen_fd, (struct sockaddr *) &peer, &peer_len);
|
||||||
|
if (fd == -1)
|
||||||
|
{
|
||||||
|
if (errno == EAGAIN || errno == EWOULDBLOCK)
|
||||||
|
return false;
|
||||||
|
if (errno == EINTR)
|
||||||
|
return true;
|
||||||
|
|
||||||
|
if (accept_error_is_transient (errno))
|
||||||
|
{
|
||||||
|
log_global_debug (ctx, "#s: #l", "accept", strerror (errno));
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
log_global_error (ctx, "#s: #l", "accept", strerror (errno));
|
||||||
|
app_context_relay_stop (ctx);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
hard_assert (peer_len <= sizeof peer);
|
||||||
|
set_blocking (fd, false);
|
||||||
|
set_cloexec (fd);
|
||||||
|
|
||||||
|
// We already buffer our output, so reduce latencies.
|
||||||
|
int yes = 1;
|
||||||
|
soft_assert (setsockopt (fd, IPPROTO_TCP, TCP_NODELAY,
|
||||||
|
&yes, sizeof yes) != -1);
|
||||||
|
|
||||||
|
struct client *c = client_new ();
|
||||||
|
c->ctx = ctx;
|
||||||
|
c->socket_fd = fd;
|
||||||
|
LIST_PREPEND (ctx->clients, c);
|
||||||
|
|
||||||
|
c->socket_event = poller_fd_make (&c->ctx->poller, c->socket_fd);
|
||||||
|
c->socket_event.dispatcher = (poller_fd_fn) on_client_ready;
|
||||||
|
c->socket_event.user_data = c;
|
||||||
|
|
||||||
|
client_update_poller (c, NULL);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
on_relay_client_available (const struct pollfd *pfd, void *user_data)
|
||||||
|
{
|
||||||
|
struct app_context *ctx = user_data;
|
||||||
|
while (relay_try_fetch_client (ctx, pfd->fd))
|
||||||
|
;
|
||||||
|
}
|
||||||
|
|
||||||
|
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
||||||
|
|
||||||
|
static int
|
||||||
|
relay_listen (struct addrinfo *ai, struct error **e)
|
||||||
|
{
|
||||||
|
int fd = socket (ai->ai_family, ai->ai_socktype, ai->ai_protocol);
|
||||||
|
if (fd == -1)
|
||||||
|
{
|
||||||
|
error_set (e, "socket: %s", strerror (errno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
set_cloexec (fd);
|
||||||
|
|
||||||
|
int yes = 1;
|
||||||
|
soft_assert (setsockopt (fd, SOL_SOCKET, SO_KEEPALIVE,
|
||||||
|
&yes, sizeof yes) != -1);
|
||||||
|
soft_assert (setsockopt (fd, SOL_SOCKET, SO_REUSEADDR,
|
||||||
|
&yes, sizeof yes) != -1);
|
||||||
|
|
||||||
|
if (bind (fd, ai->ai_addr, ai->ai_addrlen))
|
||||||
|
error_set (e, "bind: %s", strerror (errno));
|
||||||
|
else if (listen (fd, 16 /* arbitrary number */))
|
||||||
|
error_set (e, "listen: %s", strerror (errno));
|
||||||
|
else
|
||||||
|
return fd;
|
||||||
|
|
||||||
|
xclose (fd);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int
|
||||||
|
relay_listen_with_context (struct app_context *ctx, struct addrinfo *ai,
|
||||||
|
struct error **e)
|
||||||
|
{
|
||||||
|
char *address = gai_reconstruct_address (ai);
|
||||||
|
log_global_debug (ctx, "binding to `#l'", address);
|
||||||
|
|
||||||
|
struct error *error = NULL;
|
||||||
|
int fd = relay_listen (ai, &error);
|
||||||
|
if (fd == -1)
|
||||||
|
{
|
||||||
|
error_set (e, "binding to `%s' failed: %s", address, error->message);
|
||||||
|
error_free (error);
|
||||||
|
}
|
||||||
|
free (address);
|
||||||
|
return fd;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool
|
||||||
|
relay_start (struct app_context *ctx, char *address, struct error **e)
|
||||||
|
{
|
||||||
|
const char *port = NULL, *host = tokenize_host_port (address, &port);
|
||||||
|
if (!port || !*port)
|
||||||
|
return error_set (e, "missing port");
|
||||||
|
|
||||||
|
struct addrinfo hints = {}, *result = NULL;
|
||||||
|
hints.ai_socktype = SOCK_STREAM;
|
||||||
|
hints.ai_flags = AI_PASSIVE;
|
||||||
|
|
||||||
|
int err = getaddrinfo (*host ? host : NULL, port, &hints, &result);
|
||||||
|
if (err)
|
||||||
|
{
|
||||||
|
return error_set (e, "failed to resolve `%s', port `%s': %s: %s",
|
||||||
|
host, port, "getaddrinfo", gai_strerror (err));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Just try the first one, disregarding IPv4/IPv6 ordering.
|
||||||
|
int fd = relay_listen_with_context (ctx, result, e);
|
||||||
|
freeaddrinfo (result);
|
||||||
|
if (fd == -1)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
set_blocking (fd, false);
|
||||||
|
|
||||||
|
struct poller_fd *event = &ctx->relay_event;
|
||||||
|
*event = poller_fd_make (&ctx->poller, fd);
|
||||||
|
event->dispatcher = (poller_fd_fn) on_relay_client_available;
|
||||||
|
event->user_data = ctx;
|
||||||
|
|
||||||
|
ctx->relay_fd = fd;
|
||||||
|
poller_fd_set (event, POLLIN);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
on_config_relay_bind_change (struct config_item *item)
|
||||||
|
{
|
||||||
|
struct app_context *ctx = item->user_data;
|
||||||
|
char *value = item->value.string.str;
|
||||||
|
app_context_relay_stop (ctx);
|
||||||
|
if (!value)
|
||||||
|
return;
|
||||||
|
|
||||||
|
// XXX: This should perhaps be reencoded as the locale encoding.
|
||||||
|
char *address = xstrdup (value);
|
||||||
|
|
||||||
|
struct error *e = NULL;
|
||||||
|
if (!relay_start (ctx, address, &e))
|
||||||
|
{
|
||||||
|
log_global_error (ctx, "#s: #l", item->schema->name, e->message);
|
||||||
|
error_free (e);
|
||||||
|
}
|
||||||
|
free (address);
|
||||||
|
}
|
||||||
|
|
||||||
// --- Tests -------------------------------------------------------------------
|
// --- Tests -------------------------------------------------------------------
|
||||||
|
|
||||||
// The application is quite monolithic and can only be partially unit-tested.
|
// The application is quite monolithic and can only be partially unit-tested.
|
||||||
|
|
Loading…
Reference in New Issue