Steady progress

Still trying to figure out FastCGI.

At least I've finally implemented the JSON-RPC handler.
This commit is contained in:
Přemysl Eric Janouch 2015-03-08 05:39:57 +01:00
parent 2733ead30f
commit 9e0c9dd6d8
1 changed files with 650 additions and 34 deletions

View File

@ -66,6 +66,45 @@ msg_unpacker_u32 (struct msg_unpacker *self, uint32_t *value)
#undef UNPACKER_INT_BEGIN
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
// "msg_writer" should be rewritten on top of this
static void
str_pack_u8 (struct str *self, uint8_t x)
{
str_append_data (self, &x, 1);
}
static void
str_pack_u16 (struct str *self, uint64_t x)
{
uint8_t tmp[2] = { x >> 8, x };
str_append_data (self, tmp, sizeof tmp);
}
static void
str_pack_u32 (struct str *self, uint32_t x)
{
uint32_t u = x;
uint8_t tmp[4] = { u >> 24, u >> 16, u >> 8, u };
str_append_data (self, tmp, sizeof tmp);
}
static void
str_pack_i32 (struct str *self, int32_t x)
{
str_pack_u32 (self, (uint32_t) x);
}
static void
str_pack_u64 (struct str *self, uint64_t x)
{
uint8_t tmp[8] =
{ x >> 56, x >> 48, x >> 40, x >> 32, x >> 24, x >> 16, x >> 8, x };
str_append_data (self, tmp, sizeof tmp);
}
// --- libev helpers -----------------------------------------------------------
static bool
@ -312,7 +351,7 @@ fcgi_nv_parser_free (struct fcgi_nv_parser *self)
}
static void
fcgi_nv_parser_push (struct fcgi_nv_parser *self, void *data, size_t len)
fcgi_nv_parser_push (struct fcgi_nv_parser *self, const void *data, size_t len)
{
// This could be optimized significantly; I'm not even trying
str_append_data (&self->input, data, len);
@ -402,7 +441,38 @@ fcgi_nv_parser_push (struct fcgi_nv_parser *self, void *data, size_t len)
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
// TODO
static void
fcgi_nv_convert_len (size_t len, struct str *output)
{
if (len < 0x80)
str_pack_u8 (output, len);
else
{
len |= (uint32_t) 1 << 31;
str_pack_u32 (output, len);
}
}
static void
fcgi_nv_convert (struct str_map *map, struct str *output)
{
struct str_map_iter iter;
str_map_iter_init (&iter, map);
while (str_map_iter_next (&iter))
{
const char *name = iter.link->key;
const char *value = iter.link->data;
size_t name_len = iter.link->key_length;
size_t value_len = strlen (value);
fcgi_nv_convert_len (name_len, output);
fcgi_nv_convert_len (value_len, output);
str_append_data (output, name, name_len);
str_append_data (output, value, value_len);
}
}
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
enum fcgi_request_state
{
@ -413,14 +483,17 @@ enum fcgi_request_state
struct fcgi_request
{
struct fcgi_muxer *muxer; ///< The parent muxer
uint16_t request_id; ///< The ID of this request
uint8_t flags; ///< Request flags
enum fcgi_request_state state; ///< Parsing state
struct str_map headers; ///< Headers
struct fcgi_nv_parser hdr_parser; ///< Header parser
};
// TODO
struct str output_buffer; ///< Output buffer
void *handler_data; ///< Handler data
};
struct fcgi_muxer
{
@ -429,14 +502,302 @@ struct fcgi_muxer
/// Requests assigned to request IDs
// TODO: allocate this dynamically
struct fcgi_request *requests[1 << 16];
void (*write_cb) (void *user_data, const void *data, size_t len);
void (*close_cb) (void *user_data);
void *(*request_start_cb) (void *user_data, struct fcgi_request *request);
void (*request_push_cb) (void *handler_data, const void *data, size_t len);
void (*request_destroy_cb) (void *handler_data);
void *user_data; ///< User data for callbacks
};
static void
fcgi_muxer_send (struct fcgi_muxer *self,
enum fcgi_type type, uint16_t request_id, const void *data, size_t len)
{
hard_assert (len <= UINT16_MAX);
struct str message;
str_init (&message);
str_pack_u8 (&message, FCGI_VERSION_1);
str_pack_u8 (&message, type);
str_pack_u16 (&message, request_id);
str_pack_u16 (&message, len); // content length
str_pack_u8 (&message, 0); // padding length
str_append_data (&message, data, len);
// XXX: we should probably have another write_cb that assumes ownership
self->write_cb (self->user_data, message.str, message.len);
str_free (&message);
}
static void
fcgi_muxer_send_end_request (struct fcgi_muxer *self, uint16_t request_id,
uint32_t app_status, enum fcgi_protocol_status protocol_status)
{
uint8_t content[8] = { app_status >> 24, app_status >> 16,
app_status << 8, app_status, protocol_status };
fcgi_muxer_send (self, FCGI_END_REQUEST, request_id,
content, sizeof content);
}
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
static void
fcgi_request_init (struct fcgi_request *self)
{
memset (self, 0, sizeof *self);
str_map_init (&self->headers);
self->headers.free = free;
fcgi_nv_parser_init (&self->hdr_parser);
self->hdr_parser.output = &self->headers;
}
static void
fcgi_request_free (struct fcgi_request *self)
{
str_map_free (&self->headers);
fcgi_nv_parser_free (&self->hdr_parser);
}
static void
fcgi_request_push_params
(struct fcgi_request *self, const void *data, size_t len)
{
if (self->state != FCGI_REQUEST_PARAMS)
{
// TODO: probably reject the request
return;
}
if (len)
fcgi_nv_parser_push (&self->hdr_parser, data, len);
else
{
// TODO: probably check the state of the header parser
// TODO: request_start() can return false, end the request here?
self->handler_data = self->muxer->request_start_cb
(self->muxer->user_data, self);
self->state = FCGI_REQUEST_STDIN;
}
}
static void
fcgi_request_push_stdin
(struct fcgi_request *self, const void *data, size_t len)
{
if (self->state != FCGI_REQUEST_STDIN)
{
// TODO: probably reject the request
return;
}
self->muxer->request_push_cb (self->handler_data, data, len);
}
static void
fcgi_request_flush (struct fcgi_request *self)
{
if (!self->output_buffer.len)
return;
fcgi_muxer_send (self->muxer, FCGI_STDOUT, self->request_id,
self->output_buffer.str, self->output_buffer.len);
str_reset (&self->output_buffer);
}
static void
fcgi_request_write (struct fcgi_request *self, const void *data, size_t len)
{
// We're buffering the output and splitting it into messages
bool need_flush = true;
while (len)
{
size_t to_write = UINT16_MAX - self->output_buffer.len;
if (to_write > len)
{
to_write = len;
need_flush = false;
}
str_append_data (&self->output_buffer, data, to_write);
data = (uint8_t *) data + to_write;
len -= to_write;
if (need_flush)
fcgi_request_flush (self);
}
}
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
typedef void (*fcgi_muxer_handler_fn)
(struct fcgi_muxer *, const struct fcgi_parser *);
static void
fcgi_muxer_on_get_values
(struct fcgi_muxer *self, const struct fcgi_parser *parser)
{
struct str_map values; str_map_init (&values); values.free = free;
struct str_map response; str_map_init (&response); response.free = free;
struct fcgi_nv_parser nv_parser;
fcgi_nv_parser_init (&nv_parser);
nv_parser.output = &values;
fcgi_nv_parser_push (&nv_parser, parser->content.str, parser->content.len);
struct str_map_iter iter;
str_map_iter_init (&iter, &values);
while (str_map_iter_next (&iter))
{
const char *key = iter.link->key;
// TODO: if (!strcmp (key, FCGI_MAX_CONNS))
// TODO: if (!strcmp (key, FCGI_MAX_REQS))
if (!strcmp (key, FCGI_MPXS_CONNS))
str_map_set (&response, key, xstrdup ("1"));
}
struct str content;
str_init (&content);
fcgi_nv_convert (&response, &content);
fcgi_muxer_send (self, FCGI_GET_VALUES_RESULT, parser->request_id,
content.str, content.len);
str_free (&content);
str_map_free (&values);
str_map_free (&response);
}
static void
fcgi_muxer_on_begin_request
(struct fcgi_muxer *self, const struct fcgi_parser *parser)
{
struct msg_unpacker unpacker;
msg_unpacker_init (&unpacker, parser->content.str, parser->content.len);
uint16_t role;
uint8_t flags;
bool success = true;
success &= msg_unpacker_u16 (&unpacker, &role);
success &= msg_unpacker_u8 (&unpacker, &flags);
// Ignoring 5 reserved bytes
if (!success)
{
print_debug ("FastCGI: ignoring invalid %s message",
STRINGIFY (FCGI_BEGIN_REQUEST));
return;
}
if (role != FCGI_RESPONDER)
{
fcgi_muxer_send_end_request (self,
parser->request_id, 0, FCGI_UNKNOWN_ROLE);
return;
}
struct fcgi_request *request = self->requests[parser->request_id];
if (request)
{
// TODO: fail
return;
}
request = xcalloc (1, sizeof *request);
fcgi_request_init (request);
request->muxer = self;
request->request_id = parser->request_id;
request->flags = flags;
self->requests[parser->request_id] = request;
}
static void
fcgi_muxer_on_abort_request
(struct fcgi_muxer *self, const struct fcgi_parser *parser)
{
struct fcgi_request *request = self->requests[parser->request_id];
if (!request)
{
print_debug ("FastCGI: received %s for an unknown request",
STRINGIFY (FCGI_ABORT_REQUEST));
return;
}
// TODO: abort the request
}
static void
fcgi_muxer_on_params (struct fcgi_muxer *self, const struct fcgi_parser *parser)
{
struct fcgi_request *request = self->requests[parser->request_id];
if (!request)
{
print_debug ("FastCGI: received %s for an unknown request",
STRINGIFY (FCGI_PARAMS));
return;
}
fcgi_request_push_params (request,
parser->content.str, parser->content.len);
}
static void
fcgi_muxer_on_stdin (struct fcgi_muxer *self, const struct fcgi_parser *parser)
{
struct fcgi_request *request = self->requests[parser->request_id];
if (!request)
{
print_debug ("FastCGI: received %s for an unknown request",
STRINGIFY (FCGI_STDIN));
return;
}
fcgi_request_push_stdin (request,
parser->content.str, parser->content.len);
}
static void
fcgi_muxer_on_message (const struct fcgi_parser *parser, void *user_data)
{
struct fcgi_muxer *self = user_data;
// TODO
if (parser->version != FCGI_VERSION_1)
{
print_debug ("FastCGI: unsupported version %d", parser->version);
// TODO: also return false to stop processing on protocol error?
return;
}
static const fcgi_muxer_handler_fn handlers[] =
{
[FCGI_GET_VALUES] = fcgi_muxer_on_get_values,
[FCGI_BEGIN_REQUEST] = fcgi_muxer_on_begin_request,
[FCGI_ABORT_REQUEST] = fcgi_muxer_on_abort_request,
[FCGI_PARAMS] = fcgi_muxer_on_params,
[FCGI_STDIN] = fcgi_muxer_on_stdin,
};
fcgi_muxer_handler_fn handler;
if (parser->type >= N_ELEMENTS (handlers)
|| !(handler = handlers[parser->type]))
{
uint8_t content[8] = { parser->type };
fcgi_muxer_send (self, FCGI_UNKNOWN_TYPE, parser->request_id,
content, sizeof content);
return;
}
handler (self, parser);
}
static void
@ -649,6 +1010,7 @@ struct server_context
struct client *clients; ///< Clients
unsigned n_clients; ///< Current number of connections
struct request_handler *handlers; ///< Request handlers
struct str_map config; ///< Server configuration
};
@ -672,19 +1034,58 @@ server_context_free (struct server_context *self)
// --- JSON-RPC ----------------------------------------------------------------
// TODO: this is where we're actually supposed to do JSON-RPC 2.0 processing
#define JSON_RPC_ERROR_TABLE(XX) \
XX (-32700, PARSE_ERROR, "Parse error") \
XX (-32600, INVALID_REQUEST, "Invalid Request") \
XX (-32601, METHOD_NOT_FOUND, "Method not found") \
XX (-32602, INVALID_PARAMS, "Invalid params") \
XX (-32603, INTERNAL_ERROR, "Internal error")
// There's probably no reason to create an object for this.
//
// We probably just want a handler function that takes a JSON string, parses it,
// and returns back another JSON string.
//
// Then there should be another function that takes a parsed JSON request and
// returns back a JSON reply. This function may get called multiple times if
// the user sends a batch request.
enum json_rpc_error
{
#define XX(code, name, message) JSON_RPC_ERROR_ ## name,
JSON_RPC_ERROR_TABLE (XX)
#undef XX
JSON_RPC_ERROR_COUNT
};
// TODO: a function that queues up a ping over IRC: this has to be owned by the
// server context as a background job that removes itself upon completion.
static json_t *
json_rpc_error (enum json_rpc_error id, json_t *data)
{
#define XX(code, name, message) { code, message },
static const struct json_rpc_error
{
int code;
const char *message;
}
errors[JSON_RPC_ERROR_COUNT] =
{
JSON_RPC_ERROR_TABLE (XX)
};
#undef XX
json_t *error = json_object ();
json_object_set_new (error, "code", json_integer (errors[id].code));
json_object_set_new (error, "message", json_string (errors[id].message));
if (data)
json_object_set_new (error, "data", data);
return error;
}
static json_t *
json_rpc_response (json_t *id, json_t *result, json_t *error)
{
json_t *x = json_object ();
json_object_set_new (x, "jsonrpc", json_string ("2.0"));
json_object_set_new (x, "id", id ? id : json_null ());
if (result) json_object_set_new (x, "result", result);
if (error) json_object_set_new (x, "error", error);
return x;
}
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
static bool
try_advance (const char **p, const char *text)
@ -728,11 +1129,128 @@ validate_json_rpc_content_type (const char *type)
return !*type;
}
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
// TODO: a method that queues up a ping over IRC: this has to be owned by the
// server context as a background job that removes itself upon completion.
static json_t *
json_rpc_ping (struct server_context *ctx, json_t *params)
{
(void) ctx;
(void) params;
return json_rpc_response (NULL, json_string ("pong"), NULL);
}
static json_t *
process_json_rpc_request (struct server_context *ctx, json_t *request)
{
// TODO: takes a parsed JSON request and returns back a JSON reply.
// This function may get called multiple times for batch requests.
if (!json_is_object (request))
return json_rpc_response (NULL, NULL,
json_rpc_error (JSON_RPC_ERROR_INVALID_REQUEST, NULL));
json_t *v = json_object_get (request, "jsonrpc");
json_t *m = json_object_get (request, "method");
json_t *params = json_object_get (request, "params");
json_t *id = json_object_get (request, "id");
const char *version;
const char *method;
bool ok = true;
ok &= v && (version = json_string_value (v)) && !strcmp (version, "2.0");
ok &= m && (method = json_string_value (m));
ok &= !params || json_is_array (params) || json_is_object (params);
ok &= !id || json_is_null (id) ||
json_is_string (id) || json_is_number (id);
if (!ok)
return json_rpc_response (id, NULL,
json_rpc_error (JSON_RPC_ERROR_INVALID_REQUEST, NULL));
// TODO: add a more extensible mechanism
json_t *response = NULL;
if (!strcmp (method, "ping"))
response = json_rpc_ping (ctx, params);
else
return json_rpc_response (id, NULL,
json_rpc_error (JSON_RPC_ERROR_METHOD_NOT_FOUND, NULL));
if (id)
return response;
// Notifications don't get responses
// TODO: separate notifications from non-notifications?
json_decref (response);
return NULL;
}
static void
flush_json (json_t *json, struct str *output)
{
char *utf8 = json_dumps (json, JSON_ENCODE_ANY);
str_append (output, utf8);
free (utf8);
json_decref (json);
}
static void
process_json_rpc (struct server_context *ctx,
const void *data, size_t len, struct str *output)
{
json_error_t e;
json_t *request;
if (!(request = json_loadb (data, len, JSON_DECODE_ANY, &e)))
{
flush_json (json_rpc_response (NULL, NULL,
json_rpc_error (JSON_RPC_ERROR_PARSE_ERROR, NULL)),
output);
return;
}
if (json_is_array (request))
{
if (!json_array_size (request))
{
flush_json (json_rpc_response (NULL, NULL,
json_rpc_error (JSON_RPC_ERROR_INVALID_REQUEST, NULL)),
output);
return;
}
json_t *response = json_array ();
json_t *iter;
size_t i;
json_array_foreach (request, i, iter)
{
json_t *result = process_json_rpc_request (ctx, iter);
if (result)
json_array_append_new (response, result);
}
if (json_array_size (response))
flush_json (response, output);
else
json_decref (response);
}
else
{
json_t *result = process_json_rpc_request (ctx, request);
if (result)
flush_json (result, output);
}
}
// --- Requests ----------------------------------------------------------------
struct request
{
// TODO *ctx
struct server_context *ctx; ///< Server context
void *user_data; ///< User data argument for callbacks
@ -749,6 +1267,8 @@ struct request
struct request_handler
{
LIST_HEADER (struct request_handler)
/// Install ourselves as the handler for the request if applicable
bool (*try_handle) (struct request *request, struct str_map *headers);
@ -785,11 +1305,17 @@ request_finish (struct request *self)
static bool
request_start (struct request *self, struct str_map *headers)
{
bool handled = false;
// TODO: try request handlers registered in self->ctx
if (handled)
// TODO: can also be false
LIST_FOR_EACH (struct request_handler, handler, self->ctx->handlers)
if (handler->try_handle (self, headers))
{
// XXX: maybe we should isolate the handlers a bit more
self->handler = handler;
// TODO: we should also allow the "try_handle" function to
// return that it has already finished processing the request
// and we should abort it by returning false here.
return true;
}
// Unable to serve the request
struct str response;
@ -819,11 +1345,14 @@ request_handler_json_rpc_try_handle
const char *content_type = str_map_find (headers, "CONTENT_TYPE");
const char *method = str_map_find (headers, "REQUEST_METHOD");
if (strcmp (method, "POST")
|| !validate_json_rpc_content_type (content_type))
if (!method || strcmp (method, "POST")
|| !content_type || !validate_json_rpc_content_type (content_type))
return false;
// TODO: install the handler, perhaps construct an object
struct str *buf = xcalloc (1, sizeof *buf);
str_init (buf);
request->handler_data = buf;
return true;
}
@ -831,15 +1360,28 @@ static bool
request_handler_json_rpc_push
(struct request *request, const void *data, size_t len)
{
// TODO: append to a buffer
// TODO: len == 0: process the request
struct str *buf = request->handler_data;
if (len)
str_append_data (buf, data, len);
else
{
struct str response;
str_init (&response);
process_json_rpc (request->ctx, buf->str, buf->len, &response);
request->write_cb (request->user_data, response.str, response.len);
str_free (&response);
}
return true;
}
static void
request_handler_json_rpc_destroy (struct request *request)
{
// TODO
struct str *buf = request->handler_data;
str_free (buf);
free (buf);
request->handler_data = NULL;
}
struct request_handler g_request_handler_json_rpc =
@ -849,6 +1391,8 @@ struct request_handler g_request_handler_json_rpc =
.destroy_cb = request_handler_json_rpc_destroy,
};
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
// TODO: another request handler to respond to all GETs with a message
// --- Client communication handlers -------------------------------------------
@ -927,17 +1471,86 @@ client_remove (struct client *client)
struct client_fcgi
{
struct fcgi_parser parser; ///< FastCGI stream parser
struct fcgi_muxer muxer; ///< FastCGI de/multiplexer
};
struct client_fcgi_request
{
struct fcgi_request *fcgi_request; ///< FastCGI request
struct request request; ///< Request
};
static void
client_fcgi_request_write (void *user_data, const void *data, size_t len)
{
struct client_fcgi_request *request = user_data;
fcgi_request_write (request->fcgi_request, data, len);
}
static void
client_fcgi_request_close (void *user_data)
{
struct client_fcgi_request *request = user_data;
// TODO: tell the fcgi_request to what?
}
static void *
client_fcgi_request_start (void *user_data, struct fcgi_request *fcgi_request)
{
struct client *client = user_data;
struct client_fcgi_request *request = xmalloc (sizeof *request);
request->fcgi_request = fcgi_request;
request_init (&request->request);
request->request.ctx = client->ctx;
request->request.write_cb = client_fcgi_request_write;
request->request.close_cb = client_fcgi_request_close;
request->request.user_data = request;
return request;
}
static void
client_fcgi_request_push (void *handler_data, const void *data, size_t len)
{
struct client_fcgi_request *request = handler_data;
request_push (&request->request, data, len);
}
static void
client_fcgi_request_destroy (void *handler_data)
{
struct client_fcgi_request *request = handler_data;
request_free (&request->request);
free (handler_data);
}
static void
client_fcgi_write (void *user_data, const void *data, size_t len)
{
struct client *client = user_data;
client_write (client, data, len);
}
static void
client_fcgi_close (void *user_data)
{
struct client *client = user_data;
client_remove (client);
}
static void
client_fcgi_init (struct client *client)
{
struct client_fcgi *self = xcalloc (1, sizeof *self);
client->impl_data = self;
fcgi_parser_init (&self->parser);
// TODO: configure the parser
fcgi_muxer_init (&self->muxer);
self->muxer.write_cb = client_fcgi_write;
self->muxer.close_cb = client_fcgi_close;
self->muxer.request_start_cb = client_fcgi_request_start;
self->muxer.request_push_cb = client_fcgi_request_push;
self->muxer.request_destroy_cb = client_fcgi_request_destroy;
self->muxer.user_data = client;
}
static void
@ -946,7 +1559,7 @@ client_fcgi_destroy (struct client *client)
struct client_fcgi *self = client->impl_data;
client->impl_data = NULL;
fcgi_parser_free (&self->parser);
fcgi_muxer_free (&self->muxer);
free (self);
}
@ -954,7 +1567,7 @@ static bool
client_fcgi_push (struct client *client, const void *data, size_t len)
{
struct client_fcgi *self = client->impl_data;
fcgi_parser_push (&self->parser, data, len);
fcgi_muxer_push (&self->muxer, data, len);
return true;
}
@ -1015,6 +1628,7 @@ client_scgi_init (struct client *client)
client->impl_data = self;
request_init (&self->request);
self->request.ctx = client->ctx;
self->request.write_cb = client_scgi_write;
self->request.close_cb = client_scgi_close;
self->request.user_data = client;
@ -1426,6 +2040,8 @@ main (int argc, char *argv[])
(void) signal (SIGPIPE, SIG_IGN);
LIST_PREPEND (ctx.handlers, &g_request_handler_json_rpc);
if (!parse_config (&ctx, &e)
|| !setup_listen_fds (&ctx, &e))
{