From 6f39aa66156f53b27e3b9cfe8457fc2f64129e56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C5=99emysl=20Eric=20Janouch?= Date: Thu, 15 Sep 2022 22:45:14 +0200 Subject: [PATCH] xP: use the binary protocol for incoming events And batch event messages together as much as possible. JSON has proven itself to be really slow (for example, encoding/json.Marshaler is a slow interface), and browsers have significant overhead per WS message. Commands are still sent as JSON, sending them in binary would be a laborious rewrite without measurable merits. The xP server now only prints debug output when requested, because that was another source of major slowdowns. --- xC-gen-proto-js.awk | 223 ++++++++++++++++++++++++++++++++++++++++++++ xP/.gitignore | 1 + xP/Makefile | 4 +- xP/public/xP.js | 202 ++++++++++++++++++++------------------- xP/xP.go | 122 ++++++++++++++++-------- 5 files changed, 413 insertions(+), 139 deletions(-) create mode 100644 xC-gen-proto-js.awk diff --git a/xC-gen-proto-js.awk b/xC-gen-proto-js.awk new file mode 100644 index 0000000..752fd18 --- /dev/null +++ b/xC-gen-proto-js.awk @@ -0,0 +1,223 @@ +# xC-gen-proto-js.awk: Javascript backend for xC-gen-proto.awk. +# +# Copyright (c) 2022, Přemysl Eric Janouch +# SPDX-License-Identifier: 0BSD +# +# This backend is currently for decoding the binary format only. +# (JSON is way too expensive to process and transfer.) +# +# Import the resulting script as a Javascript module. + +function define_internal(name) { + Types[name] = "internal" +} + +function define_sint(size, shortname) { + shortname = "i" size + define_internal(shortname) + CodegenDeserialize[shortname] = "\t%s = r." shortname "()\n" + + print "" + print "\t" shortname "() {" + if (size == "64") { + # XXX: 2^53 - 1 must be enough for anyone. BigInts are a PITA. + print "\t\tconst " shortname \ + " = Number(this.getBigInt" size "(this.offset))" + } else { + print "\t\tconst " shortname " = this.getInt" size "(this.offset)" + } + print "\t\tthis.offset += " (size / 8) + print "\t\treturn " shortname + print "\t}" +} + +function define_uint(size, shortname) { + shortname = "u" size + define_internal(shortname) + CodegenDeserialize[shortname] = "\t%s = r." shortname "()\n" + + print "" + print "\t" shortname "() {" + if (size == "64") { + # XXX: 2^53 - 1 must be enough for anyone. BigInts are a PITA. + print "\t\tconst " shortname \ + " = Number(this.getBigUint" size "(this.offset))" + } else { + print "\t\tconst " shortname " = this.getUint" size "(this.offset)" + } + print "\t\tthis.offset += " (size / 8) + print "\t\treturn " shortname + print "\t}" +} + +function codegen_begin() { + print "export class Reader extends DataView {" + print "\tconstructor() {" + print "\t\tsuper(...arguments)" + print "\t\tthis.offset = 0" + print "\t\tthis.decoder = new TextDecoder('utf-8', {fatal: true})" + print "\t}" + print "" + print "\tget empty() {" + print "\t\treturn this.byteLength <= this.offset" + print "\t}" + print "" + print "\trequire(len) {" + print "\t\tif (this.byteLength - this.offset < len)" + print "\t\t\tthrow `Premature end of data`" + print "\t\treturn this.byteOffset + this.offset" + print "\t}" + + define_internal("string") + CodegenDeserialize["string"] = "\t%s = r.string()\n" + + print "" + print "\tstring() {" + print "\t\tconst len = this.getUint32(this.offset)" + print "\t\tthis.offset += 4" + print "\t\tconst array = new Uint8Array(" + print "\t\t\tthis.buffer, this.require(len), len)" + print "\t\tthis.offset += len" + print "\t\treturn this.decoder.decode(array)" + print "\t}" + + define_internal("bool") + CodegenDeserialize["bool"] = "\t%s = r.bool()\n" + + print "" + print "\tbool() {" + print "\t\tconst u8 = this.getUint8(this.offset)" + print "\t\tthis.offset += 1" + print "\t\treturn u8 != 0" + print "\t}" + + define_sint("8") + define_sint("16") + define_sint("32") + define_sint("64") + define_uint("8") + define_uint("16") + define_uint("32") + define_uint("64") + + print "}" +} + +function codegen_constant(name, value) { + print "" + print "export const " decapitalize(snaketocamel(name)) " = " value +} + +function codegen_enum_value(name, subname, value, cg) { + append(cg, "fields", "\t" snaketocamel(subname) ": " value ",\n") +} + +function codegen_enum(name, cg) { + print "" + print "export const " name " = Object.freeze({" + print cg["fields"] "})" + + CodegenDeserialize[name] = "\t%s = r.i8()\n" + for (i in cg) + delete cg[i] +} + +function codegen_struct_field(d, cg, camel, f, deserialize) { + camel = decapitalize(snaketocamel(d["name"])) + f = "s." camel + append(cg, "fields", "\t" camel "\n") + + deserialize = CodegenDeserialize[d["type"]] + if (!d["isarray"]) { + append(cg, "deserialize", sprintf(deserialize, f)) + return + } + + append(cg, "deserialize", + "\t{\n" \ + indent(sprintf(CodegenDeserialize["u32"], "const len"))) + if (d["type"] == "u8") { + append(cg, "deserialize", + "\t\t" f " = new Uint8Array(\n" \ + "\t\t\tr.buffer, r.require(len), len)\n" \ + "\t\tr.offset += len\n" \ + "\t}\n") + return + } + if (d["type"] == "i8") { + append(cg, "deserialize", + "\t\t" f " = new Int8Array(\n" \ + "\t\t\tr.buffer, r.require(len), len)\n" \ + "\t\tr.offset += len\n" \ + "\t}\n") + return + } + + append(cg, "deserialize", + "\t\t" f " = new Array(len)\n" \ + "\t}\n" \ + "\tfor (let i = 0; i < " f ".length; i++)\n" \ + indent(sprintf(deserialize, f "[i]"))) +} + +function codegen_struct_tag(d, cg) { + append(cg, "fields", "\t" decapitalize(snaketocamel(d["name"])) "\n") + # Do not deserialize here, that is already done by the containing union. +} + +function codegen_struct(name, cg) { + print "" + print "export class " name " {" + print cg["fields"] cg["methods"] + print "\tstatic deserialize(r) {" + print "\t\tconst s = new " name "()" + print indent(cg["deserialize"]) "\t\treturn s" + print "\t}" + print "}" + + CodegenDeserialize[name] = "\t%s = " name ".deserialize(r)\n" + for (i in cg) + delete cg[i] +} + +function codegen_union_tag(d, cg) { + cg["tagtype"] = d["type"] + cg["tagname"] = d["name"] +} + +function codegen_union_struct(name, casename, cg, scg, structname) { + append(scg, "methods", + "\n" \ + "\tconstructor() {\n" \ + "\t\tthis." decapitalize(snaketocamel(cg["tagname"])) \ + " = " cg["tagtype"] "." snaketocamel(casename) "\n" \ + "\t}\n") + + # And thus not all generated structs are present in Types. + structname = name snaketocamel(casename) + codegen_struct(structname, scg) + + append(cg, "deserialize", + "\tcase " cg["tagtype"] "." snaketocamel(casename) ":\n" \ + "\t{\n" \ + indent(sprintf(CodegenDeserialize[structname], "const s")) \ + "\t\treturn s\n" \ + "\t}\n") +} + +function codegen_union(name, cg, tagvar) { + tagvar = decapitalize(snaketocamel(cg["tagname"])) + + print "" + print "export function deserialize" name "(r) {" + print sprintf(CodegenDeserialize[cg["tagtype"]], "const " tagvar) \ + "\tswitch (" tagvar ") {" + print cg["deserialize"] "\tdefault:" + print "\t\tthrow `Unknown " cg["tagtype"] " (${tagvar})`" + print "\t}" + print "}" + + CodegenDeserialize[name] = "\t%s = deserialize" name "(r)\n" + for (i in cg) + delete cg[i] +} diff --git a/xP/.gitignore b/xP/.gitignore index 68c09f0..ba4d8c3 100644 --- a/xP/.gitignore +++ b/xP/.gitignore @@ -1,3 +1,4 @@ /xP /proto.go +/public/proto.js /public/mithril.js diff --git a/xP/Makefile b/xP/Makefile index 3c52146..eb0c8f5 100644 --- a/xP/Makefile +++ b/xP/Makefile @@ -1,13 +1,15 @@ .POSIX: .SUFFIXES: -outputs = xP proto.go public/mithril.js +outputs = xP proto.go public/proto.js public/mithril.js all: $(outputs) xP: xP.go proto.go go build -o $@ proto.go: ../xC-gen-proto.awk ../xC-gen-proto-go.awk ../xC-proto awk -f ../xC-gen-proto.awk -f ../xC-gen-proto-go.awk ../xC-proto > $@ +public/proto.js: ../xC-gen-proto.awk ../xC-gen-proto-js.awk ../xC-proto + awk -f ../xC-gen-proto.awk -f ../xC-gen-proto-js.awk ../xC-proto > $@ public/mithril.js: curl -Lo $@ https://unpkg.com/mithril/mithril.js clean: diff --git a/xP/public/xP.js b/xP/public/xP.js index d302a86..1362e22 100644 --- a/xP/public/xP.js +++ b/xP/public/xP.js @@ -1,6 +1,6 @@ // Copyright (c) 2022, Přemysl Eric Janouch // SPDX-License-Identifier: 0BSD -'use strict' +import * as Relay from './proto.js' // ---- RPC -------------------------------------------------------------------- @@ -31,6 +31,7 @@ class RelayRpc extends EventTarget { } _initialize() { + this.ws.binaryType = 'arraybuffer' this.ws.onopen = undefined this.ws.onmessage = event => { this._process(event.data) @@ -56,33 +57,30 @@ class RelayRpc extends EventTarget { } _process(data) { - if (typeof data !== 'string') - throw "Binary messages not supported" + if (typeof data === 'string') + throw "JSON messages not supported" - let message = JSON.parse(data) - if (typeof message !== 'object') - throw "Invalid message" + const r = new Relay.Reader(data) + while (!r.empty) + this._processOne(Relay.EventMessage.deserialize(r)) + } + + _processOne(message) { let e = message.data - if (typeof e !== 'object') - throw "Invalid message" - switch (e.event) { - case 'Error': + case Relay.Event.Error: if (this.promised[e.commandSeq] !== undefined) this.promised[e.commandSeq].reject(e.error) else console.error("Unawaited error") break - case 'Response': + case Relay.Event.Response: if (this.promised[e.commandSeq] !== undefined) this.promised[e.commandSeq].resolve(e.data) else console.error("Unawaited response") break default: - if (typeof e.event !== 'string') - throw "Invalid event tag" - e.eventSeq = message.eventSeq this.dispatchEvent(new CustomEvent('event', {detail: e})) return @@ -115,13 +113,6 @@ class RelayRpc extends EventTarget { this.promised[seq] = {resolve, reject} }) } - - base64decode(str) { - const text = atob(str), bytes = new Uint8Array(text.length) - for (let i = 0; i < text.length; i++) - bytes[i] = text.charCodeAt(i) - return new TextDecoder().decode(bytes) - } } // ---- Utilities -------------------------------------------------------------- @@ -183,7 +174,7 @@ function updateIcon(highlighted) { // ---- Event processing ------------------------------------------------------- let rpc = new RelayRpc(proxy) -let rpcEventHandlers = {} +let rpcEventHandlers = new Map() let buffers = new Map() let bufferLast = undefined @@ -221,7 +212,7 @@ function bufferToggleLog() { if (bufferCurrent !== name) return - bufferLog = rpc.base64decode(resp.log) + bufferLog = utf8Decode(resp.log) m.redraw() }) } @@ -236,7 +227,7 @@ rpc.connect().then(result => { servers.clear() - rpc.send({command: 'Hello', version: 1}) + rpc.send({command: 'Hello', version: Relay.version}) connecting = false m.redraw() }).catch(error => { @@ -249,10 +240,11 @@ rpc.addEventListener('close', event => { }) rpc.addEventListener('event', event => { - const handler = rpcEventHandlers[event.detail.event] + const handler = rpcEventHandlers.get(event.detail.event) if (handler !== undefined) { handler(event.detail) - if (bufferCurrent !== undefined || event.detail.event !== 'BufferLine') + if (bufferCurrent !== undefined || + event.detail.event !== Relay.Event.BufferLine) m.redraw() } }) @@ -263,7 +255,7 @@ rpcEventHandlers['Ping'] = e => { // ~~~ Buffer events ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -rpcEventHandlers['BufferUpdate'] = e => { +rpcEventHandlers.set(Relay.Event.BufferUpdate, e => { let b = buffers.get(e.bufferName) if (b === undefined) { buffers.set(e.bufferName, (b = { @@ -277,9 +269,9 @@ rpcEventHandlers['BufferUpdate'] = e => { b.hideUnimportant = e.hideUnimportant b.kind = e.context.kind b.server = servers.get(e.context.serverName) -} +}) -rpcEventHandlers['BufferStats'] = e => { +rpcEventHandlers.set(Relay.Event.BufferStats, e => { let b = buffers.get(e.bufferName) if (b === undefined) return @@ -287,20 +279,20 @@ rpcEventHandlers['BufferStats'] = e => { b.newMessages = e.newMessages, b.newUnimportantMessages = e.newUnimportantMessages b.highlighted = e.highlighted -} +}) -rpcEventHandlers['BufferRename'] = e => { +rpcEventHandlers.set(Relay.Event.BufferRename, e => { buffers.set(e.new, buffers.get(e.bufferName)) buffers.delete(e.bufferName) -} +}) -rpcEventHandlers['BufferRemove'] = e => { +rpcEventHandlers.set(Relay.Event.BufferRemove, e => { buffers.delete(e.bufferName) if (e.bufferName === bufferLast) bufferLast = undefined -} +}) -rpcEventHandlers['BufferActivate'] = e => { +rpcEventHandlers.set(Relay.Event.BufferActivate, e => { let old = buffers.get(bufferCurrent) if (old !== undefined) bufferResetStats(old) @@ -333,9 +325,9 @@ rpcEventHandlers['BufferActivate'] = e => { textarea.value = b.input textarea.setSelectionRange(b.inputStart, b.inputEnd, b.inputDirection) } -} +}) -rpcEventHandlers['BufferLine'] = e => { +rpcEventHandlers.set(Relay.Event.BufferLine, e => { let b = buffers.get(e.bufferName), line = {...e} delete line.event delete line.eventSeq @@ -372,37 +364,37 @@ rpcEventHandlers['BufferLine'] = e => { } } - if (line.isHighlight || - (!visible && b.kind === 'PrivateMessage' && !line.isUnimportant)) { + if (line.isHighlight || (!visible && !line.isUnimportant && + b.kind === Relay.BufferKind.PrivateMessage)) { beep() if (!visible) b.highlighted = true } -} +}) -rpcEventHandlers['BufferClear'] = e => { +rpcEventHandlers.set(Relay.Event.BufferClear, e => { let b = buffers.get(e.bufferName) if (b !== undefined) b.lines.length = 0 -} +}) // ~~~ Server events ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -rpcEventHandlers['ServerUpdate'] = e => { +rpcEventHandlers.set(Relay.Event.ServerUpdate, e => { let s = servers.get(e.serverName) if (s === undefined) servers.set(e.serverName, (s = {})) s.state = e.state -} +}) -rpcEventHandlers['ServerRename'] = e => { +rpcEventHandlers.set(Relay.Event.ServerRename, e => { servers.set(e.new, servers.get(e.serverName)) servers.delete(e.serverName) -} +}) -rpcEventHandlers['ServerRemove'] = e => { +rpcEventHandlers.set(Relay.Event.ServerRemove, e => { servers.delete(e.serverName) -} +}) // --- Colours ----------------------------------------------------------------- @@ -499,18 +491,19 @@ let Content = { return a }, + makeMark: line => { + switch (line.rendition) { + case Relay.Rendition.Indent: return m('span.mark', {}, '') + case Relay.Rendition.Status: return m('span.mark', {}, '–') + case Relay.Rendition.Error: return m('span.mark.error', {}, '⚠') + case Relay.Rendition.Join: return m('span.mark.join', {}, '→') + case Relay.Rendition.Part: return m('span.mark.part', {}, '←') + case Relay.Rendition.Action: return m('span.mark.action', {}, '✶') + } + }, + view: vnode => { let line = vnode.children[0] - let mark = undefined - switch (line.rendition) { - case 'Indent': mark = m('span.mark', {}, ''); break - case 'Status': mark = m('span.mark', {}, '–'); break - case 'Error': mark = m('span.mark.error', {}, '⚠'); break - case 'Join': mark = m('span.mark.join', {}, '→'); break - case 'Part': mark = m('span.mark.part', {}, '←'); break - case 'Action': mark = m('span.mark.action', {}, '✶'); break - } - let classes = new Set() let flip = c => { if (classes.has(c)) @@ -518,45 +511,49 @@ let Content = { else classes.add(c) } + let fg = -1, bg = -1, inverse = false - return m('.content', vnode.attrs, [mark, line.items.flatMap(item => { - switch (item.kind) { - case 'Text': - return Content.linkify(item.text, { - class: Array.from(classes.keys()).join(' '), - style: Content.applyColor(fg, bg, inverse), - }) - case 'Reset': - classes.clear() - fg = bg = -1 - inverse = false - break - case 'FgColor': - fg = item.color - break - case 'BgColor': - bg = item.color - break - case 'FlipInverse': - inverse = !inverse - break - case 'FlipBold': - flip('b') - break - case 'FlipItalic': - flip('i') - break - case 'FlipUnderline': - flip('u') - break - case 'FlipCrossedOut': - flip('s') - break - case 'FlipMonospace': - flip('m') - break - } - })]) + return m('.content', vnode.attrs, [ + Content.makeMark(line), + line.items.flatMap(item => { + switch (item.kind) { + case Relay.Item.Text: + return Content.linkify(item.text, { + class: Array.from(classes.keys()).join(' '), + style: Content.applyColor(fg, bg, inverse), + }) + case Relay.Item.Reset: + classes.clear() + fg = bg = -1 + inverse = false + break + case Relay.Item.FgColor: + fg = item.color + break + case Relay.Item.BgColor: + bg = item.color + break + case Relay.Item.FlipInverse: + inverse = !inverse + break + case Relay.Item.FlipBold: + flip('b') + break + case Relay.Item.FlipItalic: + flip('i') + break + case Relay.Item.FlipUnderline: + flip('u') + break + case Relay.Item.FlipCrossedOut: + flip('s') + break + case Relay.Item.FlipMonospace: + flip('m') + break + } + }), + ]) }, } @@ -669,8 +666,17 @@ let Status = { let status = `${bufferCurrent}` if (b.hideUnimportant) status += `` - if (b.server !== undefined) - status += ` (${b.server.state})` + + // This should be handled differently, so don't mind the lookup. + if (b.server !== undefined) { + let state = b.server.state + for (const s in Relay.ServerState) + if (Relay.ServerState[s] == b.server.state) { + state = s + break + } + status += ` (${state})` + } return m('.status', {}, status) }, } diff --git a/xP/xP.go b/xP/xP.go index 7e0285c..20117b2 100644 --- a/xP/xP.go +++ b/xP/xP.go @@ -8,6 +8,7 @@ import ( "context" "encoding/binary" "encoding/json" + "flag" "fmt" "html/template" "io" @@ -21,6 +22,8 @@ import ( ) var ( + debug = flag.Bool("debug", false, "enable debug output") + addressBind string addressConnect string addressWS string @@ -28,7 +31,7 @@ var ( // ----------------------------------------------------------------------------- -func relayReadJSON(r io.Reader) []byte { +func relayReadFrame(r io.Reader) []byte { var length uint32 if err := binary.Read(r, binary.BigEndian, &length); err != nil { log.Println("Event receive failed: " + err.Error()) @@ -40,32 +43,38 @@ func relayReadJSON(r io.Reader) []byte { return nil } - log.Printf(" %v\n", b) + if *debug { + log.Printf("-> %v\n", b) + } return true } @@ -114,21 +125,23 @@ func clientReadJSON(ctx context.Context, ws *websocket.Conn) []byte { "Command receive failed: " + "binary messages are not supported") return nil } - log.Printf("?> %s\n", j) + + if *debug { + log.Printf("?> %s\n", j) + } return j } -func clientWriteJSON(ctx context.Context, ws *websocket.Conn, j []byte) bool { - if err := ws.Write(ctx, websocket.MessageText, j); err != nil { +func clientWriteBinary(ctx context.Context, ws *websocket.Conn, b []byte) bool { + if err := ws.Write(ctx, websocket.MessageBinary, b); err != nil { log.Println("Event send failed: " + err.Error()) return false } - log.Printf("<- %s\n", j) return true } func clientWriteError(ctx context.Context, ws *websocket.Conn, err error) bool { - j, err := (&RelayEventMessage{ + b, ok := (&RelayEventMessage{ EventSeq: 0, Data: RelayEventData{ Interface: RelayEventDataError{ @@ -137,12 +150,12 @@ func clientWriteError(ctx context.Context, ws *websocket.Conn, err error) bool { Error: err.Error(), }, }, - }).MarshalJSON() - if err != nil { - log.Println("Event marshalling failed: " + err.Error()) + }).AppendTo(nil) + if ok { + log.Println("Event serialization failed") return false } - return clientWriteJSON(ctx, ws, j) + return clientWriteBinary(ctx, ws, b) } func handleWS(w http.ResponseWriter, r *http.Request) { @@ -164,15 +177,36 @@ func handleWS(w http.ResponseWriter, r *http.Request) { conn, err := net.Dial("tcp", addressConnect) if err != nil { + log.Println("Connection failed: " + err.Error()) clientWriteError(ctx, ws, err) return } defer conn.Close() + // To decrease latencies, events are received and decoded in parallel + // to their sending, and we try to batch them together. + relayFrames := relayMakeReceiver(ctx, conn) + batchFrames := func() []byte { + batch, ok := <-relayFrames + if !ok { + return nil + } + Batch: + for { + select { + case b, ok := <-relayFrames: + if !ok { + break Batch + } + batch = append(batch, b...) + default: + break Batch + } + } + return batch + } + // We don't need to intervene, so it's just two separate pipes so far. - // However, to decrease latencies, events are received and decoded - // in parallel to their sending. - relayJSON := relayMakeReceiver(ctx, conn) go func() { defer cancel() for { @@ -186,11 +220,11 @@ func handleWS(w http.ResponseWriter, r *http.Request) { go func() { defer cancel() for { - j, ok := <-relayJSON - if !ok { + b := batchFrames() + if b == nil { return } - clientWriteJSON(ctx, ws, j) + clientWriteBinary(ctx, ws, b) } }() <-ctx.Done() @@ -214,7 +248,7 @@ var page = template.Must(template.New("/").Parse(` - `)) @@ -235,13 +269,21 @@ func handleDefault(w http.ResponseWriter, r *http.Request) { } func main() { - if len(os.Args) < 3 || len(os.Args) > 4 { - log.Fatalf("usage: %s BIND CONNECT [WSURI]\n", os.Args[0]) + flag.Usage = func() { + fmt.Fprintf(flag.CommandLine.Output(), + "Usage: %s [OPTION...] BIND CONNECT [WSURI]\n\n", os.Args[0]) + flag.PrintDefaults() } - addressBind, addressConnect = os.Args[1], os.Args[2] - if len(os.Args) > 3 { - addressWS = os.Args[3] + flag.Parse() + if flag.NArg() < 2 || flag.NArg() > 3 { + flag.Usage() + os.Exit(1) + } + + addressBind, addressConnect = flag.Arg(0), flag.Arg(1) + if flag.NArg() > 2 { + addressWS = flag.Arg(2) } http.Handle("/ws", http.HandlerFunc(handleWS))