xP: parallelize event reception and sending

Still trying to make the frontend load tolerably fast,
still unsuccessfully.
This commit is contained in:
Přemysl Eric Janouch 2022-09-14 00:47:46 +02:00
parent b728235b6c
commit fa85ea8208
Signed by: p
GPG Key ID: A0420B94F92B9493
1 changed files with 92 additions and 54 deletions

146
xP/xP.go
View File

@ -25,23 +25,61 @@ var (
addressWS string addressWS string
) )
func clientToRelay( // -----------------------------------------------------------------------------
ctx context.Context, ws *websocket.Conn, conn net.Conn) bool {
t, b, err := ws.Read(ctx) func relayReadJSON(conn net.Conn) []byte {
var length uint32
if err := binary.Read(conn, binary.BigEndian, &length); err != nil {
log.Println("Event receive failed: " + err.Error())
return nil
}
b := make([]byte, length)
if _, err := io.ReadFull(conn, b); err != nil {
log.Println("Event receive failed: " + err.Error())
return nil
}
log.Printf("<? %v\n", b)
var m RelayEventMessage
if after, ok := m.ConsumeFrom(b); !ok {
log.Println("Event deserialization failed")
return nil
} else if len(after) != 0 {
log.Println("Event deserialization failed: trailing data")
return nil
}
j, err := json.Marshal(&m)
if err != nil { if err != nil {
log.Println("Command receive failed: " + err.Error()) log.Println("Event marshalling failed: " + err.Error())
return false return nil
}
if t != websocket.MessageText {
log.Println("Command receive failed: " +
"binary messages are not supported")
return false
} }
return j
}
log.Printf("?> %s\n", b) func relayMakeReceiver(ctx context.Context, conn net.Conn) <-chan []byte {
p := make(chan []byte, 1)
go func() {
defer close(p)
for {
j := relayReadJSON(conn)
if j == nil {
return
}
select {
case p <- j:
case <-ctx.Done():
return
}
}
}()
return p
}
func relayWriteJSON(conn net.Conn, j []byte) bool {
var m RelayCommandMessage var m RelayCommandMessage
if err := json.Unmarshal(b, &m); err != nil { if err := json.Unmarshal(j, &m); err != nil {
log.Println("Command unmarshalling failed: " + err.Error()) log.Println("Command unmarshalling failed: " + err.Error())
return false return false
} }
@ -61,45 +99,33 @@ func clientToRelay(
return true return true
} }
func relayToClient( // -----------------------------------------------------------------------------
ctx context.Context, ws *websocket.Conn, conn net.Conn) bool {
var length uint32
if err := binary.Read(conn, binary.BigEndian, &length); err != nil {
log.Println("Event receive failed: " + err.Error())
return false
}
b := make([]byte, length)
if _, err := io.ReadFull(conn, b); err != nil {
log.Println("Event receive failed: " + err.Error())
return false
}
log.Printf("<? %v\n", b) func clientReadJSON(ctx context.Context, ws *websocket.Conn) []byte {
t, j, err := ws.Read(ctx)
var m RelayEventMessage
if after, ok := m.ConsumeFrom(b); !ok {
log.Println("Event deserialization failed")
return false
} else if len(after) != 0 {
log.Println("Event deserialization failed: trailing data")
return false
}
j, err := json.Marshal(&m)
if err != nil { if err != nil {
log.Println("Event marshalling failed: " + err.Error()) log.Println("Command receive failed: " + err.Error())
return false return nil
} }
if t != websocket.MessageText {
log.Println(
"Command receive failed: " + "binary messages are not supported")
return nil
}
log.Printf("?> %s\n", j)
return j
}
func clientWriteJSON(ctx context.Context, ws *websocket.Conn, j []byte) bool {
if err := ws.Write(ctx, websocket.MessageText, j); err != nil { if err := ws.Write(ctx, websocket.MessageText, j); err != nil {
log.Println("Event send failed: " + err.Error()) log.Println("Event send failed: " + err.Error())
return false return false
} }
log.Printf("<- %s\n", j) log.Printf("<- %s\n", j)
return true return true
} }
func errorToClient(ctx context.Context, ws *websocket.Conn, err error) bool { func clientWriteError(ctx context.Context, ws *websocket.Conn, err error) bool {
j, err := json.Marshal(&RelayEventMessage{ j, err := json.Marshal(&RelayEventMessage{
EventSeq: 0, EventSeq: 0,
Data: RelayEventData{ Data: RelayEventData{
@ -114,25 +140,21 @@ func errorToClient(ctx context.Context, ws *websocket.Conn, err error) bool {
log.Println("Event marshalling failed: " + err.Error()) log.Println("Event marshalling failed: " + err.Error())
return false return false
} }
if err := ws.Write(ctx, websocket.MessageText, j); err != nil { return clientWriteJSON(ctx, ws, j)
log.Println("Event send failed: " + err.Error())
return false
}
return true
} }
func handleWS(w http.ResponseWriter, r *http.Request) { func handleWS(w http.ResponseWriter, r *http.Request) {
ws, err := websocket.Accept(w, r, &websocket.AcceptOptions{ ws, err := websocket.Accept(w, r, &websocket.AcceptOptions{
InsecureSkipVerify: true, InsecureSkipVerify: true,
CompressionMode: websocket.CompressionContextTakeover, // Note that Safari can be broken with compression.
// This is for the payload, and happens to trigger on all messages. CompressionMode: websocket.CompressionContextTakeover,
CompressionThreshold: 16, // This is for the payload; set higher to avoid overhead.
CompressionThreshold: 64 << 10,
}) })
if err != nil { if err != nil {
log.Println("Client rejected: " + err.Error()) log.Println("Client rejected: " + err.Error())
return return
} }
defer ws.Close(websocket.StatusGoingAway, "Goodbye") defer ws.Close(websocket.StatusGoingAway, "Goodbye")
ctx, cancel := context.WithCancel(r.Context()) ctx, cancel := context.WithCancel(r.Context())
@ -140,24 +162,40 @@ func handleWS(w http.ResponseWriter, r *http.Request) {
conn, err := net.Dial("tcp", addressConnect) conn, err := net.Dial("tcp", addressConnect)
if err != nil { if err != nil {
errorToClient(ctx, ws, err) clientWriteError(ctx, ws, err)
return return
} }
defer conn.Close()
// We don't need to intervene, so it's just two separate pipes so far. // We don't need to intervene, so it's just two separate pipes so far.
// However, to decrease latencies, events are received and decoded
// in parallel to their sending.
relayJSON := relayMakeReceiver(ctx, conn)
go func() { go func() {
for clientToRelay(ctx, ws, conn) { defer cancel()
for {
j := clientReadJSON(ctx, ws)
if j == nil {
return
}
relayWriteJSON(conn, j)
} }
cancel()
}() }()
go func() { go func() {
for relayToClient(ctx, ws, conn) { defer cancel()
for {
j, ok := <-relayJSON
if !ok {
return
}
clientWriteJSON(ctx, ws, j)
} }
cancel()
}() }()
<-ctx.Done() <-ctx.Done()
} }
// -----------------------------------------------------------------------------
var staticHandler = http.FileServer(http.Dir(".")) var staticHandler = http.FileServer(http.Dir("."))
var page = template.Must(template.New("/").Parse(`<!DOCTYPE html> var page = template.Must(template.New("/").Parse(`<!DOCTYPE html>