It's about time to make this work

Luckily BBC is /still/ using HLS for their streams,
and even the external metadata format hasn't changed.
This commit is contained in:
Přemysl Eric Janouch 2018-10-01 00:36:49 +02:00
parent ae26e5a8ea
commit dddbc5556e
Signed by: p
GPG Key ID: A0420B94F92B9493
5 changed files with 184 additions and 86 deletions

View File

@ -1,4 +1,4 @@
Copyright (c) 2016 - 2017, Přemysl Janouch <p@janouch.name> Copyright (c) 2016 - 2018, Přemysl Janouch <p@janouch.name>
Permission to use, copy, modify, and/or distribute this software for any Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted. purpose with or without fee is hereby granted.

View File

@ -11,22 +11,28 @@ a package with the latest development version from Archlinux's AUR.
Building and Running Building and Running
-------------------- --------------------
Build dependencies: CMake, go Build dependencies: go
$ git clone --recursive https://git.janouch.name/p/bbc-on-ice.git $ git clone https://git.janouch.name/p/bbc-on-ice.git
$ mkdir bbc-on-ice/build $ cd bbc-on-ice
$ cd bbc-on-ice/build $ go build
$ cmake .. -DCMAKE_INSTALL_PREFIX=/usr -DCMAKE_BUILD_TYPE=Debug
$ make
To install the application, you can do either the usual: To run the local server:
# make install $ ./bbc-on-ice :8000
Or you can try telling CMake to make a package for you. For Debian it is: Streams have URLs in the following form:
$ cpack -G DEB $ mpv http://localhost:8000/nonuk/sbr_low/bbc_radio_one
# dpkg -i bbc-on-ice-*.deb $ mpv http://localhost:8000/uk/sbr_high/bbc_1xtra
Socket activation
-----------------
The provided bbc-on-ice.service and bbc-on-ice.socket should do, just change
the `ExecStart` path as needed and place the files appropriately. Then:
$ systemctl enable bbc-on-ice.socket
$ systemctl start bbc-on-ice.socket
Contributing and Support Contributing and Support
------------------------ ------------------------

5
bbc-on-ice.service Normal file
View File

@ -0,0 +1,5 @@
[Unit]
Description=bbc-on-ice
[Service]
ExecStart=/usr/bin/bbc-on-ice

5
bbc-on-ice.socket Normal file
View File

@ -0,0 +1,5 @@
[Socket]
ListenStream=[::1]:8000
[Install]
WantedBy=sockets.target

222
main.go
View File

@ -20,14 +20,19 @@ import (
"unicode/utf8" "unicode/utf8"
) )
const (
targetURI = "http://a.files.bbci.co.uk/media/live/manifesto/" +
"audio/simulcast/hls/%s/%s/ak/%s.m3u8"
metaBaseURI = "http://polling.bbc.co.uk/radio/nhppolling/"
)
type meta struct { type meta struct {
title string // What's playing right now title string // what's playing right now
timeout uint // Timeout for the next poll in ms timeout uint // timeout for the next poll in ms
} }
// Retrieve and decode metadata information from an independent webservice // getMeta retrieves and decodes metadata info from an independent webservice.
func getMeta(name string) (*meta, error) { func getMeta(name string) (*meta, error) {
const metaBaseURI = "http://polling.bbc.co.uk/radio/nhppolling/"
resp, err := http.Get(metaBaseURI + name) resp, err := http.Get(metaBaseURI + name)
if resp != nil { if resp != nil {
defer resp.Body.Close() defer resp.Body.Close()
@ -41,10 +46,9 @@ func getMeta(name string) (*meta, error) {
return nil, errors.New("invalid metadata response") return nil, errors.New("invalid metadata response")
} }
// TODO: also retrieve richtracks/is_now_playing, see example file
type broadcast struct { type broadcast struct {
Title string // Title of the broadcast Title string // title of the broadcast
Percentage int // How far we're in Percentage int // how far we're in
} }
var v struct { var v struct {
Packages struct { Packages struct {
@ -52,6 +56,11 @@ func getMeta(name string) (*meta, error) {
Broadcasts []broadcast Broadcasts []broadcast
BroadcastNowIndex uint BroadcastNowIndex uint
} `json:"on-air"` } `json:"on-air"`
Richtracks []struct {
Artist string
Title string
IsNowPlaying bool `json:"is_now_playing"`
}
} }
Timeouts struct { Timeouts struct {
PollingTimeout uint `json:"polling_timeout"` PollingTimeout uint `json:"polling_timeout"`
@ -65,13 +74,17 @@ func getMeta(name string) (*meta, error) {
if onAir.BroadcastNowIndex >= uint(len(onAir.Broadcasts)) { if onAir.BroadcastNowIndex >= uint(len(onAir.Broadcasts)) {
return nil, errors.New("no active broadcast") return nil, errors.New("no active broadcast")
} }
return &meta{ title := onAir.Broadcasts[onAir.BroadcastNowIndex].Title
timeout: v.Timeouts.PollingTimeout, for _, rt := range v.Packages.Richtracks {
title: onAir.Broadcasts[onAir.BroadcastNowIndex].Title, if rt.IsNowPlaying {
}, nil title = rt.Artist + " - " + rt.Title + " / " + title
}
}
return &meta{timeout: v.Timeouts.PollingTimeout, title: title}, nil
} }
// Resolve an M3U8 playlist to the first link that seems to be playable // resolveM3U8 resolves an M3U8 playlist to the first link that seems to
// be playable, possibly recursing.
func resolveM3U8(target string) (out []string, err error) { func resolveM3U8(target string) (out []string, err error) {
resp, err := http.Get(target) resp, err := http.Get(target)
if resp != nil { if resp != nil {
@ -90,12 +103,13 @@ func resolveM3U8(target string) (out []string, err error) {
continue continue
} }
if !strings.Contains(line, "/") { if !strings.Contains(line, "/") {
// Seems to be a relative link, let's make it absolute // Seems to be a relative link, let's make it absolute.
dir, _ := path.Split(target) dir, _ := path.Split(target)
line = dir + line line = dir + line
} }
if strings.HasSuffix(line, "m3u8") { if strings.HasSuffix(line, "m3u8") {
// The playlist seems to recurse, and so do we // The playlist seems to recurse, and so will we.
// XXX: This should be bounded, not just by the stack.
return resolveM3U8(line) return resolveM3U8(line)
} }
out = append(out, line) out = append(out, line)
@ -103,6 +117,8 @@ func resolveM3U8(target string) (out []string, err error) {
return out, nil return out, nil
} }
// metaProc periodically polls the sub-URL given by name for titles and sends
// them out the given channel. Never returns prematurely.
func metaProc(ctx context.Context, name string, out chan<- string) { func metaProc(ctx context.Context, name string, out chan<- string) {
defer close(out) defer close(out)
@ -116,10 +132,14 @@ func metaProc(ctx context.Context, name string, out chan<- string) {
} else { } else {
current = meta.title current = meta.title
interval = time.Duration(meta.timeout) interval = time.Duration(meta.timeout)
// It seems to normally use 25 seconds which is a lot,
// especially considering all the possible additional buffering.
if interval > 5000 {
interval = 5000
}
} }
if current != last { if current != last {
// TODO: see https://blog.golang.org/pipelines
// find out if we can do this better
select { select {
case out <- current: case out <- current:
case <-ctx.Done(): case <-ctx.Done():
@ -136,12 +156,77 @@ func metaProc(ctx context.Context, name string, out chan<- string) {
} }
} }
// urlProc periodically checks the playlist for yet unseen URLs and sends them
// over the channel. Assumes that URLs are incremental for simplicity, although
// there doesn't seem to be any such gaurantee by the HLS protocol.
func urlProc(ctx context.Context, playlistURL string, out chan<- string) {
defer close(out)
highest := ""
for {
target, err := resolveM3U8(playlistURL)
if err != nil {
return
}
for _, url := range target {
if url <= highest {
continue
}
select {
case out <- url:
highest = url
case <-ctx.Done():
return
}
}
// I expect this to be mainly driven by the buffered channel but
// a small (less than target duration) additional pause will not hurt.
time.Sleep(1 * time.Second)
}
}
// https://tools.ietf.org/html/rfc8216
// http://www.gpac-licensing.com/2014/12/08/apple-hls-technical-depth/
func dataProc(ctx context.Context, playlistURL string, maxChunk int,
out chan<- []byte) {
defer close(out)
// The channel is buffered so that the urlProc can fetch in advance.
urls := make(chan string, 3)
go urlProc(ctx, playlistURL, urls)
for url := range urls {
resp, err := http.Get(url)
if resp != nil {
defer resp.Body.Close()
}
if err != nil {
return
}
for {
chunk := make([]byte, maxChunk)
n, err := resp.Body.Read(chunk)
select {
case out <- chunk[:n]:
case <-ctx.Done():
return
}
if err == io.EOF {
break
}
if err != nil {
return
}
}
}
}
var pathRE = regexp.MustCompile(`^/(.*?)/(.*?)/(.*?)$`) var pathRE = regexp.MustCompile(`^/(.*?)/(.*?)/(.*?)$`)
func proxy(w http.ResponseWriter, req *http.Request) { func proxy(w http.ResponseWriter, req *http.Request) {
const targetURI = "http://a.files.bbci.co.uk/media/live/manifesto/" + const metaint = 1 << 15
"audio/simulcast/hls/%s/%s/ak/%s.m3u8"
const metaint = 1 << 16
m := pathRE.FindStringSubmatch(req.URL.Path) m := pathRE.FindStringSubmatch(req.URL.Path)
if m == nil { if m == nil {
http.NotFound(w, req) http.NotFound(w, req)
@ -149,14 +234,18 @@ func proxy(w http.ResponseWriter, req *http.Request) {
} }
hijacker, ok := w.(http.Hijacker) hijacker, ok := w.(http.Hijacker)
if !ok { if !ok {
// We're not using TLS where HTTP/2 could have caused this // We're not using TLS where HTTP/2 could have caused this.
panic("cannot hijack connection") panic("cannot hijack connection")
} }
// E.g. `nonuk`, `sbr_low` `bbc_radio_one`, or `uk`, `sbr_high`, `bbc_1xtra` // E.g. `nonuk`, `sbr_low` `bbc_radio_one`, or `uk`, `sbr_high`, `bbc_1xtra`
region, quality, name := m[1], m[2], m[3] region, quality, name := m[1], m[2], m[3]
// This validates the params as a side-effect
target, err := resolveM3U8(fmt.Sprintf(targetURI, region, quality, name)) // TODO: We probably shouldn't poll the top level playlist.
mainPlaylistURL := fmt.Sprintf(targetURI, region, quality, name)
// This validates the parameters as a side-effect.
target, err := resolveM3U8(mainPlaylistURL)
if err == nil && len(target) == 0 { if err == nil && len(target) == 0 {
err = errors.New("cannot resolve playlist") err = errors.New("cannot resolve playlist")
} }
@ -165,11 +254,8 @@ func proxy(w http.ResponseWriter, req *http.Request) {
return return
} }
wantMeta := false wantMeta := req.Header.Get("Icy-MetaData") == "1"
if icyMeta, ok := req.Header["Icy-MetaData"]; ok { resp, err := http.Head(target[0])
wantMeta = len(icyMeta) == 1 && icyMeta[0] == "1"
}
resp, err := http.Get(target[0])
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
@ -182,11 +268,12 @@ func proxy(w http.ResponseWriter, req *http.Request) {
} }
defer conn.Close() defer conn.Close()
// TODO: retrieve some general information from somewhere? // TODO: Retrieve some general information from somewhere?
// There's nothing interesting in the playlist files. // There's nothing interesting in the playlist files.
fmt.Fprintf(bufrw, "ICY 200 OK\r\n") fmt.Fprintf(bufrw, "ICY 200 OK\r\n")
fmt.Fprintf(bufrw, "icy-name:%s\r\n", name) fmt.Fprintf(bufrw, "icy-name:%s\r\n", name)
// BBC marks this as a video type, maybe just force audio/mpeg // BBC marks this as a video type, maybe just force audio/mpeg.
fmt.Fprintf(bufrw, "content-type:%s\r\n", resp.Header["Content-Type"][0]) fmt.Fprintf(bufrw, "content-type:%s\r\n", resp.Header["Content-Type"][0])
fmt.Fprintf(bufrw, "icy-pub:%d\r\n", 0) fmt.Fprintf(bufrw, "icy-pub:%d\r\n", 0)
if wantMeta { if wantMeta {
@ -197,50 +284,44 @@ func proxy(w http.ResponseWriter, req *http.Request) {
metaChan := make(chan string) metaChan := make(chan string)
go metaProc(req.Context(), name, metaChan) go metaProc(req.Context(), name, metaChan)
// TODO: move to a normal function
// FIXME: this will load a few seconds (one URL) and die
// - we can either try to implement this and hope for the best
// https://tools.ietf.org/html/draft-pantos-http-live-streaming-20
// then like https://github.com/kz26/gohls/blob/master/main.go
// - or we can become more of a proxy, which complicates ICY
chunkChan := make(chan []byte) chunkChan := make(chan []byte)
go func() { go dataProc(req.Context(), mainPlaylistURL, metaint, chunkChan)
defer resp.Body.Close()
defer close(chunkChan)
for {
chunk := make([]byte, metaint)
n, err := io.ReadFull(resp.Body, chunk)
chunkChan <- chunk[:n]
if err != nil {
return
}
select { // dataProc may return less data near the end of a subfile, so we give it
default: // a maximum count of bytes to return at once and do our own buffering.
case <-req.Context().Done(): var queuedMetaUpdate, queuedChunk []byte
return
}
}
}()
var queuedMeta []byte
for { for {
select { select {
case title := <-metaChan: case title := <-metaChan:
queuedMeta = []byte(fmt.Sprintf("StreamTitle='%s'", title)) queuedMetaUpdate = []byte(fmt.Sprintf("StreamTitle='%s'", title))
case chunk, ok := <-chunkChan: case chunk, ok := <-chunkChan:
if !ok { if !ok {
return return
} }
space := metaint - len(queuedChunk)
if space > len(chunk) {
space = len(chunk)
}
queuedChunk = append(queuedChunk, chunk[:space]...)
if len(queuedChunk) < metaint {
break
}
if _, err := bufrw.Write(queuedChunk); err != nil {
return
}
queuedChunk = chunk[space:]
if wantMeta { if wantMeta {
var meta [1 + 16*255]byte var meta [1 + 16*255]byte
meta[0] = byte((copy(meta[1:], queuedMeta) + 15) / 16) meta[0] = byte((copy(meta[1:], queuedMetaUpdate) + 15) / 16)
chunk = append(chunk, meta[:1+int(meta[0])*16]...) queuedMetaUpdate = nil
queuedMeta = nil
} if _, err := bufrw.Write(meta[:1+int(meta[0])*16]); err != nil {
if _, err := bufrw.Write(chunk); err != nil {
return return
} }
}
if err := bufrw.Flush(); err != nil { if err := bufrw.Flush(); err != nil {
return return
} }
@ -257,6 +338,7 @@ func socketActivationListener() net.Listener {
nfds, err := strconv.Atoi(os.Getenv("LISTEN_FDS")) nfds, err := strconv.Atoi(os.Getenv("LISTEN_FDS"))
if err != nil || nfds == 0 { if err != nil || nfds == 0 {
log.Println("LISTEN_FDS unworkable")
return nil return nil
} else if nfds > 1 { } else if nfds > 1 {
log.Fatalln("not supporting more than one listening socket") log.Fatalln("not supporting more than one listening socket")
@ -271,7 +353,7 @@ func socketActivationListener() net.Listener {
return ln return ln
} }
// Had to copy this from Server.ListenAndServe() // Had to copy this from Server.ListenAndServe.
type tcpKeepAliveListener struct{ *net.TCPListener } type tcpKeepAliveListener struct{ *net.TCPListener }
func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) { func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) {
@ -285,22 +367,22 @@ func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) {
} }
func main() { func main() {
listenAddr := ":8000"
if len(os.Args) == 2 {
listenAddr = os.Args[1]
}
var listener net.Listener var listener net.Listener
if ln := socketActivationListener(); listener != nil { if ln := socketActivationListener(); ln != nil {
// Keepalives can be set in the systemd unit, see systemd.socket(5) // Keepalives can be set in the systemd unit, see systemd.socket(5).
listener = ln listener = ln
} else if ln, err := net.Listen("tcp", listenAddr); err != nil { } else {
if len(os.Args) < 2 {
log.Fatalf("usage: %s LISTEN-ADDR\n", os.Args[0])
}
if ln, err := net.Listen("tcp", os.Args[1]); err != nil {
log.Fatalln(err) log.Fatalln(err)
} else { } else {
listener = tcpKeepAliveListener{ln.(*net.TCPListener)} listener = tcpKeepAliveListener{ln.(*net.TCPListener)}
} }
}
http.HandleFunc("/", proxy) http.HandleFunc("/", proxy)
// We don't need to clean up properly since we store no data // We don't need to clean up properly since we store no data.
log.Fatalln(http.Serve(listener, nil)) log.Fatalln(http.Serve(listener, nil))
} }