Compare commits

...

12 Commits

Author SHA1 Message Date
b766c9ef20 Improve current Arch Linux run log
All checks were successful
Alpine 3.22 Success
2025-11-23 19:38:51 +01:00
6868bde5e6 Reject unknown DB versions
All checks were successful
Alpine 3.21 Success
2025-09-06 09:12:27 +02:00
d3a046d85d Avoid disaster with DB migrations
All checks were successful
Alpine 3.21 Success
2025-09-04 10:38:31 +02:00
6622ea0e1c Improve formatting of durations
All checks were successful
Alpine 3.20 Success
Since "m" could stand for both "minute" and "month",
and months vary in length, let's stop at days.
2025-01-02 00:36:03 +01:00
a492b3b668 Clean up 2024-12-28 00:27:46 +01:00
280114a5d3 Unify our usage of the local shell 2024-12-27 02:16:14 +01:00
d83517f67b Refresh task view dynamically with Javascript
All checks were successful
Alpine 3.20 Success
This is more efficient, responsive, and user friendly.
2024-12-27 00:25:49 +01:00
4f2c2dc8da Order tasks by change date first
All checks were successful
Alpine 3.20 Success
The user presumably does not want to look everywhere for recent tasks.
2024-12-26 16:24:54 +01:00
55a6693942 Fix deployment error processing 2024-12-26 16:18:37 +01:00
d5981249b1 Add time information 2024-12-26 16:17:45 +01:00
4a7fc55c92 Runtime configuration changes
All checks were successful
Alpine 3.20 Success
Through an RPC command, because systemd documentation told us to.
2024-12-26 12:03:00 +01:00
2bd231b84f Fix Makefile dependencies, extend tests
All checks were successful
Alpine 3.20 Success
2024-12-26 00:40:58 +01:00
7 changed files with 475 additions and 110 deletions

View File

@@ -1,4 +1,4 @@
Copyright (c) 2024, Přemysl Eric Janouch <p@janouch.name> Copyright (c) 2024 - 2025, Přemysl Eric Janouch <p@janouch.name>
Permission to use, copy, modify, and/or distribute this software for any Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted. purpose with or without fee is hereby granted.

View File

@@ -5,7 +5,7 @@ version = dev
outputs = acid acid.1 outputs = acid acid.1
all: $(outputs) all: $(outputs)
acid: acid.go acid: acid.go terminal.go
go build -ldflags "-X 'main.projectVersion=$(version)'" -o $@ go build -ldflags "-X 'main.projectVersion=$(version)'" -o $@
acid.1: acid.adoc acid.1: acid.adoc
asciidoctor -b manpage -a release-version=$(version) -o $@ acid.adoc || \ asciidoctor -b manpage -a release-version=$(version) -o $@ acid.adoc || \

View File

@@ -37,6 +37,8 @@ Commands
*restart* [_ID_]...:: *restart* [_ID_]...::
Schedule tasks with the given IDs to be rerun. Schedule tasks with the given IDs to be rerun.
Run this command without arguments to pick up external database changes. Run this command without arguments to pick up external database changes.
*reload*::
Reload configuration.
Configuration Configuration
------------- -------------
@@ -65,6 +67,17 @@ which has the following fields:
*RunnerName*:: *RunnerName*::
Descriptive name of the runner. Descriptive name of the runner.
// Intentionally not documenting CreatedUnix, ChangedUnix, DurationSeconds,
// which can be derived from the objects.
*Created*, *Changed*::
`*time.Time` of task creation and last task state change respectively,
or nil if not known.
*CreatedAgo*, *ChangedAgo*::
Abbreviated human-friendly relative elapsed time duration
since *Created* and *Changed* respectively.
*Duration*::
`*time.Duration` of the last run in seconds, or nil if not known.
*URL*:: *URL*::
*acid* link to the task, where its log output can be seen. *acid* link to the task, where its log output can be seen.
*RepoURL*:: *RepoURL*::

467
acid.go
View File

@@ -26,6 +26,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"syscall" "syscall"
ttemplate "text/template" ttemplate "text/template"
"time" "time"
@@ -40,9 +41,9 @@ var (
projectName = "acid" projectName = "acid"
projectVersion = "?" projectVersion = "?"
gConfig Config = Config{Listen: ":http"} gConfigPath string
gNotifyScript *ttemplate.Template gConfig atomic.Pointer[Config]
gDB *sql.DB gDB *sql.DB
gNotifierSignal = make(chan struct{}, 1) gNotifierSignal = make(chan struct{}, 1)
gExecutorSignal = make(chan struct{}, 1) gExecutorSignal = make(chan struct{}, 1)
@@ -52,6 +53,8 @@ var (
gRunning = make(map[int64]*RunningTask) gRunning = make(map[int64]*RunningTask)
) )
func getConfig() *Config { return gConfig.Load() }
// --- Config ------------------------------------------------------------------ // --- Config ------------------------------------------------------------------
type Config struct { type Config struct {
@@ -65,6 +68,8 @@ type Config struct {
Runners map[string]ConfigRunner `yaml:"runners"` // script runners Runners map[string]ConfigRunner `yaml:"runners"` // script runners
Projects map[string]ConfigProject `yaml:"projects"` // configured projects Projects map[string]ConfigProject `yaml:"projects"` // configured projects
notifyTemplate *ttemplate.Template
} }
type ConfigRunner struct { type ConfigRunner struct {
@@ -86,8 +91,9 @@ type ConfigProject struct {
func (cf *ConfigProject) AutomaticRunners() (runners []string) { func (cf *ConfigProject) AutomaticRunners() (runners []string) {
// We pass through unknown runner names, // We pass through unknown runner names,
// so that they can cause reference errors later. // so that they can cause reference errors later.
config := getConfig()
for runner := range cf.Runners { for runner := range cf.Runners {
if r, _ := gConfig.Runners[runner]; !r.Manual { if r, _ := config.Runners[runner]; !r.Manual {
runners = append(runners, runner) runners = append(runners, runner)
} }
} }
@@ -102,17 +108,28 @@ type ConfigProjectRunner struct {
Timeout string `yaml:"timeout"` // timeout duration Timeout string `yaml:"timeout"` // timeout duration
} }
func parseConfig(path string) error { // loadConfig reloads configuration.
if f, err := os.Open(path); err != nil { // Beware that changes do not get applied globally at the same moment.
func loadConfig() error {
new := &Config{}
if f, err := os.Open(gConfigPath); err != nil {
return err return err
} else if err = yaml.NewDecoder(f).Decode(&gConfig); err != nil { } else if err = yaml.NewDecoder(f).Decode(new); err != nil {
return err return err
} }
if old := getConfig(); old != nil && old.DB != new.DB {
return fmt.Errorf("the database file cannot be changed in runtime")
}
var err error var err error
gNotifyScript, err = new.notifyTemplate, err =
ttemplate.New("notify").Funcs(shellFuncs).Parse(gConfig.Notify) ttemplate.New("notify").Funcs(shellFuncs).Parse(new.Notify)
return err if err != nil {
return err
}
gConfig.Store(new)
return nil
} }
var shellFuncs = ttemplate.FuncMap{ var shellFuncs = ttemplate.FuncMap{
@@ -136,8 +153,16 @@ var shellFuncs = ttemplate.FuncMap{
// --- Utilities --------------------------------------------------------------- // --- Utilities ---------------------------------------------------------------
func localShell() string {
if shell := os.Getenv("SHELL"); shell != "" {
return shell
}
// The os/user package doesn't store the parsed out shell field.
return "/bin/sh"
}
func giteaSign(b []byte) string { func giteaSign(b []byte) string {
payloadHmac := hmac.New(sha256.New, []byte(gConfig.Secret)) payloadHmac := hmac.New(sha256.New, []byte(getConfig().Secret))
payloadHmac.Write(b) payloadHmac.Write(b)
return hex.EncodeToString(payloadHmac.Sum(nil)) return hex.EncodeToString(payloadHmac.Sum(nil))
} }
@@ -145,9 +170,9 @@ func giteaSign(b []byte) string {
func giteaNewRequest(ctx context.Context, method, path string, body io.Reader) ( func giteaNewRequest(ctx context.Context, method, path string, body io.Reader) (
*http.Request, error) { *http.Request, error) {
req, err := http.NewRequestWithContext( req, err := http.NewRequestWithContext(
ctx, method, gConfig.Gitea+path, body) ctx, method, getConfig().Gitea+path, body)
if req != nil { if req != nil {
req.Header.Set("Authorization", "token "+gConfig.Token) req.Header.Set("Authorization", "token "+getConfig().Token)
req.Header.Set("Accept", "application/json") req.Header.Set("Accept", "application/json")
} }
return req, err return req, err
@@ -156,6 +181,7 @@ func giteaNewRequest(ctx context.Context, method, path string, body io.Reader) (
func getTasks(ctx context.Context, query string, args ...any) ([]Task, error) { func getTasks(ctx context.Context, query string, args ...any) ([]Task, error) {
rows, err := gDB.QueryContext(ctx, ` rows, err := gDB.QueryContext(ctx, `
SELECT id, owner, repo, hash, runner, SELECT id, owner, repo, hash, runner,
created, changed, duration,
state, detail, notified, state, detail, notified,
runlog, tasklog, deploylog FROM task `+query, args...) runlog, tasklog, deploylog FROM task `+query, args...)
if err != nil { if err != nil {
@@ -167,11 +193,13 @@ func getTasks(ctx context.Context, query string, args ...any) ([]Task, error) {
for rows.Next() { for rows.Next() {
var t Task var t Task
err := rows.Scan(&t.ID, &t.Owner, &t.Repo, &t.Hash, &t.Runner, err := rows.Scan(&t.ID, &t.Owner, &t.Repo, &t.Hash, &t.Runner,
&t.CreatedUnix, &t.ChangedUnix, &t.DurationSeconds,
&t.State, &t.Detail, &t.Notified, &t.State, &t.Detail, &t.Notified,
&t.RunLog, &t.TaskLog, &t.DeployLog) &t.RunLog, &t.TaskLog, &t.DeployLog)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// We could also update some fields from gRunning.
tasks = append(tasks, t) tasks = append(tasks, t)
} }
return tasks, rows.Err() return tasks, rows.Err()
@@ -192,6 +220,8 @@ var templateTasks = template.Must(template.New("tasks").Parse(`
<thead> <thead>
<tr> <tr>
<th>ID</th> <th>ID</th>
<th>Created</th>
<th>Changed</th>
<th>Repository</th> <th>Repository</th>
<th>Hash</th> <th>Hash</th>
<th>Runner</th> <th>Runner</th>
@@ -204,6 +234,8 @@ var templateTasks = template.Must(template.New("tasks").Parse(`
{{range .}} {{range .}}
<tr> <tr>
<td><a href="task/{{.ID}}">{{.ID}}</a></td> <td><a href="task/{{.ID}}">{{.ID}}</a></td>
<td align="right"><span title="{{.Created}}">{{.CreatedAgo}}</span></td>
<td align="right"><span title="{{.Changed}}">{{.ChangedAgo}}</span></td>
<td><a href="{{.RepoURL}}">{{.FullName}}</a></td> <td><a href="{{.RepoURL}}">{{.FullName}}</a></td>
<td><a href="{{.CommitURL}}">{{.Hash}}</a></td> <td><a href="{{.CommitURL}}">{{.Hash}}</a></td>
<td>{{.RunnerName}}</td> <td>{{.RunnerName}}</td>
@@ -219,7 +251,7 @@ var templateTasks = template.Must(template.New("tasks").Parse(`
`)) `))
func handleTasks(w http.ResponseWriter, r *http.Request) { func handleTasks(w http.ResponseWriter, r *http.Request) {
tasks, err := getTasks(r.Context(), `ORDER BY id DESC`) tasks, err := getTasks(r.Context(), `ORDER BY changed DESC, id DESC`)
if err != nil { if err != nil {
http.Error(w, http.Error(w,
"Error retrieving tasks: "+err.Error(), "Error retrieving tasks: "+err.Error(),
@@ -238,41 +270,179 @@ var templateTask = template.Must(template.New("tasks").Parse(`
<head> <head>
<title>Task {{.ID}}</title> <title>Task {{.ID}}</title>
<meta charset="utf-8"> <meta charset="utf-8">
{{if .IsRunning}}
<meta http-equiv="refresh" content="5">
{{end}}
</head> </head>
<body> <body>
<h1><a href="..">Tasks</a> &raquo; {{.ID}}</h1> <h1><a href="..">Tasks</a> &raquo; {{.ID}}</h1>
<dl> <dl>
<!-- Remember to synchronise these lists with Javascript updates. -->
{{if .Created -}}
<dt>Created</dt>
<dd><span id="created" title="{{.Created}}">{{.CreatedAgo}} ago</span></dd>
{{end -}}
{{if .Changed -}}
<dt>Changed</dt>
<dd><span id="changed" title="{{.Changed}}">{{.ChangedAgo}} ago</span></dd>
{{end -}}
<dt>Project</dt> <dt>Project</dt>
<dd><a href="{{.RepoURL}}">{{.FullName}}</a></dd> <dd id="project"><a href="{{.RepoURL}}">{{.FullName}}</a></dd>
<dt>Commit</dt> <dt>Commit</dt>
<dd><a href="{{.CommitURL}}">{{.Hash}}</a></dd> <dd id="commit"><a href="{{.CommitURL}}">{{.Hash}}</a></dd>
<dt>Runner</dt> <dt>Runner</dt>
<dd>{{.RunnerName}}</dd> <dd id="runner">{{.RunnerName}}</dd>
<dt>State</dt> <dt>State</dt>
<dd>{{.State}}{{if .Detail}} ({{.Detail}}){{end}}</dd> <dd id="state">{{.State}}{{if .Detail}} ({{.Detail}}){{end}}</dd>
<dt>Notified</dt> <dt>Notified</dt>
<dd>{{.Notified}}</dd> <dd id="notified">{{.Notified}}</dd>
<dt>Duration</dt>
<dd id="duration">{{if .Duration}}{{.Duration}}{{else}}&mdash;{{end}}</dd>
</dl> </dl>
{{if .RunLog}}
<h2>Runner log</h2> <h2 id="run"{{if not .RunLog}} hidden{{end}}>Runner log</h2>
<pre>{{printf "%s" .RunLog}}</pre> <pre id="runlog"{{if not .RunLog}} hidden{{
{{end}} end}}>{{printf "%s" .RunLog}}</pre>
{{if .TaskLog}} <h2 id="task"{{if not .TaskLog}} hidden{{end}}>Task log</h2>
<h2>Task log</h2> <pre id="tasklog"{{if not .TaskLog}} hidden{{
<pre>{{printf "%s" .TaskLog}}</pre> end}}>{{printf "%s" .TaskLog}}</pre>
{{end}} <h2 id="deploy"{{if not .DeployLog}} hidden{{end}}>Deploy log</h2>
{{if .DeployLog}} <pre id="deploylog"{{if not .DeployLog}} hidden{{
<h2>Deploy log</h2> end}}>{{printf "%s" .DeployLog}}</pre>
<pre>{{printf "%s" .DeployLog}}</pre>
{{end}} {{if .IsRunning -}}
</table> <script>
function get(id) {
return document.getElementById(id)
}
function getLog(id) {
const header = get(id), log = get(id + 'log'), text = log.textContent
// lines[-1] is an implementation detail of terminalWriter.Serialize,
// lines[-2] is the actual last line.
const last = Math.max(0, text.split('\n').length - 2)
return {header, log, text, last}
}
function refreshLog(log, top, changed) {
if (top <= 0)
log.log.textContent = changed
else
log.log.textContent =
log.text.split('\n').slice(0, top).join('\n') + '\n' + changed
const empty = log.log.textContent === ''
log.header.hidden = empty
log.log.hidden = empty
}
let refresher = setInterval(() => {
const run = getLog('run'), task = getLog('task'), deploy = getLog('deploy')
const url = new URL(window.location.href)
url.search = ''
url.searchParams.set('json', '')
url.searchParams.set('run', run.last)
url.searchParams.set('task', task.last)
url.searchParams.set('deploy', deploy.last)
fetch(url.toString()).then(response => {
if (!response.ok)
throw response.statusText
return response.json()
}).then(data => {
const scroll = window.scrollY + window.innerHeight
>= document.documentElement.scrollHeight
if (data.Created) {
get('created').title = data.Created
get('created').textContent = data.CreatedAgo + " ago"
}
if (data.Changed) {
get('changed').title = data.Changed
get('changed').textContent = data.ChangedAgo + " ago"
}
get('state').textContent = data.State
if (data.Detail !== '')
get('state').textContent += " (" + data.Detail + ")"
get('notified').textContent = String(data.Notified)
if (data.Duration)
get('duration').textContent = data.Duration
refreshLog(run, data.RunLogTop, data.RunLog)
refreshLog(task, data.TaskLogTop, data.TaskLog)
refreshLog(deploy, data.DeployLogTop, data.DeployLog)
if (scroll)
document.documentElement.scrollTop =
document.documentElement.scrollHeight
if (!data.IsRunning)
clearInterval(refresher)
}).catch(error => {
clearInterval(refresher)
alert(error)
})
}, 1000 /* For faster updates than this, we should use WebSockets. */)
</script>
{{end -}}
</body> </body>
</html> </html>
`)) `))
// handlerTask serves as the data for JSON encoding and the task HTML template.
// It needs to explicitly include many convenience method results.
type handlerTask struct {
Task
IsRunning bool
Created *string // Task.Created?.String()
Changed *string // Task.Changed?.String()
CreatedAgo string // Task.CreatedAgo()
ChangedAgo string // Task.ChangedAgo()
Duration *string // Task.Duration?.String()
State string // Task.State.String()
RunLog string
RunLogTop int
TaskLog string
TaskLogTop int
DeployLog string
DeployLogTop int
}
func toNilableString[T fmt.Stringer](stringer *T) *string {
if stringer == nil {
return nil
}
s := (*stringer).String()
return &s
}
func newHandlerTask(task Task) handlerTask {
return handlerTask{
Task: task,
RunLog: string(task.RunLog),
TaskLog: string(task.TaskLog),
DeployLog: string(task.DeployLog),
Created: toNilableString(task.Created()),
Changed: toNilableString(task.Changed()),
CreatedAgo: task.CreatedAgo(),
ChangedAgo: task.ChangedAgo(),
Duration: toNilableString(task.Duration()),
State: task.State.String(),
}
}
func (ht *handlerTask) updateFromRunning(
rt *RunningTask, lastRun, lastTask, lastDeploy int) {
ht.IsRunning = true
ht.Task.DurationSeconds = rt.elapsed()
ht.Duration = toNilableString(ht.Task.Duration())
rt.RunLog.Lock()
defer rt.RunLog.Unlock()
rt.TaskLog.Lock()
defer rt.TaskLog.Unlock()
rt.DeployLog.Lock()
defer rt.DeployLog.Unlock()
ht.RunLog, ht.RunLogTop = rt.RunLog.SerializeUpdates(lastRun)
ht.TaskLog, ht.TaskLogTop = rt.TaskLog.SerializeUpdates(lastTask)
ht.DeployLog, ht.DeployLogTop = rt.DeployLog.SerializeUpdates(lastDeploy)
}
func handleTask(w http.ResponseWriter, r *http.Request) { func handleTask(w http.ResponseWriter, r *http.Request) {
id, err := strconv.Atoi(r.PathValue("id")) id, err := strconv.Atoi(r.PathValue("id"))
if err != nil { if err != nil {
@@ -293,34 +463,32 @@ func handleTask(w http.ResponseWriter, r *http.Request) {
return return
} }
task := struct { // These are intended for running tasks,
Task // so don't reprocess DB logs, which would only help the last update.
IsRunning bool q := r.URL.Query()
}{Task: tasks[0]} lastRun, _ := strconv.Atoi(q.Get("run"))
lastTask, _ := strconv.Atoi(q.Get("task"))
lastDeploy, _ := strconv.Atoi(q.Get("deploy"))
task := newHandlerTask(tasks[0])
func() { func() {
gRunningMutex.Lock() gRunningMutex.Lock()
defer gRunningMutex.Unlock() defer gRunningMutex.Unlock()
rt, ok := gRunning[task.ID] if rt, ok := gRunning[task.ID]; ok {
task.IsRunning = ok task.updateFromRunning(
if !ok { rt, int(lastRun), int(lastTask), int(lastDeploy))
return
} }
rt.RunLog.Lock()
defer rt.RunLog.Unlock()
rt.TaskLog.Lock()
defer rt.TaskLog.Unlock()
rt.DeployLog.Lock()
defer rt.DeployLog.Unlock()
task.RunLog = rt.RunLog.Serialize(0)
task.TaskLog = rt.TaskLog.Serialize(0)
task.DeployLog = rt.DeployLog.Serialize(0)
}() }()
if err := templateTask.Execute(w, &task); err != nil { if q.Has("json") {
http.Error(w, err.Error(), http.StatusInternalServerError) w.Header().Set("Content-Type", "application/json")
err = json.NewEncoder(w).Encode(&task)
} else {
err = templateTask.Execute(w, &task)
}
if err != nil {
log.Println(err)
} }
} }
@@ -347,8 +515,9 @@ func createTasks(ctx context.Context,
} }
defer tx.Rollback() defer tx.Rollback()
stmt, err := tx.Prepare(`INSERT INTO task(owner, repo, hash, runner) stmt, err := tx.Prepare(
VALUES (?, ?, ?, ?)`) `INSERT INTO task(owner, repo, hash, runner, created, changed)
VALUES (?, ?, ?, ?, unixepoch('now'), unixepoch('now'))`)
if err != nil { if err != nil {
return err return err
} }
@@ -397,7 +566,7 @@ func handlePush(w http.ResponseWriter, r *http.Request) {
log.Printf("received push: %s %s\n", log.Printf("received push: %s %s\n",
event.Repository.FullName, event.HeadCommit.ID) event.Repository.FullName, event.HeadCommit.ID)
project, ok := gConfig.Projects[event.Repository.FullName] project, ok := getConfig().Projects[event.Repository.FullName]
if !ok { if !ok {
// This is okay, don't set any commit statuses. // This is okay, don't set any commit statuses.
fmt.Fprintf(w, "The project is not configured.") fmt.Fprintf(w, "The project is not configured.")
@@ -429,8 +598,11 @@ func rpcRestartOne(ctx context.Context, id int64) error {
// The executor bumps to "running" after inserting into gRunning, // The executor bumps to "running" after inserting into gRunning,
// so we should not need to exclude that state here. // so we should not need to exclude that state here.
result, err := gDB.ExecContext(ctx, `UPDATE task //
SET state = ?, detail = '', notified = 0 WHERE id = ?`, // We deliberately do not clear previous run data (duration, *log).
result, err := gDB.ExecContext(ctx,
`UPDATE task SET changed = unixepoch('now'),
state = ?, detail = '', notified = 0 WHERE id = ?`,
taskStateNew, id) taskStateNew, id)
if err != nil { if err != nil {
return fmt.Errorf("%d: %w", id, err) return fmt.Errorf("%d: %w", id, err)
@@ -506,7 +678,7 @@ func rpcEnqueue(ctx context.Context,
return fmt.Errorf("%s: %w", ref, err) return fmt.Errorf("%s: %w", ref, err)
} }
project, ok := gConfig.Projects[owner+"/"+repo] project, ok := getConfig().Projects[owner+"/"+repo]
if !ok { if !ok {
return fmt.Errorf("project configuration not found") return fmt.Errorf("project configuration not found")
} }
@@ -558,6 +730,17 @@ func rpcRestart(ctx context.Context,
return nil return nil
} }
func rpcReload(ctx context.Context,
w io.Writer, fs *flag.FlagSet, args []string) error {
if err := fs.Parse(args); err != nil {
return err
}
if fs.NArg() > 0 {
return errWrongUsage
}
return loadConfig()
}
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
var rpcCommands = map[string]struct { var rpcCommands = map[string]struct {
@@ -570,6 +753,8 @@ var rpcCommands = map[string]struct {
"Create or restart tasks for the given reference."}, "Create or restart tasks for the given reference."},
"restart": {rpcRestart, "[ID]...", "restart": {rpcRestart, "[ID]...",
"Schedule tasks with the given IDs to be rerun."}, "Schedule tasks with the given IDs to be rerun."},
"reload": {rpcReload, "",
"Reload configuration."},
} }
func rpcPrintCommands(w io.Writer) { func rpcPrintCommands(w io.Writer) {
@@ -656,12 +841,12 @@ func handleRPC(w http.ResponseWriter, r *http.Request) {
func notifierRunCommand(ctx context.Context, task Task) { func notifierRunCommand(ctx context.Context, task Task) {
script := bytes.NewBuffer(nil) script := bytes.NewBuffer(nil)
if err := gNotifyScript.Execute(script, &task); err != nil { if err := getConfig().notifyTemplate.Execute(script, &task); err != nil {
log.Printf("error: notify: %s", err) log.Printf("error: notify: %s", err)
return return
} }
cmd := exec.CommandContext(ctx, "sh") cmd := exec.CommandContext(ctx, localShell())
cmd.Stdin = script cmd.Stdin = script
cmd.Stdout = os.Stdout cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr cmd.Stderr = os.Stderr
@@ -683,13 +868,14 @@ func notifierNotify(ctx context.Context, task Task) error {
TargetURL string `json:"target_url"` TargetURL string `json:"target_url"`
}{} }{}
runner, ok := gConfig.Runners[task.Runner] config := getConfig()
runner, ok := config.Runners[task.Runner]
if !ok { if !ok {
log.Printf("task %d has an unknown runner %s\n", task.ID, task.Runner) log.Printf("task %d has an unknown runner %s\n", task.ID, task.Runner)
return nil return nil
} }
payload.Context = runner.Name payload.Context = runner.Name
payload.TargetURL = fmt.Sprintf("%s/task/%d", gConfig.Root, task.ID) payload.TargetURL = fmt.Sprintf("%s/task/%d", config.Root, task.ID)
switch task.State { switch task.State {
case taskStateNew: case taskStateNew:
@@ -807,13 +993,17 @@ type RunningTask struct {
// newRunningTask prepares a task for running, without executing anything yet. // newRunningTask prepares a task for running, without executing anything yet.
func newRunningTask(task Task) (*RunningTask, error) { func newRunningTask(task Task) (*RunningTask, error) {
rt := &RunningTask{DB: task} rt := &RunningTask{DB: task}
config := getConfig()
// This is for our own tracking, not actually written to database.
rt.DB.ChangedUnix = time.Now().Unix()
var ok bool var ok bool
rt.Runner, ok = gConfig.Runners[rt.DB.Runner] rt.Runner, ok = config.Runners[rt.DB.Runner]
if !ok { if !ok {
return nil, fmt.Errorf("unknown runner: %s", rt.DB.Runner) return nil, fmt.Errorf("unknown runner: %s", rt.DB.Runner)
} }
project, ok := gConfig.Projects[rt.DB.FullName()] project, ok := config.Projects[rt.DB.FullName()]
if !ok { if !ok {
return nil, fmt.Errorf("project configuration not found") return nil, fmt.Errorf("project configuration not found")
} }
@@ -903,6 +1093,10 @@ func (rt *RunningTask) localEnv() []string {
) )
} }
func (rt *RunningTask) elapsed() int64 {
return int64(time.Since(time.Unix(rt.DB.ChangedUnix, 0)).Seconds())
}
// update stores the running task's state in the database. // update stores the running task's state in the database.
func (rt *RunningTask) update() error { func (rt *RunningTask) update() error {
for _, i := range []struct { for _, i := range []struct {
@@ -919,6 +1113,7 @@ func (rt *RunningTask) update() error {
*i.log = []byte{} *i.log = []byte{}
} }
} }
rt.DB.DurationSeconds = rt.elapsed()
return rt.DB.update() return rt.DB.update()
} }
@@ -975,14 +1170,6 @@ func executorDownload(client *ssh.Client, remoteRoot, localRoot string) error {
return nil return nil
} }
func executorLocalShell() string {
if shell := os.Getenv("SHELL"); shell != "" {
return shell
}
// The os/user package doesn't store the parsed out shell field.
return "/bin/sh"
}
func executorTmpDir(fallback string) string { func executorTmpDir(fallback string) string {
// See also: https://systemd.io/TEMPORARY_DIRECTORIES/ // See also: https://systemd.io/TEMPORARY_DIRECTORIES/
if tmp := os.Getenv("TMPDIR"); tmp != "" { if tmp := os.Getenv("TMPDIR"); tmp != "" {
@@ -1018,9 +1205,10 @@ func executorDeploy(
return err return err
} }
cmd := exec.CommandContext(ctx, executorLocalShell(), "-c", script.String()) cmd := exec.CommandContext(ctx, localShell())
cmd.Env = rt.localEnv() cmd.Env = rt.localEnv()
cmd.Dir = dir cmd.Dir = dir
cmd.Stdin = script
cmd.Stdout = &rt.DeployLog cmd.Stdout = &rt.DeployLog
cmd.Stderr = &rt.DeployLog cmd.Stderr = &rt.DeployLog
return cmd.Run() return cmd.Run()
@@ -1143,6 +1331,7 @@ func executorConnect(
func executorRunTask(ctx context.Context, task Task) error { func executorRunTask(ctx context.Context, task Task) error {
rt, err := newRunningTask(task) rt, err := newRunningTask(task)
if err != nil { if err != nil {
task.DurationSeconds = 0
task.State, task.Detail = taskStateError, "Misconfigured" task.State, task.Detail = taskStateError, "Misconfigured"
task.Notified = 0 task.Notified = 0
task.RunLog = []byte(err.Error()) task.RunLog = []byte(err.Error())
@@ -1162,6 +1351,7 @@ func executorRunTask(ctx context.Context, task Task) error {
f() f()
} }
locked(func() { locked(func() {
rt.DB.DurationSeconds = 0
rt.DB.State, rt.DB.Detail = taskStateRunning, "" rt.DB.State, rt.DB.Detail = taskStateRunning, ""
rt.DB.Notified = 0 rt.DB.Notified = 0
rt.DB.RunLog = []byte{} rt.DB.RunLog = []byte{}
@@ -1249,19 +1439,20 @@ func executorRunTask(ctx context.Context, task Task) error {
defer client.Close() defer client.Close()
var ( var (
ee1 *ssh.ExitError eeSSH *ssh.ExitError
ee2 *executorError eeExec *exec.ExitError
ee3 *executorError
) )
err = executorBuild(ctxRunner, client, rt) err = executorBuild(ctxRunner, client, rt)
if err != nil { if err != nil {
locked(func() { locked(func() {
if errors.As(err, &ee1) { if errors.As(err, &eeSSH) {
rt.DB.State, rt.DB.Detail = taskStateFailed, "Scripts failed" rt.DB.State, rt.DB.Detail = taskStateFailed, "Scripts failed"
fmt.Fprintf(&rt.TaskLog, "\n%s\n", err) fmt.Fprintf(&rt.TaskLog, "\n%s\n", err)
} else if errors.As(err, &ee2) { } else if errors.As(err, &ee3) {
rt.DB.State, rt.DB.Detail = taskStateError, ee2.Detail rt.DB.State, rt.DB.Detail = taskStateError, ee3.Detail
fmt.Fprintf(&rt.TaskLog, "\n%s\n", ee2.Err) fmt.Fprintf(&rt.TaskLog, "\n%s\n", ee3.Err)
} else { } else {
rt.DB.State, rt.DB.Detail = taskStateError, "" rt.DB.State, rt.DB.Detail = taskStateError, ""
fmt.Fprintf(&rt.TaskLog, "\n%s\n", err) fmt.Fprintf(&rt.TaskLog, "\n%s\n", err)
@@ -1281,12 +1472,12 @@ func executorRunTask(ctx context.Context, task Task) error {
locked(func() { locked(func() {
if err == nil { if err == nil {
rt.DB.State, rt.DB.Detail = taskStateSuccess, "" rt.DB.State, rt.DB.Detail = taskStateSuccess, ""
} else if errors.As(err, &ee1) { } else if errors.As(err, &eeExec) {
rt.DB.State, rt.DB.Detail = taskStateFailed, "Deployment failed" rt.DB.State, rt.DB.Detail = taskStateFailed, "Deployment failed"
fmt.Fprintf(&rt.DeployLog, "\n%s\n", err) fmt.Fprintf(&rt.DeployLog, "\n%s\n", err)
} else if errors.As(err, &ee2) { } else if errors.As(err, &ee3) {
rt.DB.State, rt.DB.Detail = taskStateError, ee2.Detail rt.DB.State, rt.DB.Detail = taskStateError, ee3.Detail
fmt.Fprintf(&rt.DeployLog, "\n%s\n", ee2.Err) fmt.Fprintf(&rt.DeployLog, "\n%s\n", ee3.Err)
} else { } else {
rt.DB.State, rt.DB.Detail = taskStateError, "" rt.DB.State, rt.DB.Detail = taskStateError, ""
fmt.Fprintf(&rt.DeployLog, "\n%s\n", err) fmt.Fprintf(&rt.DeployLog, "\n%s\n", err)
@@ -1370,6 +1561,11 @@ type Task struct {
Hash string Hash string
Runner string Runner string
// True database names for these are occupied by accessors.
CreatedUnix int64
ChangedUnix int64
DurationSeconds int64
State taskState State taskState
Detail string Detail string
Notified int64 Notified int64
@@ -1381,7 +1577,7 @@ type Task struct {
func (t *Task) FullName() string { return t.Owner + "/" + t.Repo } func (t *Task) FullName() string { return t.Owner + "/" + t.Repo }
func (t *Task) RunnerName() string { func (t *Task) RunnerName() string {
if runner, ok := gConfig.Runners[t.Runner]; !ok { if runner, ok := getConfig().Runners[t.Runner]; !ok {
return t.Runner return t.Runner
} else { } else {
return runner.Name return runner.Name
@@ -1389,26 +1585,77 @@ func (t *Task) RunnerName() string {
} }
func (t *Task) URL() string { func (t *Task) URL() string {
return fmt.Sprintf("%s/task/%d", gConfig.Root, t.ID) return fmt.Sprintf("%s/task/%d", getConfig().Root, t.ID)
} }
func (t *Task) RepoURL() string { func (t *Task) RepoURL() string {
return fmt.Sprintf("%s/%s/%s", gConfig.Gitea, t.Owner, t.Repo) return fmt.Sprintf("%s/%s/%s", getConfig().Gitea, t.Owner, t.Repo)
} }
func (t *Task) CommitURL() string { func (t *Task) CommitURL() string {
return fmt.Sprintf("%s/%s/%s/commit/%s", return fmt.Sprintf("%s/%s/%s/commit/%s",
gConfig.Gitea, t.Owner, t.Repo, t.Hash) getConfig().Gitea, t.Owner, t.Repo, t.Hash)
} }
func (t *Task) CloneURL() string { func (t *Task) CloneURL() string {
return fmt.Sprintf("%s/%s/%s.git", gConfig.Gitea, t.Owner, t.Repo) return fmt.Sprintf("%s/%s/%s.git", getConfig().Gitea, t.Owner, t.Repo)
}
func shortDurationString(d time.Duration) string {
if d.Abs() >= 24*time.Hour {
return strconv.FormatInt(int64(d/time.Hour/24), 10) + "d"
} else if d.Abs() >= time.Hour {
return strconv.FormatInt(int64(d/time.Hour), 10) + "h"
} else if d.Abs() >= time.Minute {
return strconv.FormatInt(int64(d/time.Minute), 10) + "m"
} else {
return strconv.FormatInt(int64(d/time.Second), 10) + "s"
}
}
func (t *Task) Created() *time.Time {
if t.CreatedUnix == 0 {
return nil
}
tt := time.Unix(t.CreatedUnix, 0)
return &tt
}
func (t *Task) Changed() *time.Time {
if t.ChangedUnix == 0 {
return nil
}
tt := time.Unix(t.ChangedUnix, 0)
return &tt
}
func (t *Task) CreatedAgo() string {
if t.CreatedUnix == 0 {
return ""
}
return shortDurationString(time.Since(*t.Created()))
}
func (t *Task) ChangedAgo() string {
if t.ChangedUnix == 0 {
return ""
}
return shortDurationString(time.Since(*t.Changed()))
}
func (t *Task) Duration() *time.Duration {
if t.DurationSeconds == 0 {
return nil
}
td := time.Duration(t.DurationSeconds * int64(time.Second))
return &td
} }
func (t *Task) update() error { func (t *Task) update() error {
_, err := gDB.ExecContext(context.Background(), `UPDATE task _, err := gDB.ExecContext(context.Background(),
SET state = ?, detail = ?, notified = ?, `UPDATE task SET changed = unixepoch('now'), duration = ?,
state = ?, detail = ?, notified = ?,
runlog = ?, tasklog = ?, deploylog = ? WHERE id = ?`, runlog = ?, tasklog = ?, deploylog = ? WHERE id = ?`,
t.DurationSeconds,
t.State, t.Detail, t.Notified, t.State, t.Detail, t.Notified,
t.RunLog, t.TaskLog, t.DeployLog, t.ID) t.RunLog, t.TaskLog, t.DeployLog, t.ID)
if err == nil { if err == nil {
@@ -1430,6 +1677,10 @@ CREATE TABLE IF NOT EXISTS task(
hash TEXT NOT NULL, -- commit hash hash TEXT NOT NULL, -- commit hash
runner TEXT NOT NULL, -- the runner to use runner TEXT NOT NULL, -- the runner to use
created INTEGER NOT NULL DEFAULT 0, -- creation timestamp
changed INTEGER NOT NULL DEFAULT 0, -- last state change timestamp
duration INTEGER NOT NULL DEFAULT 0, -- duration of last run
state INTEGER NOT NULL DEFAULT 0, -- task state state INTEGER NOT NULL DEFAULT 0, -- task state
detail TEXT NOT NULL DEFAULT '', -- task state detail detail TEXT NOT NULL DEFAULT '', -- task state detail
notified INTEGER NOT NULL DEFAULT 0, -- Gitea knows the state notified INTEGER NOT NULL DEFAULT 0, -- Gitea knows the state
@@ -1487,13 +1738,28 @@ func dbOpen(path string) error {
`task`, `deploylog`, `BLOB NOT NULL DEFAULT x''`); err != nil { `task`, `deploylog`, `BLOB NOT NULL DEFAULT x''`); err != nil {
return err return err
} }
break
case 1: case 1:
if err = dbEnsureColumn(tx,
`task`, `created`, `INTEGER NOT NULL DEFAULT 0`); err != nil {
return err
}
if err = dbEnsureColumn(tx,
`task`, `changed`, `INTEGER NOT NULL DEFAULT 0`); err != nil {
return err
}
if err = dbEnsureColumn(tx,
`task`, `duration`, `INTEGER NOT NULL DEFAULT 0`); err != nil {
return err
}
fallthrough
case 2:
// The next migration goes here, remember to increment the number below. // The next migration goes here, remember to increment the number below.
default:
return fmt.Errorf("unsupported database version: %d", version)
} }
if _, err = tx.Exec( if _, err = tx.Exec(
`PRAGMA user_version = ` + strconv.Itoa(1)); err != nil { `PRAGMA user_version = ` + strconv.Itoa(2)); err != nil {
return err return err
} }
return tx.Commit() return tx.Commit()
@@ -1509,7 +1775,7 @@ func callRPC(args []string) error {
} }
req, err := http.NewRequest(http.MethodPost, req, err := http.NewRequest(http.MethodPost,
fmt.Sprintf("%s/rpc", gConfig.Root), bytes.NewReader(body)) fmt.Sprintf("%s/rpc", getConfig().Root), bytes.NewReader(body))
if err != nil { if err != nil {
return err return err
} }
@@ -1580,7 +1846,8 @@ func main() {
return return
} }
if err := parseConfig(flag.Arg(0)); err != nil { gConfigPath = flag.Arg(0)
if err := loadConfig(); err != nil {
log.Fatalln(err) log.Fatalln(err)
} }
if flag.NArg() > 1 { if flag.NArg() > 1 {
@@ -1590,7 +1857,7 @@ func main() {
return return
} }
if err := dbOpen(gConfig.DB); err != nil { if err := dbOpen(getConfig().DB); err != nil {
log.Fatalln(err) log.Fatalln(err)
} }
defer gDB.Close() defer gDB.Close()
@@ -1599,7 +1866,7 @@ func main() {
ctx, stop := signal.NotifyContext( ctx, stop := signal.NotifyContext(
context.Background(), syscall.SIGINT, syscall.SIGTERM) context.Background(), syscall.SIGINT, syscall.SIGTERM)
server := &http.Server{Addr: gConfig.Listen} server := &http.Server{Addr: getConfig().Listen}
http.HandleFunc("/{$}", handleTasks) http.HandleFunc("/{$}", handleTasks)
http.HandleFunc("/task/{id}", handleTask) http.HandleFunc("/task/{id}", handleTask)
http.HandleFunc("/push", handlePush) http.HandleFunc("/push", handlePush)

View File

@@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"testing" "testing"
ttemplate "text/template" ttemplate "text/template"
"time"
) )
func TestTemplateQuote(t *testing.T) { func TestTemplateQuote(t *testing.T) {
@@ -30,3 +31,19 @@ func TestTemplateQuote(t *testing.T) {
} }
} }
} }
func TestShortDurationString(t *testing.T) {
for _, test := range []struct {
d time.Duration
expect string
}{
{72 * time.Hour, "3d"},
{-3 * time.Hour, "-3h"},
{12 * time.Minute, "12m"},
{time.Millisecond, "0s"},
} {
if sd := shortDurationString(test.d); sd != test.expect {
t.Errorf("%s = %s; want %s\n", test.d, sd, test.expect)
}
}
}

View File

@@ -47,6 +47,16 @@ func (tw *terminalWriter) log(format string, v ...interface{}) {
} }
} }
// SerializeUpdates returns an update block for a client with a given last line,
// and the index of the first line in the update block.
func (tw *terminalWriter) SerializeUpdates(last int) (string, int) {
if last < 0 || last >= len(tw.lines) {
return "", last
}
top := tw.lines[last].updateGroup
return string(tw.Serialize(top)), top
}
func (tw *terminalWriter) Serialize(top int) []byte { func (tw *terminalWriter) Serialize(top int) []byte {
var b bytes.Buffer var b bytes.Buffer
for i := top; i < len(tw.lines); i++ { for i := top; i < len(tw.lines); i++ {
@@ -104,7 +114,7 @@ func (tw *terminalWriter) processPrint(r rune) {
// Refresh update trackers, if necessary. // Refresh update trackers, if necessary.
if tw.lines[len(tw.lines)-1].updateGroup > tw.line { if tw.lines[len(tw.lines)-1].updateGroup > tw.line {
for i := tw.line; i < len(tw.lines); i++ { for i := tw.line; i < len(tw.lines); i++ {
tw.lines[i].updateGroup = tw.line tw.lines[i].updateGroup = min(tw.lines[i].updateGroup, tw.line)
} }
} }
@@ -200,11 +210,13 @@ func (tw *terminalWriter) processParsedCSI(
if len(params) == 0 { if len(params) == 0 {
tw.line = tw.lineTop tw.line = tw.lineTop
tw.column = 0 tw.column = 0
} else if len(params) >= 2 && params[0] != 0 && params[1] != 0 { } else if len(params) < 2 || params[0] <= 0 || params[1] <= 0 {
return false
} else if params[0] >= 32766 && params[1] >= 32766 {
// Ignore attempts to scan terminal bounds.
} else {
tw.line = tw.lineTop + params[0] - 1 tw.line = tw.lineTop + params[0] - 1
tw.column = params[1] - 1 tw.column = params[1] - 1
} else {
return false
} }
return true return true
case final == 'J': // Erase in Display case final == 'J': // Erase in Display

View File

@@ -1,14 +1,43 @@
package main package main
import "testing" import (
"slices"
"testing"
)
// TODO(p): Add a lot more test cases. // This could be way more extensive, but we're not aiming for perfection.
var tests = []struct { var tests = []struct {
push, want string push, want string
}{ }{
{ {
"\x1bc\x1b[?7l\x1b[2J\x1b[0mSeaBIOS\r", // Escaping and UTF-8.
"SeaBIOS\n", "\x03\x1bž\bř",
"^C^[ř\n",
},
{
// Several kinds of sequences to be ignored.
"\x1bc\x1b[?7l\x1b[2J\x1b[0;1mSeaBIOS\rTea",
"TeaBIOS\n",
},
{
// New origin and absolute positioning.
"Line 1\n\x1bcWine B\nFine 3\x1b[1;6H2\x1b[HL\nL",
"Line 1\nLine 2\nLine 3\n",
},
{
// In-line positioning (without corner cases).
"A\x1b[CB\x1b[2C?\x1b[DC\x1b[2D\b->",
"A B->C\n",
},
{
// Up and down.
"\nB\x1bMA\v\vC" + "\x1b[4EG" + "\x1b[FF" + "\x1b[2FD" + "\x1b[EE",
" A\nB\nC\nD\nE\nF\nG\n",
},
{
// In-line erasing.
"1234\b\b\x1b[K\n5678\b\b\x1b[0K\n" + "abcd\b\b\x1b[1K\nefgh\x1b[2K",
"12\n56\n cd\n\n",
}, },
} }
@@ -42,3 +71,30 @@ Loop:
} }
} }
} }
func TestTerminalUpdateGroups(t *testing.T) {
tw := terminalWriter{}
collect := func() (have []int) {
for _, line := range tw.lines {
have = append(have, line.updateGroup)
}
return
}
// 0: A 0 0 0
// 1: B X 1 1 1
// 2: C Y 1 2 1 1
// 3: Z 2 3 2
// 4: 3 4
tw.Write([]byte("A\nB\nC\x1b[FX\nY\nZ"))
have, want := collect(), []int{0, 1, 1, 3}
if !slices.Equal(want, have) {
t.Errorf("update groups: %+v; want: %+v", have, want)
}
tw.Write([]byte("\x1b[F1\n2\n3"))
have, want = collect(), []int{0, 1, 1, 2, 4}
if !slices.Equal(want, have) {
t.Errorf("update groups: %+v; want: %+v", have, want)
}
}