json-rpc-test-server: add a simple co-process mode

A disgusting copy-paste but it will have to do for now.

Closes #6
This commit is contained in:
Přemysl Eric Janouch 2020-10-15 02:11:51 +02:00
parent b3c377afdb
commit 75b2094cdd
Signed by: p
GPG Key ID: A0420B94F92B9493
3 changed files with 179 additions and 7 deletions

View File

@ -65,8 +65,8 @@ Test server
----------- -----------
If you install development packages for libmagic, an included test server will If you install development packages for libmagic, an included test server will
be built but not installed which provides a trivial JSON-RPC 2.0 service with be built but not installed which provides a trivial JSON-RPC 2.0 service with
FastCGI, SCGI, and WebSocket interfaces. It responds to `ping` and `date` FastCGI, SCGI, WebSocket and LSP-like co-process interfaces. It responds to
methods and it can serve static files. `ping` and `date`, supports OpenRPC discovery and it can serve static files.
Contributing and Support Contributing and Support
------------------------ ------------------------

View File

@ -2623,7 +2623,7 @@ static const http_parser_settings backend_co_http_settings =
}; };
static bool static bool
backend_co_write_starter (struct co_context *self, struct error **e) backend_co_inject_starter (struct co_context *self, struct error **e)
{ {
// The default "Connection: keep-alive" maps well here. // The default "Connection: keep-alive" maps well here.
// We cannot feed this line into the parser from within callbacks. // We cannot feed this line into the parser from within callbacks.
@ -2653,7 +2653,7 @@ backend_co_parse (struct co_context *self, const char *data, size_t len,
if (self->pending_fake_starter) if (self->pending_fake_starter)
{ {
self->pending_fake_starter = false; self->pending_fake_starter = false;
if (!backend_co_write_starter (self, e)) if (!backend_co_inject_starter (self, e))
return false; return false;
} }

View File

@ -1581,7 +1581,6 @@ static void
process_json_rpc (struct server_context *ctx, process_json_rpc (struct server_context *ctx,
const void *data, size_t len, struct str *output) const void *data, size_t len, struct str *output)
{ {
json_error_t e; json_error_t e;
json_t *request; json_t *request;
if (!(request = json_loadb (data, len, JSON_DECODE_ANY, &e))) if (!(request = json_loadb (data, len, JSON_DECODE_ANY, &e)))
@ -2551,6 +2550,165 @@ client_ws_create (EV_P_ int sock_fd)
return &self->client; return &self->client;
} }
// --- Co-process client -------------------------------------------------------
// This is mostly copied over from json-rpc-shell.c, only a bit simplified.
// We're giving up on header parsing in order to keep this small.
struct co_context
{
struct server_context *ctx; ///< Server context
struct str message; ///< Message data
struct http_parser parser; ///< HTTP parser
bool pending_fake_starter; ///< Start of message?
};
static int
client_co_on_message_begin (http_parser *parser)
{
struct co_context *self = parser->data;
str_reset (&self->message);
return 0;
}
static int
client_co_on_body (http_parser *parser, const char *at, size_t len)
{
struct co_context *self = parser->data;
str_append_data (&self->message, at, len);
return 0;
}
static int
client_co_on_message_complete (http_parser *parser)
{
struct co_context *self = parser->data;
http_parser_pause (&self->parser, true);
return 0;
}
// The LSP incorporates a very thin subset of RFC 822, and it so happens
// that we may simply reuse the full HTTP parser here, with a small hack.
static const http_parser_settings client_co_http_settings =
{
.on_message_begin = client_co_on_message_begin,
.on_body = client_co_on_body,
.on_message_complete = client_co_on_message_complete,
};
static void
client_co_respond (const struct str *buf)
{
struct str wrapped = str_make();
str_append_printf (&wrapped,
"Content-Length: %zu\r\n"
"Content-Type: application/json; charset=utf-8\r\n"
"\r\n", buf->len);
str_append_data (&wrapped, buf->str, buf->len);
if (write (STDOUT_FILENO, wrapped.str, wrapped.len)
!= (ssize_t) wrapped.len)
exit_fatal ("write: %s", strerror (errno));
str_free (&wrapped);
}
static void
client_co_inject_starter (struct co_context *self)
{
// The default "Connection: keep-alive" maps well here.
// We cannot feed this line into the parser from within callbacks.
static const char starter[] = "POST / HTTP/1.1\r\n";
http_parser_pause (&self->parser, false);
size_t n_parsed = http_parser_execute (&self->parser,
&client_co_http_settings, starter, sizeof starter - 1);
enum http_errno err = HTTP_PARSER_ERRNO (&self->parser);
if (n_parsed != sizeof starter - 1 || err != HPE_OK)
exit_fatal ("protocol failure: %s", http_errno_description (err));
}
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
static void
client_co_process (struct co_context *self)
{
struct str *message = &self->message;
struct str response = str_make ();
process_json_rpc (self->ctx, message->str, message->len, &response);
if (response.len)
client_co_respond (&response);
str_free (&response);
}
static void
backend_co_parse (struct co_context *self, const char *data, size_t len,
size_t *n_parsed)
{
if (self->pending_fake_starter)
{
self->pending_fake_starter = false;
client_co_inject_starter (self);
}
*n_parsed = http_parser_execute
(&self->parser, &client_co_http_settings, data, len);
if (self->parser.upgrade)
exit_fatal ("protocol failure: %s", "unsupported upgrade attempt");
enum http_errno err = HTTP_PARSER_ERRNO (&self->parser);
if (err == HPE_PAUSED)
{
self->pending_fake_starter = true;
client_co_process (self);
}
else if (err != HPE_OK)
exit_fatal ("protocol failure: %s", http_errno_description (err));
}
static void
backend_co_on_data (struct co_context *self, const char *data, size_t len)
{
size_t n_parsed = 0;
do
{
backend_co_parse (self, data, len, &n_parsed);
data += n_parsed;
}
while ((len -= n_parsed));
}
static void
client_co_run (struct server_context *ctx)
{
struct co_context self = {};
self.ctx = ctx;
self.message = str_make ();
http_parser_init (&self.parser, HTTP_REQUEST);
self.parser.data = &self;
self.pending_fake_starter = true;
hard_assert (set_blocking (STDIN_FILENO, false));
struct str buf = str_make ();
struct pollfd pfd = { .fd = STDIN_FILENO, .events = POLLIN };
while (true)
{
if (poll (&pfd, 1, -1) <= 0)
exit_fatal ("poll: %s", strerror (errno));
str_remove_slice (&buf, 0, buf.len);
enum socket_io_result result = socket_io_try_read (pfd.fd, &buf);
int errno_saved = errno;
if (buf.len)
backend_co_on_data (&self, buf.str, buf.len);
if (result == SOCKET_IO_ERROR)
exit_fatal ("read: %s", strerror (errno_saved));
if (result == SOCKET_IO_EOF)
break;
}
str_free (&buf);
str_free (&self.message);
}
// --- Basic server stuff ------------------------------------------------------ // --- Basic server stuff ------------------------------------------------------
typedef struct client *(*client_create_fn) (EV_P_ int sock_fd); typedef struct client *(*client_create_fn) (EV_P_ int sock_fd);
@ -2914,11 +3072,12 @@ daemonize (struct server_context *ctx)
} }
static void static void
parse_program_arguments (int argc, char **argv) parse_program_arguments (int argc, char **argv, bool *running_as_slave)
{ {
static const struct opt opts[] = static const struct opt opts[] =
{ {
{ 't', "test", NULL, 0, "self-test" }, { 't', "test", NULL, 0, "self-test" },
{ 's', "slave", NULL, 0, "co-process mode" },
{ 'd', "debug", NULL, 0, "run in debug mode" }, { 'd', "debug", NULL, 0, "run in debug mode" },
{ 'h', "help", NULL, 0, "display this help and exit" }, { 'h', "help", NULL, 0, "display this help and exit" },
{ 'V', "version", NULL, 0, "output version information and exit" }, { 'V', "version", NULL, 0, "output version information and exit" },
@ -2938,6 +3097,9 @@ parse_program_arguments (int argc, char **argv)
case 't': case 't':
test_main (argc, argv); test_main (argc, argv);
exit (EXIT_SUCCESS); exit (EXIT_SUCCESS);
case 's':
*running_as_slave = true;
break;
case 'd': case 'd':
g_debug_mode = true; g_debug_mode = true;
break; break;
@ -2970,7 +3132,8 @@ parse_program_arguments (int argc, char **argv)
int int
main (int argc, char *argv[]) main (int argc, char *argv[])
{ {
parse_program_arguments (argc, argv); bool running_as_a_slave = false;
parse_program_arguments (argc, argv, &running_as_a_slave);
print_status (PROGRAM_NAME " " PROGRAM_VERSION " starting"); print_status (PROGRAM_NAME " " PROGRAM_VERSION " starting");
@ -2985,6 +3148,15 @@ main (int argc, char *argv[])
exit (EXIT_FAILURE); exit (EXIT_FAILURE);
} }
// There's a lot of unnecessary left-over scaffolding in this program,
// for testing purposes assume that everything is synchronous
if (running_as_a_slave)
{
client_co_run (&ctx);
server_context_free (&ctx);
return EXIT_SUCCESS;
}
struct ev_loop *loop; struct ev_loop *loop;
if (!(loop = EV_DEFAULT)) if (!(loop = EV_DEFAULT))
exit_fatal ("libev initialization failed"); exit_fatal ("libev initialization failed");