Overall revision after a year
Use something closer to inheritance for clients
This commit is contained in:
parent
a95867dbee
commit
f273151447
|
@ -1,7 +1,7 @@
|
||||||
/*
|
/*
|
||||||
* demo-json-rpc-server.c: JSON-RPC 2.0 demo server
|
* demo-json-rpc-server.c: JSON-RPC 2.0 demo server
|
||||||
*
|
*
|
||||||
* Copyright (c) 2015, Přemysl Janouch <p.janouch@gmail.com>
|
* Copyright (c) 2015 - 2016, Přemysl Janouch <p.janouch@gmail.com>
|
||||||
* All rights reserved.
|
* All rights reserved.
|
||||||
*
|
*
|
||||||
* Permission to use, copy, modify, and/or distribute this software for any
|
* Permission to use, copy, modify, and/or distribute this software for any
|
||||||
|
@ -756,7 +756,9 @@ static void
|
||||||
ws_handler_on_close_timeout (EV_P_ ev_timer *watcher, int revents)
|
ws_handler_on_close_timeout (EV_P_ ev_timer *watcher, int revents)
|
||||||
{
|
{
|
||||||
struct ws_handler *self = watcher->data;
|
struct ws_handler *self = watcher->data;
|
||||||
// TODO: call "close_cb"
|
// TODO: anything else to do here? Invalidate our state?
|
||||||
|
if (self->close_cb)
|
||||||
|
self->close_cb (self->user_data);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
|
@ -1002,7 +1004,7 @@ ws_handler_finish_handshake (struct ws_handler *self)
|
||||||
if (!can_upgrade)
|
if (!can_upgrade)
|
||||||
FAIL_HANDSHAKE (HTTP_400_BAD_REQUEST, NULL);
|
FAIL_HANDSHAKE (HTTP_400_BAD_REQUEST, NULL);
|
||||||
|
|
||||||
// Okay, we've finally got past basic HTTP/1.1 stuff
|
// Okay, we're finally past the basic HTTP/1.1 stuff
|
||||||
const char *key = str_map_find (&self->headers, SEC_WS_KEY);
|
const char *key = str_map_find (&self->headers, SEC_WS_KEY);
|
||||||
const char *version = str_map_find (&self->headers, SEC_WS_VERSION);
|
const char *version = str_map_find (&self->headers, SEC_WS_VERSION);
|
||||||
const char *protocol = str_map_find (&self->headers, SEC_WS_PROTOCOL);
|
const char *protocol = str_map_find (&self->headers, SEC_WS_PROTOCOL);
|
||||||
|
@ -1733,6 +1735,7 @@ static void
|
||||||
request_handler_static_destroy (struct request *request)
|
request_handler_static_destroy (struct request *request)
|
||||||
{
|
{
|
||||||
(void) request;
|
(void) request;
|
||||||
|
// Nothing to dispose of this far
|
||||||
}
|
}
|
||||||
|
|
||||||
struct request_handler g_request_handler_static =
|
struct request_handler g_request_handler_static =
|
||||||
|
@ -1748,6 +1751,7 @@ struct client
|
||||||
{
|
{
|
||||||
LIST_HEADER (struct client)
|
LIST_HEADER (struct client)
|
||||||
|
|
||||||
|
// XXX: do we really need this here?
|
||||||
struct server_context *ctx; ///< Server context
|
struct server_context *ctx; ///< Server context
|
||||||
|
|
||||||
int socket_fd; ///< The TCP socket
|
int socket_fd; ///< The TCP socket
|
||||||
|
@ -1756,32 +1760,22 @@ struct client
|
||||||
ev_io read_watcher; ///< The socket can be read from
|
ev_io read_watcher; ///< The socket can be read from
|
||||||
ev_io write_watcher; ///< The socket can be written to
|
ev_io write_watcher; ///< The socket can be written to
|
||||||
|
|
||||||
struct client_impl *impl; ///< Client behaviour
|
struct client_vtable *vtable; ///< Client behaviour
|
||||||
void *impl_data; ///< Client behaviour data
|
|
||||||
};
|
};
|
||||||
|
|
||||||
struct client_impl
|
struct client_vtable
|
||||||
{
|
{
|
||||||
/// Initialize the client as needed
|
|
||||||
void (*init) (struct client *client);
|
|
||||||
|
|
||||||
/// Attempt a graceful shutdown
|
/// Attempt a graceful shutdown
|
||||||
void (*shutdown) (struct client *client);
|
void (*shutdown) (struct client *client);
|
||||||
|
|
||||||
/// Do any additional cleanup
|
/// Do any additional cleanup
|
||||||
|
// TODO: rename to "finalize" or "cleanup"?
|
||||||
void (*destroy) (struct client *client);
|
void (*destroy) (struct client *client);
|
||||||
|
|
||||||
/// Process incoming data; "len == 0" means EOF
|
/// Process incoming data; "len == 0" means EOF
|
||||||
bool (*push) (struct client *client, const void *data, size_t len);
|
bool (*push) (struct client *client, const void *data, size_t len);
|
||||||
};
|
};
|
||||||
|
|
||||||
static void
|
|
||||||
client_init (struct client *self)
|
|
||||||
{
|
|
||||||
memset (self, 0, sizeof *self);
|
|
||||||
write_queue_init (&self->write_queue);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
client_free (struct client *self)
|
client_free (struct client *self)
|
||||||
{
|
{
|
||||||
|
@ -1789,40 +1783,116 @@ client_free (struct client *self)
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
client_write (struct client *client, const void *data, size_t len)
|
client_write (struct client *self, const void *data, size_t len)
|
||||||
{
|
{
|
||||||
write_req_t *req = xcalloc (1, sizeof *req);
|
write_req_t *req = xcalloc (1, sizeof *req);
|
||||||
req->data.iov_base = memcpy (xmalloc (len), data, len);
|
req->data.iov_base = memcpy (xmalloc (len), data, len);
|
||||||
req->data.iov_len = len;
|
req->data.iov_len = len;
|
||||||
|
|
||||||
write_queue_add (&client->write_queue, req);
|
write_queue_add (&self->write_queue, req);
|
||||||
ev_io_start (EV_DEFAULT_ &client->write_watcher);
|
ev_io_start (EV_DEFAULT_ &self->write_watcher);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
client_remove (struct client *client)
|
client_destroy (struct client *self)
|
||||||
{
|
{
|
||||||
struct server_context *ctx = client->ctx;
|
struct server_context *ctx = self->ctx;
|
||||||
|
|
||||||
LIST_UNLINK (ctx->clients, client);
|
LIST_UNLINK (ctx->clients, self);
|
||||||
ctx->n_clients--;
|
ctx->n_clients--;
|
||||||
|
|
||||||
// First uninitialize the higher-level implementation
|
// First uninitialize the higher-level implementation
|
||||||
client->impl->destroy (client);
|
self->vtable->destroy (self);
|
||||||
|
|
||||||
ev_io_stop (EV_DEFAULT_ &client->read_watcher);
|
ev_io_stop (EV_DEFAULT_ &self->read_watcher);
|
||||||
ev_io_stop (EV_DEFAULT_ &client->write_watcher);
|
ev_io_stop (EV_DEFAULT_ &self->write_watcher);
|
||||||
xclose (client->socket_fd);
|
xclose (self->socket_fd);
|
||||||
client_free (client);
|
client_free (self);
|
||||||
free (client);
|
free (self);
|
||||||
|
|
||||||
try_finish_quit (ctx);
|
try_finish_quit (ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
// - - FastCGI - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
||||||
|
|
||||||
|
static bool
|
||||||
|
client_read_loop (EV_P_ struct client *client, ev_io *watcher)
|
||||||
|
{
|
||||||
|
char buf[8192];
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
ssize_t n_read = recv (watcher->fd, buf, sizeof buf, 0);
|
||||||
|
if (n_read >= 0)
|
||||||
|
{
|
||||||
|
if (!client->vtable->push (client, buf, n_read))
|
||||||
|
return false;
|
||||||
|
if (!n_read)
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
else if (errno == EAGAIN)
|
||||||
|
return true;
|
||||||
|
else if (errno != EINTR)
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Don't receive the EOF condition repeatedly
|
||||||
|
ev_io_stop (EV_A_ watcher);
|
||||||
|
|
||||||
|
// We can probably still write, so let's just return
|
||||||
|
// XXX: if there's nothing to be written, shouldn't we close the connection?
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
on_client_ready (EV_P_ ev_io *watcher, int revents)
|
||||||
|
{
|
||||||
|
struct client *client = watcher->data;
|
||||||
|
|
||||||
|
if (revents & EV_READ)
|
||||||
|
if (!client_read_loop (EV_A_ client, watcher))
|
||||||
|
goto close;
|
||||||
|
if (revents & EV_WRITE)
|
||||||
|
// TODO: add "closing link" functionality -> automatic shutdown
|
||||||
|
// (half-close) once we manage to flush the write buffer,
|
||||||
|
// which is logically followed by waiting for an EOF from the client
|
||||||
|
// TODO: some sort of "on_buffers_flushed" callback for streaming huge
|
||||||
|
// chunks of external (or generated) data.
|
||||||
|
if (!flush_queue (&client->write_queue, watcher))
|
||||||
|
goto close;
|
||||||
|
return;
|
||||||
|
|
||||||
|
close:
|
||||||
|
client_destroy (client);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
client_init (EV_P_ struct client *self, int sock_fd)
|
||||||
|
{
|
||||||
|
struct server_context *ctx = ev_userdata (loop);
|
||||||
|
|
||||||
|
memset (self, 0, sizeof *self);
|
||||||
|
write_queue_init (&self->write_queue);
|
||||||
|
|
||||||
|
set_blocking (sock_fd, false);
|
||||||
|
self->socket_fd = sock_fd;
|
||||||
|
|
||||||
|
ev_io_init (&self->read_watcher, on_client_ready, sock_fd, EV_READ);
|
||||||
|
ev_io_init (&self->write_watcher, on_client_ready, sock_fd, EV_WRITE);
|
||||||
|
self->read_watcher.data = self;
|
||||||
|
self->write_watcher.data = self;
|
||||||
|
|
||||||
|
// We're only interested in reading as the write queue is empty now
|
||||||
|
ev_io_start (EV_A_ &self->read_watcher);
|
||||||
|
|
||||||
|
LIST_PREPEND (ctx->clients, self);
|
||||||
|
ctx->n_clients++;
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- FastCGI client handler --------------------------------------------------
|
||||||
|
|
||||||
struct client_fcgi
|
struct client_fcgi
|
||||||
{
|
{
|
||||||
|
struct client client; ///< Parent class
|
||||||
struct fcgi_muxer muxer; ///< FastCGI de/multiplexer
|
struct fcgi_muxer muxer; ///< FastCGI de/multiplexer
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -1832,33 +1902,39 @@ struct client_fcgi_request
|
||||||
struct request request; ///< Request
|
struct request request; ///< Request
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
||||||
|
|
||||||
static void
|
static void
|
||||||
client_fcgi_request_write (void *user_data, const void *data, size_t len)
|
client_fcgi_request_write_cb (void *user_data, const void *data, size_t len)
|
||||||
{
|
{
|
||||||
struct client_fcgi_request *request = user_data;
|
struct client_fcgi_request *request = user_data;
|
||||||
fcgi_request_write (request->fcgi_request, data, len);
|
fcgi_request_write (request->fcgi_request, data, len);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
client_fcgi_request_close (void *user_data)
|
client_fcgi_request_close_cb (void *user_data)
|
||||||
{
|
{
|
||||||
struct client_fcgi_request *request = user_data;
|
struct client_fcgi_request *request = user_data;
|
||||||
// TODO: fcgi_request_finish()? That will most probably end up with us
|
// No more data to send, terminate the substream/request
|
||||||
// receiving client_fcgi_request_destroy()
|
// XXX: this will most probably end up with client_fcgi_request_destroy(),
|
||||||
|
// we might or might not need to defer this action
|
||||||
|
fcgi_request_finish (request->fcgi_request);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
||||||
|
|
||||||
static void *
|
static void *
|
||||||
client_fcgi_request_start (void *user_data, struct fcgi_request *fcgi_request)
|
client_fcgi_request_start (void *user_data, struct fcgi_request *fcgi_request)
|
||||||
{
|
{
|
||||||
struct client *client = user_data;
|
struct client_fcgi *self = user_data;
|
||||||
|
|
||||||
// TODO: what if the request is aborted by ;
|
// TODO: what if the request is aborted by ;
|
||||||
struct client_fcgi_request *request = xmalloc (sizeof *request);
|
struct client_fcgi_request *request = xcalloc (1, sizeof *request);
|
||||||
request->fcgi_request = fcgi_request;
|
request->fcgi_request = fcgi_request;
|
||||||
request_init (&request->request);
|
request_init (&request->request);
|
||||||
request->request.ctx = client->ctx;
|
request->request.ctx = self->client.ctx;
|
||||||
request->request.write_cb = client_fcgi_request_write;
|
request->request.write_cb = client_fcgi_request_write_cb;
|
||||||
request->request.close_cb = client_fcgi_request_close;
|
request->request.close_cb = client_fcgi_request_close_cb;
|
||||||
request->request.user_data = request;
|
request->request.user_data = request;
|
||||||
return request;
|
return request;
|
||||||
}
|
}
|
||||||
|
@ -1875,42 +1951,33 @@ client_fcgi_request_destroy (void *handler_data)
|
||||||
{
|
{
|
||||||
struct client_fcgi_request *request = handler_data;
|
struct client_fcgi_request *request = handler_data;
|
||||||
request_free (&request->request);
|
request_free (&request->request);
|
||||||
free (handler_data);
|
free (request);
|
||||||
|
}
|
||||||
|
|
||||||
|
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
||||||
|
|
||||||
|
static void
|
||||||
|
client_fcgi_write_cb (void *user_data, const void *data, size_t len)
|
||||||
|
{
|
||||||
|
struct client_fcgi *self = user_data;
|
||||||
|
client_write (&self->client, data, len);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
client_fcgi_write (void *user_data, const void *data, size_t len)
|
client_fcgi_close_cb (void *user_data)
|
||||||
{
|
{
|
||||||
struct client *client = user_data;
|
struct client_fcgi *self = user_data;
|
||||||
client_write (client, data, len);
|
// FIXME: we should probably call something like client_shutdown(),
|
||||||
|
// which may have an argument whether we should really use close()
|
||||||
|
client_destroy (&self->client);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
||||||
client_fcgi_close (void *user_data)
|
|
||||||
{
|
|
||||||
struct client *client = user_data;
|
|
||||||
client_remove (client);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void
|
|
||||||
client_fcgi_init (struct client *client)
|
|
||||||
{
|
|
||||||
struct client_fcgi *self = xcalloc (1, sizeof *self);
|
|
||||||
client->impl_data = self;
|
|
||||||
|
|
||||||
fcgi_muxer_init (&self->muxer);
|
|
||||||
self->muxer.write_cb = client_fcgi_write;
|
|
||||||
self->muxer.close_cb = client_fcgi_close;
|
|
||||||
self->muxer.request_start_cb = client_fcgi_request_start;
|
|
||||||
self->muxer.request_push_cb = client_fcgi_request_push;
|
|
||||||
self->muxer.request_destroy_cb = client_fcgi_request_destroy;
|
|
||||||
self->muxer.user_data = client;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
client_fcgi_shutdown (struct client *client)
|
client_fcgi_shutdown (struct client *client)
|
||||||
{
|
{
|
||||||
struct client_fcgi *self = client->impl_data;
|
struct client_fcgi *self = (struct client_fcgi *) client;
|
||||||
|
|
||||||
// TODO: respond with FCGI_END_REQUEST: FCGI_REQUEST_COMPLETE to everything,
|
// TODO: respond with FCGI_END_REQUEST: FCGI_REQUEST_COMPLETE to everything,
|
||||||
// and start sending out FCGI_OVERLOADED to all incoming requests. The
|
// and start sending out FCGI_OVERLOADED to all incoming requests. The
|
||||||
|
@ -1920,65 +1987,79 @@ client_fcgi_shutdown (struct client *client)
|
||||||
static void
|
static void
|
||||||
client_fcgi_destroy (struct client *client)
|
client_fcgi_destroy (struct client *client)
|
||||||
{
|
{
|
||||||
struct client_fcgi *self = client->impl_data;
|
struct client_fcgi *self = (struct client_fcgi *) client;
|
||||||
client->impl_data = NULL;
|
|
||||||
|
|
||||||
fcgi_muxer_free (&self->muxer);
|
fcgi_muxer_free (&self->muxer);
|
||||||
free (self);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool
|
static bool
|
||||||
client_fcgi_push (struct client *client, const void *data, size_t len)
|
client_fcgi_push (struct client *client, const void *data, size_t len)
|
||||||
{
|
{
|
||||||
struct client_fcgi *self = client->impl_data;
|
struct client_fcgi *self = (struct client_fcgi *) client;
|
||||||
fcgi_muxer_push (&self->muxer, data, len);
|
fcgi_muxer_push (&self->muxer, data, len);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static struct client_impl g_client_fcgi =
|
static struct client_vtable client_fcgi_vtable =
|
||||||
{
|
{
|
||||||
.init = client_fcgi_init,
|
|
||||||
.shutdown = client_fcgi_shutdown,
|
.shutdown = client_fcgi_shutdown,
|
||||||
.destroy = client_fcgi_destroy,
|
.destroy = client_fcgi_destroy,
|
||||||
.push = client_fcgi_push,
|
.push = client_fcgi_push,
|
||||||
};
|
};
|
||||||
|
|
||||||
// - - SCGI - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
static struct client *
|
||||||
|
client_fcgi_create (EV_P_ int sock_fd)
|
||||||
|
{
|
||||||
|
struct client_fcgi *self = xcalloc (1, sizeof *self);
|
||||||
|
client_init (EV_A_ &self->client, sock_fd);
|
||||||
|
self->client.vtable = &client_fcgi_vtable;
|
||||||
|
|
||||||
|
fcgi_muxer_init (&self->muxer);
|
||||||
|
self->muxer.write_cb = client_fcgi_write_cb;
|
||||||
|
self->muxer.close_cb = client_fcgi_close_cb;
|
||||||
|
self->muxer.request_start_cb = client_fcgi_request_start;
|
||||||
|
self->muxer.request_push_cb = client_fcgi_request_push;
|
||||||
|
self->muxer.request_destroy_cb = client_fcgi_request_destroy;
|
||||||
|
self->muxer.user_data = self;
|
||||||
|
return &self->client;
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- SCGI client handler -----------------------------------------------------
|
||||||
|
|
||||||
struct client_scgi
|
struct client_scgi
|
||||||
{
|
{
|
||||||
|
struct client client; ///< Parent class
|
||||||
struct scgi_parser parser; ///< SCGI stream parser
|
struct scgi_parser parser; ///< SCGI stream parser
|
||||||
struct request request; ///< Request (only one per connection)
|
struct request request; ///< Request (only one per connection)
|
||||||
};
|
};
|
||||||
|
|
||||||
static void
|
static void
|
||||||
client_scgi_write (void *user_data, const void *data, size_t len)
|
client_scgi_write_cb (void *user_data, const void *data, size_t len)
|
||||||
{
|
{
|
||||||
struct client *client = user_data;
|
struct client_scgi *self = user_data;
|
||||||
client_write (client, data, len);
|
client_write (&self->client, data, len);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
client_scgi_close (void *user_data)
|
client_scgi_close_cb (void *user_data)
|
||||||
{
|
{
|
||||||
// NOTE: this rather really means "close me [the request]"
|
// NOTE: this rather really means "close me [the request]"
|
||||||
struct client *client = user_data;
|
struct client_scgi *self = user_data;
|
||||||
client_remove (client);
|
// FIXME: we should probably call something like client_shutdown(),
|
||||||
|
// which may have an argument whether we should really use close()
|
||||||
|
client_destroy (&self->client);
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool
|
static bool
|
||||||
client_scgi_on_headers_read (void *user_data)
|
client_scgi_on_headers_read (void *user_data)
|
||||||
{
|
{
|
||||||
struct client *client = user_data;
|
struct client_scgi *self = user_data;
|
||||||
struct client_scgi *self = client->impl_data;
|
|
||||||
return request_start (&self->request, &self->parser.headers);
|
return request_start (&self->request, &self->parser.headers);
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool
|
static bool
|
||||||
client_scgi_on_content (void *user_data, const void *data, size_t len)
|
client_scgi_on_content (void *user_data, const void *data, size_t len)
|
||||||
{
|
{
|
||||||
struct client *client = user_data;
|
struct client_scgi *self = user_data;
|
||||||
struct client_scgi *self = client->impl_data;
|
|
||||||
|
|
||||||
// XXX: do we have to count CONTENT_LENGTH and supply our own EOF?
|
// XXX: do we have to count CONTENT_LENGTH and supply our own EOF?
|
||||||
// If we do produce our own EOF, we should probably make sure we don't
|
// If we do produce our own EOF, we should probably make sure we don't
|
||||||
|
@ -1986,39 +2067,20 @@ client_scgi_on_content (void *user_data, const void *data, size_t len)
|
||||||
return request_push (&self->request, data, len);
|
return request_push (&self->request, data, len);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
||||||
client_scgi_init (struct client *client)
|
|
||||||
{
|
|
||||||
struct client_scgi *self = xcalloc (1, sizeof *self);
|
|
||||||
client->impl_data = self;
|
|
||||||
|
|
||||||
request_init (&self->request);
|
|
||||||
self->request.ctx = client->ctx;
|
|
||||||
self->request.write_cb = client_scgi_write;
|
|
||||||
self->request.close_cb = client_scgi_close;
|
|
||||||
self->request.user_data = client;
|
|
||||||
|
|
||||||
scgi_parser_init (&self->parser);
|
|
||||||
self->parser.on_headers_read = client_scgi_on_headers_read;
|
|
||||||
self->parser.on_content = client_scgi_on_content;
|
|
||||||
self->parser.user_data = client;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
client_scgi_destroy (struct client *client)
|
client_scgi_destroy (struct client *client)
|
||||||
{
|
{
|
||||||
struct client_scgi *self = client->impl_data;
|
struct client_scgi *self = (struct client_scgi *) client;
|
||||||
client->impl_data = NULL;
|
|
||||||
|
|
||||||
request_free (&self->request);
|
request_free (&self->request);
|
||||||
scgi_parser_free (&self->parser);
|
scgi_parser_free (&self->parser);
|
||||||
free (self);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool
|
static bool
|
||||||
client_scgi_push (struct client *client, const void *data, size_t len)
|
client_scgi_push (struct client *client, const void *data, size_t len)
|
||||||
{
|
{
|
||||||
struct client_scgi *self = client->impl_data;
|
struct client_scgi *self = (struct client_scgi *) client;
|
||||||
struct error *e = NULL;
|
struct error *e = NULL;
|
||||||
if (scgi_parser_push (&self->parser, data, len, &e))
|
if (scgi_parser_push (&self->parser, data, len, &e))
|
||||||
return true;
|
return true;
|
||||||
|
@ -2031,17 +2093,37 @@ client_scgi_push (struct client *client, const void *data, size_t len)
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
static struct client_impl g_client_scgi =
|
static struct client_vtable client_scgi_vtable =
|
||||||
{
|
{
|
||||||
.init = client_scgi_init,
|
|
||||||
.destroy = client_scgi_destroy,
|
.destroy = client_scgi_destroy,
|
||||||
.push = client_scgi_push,
|
.push = client_scgi_push,
|
||||||
};
|
};
|
||||||
|
|
||||||
// - - WebSockets - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
static struct client *
|
||||||
|
client_scgi_create (EV_P_ int sock_fd)
|
||||||
|
{
|
||||||
|
struct client_scgi *self = xcalloc (1, sizeof *self);
|
||||||
|
client_init (EV_A_ &self->client, sock_fd);
|
||||||
|
self->client.vtable = &client_scgi_vtable;
|
||||||
|
|
||||||
|
request_init (&self->request);
|
||||||
|
self->request.ctx = self->client.ctx;
|
||||||
|
self->request.write_cb = client_scgi_write_cb;
|
||||||
|
self->request.close_cb = client_scgi_close_cb;
|
||||||
|
self->request.user_data = self;
|
||||||
|
|
||||||
|
scgi_parser_init (&self->parser);
|
||||||
|
self->parser.on_headers_read = client_scgi_on_headers_read;
|
||||||
|
self->parser.on_content = client_scgi_on_content;
|
||||||
|
self->parser.user_data = self;
|
||||||
|
return &self->client;
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- WebSockets client handler -----------------------------------------------
|
||||||
|
|
||||||
struct client_ws
|
struct client_ws
|
||||||
{
|
{
|
||||||
|
struct client client; ///< Parent class
|
||||||
struct ws_handler handler; ///< WebSockets connection handler
|
struct ws_handler handler; ///< WebSockets connection handler
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -2049,9 +2131,7 @@ static bool
|
||||||
client_ws_on_message (void *user_data,
|
client_ws_on_message (void *user_data,
|
||||||
enum ws_opcode type, const void *data, size_t len)
|
enum ws_opcode type, const void *data, size_t len)
|
||||||
{
|
{
|
||||||
struct client *client = user_data;
|
struct client_ws *self = user_data;
|
||||||
struct client_ws *self = client->impl_data;
|
|
||||||
|
|
||||||
if (type != WS_OPCODE_TEXT)
|
if (type != WS_OPCODE_TEXT)
|
||||||
{
|
{
|
||||||
ws_handler_fail (&self->handler, WS_STATUS_UNSUPPORTED_DATA);
|
ws_handler_fail (&self->handler, WS_STATUS_UNSUPPORTED_DATA);
|
||||||
|
@ -2060,7 +2140,7 @@ client_ws_on_message (void *user_data,
|
||||||
|
|
||||||
struct str response;
|
struct str response;
|
||||||
str_init (&response);
|
str_init (&response);
|
||||||
process_json_rpc (client->ctx, data, len, &response);
|
process_json_rpc (self->client.ctx, data, len, &response);
|
||||||
ws_handler_send (&self->handler,
|
ws_handler_send (&self->handler,
|
||||||
WS_OPCODE_TEXT, response.str, response.len);
|
WS_OPCODE_TEXT, response.str, response.len);
|
||||||
str_free (&response);
|
str_free (&response);
|
||||||
|
@ -2068,74 +2148,78 @@ client_ws_on_message (void *user_data,
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
client_ws_write (void *user_data, const void *data, size_t len)
|
client_ws_write_cb (void *user_data, const void *data, size_t len)
|
||||||
{
|
{
|
||||||
struct client *client = user_data;
|
struct client *client = user_data;
|
||||||
client_write (client, data, len);
|
client_write (client, data, len);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
client_ws_close (void *user_data)
|
client_ws_close_cb (void *user_data)
|
||||||
{
|
{
|
||||||
struct client *client = user_data;
|
struct client_ws *self = user_data;
|
||||||
client_remove (client);
|
// FIXME: we should probably call something like client_shutdown(),
|
||||||
|
// which may have an argument whether we should really use close()
|
||||||
|
client_destroy (&self->client);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
||||||
client_ws_init (struct client *client)
|
|
||||||
{
|
|
||||||
struct client_ws *self = xmalloc (sizeof *self);
|
|
||||||
client->impl_data = self;
|
|
||||||
|
|
||||||
ws_handler_init (&self->handler);
|
|
||||||
self->handler.on_message = client_ws_on_message;
|
|
||||||
self->handler.write_cb = client_ws_write;
|
|
||||||
self->handler.close_cb = client_ws_close;
|
|
||||||
self->handler.user_data = client;
|
|
||||||
|
|
||||||
// One mebibyte seems to be a reasonable value
|
|
||||||
self->handler.max_payload_len = 1 << 10;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
client_ws_shutdown (struct client *client)
|
client_ws_shutdown (struct client *client)
|
||||||
{
|
{
|
||||||
struct client_ws *self = client->impl_data;
|
struct client_ws *self = (struct client_ws *) client;
|
||||||
ws_handler_close (&self->handler, WS_STATUS_GOING_AWAY, NULL, 0);
|
ws_handler_close (&self->handler, WS_STATUS_GOING_AWAY, NULL, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
client_ws_destroy (struct client *client)
|
client_ws_destroy (struct client *client)
|
||||||
{
|
{
|
||||||
struct client_ws *self = client->impl_data;
|
struct client_ws *self = (struct client_ws *) client;
|
||||||
client->impl_data = NULL;
|
|
||||||
|
|
||||||
ws_handler_free (&self->handler);
|
ws_handler_free (&self->handler);
|
||||||
free (self);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool
|
static bool
|
||||||
client_ws_push (struct client *client, const void *data, size_t len)
|
client_ws_push (struct client *client, const void *data, size_t len)
|
||||||
{
|
{
|
||||||
struct client_ws *self = client->impl_data;
|
struct client_ws *self = (struct client_ws *) client;
|
||||||
return ws_handler_push (&self->handler, data, len);
|
return ws_handler_push (&self->handler, data, len);
|
||||||
}
|
}
|
||||||
|
|
||||||
static struct client_impl g_client_ws =
|
static struct client_vtable client_ws_vtable =
|
||||||
{
|
{
|
||||||
.init = client_ws_init,
|
|
||||||
.shutdown = client_ws_shutdown,
|
.shutdown = client_ws_shutdown,
|
||||||
.destroy = client_ws_destroy,
|
.destroy = client_ws_destroy,
|
||||||
.push = client_ws_push,
|
.push = client_ws_push,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static struct client *
|
||||||
|
client_ws_create (EV_P_ int sock_fd)
|
||||||
|
{
|
||||||
|
struct client_ws *self = xcalloc (1, sizeof *self);
|
||||||
|
client_init (EV_A_ &self->client, sock_fd);
|
||||||
|
self->client.vtable = &client_ws_vtable;
|
||||||
|
|
||||||
|
ws_handler_init (&self->handler);
|
||||||
|
self->handler.on_message = client_ws_on_message;
|
||||||
|
self->handler.write_cb = client_ws_write_cb;
|
||||||
|
self->handler.close_cb = client_ws_close_cb;
|
||||||
|
self->handler.user_data = self;
|
||||||
|
|
||||||
|
// One mebibyte seems to be a reasonable value
|
||||||
|
self->handler.max_payload_len = 1 << 10;
|
||||||
|
return &self->client;
|
||||||
|
}
|
||||||
|
|
||||||
// --- Basic server stuff ------------------------------------------------------
|
// --- Basic server stuff ------------------------------------------------------
|
||||||
|
|
||||||
|
typedef struct client *(*client_create_fn) (EV_P_ int sock_fd);
|
||||||
|
|
||||||
struct listener
|
struct listener
|
||||||
{
|
{
|
||||||
int fd; ///< Listening socket FD
|
int fd; ///< Listening socket FD
|
||||||
ev_io watcher; ///< New connection available
|
ev_io watcher; ///< New connection available
|
||||||
struct client_impl *impl; ///< Client behaviour
|
client_create_fn create; ///< Client constructor
|
||||||
};
|
};
|
||||||
|
|
||||||
static void
|
static void
|
||||||
|
@ -2171,7 +2255,7 @@ on_quit_timeout (EV_P_ ev_timer *watcher, int revents)
|
||||||
(void) revents;
|
(void) revents;
|
||||||
|
|
||||||
LIST_FOR_EACH (struct client, iter, self->clients)
|
LIST_FOR_EACH (struct client, iter, self->clients)
|
||||||
client_remove (iter);
|
client_destroy (iter);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
|
@ -2179,91 +2263,13 @@ initiate_quit (struct server_context *self)
|
||||||
{
|
{
|
||||||
close_listeners (self);
|
close_listeners (self);
|
||||||
LIST_FOR_EACH (struct client, iter, self->clients)
|
LIST_FOR_EACH (struct client, iter, self->clients)
|
||||||
if (iter->impl->shutdown)
|
if (iter->vtable->shutdown)
|
||||||
iter->impl->shutdown (iter->impl_data);
|
iter->vtable->shutdown (iter);
|
||||||
|
|
||||||
ev_timer_set (&self->quit_timeout_watcher, 3., 0.);
|
ev_timer_set (&self->quit_timeout_watcher, 3., 0.);
|
||||||
self->quitting = true;
|
self->quitting = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool
|
|
||||||
client_read_loop (EV_P_ struct client *client, ev_io *watcher)
|
|
||||||
{
|
|
||||||
char buf[8192];
|
|
||||||
while (true)
|
|
||||||
{
|
|
||||||
ssize_t n_read = recv (watcher->fd, buf, sizeof buf, 0);
|
|
||||||
if (n_read >= 0)
|
|
||||||
{
|
|
||||||
if (!client->impl->push (client, buf, n_read))
|
|
||||||
return false;
|
|
||||||
if (!n_read)
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
else if (errno == EAGAIN)
|
|
||||||
return true;
|
|
||||||
else if (errno != EINTR)
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Don't receive the EOF condition repeatedly
|
|
||||||
ev_io_stop (EV_A_ watcher);
|
|
||||||
|
|
||||||
// We can probably still write, so let's just return
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void
|
|
||||||
on_client_ready (EV_P_ ev_io *watcher, int revents)
|
|
||||||
{
|
|
||||||
struct client *client = watcher->data;
|
|
||||||
|
|
||||||
if (revents & EV_READ)
|
|
||||||
if (!client_read_loop (EV_A_ client, watcher))
|
|
||||||
goto close;
|
|
||||||
if (revents & EV_WRITE)
|
|
||||||
// TODO: shouldn't we at least provide an option (to be used by a client
|
|
||||||
// implementation if it so desires) to close the connection once we've
|
|
||||||
// finished flushing the write queue? This should probably even be
|
|
||||||
// the default behaviour, as it's fairly uncommon for clients to
|
|
||||||
// shutdown the socket for writes while leaving it open for reading.
|
|
||||||
// Actually, we should wait until the client closes the connection.
|
|
||||||
// TODO: some sort of "on_buffers_flushed" callback for streaming huge
|
|
||||||
// chunks of external (or generated) data.
|
|
||||||
if (!flush_queue (&client->write_queue, watcher))
|
|
||||||
goto close;
|
|
||||||
return;
|
|
||||||
|
|
||||||
close:
|
|
||||||
client_remove (client);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void
|
|
||||||
make_client (EV_P_ struct client_impl *impl, int sock_fd)
|
|
||||||
{
|
|
||||||
struct server_context *ctx = ev_userdata (loop);
|
|
||||||
set_blocking (sock_fd, false);
|
|
||||||
|
|
||||||
struct client *client = xmalloc (sizeof *client);
|
|
||||||
client_init (client);
|
|
||||||
client->socket_fd = sock_fd;
|
|
||||||
client->impl = impl;
|
|
||||||
|
|
||||||
ev_io_init (&client->read_watcher, on_client_ready, sock_fd, EV_READ);
|
|
||||||
ev_io_init (&client->write_watcher, on_client_ready, sock_fd, EV_WRITE);
|
|
||||||
client->read_watcher.data = client;
|
|
||||||
client->write_watcher.data = client;
|
|
||||||
|
|
||||||
// We're only interested in reading as the write queue is empty now
|
|
||||||
ev_io_start (EV_A_ &client->read_watcher);
|
|
||||||
|
|
||||||
// Initialize the higher-level implementation
|
|
||||||
client->impl->init (client);
|
|
||||||
|
|
||||||
LIST_PREPEND (ctx->clients, client);
|
|
||||||
ctx->n_clients++;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
on_client_available (EV_P_ ev_io *watcher, int revents)
|
on_client_available (EV_P_ ev_io *watcher, int revents)
|
||||||
{
|
{
|
||||||
|
@ -2275,7 +2281,7 @@ on_client_available (EV_P_ ev_io *watcher, int revents)
|
||||||
{
|
{
|
||||||
int sock_fd = accept (watcher->fd, NULL, NULL);
|
int sock_fd = accept (watcher->fd, NULL, NULL);
|
||||||
if (sock_fd != -1)
|
if (sock_fd != -1)
|
||||||
make_client (EV_A_ listener->impl, sock_fd);
|
listener->create (EV_A_ sock_fd);
|
||||||
else if (errno == EAGAIN)
|
else if (errno == EAGAIN)
|
||||||
return;
|
return;
|
||||||
else if (errno != EINTR && errno != ECONNABORTED)
|
else if (errno != EINTR && errno != ECONNABORTED)
|
||||||
|
@ -2345,7 +2351,7 @@ listener_bind (struct addrinfo *gai_iter)
|
||||||
|
|
||||||
static void
|
static void
|
||||||
listener_add (struct server_context *ctx, const char *host, const char *port,
|
listener_add (struct server_context *ctx, const char *host, const char *port,
|
||||||
const struct addrinfo *gai_hints, struct client_impl *impl)
|
const struct addrinfo *gai_hints, client_create_fn create)
|
||||||
{
|
{
|
||||||
struct addrinfo *gai_result, *gai_iter;
|
struct addrinfo *gai_result, *gai_iter;
|
||||||
int err = getaddrinfo (host, port, gai_hints, &gai_result);
|
int err = getaddrinfo (host, port, gai_hints, &gai_result);
|
||||||
|
@ -2369,7 +2375,7 @@ listener_add (struct server_context *ctx, const char *host, const char *port,
|
||||||
ev_io_init (&listener->watcher, on_client_available, fd, EV_READ);
|
ev_io_init (&listener->watcher, on_client_available, fd, EV_READ);
|
||||||
ev_io_start (EV_DEFAULT_ &listener->watcher);
|
ev_io_start (EV_DEFAULT_ &listener->watcher);
|
||||||
listener->watcher.data = listener;
|
listener->watcher.data = listener;
|
||||||
listener->impl = impl;
|
listener->create = create;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
freeaddrinfo (gai_result);
|
freeaddrinfo (gai_result);
|
||||||
|
@ -2407,13 +2413,13 @@ setup_listen_fds (struct server_context *ctx, struct error **e)
|
||||||
|
|
||||||
for (size_t i = 0; i < ports_fcgi.len; i++)
|
for (size_t i = 0; i < ports_fcgi.len; i++)
|
||||||
listener_add (ctx, bind_host, ports_fcgi.vector[i],
|
listener_add (ctx, bind_host, ports_fcgi.vector[i],
|
||||||
&gai_hints, &g_client_fcgi);
|
&gai_hints, client_fcgi_create);
|
||||||
for (size_t i = 0; i < ports_scgi.len; i++)
|
for (size_t i = 0; i < ports_scgi.len; i++)
|
||||||
listener_add (ctx, bind_host, ports_scgi.vector[i],
|
listener_add (ctx, bind_host, ports_scgi.vector[i],
|
||||||
&gai_hints, &g_client_scgi);
|
&gai_hints, client_scgi_create);
|
||||||
for (size_t i = 0; i < ports_ws.len; i++)
|
for (size_t i = 0; i < ports_ws.len; i++)
|
||||||
listener_add (ctx, bind_host, ports_ws.vector[i],
|
listener_add (ctx, bind_host, ports_ws.vector[i],
|
||||||
&gai_hints, &g_client_ws);
|
&gai_hints, client_ws_create);
|
||||||
|
|
||||||
str_vector_free (&ports_fcgi);
|
str_vector_free (&ports_fcgi);
|
||||||
str_vector_free (&ports_scgi);
|
str_vector_free (&ports_scgi);
|
||||||
|
@ -2526,7 +2532,19 @@ on_termination_signal (EV_P_ ev_signal *handle, int revents)
|
||||||
(void) handle;
|
(void) handle;
|
||||||
(void) revents;
|
(void) revents;
|
||||||
|
|
||||||
// TODO: initiate_quit (ctx);
|
initiate_quit (ctx);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
setup_signal_handlers (struct server_context *ctx)
|
||||||
|
{
|
||||||
|
ev_signal_init (&ctx->sigterm_watcher, on_termination_signal, SIGTERM);
|
||||||
|
ev_signal_start (EV_DEFAULT_ &ctx->sigterm_watcher);
|
||||||
|
|
||||||
|
ev_signal_init (&ctx->sigint_watcher, on_termination_signal, SIGINT);
|
||||||
|
ev_signal_start (EV_DEFAULT_ &ctx->sigint_watcher);
|
||||||
|
|
||||||
|
(void) signal (SIGPIPE, SIG_IGN);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
|
@ -2668,14 +2686,7 @@ main (int argc, char *argv[])
|
||||||
exit_fatal ("libev initialization failed");
|
exit_fatal ("libev initialization failed");
|
||||||
|
|
||||||
ev_set_userdata (loop, &ctx);
|
ev_set_userdata (loop, &ctx);
|
||||||
|
setup_signal_handlers (&ctx);
|
||||||
ev_signal_init (&ctx.sigterm_watcher, on_termination_signal, SIGTERM);
|
|
||||||
ev_signal_start (EV_DEFAULT_ &ctx.sigterm_watcher);
|
|
||||||
|
|
||||||
ev_signal_init (&ctx.sigint_watcher, on_termination_signal, SIGINT);
|
|
||||||
ev_signal_start (EV_DEFAULT_ &ctx.sigint_watcher);
|
|
||||||
|
|
||||||
(void) signal (SIGPIPE, SIG_IGN);
|
|
||||||
|
|
||||||
LIST_PREPEND (ctx.handlers, &g_request_handler_static);
|
LIST_PREPEND (ctx.handlers, &g_request_handler_static);
|
||||||
LIST_PREPEND (ctx.handlers, &g_request_handler_json_rpc);
|
LIST_PREPEND (ctx.handlers, &g_request_handler_json_rpc);
|
||||||
|
|
Loading…
Reference in New Issue