Woo we can draw over the network now

This commit is contained in:
Přemysl Eric Janouch 2014-10-27 13:54:53 +01:00
parent 29bec0c0e0
commit 1f2e487302
2 changed files with 606 additions and 35 deletions

View File

@ -39,6 +39,17 @@
#define BITMAP_BLOCK_SIZE 50 ///< Step for extending bitmap size #define BITMAP_BLOCK_SIZE 50 ///< Step for extending bitmap size
#define PROTOCOL_VERSION 1 ///< Network protocol version
enum
{
MESSAGE_HELLO, ///< Server/client hello
MESSAGE_GET_BITMAP, ///< Request bitmap data
MESSAGE_PUT_POINT, ///< Request to place a point
MESSAGE_COUNT ///< Total number of messages
};
typedef enum network_mode network_mode_t; typedef enum network_mode network_mode_t;
enum network_mode enum network_mode
{ {
@ -47,12 +58,20 @@ enum network_mode
NETWORK_MODE_CLIENT ///< We're a client NETWORK_MODE_CLIENT ///< We're a client
}; };
typedef struct write_req write_req_t;
struct write_req
{
uv_write_t req; ///< libuv write request
uv_buf_t buf; ///< The data to be written
};
typedef struct client client_t; typedef struct client client_t;
struct client struct client
{ {
LIST_HEADER (client_t) LIST_HEADER (client_t)
uv_tcp_t handle; ///< TCP connection handle uv_tcp_t handle; ///< TCP connection handle
struct msg_reader msg_reader; ///< Client message reader
}; };
typedef struct app_context app_context_t; typedef struct app_context app_context_t;
@ -70,6 +89,7 @@ struct app_context
// Client: // Client:
uv_tcp_t server_fd; ///< Connection to the server uv_tcp_t server_fd; ///< Connection to the server
struct msg_reader msg_reader; ///< Server message reader
// Server: // Server:
uv_tcp_t listen_fd; ///< Listening FD uv_tcp_t listen_fd; ///< Listening FD
@ -98,20 +118,14 @@ struct app_context
uint8_t current_color_right; ///< Right mouse button color uint8_t current_color_right; ///< Right mouse button color
}; };
static void remove_client (app_context_t *app, client_t *client);
static void on_server_disconnected (app_context_t *app);
static void static void
app_init (app_context_t *self) app_init (app_context_t *self)
{ {
memset (self, 0, sizeof *self); memset (self, 0, sizeof *self);
} msg_reader_init (&self->msg_reader);
static void
remove_client (app_context_t *app, client_t *client)
{
// TODO: cancel any write requests?
// XXX: should we unref it?
uv_close ((uv_handle_t *) &client->handle, NULL);
LIST_UNLINK (app->clients, client);
free (client);
} }
static void static void
@ -120,11 +134,150 @@ app_free (app_context_t *self)
if (self->tk) if (self->tk)
termo_destroy (self->tk); termo_destroy (self->tk);
while (self->clients) while (self->clients)
// XXX: we probably shouldn't do this from here
remove_client (self, self->clients); remove_client (self, self->clients);
free (self->bitmap); free (self->bitmap);
msg_reader_free (&self->msg_reader);
} }
// --- Server-client messaging -------------------------------------------------
static write_req_t *
flush_writer (struct msg_writer *writer)
{
size_t len;
void *data = msg_writer_flush (writer, &len);
write_req_t *req = xcalloc (1, sizeof *req);
req->buf = uv_buf_init (data, len);
return req;
}
static void
on_data_written_to_client (uv_write_t *req, int status)
{
write_req_t *wr = (write_req_t *) req;
app_context_t *app = req->handle->loop->data;
client_t *client = req->data;
if (status)
// Write failed
remove_client (app, client);
free (wr->buf.base);
free (wr);
}
static void
flush_writer_to_client (struct msg_writer *writer, client_t *client)
{
write_req_t *wr = flush_writer (writer);
wr->req.data = client;
(void) uv_write (&wr->req, (uv_stream_t *) &client->handle,
&wr->buf, 1, on_data_written_to_client);
// XXX: should we put the request on a list so that we can get rid of it?
}
static void
on_data_written_to_server (uv_write_t *req, int status)
{
write_req_t *wr = (write_req_t *) req;
app_context_t *app = req->handle->loop->data;
if (status)
// Write failed
on_server_disconnected (app);
free (wr->buf.base);
free (wr);
}
static void
flush_writer_to_server (struct msg_writer *writer, app_context_t *app)
{
write_req_t *wr = flush_writer (writer);
(void) uv_write (&wr->req, (uv_stream_t *) &app->server_fd,
&wr->buf, 1, on_data_written_to_server);
// XXX: should we put the request on a list so that we can get rid of it?
}
static void
send_draw_point_response (client_t *client, int x, int y, uint8_t color)
{
struct msg_writer writer;
msg_writer_init (&writer);
msg_writer_u8 (&writer, MESSAGE_PUT_POINT);
msg_writer_i32 (&writer, x);
msg_writer_i32 (&writer, y);
msg_writer_u8 (&writer, color);
flush_writer_to_client (&writer, client);
}
static void
send_draw_point_request (app_context_t *app, int x, int y, uint8_t color)
{
struct msg_writer writer;
msg_writer_init (&writer);
msg_writer_u8 (&writer, MESSAGE_PUT_POINT);
msg_writer_i32 (&writer, x);
msg_writer_i32 (&writer, y);
msg_writer_u8 (&writer, color);
flush_writer_to_server (&writer, app);
}
static void
send_hello_request (app_context_t *app)
{
struct msg_writer writer;
msg_writer_init (&writer);
msg_writer_u8 (&writer, MESSAGE_HELLO);
msg_writer_u8 (&writer, PROTOCOL_VERSION);
flush_writer_to_server (&writer, app);
}
static void
send_hello_response (client_t *client)
{
struct msg_writer writer;
msg_writer_init (&writer);
msg_writer_u8 (&writer, MESSAGE_HELLO);
msg_writer_u8 (&writer, PROTOCOL_VERSION);
flush_writer_to_client (&writer, client);
}
static void
send_get_bitmap_request (app_context_t *app)
{
struct msg_writer writer;
msg_writer_init (&writer);
msg_writer_u8 (&writer, MESSAGE_GET_BITMAP);
flush_writer_to_server (&writer, app);
}
static void
send_get_bitmap_response (client_t *client, app_context_t *app)
{
struct msg_writer writer;
msg_writer_init (&writer);
msg_writer_u8 (&writer, MESSAGE_GET_BITMAP);
msg_writer_i32 (&writer, app->bitmap_x);
msg_writer_i32 (&writer, app->bitmap_y);
msg_writer_u64 (&writer, app->bitmap_w);
msg_writer_u64 (&writer, app->bitmap_h);
msg_writer_blob (&writer, app->bitmap,
app->bitmap_w * app->bitmap_h * sizeof *app->bitmap);
flush_writer_to_client (&writer, client);
}
// --- Server-client messaging -------------------------------------------------
static void static void
display (const char *format, ...) display (const char *format, ...)
{ {
@ -306,6 +459,11 @@ draw_point (app_context_t *app, int x, int y, uint8_t color)
addch (app->palette[color]); addch (app->palette[color]);
refresh (); refresh ();
} }
// Broadcast the clients about the event
if (app->mode == NETWORK_MODE_SERVER)
for (client_t *iter = app->clients; iter; iter = iter->next)
send_draw_point_response (iter, x, y, color);
} }
// --- Exports ----------------------------------------------------------------- // --- Exports -----------------------------------------------------------------
@ -538,7 +696,8 @@ on_key (app_context_t *app, termo_key_t *key)
if (event != TERMO_MOUSE_PRESS && event != TERMO_MOUSE_DRAG) if (event != TERMO_MOUSE_PRESS && event != TERMO_MOUSE_DRAG)
return true; return true;
if (button == 2) // Middle mouse button, or Ctrl + left mouse button, moves the canvas
if (button == 2 || (button == 1 && key->modifiers == TERMO_KEYMOD_CTRL))
{ {
if (event == TERMO_MOUSE_DRAG) if (event == TERMO_MOUSE_DRAG)
{ {
@ -568,7 +727,12 @@ on_key (app_context_t *app, termo_key_t *key)
int canvas_y = app->corner_y + screen_y - TOP_BAR_CUTOFF; int canvas_y = app->corner_y + screen_y - TOP_BAR_CUTOFF;
if (screen_y >= TOP_BAR_CUTOFF) if (screen_y >= TOP_BAR_CUTOFF)
{
if (app->mode == NETWORK_MODE_CLIENT)
send_draw_point_request (app, canvas_x, canvas_y, *color);
else
draw_point (app, canvas_x, canvas_y, *color); draw_point (app, canvas_x, canvas_y, *color);
}
else if (screen_y > 0 && event != TERMO_MOUSE_DRAG) else if (screen_y > 0 && event != TERMO_MOUSE_DRAG)
{ {
int pair = (float) screen_x / COLS * PALETTE_WIDTH; int pair = (float) screen_x / COLS * PALETTE_WIDTH;
@ -650,14 +814,13 @@ app_uv_allocator (uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf)
buf->len = sizeof app->read_buf; buf->len = sizeof app->read_buf;
} }
static void // --- Client-specific stuff ---------------------------------------------------
on_server_data (uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
{
(void) buf;
app_context_t *app = stream->loop->data; typedef void (*server_handler_fn) (app_context_t *, struct msg_unpacker *);
if (nread == UV_EOF || nread < 0)
{ static void
on_server_disconnected (app_context_t *app)
{
// TODO: cancel any write requests? // TODO: cancel any write requests?
// XXX: should we unref it? // XXX: should we unref it?
uv_close ((uv_handle_t *) &app->server_fd, NULL); uv_close ((uv_handle_t *) &app->server_fd, NULL);
@ -668,26 +831,214 @@ on_server_data (uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
// Let the user save the picture at least. // Let the user save the picture at least.
// Also prevents us from trying to use the dead server handle. // Also prevents us from trying to use the dead server handle.
app->mode = NETWORK_MODE_STANDALONE; app->mode = NETWORK_MODE_STANDALONE;
}
static void
on_server_hello (app_context_t *app, struct msg_unpacker *unpacker)
{
(void) app;
uint8_t version;
if (!msg_unpacker_u8 (unpacker, &version))
return;
if (version != PROTOCOL_VERSION)
// XXX: possibly incompatible version, disconnect?
return;
}
static void
on_server_get_bitmap (app_context_t *app, struct msg_unpacker *unpacker)
{
int32_t x, y;
uint64_t w, h;
if (!msg_unpacker_i32 (unpacker, &x)
|| !msg_unpacker_i32 (unpacker, &y)
|| !msg_unpacker_u64 (unpacker, &w)
|| !msg_unpacker_u64 (unpacker, &h))
return;
// TODO: employ at least some RLE encoding
size_t size = w * h * sizeof *app->bitmap;
if ((h && w > SIZE_MAX / h) || w > SIZE_MAX || h > SIZE_MAX)
return;
const void *data;
if (!msg_unpacker_blob (unpacker, &data, size))
return;
free (app->bitmap);
app->bitmap = memcpy (xcalloc (1, size), data, size);
app->bitmap_x = x;
app->bitmap_y = y;
app->bitmap_w = w;
app->bitmap_h = h;
redraw_canvas (app);
}
static void
on_server_put_point (app_context_t *app, struct msg_unpacker *unpacker)
{
int32_t x, y;
uint8_t color;
if (!msg_unpacker_i32 (unpacker, &x)
|| !msg_unpacker_i32 (unpacker, &y)
|| !msg_unpacker_u8 (unpacker, &color))
return;
draw_point (app, x, y, color);
}
static void
on_server_data (uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
{
app_context_t *app = stream->loop->data;
if (nread == UV_EOF || nread < 0)
{
on_server_disconnected (app);
return; return;
} }
// TODO: process the data msg_reader_feed (&app->msg_reader, buf->base, nread);
static const server_handler_fn handlers[MESSAGE_COUNT] =
{
[MESSAGE_HELLO] = on_server_hello,
[MESSAGE_GET_BITMAP] = on_server_get_bitmap,
[MESSAGE_PUT_POINT] = on_server_put_point,
};
void *msg;
size_t len;
while ((msg = msg_reader_get (&app->msg_reader, &len)))
{
struct msg_unpacker unpacker;
msg_unpacker_init (&unpacker, msg, len);
uint8_t type;
if (!msg_unpacker_u8 (&unpacker, &type)
|| type >= MESSAGE_COUNT)
// XXX: unknown message, disconnect?
continue;
server_handler_fn handler = handlers[type];
if (!handler)
// XXX: unknown message, disconnect?
continue;
handler (app, &unpacker);
if (msg_unpacker_get_available (&unpacker) > 0)
// XXX: overlong message, disconnect?
continue;
}
}
// --- Server-specific stuff ---------------------------------------------------
typedef bool (*client_handler_fn)
(app_context_t *, client_t *, struct msg_unpacker *);
static void
remove_client (app_context_t *app, client_t *client)
{
// TODO: cancel any write requests?
// XXX: should we unref it?
uv_close ((uv_handle_t *) &client->handle, NULL);
LIST_UNLINK (app->clients, client);
msg_reader_free (&client->msg_reader);
free (client);
}
static bool
on_client_hello (app_context_t *app, client_t *client,
struct msg_unpacker *unpacker)
{
(void) app;
uint8_t version;
if (!msg_unpacker_u8 (unpacker, &version)
|| version != PROTOCOL_VERSION)
// Nope, I don't like you
return false;
send_hello_response (client);
return true;
}
static bool
on_client_get_bitmap (app_context_t *app, client_t *client,
struct msg_unpacker *unpacker)
{
(void) unpacker;
send_get_bitmap_response (client, app);
return true;
}
static bool
on_client_put_point (app_context_t *app, client_t *client,
struct msg_unpacker *unpacker)
{
(void) client;
int32_t x, y;
uint8_t color;
if (!msg_unpacker_i32 (unpacker, &x)
|| !msg_unpacker_i32 (unpacker, &y)
|| !msg_unpacker_u8 (unpacker, &color))
return false;
// The function takes care of broadcasting to all the other clients,
// as well as back to the original sender
draw_point (app, x, y, color);
return true;
} }
static void static void
on_client_data (uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) on_client_data (uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
{ {
(void) buf;
app_context_t *app = stream->loop->data; app_context_t *app = stream->loop->data;
client_t *client = stream->data; client_t *client = stream->data;
if (nread == UV_EOF || nread < 0) if (nread == UV_EOF || nread < 0)
{ goto disconnect; // Connection closed or error
remove_client (app, client);
return;
}
// TODO: process the data msg_reader_feed (&client->msg_reader, buf->base, nread);
static const client_handler_fn handlers[MESSAGE_COUNT] =
{
[MESSAGE_HELLO] = on_client_hello,
[MESSAGE_GET_BITMAP] = on_client_get_bitmap,
[MESSAGE_PUT_POINT] = on_client_put_point,
};
void *msg;
size_t len;
while ((msg = msg_reader_get (&client->msg_reader, &len)))
{
struct msg_unpacker unpacker;
msg_unpacker_init (&unpacker, msg, len);
uint8_t type;
if (!msg_unpacker_u8 (&unpacker, &type))
goto disconnect; // Invalid message
if (type >= MESSAGE_COUNT)
goto disconnect; // Unknown message
client_handler_fn handler = handlers[type];
if (!handler)
goto disconnect; // Unknown message
if (!handler (app, client, &unpacker))
goto disconnect; // Invalid message
if (msg_unpacker_get_available (&unpacker) > 0)
goto disconnect; // Overlong message data
}
return;
disconnect:
remove_client (app, client);
} }
static void static void
@ -708,6 +1059,7 @@ on_new_client (uv_stream_t *server, int status)
goto free_handle; goto free_handle;
client->handle.data = client; client->handle.data = client;
msg_reader_init (&client->msg_reader);
LIST_PREPEND (app->clients, client); LIST_PREPEND (app->clients, client);
return; return;
@ -907,6 +1259,9 @@ initialize_client (app_context_t *app, struct addrinfo *address)
"error", "initialization failed", uv_strerror (err)); "error", "initialization failed", uv_strerror (err));
exit (EXIT_FAILURE); exit (EXIT_FAILURE);
} }
send_hello_request (app);
send_get_bitmap_request (app);
} }
static void static void

216
utils.c
View File

@ -216,6 +216,20 @@ str_append_printf (struct str *self, const char *fmt, ...)
return size; return size;
} }
static void
str_remove_slice (struct str *self, size_t start, size_t length)
{
size_t end = start + length;
if (end > self->len)
end = self->len;
memmove (self->str + start, self->str + end, self->len - end);
self->str[self->len -= length] = '\0';
// Shrink the string if the allocation becomes way too large
if (self->alloc >= STR_SHRINK_THRESHOLD && self->len < (self->alloc >> 2))
self->str = xrealloc (self->str, self->alloc >>= 2);
}
// --- Utilities --------------------------------------------------------------- // --- Utilities ---------------------------------------------------------------
static bool static bool
@ -270,6 +284,208 @@ xstrtoul (unsigned long *out, const char *s, int base)
return errno == 0 && !*end && end != s; return errno == 0 && !*end && end != s;
} }
// --- Message reader ----------------------------------------------------------
struct msg_reader
{
struct str buf; ///< Input buffer
uint64_t offset; ///< Current offset in the buffer
};
static void
msg_reader_init (struct msg_reader *self)
{
str_init (&self->buf);
self->offset = 0;
}
static void
msg_reader_free (struct msg_reader *self)
{
str_free (&self->buf);
}
static void
msg_reader_compact (struct msg_reader *self)
{
str_remove_slice (&self->buf, 0, self->offset);
self->offset = 0;
}
static void
msg_reader_feed (struct msg_reader *self, const void *data, size_t len)
{
// TODO: have some mechanism to prevent flooding
msg_reader_compact (self);
str_append_data (&self->buf, data, len);
}
static void *
msg_reader_get (struct msg_reader *self, size_t *len)
{
// Try to read in the length of the message
if (self->offset + sizeof (uint64_t) > self->buf.len)
return NULL;
uint8_t *x = (uint8_t *) self->buf.str + self->offset;
uint64_t msg_len
= (uint64_t) x[0] << 56 | (uint64_t) x[1] << 48
| (uint64_t) x[2] << 40 | (uint64_t) x[3] << 32
| (uint64_t) x[4] << 24 | (uint64_t) x[5] << 16
| (uint64_t) x[6] << 8 | (uint64_t) x[7];
if (msg_len < sizeof msg_len)
{
// The message is shorter than its header
// TODO: have some mechanism to report errors
return NULL;
}
if (self->offset + msg_len < self->offset)
{
// Trying to read an insane amount of data but whatever
msg_reader_compact (self);
return NULL;
}
// Check if we've got the full message in the buffer and return it
if (self->offset + msg_len > self->buf.len)
return NULL;
// We have to subtract the header from the reported length
void *data = self->buf.str + self->offset + sizeof msg_len;
self->offset += msg_len;
*len = msg_len - sizeof msg_len;
return data;
}
// --- Message unpacker --------------------------------------------------------
struct msg_unpacker
{
const char *data;
size_t offset;
size_t len;
};
static void
msg_unpacker_init (struct msg_unpacker *self, const void *data, size_t len)
{
self->data = data;
self->len = len;
self->offset = 0;
}
static size_t
msg_unpacker_get_available (struct msg_unpacker *self)
{
return self->len - self->offset;
}
static bool
msg_unpacker_blob (struct msg_unpacker *self, const void **data, size_t len)
{
if (self->len - self->offset < len)
return false;
*data = self->data + self->offset;
self->offset += len;
return true;
}
#define UNPACKER_INT_BEGIN \
if (self->len - self->offset < sizeof *value) \
return false; \
uint8_t *x = (uint8_t *) self->data + self->offset; \
self->offset += sizeof *value;
static bool
msg_unpacker_u8 (struct msg_unpacker *self, uint8_t *value)
{
UNPACKER_INT_BEGIN
*value = x[0];
return true;
}
static bool
msg_unpacker_i32 (struct msg_unpacker *self, int32_t *value)
{
UNPACKER_INT_BEGIN
*value
= (uint32_t) x[0] << 24 | (uint32_t) x[1] << 16
| (uint32_t) x[2] << 8 | (uint32_t) x[3];
return true;
}
static bool
msg_unpacker_u64 (struct msg_unpacker *self, uint64_t *value)
{
UNPACKER_INT_BEGIN
*value
= (uint64_t) x[0] << 56 | (uint64_t) x[1] << 48
| (uint64_t) x[2] << 40 | (uint64_t) x[3] << 32
| (uint64_t) x[4] << 24 | (uint64_t) x[5] << 16
| (uint64_t) x[6] << 8 | (uint64_t) x[7];
return true;
}
#undef UNPACKER_INT_BEGIN
// --- Message packer and writer -----------------------------------------------
struct msg_writer
{
struct str buf; ///< Holds the message data
};
static void
msg_writer_init (struct msg_writer *self)
{
str_init (&self->buf);
// Placeholder for message length
str_append_data (&self->buf, "\x00\x00\x00\x00" "\x00\x00\x00\x00", 8);
}
static void
msg_writer_blob (struct msg_writer *self, const void *data, size_t len)
{
str_append_data (&self->buf, data, len);
}
static void
msg_writer_u8 (struct msg_writer *self, uint8_t x)
{
str_append_data (&self->buf, &x, 1);
}
static void
msg_writer_i32 (struct msg_writer *self, int32_t x)
{
uint32_t u = x;
uint8_t tmp[4] = { u >> 24, u >> 16, u >> 8, u };
str_append_data (&self->buf, tmp, sizeof tmp);
}
static void
msg_writer_u64 (struct msg_writer *self, uint64_t x)
{
uint8_t tmp[8] =
{ x >> 56, x >> 48, x >> 40, x >> 32, x >> 24, x >> 16, x >> 8, x };
str_append_data (&self->buf, tmp, sizeof tmp);
}
static void *
msg_writer_flush (struct msg_writer *self, size_t *len)
{
// Update the message length
uint64_t x = self->buf.len;
uint8_t tmp[8] =
{ x >> 56, x >> 48, x >> 40, x >> 32, x >> 24, x >> 16, x >> 8, x };
memcpy (self->buf.str, tmp, sizeof tmp);
*len = x;
return str_steal (&self->buf);
}
// --- Option handler ---------------------------------------------------------- // --- Option handler ----------------------------------------------------------
// Simple wrapper for the getopt_long API to make it easier to use and maintain. // Simple wrapper for the getopt_long API to make it easier to use and maintain.