package main import ( "bytes" "context" "crypto/hmac" "crypto/sha256" "database/sql" "encoding/hex" "encoding/json" "errors" "flag" "fmt" "html/template" "io" "io/ioutil" "log" "net" "net/http" "net/url" "os" "os/exec" "os/signal" "path/filepath" "sort" "strconv" "strings" "sync" "sync/atomic" "syscall" ttemplate "text/template" "time" _ "github.com/mattn/go-sqlite3" "github.com/pkg/sftp" "golang.org/x/crypto/ssh" "gopkg.in/yaml.v3" ) var ( projectName = "acid" projectVersion = "?" gConfigPath string gConfig atomic.Pointer[Config] gDB *sql.DB gNotifierSignal = make(chan struct{}, 1) gExecutorSignal = make(chan struct{}, 1) // The mutex is at least supposed to lock over the tasks as well. gRunningMutex sync.Mutex gRunning = make(map[int64]*RunningTask) ) func getConfig() *Config { return gConfig.Load() } // --- Config ------------------------------------------------------------------ type Config struct { DB string `yaml:"db"` // database file path Listen string `yaml:"listen"` // HTTP listener address Root string `yaml:"root"` // HTTP root URI Gitea string `yaml:"gitea"` // Gitea base URL Secret string `yaml:"secret"` // Gitea hook secret Token string `yaml:"token"` // Gitea API token Notify string `yaml:"notify"` // notifier script Runners map[string]ConfigRunner `yaml:"runners"` // script runners Projects map[string]ConfigProject `yaml:"projects"` // configured projects notifyTemplate *ttemplate.Template } type ConfigRunner struct { Name string `yaml:"name"` // descriptive name Manual bool `yaml:"manual"` // only run on request Run string `yaml:"run"` // runner executable Setup string `yaml:"setup"` // runner setup script (SSH) SSH struct { User string `yaml:"user"` // remote username Address string `yaml:"address"` // TCP host:port Identity string `yaml:"identity"` // private key path } `yaml:"ssh"` // shell access } type ConfigProject struct { Runners map[string]ConfigProjectRunner `yaml:"runners"` } func (cf *ConfigProject) AutomaticRunners() (runners []string) { // We pass through unknown runner names, // so that they can cause reference errors later. config := getConfig() for runner := range cf.Runners { if r, _ := config.Runners[runner]; !r.Manual { runners = append(runners, runner) } } sort.Strings(runners) return } type ConfigProjectRunner struct { Setup string `yaml:"setup"` // project setup script (SSH) Build string `yaml:"build"` // project build script (SSH) Deploy string `yaml:"deploy"` // project deploy script (local) Timeout string `yaml:"timeout"` // timeout duration } // loadConfig reloads configuration. // Beware that changes do not get applied globally at the same moment. func loadConfig() error { new := &Config{} if f, err := os.Open(gConfigPath); err != nil { return err } else if err = yaml.NewDecoder(f).Decode(new); err != nil { return err } if old := getConfig(); old != nil && old.DB != new.DB { return fmt.Errorf("the database file cannot be changed in runtime") } var err error new.notifyTemplate, err = ttemplate.New("notify").Funcs(shellFuncs).Parse(new.Notify) if err != nil { return err } gConfig.Store(new) return nil } var shellFuncs = ttemplate.FuncMap{ "quote": func(word string) string { // History expansion is annoying, don't let it cut us. if strings.IndexRune(word, '!') >= 0 { return "'" + strings.ReplaceAll(word, "'", `'\''`) + "'" } const special = "$`\"\\" quoted := []rune{'"'} for _, r := range word { if strings.IndexRune(special, r) >= 0 { quoted = append(quoted, '\\') } quoted = append(quoted, r) } return string(append(quoted, '"')) }, } // --- Utilities --------------------------------------------------------------- func localShell() string { if shell := os.Getenv("SHELL"); shell != "" { return shell } // The os/user package doesn't store the parsed out shell field. return "/bin/sh" } func giteaSign(b []byte) string { payloadHmac := hmac.New(sha256.New, []byte(getConfig().Secret)) payloadHmac.Write(b) return hex.EncodeToString(payloadHmac.Sum(nil)) } func giteaNewRequest(ctx context.Context, method, path string, body io.Reader) ( *http.Request, error) { req, err := http.NewRequestWithContext( ctx, method, getConfig().Gitea+path, body) if req != nil { req.Header.Set("Authorization", "token "+getConfig().Token) req.Header.Set("Accept", "application/json") } return req, err } func getTasks(ctx context.Context, query string, args ...any) ([]Task, error) { rows, err := gDB.QueryContext(ctx, ` SELECT id, owner, repo, hash, runner, created, changed, duration, state, detail, notified, runlog, tasklog, deploylog FROM task `+query, args...) if err != nil { return nil, err } defer rows.Close() tasks := []Task{} for rows.Next() { var t Task err := rows.Scan(&t.ID, &t.Owner, &t.Repo, &t.Hash, &t.Runner, &t.CreatedUnix, &t.ChangedUnix, &t.DurationSeconds, &t.State, &t.Detail, &t.Notified, &t.RunLog, &t.TaskLog, &t.DeployLog) if err != nil { return nil, err } // We could also update some fields from gRunning. tasks = append(tasks, t) } return tasks, rows.Err() } // --- Task views -------------------------------------------------------------- var templateTasks = template.Must(template.New("tasks").Parse(`
ID | Created | Changed | Repository | Hash | Runner | State | Detail | Notified |
---|---|---|---|---|---|---|---|---|
{{.ID}} | {{.CreatedAgo}} | {{.ChangedAgo}} | {{.FullName}} | {{.Hash}} | {{.RunnerName}} | {{.State}} | {{.Detail}} | {{.Notified}} |
{{printf "%s" .RunLog}}
{{printf "%s" .TaskLog}}
{{printf "%s" .DeployLog}}{{if .IsRunning -}} {{end -}} `)) // handlerTask serves as the data for JSON encoding and the task HTML template. // It needs to explicitly include many convenience method results. type handlerTask struct { Task IsRunning bool Created *string // Task.Created?.String() Changed *string // Task.Changed?.String() CreatedAgo string // Task.CreatedAgo() ChangedAgo string // Task.ChangedAgo() Duration *string // Task.Duration?.String() State string // Task.State.String() RunLog string RunLogTop int TaskLog string TaskLogTop int DeployLog string DeployLogTop int } func toNilableString[T fmt.Stringer](stringer *T) *string { if stringer == nil { return nil } s := (*stringer).String() return &s } func newHandlerTask(task Task) handlerTask { return handlerTask{ Task: task, RunLog: string(task.RunLog), TaskLog: string(task.TaskLog), DeployLog: string(task.DeployLog), Created: toNilableString(task.Created()), Changed: toNilableString(task.Changed()), CreatedAgo: task.CreatedAgo(), ChangedAgo: task.ChangedAgo(), Duration: toNilableString(task.Duration()), State: task.State.String(), } } func (ht *handlerTask) updateFromRunning( rt *RunningTask, lastRun, lastTask, lastDeploy int) { ht.IsRunning = true ht.Task.DurationSeconds = rt.elapsed() ht.Duration = toNilableString(ht.Task.Duration()) rt.RunLog.Lock() defer rt.RunLog.Unlock() rt.TaskLog.Lock() defer rt.TaskLog.Unlock() rt.DeployLog.Lock() defer rt.DeployLog.Unlock() ht.RunLog, ht.RunLogTop = rt.RunLog.SerializeUpdates(lastRun) ht.TaskLog, ht.TaskLogTop = rt.TaskLog.SerializeUpdates(lastTask) ht.DeployLog, ht.DeployLogTop = rt.DeployLog.SerializeUpdates(lastDeploy) } func handleTask(w http.ResponseWriter, r *http.Request) { id, err := strconv.Atoi(r.PathValue("id")) if err != nil { http.Error(w, "Invalid ID: "+err.Error(), http.StatusBadRequest) return } tasks, err := getTasks(r.Context(), `WHERE id = ?`, id) if err != nil { http.Error(w, "Error retrieving task: "+err.Error(), http.StatusInternalServerError) return } if len(tasks) == 0 { http.NotFound(w, r) return } // These are intended for running tasks, // so don't reprocess DB logs, which would only help the last update. q := r.URL.Query() lastRun, _ := strconv.Atoi(q.Get("run")) lastTask, _ := strconv.Atoi(q.Get("task")) lastDeploy, _ := strconv.Atoi(q.Get("deploy")) task := newHandlerTask(tasks[0]) func() { gRunningMutex.Lock() defer gRunningMutex.Unlock() if rt, ok := gRunning[task.ID]; ok { task.updateFromRunning( rt, int(lastRun), int(lastTask), int(lastDeploy)) } }() if q.Has("json") { w.Header().Set("Content-Type", "application/json") err = json.NewEncoder(w).Encode(&task) } else { err = templateTask.Execute(w, &task) } if err != nil { log.Println(err) } } // --- Push hook --------------------------------------------------------------- type GiteaPushEvent struct { HeadCommit struct { ID string `json:"id"` } `json:"head_commit"` Repository struct { Name string `json:"name"` FullName string `json:"full_name"` Owner struct { Username string `json:"username"` } `json:"owner"` } `json:"repository"` } func createTasks(ctx context.Context, owner, repo, hash string, runners []string) error { tx, err := gDB.BeginTx(ctx, nil) if err != nil { return err } defer tx.Rollback() stmt, err := tx.Prepare( `INSERT INTO task(owner, repo, hash, runner, created, changed) VALUES (?, ?, ?, ?, unixepoch('now'), unixepoch('now'))`) if err != nil { return err } for _, runner := range runners { if _, err := stmt.Exec(owner, repo, hash, runner); err != nil { return err } } if err := tx.Commit(); err != nil { return err } notifierAwaken() executorAwaken() return nil } func handlePush(w http.ResponseWriter, r *http.Request) { // X-Gitea-Delivery doesn't seem useful, pushes explode into multiple tasks. if r.Header.Get("X-Gitea-Event") != "push" { http.Error(w, "Expected a Gitea push event", http.StatusBadRequest) return } body, err := ioutil.ReadAll(r.Body) if err != nil { http.Error(w, "Error reading request body", http.StatusInternalServerError) return } if r.Header.Get("X-Gitea-Signature") != giteaSign(body) { http.Error(w, "Signature mismatch", http.StatusBadRequest) return } var event GiteaPushEvent if err := json.Unmarshal(body, &event); err != nil { http.Error(w, "Invalid request body: "+err.Error(), http.StatusBadRequest) return } log.Printf("received push: %s %s\n", event.Repository.FullName, event.HeadCommit.ID) project, ok := getConfig().Projects[event.Repository.FullName] if !ok { // This is okay, don't set any commit statuses. fmt.Fprintf(w, "The project is not configured.") return } runners := project.AutomaticRunners() if err := createTasks(r.Context(), event.Repository.Owner.Username, event.Repository.Name, event.HeadCommit.ID, runners); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } } // --- RPC --------------------------------------------------------------------- const rpcHeaderSignature = "X-ACID-Signature" var errWrongUsage = errors.New("wrong usage") func rpcRestartOne(ctx context.Context, id int64) error { gRunningMutex.Lock() defer gRunningMutex.Unlock() if _, ok := gRunning[id]; ok { return fmt.Errorf("%d: not restarting running tasks", id) } // The executor bumps to "running" after inserting into gRunning, // so we should not need to exclude that state here. // // We deliberately do not clear previous run data (duration, *log). result, err := gDB.ExecContext(ctx, `UPDATE task SET changed = unixepoch('now'), state = ?, detail = '', notified = 0 WHERE id = ?`, taskStateNew, id) if err != nil { return fmt.Errorf("%d: %w", id, err) } else if n, _ := result.RowsAffected(); n != 1 { return fmt.Errorf("%d: no such task", id) } notifierAwaken() executorAwaken() return nil } func rpcEnqueueOne(ctx context.Context, owner, repo, hash, runner string) error { tasks, err := getTasks(ctx, `WHERE owner = ? AND repo = ? AND hash = ? AND runner = ? ORDER BY id DESC LIMIT 1`, owner, repo, hash, runner) if err != nil { return err } if len(tasks) != 0 { return rpcRestartOne(ctx, tasks[0].ID) } else { return createTasks(ctx, owner, repo, hash, []string{runner}) } } func giteaResolveRef(ctx context.Context, owner, repo, ref string) ( string, error) { req, err := giteaNewRequest(ctx, http.MethodGet, fmt.Sprintf( "/api/v1/repos/%s/%s/git/commits/%s", url.PathEscape(owner), url.PathEscape(repo), url.PathEscape(ref)), nil) if err != nil { return "", err } resp, err := http.DefaultClient.Do(req) if err != nil { return "", err } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err != nil { return "", err } commit := struct { SHA string `json:"sha"` }{} if resp.StatusCode != http.StatusOK { return "", errors.New(resp.Status) } else if err := json.Unmarshal(body, &commit); err != nil { return "", err } return commit.SHA, nil } func rpcEnqueue(ctx context.Context, w io.Writer, fs *flag.FlagSet, args []string) error { if err := fs.Parse(args); err != nil { return err } if fs.NArg() < 3 { return errWrongUsage } owner, repo, ref := fs.Arg(0), fs.Arg(1), fs.Arg(2) hash, err := giteaResolveRef(ctx, owner, repo, ref) if err != nil { return fmt.Errorf("%s: %w", ref, err) } project, ok := getConfig().Projects[owner+"/"+repo] if !ok { return fmt.Errorf("project configuration not found") } runners := fs.Args()[3:] if len(runners) == 0 { runners = project.AutomaticRunners() } for _, runner := range runners { if _, ok := project.Runners[runner]; !ok { return fmt.Errorf("project not configured for runner %s", runner) } } for _, runner := range runners { err := rpcEnqueueOne(ctx, owner, repo, hash, runner) if err != nil { fmt.Fprintf(w, "runner %s: %s\n", runner, err) } } return nil } func rpcRestart(ctx context.Context, w io.Writer, fs *flag.FlagSet, args []string) error { if err := fs.Parse(args); err != nil { return err } ids := []int64{} for _, arg := range fs.Args() { id, err := strconv.ParseInt(arg, 10, 64) if err != nil { return fmt.Errorf("%w: %s", errWrongUsage, err) } ids = append(ids, id) } for _, id := range ids { if err := rpcRestartOne(ctx, id); err != nil { fmt.Fprintln(w, err) } } // Mainly to allow scripts to touch the database directly. if len(ids) == 0 { notifierAwaken() executorAwaken() } return nil } func rpcReload(ctx context.Context, w io.Writer, fs *flag.FlagSet, args []string) error { if err := fs.Parse(args); err != nil { return err } if fs.NArg() > 0 { return errWrongUsage } return loadConfig() } // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - var rpcCommands = map[string]struct { // handler must not write anything when returning an error. handler func(context.Context, io.Writer, *flag.FlagSet, []string) error usage string function string }{ "enqueue": {rpcEnqueue, "OWNER REPO REF [RUNNER]...", "Create or restart tasks for the given reference."}, "restart": {rpcRestart, "[ID]...", "Schedule tasks with the given IDs to be rerun."}, "reload": {rpcReload, "", "Reload configuration."}, } func rpcPrintCommands(w io.Writer) { // The alphabetic ordering is unfortunate, but tolerable. keys := []string{} for key := range rpcCommands { keys = append(keys, key) } sort.Strings(keys) fmt.Fprintf(w, "Commands:\n") for _, key := range keys { cmd := rpcCommands[key] fmt.Fprintf(w, " %s [OPTION...] %s\n \t%s\n", key, cmd.usage, cmd.function) } } func handleRPC(w http.ResponseWriter, r *http.Request) { body, err := ioutil.ReadAll(r.Body) if err != nil { http.Error(w, "Error reading request body", http.StatusInternalServerError) return } if r.Header.Get(rpcHeaderSignature) != giteaSign(body) { http.Error(w, "Signature mismatch", http.StatusBadRequest) return } var args []string if err := json.Unmarshal(body, &args); err != nil { http.Error(w, "Invalid request body: "+err.Error(), http.StatusBadRequest) return } if len(args) == 0 { http.Error(w, "Missing command", http.StatusBadRequest) return } // Our handling closely follows what the flag package does internally. command, args := args[0], args[1:] cmd, ok := rpcCommands[command] if !ok { http.Error(w, "unknown command: "+command, http.StatusBadRequest) rpcPrintCommands(w) return } // If we redirected the FlagSet straight to the response, // we would be unable to set our own HTTP status. b := bytes.NewBuffer(nil) fs := flag.NewFlagSet(command, flag.ContinueOnError) fs.SetOutput(b) fs.Usage = func() { fmt.Fprintf(fs.Output(), "Usage: %s [OPTION...] %s\n%s\n", fs.Name(), cmd.usage, cmd.function) fs.PrintDefaults() } err = cmd.handler(r.Context(), w, fs, args) // Wrap this error to make it as if fs.Parse discovered the issue. if errors.Is(err, errWrongUsage) { fmt.Fprintln(fs.Output(), err) fs.Usage() } // The flag package first prints all errors that it returns. // If the buffer ends up not being empty, flush it into the request. if b.Len() != 0 { http.Error(w, strings.TrimSpace(b.String()), http.StatusBadRequest) } else if err != nil { http.Error(w, err.Error(), http.StatusUnprocessableEntity) } } // --- Notifier ---------------------------------------------------------------- func notifierRunCommand(ctx context.Context, task Task) { script := bytes.NewBuffer(nil) if err := getConfig().notifyTemplate.Execute(script, &task); err != nil { log.Printf("error: notify: %s", err) return } cmd := exec.CommandContext(ctx, localShell()) cmd.Stdin = script cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr if err := cmd.Run(); err != nil { log.Printf("error: notify: %s", err) } } func notifierNotify(ctx context.Context, task Task) error { // Loosely assuming that this only runs on state changes. if task.State != taskStateNew && task.State != taskStateRunning { go notifierRunCommand(ctx, task) } payload := struct { Context string `json:"context"` Description string `json:"description"` State string `json:"state"` TargetURL string `json:"target_url"` }{} config := getConfig() runner, ok := config.Runners[task.Runner] if !ok { log.Printf("task %d has an unknown runner %s\n", task.ID, task.Runner) return nil } payload.Context = runner.Name payload.TargetURL = fmt.Sprintf("%s/task/%d", config.Root, task.ID) switch task.State { case taskStateNew: payload.State, payload.Description = "pending", "Pending" case taskStateRunning: payload.State, payload.Description = "pending", "Running" case taskStateError: payload.State, payload.Description = "error", "Error" case taskStateFailed: payload.State, payload.Description = "failure", "Failure" case taskStateSuccess: payload.State, payload.Description = "success", "Success" default: log.Printf("task %d is in unknown state %d\n", task.ID, task.State) return nil } // We should only fill this in case we have some specific information. if task.Detail != "" { payload.Description = task.Detail } body, err := json.Marshal(payload) if err != nil { return err } log.Printf("task %d for %s: notifying: %s: %s: %s (%s)\n", task.ID, task.FullName(), task.Hash, payload.Context, payload.State, payload.Description) req, err := giteaNewRequest(ctx, http.MethodPost, fmt.Sprintf( "/api/v1/repos/%s/%s/statuses/%s", url.PathEscape(task.Owner), url.PathEscape(task.Repo), url.PathEscape(task.Hash)), bytes.NewReader(body)) if err != nil { return err } req.Header.Set("Content-Type", "application/json") resp, err := http.DefaultClient.Do(req) if err != nil { return err } defer resp.Body.Close() body, err = ioutil.ReadAll(resp.Body) if err != nil { return err } _, err = gDB.ExecContext(ctx, `UPDATE task SET notified = 1 WHERE id = ? AND state = ? AND detail = ? AND notified = 0`, task.ID, task.State, task.Detail) return err } func notifierRun(ctx context.Context) error { tasks, err := getTasks(ctx, `WHERE notified = 0 ORDER BY id ASC`) if err != nil { return err } for _, task := range tasks { if err := notifierNotify(ctx, task); err != nil { return fmt.Errorf( "task %d for %s: %w", task.ID, task.FullName(), err) } } return nil } func notifier(ctx context.Context) { for { select { case <-gNotifierSignal: case <-ctx.Done(): return } if err := notifierRun(ctx); err != nil { log.Printf("error: notifier: %s\n", err) } } } func notifierAwaken() { select { case gNotifierSignal <- struct{}{}: default: } } // --- Executor ---------------------------------------------------------------- // ~~~ Running task ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ // RunningTask stores all data pertaining to a currently running task. type RunningTask struct { DB Task Runner ConfigRunner ProjectRunner ConfigProjectRunner RunLog terminalWriter TaskLog terminalWriter DeployLog terminalWriter wd string // acid working directory timeout time.Duration // time limit on task execution signer ssh.Signer // SSH private key tmplScript *ttemplate.Template // remote build script tmplDeploy *ttemplate.Template // local deployment script } // newRunningTask prepares a task for running, without executing anything yet. func newRunningTask(task Task) (*RunningTask, error) { rt := &RunningTask{DB: task} config := getConfig() // This is for our own tracking, not actually written to database. rt.DB.ChangedUnix = time.Now().Unix() var ok bool rt.Runner, ok = config.Runners[rt.DB.Runner] if !ok { return nil, fmt.Errorf("unknown runner: %s", rt.DB.Runner) } project, ok := config.Projects[rt.DB.FullName()] if !ok { return nil, fmt.Errorf("project configuration not found") } rt.ProjectRunner, ok = project.Runners[rt.DB.Runner] if !ok { return nil, fmt.Errorf( "project not configured for runner %s", rt.DB.Runner) } var err error if rt.wd, err = os.Getwd(); err != nil { return nil, err } // Lenient or not, some kind of a time limit is desirable. rt.timeout = time.Hour if rt.ProjectRunner.Timeout != "" { rt.timeout, err = time.ParseDuration(rt.ProjectRunner.Timeout) if err != nil { return nil, fmt.Errorf("timeout: %w", err) } } privateKey, err := os.ReadFile(rt.Runner.SSH.Identity) if err != nil { return nil, fmt.Errorf( "cannot read SSH identity for runner %s: %w", rt.DB.Runner, err) } rt.signer, err = ssh.ParsePrivateKey(privateKey) if err != nil { return nil, fmt.Errorf( "cannot parse SSH identity for runner %s: %w", rt.DB.Runner, err) } // The runner setup script may change the working directory, // so do everything in one go. However, this approach also makes it // difficult to distinguish project-independent runner failures. // (For that, we can start multiple ssh.Sessions.) // // We could pass variables through SSH environment variables, // which would require enabling PermitUserEnvironment in sshd_config, // or through prepending script lines, but templates are a bit simpler. // // We let the runner itself clone the repository: // - it is a more flexible in that it can test AUR packages more normally, // - we might have to clone submodules as well. // Otherwise, we could download a source archive from Gitea, // and use SFTP to upload it to the runner. rt.tmplScript, err = ttemplate.New("script").Funcs(shellFuncs). Parse(rt.Runner.Setup + "\n" + rt.ProjectRunner.Setup + "\n" + rt.ProjectRunner.Build) if err != nil { return nil, fmt.Errorf("script/build: %w", err) } rt.tmplDeploy, err = ttemplate.New("deploy").Funcs(shellFuncs). Parse(rt.ProjectRunner.Deploy) if err != nil { return nil, fmt.Errorf("script/deploy: %w", err) } if os.Getenv("ACID_TERMINAL_DEBUG") != "" { base := filepath.Join(executorTmpDir("/tmp"), fmt.Sprintf("acid-%d-%s-%s-%s-", task.ID, task.Owner, task.Repo, task.Runner)) rt.RunLog.Tee, _ = os.Create(base + "runlog") rt.TaskLog.Tee, _ = os.Create(base + "tasklog") // The deployment log should not be interesting. } return rt, nil } func (rt *RunningTask) close() { for _, tee := range []io.WriteCloser{ rt.RunLog.Tee, rt.TaskLog.Tee, rt.DeployLog.Tee} { if tee != nil { tee.Close() } } } // localEnv creates a process environment for locally run executables. func (rt *RunningTask) localEnv() []string { return append(os.Environ(), "ACID_ROOT="+rt.wd, "ACID_RUNNER="+rt.DB.Runner, ) } func (rt *RunningTask) elapsed() int64 { return int64(time.Since(time.Unix(rt.DB.ChangedUnix, 0)).Seconds()) } // update stores the running task's state in the database. func (rt *RunningTask) update() error { for _, i := range []struct { tw *terminalWriter log *[]byte }{ {&rt.RunLog, &rt.DB.RunLog}, {&rt.TaskLog, &rt.DB.TaskLog}, {&rt.DeployLog, &rt.DB.DeployLog}, } { i.tw.Lock() defer i.tw.Unlock() if *i.log = i.tw.Serialize(0); *i.log == nil { *i.log = []byte{} } } rt.DB.DurationSeconds = rt.elapsed() return rt.DB.update() } // ~~~ Deploy ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ func executorDownloadNode(sc *sftp.Client, remotePath, localPath string, info os.FileInfo) error { if info.IsDir() { // Hoping that caller invokes us on parents first. return os.MkdirAll(localPath, info.Mode().Perm()) } src, err := sc.Open(remotePath) if err != nil { return fmt.Errorf("failed to open remote file %s: %w", remotePath, err) } defer src.Close() dst, err := os.OpenFile( localPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, info.Mode().Perm()) if err != nil { return fmt.Errorf("failed to create local file: %w", err) } defer dst.Close() if _, err = io.Copy(dst, src); err != nil { return fmt.Errorf("failed to copy file from remote %s to local %s: %w", remotePath, localPath, err) } return nil } func executorDownload(client *ssh.Client, remoteRoot, localRoot string) error { sc, err := sftp.NewClient(client) if err != nil { return err } defer sc.Close() walker := sc.Walk(remoteRoot) for walker.Step() { if walker.Err() != nil { return walker.Err() } relativePath, err := filepath.Rel(remoteRoot, walker.Path()) if err != nil { return err } if err = executorDownloadNode(sc, walker.Path(), filepath.Join(localRoot, relativePath), walker.Stat()); err != nil { return err } } return nil } func executorTmpDir(fallback string) string { // See also: https://systemd.io/TEMPORARY_DIRECTORIES/ if tmp := os.Getenv("TMPDIR"); tmp != "" { return tmp } return fallback } func executorDeploy( ctx context.Context, client *ssh.Client, rt *RunningTask) error { script := bytes.NewBuffer(nil) if err := rt.tmplDeploy.Execute(script, &rt.DB); err != nil { return &executorError{"Deploy template failed", err} } // Thus the deployment directory must exist iff the script is not empty. if script.Len() == 0 { return nil } // We expect the files to be moved elsewhere on the filesystem, // and they may get very large, so avoid /tmp. dir := filepath.Join(executorTmpDir("/var/tmp"), "acid-deploy") if err := os.RemoveAll(dir); err != nil { return err } if err := os.Mkdir(dir, 0755); err != nil { return err } // The passed remoteRoot is relative to sc.Getwd. if err := executorDownload(client, "acid-deploy", dir); err != nil { return err } cmd := exec.CommandContext(ctx, localShell()) cmd.Env = rt.localEnv() cmd.Dir = dir cmd.Stdin = script cmd.Stdout = &rt.DeployLog cmd.Stderr = &rt.DeployLog return cmd.Run() } // ~~~ Build ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ func executorBuild( ctx context.Context, client *ssh.Client, rt *RunningTask) error { // This is here to fail early, though logically it is misplaced. script := bytes.NewBuffer(nil) if err := rt.tmplScript.Execute(script, &rt.DB); err != nil { return &executorError{"Script template failed", err} } session, err := client.NewSession() if err != nil { return &executorError{"SSH failure", err} } defer session.Close() modes := ssh.TerminalModes{ssh.ECHO: 0} if err := session.RequestPty("dumb", 24, 80, modes); err != nil { return &executorError{"SSH failure", err} } log.Printf("task %d for %s: connected\n", rt.DB.ID, rt.DB.FullName()) session.Stdout = &rt.TaskLog session.Stderr = &rt.TaskLog // Although passing the script directly takes away the option to specify // a particular shell (barring here-documents!), it is simple and reliable. // // Passing the script over Stdin to sh tended to end up with commands // eating the script during processing, and resulted in a hang, // because closing the Stdin does not result in remote processes // getting a stream of EOF. // // Piping the script into `cat | sh` while appending a ^D to the end of it // appeared to work, but it seems likely that commands might still steal // script bytes from the cat program if they choose to read from the tty // and the script is longer than the buffer. chSession := make(chan error, 1) go func() { chSession <- session.Run(script.String()) close(chSession) }() select { case <-ctx.Done(): // Either shutdown, or early runner termination. // The runner is not supposed to finish before the session. err = context.Cause(ctx) case err = <-chSession: // Killing a runner may perfectly well trigger this first, // in particular when it's on the same machine. } return err } // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - // executorError describes a taskStateError. type executorError struct { Detail string Err error } func (e *executorError) Unwrap() error { return e.Err } func (e *executorError) Error() string { return fmt.Sprintf("%s: %s", e.Detail, e.Err) } func executorConnect( ctx context.Context, config *ssh.ClientConfig, address string) ( *ssh.Client, error) { deadline := time.Now().Add(3 * time.Minute) ctxDeadlined, cancel := context.WithDeadline(ctx, deadline) defer cancel() var d net.Dialer for { // net.DNSError eats the cause, as in it cannot be unwrapped // and tested for a particular subtype. conn, err := d.DialContext(ctxDeadlined, "tcp", address) if e := ctxDeadlined.Err(); e != nil { // This may provide a little bit more information. if err != nil { return nil, err } return nil, e } if err != nil { time.Sleep(1 * time.Second) continue } // We ignore the parent context, but at least we try. conn.SetDeadline(deadline) sc, chans, reqs, err := ssh.NewClientConn(conn, address, config) conn.SetDeadline(time.Time{}) // cloud-init-enabled machines, such as OpenBSD, // may have a race condition between sshd starting for the first time, // and having a configured user. // // Authentication may therefore regularly fail, // and we need to ignore all errors whatsoever, // not just spurious partial successes resulting in RST or FIN. var neterr net.Error if errors.As(err, &neterr) || errors.Is(err, io.EOF) || err != nil { time.Sleep(1 * time.Second) continue } return ssh.NewClient(sc, chans, reqs), nil } } func executorRunTask(ctx context.Context, task Task) error { rt, err := newRunningTask(task) if err != nil { task.DurationSeconds = 0 task.State, task.Detail = taskStateError, "Misconfigured" task.Notified = 0 task.RunLog = []byte(err.Error()) task.TaskLog = []byte{} task.DeployLog = []byte{} return task.update() } defer rt.close() ctx, cancelTimeout := context.WithTimeout(ctx, rt.timeout) defer cancelTimeout() // RunningTasks can be concurrently accessed by HTTP handlers. locked := func(f func()) { gRunningMutex.Lock() defer gRunningMutex.Unlock() f() } locked(func() { rt.DB.DurationSeconds = 0 rt.DB.State, rt.DB.Detail = taskStateRunning, "" rt.DB.Notified = 0 rt.DB.RunLog = []byte{} rt.DB.TaskLog = []byte{} rt.DB.DeployLog = []byte{} gRunning[rt.DB.ID] = rt }) defer locked(func() { delete(gRunning, rt.DB.ID) }) if err := rt.update(); err != nil { return fmt.Errorf("SQL: %w", err) } // Errors happening while trying to write an error are unfortunate, // but not important enough to abort entirely. setError := func(detail string) { rt.DB.State, rt.DB.Detail = taskStateError, detail if err := rt.update(); err != nil { log.Printf("error: task %d for %s: SQL: %s", rt.DB.ID, rt.DB.FullName(), err) } } cmd := exec.CommandContext(ctx, rt.Runner.Run) cmd.Env = rt.localEnv() // Pushing the runner into a new process group that can be killed at once // with all its children isn't bullet-proof, it messes with job control // when acid is run from an interactive shell, and it also seems avoidable // (use "exec" in runner scripts, so that VMs take over the process). // Maybe this is something that could be opt-in. /* cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} cmd.Cancel = func() error { err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) if err == syscall.ESRCH { return os.ErrProcessDone } return err } */ log.Printf("task %d for %s: starting %s\n", rt.DB.ID, rt.DB.FullName(), rt.Runner.Name) cmd.Stdout = &rt.RunLog cmd.Stderr = &rt.RunLog if err := cmd.Start(); err != nil { fmt.Fprintf(&rt.TaskLog, "%s\n", err) locked(func() { setError("Runner failed to start") }) return err } ctxRunner, cancelRunner := context.WithCancelCause(ctx) defer cancelRunner(context.Canceled) go func() { if err := cmd.Wait(); err != nil { cancelRunner(err) } else { cancelRunner(errors.New("runner exited successfully but early")) } }() defer func() { _ = cmd.Process.Signal(os.Interrupt) select { case <-ctxRunner.Done(): // This doesn't leave the runner almost any time on our shutdown, // but whatever--they're supposed to be ephemeral. case <-time.After(5 * time.Second): } _ = cmd.Cancel() }() client, err := executorConnect(ctxRunner, &ssh.ClientConfig{ User: rt.Runner.SSH.User, Auth: []ssh.AuthMethod{ssh.PublicKeys(rt.signer)}, HostKeyCallback: ssh.InsecureIgnoreHostKey(), }, rt.Runner.SSH.Address) if err != nil { fmt.Fprintf(&rt.TaskLog, "%s\n", err) locked(func() { setError("SSH failure") }) return err } defer client.Close() var ( eeSSH *ssh.ExitError eeExec *exec.ExitError ee3 *executorError ) err = executorBuild(ctxRunner, client, rt) if err != nil { locked(func() { if errors.As(err, &eeSSH) { rt.DB.State, rt.DB.Detail = taskStateFailed, "Scripts failed" fmt.Fprintf(&rt.TaskLog, "\n%s\n", err) } else if errors.As(err, &ee3) { rt.DB.State, rt.DB.Detail = taskStateError, ee3.Detail fmt.Fprintf(&rt.TaskLog, "\n%s\n", ee3.Err) } else { rt.DB.State, rt.DB.Detail = taskStateError, "" fmt.Fprintf(&rt.TaskLog, "\n%s\n", err) } }) return rt.update() } // This is so that it doesn't stay hanging within the sftp package, // which uses context.Background() everywhere. go func() { <-ctxRunner.Done() client.Close() }() err = executorDeploy(ctxRunner, client, rt) locked(func() { if err == nil { rt.DB.State, rt.DB.Detail = taskStateSuccess, "" } else if errors.As(err, &eeExec) { rt.DB.State, rt.DB.Detail = taskStateFailed, "Deployment failed" fmt.Fprintf(&rt.DeployLog, "\n%s\n", err) } else if errors.As(err, &ee3) { rt.DB.State, rt.DB.Detail = taskStateError, ee3.Detail fmt.Fprintf(&rt.DeployLog, "\n%s\n", ee3.Err) } else { rt.DB.State, rt.DB.Detail = taskStateError, "" fmt.Fprintf(&rt.DeployLog, "\n%s\n", err) } }) return rt.update() } func executorRun(ctx context.Context) error { tasks, err := getTasks(ctx, `WHERE state = ? OR state = ? ORDER BY id ASC`, taskStateNew, taskStateRunning) if err != nil { return err } for _, task := range tasks { if err := executorRunTask(ctx, task); err != nil { return fmt.Errorf("task %d for %s: %w", task.ID, task.FullName(), err) } } return nil } func executor(ctx context.Context) { for { select { case <-gExecutorSignal: case <-ctx.Done(): return } if err := executorRun(ctx); err != nil { log.Printf("error: executor: %s\n", err) } } } func executorAwaken() { select { case gExecutorSignal <- struct{}{}: default: } } // --- Main -------------------------------------------------------------------- type taskState int64 const ( taskStateNew taskState = iota // → · pending (queued) taskStateRunning // → · pending (running) taskStateError // → ! error (internal issue) taskStateFailed // → × failure (runner issue) taskStateSuccess // → ✓ success (runner finished) ) func (ts taskState) String() string { switch ts { case taskStateNew: return "New" case taskStateRunning: return "Running" case taskStateError: return "Error" case taskStateFailed: return "Failed" case taskStateSuccess: return "Success" default: return fmt.Sprintf("%d", ts) } } // Task mirrors SQL task table records, adding a few convenience methods. type Task struct { ID int64 Owner string Repo string Hash string Runner string // True database names for these are occupied by accessors. CreatedUnix int64 ChangedUnix int64 DurationSeconds int64 State taskState Detail string Notified int64 RunLog []byte TaskLog []byte DeployLog []byte } func (t *Task) FullName() string { return t.Owner + "/" + t.Repo } func (t *Task) RunnerName() string { if runner, ok := getConfig().Runners[t.Runner]; !ok { return t.Runner } else { return runner.Name } } func (t *Task) URL() string { return fmt.Sprintf("%s/task/%d", getConfig().Root, t.ID) } func (t *Task) RepoURL() string { return fmt.Sprintf("%s/%s/%s", getConfig().Gitea, t.Owner, t.Repo) } func (t *Task) CommitURL() string { return fmt.Sprintf("%s/%s/%s/commit/%s", getConfig().Gitea, t.Owner, t.Repo, t.Hash) } func (t *Task) CloneURL() string { return fmt.Sprintf("%s/%s/%s.git", getConfig().Gitea, t.Owner, t.Repo) } func shortDurationString(d time.Duration) string { if d.Abs() >= 24*time.Hour { return strconv.FormatInt(int64(d/time.Hour/24), 10) + "d" } else if d.Abs() >= time.Hour { return strconv.FormatInt(int64(d/time.Hour), 10) + "h" } else if d.Abs() >= time.Minute { return strconv.FormatInt(int64(d/time.Minute), 10) + "m" } else { return strconv.FormatInt(int64(d/time.Second), 10) + "s" } } func (t *Task) Created() *time.Time { if t.CreatedUnix == 0 { return nil } tt := time.Unix(t.CreatedUnix, 0) return &tt } func (t *Task) Changed() *time.Time { if t.ChangedUnix == 0 { return nil } tt := time.Unix(t.ChangedUnix, 0) return &tt } func (t *Task) CreatedAgo() string { if t.CreatedUnix == 0 { return "" } return shortDurationString(time.Since(*t.Created())) } func (t *Task) ChangedAgo() string { if t.ChangedUnix == 0 { return "" } return shortDurationString(time.Since(*t.Changed())) } func (t *Task) Duration() *time.Duration { if t.DurationSeconds == 0 { return nil } td := time.Duration(t.DurationSeconds * int64(time.Second)) return &td } func (t *Task) update() error { _, err := gDB.ExecContext(context.Background(), `UPDATE task SET changed = unixepoch('now'), duration = ?, state = ?, detail = ?, notified = ?, runlog = ?, tasklog = ?, deploylog = ? WHERE id = ?`, t.DurationSeconds, t.State, t.Detail, t.Notified, t.RunLog, t.TaskLog, t.DeployLog, t.ID) if err == nil { notifierAwaken() } return err } // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - const initializeSQL = ` PRAGMA application_id = 0x61636964; -- "acid" in big endian CREATE TABLE IF NOT EXISTS task( id INTEGER NOT NULL, -- unique ID owner TEXT NOT NULL, -- Gitea username repo TEXT NOT NULL, -- Gitea repository name hash TEXT NOT NULL, -- commit hash runner TEXT NOT NULL, -- the runner to use created INTEGER NOT NULL DEFAULT 0, -- creation timestamp changed INTEGER NOT NULL DEFAULT 0, -- last state change timestamp duration INTEGER NOT NULL DEFAULT 0, -- duration of last run state INTEGER NOT NULL DEFAULT 0, -- task state detail TEXT NOT NULL DEFAULT '', -- task state detail notified INTEGER NOT NULL DEFAULT 0, -- Gitea knows the state runlog BLOB NOT NULL DEFAULT x'', -- combined task runner output tasklog BLOB NOT NULL DEFAULT x'', -- combined task SSH output deploylog BLOB NOT NULL DEFAULT x'', -- deployment output PRIMARY KEY (id) ) STRICT; ` func dbEnsureColumn(tx *sql.Tx, table, column, definition string) error { var count int64 if err := tx.QueryRow( `SELECT count(*) FROM pragma_table_info(?) WHERE name = ?`, table, column).Scan(&count); err != nil { return err } else if count == 1 { return nil } _, err := tx.Exec( `ALTER TABLE ` + table + ` ADD COLUMN ` + column + ` ` + definition) return err } func dbOpen(path string) error { var err error gDB, err = sql.Open("sqlite3", "file:"+path+"?_foreign_keys=1&_busy_timeout=1000") if err != nil { return err } tx, err := gDB.BeginTx(context.Background(), nil) if err != nil { return err } defer tx.Rollback() var version int64 if err = tx.QueryRow(`PRAGMA user_version`).Scan(&version); err != nil { return err } switch version { case 0: if _, err = tx.Exec(initializeSQL); err != nil { return err } // We had not initially set a database schema version, // so we're stuck checking this column even on new databases. if err = dbEnsureColumn(tx, `task`, `deploylog`, `BLOB NOT NULL DEFAULT x''`); err != nil { return err } case 1: if err = dbEnsureColumn(tx, `task`, `created`, `INTEGER NOT NULL DEFAULT 0`); err != nil { return err } if err = dbEnsureColumn(tx, `task`, `changed`, `INTEGER NOT NULL DEFAULT 0`); err != nil { return err } if err = dbEnsureColumn(tx, `task`, `duration`, `INTEGER NOT NULL DEFAULT 0`); err != nil { return err } case 2: // The next migration goes here, remember to increment the number below. } if _, err = tx.Exec( `PRAGMA user_version = ` + strconv.Itoa(2)); err != nil { return err } return tx.Commit() } // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - // callRPC forwards command line commands to a running server. func callRPC(args []string) error { body, err := json.Marshal(args) if err != nil { return err } req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/rpc", getConfig().Root), bytes.NewReader(body)) if err != nil { return err } req.Header.Set(rpcHeaderSignature, giteaSign(body)) req.Header.Set("Content-Type", "application/json") resp, err := http.DefaultClient.Do(req) if err != nil { return err } defer resp.Body.Close() if _, err = io.Copy(os.Stdout, resp.Body); err != nil { return err } if resp.StatusCode < 200 || resp.StatusCode >= 300 { os.Exit(1) } return nil } // filterTTY exposes the internal virtual terminal filter. func filterTTY(path string) { var r io.Reader = os.Stdin if path != "-" { if f, err := os.Open(path); err != nil { log.Println(err) } else { r = f defer f.Close() } } var tw terminalWriter if _, err := io.Copy(&tw, r); err != nil { log.Printf("%s: %s\n", path, err) } if _, err := os.Stdout.Write(tw.Serialize(0)); err != nil { log.Printf("%s: %s\n", path, err) } } func main() { version := flag.Bool("version", false, "show version and exit") tty := flag.Bool("tty", false, "run the internal virtual terminal filter") flag.Usage = func() { f := flag.CommandLine.Output() fmt.Fprintf(f, "Usage: %s [OPTION]... CONFIG [COMMAND...]\n", os.Args[0]) flag.PrintDefaults() } flag.Parse() if flag.NArg() < 1 { flag.Usage() os.Exit(2) } if *version { fmt.Printf("%s %s\n", projectName, projectVersion) return } if *tty { for _, path := range flag.Args() { filterTTY(path) } return } gConfigPath = flag.Arg(0) if err := loadConfig(); err != nil { log.Fatalln(err) } if flag.NArg() > 1 { if err := callRPC(flag.Args()[1:]); err != nil { log.Fatalln(err) } return } if err := dbOpen(getConfig().DB); err != nil { log.Fatalln(err) } defer gDB.Close() var wg sync.WaitGroup ctx, stop := signal.NotifyContext( context.Background(), syscall.SIGINT, syscall.SIGTERM) server := &http.Server{Addr: getConfig().Listen} http.HandleFunc("/{$}", handleTasks) http.HandleFunc("/task/{id}", handleTask) http.HandleFunc("/push", handlePush) http.HandleFunc("/rpc", handleRPC) ln, err := (&net.ListenConfig{}).Listen(ctx, "tcp", server.Addr) if err != nil { log.Fatalln(err) } notifierAwaken() wg.Add(1) go func() { defer wg.Done() notifier(ctx) }() executorAwaken() wg.Add(1) go func() { defer wg.Done() executor(ctx) }() wg.Add(1) go func() { defer wg.Done() defer stop() if err := server.Serve(ln); err != http.ErrServerClosed { log.Println(err) } }() // Wait until we either receive a signal, or get a server failure. <-ctx.Done() log.Println("shutting down") wg.Add(1) go func() { defer wg.Done() if err := server.Shutdown(context.Background()); err != nil { log.Println(err) } }() // Repeated signal deliveries during shutdown assume default behaviour. // This might or might not be desirable. stop() wg.Wait() }