Steady progress

I'm trying to figure out everything at once, i.e. the entire structure
of the application from top to bottom, trying to converge on a workable
design while refactoring still doesn't hurt as much as it would once
it's established.
This commit is contained in:
Přemysl Eric Janouch 2015-03-02 23:11:29 +01:00
parent 8a3241d5c4
commit a54230bddb
1 changed files with 296 additions and 132 deletions

View File

@ -18,8 +18,6 @@
* *
*/ */
#define LIBERTY_WANT_SSL
#define print_fatal_data ((void *) LOG_ERR) #define print_fatal_data ((void *) LOG_ERR)
#define print_error_data ((void *) LOG_ERR) #define print_error_data ((void *) LOG_ERR)
#define print_warning_data ((void *) LOG_WARNING) #define print_warning_data ((void *) LOG_WARNING)
@ -68,6 +66,61 @@ msg_unpacker_u32 (struct msg_unpacker *self, uint32_t *value)
#undef UNPACKER_INT_BEGIN #undef UNPACKER_INT_BEGIN
// --- libev helpers -----------------------------------------------------------
static bool
read_loop (EV_P_ ev_io *watcher,
bool (*cb) (EV_P_ ev_io *, const void *, ssize_t))
{
char buf[8192];
while (true)
{
ssize_t n_read = recv (watcher->fd, buf, sizeof buf, 0);
if (n_read < 0)
{
if (errno == EAGAIN)
break;
if (errno == EINTR)
continue;
}
// The callback is called on EOF as well
if (n_read < 0 || !cb (EV_A_ watcher, buf, n_read))
return false;
if (!n_read)
return false;
}
return true;
}
static bool
flush_queue (write_queue_t *queue, ev_io *watcher)
{
struct iovec vec[queue->len], *vec_iter = vec;
for (write_req_t *iter = queue->head; iter; iter = iter->next)
*vec_iter++ = iter->data;
ssize_t written;
again:
written = writev (watcher->fd, vec, N_ELEMENTS (vec));
if (written < 0)
{
if (errno == EAGAIN)
goto skip;
if (errno == EINTR)
goto again;
return false;
}
write_queue_processed (queue, written);
skip:
if (write_queue_is_empty (queue))
ev_io_stop (EV_DEFAULT_ watcher);
else
ev_io_start (EV_DEFAULT_ watcher);
return true;
}
// --- Logging ----------------------------------------------------------------- // --- Logging -----------------------------------------------------------------
static void static void
@ -88,16 +141,6 @@ log_message_syslog (void *user_data, const char *quote, const char *fmt,
syslog (prio, "%s%s", quote, buf); syslog (prio, "%s%s", quote, buf);
} }
// --- Configuration (application-specific) ------------------------------------
static struct config_item g_config_table[] =
{
{ "bind_host", NULL, "Address of the server" },
{ "port_fastcgi", "9000", "Port to bind for FastCGI" },
{ "port_scgi", NULL, "Port to bind for SCGI" },
{ NULL, NULL, NULL }
};
// --- FastCGI ----------------------------------------------------------------- // --- FastCGI -----------------------------------------------------------------
// Constants from the FastCGI specification document // Constants from the FastCGI specification document
@ -472,9 +515,21 @@ scgi_parser_free (struct scgi_parser *self)
static bool static bool
scgi_parser_push (struct scgi_parser *self, scgi_parser_push (struct scgi_parser *self,
void *data, size_t len, struct error **e) const void *data, size_t len, struct error **e)
{ {
// This retarded netstring madness is even more complicated than FastCGI; if (!len)
{
if (self->state != SCGI_READING_CONTENT)
{
error_set (e, "premature EOF");
return false;
}
// TODO: a "on_eof" callback?
return true;
}
// Notice that this madness is significantly harder to parse than FastCGI;
// this procedure could also be optimized significantly // this procedure could also be optimized significantly
str_append_data (&self->input, data, len); str_append_data (&self->input, data, len);
@ -526,6 +581,7 @@ scgi_parser_push (struct scgi_parser *self,
return false; return false;
} }
self->state = SCGI_READING_CONTENT; self->state = SCGI_READING_CONTENT;
// TODO: a "on_headers_read" callback?
} }
else if (c != '\0') else if (c != '\0')
str_append_c (&self->name, c); str_append_c (&self->name, c);
@ -565,58 +621,21 @@ scgi_parser_push (struct scgi_parser *self,
break; break;
} }
case SCGI_READING_CONTENT: case SCGI_READING_CONTENT:
// TODO: I have no idea what to do with the contents // TODO: a "on_content" callback?
return true; return true;
break; break;
} }
} }
// --- ? ----------------------------------------------------------------------- // --- Server ------------------------------------------------------------------
// TODO static struct config_item g_config_table[] =
struct client
{ {
LIST_HEADER (struct client) { "bind_host", NULL, "Address of the server" },
{ "port_fastcgi", "9000", "Port to bind for FastCGI" },
struct server_context *ctx; ///< Server context { "port_scgi", NULL, "Port to bind for SCGI" },
{ NULL, NULL, NULL }
int socket_fd; ///< The TCP socket
struct str read_buffer; ///< Unprocessed input
write_queue_t write_queue; ///< Write queue
ev_io read_watcher; ///< The socket can be read from
ev_io write_watcher; ///< The socket can be written to
};
static void
client_init (struct client *self)
{
memset (self, 0, sizeof *self);
str_init (&self->read_buffer);
write_queue_init (&self->write_queue);
}
static void
client_free (struct client *self)
{
str_free (&self->read_buffer);
write_queue_free (&self->write_queue);
}
// --- ? -----------------------------------------------------------------------
enum listener_type
{
LISTENER_FCGI, ///< FastCGI
LISTENER_SCGI ///< SCGI
};
struct listener
{
int fd; ///< Listening socket FD
ev_io watcher; ///< New connection available
enum listener_type type; ///< The protocol
}; };
struct server_context struct server_context
@ -652,67 +671,218 @@ server_context_free (struct server_context *self)
str_map_free (&self->config); str_map_free (&self->config);
} }
// --- ? ----------------------------------------------------------------------- // --- JSON-RPC ----------------------------------------------------------------
// TODO: this is where we're actually supposed to do JSON-RPC 2.0 processing
// There's probably no reason to create an object for this.
//
// We probably just want a handler function that takes a JSON string, parses it,
// and returns back another JSON string.
//
// Then there should be another function that takes a parsed JSON request and
// returns back a JSON reply. This function may get called multiple times if
// the user sends a batch request.
// --- Requests ----------------------------------------------------------------
// TODO: something to read in the headers and decide what to do with the request
// e.g. whether to reject it with a 404, or do JSON-RPC, or ignore it with 200
#if 0
// This doesn't necessarily have to be an object by itself either; we can have
// a function that does/returns something based on the headers
struct request
{
};
static void
request_init (struct request *self)
{
}
static void
request_free (struct request *self)
{
}
#endif
// --- Client communication handlers -------------------------------------------
struct client
{
LIST_HEADER (struct client)
struct server_context *ctx; ///< Server context
int socket_fd; ///< The TCP socket
write_queue_t write_queue; ///< Write queue
ev_io read_watcher; ///< The socket can be read from
ev_io write_watcher; ///< The socket can be written to
struct client_impl *impl; ///< Client behaviour
void *impl_data; ///< Client behaviour data
};
struct client_impl
{
/// Initialize the client as needed
void (*init) (struct client *client);
/// Do any additional cleanup
void (*destroy) (struct client *client);
/// Process incoming data; "len == 0" means EOF
bool (*on_data) (struct client *client, const void *data, size_t len);
};
static void
client_init (struct client *self)
{
memset (self, 0, sizeof *self);
write_queue_init (&self->write_queue);
}
static void
client_free (struct client *self)
{
write_queue_free (&self->write_queue);
}
static void
client_write (struct client *client, const void *data, size_t len)
{
write_req_t *req = xcalloc (1, sizeof *req);
req->data.iov_base = memcpy (xmalloc (len), data, len);
req->data.iov_len = len;
write_queue_add (&client->write_queue, req);
ev_io_start (EV_DEFAULT_ &client->write_watcher);
}
// - - FastCGI - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
struct client_fcgi
{
struct fcgi_parser parser; ///< FastCGI stream parser
};
static void
client_fcgi_init (struct client *client)
{
struct client_fcgi *self = xcalloc (1, sizeof *self);
client->impl_data = self;
fcgi_parser_init (&self->parser);
// TODO: configure the parser
}
static void
client_fcgi_destroy (struct client *client)
{
struct client_fcgi *self = client->impl_data;
client->impl_data = NULL;
fcgi_parser_free (&self->parser);
free (self);
}
static bool
client_fcgi_on_data (struct client *client, const void *data, size_t len)
{
struct client_fcgi *self = client->impl_data;
fcgi_parser_push (&self->parser, data, len);
return true;
}
static struct client_impl g_client_fcgi =
{
.init = client_fcgi_init,
.destroy = client_fcgi_destroy,
.on_data = client_fcgi_on_data,
};
// - - SCGI - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
struct client_scgi
{
struct scgi_parser parser; ///< SCGI stream parser
};
static void
client_scgi_init (struct client *client)
{
struct client_scgi *self = xcalloc (1, sizeof *self);
client->impl_data = self;
scgi_parser_init (&self->parser);
// TODO: configure the parser
}
static void
client_scgi_destroy (struct client *client)
{
struct client_scgi *self = client->impl_data;
client->impl_data = NULL;
scgi_parser_free (&self->parser);
free (self);
}
static bool
client_scgi_on_data (struct client *client, const void *data, size_t len)
{
struct client_scgi *self = client->impl_data;
struct error *e = NULL;
if (scgi_parser_push (&self->parser, data, len, &e))
return true;
print_debug ("SCGI parser failed: %s", e->message);
error_free (e);
return false;
}
static struct client_impl g_client_scgi =
{
.init = client_scgi_init,
.destroy = client_scgi_destroy,
.on_data = client_scgi_on_data,
};
// --- Basic server stuff ------------------------------------------------------
struct listener
{
int fd; ///< Listening socket FD
ev_io watcher; ///< New connection available
struct client_impl *impl; ///< Client behaviour
};
static void static void
remove_client (struct server_context *ctx, struct client *client) remove_client (struct server_context *ctx, struct client *client)
{ {
LIST_UNLINK (ctx->clients, client);
ctx->n_clients--;
// First uninitialize the higher-level implementation
client->impl->destroy (client);
ev_io_stop (EV_DEFAULT_ &client->read_watcher); ev_io_stop (EV_DEFAULT_ &client->read_watcher);
ev_io_stop (EV_DEFAULT_ &client->write_watcher); ev_io_stop (EV_DEFAULT_ &client->write_watcher);
xclose (client->socket_fd); xclose (client->socket_fd);
LIST_UNLINK (ctx->clients, client);
client_free (client); client_free (client);
free (client); free (client);
} }
static bool static bool
read_loop (EV_P_ ev_io *watcher, on_client_data (EV_P_ ev_io *watcher, const void *buf, ssize_t n_read)
bool (*cb) (EV_P_ ev_io *, const void *, ssize_t))
{ {
char buf[8192]; (void) loop;
while (true)
{
ssize_t n_read = recv (watcher->fd, buf, sizeof buf, 0);
if (n_read < 0)
{
if (errno == EAGAIN)
break;
if (errno == EINTR)
continue;
}
if (n_read <= 0 || !cb (EV_A_ watcher, buf, n_read))
return false;
}
return true;
}
static bool struct client *client = watcher->data;
flush_queue (write_queue_t *queue, ev_io *watcher) return client->impl->on_data (client, buf, n_read);
{
struct iovec vec[queue->len], *vec_iter = vec;
for (write_req_t *iter = queue->head; iter; iter = iter->next)
*vec_iter++ = iter->data;
ssize_t written;
again:
written = writev (watcher->fd, vec, N_ELEMENTS (vec));
if (written < 0)
{
if (errno == EAGAIN)
goto skip;
if (errno == EINTR)
goto again;
return false;
}
write_queue_processed (queue, written);
skip:
if (write_queue_is_empty (queue))
ev_io_stop (EV_DEFAULT_ watcher);
else
ev_io_start (EV_DEFAULT_ watcher);
return true;
} }
static void static void
@ -734,12 +904,12 @@ error:
} }
static void static void
on_fcgi_client_available (EV_P_ ev_io *watcher, int revents) on_client_available (EV_P_ ev_io *watcher, int revents)
{ {
struct server_context *ctx = ev_userdata (loop); struct server_context *ctx = ev_userdata (loop);
struct listener *listener = watcher->data;
(void) revents; (void) revents;
// TODO
while (true) while (true)
{ {
int sock_fd = accept (watcher->fd, NULL, NULL); int sock_fd = accept (watcher->fd, NULL, NULL);
@ -759,11 +929,13 @@ on_fcgi_client_available (EV_P_ ev_io *watcher, int revents)
break; break;
} }
set_blocking (sock_fd, false);
struct client *client = xmalloc (sizeof *client); struct client *client = xmalloc (sizeof *client);
client_init (client); client_init (client);
client->socket_fd = sock_fd; client->socket_fd = sock_fd;
client->impl = listener->impl;
set_blocking (sock_fd, false);
ev_io_init (&client->read_watcher, on_client_ready, sock_fd, EV_READ); ev_io_init (&client->read_watcher, on_client_ready, sock_fd, EV_READ);
ev_io_init (&client->write_watcher, on_client_ready, sock_fd, EV_WRITE); ev_io_init (&client->write_watcher, on_client_ready, sock_fd, EV_WRITE);
client->read_watcher.data = client; client->read_watcher.data = client;
@ -772,6 +944,9 @@ on_fcgi_client_available (EV_P_ ev_io *watcher, int revents)
// We're only interested in reading as the write queue is empty now // We're only interested in reading as the write queue is empty now
ev_io_start (EV_A_ &client->read_watcher); ev_io_start (EV_A_ &client->read_watcher);
// Initialize the higher-level implementation
client->impl->init (client);
LIST_PREPEND (ctx->clients, client); LIST_PREPEND (ctx->clients, client);
ctx->n_clients++; ctx->n_clients++;
} }
@ -792,7 +967,7 @@ parse_config (struct server_context *ctx, struct error **e)
} }
static int static int
listen_finish (struct addrinfo *gai_iter) listener_finish (struct addrinfo *gai_iter)
{ {
int fd = socket (gai_iter->ai_family, int fd = socket (gai_iter->ai_family,
gai_iter->ai_socktype, gai_iter->ai_protocol); gai_iter->ai_socktype, gai_iter->ai_protocol);
@ -832,8 +1007,8 @@ listen_finish (struct addrinfo *gai_iter)
} }
static void static void
listen_resolve (struct server_context *ctx, const char *host, const char *port, listener_add (struct server_context *ctx, const char *host, const char *port,
struct addrinfo *gai_hints, enum listener_type type) struct addrinfo *gai_hints, struct client_impl *impl)
{ {
struct addrinfo *gai_result, *gai_iter; struct addrinfo *gai_result, *gai_iter;
int err = getaddrinfo (host, port, gai_hints, &gai_result); int err = getaddrinfo (host, port, gai_hints, &gai_result);
@ -849,24 +1024,15 @@ listen_resolve (struct server_context *ctx, const char *host, const char *port,
int fd; int fd;
for (gai_iter = gai_result; gai_iter; gai_iter = gai_iter->ai_next) for (gai_iter = gai_result; gai_iter; gai_iter = gai_iter->ai_next)
{ {
if ((fd = listen_finish (gai_iter)) == -1) if ((fd = listener_finish (gai_iter)) == -1)
continue; continue;
set_blocking (fd, false); set_blocking (fd, false);
struct listener *listener = &ctx->listeners[ctx->n_listeners++]; struct listener *listener = &ctx->listeners[ctx->n_listeners++];
switch ((listener->type = type)) ev_io_init (&listener->watcher, on_client_available, fd, EV_READ);
{
case LISTENER_FCGI:
ev_io_init (&listener->watcher,
on_fcgi_client_available, fd, EV_READ);
break;
case LISTENER_SCGI:
ev_io_init (&listener->watcher,
on_scgi_client_available, fd, EV_READ);
break;
}
ev_io_start (EV_DEFAULT_ &listener->watcher); ev_io_start (EV_DEFAULT_ &listener->watcher);
listener->watcher.data = listener;
listener->impl = impl;
break; break;
} }
freeaddrinfo (gai_result); freeaddrinfo (gai_result);
@ -885,10 +1051,8 @@ setup_listen_fds (struct server_context *ctx, struct error **e)
gai_hints.ai_socktype = SOCK_STREAM; gai_hints.ai_socktype = SOCK_STREAM;
gai_hints.ai_flags = AI_PASSIVE; gai_hints.ai_flags = AI_PASSIVE;
struct str_vector ports_fcgi; struct str_vector ports_fcgi; str_vector_init (&ports_fcgi);
struct str_vector ports_scgi; struct str_vector ports_scgi; str_vector_init (&ports_scgi);
str_vector_init (&ports_fcgi);
str_vector_init (&ports_scgi);
if (port_fcgi) if (port_fcgi)
split_str_ignore_empty (port_fcgi, ',', &ports_fcgi); split_str_ignore_empty (port_fcgi, ',', &ports_fcgi);
@ -899,11 +1063,11 @@ setup_listen_fds (struct server_context *ctx, struct error **e)
ctx->listeners = xcalloc (n_ports, sizeof *ctx->listeners); ctx->listeners = xcalloc (n_ports, sizeof *ctx->listeners);
for (size_t i = 0; i < ports_fcgi.len; i++) for (size_t i = 0; i < ports_fcgi.len; i++)
listen_resolve (ctx, bind_host, ports_fcgi.vector[i], listener_add (ctx, bind_host, ports_fcgi.vector[i],
&gai_hints, LISTENER_FCGI); &gai_hints, &g_client_fcgi);
for (size_t i = 0; i < ports_scgi.len; i++) for (size_t i = 0; i < ports_scgi.len; i++)
listen_resolve (ctx, bind_host, ports_scgi.vector[i], listener_add (ctx, bind_host, ports_scgi.vector[i],
&gai_hints, LISTENER_SCGI); &gai_hints, &g_client_scgi);
str_vector_free (&ports_fcgi); str_vector_free (&ports_fcgi);
str_vector_free (&ports_scgi); str_vector_free (&ports_scgi);