Steady progress

Still in a state of total chaos, it appears.
This commit is contained in:
Přemysl Eric Janouch 2015-03-14 19:36:37 +01:00
parent 4337038819
commit 23eb4cca38
1 changed files with 275 additions and 45 deletions

View File

@ -701,6 +701,12 @@ fcgi_request_write (struct fcgi_request *self, const void *data, size_t len)
}
}
static void
fcgi_request_finish (struct fcgi_request *self)
{
// TODO: flush(), end_request(), delete self, muxer->request_destroy_cb()?
}
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
typedef void (*fcgi_muxer_handler_fn)
@ -764,6 +770,7 @@ fcgi_muxer_on_begin_request
return;
}
// We can only act as a responder, reject everything else up front
if (role != FCGI_RESPONDER)
{
fcgi_muxer_send_end_request (self,
@ -1063,6 +1070,8 @@ scgi_parser_push (struct scgi_parser *self,
#define SEC_WS_PROTOCOL "Sec-WebSocket-Protocol"
#define SEC_WS_VERSION "Sec-WebSocket-Version"
#define WS_MAX_CONTROL_PAYLOAD_LEN 125
static char *
ws_encode_response_key (const char *key)
{
@ -1122,6 +1131,12 @@ enum ws_opcode
WS_OPCODE_PONG = 10
};
static bool
ws_is_control_frame (int opcode)
{
return opcode >= WS_OPCODE_CLOSE;
}
struct ws_parser
{
struct str input; ///< External input buffer
@ -1136,8 +1151,7 @@ struct ws_parser
uint32_t mask; ///< Frame mask
uint64_t payload_len; ///< Payload length
// TODO: it wouldn't be half bad if there was a callback to just validate
// the frame header (such as the maximum payload length)
bool (*on_frame_header) (void *user_data, const struct ws_parser *self);
/// Callback for when a message is successfully parsed.
/// The actual payload is stored in "input", of length "payload_len".
@ -1248,17 +1262,17 @@ ws_parser_push (struct ws_parser *self, const void *data, size_t len)
case WS_PARSER_MASK:
if (!self->is_masked)
{
self->state = WS_PARSER_PAYLOAD;
break;
}
goto end_of_header;
if (self->input.len < 4)
return true;
(void) msg_unpacker_u32 (&unpacker, &self->mask);
self->state = WS_PARSER_PAYLOAD;
str_remove_slice (&self->input, 0, 4);
end_of_header:
self->state = WS_PARSER_PAYLOAD;
if (!self->on_frame_header (self->user_data, self))
return false;
break;
case WS_PARSER_PAYLOAD:
@ -1289,7 +1303,7 @@ ws_parser_push (struct ws_parser *self, const void *data, size_t len)
enum ws_handler_state
{
WS_HANDLER_HTTP, ///< Parsing HTTP
WS_HANDLER_HANDSHAKE, ///< Parsing HTTP
WS_HANDLER_WEBSOCKETS ///< Parsing WebSockets frames
};
@ -1305,12 +1319,38 @@ struct ws_handler
struct str url; ///< Request URL
struct ws_parser parser; ///< Protocol frame parser
bool expecting_continuation; ///< For non-control traffic
// TODO: bool closing;
// TODO: a configurable max_payload_len initialized by _init()
enum ws_opcode message_opcode; ///< Opcode for the current message
struct str message_data; ///< Concatenated message data
unsigned ping_interval; ///< Ping interval in seconds
uint64_t max_payload_len; ///< Maximum length of any message
// TODO: bool closing; // XXX: rather a { OPEN, CLOSING } state?
// TODO: a close timer
// TODO: a ping timer (when no pong is received by the second time the
// timer triggers, it is a ping timeout)
ev_timer ping_timer; ///< Ping timer
bool received_pong; ///< Received PONG since the last PING
/// Called upon reception of a single full message
bool (*on_message) (void *user_data, const void *data, size_t len);
bool (*on_message) (void *user_data,
enum ws_opcode type, const void *data, size_t len);
// TODO: void (*on_initialized) () that will allow the user to choose
// any sub-protocol, if the client has provided any.
/// The connection has been closed.
/// @a close_code may, or may not, be one of enum ws_status.
// NOTE: the "close_code" is what we receive from the remote endpoint,
// or one of 1005/1006/1015
// NOTE: the reason is an empty string if omitted
// TODO; also note that ideally, the handler should (be able to) first
// receive a notification about the connection being closed because of
// an error (recv()) returns -1, and call on_close() in reaction.
void (*on_close) (void *user_data, int close_code, const char *reason);
/// Write a chunk of data to the stream
void (*write_cb) (void *user_data, const void *data, size_t len);
@ -1320,15 +1360,126 @@ struct ws_handler
void *user_data; ///< User data for callbacks
};
static void
ws_handler_send_control (struct ws_handler *self, enum ws_opcode opcode,
const void *data, size_t len)
{
if (len > WS_MAX_CONTROL_PAYLOAD_LEN)
{
print_debug ("truncating output control frame payload"
" from %zu to %zu bytes", len, (size_t) WS_MAX_CONTROL_PAYLOAD_LEN);
len = WS_MAX_CONTROL_PAYLOAD_LEN;
}
uint8_t header[2] = { 0x80 | (opcode & 0x0F), len };
self->write_cb (self->user_data, header, sizeof header);
self->write_cb (self->user_data, data, len);
}
static void
ws_handler_fail (struct ws_handler *self, enum ws_status reason)
{
uint8_t payload[2] = { reason << 8, reason };
ws_handler_send_control (self, WS_OPCODE_CLOSE, payload, sizeof payload);
// TODO: set the close timer, ignore all further incoming input (either set
// some flag for the case that we're in the middle of ws_handler_push(),
// and/or add a mechanism to stop the caller from polling the socket for
// reads).
}
// TODO: ws_handler_close() that behaves like ws_handler_fail() but doesn't
// ignore frames up to a corresponding close from the client.
// Read the RFC once again to see if we can really process the frames.
static bool
ws_handler_on_frame_header (void *user_data, const struct ws_parser *parser)
{
struct ws_handler *self = user_data;
if (parser->reserved_1 || parser->reserved_2 || parser->reserved_3
|| !parser->is_masked // client -> server payload must be masked
|| (ws_is_control_frame (parser->opcode) &&
(!parser->is_fin || parser->payload_len > WS_MAX_CONTROL_PAYLOAD_LEN))
|| (!ws_is_control_frame (parser->opcode) &&
(self->expecting_continuation && parser->opcode != WS_OPCODE_CONT)))
ws_handler_fail (self, WS_STATUS_PROTOCOL);
else if (parser->payload_len > self->max_payload_len)
ws_handler_fail (self, WS_STATUS_TOO_BIG);
else
return true;
return false;
}
static bool
ws_handler_on_control_frame
(struct ws_handler *self, const struct ws_parser *parser)
{
switch (parser->opcode)
{
case WS_OPCODE_CLOSE:
// TODO: confirm the close
break;
case WS_OPCODE_PING:
ws_handler_send_control (self, WS_OPCODE_PONG,
parser->input.str, parser->payload_len);
break;
case WS_OPCODE_PONG:
// XXX: maybe we should check the payload
self->received_pong = true;
break;
default:
// TODO: shouldn't we rather fail on unknown control frames?
// But should we actually return false at any time? Yes?
break;
}
return true;
}
static bool
ws_handler_on_frame (void *user_data, const struct ws_parser *parser)
{
struct ws_handler *self = user_data;
// TODO: handle pings and what not
// TODO: validate the message
// TODO: first concatenate all parts of the message
return self->on_message (self->user_data,
if (ws_is_control_frame (parser->opcode))
return ws_handler_on_control_frame (self, parser);
// TODO: do this rather in "on_frame_header"
if (self->message_data.len + parser->payload_len > self->max_payload_len)
{
ws_handler_fail (self, WS_STATUS_TOO_BIG);
return true;
}
if (!self->expecting_continuation)
self->message_opcode = parser->opcode;
str_append_data (&self->message_data,
parser->input.str, parser->payload_len);
self->expecting_continuation = !parser->is_fin;
if (!parser->is_fin)
return true;
bool result = self->on_message (self->user_data, self->message_opcode,
self->parser.input.str, self->parser.payload_len);
str_reset (&self->message_data);
return result;
}
static void
ws_handler_on_ping_timer (EV_P_ ev_timer *watcher, int revents)
{
(void) loop;
(void) revents;
struct ws_handler *self = watcher->data;
if (!self->received_pong)
{
// TODO: close/fail the connection?
return;
}
ws_handler_send_control (self, WS_OPCODE_PING, NULL, 0);
}
static void
@ -1347,7 +1498,20 @@ ws_handler_init (struct ws_handler *self)
str_init (&self->url);
ws_parser_init (&self->parser);
self->parser.on_frame_header = ws_handler_on_frame_header;
self->parser.on_frame = ws_handler_on_frame;
str_init (&self->message_data);
self->ping_interval = 60;
// This is still ridiculously high
self->max_payload_len = UINT32_MAX;
// Just so we can safely stop it
ev_timer_init (&self->ping_timer, ws_handler_on_ping_timer, 0., 0.);
self->ping_timer.data = self;
// So that the first ping timer doesn't timeout the connection
self->received_pong = true;
}
static void
@ -1358,6 +1522,8 @@ ws_handler_free (struct ws_handler *self)
str_map_free (&self->headers);
str_free (&self->url);
ws_parser_free (&self->parser);
str_free (&self->message_data);
ev_timer_stop (EV_DEFAULT_ &self->ping_timer);
}
static void
@ -1395,10 +1561,11 @@ ws_handler_on_header_value (http_parser *parser, const char *at, size_t len)
static int
ws_handler_on_headers_complete (http_parser *parser)
{
// Just return 1 to tell the parser we don't want to parse any body;
// the parser should have found an upgrade request for WebSockets
(void) parser;
return 1;
// We strictly require a protocol upgrade
if (!parser->upgrade)
return 2;
return 0;
}
static int
@ -1418,6 +1585,7 @@ ws_handler_finish_handshake (struct ws_handler *self)
|| self->hp.http_major != 1
|| self->hp.http_minor != 1)
; // TODO: error (maybe send a frame depending on conditions)
// ...mostly just 400 Bad Request
const char *upgrade = str_map_find (&self->headers, "Upgrade");
@ -1425,6 +1593,12 @@ ws_handler_finish_handshake (struct ws_handler *self)
const char *version = str_map_find (&self->headers, SEC_WS_VERSION);
const char *protocol = str_map_find (&self->headers, SEC_WS_PROTOCOL);
if (!upgrade || strcmp (upgrade, "websocket")
|| !version || strcmp (version, "13"))
; // TODO: error
// ... if the version doesn't match, we must send back a header indicating
// the version we do support
struct str response;
str_init (&response);
str_append (&response, "HTTP/1.1 101 Switching Protocols\r\n");
@ -1433,8 +1607,7 @@ ws_handler_finish_handshake (struct ws_handler *self)
// TODO: prepare the rest of the headers
// TODO: we should ideally check that this is a 16-byte base64-encoded
// value; do we also have to strip surrounding whitespace?
// TODO: we should ideally check that this is a 16-byte base64-encoded value
char *response_key = ws_encode_response_key (key);
str_append_printf (&response, SEC_WS_ACCEPT ": %s\r\n", response_key);
free (response_key);
@ -1442,6 +1615,10 @@ ws_handler_finish_handshake (struct ws_handler *self)
str_append (&response, "\r\n");
self->write_cb (self->user_data, response.str, response.len);
str_free (&response);
// XXX: maybe we should start it earlier so that the handshake can
// timeout as well. ws_handler_connected()?
ev_timer_start (EV_DEFAULT_ &self->ping_timer);
return true;
}
@ -1477,14 +1654,16 @@ ws_handler_push (struct ws_handler *self, const void *data, size_t len)
self->state = WS_HANDLER_WEBSOCKETS;
return true;
}
else if (n_parsed != len || HTTP_PARSER_ERRNO (&self->hp) != HPE_OK)
if (n_parsed != len || HTTP_PARSER_ERRNO (&self->hp) != HPE_OK)
{
// TODO: error
// print_debug (..., http_errno_description
// (HTTP_PARSER_ERRNO (&self->hp));
// NOTE: if == HPE_CB_headers_complete, "Upgrade" is missing
return false;
}
// TODO: make double sure to handle the case of !upgrade
return true;
}
@ -1497,6 +1676,7 @@ static struct config_item g_config_table[] =
{ "port_scgi", NULL, "Port to bind for SCGI" },
{ "port_ws", NULL, "Port to bind for WebSockets" },
{ "pid_file", NULL, "Full path for the PID file" },
// XXX: here belongs something like a web SPA that interfaces with us
{ "static_root", NULL, "The root for static content" },
{ NULL, NULL, NULL }
};
@ -1526,11 +1706,16 @@ server_context_init (struct server_context *self)
load_config_defaults (&self->config, g_config_table);
}
static void close_listeners (struct server_context *self);
static void
server_context_free (struct server_context *self)
{
// TODO: free the clients (?)
// TODO: close the listeners (?)
// We really shouldn't attempt a quit without closing the clients first
soft_assert (!self->clients);
close_listeners (self);
free (self->listeners);
str_map_free (&self->config);
}
@ -1773,7 +1958,8 @@ struct request
{
struct server_context *ctx; ///< Server context
void *user_data; ///< User data argument for callbacks
struct request_handler *handler; ///< Current request handler
void *handler_data; ///< User data for the handler
/// Callback to write some CGI response data to the output
void (*write_cb) (void *user_data, const void *data, size_t len);
@ -1782,16 +1968,17 @@ struct request
/// CALLING THIS MAY CAUSE THE REQUEST TO BE DESTROYED.
void (*close_cb) (void *user_data);
struct request_handler *handler; ///< Current request handler
void *handler_data; ///< User data for the handler
void *user_data; ///< User data argument for callbacks
};
struct request_handler
{
LIST_HEADER (struct request_handler)
/// Install ourselves as the handler for the request if applicable
bool (*try_handle) (struct request *request, struct str_map *headers);
/// Install ourselves as the handler for the request if applicable.
/// Set @a continue_ to false if further processing should be stopped.
bool (*try_handle) (struct request *request,
struct str_map *headers, bool *continue_);
/// Handle incoming data.
/// Return false if further processing should be stopped.
@ -1826,16 +2013,22 @@ request_finish (struct request *self)
static bool
request_start (struct request *self, struct str_map *headers)
{
LIST_FOR_EACH (struct request_handler, handler, self->ctx->handlers)
if (handler->try_handle (self, headers))
{
// XXX: maybe we should isolate the handlers a bit more
self->handler = handler;
// XXX: it feels like this should rather be two steps:
// bool (*can_handle) (request *, headers)
// ... install the handler ...
// bool (*handle) (request *)
//
// However that might cause some stuff to be done twice.
//
// Another way we could get rid off the continue_ argument is via adding
// some way of marking the request as finished from within the handler.
// TODO: we should also allow the "try_handle" function to
// return that it has already finished processing the request
// and we should abort it by returning false here.
return true;
bool continue_ = true;
LIST_FOR_EACH (struct request_handler, handler, self->ctx->handlers)
if (handler->try_handle (self, headers, &continue_))
{
self->handler = handler;
return continue_;
}
// Unable to serve the request
@ -1862,7 +2055,7 @@ request_push (struct request *self, const void *data, size_t len)
static bool
request_handler_json_rpc_try_handle
(struct request *request, struct str_map *headers)
(struct request *request, struct str_map *headers, bool *continue_)
{
const char *content_type = str_map_find (headers, "CONTENT_TYPE");
const char *method = str_map_find (headers, "REQUEST_METHOD");
@ -1875,6 +2068,7 @@ request_handler_json_rpc_try_handle
str_init (buf);
request->handler_data = buf;
*continue_ = true;
return true;
}
@ -1972,8 +2166,11 @@ detect_magic (const void *data, size_t len)
static bool
request_handler_static_try_handle
(struct request *request, struct str_map *headers)
(struct request *request, struct str_map *headers, bool *continue_)
{
// Serving static files is actually quite complicated as it turns out;
// but this is only meant to serve a few tiny text files
struct server_context *ctx = request->ctx;
const char *root = str_map_find (&ctx->config, "static_root");
if (!root)
@ -1999,6 +2196,7 @@ request_handler_static_try_handle
char *suffix = canonicalize_url_path (path_info);
char *path = xstrdup_printf ("%s%s", root, suffix);
// TODO: check that this is a regular file
FILE *fp = fopen (path, "rb");
if (!fp)
{
@ -2045,6 +2243,13 @@ request_handler_static_try_handle
while ((len = fread (buf, 1, sizeof buf, fp)))
request->write_cb (request->user_data, buf, len);
fclose (fp);
// TODO: this should rather not be returned all at once but in chunks;
// file read requests never return EAGAIN
// TODO: actual file data should really be returned by a callback when
// the socket is writable with nothing to be sent (pumping the entire
// file all at once won't really work if it's huge).
*continue_ = false;
return true;
}
@ -2178,7 +2383,8 @@ static void
client_fcgi_request_close (void *user_data)
{
struct client_fcgi_request *request = user_data;
// TODO: tell the fcgi_request to what?
// TODO: fcgi_request_finish()? That will most probably end up with us
// receiving client_fcgi_request_destroy()
}
static void *
@ -2186,6 +2392,7 @@ client_fcgi_request_start (void *user_data, struct fcgi_request *fcgi_request)
{
struct client *client = user_data;
// TODO: what if the request is aborted by ;
struct client_fcgi_request *request = xmalloc (sizeof *request);
request->fcgi_request = fcgi_request;
request_init (&request->request);
@ -2375,12 +2582,17 @@ client_ws_write (void *user_data, const void *data, size_t len)
}
static bool
client_ws_on_message (void *user_data, const void *data, size_t len)
client_ws_on_message (void *user_data,
enum ws_opcode type, const void *data, size_t len)
{
struct client *client = user_data;
struct client_ws *self = client->impl_data;
// TODO: do something about the message
struct str response;
str_init (&response);
process_json_rpc (client->ctx, data, len, &response);
// TODO: send the response
str_free (&response);
return true;
}
@ -2431,6 +2643,22 @@ struct listener
struct client_impl *impl; ///< Client behaviour
};
static void
close_listeners (struct server_context *self)
{
// TODO: factor out the closing act, to be used in initiate_quit()
for (size_t i = 0; i < self->n_listeners; i++)
{
struct listener *listener = &self->listeners[i];
if (listener->fd == -1)
continue;
ev_io_stop (EV_DEFAULT_ &listener->watcher);
xclose (listener->fd);
listener->fd = -1;
}
}
static bool
client_read_loop (EV_P_ struct client *client, ev_io *watcher)
{
@ -2472,6 +2700,8 @@ on_client_ready (EV_P_ ev_io *watcher, int revents)
// finished flushing the write queue? This should probably even be
// the default behaviour, as it's fairly uncommon for clients to
// shutdown the socket for writes while leaving it open for reading.
// TODO: some sort of "on_buffers_flushed" callback for streaming huge
// chunks of external (or generated) data.
if (!flush_queue (&client->write_queue, watcher))
goto close;
return;