degesch: clean up the async code a bit
This commit is contained in:
parent
2357f1382a
commit
7de1309421
142
degesch.c
142
degesch.c
|
@ -1065,9 +1065,9 @@ struct transport
|
||||||
void (*cleanup) (struct server *s);
|
void (*cleanup) (struct server *s);
|
||||||
|
|
||||||
/// The underlying socket may have become readable, update `read_buffer'
|
/// The underlying socket may have become readable, update `read_buffer'
|
||||||
enum transport_io_result (*on_readable) (struct server *s);
|
enum transport_io_result (*try_read) (struct server *s);
|
||||||
/// The underlying socket may have become writeable, flush `write_buffer'
|
/// The underlying socket may have become writeable, flush `write_buffer'
|
||||||
enum transport_io_result (*on_writeable) (struct server *s);
|
enum transport_io_result (*try_write) (struct server *s);
|
||||||
/// Return event mask to use in the poller
|
/// Return event mask to use in the poller
|
||||||
int (*get_poll_events) (struct server *s);
|
int (*get_poll_events) (struct server *s);
|
||||||
|
|
||||||
|
@ -3203,7 +3203,9 @@ irc_send (struct server *s, const char *format, ...)
|
||||||
print_debug ("tried sending a message to a dead server connection");
|
print_debug ("tried sending a message to a dead server connection");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (s->state == IRC_CLOSING)
|
|
||||||
|
if (s->state == IRC_CLOSING
|
||||||
|
|| s->state == IRC_HALF_CLOSED)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
va_list ap;
|
va_list ap;
|
||||||
|
@ -3329,7 +3331,7 @@ initiate_quit (struct app_context *ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
on_irc_disconnected (struct server *s)
|
irc_disconnect (struct server *s)
|
||||||
{
|
{
|
||||||
hard_assert (irc_is_connected (s));
|
hard_assert (irc_is_connected (s));
|
||||||
|
|
||||||
|
@ -3337,6 +3339,7 @@ on_irc_disconnected (struct server *s)
|
||||||
if (s->transport
|
if (s->transport
|
||||||
&& s->transport->cleanup)
|
&& s->transport->cleanup)
|
||||||
s->transport->cleanup (s);
|
s->transport->cleanup (s);
|
||||||
|
s->transport = NULL;
|
||||||
|
|
||||||
xclose (s->socket);
|
xclose (s->socket);
|
||||||
s->socket = -1;
|
s->socket = -1;
|
||||||
|
@ -3403,7 +3406,7 @@ on_irc_ping_timeout (void *user_data)
|
||||||
{
|
{
|
||||||
struct server *s = user_data;
|
struct server *s = user_data;
|
||||||
log_server_error (s, s->buffer, "Connection timeout");
|
log_server_error (s, s->buffer, "Connection timeout");
|
||||||
on_irc_disconnected (s);
|
irc_disconnect (s);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
|
@ -3419,74 +3422,91 @@ on_irc_timeout (void *user_data)
|
||||||
static void irc_process_message
|
static void irc_process_message
|
||||||
(const struct irc_message *msg, const char *raw, void *user_data);
|
(const struct irc_message *msg, const char *raw, void *user_data);
|
||||||
|
|
||||||
static void
|
static enum transport_io_result
|
||||||
on_irc_ready (const struct pollfd *pfd, struct server *s)
|
irc_try_read (struct server *s)
|
||||||
|
{
|
||||||
|
enum transport_io_result result = s->transport->try_read (s);
|
||||||
|
if (result == TRANSPORT_IO_OK)
|
||||||
{
|
{
|
||||||
struct transport *transport = s->transport;
|
|
||||||
enum transport_io_result result;
|
|
||||||
|
|
||||||
if ((result = transport->on_readable (s)) == TRANSPORT_IO_ERROR)
|
|
||||||
goto error;
|
|
||||||
bool read_eof = result == TRANSPORT_IO_EOF;
|
|
||||||
|
|
||||||
if (s->read_buffer.len >= (1 << 20))
|
if (s->read_buffer.len >= (1 << 20))
|
||||||
{
|
{
|
||||||
// XXX: this is stupid; if anything, count it in dependence of time
|
// XXX: this is stupid; if anything, count it in dependence of time
|
||||||
log_server_error (s, s->buffer,
|
log_server_error (s, s->buffer,
|
||||||
"The IRC server seems to spew out data frantically");
|
"The IRC server seems to spew out data frantically");
|
||||||
goto disconnect;
|
return TRANSPORT_IO_ERROR;
|
||||||
}
|
}
|
||||||
if (s->read_buffer.len)
|
if (s->read_buffer.len)
|
||||||
irc_process_buffer (&s->read_buffer, irc_process_message, s);
|
irc_process_buffer (&s->read_buffer, irc_process_message, s);
|
||||||
|
}
|
||||||
if ((result = transport->on_writeable (s)) == TRANSPORT_IO_ERROR)
|
return result;
|
||||||
goto error;
|
|
||||||
bool write_eof = result == TRANSPORT_IO_EOF;
|
|
||||||
|
|
||||||
// FIXME: this may probably fire multiple times if we're flushing after it,
|
|
||||||
// we should probably store this information next to the state
|
|
||||||
if (read_eof || write_eof)
|
|
||||||
log_server_error (s, s->buffer, "The IRC server closed the connection");
|
|
||||||
|
|
||||||
// It makes no sense to flush anything if the write needs to read
|
|
||||||
// and we receive an EOF -> disconnect right away
|
|
||||||
if (write_eof)
|
|
||||||
goto disconnect;
|
|
||||||
|
|
||||||
// If we've been asked to flush the write buffer and our job is complete,
|
|
||||||
// we send an EOF to the server, changing the state to IRC_HALF_CLOSED
|
|
||||||
if (s->state == IRC_CLOSING && !s->write_buffer.len)
|
|
||||||
irc_real_shutdown (s);
|
|
||||||
|
|
||||||
if (read_eof)
|
|
||||||
{
|
|
||||||
// Both ends closed, we're done
|
|
||||||
if (s->state == IRC_HALF_CLOSED)
|
|
||||||
goto disconnect;
|
|
||||||
|
|
||||||
// Otherwise we want to flush the write buffer
|
|
||||||
irc_shutdown (s);
|
|
||||||
|
|
||||||
// If that went well, we can disconnect now
|
|
||||||
if (s->state == IRC_HALF_CLOSED)
|
|
||||||
goto disconnect;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static enum transport_io_result
|
||||||
|
irc_try_write (struct server *s)
|
||||||
|
{
|
||||||
|
enum transport_io_result result = s->transport->try_write (s);
|
||||||
|
if (result == TRANSPORT_IO_OK)
|
||||||
|
{
|
||||||
|
// If we're flushing the write buffer and our job is complete, we send
|
||||||
|
// an EOF to the server, changing the state to IRC_HALF_CLOSED
|
||||||
|
if (s->state == IRC_CLOSING && !s->write_buffer.len)
|
||||||
|
irc_real_shutdown (s);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool
|
||||||
|
irc_try_read_write (struct server *s)
|
||||||
|
{
|
||||||
|
enum transport_io_result read_result;
|
||||||
|
enum transport_io_result write_result;
|
||||||
|
if ((read_result = irc_try_read (s)) == TRANSPORT_IO_ERROR
|
||||||
|
|| (write_result = irc_try_write (s)) == TRANSPORT_IO_ERROR)
|
||||||
|
{
|
||||||
|
log_server_error (s, s->buffer, "Server connection failed");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// FIXME: this may probably fire multiple times when we're flushing,
|
||||||
|
// we should probably store a flag next to the state
|
||||||
|
if (read_result == TRANSPORT_IO_EOF
|
||||||
|
|| write_result == TRANSPORT_IO_EOF)
|
||||||
|
log_server_error (s, s->buffer, "Server closed the connection");
|
||||||
|
|
||||||
|
// If the write needs to read and we receive an EOF, we can't flush
|
||||||
|
if (write_result == TRANSPORT_IO_EOF)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
if (read_result == TRANSPORT_IO_EOF)
|
||||||
|
{
|
||||||
|
// Eventually initiate shutdown to flush the write buffer
|
||||||
|
irc_shutdown (s);
|
||||||
|
|
||||||
|
// If there's nothing to write, we can disconnect now
|
||||||
|
if (s->state == IRC_HALF_CLOSED)
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
on_irc_ready (const struct pollfd *pfd, struct server *s)
|
||||||
|
{
|
||||||
|
if (irc_try_read_write (s))
|
||||||
|
{
|
||||||
// XXX: shouldn't we rather wait for PONG messages?
|
// XXX: shouldn't we rather wait for PONG messages?
|
||||||
irc_reset_connection_timeouts (s);
|
irc_reset_connection_timeouts (s);
|
||||||
irc_update_poller (s, pfd);
|
irc_update_poller (s, pfd);
|
||||||
return;
|
}
|
||||||
|
else
|
||||||
error:
|
// We don't want to keep the socket anymore
|
||||||
log_server_error (s, s->buffer, "Reading from the IRC server failed");
|
irc_disconnect (s);
|
||||||
disconnect:
|
|
||||||
on_irc_disconnected (s);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- Plain transport ---------------------------------------------------------
|
// --- Plain transport ---------------------------------------------------------
|
||||||
|
|
||||||
static enum transport_io_result
|
static enum transport_io_result
|
||||||
transport_plain_on_readable (struct server *s)
|
transport_plain_try_read (struct server *s)
|
||||||
{
|
{
|
||||||
struct str *buf = &s->read_buffer;
|
struct str *buf = &s->read_buffer;
|
||||||
ssize_t n_read;
|
ssize_t n_read;
|
||||||
|
@ -3516,7 +3536,7 @@ transport_plain_on_readable (struct server *s)
|
||||||
}
|
}
|
||||||
|
|
||||||
static enum transport_io_result
|
static enum transport_io_result
|
||||||
transport_plain_on_writeable (struct server *s)
|
transport_plain_try_write (struct server *s)
|
||||||
{
|
{
|
||||||
struct str *buf = &s->write_buffer;
|
struct str *buf = &s->write_buffer;
|
||||||
ssize_t n_written;
|
ssize_t n_written;
|
||||||
|
@ -3552,8 +3572,8 @@ transport_plain_get_poll_events (struct server *s)
|
||||||
|
|
||||||
static struct transport g_transport_plain =
|
static struct transport g_transport_plain =
|
||||||
{
|
{
|
||||||
.on_readable = transport_plain_on_readable,
|
.try_read = transport_plain_try_read,
|
||||||
.on_writeable = transport_plain_on_writeable,
|
.try_write = transport_plain_try_write,
|
||||||
.get_poll_events = transport_plain_get_poll_events,
|
.get_poll_events = transport_plain_get_poll_events,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -3696,7 +3716,7 @@ transport_tls_cleanup (struct server *s)
|
||||||
}
|
}
|
||||||
|
|
||||||
static enum transport_io_result
|
static enum transport_io_result
|
||||||
transport_tls_on_readable (struct server *s)
|
transport_tls_try_read (struct server *s)
|
||||||
{
|
{
|
||||||
struct transport_tls_data *data = s->transport_data;
|
struct transport_tls_data *data = s->transport_data;
|
||||||
if (data->ssl_tx_want_rx)
|
if (data->ssl_tx_want_rx)
|
||||||
|
@ -3733,7 +3753,7 @@ transport_tls_on_readable (struct server *s)
|
||||||
}
|
}
|
||||||
|
|
||||||
static enum transport_io_result
|
static enum transport_io_result
|
||||||
transport_tls_on_writeable (struct server *s)
|
transport_tls_try_write (struct server *s)
|
||||||
{
|
{
|
||||||
struct transport_tls_data *data = s->transport_data;
|
struct transport_tls_data *data = s->transport_data;
|
||||||
if (data->ssl_rx_want_tx)
|
if (data->ssl_rx_want_tx)
|
||||||
|
@ -3794,8 +3814,8 @@ static struct transport g_transport_tls =
|
||||||
{
|
{
|
||||||
.init = transport_tls_init,
|
.init = transport_tls_init,
|
||||||
.cleanup = transport_tls_cleanup,
|
.cleanup = transport_tls_cleanup,
|
||||||
.on_readable = transport_tls_on_readable,
|
.try_read = transport_tls_try_read,
|
||||||
.on_writeable = transport_tls_on_writeable,
|
.try_write = transport_tls_try_write,
|
||||||
.get_poll_events = transport_tls_get_poll_events,
|
.get_poll_events = transport_tls_get_poll_events,
|
||||||
.in_before_shutdown = transport_tls_in_before_shutdown,
|
.in_before_shutdown = transport_tls_in_before_shutdown,
|
||||||
};
|
};
|
||||||
|
|
Loading…
Reference in New Issue