Implement basic connection teardown

I finally understand the codebase again.  It's rather complicated.
This commit is contained in:
Přemysl Eric Janouch 2018-10-17 08:40:18 +02:00
parent efd500ca3c
commit a3ec0942f8
Signed by: p
GPG Key ID: A0420B94F92B9493
1 changed files with 146 additions and 92 deletions

View File

@ -47,10 +47,10 @@ enum { PIPE_READ, PIPE_WRITE };
#define FIND_CONTAINER(name, pointer, type, member) \ #define FIND_CONTAINER(name, pointer, type, member) \
type *name = CONTAINER_OF (pointer, type, member) type *name = CONTAINER_OF (pointer, type, member)
// --- libev helpers ----------------------------------------------------------- // --- Utilities ---------------------------------------------------------------
static bool static bool
flush_queue (struct write_queue *queue, ev_io *watcher) flush_queue (struct write_queue *queue, int fd)
{ {
struct iovec vec[queue->len], *vec_iter = vec; struct iovec vec[queue->len], *vec_iter = vec;
LIST_FOR_EACH (struct write_req, iter, queue->head) LIST_FOR_EACH (struct write_req, iter, queue->head)
@ -58,24 +58,17 @@ flush_queue (struct write_queue *queue, ev_io *watcher)
ssize_t written; ssize_t written;
again: again:
written = writev (watcher->fd, vec, N_ELEMENTS (vec)); if ((written = writev (fd, vec, N_ELEMENTS (vec))) >= 0)
if (written < 0)
{ {
if (errno == EAGAIN) write_queue_processed (queue, written);
goto skip; return true;
}
if (errno == EINTR) if (errno == EINTR)
goto again; goto again;
return false; if (errno == EAGAIN)
}
write_queue_processed (queue, written);
skip:
if (write_queue_is_empty (queue))
ev_io_stop (EV_DEFAULT_ watcher);
else
ev_io_start (EV_DEFAULT_ watcher);
return true; return true;
return false;
} }
// --- Logging ----------------------------------------------------------------- // --- Logging -----------------------------------------------------------------
@ -135,16 +128,17 @@ struct fcgi_muxer
/// Write data to the underlying transport /// Write data to the underlying transport
void (*write_cb) (struct fcgi_muxer *, const void *data, size_t len); void (*write_cb) (struct fcgi_muxer *, const void *data, size_t len);
/// Close the underlying transport /// Close the underlying transport. You are allowed to destroy the muxer
// TODO: consider half-close and the subsequent handling /// directly from within the callback.
void (*close_cb) (struct fcgi_muxer *); void (*close_cb) (struct fcgi_muxer *);
/// Start processing a request. Return false if no further action is /// Start processing a request. Return false if no further action is
/// to be done and the request should be finished. /// to be done and the request should be finished.
bool (*request_start_cb) (struct fcgi_muxer *, struct fcgi_request *); bool (*request_start_cb) (struct fcgi_request *);
/// Handle incoming data. "len == 0" means EOF. /// Handle incoming data. "len == 0" means EOF. Returns false if
void (*request_push_cb) /// the underlying transport should be closed, this being the last request.
bool (*request_push_cb)
(struct fcgi_request *, const void *data, size_t len); (struct fcgi_request *, const void *data, size_t len);
/// Destroy the handler's data stored in the request object /// Destroy the handler's data stored in the request object
@ -230,7 +224,7 @@ fcgi_request_push_params
{ {
// TODO: probably check the state of the header parser // TODO: probably check the state of the header parser
// TODO: request_start() can return false, end the request here? // TODO: request_start() can return false, end the request here?
(void) self->muxer->request_start_cb (self->muxer, self); (void) self->muxer->request_start_cb (self);
self->state = FCGI_REQUEST_STDIN; self->state = FCGI_REQUEST_STDIN;
} }
} }
@ -282,7 +276,9 @@ fcgi_request_write (struct fcgi_request *self, const void *data, size_t len)
} }
} }
static void /// Mark the request as done. Returns false if the underlying transport
/// should be closed, this being the last request.
static bool
fcgi_request_finish (struct fcgi_request *self) fcgi_request_finish (struct fcgi_request *self)
{ {
fcgi_request_flush (self); fcgi_request_flush (self);
@ -292,14 +288,26 @@ fcgi_request_finish (struct fcgi_request *self)
0 /* TODO app_status, although ignored */, 0 /* TODO app_status, although ignored */,
FCGI_REQUEST_COMPLETE /* TODO protocol_status, may be different */); FCGI_REQUEST_COMPLETE /* TODO protocol_status, may be different */);
if (!(self->flags & FCGI_KEEP_CONN)) bool should_close = !(self->flags & FCGI_KEEP_CONN);
{
// TODO: tear down (shut down) the connection
}
self->muxer->active_requests--; self->muxer->active_requests--;
self->muxer->requests[self->request_id] = NULL; self->muxer->requests[self->request_id] = NULL;
fcgi_request_destroy (self); fcgi_request_destroy (self);
// TODO: tear down (shut down) the connection. This is called from:
//
// 1. client_fcgi_request_push <- request_push_cb
// <- fcgi_request_push_stdin <- fcgi_muxer_on_stdin
// <- fcgi_muxer_on_message <- fcgi_parser_push <- fcgi_muxer_push
// <- client_fcgi_push <- client_read_loop
// => in this case no close_cb may be called
// -> need to pass a false boolean aaall the way up,
// then client_fcgi_finalize eventually cleans up the rest
//
// 2. client_fcgi_request_close_cb <- request_finish
// => our direct caller must call fcgi_muxer::close_cb
// -> not very nice to delegate it there
return !should_close;
} }
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
@ -418,6 +426,7 @@ fcgi_muxer_on_abort_request
// TODO: abort the request: let it somehow produce FCGI_END_REQUEST, // TODO: abort the request: let it somehow produce FCGI_END_REQUEST,
// make sure to send an stdout EOF record // make sure to send an stdout EOF record
// TODO: and if that was not a FCGI_KEEP_CONN request, close the transport
} }
static void static void
@ -1841,23 +1850,24 @@ struct client
{ {
LIST_HEADER (struct client) LIST_HEADER (struct client)
// XXX: do we really need this here? struct client_vtable *vtable; ///< Client behaviour
struct server_context *ctx; ///< Server context
int socket_fd; ///< The TCP socket int socket_fd; ///< The network socket
bool received_eof; ///< Whether EOF has been received yet
bool closing; ///< Whether we're just flushing buffers
bool half_closed; ///< Transport half-closed while closing
struct write_queue write_queue; ///< Write queue struct write_queue write_queue; ///< Write queue
ev_timer flush_timeout_watcher; ///< Write queue flush timer
ev_io read_watcher; ///< The socket can be read from ev_io read_watcher; ///< The socket can be read from
ev_io write_watcher; ///< The socket can be written to ev_io write_watcher; ///< The socket can be written to
struct client_vtable *vtable; ///< Client behaviour
}; };
/// The concrete behaviour to serve a particular client's requests /// The concrete behaviour to serve a particular client's requests
struct client_vtable struct client_vtable
{ {
/// Process incoming data; "len == 0" means EOF. /// Process incoming data; "len == 0" means EOF.
/// If the method returns false, the client is destroyed by caller. /// If the method returns false, client_close() is called by the caller.
bool (*push) (struct client *client, const void *data, size_t len); bool (*push) (struct client *client, const void *data, size_t len);
// TODO: optional push_error() to inform about network I/O errors // TODO: optional push_error() to inform about network I/O errors
@ -1885,8 +1895,8 @@ client_write (struct client *self, const void *data, size_t len)
static void static void
client_destroy (struct client *self) client_destroy (struct client *self)
{ {
struct server_context *ctx = self->ctx; // XXX: this codebase halfway pretends there could be other contexts
struct server_context *ctx = ev_userdata (EV_DEFAULT);
LIST_UNLINK (ctx->clients, self); LIST_UNLINK (ctx->clients, self);
ctx->n_clients--; ctx->n_clients--;
@ -1897,63 +1907,111 @@ client_destroy (struct client *self)
ev_io_stop (EV_DEFAULT_ &self->write_watcher); ev_io_stop (EV_DEFAULT_ &self->write_watcher);
xclose (self->socket_fd); xclose (self->socket_fd);
write_queue_free (&self->write_queue); write_queue_free (&self->write_queue);
ev_timer_stop (EV_DEFAULT_ &self->flush_timeout_watcher);
free (self); free (self);
try_finish_quit (ctx); try_finish_quit (ctx);
} }
/// Try to cleanly close the connection, waiting for the remote client to close
/// its own side of the connection as a sign that it has processed all the data
/// it wanted to. The client implementation will not receive any further data.
/// May directly call client_destroy().
static void
client_close (struct client *self)
{
if (self->closing)
return;
self->closing = true;
ev_timer_start (EV_DEFAULT_ &self->flush_timeout_watcher);
ev_feed_event (EV_DEFAULT_ &self->write_watcher, EV_WRITE);
// We assume the remote client doesn't want our data if it half-closes
if (self->received_eof)
client_destroy (self);
}
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
static bool static bool
client_read_loop (EV_P_ struct client *client, ev_io *watcher) client_read_loop (EV_P_ struct client *client, ev_io *watcher)
{ {
char buf[8192]; char buf[8192];
while (true) ssize_t n_read;
again:
while ((n_read = recv (watcher->fd, buf, sizeof buf, 0)) >= 0)
{ {
ssize_t n_read = recv (watcher->fd, buf, sizeof buf, 0);
if (n_read >= 0)
{
if (!client->vtable->push (client, buf, n_read))
return false;
if (!n_read) if (!n_read)
break; {
// Don't deliver the EOF condition repeatedly
ev_io_stop (EV_A_ watcher);
client->received_eof = true;
} }
else if (errno == EAGAIN) if (!client->closing
return true; && !client->vtable->push (client, buf, n_read))
else if (errno != EINTR) {
client_close (client);
return false; return false;
} }
if (!n_read)
// Don't receive the EOF condition repeatedly
ev_io_stop (EV_A_ watcher);
// We can probably still write, so let's just return
// XXX: if there's nothing to be written, shouldn't we close the connection?
return true; return true;
}
if (errno == EINTR)
goto again;
if (errno == EAGAIN)
return true;
client_destroy (client);
return false;
} }
static void static void
on_client_ready (EV_P_ ev_io *watcher, int revents) on_client_readable (EV_P_ ev_io *watcher, int revents)
{ {
struct client *client = watcher->data; struct client *client = watcher->data;
// XXX: although read and write are in a sequence, if we create response (void) revents;
// data, we'll still likely need to go back to the event loop.
if (client_read_loop (EV_A_ client, watcher)
&& client->closing && client->received_eof)
client_destroy (client);
}
static void
on_client_writable (EV_P_ ev_io *watcher, int revents)
{
struct client *client = watcher->data;
(void) loop;
(void) revents;
if (revents & EV_READ)
if (!client_read_loop (EV_A_ client, watcher))
goto close;
if (revents & EV_WRITE)
// TODO: add "closing link" functionality -> automatic shutdown
// (half-close) once we manage to flush the write buffer,
// which is logically followed by waiting for an EOF from the client
// TODO: some sort of "on_buffers_flushed" callback for streaming huge // TODO: some sort of "on_buffers_flushed" callback for streaming huge
// chunks of external (or generated) data. // chunks of external (or generated) data. That will need to be
if (!flush_queue (&client->write_queue, watcher)) // forwarded to "struct request_handler".
goto close; if (!flush_queue (&client->write_queue, watcher->fd))
{
client_destroy (client);
return;
}
if (!write_queue_is_empty (&client->write_queue))
return; return;
close: ev_io_stop (EV_A_ watcher);
if (client->closing && !client->half_closed)
{
if (!shutdown (client->socket_fd, SHUT_WR))
client->half_closed = true;
else
client_destroy (client); client_destroy (client);
}
}
static void
on_client_timeout (EV_P_ ev_timer *watcher, int revents)
{
(void) loop;
(void) revents;
client_destroy (watcher->data);
} }
/// Create a new instance of a subclass with the given size. /// Create a new instance of a subclass with the given size.
@ -1964,14 +2022,15 @@ client_new (EV_P_ size_t size, int sock_fd)
struct server_context *ctx = ev_userdata (loop); struct server_context *ctx = ev_userdata (loop);
struct client *self = xcalloc (1, size); struct client *self = xcalloc (1, size);
self->ctx = ctx;
self->write_queue = write_queue_make (); self->write_queue = write_queue_make ();
ev_timer_init (&self->flush_timeout_watcher, on_client_timeout, 5., 0.);
self->flush_timeout_watcher.data = self;
set_blocking (sock_fd, false); set_blocking (sock_fd, false);
self->socket_fd = sock_fd; self->socket_fd = sock_fd;
ev_io_init (&self->read_watcher, on_client_ready, sock_fd, EV_READ); ev_io_init (&self->read_watcher, on_client_readable, sock_fd, EV_READ);
ev_io_init (&self->write_watcher, on_client_ready, sock_fd, EV_WRITE); ev_io_init (&self->write_watcher, on_client_writable, sock_fd, EV_WRITE);
self->read_watcher.data = self; self->read_watcher.data = self;
self->write_watcher.data = self; self->write_watcher.data = self;
@ -2010,37 +2069,36 @@ static void
client_fcgi_request_close_cb (struct request *req) client_fcgi_request_close_cb (struct request *req)
{ {
FIND_CONTAINER (self, req, struct client_fcgi_request, request); FIND_CONTAINER (self, req, struct client_fcgi_request, request);
// No more data to send, terminate the substream/request struct fcgi_muxer *muxer = self->fcgi_request->muxer;
// XXX: this will most probably end up with client_fcgi_request_destroy(), // No more data to send, terminate the substream/request,
// we might or might not need to defer this action // and also the transport if the client didn't specifically ask to keep it
fcgi_request_finish (self->fcgi_request); if (!fcgi_request_finish (self->fcgi_request))
muxer->close_cb (muxer);
} }
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
static bool static bool
client_fcgi_request_start client_fcgi_request_start (struct fcgi_request *fcgi_request)
(struct fcgi_muxer *mux, struct fcgi_request *fcgi_request)
{ {
FIND_CONTAINER (self, mux, struct client_fcgi, muxer);
struct client_fcgi_request *request = struct client_fcgi_request *request =
fcgi_request->handler_data = xcalloc (1, sizeof *request); fcgi_request->handler_data = xcalloc (1, sizeof *request);
request->fcgi_request = fcgi_request; request->fcgi_request = fcgi_request;
request_init (&request->request); request_init (&request->request);
request->request.ctx = self->client.ctx; request->request.ctx = ev_userdata (EV_DEFAULT);
request->request.write_cb = client_fcgi_request_write_cb; request->request.write_cb = client_fcgi_request_write_cb;
request->request.close_cb = client_fcgi_request_close_cb; request->request.close_cb = client_fcgi_request_close_cb;
return request_start (&request->request, &fcgi_request->headers); return request_start (&request->request, &fcgi_request->headers);
} }
static void static bool
client_fcgi_request_push client_fcgi_request_push
(struct fcgi_request *req, const void *data, size_t len) (struct fcgi_request *req, const void *data, size_t len)
{ {
struct client_fcgi_request *request = req->handler_data; struct client_fcgi_request *request = req->handler_data;
request_push (&request->request, data, len); return request_push (&request->request, data, len)
|| fcgi_request_finish (req);
} }
static void static void
@ -2064,9 +2122,7 @@ static void
client_fcgi_close_cb (struct fcgi_muxer *mux) client_fcgi_close_cb (struct fcgi_muxer *mux)
{ {
FIND_CONTAINER (self, mux, struct client_fcgi, muxer); FIND_CONTAINER (self, mux, struct client_fcgi, muxer);
// FIXME: we should probably call something like client_shutdown(), client_close (&self->client);
// which may have an argument whether we should really use close()
client_destroy (&self->client);
} }
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
@ -2137,11 +2193,9 @@ client_scgi_write_cb (struct request *req, const void *data, size_t len)
static void static void
client_scgi_close_cb (struct request *req) client_scgi_close_cb (struct request *req)
{ {
// NOTE: this rather really means "close me [the request]"
FIND_CONTAINER (self, req, struct client_scgi, request); FIND_CONTAINER (self, req, struct client_scgi, request);
// FIXME: we should probably call something like client_shutdown(), // NOTE: this rather really means "close me [the request]"
// which may have an argument whether we should really use close() client_close (&self->client);
client_destroy (&self->client);
} }
static bool static bool
@ -2201,7 +2255,6 @@ client_scgi_create (EV_P_ int sock_fd)
self->client.vtable = &client_scgi_vtable; self->client.vtable = &client_scgi_vtable;
request_init (&self->request); request_init (&self->request);
self->request.ctx = self->client.ctx;
self->request.write_cb = client_scgi_write_cb; self->request.write_cb = client_scgi_write_cb;
self->request.close_cb = client_scgi_close_cb; self->request.close_cb = client_scgi_close_cb;
@ -2231,8 +2284,9 @@ client_ws_on_message (struct ws_handler *handler,
return false; return false;
} }
struct server_context *ctx = ev_userdata (EV_DEFAULT);
struct str response = str_make (); struct str response = str_make ();
process_json_rpc (self->client.ctx, data, len, &response); process_json_rpc (ctx, data, len, &response);
if (response.len) if (response.len)
ws_handler_send (&self->handler, ws_handler_send (&self->handler,
WS_OPCODE_TEXT, response.str, response.len); WS_OPCODE_TEXT, response.str, response.len);