From 7333f271595a8bd42f02f19e1e3e71df9764bc96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C5=99emysl=20Janouch?= Date: Wed, 8 Apr 2015 01:52:20 +0200 Subject: [PATCH] Give the WebSocket backend some chance of working This is all untested code. --- json-rpc-shell.c | 288 +++++++++++++++++++++++++++++++++++------------ 1 file changed, 213 insertions(+), 75 deletions(-) diff --git a/json-rpc-shell.c b/json-rpc-shell.c index 1ff3120..a231d0b 100644 --- a/json-rpc-shell.c +++ b/json-rpc-shell.c @@ -74,6 +74,13 @@ static struct config_item g_config_table[] = // --- Main program ------------------------------------------------------------ +// HTTP/S and WS/S require significantly different handling. While for HTTP we +// can just use the cURL easy interface, with WebSockets it gets a bit more +// complicated and we implement it all by ourselves. +// +// Luckily on a higher level the application doesn't need to bother itself with +// the details and the backend API can be very simple.. + struct app_context; struct backend_iface @@ -108,18 +115,32 @@ enum ws_handler_state struct ws_context { + // Configuration: + char *endpoint; ///< Endpoint URL struct http_parser_url url; ///< Parsed URL + struct str_vector extra_headers; ///< Extra headers for the handshake - enum ws_handler_state state; ///< State - char *key; ///< Key for the current handshake - struct str *response_buffer; ///< Buffer for the response + // Events: + + bool waiting_for_event; ///< Running a separate loop to wait? + struct error *e; ///< Error while waiting for event + + ev_timer timeout_watcher; ///< Connection timeout watcher + struct str *response_buffer; ///< Buffer for the incoming messages + + // The TCP transport: int server_fd; ///< Socket FD of the server ev_io read_watcher; ///< Server FD read watcher SSL_CTX *ssl_ctx; ///< SSL context SSL *ssl; ///< SSL connection + // WebSockets protocol handling: + + enum ws_handler_state state; ///< State + char *key; ///< Key for the current handshake + http_parser hp; ///< HTTP parser bool parsing_header_value; ///< Parsing header value or field? struct str field; ///< Field part buffer @@ -131,14 +152,15 @@ struct ws_context enum ws_opcode message_opcode; ///< Opcode for the current message struct str message_data; ///< Concatenated message data - - struct str_vector extra_headers; ///< Extra headers for the handshake }; static void ws_context_init (struct ws_context *self) { memset (self, 0, sizeof *self); + ev_timer_init (&self->timeout_watcher, NULL, 0, 0); + self->server_fd = -1; + ev_io_init (&self->read_watcher, NULL, 0, 0); http_parser_init (&self->hp, HTTP_RESPONSE); str_init (&self->field); str_init (&self->value); @@ -755,14 +777,15 @@ backend_ws_on_data (struct app_context *ctx, const void *data, size_t len) struct error *e = NULL; if (!backend_ws_finish_handshake (ctx, &e)) { - print_debug ("WS handshake failed: %s", e->message); + print_error ("WS handshake failed: %s", e->message); error_free (e); return false; } + // Finished the handshake, return to caller + // (we run a separate loop to wait for the handshake to finish) self->state = WS_HANDLER_OPEN; - - // TODO: set the event loop to quit? + ev_unloop (EV_DEFAULT_ EVUNLOOP_ONE); if ((len -= n_parsed)) return ws_parser_push (&self->parser, @@ -775,15 +798,47 @@ backend_ws_on_data (struct app_context *ctx, const void *data, size_t len) if (n_parsed != len || err != HPE_OK) { if (err == HPE_CB_headers_complete) - print_debug ("WS handshake failed: %s", "missing `Upgrade' field"); + print_error ("WS handshake failed: %s", "missing `Upgrade' field"); else - print_debug ("WS handshake failed: %s", + print_error ("WS handshake failed: %s", http_errno_description (err)); return false; } return true; } +static void +backend_ws_close_connection (struct app_context *ctx) +{ + struct ws_context *self = &ctx->ws; + if (self->server_fd == -1) + return; + + ev_io_stop (EV_DEFAULT_ &self->read_watcher); + + if (self->ssl) + { + (void) SSL_shutdown (self->ssl); + SSL_free (self->ssl); + self->ssl = NULL; + } + + xclose (self->server_fd); + self->server_fd = -1; + + self->state = WS_HANDLER_CLOSED; + + // That would have no way of succeeding + // XXX: what if we're waiting for the close? + if (self->waiting_for_event) + { + if (!self->e) + error_set (&self->e, "unexpected connection close"); + + ev_unloop (EV_DEFAULT_ EVUNLOOP_ONE); + } +} + static void backend_ws_on_fd_ready (EV_P_ ev_io *handle, int revents) { @@ -793,44 +848,46 @@ backend_ws_on_fd_ready (EV_P_ ev_io *handle, int revents) struct app_context *ctx = handle->data; struct ws_context *self = &ctx->ws; - (void) set_blocking (self->server_fd, false); - enum ws_read_result (*fill_buffer)(struct app_context *, void *, size_t *) = self->ssl ? backend_ws_fill_read_buffer_tls : backend_ws_fill_read_buffer; - bool disconnected = false; + bool close_connection = false; uint8_t buf[8192]; while (true) { + // Try to read some data in a non-blocking manner size_t n_read = sizeof buf; - switch (fill_buffer (ctx, buf, &n_read)) + (void) set_blocking (self->server_fd, false); + enum ws_read_result result = fill_buffer (ctx, buf, &n_read); + (void) set_blocking (self->server_fd, true); + + switch (result) { case WS_READ_AGAIN: goto end; case WS_READ_ERROR: print_error ("reading from the server failed"); - disconnected = true; + close_connection = true; goto end; case WS_READ_EOF: print_status ("the server closed the connection"); - disconnected = true; + close_connection = true; goto end; case WS_READ_OK: - // XXX: this is a bit ugly - (void) set_blocking (self->server_fd, true); - // TODO: use the return value - backend_ws_on_data (ctx, buf, n_read); - (void) set_blocking (self->server_fd, false); - break; + if (backend_ws_on_data (ctx, buf, n_read)) + break; + + // XXX: maybe we should wait until we receive an EOF + close_connection = true; + goto end; } } end: - (void) set_blocking (self->server_fd, true); - if (disconnected) - ; // TODO + if (close_connection) + backend_ws_close_connection (ctx); } static bool @@ -855,8 +912,6 @@ backend_ws_write (struct app_context *ctx, const void *data, size_t len) print_debug ("%s: %s: %s", __func__, "write", strerror (errno)); result = false; } - - // TODO: destroy the connection on failure? return result; } @@ -1012,7 +1067,7 @@ backend_ws_send_message (struct app_context *ctx, return result; } -static void +static bool backend_ws_send_control (struct app_context *ctx, enum ws_opcode opcode, const void *data, size_t len) { @@ -1023,22 +1078,22 @@ backend_ws_send_control (struct app_context *ctx, len = WS_MAX_CONTROL_PAYLOAD_LEN; } - backend_ws_send_control (ctx, opcode, data, len); + return backend_ws_send_message (ctx, opcode, data, len); } -static void +static bool backend_ws_fail (struct app_context *ctx, enum ws_status reason) { - uint8_t payload[2] = { reason << 8, reason }; - backend_ws_send_control (ctx, WS_OPCODE_CLOSE, payload, sizeof payload); + struct ws_context *self = &ctx->ws; - // TODO: set the close timer, ignore all further incoming input (either set - // some flag for the case that we're in the middle of backend_ws_push(), - // and/or add a mechanism to stop the caller from polling the socket for - // reads). - // TODO: set the state to FAILED (not CLOSED as that means the TCP - // connection is closed) and wait until all is sent? - // TODO: make sure we don't send pings after the close + uint8_t payload[2] = { reason << 8, reason }; + (void) backend_ws_send_control (ctx, WS_OPCODE_CLOSE, + payload, sizeof payload); + + // The caller should immediately proceed to close the TCP connection, + // e.g. by returning false from a handler + self->state = WS_HANDLER_CLOSING; + return false; } static bool @@ -1057,37 +1112,66 @@ backend_ws_on_frame_header (void *user_data, const struct ws_parser *parser) || (!ws_is_control_frame (parser->opcode) && (self->expecting_continuation && parser->opcode != WS_OPCODE_CONT)) || parser->payload_len >= 0x8000000000000000ULL) - backend_ws_fail (ctx, WS_STATUS_PROTOCOL_ERROR); + return backend_ws_fail (ctx, WS_STATUS_PROTOCOL_ERROR); else if (parser->payload_len > BACKEND_WS_MAX_PAYLOAD_LEN) - backend_ws_fail (ctx, WS_STATUS_MESSAGE_TOO_BIG); - else - return true; - return false; + return backend_ws_fail (ctx, WS_STATUS_MESSAGE_TOO_BIG); + return true; +} + +static bool +backend_ws_finish_closing_handshake + (struct app_context *ctx, const struct ws_parser *parser) +{ + struct str reason; + str_init (&reason); + + if (parser->payload_len >= 2) + { + struct msg_unpacker unpacker; + msg_unpacker_init (&unpacker, parser->input.str, parser->payload_len); + + uint16_t status_code; + msg_unpacker_u16 (&unpacker, &status_code); + print_debug ("close status code: %d", status_code); + + str_append_data (&reason, + parser->input.str + 2, parser->payload_len - 2); + } + + char *s = iconv_xstrdup (ctx->term_from_utf8, + reason.str, reason.len, NULL); + print_status ("server closed the connection (%s)", s); + str_free (&reason); + free (s); + + return backend_ws_send_control (ctx, WS_OPCODE_CLOSE, + parser->input.str, parser->payload_len); } static bool backend_ws_on_control_frame (struct app_context *ctx, const struct ws_parser *parser) { + struct ws_context *self = &ctx->ws; switch (parser->opcode) { case WS_OPCODE_CLOSE: - // TODO: confirm the close - // TODO: change the state to CLOSING - // TODO: call "on_close" - // NOTE: the reason is an empty string if omitted - break; + // We've received an unsolicited server close + if (self->state != WS_HANDLER_CLOSING) + (void) backend_ws_finish_closing_handshake (ctx, parser); + + return false; case WS_OPCODE_PING: - backend_ws_send_control (ctx, WS_OPCODE_PONG, - parser->input.str, parser->payload_len); + if (!backend_ws_send_control (ctx, WS_OPCODE_PONG, + parser->input.str, parser->payload_len)) + return false; break; case WS_OPCODE_PONG: // Not sending any pings but w/e break; default: // Unknown control frame - backend_ws_fail (ctx, WS_STATUS_PROTOCOL_ERROR); - return false; + return backend_ws_fail (ctx, WS_STATUS_PROTOCOL_ERROR); } return true; } @@ -1099,19 +1183,16 @@ backend_ws_on_message (struct app_context *ctx, struct ws_context *self = &ctx->ws; if (type != WS_OPCODE_TEXT) - { - backend_ws_fail (ctx, WS_STATUS_UNSUPPORTED_DATA); - return false; - } + return backend_ws_fail (ctx, WS_STATUS_UNSUPPORTED_DATA); - if (!self->response_buffer) + if (!self->waiting_for_event || !self->response_buffer) { - // TODO: warn about unexpected messages + print_warning ("unexpected message received"); return true; } str_append_data (self->response_buffer, data, len); - // TODO: exit the event loop + ev_unloop (EV_DEFAULT_ EVUNLOOP_ONE); return true; } @@ -1126,10 +1207,7 @@ backend_ws_on_frame (void *user_data, const struct ws_parser *parser) // TODO: do this rather in "on_frame_header" if (self->message_data.len + parser->payload_len > BACKEND_WS_MAX_PAYLOAD_LEN) - { - backend_ws_fail (ctx, WS_STATUS_MESSAGE_TOO_BIG); - return false; - } + return backend_ws_fail (ctx, WS_STATUS_MESSAGE_TOO_BIG); if (!self->expecting_continuation) self->message_opcode = parser->opcode; @@ -1143,10 +1221,7 @@ backend_ws_on_frame (void *user_data, const struct ws_parser *parser) if (self->message_opcode == WS_OPCODE_TEXT && !utf8_validate (self->parser.input.str, self->parser.input.len)) - { - backend_ws_fail (ctx, WS_STATUS_INVALID_PAYLOAD_DATA); - return false; - } + return backend_ws_fail (ctx, WS_STATUS_INVALID_PAYLOAD_DATA); bool result = backend_ws_on_message (ctx, self->message_opcode, self->message_data.str, self->message_data.len); @@ -1154,6 +1229,20 @@ backend_ws_on_frame (void *user_data, const struct ws_parser *parser) return result; } +static void +backend_ws_on_connection_timeout (EV_P_ ev_io *handle, int revents) +{ + (void) loop; + (void) revents; + + struct app_context *ctx = handle->data; + struct ws_context *self = &ctx->ws; + + hard_assert (self->waiting_for_event); + error_set (&self->e, "connection timeout"); + backend_ws_close_connection (ctx); +} + static bool backend_ws_connect (struct app_context *ctx, struct error **e) { @@ -1223,7 +1312,7 @@ backend_ws_connect (struct app_context *ctx, struct error **e) http_parser_init (&self->hp, HTTP_RESPONSE); str_reset (&self->field); str_reset (&self->value); - // TODO: write a function to free self->headers and call it here + str_map_clear (&self->headers); ws_parser_free (&self->parser); ws_parser_init (&self->parser); self->parser.on_frame_header = backend_ws_on_frame_header; @@ -1235,10 +1324,28 @@ backend_ws_connect (struct app_context *ctx, struct error **e) self->read_watcher.data = ctx; ev_io_start (EV_DEFAULT_ &self->read_watcher); - // TODO: set a timeout timer - // TODO: run an event loop to process the handshake response + // XXX: we should do everything non-blocking and include establishing + // the TCP connection in the timeout, but that requires a rewrite. + // As it is, this isn't really too useful. + ev_timer_init (&self->timeout_watcher, + backend_ws_on_connection_timeout, 30, 0); - result = true; + // Run an event loop to process the handshake + ev_timer_start (EV_DEFAULT_ &self->timeout_watcher); + self->waiting_for_event = true; + + ev_loop (EV_DEFAULT_ 0); + + self->waiting_for_event = false; + ev_timer_stop (EV_DEFAULT_ &self->timeout_watcher); + + if (self->e) + { + error_propagate (e, self->e); + self->e = NULL; + } + else + result = true; fail_2: if (!result) @@ -1269,16 +1376,28 @@ backend_ws_make_call (struct app_context *ctx, if (backend_ws_send_message (ctx, WS_OPCODE_TEXT, request, strlen (request))) break; - print_status ("connection failed"); + print_status ("connection failed, reconnecting"); if (!backend_ws_connect (ctx, e)) return false; } if (expect_content) { + // Run an event loop to retrieve the response self->response_buffer = buf; - // TODO: run an event loop to retrieve the answer into "buf" + self->waiting_for_event = true; + + ev_loop (EV_DEFAULT_ 0); + + self->waiting_for_event = false; self->response_buffer = NULL; + + if (self->e) + { + error_propagate (e, self->e); + self->e = NULL; + return false; + } } return true; } @@ -1286,7 +1405,26 @@ backend_ws_make_call (struct app_context *ctx, static void backend_ws_destroy (struct app_context *ctx) { - // TODO + struct ws_context *self = &ctx->ws; + + // TODO: maybe attempt a graceful shutdown, but for that there should + // probably be another backend method that runs an event loop + if (self->server_fd != -1) + backend_ws_close_connection (ctx); + + free (self->endpoint); + str_vector_free (&self->extra_headers); + if (self->e) + error_free (self->e); + ev_timer_stop (EV_DEFAULT_ &self->timeout_watcher); + if (self->ssl_ctx) + SSL_CTX_free (self->ssl_ctx); + free (self->key); + str_free (&self->field); + str_free (&self->value); + str_map_free (&self->headers); + ws_parser_free (&self->parser); + str_free (&self->message_data); } static struct backend_iface g_backend_ws =