Give the WebSocket backend some chance of working

This is all untested code.
This commit is contained in:
Přemysl Eric Janouch 2015-04-08 01:52:20 +02:00
parent 950fc21ecf
commit 7333f27159
1 changed files with 213 additions and 75 deletions

View File

@ -74,6 +74,13 @@ static struct config_item g_config_table[] =
// --- Main program ------------------------------------------------------------ // --- Main program ------------------------------------------------------------
// HTTP/S and WS/S require significantly different handling. While for HTTP we
// can just use the cURL easy interface, with WebSockets it gets a bit more
// complicated and we implement it all by ourselves.
//
// Luckily on a higher level the application doesn't need to bother itself with
// the details and the backend API can be very simple..
struct app_context; struct app_context;
struct backend_iface struct backend_iface
@ -108,18 +115,32 @@ enum ws_handler_state
struct ws_context struct ws_context
{ {
// Configuration:
char *endpoint; ///< Endpoint URL char *endpoint; ///< Endpoint URL
struct http_parser_url url; ///< Parsed URL struct http_parser_url url; ///< Parsed URL
struct str_vector extra_headers; ///< Extra headers for the handshake
enum ws_handler_state state; ///< State // Events:
char *key; ///< Key for the current handshake
struct str *response_buffer; ///< Buffer for the response bool waiting_for_event; ///< Running a separate loop to wait?
struct error *e; ///< Error while waiting for event
ev_timer timeout_watcher; ///< Connection timeout watcher
struct str *response_buffer; ///< Buffer for the incoming messages
// The TCP transport:
int server_fd; ///< Socket FD of the server int server_fd; ///< Socket FD of the server
ev_io read_watcher; ///< Server FD read watcher ev_io read_watcher; ///< Server FD read watcher
SSL_CTX *ssl_ctx; ///< SSL context SSL_CTX *ssl_ctx; ///< SSL context
SSL *ssl; ///< SSL connection SSL *ssl; ///< SSL connection
// WebSockets protocol handling:
enum ws_handler_state state; ///< State
char *key; ///< Key for the current handshake
http_parser hp; ///< HTTP parser http_parser hp; ///< HTTP parser
bool parsing_header_value; ///< Parsing header value or field? bool parsing_header_value; ///< Parsing header value or field?
struct str field; ///< Field part buffer struct str field; ///< Field part buffer
@ -131,14 +152,15 @@ struct ws_context
enum ws_opcode message_opcode; ///< Opcode for the current message enum ws_opcode message_opcode; ///< Opcode for the current message
struct str message_data; ///< Concatenated message data struct str message_data; ///< Concatenated message data
struct str_vector extra_headers; ///< Extra headers for the handshake
}; };
static void static void
ws_context_init (struct ws_context *self) ws_context_init (struct ws_context *self)
{ {
memset (self, 0, sizeof *self); memset (self, 0, sizeof *self);
ev_timer_init (&self->timeout_watcher, NULL, 0, 0);
self->server_fd = -1;
ev_io_init (&self->read_watcher, NULL, 0, 0);
http_parser_init (&self->hp, HTTP_RESPONSE); http_parser_init (&self->hp, HTTP_RESPONSE);
str_init (&self->field); str_init (&self->field);
str_init (&self->value); str_init (&self->value);
@ -755,14 +777,15 @@ backend_ws_on_data (struct app_context *ctx, const void *data, size_t len)
struct error *e = NULL; struct error *e = NULL;
if (!backend_ws_finish_handshake (ctx, &e)) if (!backend_ws_finish_handshake (ctx, &e))
{ {
print_debug ("WS handshake failed: %s", e->message); print_error ("WS handshake failed: %s", e->message);
error_free (e); error_free (e);
return false; return false;
} }
// Finished the handshake, return to caller
// (we run a separate loop to wait for the handshake to finish)
self->state = WS_HANDLER_OPEN; self->state = WS_HANDLER_OPEN;
ev_unloop (EV_DEFAULT_ EVUNLOOP_ONE);
// TODO: set the event loop to quit?
if ((len -= n_parsed)) if ((len -= n_parsed))
return ws_parser_push (&self->parser, return ws_parser_push (&self->parser,
@ -775,15 +798,47 @@ backend_ws_on_data (struct app_context *ctx, const void *data, size_t len)
if (n_parsed != len || err != HPE_OK) if (n_parsed != len || err != HPE_OK)
{ {
if (err == HPE_CB_headers_complete) if (err == HPE_CB_headers_complete)
print_debug ("WS handshake failed: %s", "missing `Upgrade' field"); print_error ("WS handshake failed: %s", "missing `Upgrade' field");
else else
print_debug ("WS handshake failed: %s", print_error ("WS handshake failed: %s",
http_errno_description (err)); http_errno_description (err));
return false; return false;
} }
return true; return true;
} }
static void
backend_ws_close_connection (struct app_context *ctx)
{
struct ws_context *self = &ctx->ws;
if (self->server_fd == -1)
return;
ev_io_stop (EV_DEFAULT_ &self->read_watcher);
if (self->ssl)
{
(void) SSL_shutdown (self->ssl);
SSL_free (self->ssl);
self->ssl = NULL;
}
xclose (self->server_fd);
self->server_fd = -1;
self->state = WS_HANDLER_CLOSED;
// That would have no way of succeeding
// XXX: what if we're waiting for the close?
if (self->waiting_for_event)
{
if (!self->e)
error_set (&self->e, "unexpected connection close");
ev_unloop (EV_DEFAULT_ EVUNLOOP_ONE);
}
}
static void static void
backend_ws_on_fd_ready (EV_P_ ev_io *handle, int revents) backend_ws_on_fd_ready (EV_P_ ev_io *handle, int revents)
{ {
@ -793,44 +848,46 @@ backend_ws_on_fd_ready (EV_P_ ev_io *handle, int revents)
struct app_context *ctx = handle->data; struct app_context *ctx = handle->data;
struct ws_context *self = &ctx->ws; struct ws_context *self = &ctx->ws;
(void) set_blocking (self->server_fd, false);
enum ws_read_result (*fill_buffer)(struct app_context *, void *, size_t *) enum ws_read_result (*fill_buffer)(struct app_context *, void *, size_t *)
= self->ssl = self->ssl
? backend_ws_fill_read_buffer_tls ? backend_ws_fill_read_buffer_tls
: backend_ws_fill_read_buffer; : backend_ws_fill_read_buffer;
bool disconnected = false; bool close_connection = false;
uint8_t buf[8192]; uint8_t buf[8192];
while (true) while (true)
{ {
// Try to read some data in a non-blocking manner
size_t n_read = sizeof buf; size_t n_read = sizeof buf;
switch (fill_buffer (ctx, buf, &n_read)) (void) set_blocking (self->server_fd, false);
enum ws_read_result result = fill_buffer (ctx, buf, &n_read);
(void) set_blocking (self->server_fd, true);
switch (result)
{ {
case WS_READ_AGAIN: case WS_READ_AGAIN:
goto end; goto end;
case WS_READ_ERROR: case WS_READ_ERROR:
print_error ("reading from the server failed"); print_error ("reading from the server failed");
disconnected = true; close_connection = true;
goto end; goto end;
case WS_READ_EOF: case WS_READ_EOF:
print_status ("the server closed the connection"); print_status ("the server closed the connection");
disconnected = true; close_connection = true;
goto end; goto end;
case WS_READ_OK: case WS_READ_OK:
// XXX: this is a bit ugly if (backend_ws_on_data (ctx, buf, n_read))
(void) set_blocking (self->server_fd, true); break;
// TODO: use the return value
backend_ws_on_data (ctx, buf, n_read); // XXX: maybe we should wait until we receive an EOF
(void) set_blocking (self->server_fd, false); close_connection = true;
break; goto end;
} }
} }
end: end:
(void) set_blocking (self->server_fd, true); if (close_connection)
if (disconnected) backend_ws_close_connection (ctx);
; // TODO
} }
static bool static bool
@ -855,8 +912,6 @@ backend_ws_write (struct app_context *ctx, const void *data, size_t len)
print_debug ("%s: %s: %s", __func__, "write", strerror (errno)); print_debug ("%s: %s: %s", __func__, "write", strerror (errno));
result = false; result = false;
} }
// TODO: destroy the connection on failure?
return result; return result;
} }
@ -1012,7 +1067,7 @@ backend_ws_send_message (struct app_context *ctx,
return result; return result;
} }
static void static bool
backend_ws_send_control (struct app_context *ctx, backend_ws_send_control (struct app_context *ctx,
enum ws_opcode opcode, const void *data, size_t len) enum ws_opcode opcode, const void *data, size_t len)
{ {
@ -1023,22 +1078,22 @@ backend_ws_send_control (struct app_context *ctx,
len = WS_MAX_CONTROL_PAYLOAD_LEN; len = WS_MAX_CONTROL_PAYLOAD_LEN;
} }
backend_ws_send_control (ctx, opcode, data, len); return backend_ws_send_message (ctx, opcode, data, len);
} }
static void static bool
backend_ws_fail (struct app_context *ctx, enum ws_status reason) backend_ws_fail (struct app_context *ctx, enum ws_status reason)
{ {
uint8_t payload[2] = { reason << 8, reason }; struct ws_context *self = &ctx->ws;
backend_ws_send_control (ctx, WS_OPCODE_CLOSE, payload, sizeof payload);
// TODO: set the close timer, ignore all further incoming input (either set uint8_t payload[2] = { reason << 8, reason };
// some flag for the case that we're in the middle of backend_ws_push(), (void) backend_ws_send_control (ctx, WS_OPCODE_CLOSE,
// and/or add a mechanism to stop the caller from polling the socket for payload, sizeof payload);
// reads).
// TODO: set the state to FAILED (not CLOSED as that means the TCP // The caller should immediately proceed to close the TCP connection,
// connection is closed) and wait until all is sent? // e.g. by returning false from a handler
// TODO: make sure we don't send pings after the close self->state = WS_HANDLER_CLOSING;
return false;
} }
static bool static bool
@ -1057,37 +1112,66 @@ backend_ws_on_frame_header (void *user_data, const struct ws_parser *parser)
|| (!ws_is_control_frame (parser->opcode) && || (!ws_is_control_frame (parser->opcode) &&
(self->expecting_continuation && parser->opcode != WS_OPCODE_CONT)) (self->expecting_continuation && parser->opcode != WS_OPCODE_CONT))
|| parser->payload_len >= 0x8000000000000000ULL) || parser->payload_len >= 0x8000000000000000ULL)
backend_ws_fail (ctx, WS_STATUS_PROTOCOL_ERROR); return backend_ws_fail (ctx, WS_STATUS_PROTOCOL_ERROR);
else if (parser->payload_len > BACKEND_WS_MAX_PAYLOAD_LEN) else if (parser->payload_len > BACKEND_WS_MAX_PAYLOAD_LEN)
backend_ws_fail (ctx, WS_STATUS_MESSAGE_TOO_BIG); return backend_ws_fail (ctx, WS_STATUS_MESSAGE_TOO_BIG);
else return true;
return true; }
return false;
static bool
backend_ws_finish_closing_handshake
(struct app_context *ctx, const struct ws_parser *parser)
{
struct str reason;
str_init (&reason);
if (parser->payload_len >= 2)
{
struct msg_unpacker unpacker;
msg_unpacker_init (&unpacker, parser->input.str, parser->payload_len);
uint16_t status_code;
msg_unpacker_u16 (&unpacker, &status_code);
print_debug ("close status code: %d", status_code);
str_append_data (&reason,
parser->input.str + 2, parser->payload_len - 2);
}
char *s = iconv_xstrdup (ctx->term_from_utf8,
reason.str, reason.len, NULL);
print_status ("server closed the connection (%s)", s);
str_free (&reason);
free (s);
return backend_ws_send_control (ctx, WS_OPCODE_CLOSE,
parser->input.str, parser->payload_len);
} }
static bool static bool
backend_ws_on_control_frame backend_ws_on_control_frame
(struct app_context *ctx, const struct ws_parser *parser) (struct app_context *ctx, const struct ws_parser *parser)
{ {
struct ws_context *self = &ctx->ws;
switch (parser->opcode) switch (parser->opcode)
{ {
case WS_OPCODE_CLOSE: case WS_OPCODE_CLOSE:
// TODO: confirm the close // We've received an unsolicited server close
// TODO: change the state to CLOSING if (self->state != WS_HANDLER_CLOSING)
// TODO: call "on_close" (void) backend_ws_finish_closing_handshake (ctx, parser);
// NOTE: the reason is an empty string if omitted
break; return false;
case WS_OPCODE_PING: case WS_OPCODE_PING:
backend_ws_send_control (ctx, WS_OPCODE_PONG, if (!backend_ws_send_control (ctx, WS_OPCODE_PONG,
parser->input.str, parser->payload_len); parser->input.str, parser->payload_len))
return false;
break; break;
case WS_OPCODE_PONG: case WS_OPCODE_PONG:
// Not sending any pings but w/e // Not sending any pings but w/e
break; break;
default: default:
// Unknown control frame // Unknown control frame
backend_ws_fail (ctx, WS_STATUS_PROTOCOL_ERROR); return backend_ws_fail (ctx, WS_STATUS_PROTOCOL_ERROR);
return false;
} }
return true; return true;
} }
@ -1099,19 +1183,16 @@ backend_ws_on_message (struct app_context *ctx,
struct ws_context *self = &ctx->ws; struct ws_context *self = &ctx->ws;
if (type != WS_OPCODE_TEXT) if (type != WS_OPCODE_TEXT)
{ return backend_ws_fail (ctx, WS_STATUS_UNSUPPORTED_DATA);
backend_ws_fail (ctx, WS_STATUS_UNSUPPORTED_DATA);
return false;
}
if (!self->response_buffer) if (!self->waiting_for_event || !self->response_buffer)
{ {
// TODO: warn about unexpected messages print_warning ("unexpected message received");
return true; return true;
} }
str_append_data (self->response_buffer, data, len); str_append_data (self->response_buffer, data, len);
// TODO: exit the event loop ev_unloop (EV_DEFAULT_ EVUNLOOP_ONE);
return true; return true;
} }
@ -1126,10 +1207,7 @@ backend_ws_on_frame (void *user_data, const struct ws_parser *parser)
// TODO: do this rather in "on_frame_header" // TODO: do this rather in "on_frame_header"
if (self->message_data.len + parser->payload_len if (self->message_data.len + parser->payload_len
> BACKEND_WS_MAX_PAYLOAD_LEN) > BACKEND_WS_MAX_PAYLOAD_LEN)
{ return backend_ws_fail (ctx, WS_STATUS_MESSAGE_TOO_BIG);
backend_ws_fail (ctx, WS_STATUS_MESSAGE_TOO_BIG);
return false;
}
if (!self->expecting_continuation) if (!self->expecting_continuation)
self->message_opcode = parser->opcode; self->message_opcode = parser->opcode;
@ -1143,10 +1221,7 @@ backend_ws_on_frame (void *user_data, const struct ws_parser *parser)
if (self->message_opcode == WS_OPCODE_TEXT if (self->message_opcode == WS_OPCODE_TEXT
&& !utf8_validate (self->parser.input.str, self->parser.input.len)) && !utf8_validate (self->parser.input.str, self->parser.input.len))
{ return backend_ws_fail (ctx, WS_STATUS_INVALID_PAYLOAD_DATA);
backend_ws_fail (ctx, WS_STATUS_INVALID_PAYLOAD_DATA);
return false;
}
bool result = backend_ws_on_message (ctx, self->message_opcode, bool result = backend_ws_on_message (ctx, self->message_opcode,
self->message_data.str, self->message_data.len); self->message_data.str, self->message_data.len);
@ -1154,6 +1229,20 @@ backend_ws_on_frame (void *user_data, const struct ws_parser *parser)
return result; return result;
} }
static void
backend_ws_on_connection_timeout (EV_P_ ev_io *handle, int revents)
{
(void) loop;
(void) revents;
struct app_context *ctx = handle->data;
struct ws_context *self = &ctx->ws;
hard_assert (self->waiting_for_event);
error_set (&self->e, "connection timeout");
backend_ws_close_connection (ctx);
}
static bool static bool
backend_ws_connect (struct app_context *ctx, struct error **e) backend_ws_connect (struct app_context *ctx, struct error **e)
{ {
@ -1223,7 +1312,7 @@ backend_ws_connect (struct app_context *ctx, struct error **e)
http_parser_init (&self->hp, HTTP_RESPONSE); http_parser_init (&self->hp, HTTP_RESPONSE);
str_reset (&self->field); str_reset (&self->field);
str_reset (&self->value); str_reset (&self->value);
// TODO: write a function to free self->headers and call it here str_map_clear (&self->headers);
ws_parser_free (&self->parser); ws_parser_free (&self->parser);
ws_parser_init (&self->parser); ws_parser_init (&self->parser);
self->parser.on_frame_header = backend_ws_on_frame_header; self->parser.on_frame_header = backend_ws_on_frame_header;
@ -1235,10 +1324,28 @@ backend_ws_connect (struct app_context *ctx, struct error **e)
self->read_watcher.data = ctx; self->read_watcher.data = ctx;
ev_io_start (EV_DEFAULT_ &self->read_watcher); ev_io_start (EV_DEFAULT_ &self->read_watcher);
// TODO: set a timeout timer // XXX: we should do everything non-blocking and include establishing
// TODO: run an event loop to process the handshake response // the TCP connection in the timeout, but that requires a rewrite.
// As it is, this isn't really too useful.
ev_timer_init (&self->timeout_watcher,
backend_ws_on_connection_timeout, 30, 0);
result = true; // Run an event loop to process the handshake
ev_timer_start (EV_DEFAULT_ &self->timeout_watcher);
self->waiting_for_event = true;
ev_loop (EV_DEFAULT_ 0);
self->waiting_for_event = false;
ev_timer_stop (EV_DEFAULT_ &self->timeout_watcher);
if (self->e)
{
error_propagate (e, self->e);
self->e = NULL;
}
else
result = true;
fail_2: fail_2:
if (!result) if (!result)
@ -1269,16 +1376,28 @@ backend_ws_make_call (struct app_context *ctx,
if (backend_ws_send_message (ctx, if (backend_ws_send_message (ctx,
WS_OPCODE_TEXT, request, strlen (request))) WS_OPCODE_TEXT, request, strlen (request)))
break; break;
print_status ("connection failed"); print_status ("connection failed, reconnecting");
if (!backend_ws_connect (ctx, e)) if (!backend_ws_connect (ctx, e))
return false; return false;
} }
if (expect_content) if (expect_content)
{ {
// Run an event loop to retrieve the response
self->response_buffer = buf; self->response_buffer = buf;
// TODO: run an event loop to retrieve the answer into "buf" self->waiting_for_event = true;
ev_loop (EV_DEFAULT_ 0);
self->waiting_for_event = false;
self->response_buffer = NULL; self->response_buffer = NULL;
if (self->e)
{
error_propagate (e, self->e);
self->e = NULL;
return false;
}
} }
return true; return true;
} }
@ -1286,7 +1405,26 @@ backend_ws_make_call (struct app_context *ctx,
static void static void
backend_ws_destroy (struct app_context *ctx) backend_ws_destroy (struct app_context *ctx)
{ {
// TODO struct ws_context *self = &ctx->ws;
// TODO: maybe attempt a graceful shutdown, but for that there should
// probably be another backend method that runs an event loop
if (self->server_fd != -1)
backend_ws_close_connection (ctx);
free (self->endpoint);
str_vector_free (&self->extra_headers);
if (self->e)
error_free (self->e);
ev_timer_stop (EV_DEFAULT_ &self->timeout_watcher);
if (self->ssl_ctx)
SSL_CTX_free (self->ssl_ctx);
free (self->key);
str_free (&self->field);
str_free (&self->value);
str_map_free (&self->headers);
ws_parser_free (&self->parser);
str_free (&self->message_data);
} }
static struct backend_iface g_backend_ws = static struct backend_iface g_backend_ws =