Make the relay acknowledge all received commands
All checks were successful
Alpine 3.21 Success
Arch Linux AUR Success
OpenBSD 7.6 Success

To that effect, bump liberty and the xC relay protocol version.
Relay events have been reordered to improve forward compatibility.

Also prevent use-after-free when serialization fails.

xP now slightly throttles activity notifications,
and indicates when there are unacknowledged commands.
This commit is contained in:
Přemysl Eric Janouch 2025-05-09 22:34:25 +02:00
parent 4cf8c394b9
commit 7ba17a0161
Signed by: p
GPG Key ID: A0420B94F92B9493
10 changed files with 158 additions and 68 deletions

View File

@ -1,4 +1,4 @@
Copyright (c) 2014 - 2024, Přemysl Eric Janouch <p@janouch.name>
Copyright (c) 2014 - 2025, Přemysl Eric Janouch <p@janouch.name>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted.

12
NEWS
View File

@ -1,3 +1,15 @@
Unreleased
* xC: added more characters as nickname delimiters,
so that @nick works as a highlight
* xC: prevented rare crashes in relay code
* xP: added a network lag indicator to the user interface
* Bumped relay protocol version
2.1.0 (2024-12-19) "Bunnyrific"
* xC: fixed a crash when the channel topic had too many formatting items

@ -1 +1 @@
Subproject commit af889b733e81fa40d7a7ff652386585115e186f5
Subproject commit 31ae40085206dc365a15fd6e9d13978e392f8b35

View File

@ -337,9 +337,14 @@ func relaySend(data RelayCommandData, callback callback) bool {
CommandSeq: commandSeq,
Data: data,
}
if callback != nil {
commandCallbacks[m.CommandSeq] = callback
if callback == nil {
callback = func(err string, response *RelayResponseData) {
if response == nil {
showErrorMessage(err)
}
}
}
commandCallbacks[m.CommandSeq] = callback
commandSeq++
// TODO(p): Handle errors better.

75
xC.c
View File

@ -1818,6 +1818,7 @@ struct client
uint32_t event_seq; ///< Outgoing message counter
bool initialized; ///< Initial sync took place
bool closing; ///< We're closing the connection
struct poller_fd socket_event; ///< The socket can be read/written to
};
@ -1875,7 +1876,7 @@ enum server_state
IRC_CONNECTED, ///< Trying to register
IRC_REGISTERED, ///< We can chat now
IRC_CLOSING, ///< Flushing output before shutdown
IRC_HALF_CLOSED ///< Connection shutdown from our side
IRC_HALF_CLOSED ///< Connection shut down from our side
};
/// Convert an IRC identifier character to lower-case
@ -2263,14 +2264,6 @@ struct app_context
struct str_map servers; ///< Our servers
// Relay:
int relay_fd; ///< Listening socket FD
struct client *clients; ///< Our relay clients
/// A single message buffer to prepare all outcoming messages within
struct relay_event_message relay_message;
// Events:
struct poller_fd tty_event; ///< Terminal input event
@ -2322,6 +2315,14 @@ struct app_context
char *editor_filename; ///< The file being edited by user
int terminal_suspended; ///< Terminal suspension level
// Relay:
int relay_fd; ///< Listening socket FD
struct client *clients; ///< Our relay clients
/// A single message buffer to prepare all outcoming messages within
struct relay_event_message relay_message;
// Plugins:
struct plugin *plugins; ///< Loaded plugins
@ -2392,8 +2393,6 @@ app_context_init (struct app_context *self)
self->config = config_make ();
poller_init (&self->poller);
self->relay_fd = -1;
self->servers = str_map_make ((str_map_free_fn) server_unref);
self->servers.key_xfrm = tolower_ascii_strxfrm;
@ -2417,6 +2416,8 @@ app_context_init (struct app_context *self)
self->nick_palette =
filter_color_cube_for_acceptable_nick_colors (&self->nick_palette_len);
self->relay_fd = -1;
}
static void
@ -4152,8 +4153,11 @@ client_kill (struct client *c)
static void
client_update_poller (struct client *c, const struct pollfd *pfd)
{
// In case of closing without any data in the write buffer,
// we don't actually need to be able to write to the socket,
// but the condition should be quick to satisfy.
int new_events = POLLIN;
if (c->write_buffer.len)
if (c->write_buffer.len || c->closing)
new_events |= POLLOUT;
hard_assert (new_events != 0);
@ -4168,9 +4172,7 @@ relay_send (struct client *c)
{
struct relay_event_message *m = &c->ctx->relay_message;
m->event_seq = c->event_seq++;
// TODO: Also don't try sending anything if half-closed.
if (!c->initialized || c->socket_fd == -1)
if (!c->initialized || c->closing || c->socket_fd == -1)
return;
// liberty has msg_{reader,writer} already, but they use 8-byte lengths.
@ -4180,12 +4182,18 @@ relay_send (struct client *c)
|| (frame_len = c->write_buffer.len - frame_len_pos - 4) > UINT32_MAX)
{
print_error ("serialization failed, killing client");
client_kill (c);
return;
// We can't kill the client immediately,
// because more relay_send() calls may follow.
c->write_buffer.len = frame_len_pos;
c->closing = true;
}
else
{
uint32_t len = htonl (frame_len);
memcpy (c->write_buffer.str + frame_len_pos, &len, sizeof len);
}
uint32_t len = htonl (frame_len);
memcpy (c->write_buffer.str + frame_len_pos, &len, sizeof len);
client_update_poller (c, NULL);
}
@ -15604,28 +15612,31 @@ client_process_message (struct client *c,
return true;
}
bool acknowledge = true;
switch (m->data.command)
{
case RELAY_COMMAND_HELLO:
c->initialized = true;
if (m->data.hello.version != RELAY_VERSION)
{
// TODO: This should send back an error message and shut down.
log_global_error (c->ctx,
"Protocol version mismatch, killing client");
return false;
relay_prepare_error (c->ctx,
m->command_seq, "Protocol version mismatch");
relay_send (c);
c->closing = true;
return true;
}
c->initialized = true;
client_resync (c);
break;
case RELAY_COMMAND_PING:
relay_prepare_response (c->ctx, m->command_seq)
->data.command = RELAY_COMMAND_PING;
relay_send (c);
break;
case RELAY_COMMAND_ACTIVE:
reset_autoaway (c->ctx);
break;
case RELAY_COMMAND_BUFFER_COMPLETE:
acknowledge = false;
client_process_buffer_complete (c, m->command_seq, buffer,
&m->data.buffer_complete);
break;
@ -15639,13 +15650,21 @@ client_process_message (struct client *c,
buffer_toggle_unimportant (c->ctx, buffer);
break;
case RELAY_COMMAND_BUFFER_LOG:
acknowledge = false;
client_process_buffer_log (c, m->command_seq, buffer);
break;
default:
acknowledge = false;
log_global_debug (c->ctx, "Unhandled client command");
relay_prepare_error (c->ctx, m->command_seq, "Unknown command");
relay_send (c);
}
if (acknowledge)
{
relay_prepare_response (c->ctx, m->command_seq)
->data.command = m->data.command;
relay_send (c);
}
return true;
}
@ -15667,7 +15686,7 @@ client_process_buffer (struct client *c)
break;
struct relay_command_message m = {};
bool ok = client_process_message (c, &r, &m);
bool ok = c->closing || client_process_message (c, &r, &m);
relay_command_message_free (&m);
if (!ok)
return false;
@ -15739,7 +15758,11 @@ on_client_ready (const struct pollfd *pfd, void *user_data)
{
struct client *c = user_data;
if (client_try_read (c) && client_try_write (c))
{
client_update_poller (c, pfd);
if (c->closing && !c->write_buffer.len)
client_kill (c);
}
}
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

53
xC.lxdr
View File

@ -1,7 +1,8 @@
// Backwards-compatible protocol version.
const VERSION = 1;
const VERSION = 2;
// From the frontend to the relay.
// All commands receive either an Event.RESPONSE, or an Event.ERROR.
struct CommandMessage {
// The command sequence number will be repeated in responses
// in the respective fields.
@ -32,13 +33,10 @@ struct CommandMessage {
// XXX: Perhaps this should rather be handled through a /buffer command.
case BUFFER_TOGGLE_UNIMPORTANT:
string buffer_name;
case PING_RESPONSE:
u32 event_seq;
// Only these commands may produce Event.RESPONSE, as below,
// but any command may produce an error.
case PING:
void;
case PING_RESPONSE:
u32 event_seq;
case BUFFER_COMPLETE:
string buffer_name;
string text;
@ -52,6 +50,9 @@ struct CommandMessage {
struct EventMessage {
u32 event_seq;
union EventData switch (enum Event {
ERROR,
RESPONSE,
PING,
BUFFER_LINE,
BUFFER_UPDATE,
@ -64,12 +65,28 @@ struct EventMessage {
SERVER_UPDATE,
SERVER_RENAME,
SERVER_REMOVE,
ERROR,
RESPONSE,
} event) {
// Restriction: command_seq strictly follows the sequence received
// by the relay, across both of these replies.
case ERROR:
u32 command_seq;
string error;
case RESPONSE:
u32 command_seq;
union ResponseData switch (Command command) {
case BUFFER_COMPLETE:
u32 start;
string completions<>;
case BUFFER_LOG:
// UTF-8, but not guaranteed.
u8 log<>;
default:
// Reception acknowledged.
void;
} data;
case PING:
void;
case BUFFER_LINE:
string buffer_name;
// Whether the line should also be displayed in the active buffer.
@ -188,23 +205,5 @@ struct EventMessage {
string new;
case SERVER_REMOVE:
string server_name;
// Restriction: command_seq strictly follows the sequence received
// by the relay, across both of these replies.
case ERROR:
u32 command_seq;
string error;
case RESPONSE:
u32 command_seq;
union ResponseData switch (Command command) {
case PING:
void;
case BUFFER_COMPLETE:
u32 start;
string completions<>;
case BUFFER_LOG:
// UTF-8, but not guaranteed.
u8 log<>;
} data;
} data;
};

View File

@ -173,8 +173,11 @@ class RelayRPC {
func send(data: RelayCommandData, callback: Callback? = nil) {
self.commandSeq += 1
let m = RelayCommandMessage(commandSeq: self.commandSeq, data: data)
if let callback = callback {
self.commandCallbacks[m.commandSeq] = callback
self.commandCallbacks[m.commandSeq] = callback ?? { error, data in
if data == nil {
NSSound.beep()
Logger().warning("\(error)")
}
}
var w = RelayWriter()

View File

@ -1,4 +1,4 @@
// Copyright (c) 2022 - 2024, Přemysl Eric Janouch <p@janouch.name>
// Copyright (c) 2022 - 2025, Přemysl Eric Janouch <p@janouch.name>
// SPDX-License-Identifier: 0BSD
import * as Relay from './proto.js'
@ -67,18 +67,19 @@ class RelayRPC extends EventTarget {
_processOne(message) {
let e = message.data
let p
switch (e.event) {
case Relay.Event.Error:
if (this.promised[e.commandSeq] !== undefined)
this.promised[e.commandSeq].reject(e.error)
else
if ((p = this.promised[e.commandSeq]) === undefined)
console.error(`Unawaited error: ${e.error}`)
else if (p !== true)
p.reject(e.error)
break
case Relay.Event.Response:
if (this.promised[e.commandSeq] !== undefined)
this.promised[e.commandSeq].resolve(e.data)
else
if ((p = this.promised[e.commandSeq]) === undefined)
console.error("Unawaited response")
else if (p !== true)
p.resolve(e.data)
break
default:
e.eventSeq = message.eventSeq
@ -95,6 +96,13 @@ class RelayRPC extends EventTarget {
this.promised[seq].reject("No response")
delete this.promised[seq]
}
m.redraw()
}
get busy() {
for (const seq in this.promised)
return true
return false
}
send(params) {
@ -110,6 +118,9 @@ class RelayRPC extends EventTarget {
this.ws.send(JSON.stringify({commandSeq: seq, data: params}))
this.promised[seq] = true
m.redraw()
// Automagically detect if we want a result.
let data = undefined
const promise = new Promise(
@ -191,6 +202,17 @@ let bufferAutoscroll = true
let servers = new Map()
let lastActive = undefined
function notifyActive() {
// Reduce unnecessary traffic.
const now = Date.now()
if (lastActive === undefined || (now - lastActive >= 5000)) {
lastActive = now
rpc.send({command: 'Active'})
}
}
function bufferResetStats(b) {
b.newMessages = 0
b.newUnimportantMessages = 0
@ -998,7 +1020,7 @@ let Input = {
onKeyDown: event => {
// TODO: And perhaps on other actions, too.
rpc.send({command: 'Active'})
notifyActive()
let b = buffers.get(bufferCurrent)
if (b === undefined || event.isComposing)
@ -1103,7 +1125,13 @@ let Main = {
return m('.xP', {}, [
overlay,
m('.title', {}, [m('b', {}, `xP`), m(Topic)]),
m('.title', {}, [
m('span', [
rpc.busy ? '⋯ ' : undefined,
m('b', {}, `xP`),
]),
m(Topic),
]),
m('.middle', {}, [m(BufferList), m(BufferContainer)]),
m(Status),
m('.input', {}, [m(Prompt), m(Input)]),

View File

@ -179,6 +179,14 @@ beep()
// --- Networking --------------------------------------------------------------
static void
on_relay_generic_response(
std::wstring error, const Relay::ResponseData *response)
{
if (!response)
show_error_message(QString::fromStdWString(error));
}
static void
relay_send(Relay::CommandData *data, Callback callback = {})
{
@ -190,6 +198,8 @@ relay_send(Relay::CommandData *data, Callback callback = {})
if (callback)
g.command_callbacks[m.command_seq] = std::move(callback);
else
g.command_callbacks[m.command_seq] = on_relay_generic_response;
auto len = qToBigEndian<uint32_t>(w.data.size());
auto prefix = reinterpret_cast<const char *>(&len);

View File

@ -221,6 +221,14 @@ relay_try_write(std::wstring &error)
return true;
}
static void
on_relay_generic_response(
std::wstring error, const Relay::ResponseData *response)
{
if (!response)
show_error_message(error.c_str());
}
static void
relay_send(Relay::CommandData *data, Callback callback = {})
{
@ -232,6 +240,8 @@ relay_send(Relay::CommandData *data, Callback callback = {})
if (callback)
g.command_callbacks[m.command_seq] = std::move(callback);
else
g.command_callbacks[m.command_seq] = on_relay_generic_response;
uint32_t len = htonl(w.data.size());
uint8_t *prefix = reinterpret_cast<uint8_t *>(&len);