Figuring out how to close the connection
This commit is contained in:
parent
0b0d64124b
commit
2733ead30f
|
@ -68,30 +68,6 @@ msg_unpacker_u32 (struct msg_unpacker *self, uint32_t *value)
|
||||||
|
|
||||||
// --- libev helpers -----------------------------------------------------------
|
// --- libev helpers -----------------------------------------------------------
|
||||||
|
|
||||||
static bool
|
|
||||||
read_loop (EV_P_ ev_io *watcher,
|
|
||||||
bool (*cb) (EV_P_ ev_io *, const void *, ssize_t))
|
|
||||||
{
|
|
||||||
char buf[8192];
|
|
||||||
while (true)
|
|
||||||
{
|
|
||||||
ssize_t n_read = recv (watcher->fd, buf, sizeof buf, 0);
|
|
||||||
if (n_read < 0)
|
|
||||||
{
|
|
||||||
if (errno == EAGAIN)
|
|
||||||
break;
|
|
||||||
if (errno == EINTR)
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
// The callback is called on EOF as well
|
|
||||||
if (n_read < 0 || !cb (EV_A_ watcher, buf, n_read))
|
|
||||||
return false;
|
|
||||||
if (!n_read)
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool
|
static bool
|
||||||
flush_queue (write_queue_t *queue, ev_io *watcher)
|
flush_queue (write_queue_t *queue, ev_io *watcher)
|
||||||
{
|
{
|
||||||
|
@ -504,11 +480,13 @@ struct scgi_parser
|
||||||
struct str name; ///< Header name so far
|
struct str name; ///< Header name so far
|
||||||
struct str value; ///< Header value so far
|
struct str value; ///< Header value so far
|
||||||
|
|
||||||
/// Finished parsing request headers
|
/// Finished parsing request headers.
|
||||||
void (*on_headers_read) (void *user_data);
|
/// Return false to abort further processing of input.
|
||||||
|
bool (*on_headers_read) (void *user_data);
|
||||||
|
|
||||||
/// Content available; len == 0 means end of file
|
/// Content available; len == 0 means end of file.
|
||||||
void (*on_content) (void *user_data, const void *data, size_t len);
|
/// Return false to abort further processing of input.
|
||||||
|
bool (*on_content) (void *user_data, const void *data, size_t len);
|
||||||
|
|
||||||
void *user_data; ///< User data passed to callbacks
|
void *user_data; ///< User data passed to callbacks
|
||||||
};
|
};
|
||||||
|
@ -545,15 +523,15 @@ scgi_parser_push (struct scgi_parser *self,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Indicate end of file
|
// Indicate end of file
|
||||||
self->on_content (self->user_data, NULL, 0);
|
return self->on_content (self->user_data, NULL, 0);
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Notice that this madness is significantly harder to parse than FastCGI;
|
// Notice that this madness is significantly harder to parse than FastCGI;
|
||||||
// this procedure could also be optimized significantly
|
// this procedure could also be optimized significantly
|
||||||
str_append_data (&self->input, data, len);
|
str_append_data (&self->input, data, len);
|
||||||
|
|
||||||
while (true)
|
bool keep_running = true;
|
||||||
|
while (keep_running)
|
||||||
switch (self->state)
|
switch (self->state)
|
||||||
{
|
{
|
||||||
case SCGI_READING_NETSTRING_LENGTH:
|
case SCGI_READING_NETSTRING_LENGTH:
|
||||||
|
@ -601,7 +579,7 @@ scgi_parser_push (struct scgi_parser *self,
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
self->state = SCGI_READING_CONTENT;
|
self->state = SCGI_READING_CONTENT;
|
||||||
// TODO: a "on_headers_read" callback?
|
keep_running = self->on_headers_read (self->user_data);
|
||||||
}
|
}
|
||||||
else if (c != '\0')
|
else if (c != '\0')
|
||||||
str_append_c (&self->name, c);
|
str_append_c (&self->name, c);
|
||||||
|
@ -641,10 +619,12 @@ scgi_parser_push (struct scgi_parser *self,
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case SCGI_READING_CONTENT:
|
case SCGI_READING_CONTENT:
|
||||||
self->on_content (self->user_data, self->input.str, self->input.len);
|
keep_running = self->on_content
|
||||||
|
(self->user_data, self->input.str, self->input.len);
|
||||||
str_remove_slice (&self->input, 0, self->input.len);
|
str_remove_slice (&self->input, 0, self->input.len);
|
||||||
return true;
|
return keep_running;
|
||||||
}
|
}
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- Server ------------------------------------------------------------------
|
// --- Server ------------------------------------------------------------------
|
||||||
|
@ -703,6 +683,9 @@ server_context_free (struct server_context *self)
|
||||||
// returns back a JSON reply. This function may get called multiple times if
|
// returns back a JSON reply. This function may get called multiple times if
|
||||||
// the user sends a batch request.
|
// the user sends a batch request.
|
||||||
|
|
||||||
|
// TODO: a function that queues up a ping over IRC: this has to be owned by the
|
||||||
|
// server context as a background job that removes itself upon completion.
|
||||||
|
|
||||||
static bool
|
static bool
|
||||||
try_advance (const char **p, const char *text)
|
try_advance (const char **p, const char *text)
|
||||||
{
|
{
|
||||||
|
@ -747,9 +730,6 @@ validate_json_rpc_content_type (const char *type)
|
||||||
|
|
||||||
// --- Requests ----------------------------------------------------------------
|
// --- Requests ----------------------------------------------------------------
|
||||||
|
|
||||||
// TODO: something to read in the headers and decide what to do with the request
|
|
||||||
// e.g. whether to reject it with a 404, or do JSON-RPC, or ignore it with 200
|
|
||||||
|
|
||||||
struct request
|
struct request
|
||||||
{
|
{
|
||||||
// TODO *ctx
|
// TODO *ctx
|
||||||
|
@ -759,7 +739,8 @@ struct request
|
||||||
/// Callback to write some CGI response data to the output
|
/// Callback to write some CGI response data to the output
|
||||||
void (*write_cb) (void *user_data, const void *data, size_t len);
|
void (*write_cb) (void *user_data, const void *data, size_t len);
|
||||||
|
|
||||||
/// Callback to close the connection
|
/// Callback to close the connection.
|
||||||
|
/// CALLING THIS MAY CAUSE THE REQUEST TO BE DESTROYED.
|
||||||
void (*close_cb) (void *user_data);
|
void (*close_cb) (void *user_data);
|
||||||
|
|
||||||
struct request_handler *handler; ///< Current request handler
|
struct request_handler *handler; ///< Current request handler
|
||||||
|
@ -771,8 +752,9 @@ struct request_handler
|
||||||
/// Install ourselves as the handler for the request if applicable
|
/// Install ourselves as the handler for the request if applicable
|
||||||
bool (*try_handle) (struct request *request, struct str_map *headers);
|
bool (*try_handle) (struct request *request, struct str_map *headers);
|
||||||
|
|
||||||
/// Handle incoming data
|
/// Handle incoming data.
|
||||||
void (*push_cb) (struct request *request, const void *data, size_t len);
|
/// Return false if further processing should be stopped.
|
||||||
|
bool (*push_cb) (struct request *request, const void *data, size_t len);
|
||||||
|
|
||||||
/// Destroy the handler
|
/// Destroy the handler
|
||||||
void (*destroy_cb) (struct request *request);
|
void (*destroy_cb) (struct request *request);
|
||||||
|
@ -787,16 +769,27 @@ request_init (struct request *self)
|
||||||
static void
|
static void
|
||||||
request_free (struct request *self)
|
request_free (struct request *self)
|
||||||
{
|
{
|
||||||
// TODO: destroy the handler?
|
if (self->handler)
|
||||||
|
self->handler->destroy_cb (self);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// This function is only intended to be run from asynchronous event handlers
|
||||||
|
/// such as timers, not as a direct result of starting the request or receiving
|
||||||
|
/// request data. CALLING THIS MAY CAUSE THE REQUEST TO BE DESTROYED.
|
||||||
static void
|
static void
|
||||||
|
request_finish (struct request *self)
|
||||||
|
{
|
||||||
|
self->close_cb (self->user_data);
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool
|
||||||
request_start (struct request *self, struct str_map *headers)
|
request_start (struct request *self, struct str_map *headers)
|
||||||
{
|
{
|
||||||
bool handled = false;
|
bool handled = false;
|
||||||
// TODO: try request handlers registered in self->ctx
|
// TODO: try request handlers registered in self->ctx
|
||||||
if (handled)
|
if (handled)
|
||||||
return;
|
// TODO: can also be false
|
||||||
|
return true;
|
||||||
|
|
||||||
// Unable to serve the request
|
// Unable to serve the request
|
||||||
struct str response;
|
struct str response;
|
||||||
|
@ -804,16 +797,17 @@ request_start (struct request *self, struct str_map *headers)
|
||||||
str_append (&response, "404 Not Found\r\n\r\n");
|
str_append (&response, "404 Not Found\r\n\r\n");
|
||||||
self->write_cb (self->user_data, response.str, response.len);
|
self->write_cb (self->user_data, response.str, response.len);
|
||||||
str_free (&response);
|
str_free (&response);
|
||||||
|
return false;
|
||||||
// XXX: how will the clients behave when this happens?
|
|
||||||
self->close_cb (self->user_data);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static bool
|
||||||
request_push (struct request *self, const void *data, size_t len)
|
request_push (struct request *self, const void *data, size_t len)
|
||||||
{
|
{
|
||||||
if (soft_assert (self->handler))
|
if (soft_assert (self->handler))
|
||||||
self->handler->push_cb (self, data, len);
|
return self->handler->push_cb (self, data, len);
|
||||||
|
|
||||||
|
// No handler, nothing to do with any data
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- Requests handlers -------------------------------------------------------
|
// --- Requests handlers -------------------------------------------------------
|
||||||
|
@ -833,12 +827,13 @@ request_handler_json_rpc_try_handle
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static bool
|
||||||
request_handler_json_rpc_push
|
request_handler_json_rpc_push
|
||||||
(struct request *request, const void *data, size_t len)
|
(struct request *request, const void *data, size_t len)
|
||||||
{
|
{
|
||||||
// TODO: append to a buffer
|
// TODO: append to a buffer
|
||||||
// TODO: len == 0: process the request
|
// TODO: len == 0: process the request
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
|
@ -883,7 +878,7 @@ struct client_impl
|
||||||
void (*destroy) (struct client *client);
|
void (*destroy) (struct client *client);
|
||||||
|
|
||||||
/// Process incoming data; "len == 0" means EOF
|
/// Process incoming data; "len == 0" means EOF
|
||||||
bool (*on_data) (struct client *client, const void *data, size_t len);
|
bool (*push) (struct client *client, const void *data, size_t len);
|
||||||
};
|
};
|
||||||
|
|
||||||
static void
|
static void
|
||||||
|
@ -910,6 +905,24 @@ client_write (struct client *client, const void *data, size_t len)
|
||||||
ev_io_start (EV_DEFAULT_ &client->write_watcher);
|
ev_io_start (EV_DEFAULT_ &client->write_watcher);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
client_remove (struct client *client)
|
||||||
|
{
|
||||||
|
struct server_context *ctx = client->ctx;
|
||||||
|
|
||||||
|
LIST_UNLINK (ctx->clients, client);
|
||||||
|
ctx->n_clients--;
|
||||||
|
|
||||||
|
// First uninitialize the higher-level implementation
|
||||||
|
client->impl->destroy (client);
|
||||||
|
|
||||||
|
ev_io_stop (EV_DEFAULT_ &client->read_watcher);
|
||||||
|
ev_io_stop (EV_DEFAULT_ &client->write_watcher);
|
||||||
|
xclose (client->socket_fd);
|
||||||
|
client_free (client);
|
||||||
|
free (client);
|
||||||
|
}
|
||||||
|
|
||||||
// - - FastCGI - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
// - - FastCGI - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
||||||
|
|
||||||
struct client_fcgi
|
struct client_fcgi
|
||||||
|
@ -938,7 +951,7 @@ client_fcgi_destroy (struct client *client)
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool
|
static bool
|
||||||
client_fcgi_on_data (struct client *client, const void *data, size_t len)
|
client_fcgi_push (struct client *client, const void *data, size_t len)
|
||||||
{
|
{
|
||||||
struct client_fcgi *self = client->impl_data;
|
struct client_fcgi *self = client->impl_data;
|
||||||
fcgi_parser_push (&self->parser, data, len);
|
fcgi_parser_push (&self->parser, data, len);
|
||||||
|
@ -949,7 +962,7 @@ static struct client_impl g_client_fcgi =
|
||||||
{
|
{
|
||||||
.init = client_fcgi_init,
|
.init = client_fcgi_init,
|
||||||
.destroy = client_fcgi_destroy,
|
.destroy = client_fcgi_destroy,
|
||||||
.on_data = client_fcgi_on_data,
|
.push = client_fcgi_push,
|
||||||
};
|
};
|
||||||
|
|
||||||
// - - SCGI - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
// - - SCGI - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
||||||
|
@ -957,7 +970,7 @@ static struct client_impl g_client_fcgi =
|
||||||
struct client_scgi
|
struct client_scgi
|
||||||
{
|
{
|
||||||
struct scgi_parser parser; ///< SCGI stream parser
|
struct scgi_parser parser; ///< SCGI stream parser
|
||||||
struct request request; ///< Request
|
struct request request; ///< Request (only one per connection)
|
||||||
};
|
};
|
||||||
|
|
||||||
static void
|
static void
|
||||||
|
@ -970,29 +983,29 @@ client_scgi_write (void *user_data, const void *data, size_t len)
|
||||||
static void
|
static void
|
||||||
client_scgi_close (void *user_data)
|
client_scgi_close (void *user_data)
|
||||||
{
|
{
|
||||||
|
// XXX: this rather really means "close me [the request]"
|
||||||
struct client *client = user_data;
|
struct client *client = user_data;
|
||||||
struct client_scgi *self = client->impl_data;
|
client_remove (client);
|
||||||
|
|
||||||
// TODO
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static bool
|
||||||
client_scgi_on_headers_read (void *user_data)
|
client_scgi_on_headers_read (void *user_data)
|
||||||
{
|
{
|
||||||
struct client *client = user_data;
|
struct client *client = user_data;
|
||||||
struct client_scgi *self = client->impl_data;
|
struct client_scgi *self = client->impl_data;
|
||||||
|
return request_start (&self->request, &self->parser.headers);
|
||||||
request_start (&self->request, &self->parser.headers);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static bool
|
||||||
client_scgi_on_content (void *user_data, const void *data, size_t len)
|
client_scgi_on_content (void *user_data, const void *data, size_t len)
|
||||||
{
|
{
|
||||||
struct client *client = user_data;
|
struct client *client = user_data;
|
||||||
struct client_scgi *self = client->impl_data;
|
struct client_scgi *self = client->impl_data;
|
||||||
|
|
||||||
// XXX: make sure this is understood as EOF
|
// XXX: do we have to count CONTENT_LENGTH and supply our own EOF?
|
||||||
request_push (&self->request, data, len);
|
// If we do produce our own EOF, we should probably make sure we don't
|
||||||
|
// send it twice in a row.
|
||||||
|
return request_push (&self->request, data, len);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
|
@ -1018,23 +1031,24 @@ client_scgi_destroy (struct client *client)
|
||||||
struct client_scgi *self = client->impl_data;
|
struct client_scgi *self = client->impl_data;
|
||||||
client->impl_data = NULL;
|
client->impl_data = NULL;
|
||||||
|
|
||||||
// TODO: do something more to abort the request?
|
|
||||||
request_free (&self->request);
|
request_free (&self->request);
|
||||||
|
|
||||||
scgi_parser_free (&self->parser);
|
scgi_parser_free (&self->parser);
|
||||||
free (self);
|
free (self);
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool
|
static bool
|
||||||
client_scgi_on_data (struct client *client, const void *data, size_t len)
|
client_scgi_push (struct client *client, const void *data, size_t len)
|
||||||
{
|
{
|
||||||
struct client_scgi *self = client->impl_data;
|
struct client_scgi *self = client->impl_data;
|
||||||
struct error *e = NULL;
|
struct error *e = NULL;
|
||||||
if (scgi_parser_push (&self->parser, data, len, &e))
|
if (scgi_parser_push (&self->parser, data, len, &e))
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
print_debug ("SCGI parser failed: %s", e->message);
|
if (e != NULL)
|
||||||
error_free (e);
|
{
|
||||||
|
print_debug ("SCGI parser failed: %s", e->message);
|
||||||
|
error_free (e);
|
||||||
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1042,7 +1056,7 @@ static struct client_impl g_client_scgi =
|
||||||
{
|
{
|
||||||
.init = client_scgi_init,
|
.init = client_scgi_init,
|
||||||
.destroy = client_scgi_destroy,
|
.destroy = client_scgi_destroy,
|
||||||
.on_data = client_scgi_on_data,
|
.push = client_scgi_push,
|
||||||
};
|
};
|
||||||
|
|
||||||
// --- Basic server stuff ------------------------------------------------------
|
// --- Basic server stuff ------------------------------------------------------
|
||||||
|
@ -1054,41 +1068,45 @@ struct listener
|
||||||
struct client_impl *impl; ///< Client behaviour
|
struct client_impl *impl; ///< Client behaviour
|
||||||
};
|
};
|
||||||
|
|
||||||
static void
|
|
||||||
remove_client (struct server_context *ctx, struct client *client)
|
|
||||||
{
|
|
||||||
LIST_UNLINK (ctx->clients, client);
|
|
||||||
ctx->n_clients--;
|
|
||||||
|
|
||||||
// First uninitialize the higher-level implementation
|
|
||||||
client->impl->destroy (client);
|
|
||||||
|
|
||||||
ev_io_stop (EV_DEFAULT_ &client->read_watcher);
|
|
||||||
ev_io_stop (EV_DEFAULT_ &client->write_watcher);
|
|
||||||
xclose (client->socket_fd);
|
|
||||||
client_free (client);
|
|
||||||
free (client);
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool
|
static bool
|
||||||
on_client_data (EV_P_ ev_io *watcher, const void *buf, ssize_t n_read)
|
client_read_loop (EV_P_ struct client *client, ev_io *watcher)
|
||||||
{
|
{
|
||||||
(void) loop;
|
char buf[8192];
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
ssize_t n_read = recv (watcher->fd, buf, sizeof buf, 0);
|
||||||
|
if (n_read < 0)
|
||||||
|
{
|
||||||
|
if (errno == EAGAIN)
|
||||||
|
break;
|
||||||
|
if (errno == EINTR)
|
||||||
|
continue;
|
||||||
|
|
||||||
struct client *client = watcher->data;
|
return false;
|
||||||
return client->impl->on_data (client, buf, n_read);
|
}
|
||||||
|
|
||||||
|
if (!client->impl->push (client, buf, n_read))
|
||||||
|
return false;
|
||||||
|
|
||||||
|
if (!n_read)
|
||||||
|
{
|
||||||
|
// Don't receive the EOF condition repeatedly
|
||||||
|
ev_io_stop (EV_A_ watcher);
|
||||||
|
|
||||||
|
// We can probably still write, so let's just return
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
on_client_ready (EV_P_ ev_io *watcher, int revents)
|
on_client_ready (EV_P_ ev_io *watcher, int revents)
|
||||||
{
|
{
|
||||||
struct server_context *ctx = ev_userdata (loop);
|
|
||||||
struct client *client = watcher->data;
|
struct client *client = watcher->data;
|
||||||
|
|
||||||
// FIXME: don't close the connection on EOF; we need to be able to keep
|
|
||||||
// the connection open and respond in an asynchronous manner
|
|
||||||
if (revents & EV_READ)
|
if (revents & EV_READ)
|
||||||
if (!read_loop (EV_A_ watcher, on_client_data))
|
if (!client_read_loop (EV_A_ client, watcher))
|
||||||
goto error;
|
goto error;
|
||||||
if (revents & EV_WRITE)
|
if (revents & EV_WRITE)
|
||||||
if (!flush_queue (&client->write_queue, watcher))
|
if (!flush_queue (&client->write_queue, watcher))
|
||||||
|
@ -1096,7 +1114,9 @@ on_client_ready (EV_P_ ev_io *watcher, int revents)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
error:
|
error:
|
||||||
remove_client (ctx, client);
|
// The callback also could have just told us to stop reading,
|
||||||
|
// this is not necessarily an error condition
|
||||||
|
client_remove (client);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
|
|
Loading…
Reference in New Issue