@ -1,7 +1,7 @@
/*
* json - rpc - test - server . c : JSON - RPC 2.0 demo server
*
* Copyright ( c ) 2015 - 202 0 , P ř emysl Eric Janouch < p @ janouch . name >
* Copyright ( c ) 2015 - 202 2 , P ř emysl Eric Janouch < p @ janouch . name >
*
* Permission to use , copy , modify , and / or distribute this software for any
* purpose with or without fee is hereby granted .
@ -525,11 +525,11 @@ fcgi_muxer_push (struct fcgi_muxer *self, const void *data, size_t len)
}
/// @}
// --- WebSocket s --------------------------------------------------------------
/// @defgroup WebSocket s
// --- WebSocket - --------------------------------------------------------------
/// @defgroup WebSocket
/// @{
// WebSocket s are n't CGI-compatible, therefore we must handle the initial HTTP
// WebSocket is n't CGI-compatible, therefore we must handle the initial HTTP
// handshake ourselves. Luckily it's not too much of a bother with http-parser.
// Typically there will be a normal HTTP server in front of us, proxying the
// requests based on the URI.
@ -537,7 +537,7 @@ fcgi_muxer_push (struct fcgi_muxer *self, const void *data, size_t len)
enum ws_handler_state
{
WS_HANDLER_CONNECTING , ///< Parsing HTTP
WS_HANDLER_OPEN , ///< Parsing WebSocket s frames
WS_HANDLER_OPEN , ///< Parsing WebSocket frames
WS_HANDLER_CLOSING , ///< Partial closure by us
WS_HANDLER_FLUSHING , ///< Just waiting for client EOF
WS_HANDLER_CLOSED ///< Dead, both sides closed
@ -851,6 +851,17 @@ ws_handler_on_close_timeout (EV_P_ ev_timer *watcher, int revents)
self - > close_cb ( self , false /* half_close */ ) ;
}
static bool ws_handler_fail_handshake ( struct ws_handler * self ,
const char * status , . . . ) ATTRIBUTE_SENTINEL ;
# define HTTP_101_SWITCHING_PROTOCOLS "101 Switching Protocols"
# define HTTP_400_BAD_REQUEST "400 Bad Request"
# define HTTP_405_METHOD_NOT_ALLOWED "405 Method Not Allowed"
# define HTTP_408_REQUEST_TIMEOUT "408 Request Timeout"
# define HTTP_417_EXPECTATION_FAILED "407 Expectation Failed"
# define HTTP_426_UPGRADE_REQUIRED "426 Upgrade Required"
# define HTTP_505_VERSION_NOT_SUPPORTED "505 HTTP Version Not Supported"
static void
ws_handler_on_handshake_timeout ( EV_P_ ev_timer * watcher , int revents )
{
@ -858,13 +869,7 @@ ws_handler_on_handshake_timeout (EV_P_ ev_timer *watcher, int revents)
( void ) revents ;
struct ws_handler * self = watcher - > data ;
// XXX: this is a no-op, since this currently doesn't even call shutdown
// immediately but postpones it until later
self - > close_cb ( self , true /* half_close */ ) ;
self - > state = WS_HANDLER_FLUSHING ;
if ( self - > on_close )
self - > on_close ( self , WS_STATUS_ABNORMAL_CLOSURE , " handshake timeout " ) ;
ws_handler_fail_handshake ( self , HTTP_408_REQUEST_TIMEOUT , NULL ) ;
self - > state = WS_HANDLER_CLOSED ;
self - > close_cb ( self , false /* half_close */ ) ;
@ -1003,9 +1008,10 @@ ws_handler_on_headers_complete (http_parser *parser)
if ( self - > have_header_value )
ws_handler_on_header_read ( self ) ;
// We strictly require a protocol upgrade
// We require a protocol upgrade. 1 is for "skip body", 2 is the same
// + "stop processing", return another number to indicate a problem here.
if ( ! parser - > upgrade )
return 2 ;
return 3 ;
return 0 ;
}
@ -1018,13 +1024,6 @@ ws_handler_on_url (http_parser *parser, const char *at, size_t len)
return 0 ;
}
# define HTTP_101_SWITCHING_PROTOCOLS "101 Switching Protocols"
# define HTTP_400_BAD_REQUEST "400 Bad Request"
# define HTTP_405_METHOD_NOT_ALLOWED "405 Method Not Allowed"
# define HTTP_417_EXPECTATION_FAILED "407 Expectation Failed"
# define HTTP_426_UPGRADE_REQUIRED "426 Upgrade Required"
# define HTTP_505_VERSION_NOT_SUPPORTED "505 HTTP Version Not Supported"
static void
ws_handler_http_responsev ( struct ws_handler * self ,
const char * status , char * const * fields )
@ -1066,6 +1065,7 @@ ws_handler_fail_handshake (struct ws_handler *self, const char *status, ...)
struct strv v = strv_make ( ) ;
while ( ( s = va_arg ( ap , const char * ) ) )
strv_append ( & v , s ) ;
strv_append ( & v , " Connection: close " ) ;
va_end ( ap ) ;
ws_handler_http_responsev ( self , status , v . vector ) ;
@ -1110,7 +1110,7 @@ ws_handler_finish_handshake (struct ws_handler *self)
if ( ! connection | | strcasecmp_ascii ( connection , " Upgrade " ) )
FAIL_HANDSHAKE ( HTTP_400_BAD_REQUEST ) ;
// Check if we can actually upgrade the protocol to WebSocket s
// Check if we can actually upgrade the protocol to WebSocket
const char * upgrade = str_map_find ( & self - > headers , " Upgrade " ) ;
struct http_protocol * offered_upgrades = NULL ;
bool can_upgrade = false ;
@ -1268,11 +1268,13 @@ ws_handler_push (struct ws_handler *self, const void *data, size_t len)
ev_timer_stop ( EV_DEFAULT_ & self - > handshake_timeout_watcher ) ;
if ( err = = HPE_CB_headers_complete )
{
print_debug ( " WS handshake failed: %s " , " missing `Upgrade' field " ) ;
else
print_debug ( " WS handshake failed: %s " ,
http_errno_description ( err ) ) ;
FAIL_HANDSHAKE ( HTTP_426_UPGRADE_REQUIRED ,
" Upgrade: websocket " , SEC_WS_VERSION " : 13 " ) ;
}
print_debug ( " WS handshake failed: %s " , http_errno_description ( err ) ) ;
FAIL_HANDSHAKE ( HTTP_400_BAD_REQUEST ) ;
}
return true ;
@ -1286,7 +1288,7 @@ static struct simple_config_item g_config_table[] =
{ " bind_host " , NULL , " Address of the server " } ,
{ " port_fastcgi " , " 9000 " , " Port to bind for FastCGI " } ,
{ " port_scgi " , NULL , " Port to bind for SCGI " } ,
{ " port_ws " , NULL , " Port to bind for WebSocket s " } ,
{ " port_ws " , NULL , " Port to bind for WebSocket " } ,
{ " pid_file " , NULL , " Full path for the PID file " } ,
// XXX: here belongs something like a web SPA that interfaces with us
{ " static_root " , NULL , " The root for static content " } ,
@ -1446,6 +1448,39 @@ json_rpc_handler_info_cmp (const void *first, const void *second)
( ( struct json_rpc_handler_info * ) second ) - > method_name ) ;
}
static json_t *
open_rpc_describe ( const char * method , json_t * result )
{
return json_pack ( " {sssoso} " , " name " , method , " params " , json_pack ( " [] " ) ,
" result " , json_pack ( " {ssso} " , " name " , method , " schema " , result ) ) ;
}
// This server rarely sees changes and we can afford to hardcode the schema
static json_t *
json_rpc_discover ( struct server_context * ctx , json_t * params )
{
( void ) ctx ;
( void ) params ;
json_t * info = json_pack ( " {ssss} " ,
" title " , PROGRAM_NAME , " version " , PROGRAM_VERSION ) ;
json_t * methods = json_pack ( " [oooo] " ,
open_rpc_describe ( " date " , json_pack ( " {ssso} " , " type " , " object " ,
" properties " , json_pack ( " {s{ss}s{ss}s{ss}s{ss}s{ss}s{ss}} " ,
" year " , " type " , " number " ,
" month " , " type " , " number " ,
" day " , " type " , " number " ,
" hours " , " type " , " number " ,
" minutes " , " type " , " number " ,
" seconds " , " type " , " number " ) ) ) ,
open_rpc_describe ( " ping " , json_pack ( " {ss} " , " type " , " string " ) ) ,
open_rpc_describe ( " rpc.discover " , json_pack ( " {ss} " , " $ref " ,
" https://github.com/open-rpc/meta-schema/raw/master/schema.json " ) ) ,
open_rpc_describe ( " wait " , json_pack ( " {ss} " , " type " , " null " ) ) ) ;
return json_rpc_response ( NULL , json_pack ( " {sssoso} " ,
" openrpc " , " 1.2.6 " , " info " , info , " methods " , methods ) , NULL ) ;
}
static json_t *
json_rpc_ping ( struct server_context * ctx , json_t * params )
{
@ -1458,6 +1493,16 @@ json_rpc_ping (struct server_context *ctx, json_t *params)
return json_rpc_response ( NULL , json_string ( " pong " ) , NULL ) ;
}
static json_t *
json_rpc_wait ( struct server_context * ctx , json_t * params )
{
( void ) ctx ;
( void ) params ;
sleep ( 1 ) ;
return json_rpc_response ( NULL , json_null ( ) , NULL ) ;
}
static json_t *
json_rpc_date ( struct server_context * ctx , json_t * params )
{
@ -1487,8 +1532,10 @@ process_json_rpc_request (struct server_context *ctx, json_t *request)
// Eventually it might be better to move this into a map in the context.
static struct json_rpc_handler_info handlers [ ] =
{
{ " date " , json_rpc_date } ,
{ " ping " , json_rpc_ping } ,
{ " date " , json_rpc_date } ,
{ " ping " , json_rpc_ping } ,
{ " rpc.discover " , json_rpc_discover } ,
{ " wait " , json_rpc_wait } ,
} ;
if ( ! json_is_object ( request ) )
@ -1545,7 +1592,6 @@ static void
process_json_rpc ( struct server_context * ctx ,
const void * data , size_t len , struct str * output )
{
json_error_t e ;
json_t * request ;
if ( ! ( request = json_loadb ( data , len , JSON_DECODE_ANY , & e ) ) )
@ -1620,15 +1666,37 @@ struct request_handler
LIST_HEADER ( struct request_handler )
/// Install ourselves as the handler for the request, if applicable.
/// If the request contains data, check it against CONTENT_LENGTH.
/// ("Transfer-Encoding: chunked" should be dechunked by the HTTP server,
/// however it is possible that it mishandles this situation.)
/// Sets @a continue_ to false if further processing should be stopped,
/// meaning the request has already been handled.
/// Note that starting the response before receiving all data denies you
/// the option of returning error status codes based on the data.
bool ( * try_handle ) ( struct request * request ,
struct str_map * headers , bool * continue_ ) ;
/// Handle incoming data. "len == 0" means EOF.
/// Returns false if there is no more processing to be done.
// FIXME: the EOF may or may not be delivered when request is cut short,
// we should fix FastCGI not to deliver it on CONTENT_LENGTH mismatch
/// EOF is never delivered on a network error (see client_read_loop()).
// XXX: the EOF may or may not be delivered when the request is cut short:
// - client_scgi delivers an EOF when it itself receives an EOF without
// considering any mismatch, and it can deliver another one earlier
// when the counter just goes down to 0... depends on what we return
// from here upon the first occasion (whether we want to close).
// - FCGI_ABORT_REQUEST /might/ not close the stdin and it /might/ cover
// a CONTENT_LENGTH mismatch, since this callback wouldn't get invoked.
// The FastCGI specification explicitly says to compare CONTENT_LENGTH
// against the number of received bytes, which may only be smaller.
//
// We might want to adjust client_scgi and client_fcgi to not invoke
// request_push(EOF) when CONTENT_LENGTH hasn't been reached and remove
// the extra EOF generation from client_scgi (why is it there, does the
// server keep the connection open, or is it just a precaution?)
//
// The finalization callback takes care of any needs to destruct data.
// If we handle this reliably in all clients, try_handle won't have to,
// as it will run in a stricter-than-CGI scenario.
bool ( * push_cb ) ( struct request * request , const void * data , size_t len ) ;
/// Destroy the handler's data stored in the request object
@ -1750,7 +1818,9 @@ request_handler_json_rpc_push
// TODO: check buf.len against CONTENT_LENGTH; if it's less, then the
// client hasn't been successful in transferring all of its data.
// See also comment on request_handler::push_cb.
// See also comment on request_handler::push_cb. For JSON-RPC, though,
// it shouldn't matter as an incomplete request will be invalid and
// clients have no reason to append unnecessary trailing bytes.
struct str response = str_make ( ) ;
str_append ( & response , " Status: 200 OK \n " ) ;
@ -1867,8 +1937,13 @@ request_handler_static_try_handle
char * path = xstrdup_printf ( " %s%s " , root , suffix ) ;
print_debug ( " trying to statically serve %s " , path ) ;
// TODO: check that this is a regular file
FILE * fp = fopen ( path , " rb " ) ;
struct stat st = { } ;
if ( fp & & ! fstat ( fileno ( fp ) , & st ) & & ! S_ISREG ( st . st_mode ) )
{
fclose ( fp ) ;
fp = NULL ;
}
if ( ! fp )
{
struct str response = str_make ( ) ;
@ -1913,8 +1988,8 @@ request_handler_static_try_handle
request_write ( request , buf , len ) ;
fclose ( fp ) ;
// TODO: this should rather not be returned all at once but in chunks ;
// file read requests never return EAGAIN
// TODO: this should rather not be returned all at once but in chunks
// (consider Transfer-Encoding); file read requests never return EAGAIN
// TODO: actual file data should really be returned by a callback when
// the socket is writable with nothing to be sent (pumping the entire
// file all at once won't really work if it's huge).
@ -2048,6 +2123,8 @@ static void
client_shutdown ( struct client * self )
{
self - > flushing = true ;
// In case this shutdown is immediately followed by a close, try our best
( void ) flush_queue ( & self - > write_queue , self - > socket_fd ) ;
ev_feed_event ( EV_DEFAULT_ & self - > write_watcher , EV_WRITE ) ;
}
@ -2359,14 +2436,15 @@ client_scgi_on_content (void *user_data, const void *data, size_t len)
print_debug ( " SCGI request got more data than CONTENT_LENGTH " ) ;
return false ;
}
// We're in a slight disagreement with the specification since
// We're in a slight disagreement with the SCGI specification since
// this tries to write output before it has read all the input
if ( ! request_push ( & self - > request , data , len ) )
return false ;
if ( ( self - > remaining_content - = len ) )
return true ;
// Signalise end of input to the request handler
return ( self - > remaining_content - = len ) ! = 0
| | request_push ( & self - > request , NULL , 0 ) ;
return request_push ( & self - > request , NULL , 0 ) ;
}
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
@ -2419,12 +2497,12 @@ client_scgi_create (EV_P_ int sock_fd)
return & self - > client ;
}
// --- WebSocket s client handler -----------------------------------------------
// --- WebSocket client handler - -----------------------------------------------
struct client_ws
{
struct client client ; ///< Parent class
struct ws_handler handler ; ///< WebSocket s connection handler
struct ws_handler handler ; ///< WebSocket connection handler
} ;
static bool
@ -2515,6 +2593,165 @@ client_ws_create (EV_P_ int sock_fd)
return & self - > client ;
}
// --- Co-process client -------------------------------------------------------
// This is mostly copied over from json-rpc-shell.c, only a bit simplified.
// We're giving up on header parsing in order to keep this small.
struct co_context
{
struct server_context * ctx ; ///< Server context
struct str message ; ///< Message data
struct http_parser parser ; ///< HTTP parser
bool pending_fake_starter ; ///< Start of message?
} ;
static int
client_co_on_message_begin ( http_parser * parser )
{
struct co_context * self = parser - > data ;
str_reset ( & self - > message ) ;
return 0 ;
}
static int
client_co_on_body ( http_parser * parser , const char * at , size_t len )
{
struct co_context * self = parser - > data ;
str_append_data ( & self - > message , at , len ) ;
return 0 ;
}
static int
client_co_on_message_complete ( http_parser * parser )
{
struct co_context * self = parser - > data ;
http_parser_pause ( & self - > parser , true ) ;
return 0 ;
}
// The LSP incorporates a very thin subset of RFC 822, and it so happens
// that we may simply reuse the full HTTP parser here, with a small hack.
static const http_parser_settings client_co_http_settings =
{
. on_message_begin = client_co_on_message_begin ,
. on_body = client_co_on_body ,
. on_message_complete = client_co_on_message_complete ,
} ;
static void
client_co_respond ( const struct str * buf )
{
struct str wrapped = str_make ( ) ;
str_append_printf ( & wrapped ,
" Content-Length: %zu \r \n "
" Content-Type: application/json; charset=utf-8 \r \n "
" \r \n " , buf - > len ) ;
str_append_data ( & wrapped , buf - > str , buf - > len ) ;
if ( write ( STDOUT_FILENO , wrapped . str , wrapped . len )
! = ( ssize_t ) wrapped . len )
exit_fatal ( " write: %s " , strerror ( errno ) ) ;
str_free ( & wrapped ) ;
}
static void
client_co_inject_starter ( struct co_context * self )
{
// The default "Connection: keep-alive" maps well here.
// We cannot feed this line into the parser from within callbacks.
static const char starter [ ] = " POST / HTTP/1.1 \r \n " ;
http_parser_pause ( & self - > parser , false ) ;
size_t n_parsed = http_parser_execute ( & self - > parser ,
& client_co_http_settings , starter , sizeof starter - 1 ) ;
enum http_errno err = HTTP_PARSER_ERRNO ( & self - > parser ) ;
if ( n_parsed ! = sizeof starter - 1 | | err ! = HPE_OK )
exit_fatal ( " protocol failure: %s " , http_errno_description ( err ) ) ;
}
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
static void
client_co_process ( struct co_context * self )
{
struct str * message = & self - > message ;
struct str response = str_make ( ) ;
process_json_rpc ( self - > ctx , message - > str , message - > len , & response ) ;
if ( response . len )
client_co_respond ( & response ) ;
str_free ( & response ) ;
}
static void
client_co_parse ( struct co_context * self , const char * data , size_t len ,
size_t * n_parsed )
{
if ( self - > pending_fake_starter )
{
self - > pending_fake_starter = false ;
client_co_inject_starter ( self ) ;
}
* n_parsed = http_parser_execute
( & self - > parser , & client_co_http_settings , data , len ) ;
if ( self - > parser . upgrade )
exit_fatal ( " protocol failure: %s " , " unsupported upgrade attempt " ) ;
enum http_errno err = HTTP_PARSER_ERRNO ( & self - > parser ) ;
if ( err = = HPE_PAUSED )
{
self - > pending_fake_starter = true ;
client_co_process ( self ) ;
}
else if ( err ! = HPE_OK )
exit_fatal ( " protocol failure: %s " , http_errno_description ( err ) ) ;
}
static void
client_co_on_data ( struct co_context * self , const char * data , size_t len )
{
size_t n_parsed = 0 ;
do
{
client_co_parse ( self , data , len , & n_parsed ) ;
data + = n_parsed ;
}
while ( ( len - = n_parsed ) ) ;
}
static void
client_co_run ( struct server_context * ctx )
{
struct co_context self = { } ;
self . ctx = ctx ;
self . message = str_make ( ) ;
http_parser_init ( & self . parser , HTTP_REQUEST ) ;
self . parser . data = & self ;
self . pending_fake_starter = true ;
hard_assert ( set_blocking ( STDIN_FILENO , false ) ) ;
struct str buf = str_make ( ) ;
struct pollfd pfd = { . fd = STDIN_FILENO , . events = POLLIN } ;
while ( true )
{
if ( poll ( & pfd , 1 , - 1 ) < = 0 )
exit_fatal ( " poll: %s " , strerror ( errno ) ) ;
str_remove_slice ( & buf , 0 , buf . len ) ;
enum socket_io_result result = socket_io_try_read ( pfd . fd , & buf ) ;
int errno_saved = errno ;
if ( buf . len )
client_co_on_data ( & self , buf . str , buf . len ) ;
if ( result = = SOCKET_IO_ERROR )
exit_fatal ( " read: %s " , strerror ( errno_saved ) ) ;
if ( result = = SOCKET_IO_EOF )
break ;
}
str_free ( & buf ) ;
str_free ( & self . message ) ;
}
// --- Basic server stuff ------------------------------------------------------
typedef struct client * ( * client_create_fn ) ( EV_P_ int sock_fd ) ;
@ -2878,11 +3115,12 @@ daemonize (struct server_context *ctx)
}
static void
parse_program_arguments ( int argc , char * * argv )
parse_program_arguments ( int argc , char * * argv , bool * running_as_slave )
{
static const struct opt opts [ ] =
{
{ ' t ' , " test " , NULL , 0 , " self-test " } ,
{ ' s ' , " slave " , NULL , 0 , " co-process mode " } ,
{ ' d ' , " debug " , NULL , 0 , " run in debug mode " } ,
{ ' h ' , " help " , NULL , 0 , " display this help and exit " } ,
{ ' V ' , " version " , NULL , 0 , " output version information and exit " } ,
@ -2902,6 +3140,9 @@ parse_program_arguments (int argc, char **argv)
case ' t ' :
test_main ( argc , argv ) ;
exit ( EXIT_SUCCESS ) ;
case ' s ' :
* running_as_slave = true ;
break ;
case ' d ' :
g_debug_mode = true ;
break ;
@ -2934,7 +3175,8 @@ parse_program_arguments (int argc, char **argv)
int
main ( int argc , char * argv [ ] )
{
parse_program_arguments ( argc , argv ) ;
bool running_as_a_slave = false ;
parse_program_arguments ( argc , argv , & running_as_a_slave ) ;
print_status ( PROGRAM_NAME " " PROGRAM_VERSION " starting " ) ;
@ -2949,6 +3191,15 @@ main (int argc, char *argv[])
exit ( EXIT_FAILURE ) ;
}
// There's a lot of unnecessary left-over scaffolding in this program,
// for testing purposes assume that everything is synchronous
if ( running_as_a_slave )
{
client_co_run ( & ctx ) ;
server_context_free ( & ctx ) ;
return EXIT_SUCCESS ;
}
struct ev_loop * loop ;
if ( ! ( loop = EV_DEFAULT ) )
exit_fatal ( " libev initialization failed " ) ;