xP: use the binary protocol for incoming events

And batch event messages together as much as possible.

JSON has proven itself to be really slow
(for example, encoding/json.Marshaler is a slow interface),
and browsers have significant overhead per WS message.

Commands are still sent as JSON, sending them in binary
would be a laborious rewrite without measurable merits.

The xP server now only prints debug output when requested,
because that was another source of major slowdowns.
This commit is contained in:
Přemysl Eric Janouch 2022-09-15 22:45:14 +02:00
parent e87cc90b5e
commit 6f39aa6615
Signed by: p
GPG Key ID: A0420B94F92B9493
5 changed files with 413 additions and 139 deletions

223
xC-gen-proto-js.awk Normal file
View File

@ -0,0 +1,223 @@
# xC-gen-proto-js.awk: Javascript backend for xC-gen-proto.awk.
#
# Copyright (c) 2022, Přemysl Eric Janouch <p@janouch.name>
# SPDX-License-Identifier: 0BSD
#
# This backend is currently for decoding the binary format only.
# (JSON is way too expensive to process and transfer.)
#
# Import the resulting script as a Javascript module.
function define_internal(name) {
Types[name] = "internal"
}
function define_sint(size, shortname) {
shortname = "i" size
define_internal(shortname)
CodegenDeserialize[shortname] = "\t%s = r." shortname "()\n"
print ""
print "\t" shortname "() {"
if (size == "64") {
# XXX: 2^53 - 1 must be enough for anyone. BigInts are a PITA.
print "\t\tconst " shortname \
" = Number(this.getBigInt" size "(this.offset))"
} else {
print "\t\tconst " shortname " = this.getInt" size "(this.offset)"
}
print "\t\tthis.offset += " (size / 8)
print "\t\treturn " shortname
print "\t}"
}
function define_uint(size, shortname) {
shortname = "u" size
define_internal(shortname)
CodegenDeserialize[shortname] = "\t%s = r." shortname "()\n"
print ""
print "\t" shortname "() {"
if (size == "64") {
# XXX: 2^53 - 1 must be enough for anyone. BigInts are a PITA.
print "\t\tconst " shortname \
" = Number(this.getBigUint" size "(this.offset))"
} else {
print "\t\tconst " shortname " = this.getUint" size "(this.offset)"
}
print "\t\tthis.offset += " (size / 8)
print "\t\treturn " shortname
print "\t}"
}
function codegen_begin() {
print "export class Reader extends DataView {"
print "\tconstructor() {"
print "\t\tsuper(...arguments)"
print "\t\tthis.offset = 0"
print "\t\tthis.decoder = new TextDecoder('utf-8', {fatal: true})"
print "\t}"
print ""
print "\tget empty() {"
print "\t\treturn this.byteLength <= this.offset"
print "\t}"
print ""
print "\trequire(len) {"
print "\t\tif (this.byteLength - this.offset < len)"
print "\t\t\tthrow `Premature end of data`"
print "\t\treturn this.byteOffset + this.offset"
print "\t}"
define_internal("string")
CodegenDeserialize["string"] = "\t%s = r.string()\n"
print ""
print "\tstring() {"
print "\t\tconst len = this.getUint32(this.offset)"
print "\t\tthis.offset += 4"
print "\t\tconst array = new Uint8Array("
print "\t\t\tthis.buffer, this.require(len), len)"
print "\t\tthis.offset += len"
print "\t\treturn this.decoder.decode(array)"
print "\t}"
define_internal("bool")
CodegenDeserialize["bool"] = "\t%s = r.bool()\n"
print ""
print "\tbool() {"
print "\t\tconst u8 = this.getUint8(this.offset)"
print "\t\tthis.offset += 1"
print "\t\treturn u8 != 0"
print "\t}"
define_sint("8")
define_sint("16")
define_sint("32")
define_sint("64")
define_uint("8")
define_uint("16")
define_uint("32")
define_uint("64")
print "}"
}
function codegen_constant(name, value) {
print ""
print "export const " decapitalize(snaketocamel(name)) " = " value
}
function codegen_enum_value(name, subname, value, cg) {
append(cg, "fields", "\t" snaketocamel(subname) ": " value ",\n")
}
function codegen_enum(name, cg) {
print ""
print "export const " name " = Object.freeze({"
print cg["fields"] "})"
CodegenDeserialize[name] = "\t%s = r.i8()\n"
for (i in cg)
delete cg[i]
}
function codegen_struct_field(d, cg, camel, f, deserialize) {
camel = decapitalize(snaketocamel(d["name"]))
f = "s." camel
append(cg, "fields", "\t" camel "\n")
deserialize = CodegenDeserialize[d["type"]]
if (!d["isarray"]) {
append(cg, "deserialize", sprintf(deserialize, f))
return
}
append(cg, "deserialize",
"\t{\n" \
indent(sprintf(CodegenDeserialize["u32"], "const len")))
if (d["type"] == "u8") {
append(cg, "deserialize",
"\t\t" f " = new Uint8Array(\n" \
"\t\t\tr.buffer, r.require(len), len)\n" \
"\t\tr.offset += len\n" \
"\t}\n")
return
}
if (d["type"] == "i8") {
append(cg, "deserialize",
"\t\t" f " = new Int8Array(\n" \
"\t\t\tr.buffer, r.require(len), len)\n" \
"\t\tr.offset += len\n" \
"\t}\n")
return
}
append(cg, "deserialize",
"\t\t" f " = new Array(len)\n" \
"\t}\n" \
"\tfor (let i = 0; i < " f ".length; i++)\n" \
indent(sprintf(deserialize, f "[i]")))
}
function codegen_struct_tag(d, cg) {
append(cg, "fields", "\t" decapitalize(snaketocamel(d["name"])) "\n")
# Do not deserialize here, that is already done by the containing union.
}
function codegen_struct(name, cg) {
print ""
print "export class " name " {"
print cg["fields"] cg["methods"]
print "\tstatic deserialize(r) {"
print "\t\tconst s = new " name "()"
print indent(cg["deserialize"]) "\t\treturn s"
print "\t}"
print "}"
CodegenDeserialize[name] = "\t%s = " name ".deserialize(r)\n"
for (i in cg)
delete cg[i]
}
function codegen_union_tag(d, cg) {
cg["tagtype"] = d["type"]
cg["tagname"] = d["name"]
}
function codegen_union_struct(name, casename, cg, scg, structname) {
append(scg, "methods",
"\n" \
"\tconstructor() {\n" \
"\t\tthis." decapitalize(snaketocamel(cg["tagname"])) \
" = " cg["tagtype"] "." snaketocamel(casename) "\n" \
"\t}\n")
# And thus not all generated structs are present in Types.
structname = name snaketocamel(casename)
codegen_struct(structname, scg)
append(cg, "deserialize",
"\tcase " cg["tagtype"] "." snaketocamel(casename) ":\n" \
"\t{\n" \
indent(sprintf(CodegenDeserialize[structname], "const s")) \
"\t\treturn s\n" \
"\t}\n")
}
function codegen_union(name, cg, tagvar) {
tagvar = decapitalize(snaketocamel(cg["tagname"]))
print ""
print "export function deserialize" name "(r) {"
print sprintf(CodegenDeserialize[cg["tagtype"]], "const " tagvar) \
"\tswitch (" tagvar ") {"
print cg["deserialize"] "\tdefault:"
print "\t\tthrow `Unknown " cg["tagtype"] " (${tagvar})`"
print "\t}"
print "}"
CodegenDeserialize[name] = "\t%s = deserialize" name "(r)\n"
for (i in cg)
delete cg[i]
}

1
xP/.gitignore vendored
View File

@ -1,3 +1,4 @@
/xP /xP
/proto.go /proto.go
/public/proto.js
/public/mithril.js /public/mithril.js

View File

@ -1,13 +1,15 @@
.POSIX: .POSIX:
.SUFFIXES: .SUFFIXES:
outputs = xP proto.go public/mithril.js outputs = xP proto.go public/proto.js public/mithril.js
all: $(outputs) all: $(outputs)
xP: xP.go proto.go xP: xP.go proto.go
go build -o $@ go build -o $@
proto.go: ../xC-gen-proto.awk ../xC-gen-proto-go.awk ../xC-proto proto.go: ../xC-gen-proto.awk ../xC-gen-proto-go.awk ../xC-proto
awk -f ../xC-gen-proto.awk -f ../xC-gen-proto-go.awk ../xC-proto > $@ awk -f ../xC-gen-proto.awk -f ../xC-gen-proto-go.awk ../xC-proto > $@
public/proto.js: ../xC-gen-proto.awk ../xC-gen-proto-js.awk ../xC-proto
awk -f ../xC-gen-proto.awk -f ../xC-gen-proto-js.awk ../xC-proto > $@
public/mithril.js: public/mithril.js:
curl -Lo $@ https://unpkg.com/mithril/mithril.js curl -Lo $@ https://unpkg.com/mithril/mithril.js
clean: clean:

View File

@ -1,6 +1,6 @@
// Copyright (c) 2022, Přemysl Eric Janouch <p@janouch.name> // Copyright (c) 2022, Přemysl Eric Janouch <p@janouch.name>
// SPDX-License-Identifier: 0BSD // SPDX-License-Identifier: 0BSD
'use strict' import * as Relay from './proto.js'
// ---- RPC -------------------------------------------------------------------- // ---- RPC --------------------------------------------------------------------
@ -31,6 +31,7 @@ class RelayRpc extends EventTarget {
} }
_initialize() { _initialize() {
this.ws.binaryType = 'arraybuffer'
this.ws.onopen = undefined this.ws.onopen = undefined
this.ws.onmessage = event => { this.ws.onmessage = event => {
this._process(event.data) this._process(event.data)
@ -56,33 +57,30 @@ class RelayRpc extends EventTarget {
} }
_process(data) { _process(data) {
if (typeof data !== 'string') if (typeof data === 'string')
throw "Binary messages not supported" throw "JSON messages not supported"
let message = JSON.parse(data) const r = new Relay.Reader(data)
if (typeof message !== 'object') while (!r.empty)
throw "Invalid message" this._processOne(Relay.EventMessage.deserialize(r))
}
_processOne(message) {
let e = message.data let e = message.data
if (typeof e !== 'object')
throw "Invalid message"
switch (e.event) { switch (e.event) {
case 'Error': case Relay.Event.Error:
if (this.promised[e.commandSeq] !== undefined) if (this.promised[e.commandSeq] !== undefined)
this.promised[e.commandSeq].reject(e.error) this.promised[e.commandSeq].reject(e.error)
else else
console.error("Unawaited error") console.error("Unawaited error")
break break
case 'Response': case Relay.Event.Response:
if (this.promised[e.commandSeq] !== undefined) if (this.promised[e.commandSeq] !== undefined)
this.promised[e.commandSeq].resolve(e.data) this.promised[e.commandSeq].resolve(e.data)
else else
console.error("Unawaited response") console.error("Unawaited response")
break break
default: default:
if (typeof e.event !== 'string')
throw "Invalid event tag"
e.eventSeq = message.eventSeq e.eventSeq = message.eventSeq
this.dispatchEvent(new CustomEvent('event', {detail: e})) this.dispatchEvent(new CustomEvent('event', {detail: e}))
return return
@ -115,13 +113,6 @@ class RelayRpc extends EventTarget {
this.promised[seq] = {resolve, reject} this.promised[seq] = {resolve, reject}
}) })
} }
base64decode(str) {
const text = atob(str), bytes = new Uint8Array(text.length)
for (let i = 0; i < text.length; i++)
bytes[i] = text.charCodeAt(i)
return new TextDecoder().decode(bytes)
}
} }
// ---- Utilities -------------------------------------------------------------- // ---- Utilities --------------------------------------------------------------
@ -183,7 +174,7 @@ function updateIcon(highlighted) {
// ---- Event processing ------------------------------------------------------- // ---- Event processing -------------------------------------------------------
let rpc = new RelayRpc(proxy) let rpc = new RelayRpc(proxy)
let rpcEventHandlers = {} let rpcEventHandlers = new Map()
let buffers = new Map() let buffers = new Map()
let bufferLast = undefined let bufferLast = undefined
@ -221,7 +212,7 @@ function bufferToggleLog() {
if (bufferCurrent !== name) if (bufferCurrent !== name)
return return
bufferLog = rpc.base64decode(resp.log) bufferLog = utf8Decode(resp.log)
m.redraw() m.redraw()
}) })
} }
@ -236,7 +227,7 @@ rpc.connect().then(result => {
servers.clear() servers.clear()
rpc.send({command: 'Hello', version: 1}) rpc.send({command: 'Hello', version: Relay.version})
connecting = false connecting = false
m.redraw() m.redraw()
}).catch(error => { }).catch(error => {
@ -249,10 +240,11 @@ rpc.addEventListener('close', event => {
}) })
rpc.addEventListener('event', event => { rpc.addEventListener('event', event => {
const handler = rpcEventHandlers[event.detail.event] const handler = rpcEventHandlers.get(event.detail.event)
if (handler !== undefined) { if (handler !== undefined) {
handler(event.detail) handler(event.detail)
if (bufferCurrent !== undefined || event.detail.event !== 'BufferLine') if (bufferCurrent !== undefined ||
event.detail.event !== Relay.Event.BufferLine)
m.redraw() m.redraw()
} }
}) })
@ -263,7 +255,7 @@ rpcEventHandlers['Ping'] = e => {
// ~~~ Buffer events ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ // ~~~ Buffer events ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
rpcEventHandlers['BufferUpdate'] = e => { rpcEventHandlers.set(Relay.Event.BufferUpdate, e => {
let b = buffers.get(e.bufferName) let b = buffers.get(e.bufferName)
if (b === undefined) { if (b === undefined) {
buffers.set(e.bufferName, (b = { buffers.set(e.bufferName, (b = {
@ -277,9 +269,9 @@ rpcEventHandlers['BufferUpdate'] = e => {
b.hideUnimportant = e.hideUnimportant b.hideUnimportant = e.hideUnimportant
b.kind = e.context.kind b.kind = e.context.kind
b.server = servers.get(e.context.serverName) b.server = servers.get(e.context.serverName)
} })
rpcEventHandlers['BufferStats'] = e => { rpcEventHandlers.set(Relay.Event.BufferStats, e => {
let b = buffers.get(e.bufferName) let b = buffers.get(e.bufferName)
if (b === undefined) if (b === undefined)
return return
@ -287,20 +279,20 @@ rpcEventHandlers['BufferStats'] = e => {
b.newMessages = e.newMessages, b.newMessages = e.newMessages,
b.newUnimportantMessages = e.newUnimportantMessages b.newUnimportantMessages = e.newUnimportantMessages
b.highlighted = e.highlighted b.highlighted = e.highlighted
} })
rpcEventHandlers['BufferRename'] = e => { rpcEventHandlers.set(Relay.Event.BufferRename, e => {
buffers.set(e.new, buffers.get(e.bufferName)) buffers.set(e.new, buffers.get(e.bufferName))
buffers.delete(e.bufferName) buffers.delete(e.bufferName)
} })
rpcEventHandlers['BufferRemove'] = e => { rpcEventHandlers.set(Relay.Event.BufferRemove, e => {
buffers.delete(e.bufferName) buffers.delete(e.bufferName)
if (e.bufferName === bufferLast) if (e.bufferName === bufferLast)
bufferLast = undefined bufferLast = undefined
} })
rpcEventHandlers['BufferActivate'] = e => { rpcEventHandlers.set(Relay.Event.BufferActivate, e => {
let old = buffers.get(bufferCurrent) let old = buffers.get(bufferCurrent)
if (old !== undefined) if (old !== undefined)
bufferResetStats(old) bufferResetStats(old)
@ -333,9 +325,9 @@ rpcEventHandlers['BufferActivate'] = e => {
textarea.value = b.input textarea.value = b.input
textarea.setSelectionRange(b.inputStart, b.inputEnd, b.inputDirection) textarea.setSelectionRange(b.inputStart, b.inputEnd, b.inputDirection)
} }
} })
rpcEventHandlers['BufferLine'] = e => { rpcEventHandlers.set(Relay.Event.BufferLine, e => {
let b = buffers.get(e.bufferName), line = {...e} let b = buffers.get(e.bufferName), line = {...e}
delete line.event delete line.event
delete line.eventSeq delete line.eventSeq
@ -372,37 +364,37 @@ rpcEventHandlers['BufferLine'] = e => {
} }
} }
if (line.isHighlight || if (line.isHighlight || (!visible && !line.isUnimportant &&
(!visible && b.kind === 'PrivateMessage' && !line.isUnimportant)) { b.kind === Relay.BufferKind.PrivateMessage)) {
beep() beep()
if (!visible) if (!visible)
b.highlighted = true b.highlighted = true
} }
} })
rpcEventHandlers['BufferClear'] = e => { rpcEventHandlers.set(Relay.Event.BufferClear, e => {
let b = buffers.get(e.bufferName) let b = buffers.get(e.bufferName)
if (b !== undefined) if (b !== undefined)
b.lines.length = 0 b.lines.length = 0
} })
// ~~~ Server events ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ // ~~~ Server events ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
rpcEventHandlers['ServerUpdate'] = e => { rpcEventHandlers.set(Relay.Event.ServerUpdate, e => {
let s = servers.get(e.serverName) let s = servers.get(e.serverName)
if (s === undefined) if (s === undefined)
servers.set(e.serverName, (s = {})) servers.set(e.serverName, (s = {}))
s.state = e.state s.state = e.state
} })
rpcEventHandlers['ServerRename'] = e => { rpcEventHandlers.set(Relay.Event.ServerRename, e => {
servers.set(e.new, servers.get(e.serverName)) servers.set(e.new, servers.get(e.serverName))
servers.delete(e.serverName) servers.delete(e.serverName)
} })
rpcEventHandlers['ServerRemove'] = e => { rpcEventHandlers.set(Relay.Event.ServerRemove, e => {
servers.delete(e.serverName) servers.delete(e.serverName)
} })
// --- Colours ----------------------------------------------------------------- // --- Colours -----------------------------------------------------------------
@ -499,18 +491,19 @@ let Content = {
return a return a
}, },
makeMark: line => {
switch (line.rendition) {
case Relay.Rendition.Indent: return m('span.mark', {}, '')
case Relay.Rendition.Status: return m('span.mark', {}, '')
case Relay.Rendition.Error: return m('span.mark.error', {}, '⚠')
case Relay.Rendition.Join: return m('span.mark.join', {}, '→')
case Relay.Rendition.Part: return m('span.mark.part', {}, '←')
case Relay.Rendition.Action: return m('span.mark.action', {}, '✶')
}
},
view: vnode => { view: vnode => {
let line = vnode.children[0] let line = vnode.children[0]
let mark = undefined
switch (line.rendition) {
case 'Indent': mark = m('span.mark', {}, ''); break
case 'Status': mark = m('span.mark', {}, ''); break
case 'Error': mark = m('span.mark.error', {}, '⚠'); break
case 'Join': mark = m('span.mark.join', {}, '→'); break
case 'Part': mark = m('span.mark.part', {}, '←'); break
case 'Action': mark = m('span.mark.action', {}, '✶'); break
}
let classes = new Set() let classes = new Set()
let flip = c => { let flip = c => {
if (classes.has(c)) if (classes.has(c))
@ -518,45 +511,49 @@ let Content = {
else else
classes.add(c) classes.add(c)
} }
let fg = -1, bg = -1, inverse = false let fg = -1, bg = -1, inverse = false
return m('.content', vnode.attrs, [mark, line.items.flatMap(item => { return m('.content', vnode.attrs, [
switch (item.kind) { Content.makeMark(line),
case 'Text': line.items.flatMap(item => {
return Content.linkify(item.text, { switch (item.kind) {
class: Array.from(classes.keys()).join(' '), case Relay.Item.Text:
style: Content.applyColor(fg, bg, inverse), return Content.linkify(item.text, {
}) class: Array.from(classes.keys()).join(' '),
case 'Reset': style: Content.applyColor(fg, bg, inverse),
classes.clear() })
fg = bg = -1 case Relay.Item.Reset:
inverse = false classes.clear()
break fg = bg = -1
case 'FgColor': inverse = false
fg = item.color break
break case Relay.Item.FgColor:
case 'BgColor': fg = item.color
bg = item.color break
break case Relay.Item.BgColor:
case 'FlipInverse': bg = item.color
inverse = !inverse break
break case Relay.Item.FlipInverse:
case 'FlipBold': inverse = !inverse
flip('b') break
break case Relay.Item.FlipBold:
case 'FlipItalic': flip('b')
flip('i') break
break case Relay.Item.FlipItalic:
case 'FlipUnderline': flip('i')
flip('u') break
break case Relay.Item.FlipUnderline:
case 'FlipCrossedOut': flip('u')
flip('s') break
break case Relay.Item.FlipCrossedOut:
case 'FlipMonospace': flip('s')
flip('m') break
break case Relay.Item.FlipMonospace:
} flip('m')
})]) break
}
}),
])
}, },
} }
@ -669,8 +666,17 @@ let Status = {
let status = `${bufferCurrent}` let status = `${bufferCurrent}`
if (b.hideUnimportant) if (b.hideUnimportant)
status += `<H>` status += `<H>`
if (b.server !== undefined)
status += ` (${b.server.state})` // This should be handled differently, so don't mind the lookup.
if (b.server !== undefined) {
let state = b.server.state
for (const s in Relay.ServerState)
if (Relay.ServerState[s] == b.server.state) {
state = s
break
}
status += ` (${state})`
}
return m('.status', {}, status) return m('.status', {}, status)
}, },
} }

122
xP/xP.go
View File

@ -8,6 +8,7 @@ import (
"context" "context"
"encoding/binary" "encoding/binary"
"encoding/json" "encoding/json"
"flag"
"fmt" "fmt"
"html/template" "html/template"
"io" "io"
@ -21,6 +22,8 @@ import (
) )
var ( var (
debug = flag.Bool("debug", false, "enable debug output")
addressBind string addressBind string
addressConnect string addressConnect string
addressWS string addressWS string
@ -28,7 +31,7 @@ var (
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
func relayReadJSON(r io.Reader) []byte { func relayReadFrame(r io.Reader) []byte {
var length uint32 var length uint32
if err := binary.Read(r, binary.BigEndian, &length); err != nil { if err := binary.Read(r, binary.BigEndian, &length); err != nil {
log.Println("Event receive failed: " + err.Error()) log.Println("Event receive failed: " + err.Error())
@ -40,32 +43,38 @@ func relayReadJSON(r io.Reader) []byte {
return nil return nil
} }
log.Printf("<? %v\n", b) if *debug {
log.Printf("<? %v\n", b)
var m RelayEventMessage var m RelayEventMessage
if after, ok := m.ConsumeFrom(b); !ok { if after, ok := m.ConsumeFrom(b); !ok {
log.Println("Event deserialization failed") log.Println("Event deserialization failed")
return nil return nil
} else if len(after) != 0 { } else if len(after) != 0 {
log.Println("Event deserialization failed: trailing data") log.Println("Event deserialization failed: trailing data")
return nil return nil
} }
j, err := m.MarshalJSON() j, err := m.MarshalJSON()
if err != nil { if err != nil {
log.Println("Event marshalling failed: " + err.Error()) log.Println("Event marshalling failed: " + err.Error())
return nil return nil
}
log.Printf("<- %s\n", j)
} }
return j return b
} }
func relayMakeReceiver(ctx context.Context, conn net.Conn) <-chan []byte { func relayMakeReceiver(ctx context.Context, conn net.Conn) <-chan []byte {
p := make(chan []byte, 1) // The usual event message rarely gets above 1 kilobyte,
r := bufio.NewReader(conn) // thus this is set to buffer up at most 1 megabyte or so.
p := make(chan []byte, 1000)
r := bufio.NewReaderSize(conn, 65536)
go func() { go func() {
defer close(p) defer close(p)
for { for {
j := relayReadJSON(r) j := relayReadFrame(r)
if j == nil { if j == nil {
return return
} }
@ -97,7 +106,9 @@ func relayWriteJSON(conn net.Conn, j []byte) bool {
return false return false
} }
log.Printf("-> %v\n", b) if *debug {
log.Printf("-> %v\n", b)
}
return true return true
} }
@ -114,21 +125,23 @@ func clientReadJSON(ctx context.Context, ws *websocket.Conn) []byte {
"Command receive failed: " + "binary messages are not supported") "Command receive failed: " + "binary messages are not supported")
return nil return nil
} }
log.Printf("?> %s\n", j)
if *debug {
log.Printf("?> %s\n", j)
}
return j return j
} }
func clientWriteJSON(ctx context.Context, ws *websocket.Conn, j []byte) bool { func clientWriteBinary(ctx context.Context, ws *websocket.Conn, b []byte) bool {
if err := ws.Write(ctx, websocket.MessageText, j); err != nil { if err := ws.Write(ctx, websocket.MessageBinary, b); err != nil {
log.Println("Event send failed: " + err.Error()) log.Println("Event send failed: " + err.Error())
return false return false
} }
log.Printf("<- %s\n", j)
return true return true
} }
func clientWriteError(ctx context.Context, ws *websocket.Conn, err error) bool { func clientWriteError(ctx context.Context, ws *websocket.Conn, err error) bool {
j, err := (&RelayEventMessage{ b, ok := (&RelayEventMessage{
EventSeq: 0, EventSeq: 0,
Data: RelayEventData{ Data: RelayEventData{
Interface: RelayEventDataError{ Interface: RelayEventDataError{
@ -137,12 +150,12 @@ func clientWriteError(ctx context.Context, ws *websocket.Conn, err error) bool {
Error: err.Error(), Error: err.Error(),
}, },
}, },
}).MarshalJSON() }).AppendTo(nil)
if err != nil { if ok {
log.Println("Event marshalling failed: " + err.Error()) log.Println("Event serialization failed")
return false return false
} }
return clientWriteJSON(ctx, ws, j) return clientWriteBinary(ctx, ws, b)
} }
func handleWS(w http.ResponseWriter, r *http.Request) { func handleWS(w http.ResponseWriter, r *http.Request) {
@ -164,15 +177,36 @@ func handleWS(w http.ResponseWriter, r *http.Request) {
conn, err := net.Dial("tcp", addressConnect) conn, err := net.Dial("tcp", addressConnect)
if err != nil { if err != nil {
log.Println("Connection failed: " + err.Error())
clientWriteError(ctx, ws, err) clientWriteError(ctx, ws, err)
return return
} }
defer conn.Close() defer conn.Close()
// To decrease latencies, events are received and decoded in parallel
// to their sending, and we try to batch them together.
relayFrames := relayMakeReceiver(ctx, conn)
batchFrames := func() []byte {
batch, ok := <-relayFrames
if !ok {
return nil
}
Batch:
for {
select {
case b, ok := <-relayFrames:
if !ok {
break Batch
}
batch = append(batch, b...)
default:
break Batch
}
}
return batch
}
// We don't need to intervene, so it's just two separate pipes so far. // We don't need to intervene, so it's just two separate pipes so far.
// However, to decrease latencies, events are received and decoded
// in parallel to their sending.
relayJSON := relayMakeReceiver(ctx, conn)
go func() { go func() {
defer cancel() defer cancel()
for { for {
@ -186,11 +220,11 @@ func handleWS(w http.ResponseWriter, r *http.Request) {
go func() { go func() {
defer cancel() defer cancel()
for { for {
j, ok := <-relayJSON b := batchFrames()
if !ok { if b == nil {
return return
} }
clientWriteJSON(ctx, ws, j) clientWriteBinary(ctx, ws, b)
} }
}() }()
<-ctx.Done() <-ctx.Done()
@ -214,7 +248,7 @@ var page = template.Must(template.New("/").Parse(`<!DOCTYPE html>
<script> <script>
let proxy = '{{ . }}' let proxy = '{{ . }}'
</script> </script>
<script src="xP.js"> <script type="module" src="xP.js">
</script> </script>
</body> </body>
</html>`)) </html>`))
@ -235,13 +269,21 @@ func handleDefault(w http.ResponseWriter, r *http.Request) {
} }
func main() { func main() {
if len(os.Args) < 3 || len(os.Args) > 4 { flag.Usage = func() {
log.Fatalf("usage: %s BIND CONNECT [WSURI]\n", os.Args[0]) fmt.Fprintf(flag.CommandLine.Output(),
"Usage: %s [OPTION...] BIND CONNECT [WSURI]\n\n", os.Args[0])
flag.PrintDefaults()
} }
addressBind, addressConnect = os.Args[1], os.Args[2] flag.Parse()
if len(os.Args) > 3 { if flag.NArg() < 2 || flag.NArg() > 3 {
addressWS = os.Args[3] flag.Usage()
os.Exit(1)
}
addressBind, addressConnect = flag.Arg(0), flag.Arg(1)
if flag.NArg() > 2 {
addressWS = flag.Arg(2)
} }
http.Handle("/ws", http.HandlerFunc(handleWS)) http.Handle("/ws", http.HandlerFunc(handleWS))