Add a backend for co-processes

Targets language servers.

In this first stage, we don't need to support bi-directionality,
although it's a requirement for finishing this task.

Updates #4
This commit is contained in:
Přemysl Eric Janouch 2020-10-14 00:05:03 +02:00
parent dfe814316f
commit 23c728e535
Signed by: p
GPG Key ID: A0420B94F92B9493
3 changed files with 439 additions and 11 deletions

View File

@ -19,6 +19,7 @@ the following niceties:
results in your favourite editor or redirect them to a file
- ability to edit the input line in your favourite editor as well with Alt+E
- WebSocket (RFC 6455) can also be used as a transport rather than HTTP
- even Language Server Protocol servers may be launched as a slave command
- support for method name tab completion using OpenRPC discovery or file input
Documentation

View File

@ -10,7 +10,7 @@ json-rpc-shell - a shell for JSON-RPC 2.0
Synopsis
--------
*json-rpc-shell* [_OPTION_]... _ENDPOINT_
*json-rpc-shell* [_OPTION_]... { _ENDPOINT_ | _COMMAND_ [_ARG_]... }
Description
-----------
@ -80,6 +80,11 @@ Protocol
Call "rpc.discover" upon start-up in order to pull in OpenRPC data for
tab completion of method names. If a path is given, it is read from a file.
*-e*, *--execute*::
Rather than an _ENDPOINT_, accept a command line to execute and communicate
with using the JSON-RPC 2.0 protocol variation used in the Language Server
Protocol.
Program information
~~~~~~~~~~~~~~~~~~~
*-h*, *--help*::

View File

@ -1087,6 +1087,10 @@ static struct app_context
char *editor_filename; ///< File for input line editor
struct str_map methods; ///< Methods detected via OpenRPC
bool awaiting; ///< Running a separate loop to wait?
struct error *await_error; ///< Error while waiting for event
struct str *await_response; ///< Buffer for a response to a message
struct config config; ///< Program configuration
enum color_mode color_mode; ///< Colour output mode
bool compact; ///< Whether to not pretty print
@ -1125,6 +1129,9 @@ struct backend_vtable
const char *request, bool expect_content,
struct str *buf, struct error **e);
/// See if the child belongs to the backend and process the signal
bool (*on_child) (struct backend *backend, pid_t pid, int status);
/// Do everything necessary to deal with ev_break(EVBREAK_ALL)
void (*on_quit) (struct backend *backend);
@ -1132,6 +1139,73 @@ struct backend_vtable
void (*destroy) (struct backend *backend);
};
// --- Asynchronous results ----------------------------------------------------
static bool
await (struct app_context *ctx, struct str *buf, struct error **e)
{
hard_assert (!ctx->awaiting);
// Run an event loop to retrieve the response
ctx->await_response = buf;
ctx->awaiting = true;
ev_run (EV_DEFAULT_ 0);
ctx->awaiting = false;
ctx->await_response = NULL;
if (ctx->await_error)
{
error_propagate (e, ctx->await_error);
ctx->await_error = NULL;
return false;
}
return true;
}
static int normalize_whitespace (int c) { return isspace_ascii (c) ? ' ' : c; }
/// Caller guarantees that data[len] is a NUL byte (because of iconv_xstrdup())
static void
await_try_finish (struct app_context *ctx, const char *data, size_t len)
{
// There is no buffer while connecting and after we obtain our result
if (data && !ctx->await_response)
{
char *s = iconv_xstrdup (ctx->term_from_utf8,
(char *) data, len + 1 /* null byte */, NULL);
// Does not affect JSON and ensures the message is printed out okay
cstr_transform (s, normalize_whitespace);
print_warning ("unexpected message received: %s", s);
free (s);
return;
}
if (data && ctx->await_response)
{
str_append_data (ctx->await_response, data, len);
ctx->await_response = NULL;
}
// Here we need to be very careful to not return from too many levels
if (ctx->awaiting)
ev_break (EV_DEFAULT_ EVBREAK_ONE);
}
static void
await_try_cancel (struct app_context *ctx)
{
if (!ctx->awaiting)
return;
ctx->await_response = NULL;
if (!ctx->await_error)
error_set (&ctx->await_error, "unexpected connection close");
ev_break (EV_DEFAULT_ EVBREAK_ONE);
}
// --- Configuration -----------------------------------------------------------
static void on_config_attribute_change (struct config_item *item);
@ -2114,8 +2188,6 @@ backend_ws_on_control_frame
return true;
}
static int normalize_whitespace (int c) { return isspace_ascii (c) ? ' ' : c; }
/// Caller guarantees that data[len] is a NUL byte (because of iconv_xstrdup())
static bool
backend_ws_on_message (struct ws_context *self,
@ -2577,6 +2649,328 @@ backend_curl_new (struct app_context *ctx, const char *endpoint)
return &self->super;
}
// --- Co-process backend ------------------------------------------------------
struct co_context
{
struct backend super; ///< Parent class
struct app_context *ctx; ///< Application context
pid_t child; ///< The co-process or -1
int socket; ///< Our end of the socketpair
int stderr_fd; ///< stderr read end
struct str stderr_buffer; ///< stderr buffer
ev_io stderr_watcher; ///< stderr watcher
ev_io socket_watcher; ///< Socketpair watcher
struct http_parserpp http; ///< HTTP parser
bool pending_fake_starter; ///< Start of message?
};
static int
backend_co_on_message_complete (http_parser *parser)
{
struct http_parserpp *self = parser->data;
http_parser_pause (&self->parser, true);
return 0;
}
// The LSP incorporates a very thin subset of RFC 822, and it so happens
// that we may simply reuse the full HTTP parser here, with a small hack.
static const http_parser_settings backend_co_http_settings =
{
.on_header_field = http_parserpp_on_header_field,
.on_header_value = http_parserpp_on_header_value,
.on_headers_complete = http_parserpp_on_headers_complete,
// TODO: check Content-Type early?
.on_message_begin = http_parserpp_on_message_begin,
.on_body = http_parserpp_on_body,
.on_message_complete = backend_co_on_message_complete,
};
static bool
backend_co_write_starter (struct co_context *self, struct error **e)
{
// The default "Connection: keep-alive" maps well here.
// We cannot feed this line into the parser from within callbacks.
static const char starter[] = "POST / HTTP/1.1\r\n";
http_parser_pause (&self->http.parser, false);
size_t n_parsed = http_parser_execute (&self->http.parser,
&backend_co_http_settings, starter, sizeof starter - 1);
enum http_errno err = HTTP_PARSER_ERRNO (&self->http.parser);
if (n_parsed != sizeof starter - 1 || err != HPE_OK)
FAIL ("protocol failure: %s", http_errno_description (err));
return true;
}
static void
backend_co_process (struct co_context *self)
{
// TODO: verify Content-Type in the headers, though tricky and optional
struct str *message = &self->http.message;
await_try_finish (self->ctx, message->str, message->len);
}
static bool
backend_co_parse (struct co_context *self, const char *data, size_t len,
size_t *n_parsed, struct error **e)
{
if (self->pending_fake_starter)
{
self->pending_fake_starter = false;
if (!backend_co_write_starter (self, e))
return false;
}
*n_parsed = http_parser_execute
(&self->http.parser, &backend_co_http_settings, data, len);
if (self->http.parser.upgrade)
FAIL ("protocol failure: %s", "unsupported upgrade attempt");
enum http_errno err = HTTP_PARSER_ERRNO (&self->http.parser);
if (err == HPE_PAUSED)
{
self->pending_fake_starter = true;
backend_co_process (self);
}
else if (err != HPE_OK)
FAIL ("protocol failure: %s", http_errno_description (err));
return true;
}
static bool
backend_co_on_data (struct co_context *self, const char *data, size_t len,
struct error **e)
{
size_t n_parsed = 0;
while (backend_co_parse (self, data, len, &n_parsed, e))
{
data += n_parsed;
if (!(len -= n_parsed))
return true;
}
return false;
}
static void
backend_co_on_socket_ready (EV_P_ ev_io *handle, int revents)
{
(void) loop;
(void) revents;
struct co_context *self = handle->data;
char buf[BUFSIZ];
restart:
// Try to read some data in a non-blocking manner
(void) set_blocking (handle->fd, false);
ssize_t n_read = read (handle->fd, buf, sizeof buf);
int errno_saved = errno;
(void) set_blocking (handle->fd, true);
errno = errno_saved;
struct error *e = NULL;
if (n_read < 0)
{
if (errno == EAGAIN)
return;
if (errno == EINTR)
goto restart;
print_error ("reading from the command failed: %s", strerror (errno));
}
else if (!backend_co_on_data (self, buf, n_read, &e))
{
print_error ("%s", e->message);
error_free (e);
}
else if (!n_read)
print_status ("the command has closed the connection");
else
goto restart;
ev_io_stop (EV_A_ handle);
// That would have no way of succeeding
await_try_cancel (self->ctx);
}
static void
backend_co_on_err_ready (EV_P_ ev_io *handle, int revents)
{
(void) revents;
struct co_context *self = handle->data;
struct str *buf = &self->stderr_buffer;
enum socket_io_result result = socket_io_try_read (handle->fd, buf);
char *p;
while ((p = strchr (buf->str, '\n')))
{
*p = 0;
print_status ("stderr: %s", buf->str);
str_remove_slice (buf, 0, p - buf->str + 1);
}
switch (result)
{
case SOCKET_IO_EOF:
print_debug ("the command has closed its stderr");
break;
case SOCKET_IO_OK:
return;
case SOCKET_IO_ERROR:
print_warning ("cannot read stderr: %s", strerror (errno));
}
ev_io_stop (EV_A_ handle);
}
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
// TODO: do we want to go for synchronous writes, as with the WebSocket backend?
// We can postpone it for later.
static bool
backend_co_make_call (struct backend *backend,
const char *request, bool expect_content, struct str *buf, struct error **e)
{
struct co_context *self = (struct co_context *) backend;
struct str wrapped = str_make();
str_append_printf (&wrapped,
"Content-Length: %zu\r\n"
"Content-Type: application/json; charset=utf-8\r\n"
"\r\n%s", strlen (request), request);
enum socket_io_result result = socket_io_try_write (self->socket, &wrapped);
if (result == SOCKET_IO_ERROR)
{
str_free (&wrapped);
ev_io_stop (EV_DEFAULT_ &self->socket_watcher);
FAIL ("writing to the command failed: %s", strerror (errno));
}
else if (wrapped.len)
print_error ("internal error, partial write to command");
str_free (&wrapped);
return !expect_content || await (self->ctx, buf, e);
}
static bool
backend_co_on_child (struct backend *backend, pid_t pid, int status)
{
struct co_context *self = (struct co_context *) backend;
if (pid != self->child)
return false;
if (WIFSTOPPED (status))
print_warning ("the command has been stopped");
else if (WIFCONTINUED (status))
print_warning ("the command has been resumed");
else
{
if (WIFEXITED (status))
print_error ("the command has exited with status %d",
WEXITSTATUS (status));
else
print_error ("the command has died from signal %d",
WTERMSIG (status));
self->child = -1;
// Wait for the file descriptor to close, it may still contain data
}
return true;
}
static void
backend_co_destroy (struct backend *backend)
{
struct co_context *self = (struct co_context *) backend;
str_free (&self->stderr_buffer);
http_parserpp_free (&self->http);
if (self->socket != -1)
xclose (self->socket);
if (self->stderr_fd != -1)
xclose (self->stderr_fd);
if (self->child != -1)
(void) kill (self->child, SIGKILL);
}
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
static struct backend_vtable backend_co_vtable =
{
.add_header = NULL,
.make_call = backend_co_make_call,
.on_child = backend_co_on_child,
.destroy = backend_co_destroy,
};
static struct backend *
backend_co_new (struct app_context *ctx, char **argv)
{
struct co_context *self = xcalloc (1, sizeof *self);
self->super.vtable = &backend_co_vtable;
self->ctx = ctx;
enum { OURS, THEIRS };
int pair[2] = { -1, -1 }, err[2] = { -1, -1 };
if (socketpair (AF_UNIX, SOCK_STREAM, 0, pair))
exit_fatal ("socketpair: %s", strerror (errno));
if (pipe (err))
exit_fatal ("pipe: %s", strerror (errno));
set_cloexec ((self->socket = pair[OURS]));
set_cloexec (pair[THEIRS]);
set_cloexec ((self->stderr_fd = err[OURS]));
set_cloexec (err[THEIRS]);
// It runs in our own progress group, so it gets SIGINTed with us
switch ((self->child = fork ()))
{
case -1:
exit_fatal ("fork: %s", strerror (errno));
case 0:
dup2 (pair[THEIRS], STDIN_FILENO);
dup2 (pair[THEIRS], STDOUT_FILENO);
dup2 (err[THEIRS], STDERR_FILENO);
// Undo what we've done in init_watchers()
signal (SIGPIPE, SIG_DFL);
signal (SIGTTOU, SIG_DFL);
execvp (argv[0], argv);
// stderr has been redirected, this won't cause a SIGTTOU
print_error ("execv: %s", strerror (errno));
_exit (EXIT_FAILURE);
default:
xclose (pair[THEIRS]);
xclose (err[THEIRS]);
}
ev_io_init (&self->socket_watcher,
backend_co_on_socket_ready, self->socket, EV_READ);
self->socket_watcher.data = self;
ev_io_start (EV_DEFAULT_ &self->socket_watcher);
set_blocking (self->stderr_fd, false);
self->stderr_buffer = str_make ();
ev_io_init (&self->stderr_watcher,
backend_co_on_err_ready, self->stderr_fd, EV_READ);
self->stderr_watcher.data = self;
ev_io_start (EV_DEFAULT_ &self->stderr_watcher);
http_parserpp_init (&self->http, HTTP_REQUEST);
self->pending_fake_starter = true;
return &self->super;
}
// --- JSON tokenizer ----------------------------------------------------------
// A dumb JSON tokenizer intended strictly just for syntax highlighting
@ -2877,6 +3271,8 @@ quit (struct app_context *ctx)
{
if (ctx->backend->vtable->on_quit)
ctx->backend->vtable->on_quit (ctx->backend);
if (ctx->awaiting && !ctx->await_error)
error_set (&ctx->await_error, "aborted by user");
ev_break (EV_DEFAULT_ EVBREAK_ALL);
ctx->input->vtable->hide (ctx->input);
@ -3076,6 +3472,14 @@ static struct error *
json_rpc_call_raw (struct app_context *ctx,
const char *method, json_t *id, json_t *params, struct str *buf)
{
struct error *error = NULL;
if (ctx->awaiting)
{
// Only allow recursing once below, awaiting is not re-entrant
error_set (&error, "busy");
return error;
}
json_t *request = json_object ();
json_object_set_new (request, "jsonrpc", json_string ("2.0"));
json_object_set_new (request, "method", json_string (method));
@ -3088,11 +3492,9 @@ json_rpc_call_raw (struct app_context *ctx,
maybe_print_verbose (ctx, ATTR_OUTGOING, req_utf8, -1);
struct error *error = NULL;
ctx->backend->vtable->make_call (ctx->backend, req_utf8,
id != NULL /* expect_content */, buf, &error);
free (req_utf8);
if (error)
return error;
@ -3460,6 +3862,11 @@ on_child (EV_P_ ev_child *handle, int revents)
(void) revents;
struct app_context *ctx = ev_userdata (loop);
if (ctx->backend->vtable->on_child
&& ctx->backend->vtable->on_child (ctx->backend,
handle->rpid, handle->rstatus))
return;
// I am not a shell, stopping not allowed
int status = handle->rstatus;
if (WIFSTOPPED (status)
@ -3570,6 +3977,7 @@ parse_program_arguments (struct app_context *ctx, int argc, char **argv,
{ 'c', "compact-output", NULL, 0, "do not pretty-print responses" },
{ 'C', "color", "WHEN", OPT_LONG_ONLY,
"colorize output: never, always, or auto" },
{ 'e', "execute", NULL, 0, "launch a command to act as a server" },
{ 'n', "null-as-id", NULL, 0, "JSON null is used as an `id'" },
{ 'o', "origin", "O", 0, "set the HTTP Origin header" },
// So far you have to explicitly enable this rather than disable
@ -3587,9 +3995,10 @@ parse_program_arguments (struct app_context *ctx, int argc, char **argv,
};
struct opt_handler oh = opt_handler_make (argc, argv, opts,
"ENDPOINT", "A shell for JSON-RPC 2.0.");
"{ ENDPOINT | COMMAND [ARG]... }", "A shell for JSON-RPC 2.0.");
int c;
bool run_command = false;
while ((c = opt_handler_get (&oh)) != -1)
switch (c)
{
@ -3606,6 +4015,7 @@ parse_program_arguments (struct app_context *ctx, int argc, char **argv,
case 'o': *origin = optarg; break;
case 'O': *openrpc = optarg ? optarg : ""; break;
case 'e': run_command = true; break;
case 'n': ctx->null_as_id = true; break;
case 'c': ctx->compact = true; break;
case 't': ctx->trust_all = true; break;
@ -3637,19 +4047,29 @@ parse_program_arguments (struct app_context *ctx, int argc, char **argv,
argc -= optind;
argv += optind;
if (argc != 1)
if (run_command && argc >= 1)
*endpoint = NULL;
else if (argc == 1)
*endpoint = argv[0];
else
{
opt_handler_usage (&oh, stderr);
exit (EXIT_FAILURE);
}
*endpoint = argv[0];
opt_handler_free (&oh);
}
static void
init_backend (struct app_context *ctx, const char *origin, const char *endpoint)
init_backend (struct app_context *ctx, const char *origin, const char *endpoint,
char **argv)
{
if (!endpoint)
{
// There is no point in passing the Origin to a co-process
ctx->backend = backend_co_new (ctx, argv);
return;
}
struct http_parser_url url;
if (http_parser_parse_url (endpoint, strlen (endpoint), false, &url))
exit_fatal ("invalid endpoint address");
@ -3688,6 +4108,8 @@ main (int argc, char *argv[])
const char *origin = NULL, *endpoint = NULL, *openrpc = NULL;
parse_program_arguments (&g_ctx, argc, argv, &origin, &endpoint, &openrpc);
argc -= optind;
argv += optind;
g_ctx.input = input_new ();
g_ctx.input->user_data = &g_ctx;
@ -3698,7 +4120,7 @@ main (int argc, char *argv[])
g_ctx.methods = str_map_make (NULL);
init_colors (&g_ctx);
load_configuration (&g_ctx);
init_backend (&g_ctx, origin, endpoint);
init_backend (&g_ctx, origin, endpoint, argv);
// We only need to convert to and from the terminal encoding
setlocale (LC_CTYPE, "");