Steady progress

On the WebSocket service.

It's not too far from being finished now.  I just have to make some
sense of the code again and make sure it's correct.

Now that json-rpc-shell should be able to run against this, I can
also finally test if both of them work as they should.
This commit is contained in:
Přemysl Eric Janouch 2015-04-10 01:40:32 +02:00
parent db6dff4216
commit 4dbdc849d9
1 changed files with 182 additions and 56 deletions

View File

@ -468,6 +468,7 @@ enum ws_handler_state
WS_HANDLER_CONNECTING, ///< Parsing HTTP WS_HANDLER_CONNECTING, ///< Parsing HTTP
WS_HANDLER_OPEN, ///< Parsing WebSockets frames WS_HANDLER_OPEN, ///< Parsing WebSockets frames
WS_HANDLER_CLOSING, ///< Closing the connection WS_HANDLER_CLOSING, ///< Closing the connection
WS_HANDLER_ALMOST_DEAD, ///< Closing connection after failure
WS_HANDLER_CLOSED ///< Dead WS_HANDLER_CLOSED ///< Dead
}; };
@ -475,12 +476,17 @@ struct ws_handler
{ {
enum ws_handler_state state; ///< State enum ws_handler_state state; ///< State
// HTTP handshake:
http_parser hp; ///< HTTP parser http_parser hp; ///< HTTP parser
bool parsing_header_value; ///< Parsing header value or field? bool parsing_header_value; ///< Parsing header value or field?
struct str field; ///< Field part buffer struct str field; ///< Field part buffer
struct str value; ///< Value part buffer struct str value; ///< Value part buffer
struct str_map headers; ///< HTTP Headers struct str_map headers; ///< HTTP Headers
struct str url; ///< Request URL struct str url; ///< Request URL
ev_timer handshake_timeout_watcher; ///< Handshake timeout watcher
// WebSocket frame protocol:
struct ws_parser parser; ///< Protocol frame parser struct ws_parser parser; ///< Protocol frame parser
bool expecting_continuation; ///< For non-control traffic bool expecting_continuation; ///< For non-control traffic
@ -488,27 +494,32 @@ struct ws_handler
enum ws_opcode message_opcode; ///< Opcode for the current message enum ws_opcode message_opcode; ///< Opcode for the current message
struct str message_data; ///< Concatenated message data struct str message_data; ///< Concatenated message data
unsigned ping_interval; ///< Ping interval in seconds
uint64_t max_payload_len; ///< Maximum length of any message
// TODO: handshake_timeout
// TODO: a close timer
// TODO: a ping timer (when no pong is received by the second time the
// timer triggers, it is a ping timeout)
ev_timer ping_timer; ///< Ping timer ev_timer ping_timer; ///< Ping timer
bool received_pong; ///< Received PONG since the last PING bool received_pong; ///< Received PONG since the last PING
ev_timer close_timeout_watcher; ///< Close timeout watcher
// Configuration:
unsigned handshake_timeout; ///< How long to wait for the handshake
unsigned close_timeout; ///< How long to wait for TCP close
unsigned ping_interval; ///< Ping interval in seconds
uint64_t max_payload_len; ///< Maximum length of any message
// Event callbacks:
// TODO: void (*on_handshake) (protocols) that will allow the user // TODO: void (*on_handshake) (protocols) that will allow the user
// to choose any sub-protocol, if the client has provided any. // to choose any sub-protocol, if the client has provided any.
// This may render "on_connected" unnecessary.
// TODO: "on_connected" after the handshake has finished? /// Called after successfuly connecting (handshake complete)
bool (*on_connected) (void *user_data);
/// Called upon reception of a single full message /// Called upon reception of a single full message
bool (*on_message) (void *user_data, bool (*on_message) (void *user_data,
enum ws_opcode type, const void *data, size_t len); enum ws_opcode type, const void *data, size_t len);
/// The connection has been closed. @a close_code may, or may not, be one /// The connection is about to close. @a close_code may, or may not, be one
/// of enum ws_status. The @a reason is never NULL. /// of enum ws_status. The @a reason is never NULL.
// TODO; also note that ideally, the handler should (be able to) first // TODO; also note that ideally, the handler should (be able to) first
// receive a notification about the connection being closed because of // receive a notification about the connection being closed because of
@ -516,10 +527,13 @@ struct ws_handler
// Actually, calling push() could work pretty fine for this. // Actually, calling push() could work pretty fine for this.
void (*on_close) (void *user_data, int close_code, const char *reason); void (*on_close) (void *user_data, int close_code, const char *reason);
// Method callbacks:
/// Write a chunk of data to the stream /// Write a chunk of data to the stream
void (*write_cb) (void *user_data, const void *data, size_t len); void (*write_cb) (void *user_data, const void *data, size_t len);
// TODO: "close_cb"; to be used from a ping timer e.g. /// Close the connection
void (*close_cb) (void *user_data);
void *user_data; ///< User data for callbacks void *user_data; ///< User data for callbacks
}; };
@ -541,30 +555,44 @@ ws_handler_send_control (struct ws_handler *self,
} }
static void static void
ws_handler_fail (struct ws_handler *self, enum ws_status reason) ws_handler_close (struct ws_handler *self,
enum ws_status close_code, const char *reason, size_t len)
{ {
uint8_t payload[2] = { reason << 8, reason }; struct str payload;
ws_handler_send_control (self, WS_OPCODE_CLOSE, payload, sizeof payload); str_init (&payload);
str_pack_u16 (&payload, close_code);
// XXX: maybe accept a null-terminated string on input? Has to be UTF-8 a/w
str_append_data (&payload, reason, len);
ws_handler_send_control (self, WS_OPCODE_CLOSE, payload.str, payload.len);
// Close initiated by us; the reason is null-terminated within `payload'
if (self->on_close)
self->on_close (self->user_data, close_code, payload.str + 2);
self->state = WS_HANDLER_CLOSING;
str_free (&payload);
}
static void
ws_handler_fail (struct ws_handler *self, enum ws_status close_code)
{
ws_handler_close (self, close_code, NULL, 0);
self->state = WS_HANDLER_ALMOST_DEAD;
// TODO: set the close timer, ignore all further incoming input (either set // TODO: set the close timer, ignore all further incoming input (either set
// some flag for the case that we're in the middle of ws_handler_push(), // some flag for the case that we're in the middle of ws_handler_push(),
// and/or add a mechanism to stop the caller from polling the socket for // and/or add a mechanism to stop the caller from polling the socket for
// reads). // reads).
// TODO: set the state to FAILED (not CLOSED as that means the TCP
// connection is closed) and wait until all is sent?
// TODO: make sure we don't send pings after the close // TODO: make sure we don't send pings after the close
} }
// TODO: ws_handler_close() that behaves like ws_handler_fail() but doesn't
// ignore frames up to a corresponding close from the client.
// Read the RFC once again to see if we can really process the frames.
// TODO: add support for fragmented responses // TODO: add support for fragmented responses
static void static void
ws_handler_send (struct ws_handler *self, ws_handler_send (struct ws_handler *self,
enum ws_opcode opcode, const void *data, size_t len) enum ws_opcode opcode, const void *data, size_t len)
{ {
// TODO: make sure (just assert?) we're in the OPEN state if (!soft_assert (self->state == WS_HANDLER_OPEN))
return;
struct str header; struct str header;
str_init (&header); str_init (&header);
@ -611,6 +639,38 @@ ws_handler_on_frame_header (void *user_data, const struct ws_parser *parser)
return false; return false;
} }
static bool
ws_handler_on_protocol_close
(struct ws_handler *self, const struct ws_parser *parser)
{
struct msg_unpacker unpacker;
msg_unpacker_init (&unpacker, parser->input.str, parser->payload_len);
char *reason = NULL;
uint16_t close_code = WS_STATUS_NO_STATUS_RECEIVED;
if (parser->payload_len >= 2)
{
(void) msg_unpacker_u16 (&unpacker, &close_code);
reason = xstrndup (parser->input.str + 2, parser->payload_len - 2);
}
else
reason = xstrdup ("");
if (self->state != WS_HANDLER_CLOSING)
{
// Close initiated by the client
ws_handler_send_control (self, WS_OPCODE_CLOSE,
parser->input.str, parser->payload_len);
if (self->on_close)
self->on_close (self->user_data, close_code, reason);
}
free (reason);
self->state = WS_HANDLER_ALMOST_DEAD;
return true;
}
static bool static bool
ws_handler_on_control_frame ws_handler_on_control_frame
(struct ws_handler *self, const struct ws_parser *parser) (struct ws_handler *self, const struct ws_parser *parser)
@ -618,11 +678,7 @@ ws_handler_on_control_frame
switch (parser->opcode) switch (parser->opcode)
{ {
case WS_OPCODE_CLOSE: case WS_OPCODE_CLOSE:
// TODO: confirm the close return ws_handler_on_protocol_close (self, parser);
// TODO: change the state to CLOSING
// TODO: call "on_close"
// NOTE: the reason is an empty string if omitted
break;
case WS_OPCODE_PING: case WS_OPCODE_PING:
ws_handler_send_control (self, WS_OPCODE_PONG, ws_handler_send_control (self, WS_OPCODE_PONG,
parser->input.str, parser->payload_len); parser->input.str, parser->payload_len);
@ -634,6 +690,8 @@ ws_handler_on_control_frame
default: default:
// Unknown control frame // Unknown control frame
ws_handler_fail (self, WS_STATUS_PROTOCOL_ERROR); ws_handler_fail (self, WS_STATUS_PROTOCOL_ERROR);
// FIXME: we shouldn't close the connection right away;
// also check other places
return false; return false;
} }
return true; return true;
@ -670,8 +728,10 @@ ws_handler_on_frame (void *user_data, const struct ws_parser *parser)
return false; return false;
} }
bool result = self->on_message (self->user_data, self->message_opcode, bool result = true;
self->message_data.str, self->message_data.len); if (self->on_message)
result = self->on_message (self->user_data, self->message_opcode,
self->message_data.str, self->message_data.len);
str_reset (&self->message_data); str_reset (&self->message_data);
return result; return result;
} }
@ -686,10 +746,26 @@ ws_handler_on_ping_timer (EV_P_ ev_timer *watcher, int revents)
if (!self->received_pong) if (!self->received_pong)
{ {
// TODO: close/fail the connection? // TODO: close/fail the connection?
return;
} }
else
{
ws_handler_send_control (self, WS_OPCODE_PING, NULL, 0);
ev_timer_again (EV_A_ watcher);
}
}
ws_handler_send_control (self, WS_OPCODE_PING, NULL, 0); static void
ws_handler_on_close_timeout (EV_P_ ev_timer *watcher, int revents)
{
struct ws_handler *self = watcher->data;
// TODO: call "close_cb"
}
static void
ws_handler_on_handshake_timeout (EV_P_ ev_timer *watcher, int revents)
{
struct ws_handler *self = watcher->data;
// TODO
} }
static void static void
@ -697,44 +773,61 @@ ws_handler_init (struct ws_handler *self)
{ {
memset (self, 0, sizeof *self); memset (self, 0, sizeof *self);
self->state = WS_HANDLER_CONNECTING;
http_parser_init (&self->hp, HTTP_REQUEST); http_parser_init (&self->hp, HTTP_REQUEST);
self->hp.data = self; self->hp.data = self;
str_init (&self->field); str_init (&self->field);
str_init (&self->value); str_init (&self->value);
str_map_init (&self->headers); str_map_init (&self->headers);
self->headers.free = free; self->headers.free = free;
self->headers.key_xfrm = tolower_ascii_strxfrm; self->headers.key_xfrm = tolower_ascii_strxfrm;
str_init (&self->url); str_init (&self->url);
ev_timer_init (&self->handshake_timeout_watcher,
ws_handler_on_handshake_timeout, 0., 0.);
self->handshake_timeout_watcher.data = self;
ws_parser_init (&self->parser); ws_parser_init (&self->parser);
self->parser.on_frame_header = ws_handler_on_frame_header; self->parser.on_frame_header = ws_handler_on_frame_header;
self->parser.on_frame = ws_handler_on_frame; self->parser.on_frame = ws_handler_on_frame;
str_init (&self->message_data); str_init (&self->message_data);
self->ping_interval = 60; ev_timer_init (&self->ping_timer,
// This is still ridiculously high. Note that the most significant bit ws_handler_on_ping_timer, 0., 0.);
// must always be zero, i.e. the protocol maximum is 0x7FFF FFFF FFFF FFFF. self->ping_timer.data = self;
self->max_payload_len = UINT32_MAX; ev_timer_init (&self->close_timeout_watcher,
ws_handler_on_close_timeout, 0., 0.);
// Just so we can safely stop it
ev_timer_init (&self->ping_timer, ws_handler_on_ping_timer, 0., 0.);
self->ping_timer.data = self; self->ping_timer.data = self;
// So that the first ping timer doesn't timeout the connection // So that the first ping timer doesn't timeout the connection
self->received_pong = true; self->received_pong = true;
self->handshake_timeout = self->close_timeout = self->ping_interval = 60;
// This is still ridiculously high. Note that the most significant bit
// must always be zero, i.e. the protocol maximum is 0x7FFF FFFF FFFF FFFF.
self->max_payload_len = UINT32_MAX;
}
/// Stop all timers, not going to use the handler anymore
static void
ws_handler_stop (struct ws_handler *self)
{
ev_timer_stop (EV_DEFAULT_ &self->handshake_timeout_watcher);
ev_timer_stop (EV_DEFAULT_ &self->ping_timer);
ev_timer_stop (EV_DEFAULT_ &self->close_timeout_watcher);
} }
static void static void
ws_handler_free (struct ws_handler *self) ws_handler_free (struct ws_handler *self)
{ {
ws_handler_stop (self);
str_free (&self->field); str_free (&self->field);
str_free (&self->value); str_free (&self->value);
str_map_free (&self->headers); str_map_free (&self->headers);
str_free (&self->url); str_free (&self->url);
ws_parser_free (&self->parser); ws_parser_free (&self->parser);
str_free (&self->message_data); str_free (&self->message_data);
ev_timer_stop (EV_DEFAULT_ &self->ping_timer);
} }
static bool static bool
@ -863,10 +956,9 @@ ws_handler_http_response (struct ws_handler *self, const char *status, ...)
str_vector_free (&v); str_vector_free (&v);
} }
// TODO: also set the connection to some FAILED state or anything that's neither
// CONNECTING nor OPEN
#define FAIL_HANDSHAKE(status, ...) \ #define FAIL_HANDSHAKE(status, ...) \
BLOCK_START \ BLOCK_START \
self->state = WS_HANDLER_ALMOST_DEAD; \
ws_handler_http_response (self, (status), __VA_ARGS__); \ ws_handler_http_response (self, (status), __VA_ARGS__); \
return false; \ return false; \
BLOCK_END BLOCK_END
@ -949,21 +1041,38 @@ ws_handler_finish_handshake (struct ws_handler *self)
str_vector_free (&fields); str_vector_free (&fields);
// XXX: maybe we should start it earlier so that the handshake can ev_timer_init (&self->ping_timer, ws_handler_on_ping_timer,
// timeout as well. ws_handler_connected()? self->ping_interval, 0);
//
// But it should rather be named "connect_timer"
ev_timer_start (EV_DEFAULT_ &self->ping_timer); ev_timer_start (EV_DEFAULT_ &self->ping_timer);
return true; return true;
} }
/// Tells the handler that the TCP connection has been established so it can
/// timeout when the client handshake doesn't arrive soon enough
static void
ws_handler_start (struct ws_handler *self)
{
ev_timer_set (&self->handshake_timeout_watcher,
self->handshake_timeout, 0.);
ev_timer_start (EV_DEFAULT_ &self->handshake_timeout_watcher);
}
/// Push data to the WebSocket handler; "len == 0" means EOF
static bool static bool
ws_handler_push (struct ws_handler *self, const void *data, size_t len) ws_handler_push (struct ws_handler *self, const void *data, size_t len)
{ {
// TODO: make sure all timers are stopped appropriately
if (!len) if (!len)
{ {
ev_timer_stop (EV_DEFAULT_ &self->handshake_timeout_watcher);
if (self->state == WS_HANDLER_OPEN) if (self->state == WS_HANDLER_OPEN)
self->on_close (self->user_data, WS_STATUS_ABNORMAL_CLOSURE, ""); {
if (self->on_close)
self->on_close (self->user_data,
WS_STATUS_ABNORMAL_CLOSURE, "");
}
else else
{ {
// TODO: anything to do besides just closing the connection? // TODO: anything to do besides just closing the connection?
@ -973,6 +1082,9 @@ ws_handler_push (struct ws_handler *self, const void *data, size_t len)
return false; return false;
} }
if (self->state == WS_HANDLER_ALMOST_DEAD)
// We're waiting for an EOF from the client, must not process data
return true;
if (self->state != WS_HANDLER_CONNECTING) if (self->state != WS_HANDLER_CONNECTING)
return ws_parser_push (&self->parser, data, len); return ws_parser_push (&self->parser, data, len);
@ -990,6 +1102,8 @@ ws_handler_push (struct ws_handler *self, const void *data, size_t len)
if (self->hp.upgrade) if (self->hp.upgrade)
{ {
ev_timer_stop (EV_DEFAULT_ &self->handshake_timeout_watcher);
// The handshake hasn't been finished, yet there is more data // The handshake hasn't been finished, yet there is more data
// to be processed after the headers already // to be processed after the headers already
if (len - n_parsed) if (len - n_parsed)
@ -999,12 +1113,16 @@ ws_handler_push (struct ws_handler *self, const void *data, size_t len)
return false; return false;
self->state = WS_HANDLER_OPEN; self->state = WS_HANDLER_OPEN;
if (self->on_connected)
return self->on_connected (self->user_data);
return true; return true;
} }
enum http_errno err = HTTP_PARSER_ERRNO (&self->hp); enum http_errno err = HTTP_PARSER_ERRNO (&self->hp);
if (n_parsed != len || err != HPE_OK) if (n_parsed != len || err != HPE_OK)
{ {
ev_timer_stop (EV_DEFAULT_ &self->handshake_timeout_watcher);
if (err == HPE_CB_headers_complete) if (err == HPE_CB_headers_complete)
print_debug ("WS handshake failed: %s", "missing `Upgrade' field"); print_debug ("WS handshake failed: %s", "missing `Upgrade' field");
else else
@ -1916,13 +2034,6 @@ struct client_ws
struct ws_handler handler; ///< WebSockets connection handler struct ws_handler handler; ///< WebSockets connection handler
}; };
static void
client_ws_write (void *user_data, const void *data, size_t len)
{
struct client *client = user_data;
client_write (client, data, len);
}
static bool static bool
client_ws_on_message (void *user_data, client_ws_on_message (void *user_data,
enum ws_opcode type, const void *data, size_t len) enum ws_opcode type, const void *data, size_t len)
@ -1945,6 +2056,20 @@ client_ws_on_message (void *user_data,
return true; return true;
} }
static void
client_ws_write (void *user_data, const void *data, size_t len)
{
struct client *client = user_data;
client_write (client, data, len);
}
static void
client_ws_close (void *user_data)
{
struct client *client = user_data;
client_remove (client);
}
static void static void
client_ws_init (struct client *client) client_ws_init (struct client *client)
{ {
@ -1952,10 +2077,10 @@ client_ws_init (struct client *client)
client->impl_data = self; client->impl_data = self;
ws_handler_init (&self->handler); ws_handler_init (&self->handler);
self->handler.write_cb = client_ws_write;
self->handler.on_message = client_ws_on_message; self->handler.on_message = client_ws_on_message;
self->handler.write_cb = client_ws_write;
self->handler.close_cb = client_ws_close;
self->handler.user_data = client; self->handler.user_data = client;
// TODO: configure the handler some more, e.g. regarding the protocol
// One mebibyte seems to be a reasonable value // One mebibyte seems to be a reasonable value
self->handler.max_payload_len = 1 << 10; self->handler.max_payload_len = 1 << 10;
@ -2052,6 +2177,7 @@ on_client_ready (EV_P_ ev_io *watcher, int revents)
// finished flushing the write queue? This should probably even be // finished flushing the write queue? This should probably even be
// the default behaviour, as it's fairly uncommon for clients to // the default behaviour, as it's fairly uncommon for clients to
// shutdown the socket for writes while leaving it open for reading. // shutdown the socket for writes while leaving it open for reading.
// Actually, we should wait until the client closes the connection.
// TODO: some sort of "on_buffers_flushed" callback for streaming huge // TODO: some sort of "on_buffers_flushed" callback for streaming huge
// chunks of external (or generated) data. // chunks of external (or generated) data.
if (!flush_queue (&client->write_queue, watcher)) if (!flush_queue (&client->write_queue, watcher))