From fe81d713e1c59f2175974f0bc3eda5ff7a5f0749 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C5=99emysl=20Eric=20Janouch?= Date: Sun, 14 Apr 2024 22:24:33 +0200 Subject: [PATCH] Add an enqueue command --- acid.adoc | 6 +- acid.go | 186 ++++++++++++++++++++++++++++++++++++++++++++---------- 2 files changed, 156 insertions(+), 36 deletions(-) diff --git a/acid.adoc b/acid.adoc index e72be60..4c56562 100644 --- a/acid.adoc +++ b/acid.adoc @@ -28,8 +28,12 @@ Options Commands -------- -*restart* _ID_...:: +*enqueue* _OWNER_ _REPO_ _REF_ [_RUNNER_]...:: + Create new or restart existing tasks for the given reference, + which will be resolved to a full commit hash. +*restart* [_ID_]...:: Schedule tasks with the given IDs to be rerun. + Run this command without arguments to pick up external database changes. Configuration ------------- diff --git a/acid.go b/acid.go index 64d69dc..b029c2a 100644 --- a/acid.go +++ b/acid.go @@ -17,6 +17,7 @@ import ( "log" "net" "net/http" + "net/url" "os" "os/exec" "os/signal" @@ -96,7 +97,24 @@ func parseConfig(path string) error { return err } -// --- Task views -------------------------------------------------------------- +// --- Utilities --------------------------------------------------------------- + +func giteaSign(b []byte) string { + payloadHmac := hmac.New(sha256.New, []byte(gConfig.Secret)) + payloadHmac.Write(b) + return hex.EncodeToString(payloadHmac.Sum(nil)) +} + +func giteaNewRequest(ctx context.Context, method, path string, body io.Reader) ( + *http.Request, error) { + req, err := http.NewRequestWithContext( + ctx, method, gConfig.Gitea+path, body) + if req != nil { + req.Header.Set("Authorization", "token "+gConfig.Token) + req.Header.Set("Accept", "application/json") + } + return req, err +} func getTasks(ctx context.Context, query string, args ...any) ([]Task, error) { rows, err := gDB.QueryContext(ctx, ` @@ -120,6 +138,8 @@ func getTasks(ctx context.Context, query string, args ...any) ([]Task, error) { return tasks, rows.Err() } +// --- Task views -------------------------------------------------------------- + var templateTasks = template.Must(template.New("tasks").Parse(` @@ -301,12 +321,6 @@ func createTasks(ctx context.Context, return nil } -func giteaSign(b []byte) string { - payloadHmac := hmac.New(sha256.New, []byte(gConfig.Secret)) - payloadHmac.Write(b) - return hex.EncodeToString(payloadHmac.Sum(nil)) -} - func handlePush(w http.ResponseWriter, r *http.Request) { // X-Gitea-Delivery doesn't seem useful, pushes explode into multiple tasks. if r.Header.Get("X-Gitea-Event") != "push" { @@ -364,6 +378,120 @@ const rpcHeaderSignature = "X-ACID-Signature" var errWrongUsage = errors.New("wrong usage") +func rpcRestartOne(ctx context.Context, id int64) error { + gRunningMutex.Lock() + defer gRunningMutex.Unlock() + + if _, ok := gRunning[id]; ok { + return fmt.Errorf("%d: not restarting running tasks", id) + } + + // The executor bumps to "running" after inserting into gRunning, + // so we should not need to exclude that state here. + result, err := gDB.ExecContext(ctx, `UPDATE task + SET state = ?, detail = '', notified = 0 WHERE id = ?`, + taskStateNew, id) + if err != nil { + return fmt.Errorf("%d: %w", id, err) + } else if n, _ := result.RowsAffected(); n != 1 { + return fmt.Errorf("%d: no such task", id) + } + + notifierAwaken() + executorAwaken() + return nil +} + +func rpcEnqueueOne(ctx context.Context, + owner, repo, hash, runner string) error { + tasks, err := getTasks(ctx, `WHERE owner = ? AND repo = ? AND hash = ? + AND runner = ? ORDER BY id DESC LIMIT 1`, owner, repo, hash, runner) + if err != nil { + return err + } + + if len(tasks) != 0 { + return rpcRestartOne(ctx, tasks[0].ID) + } else { + return createTasks(ctx, owner, repo, hash, []string{runner}) + } +} + +func giteaResolveRef(ctx context.Context, owner, repo, ref string) ( + string, error) { + req, err := giteaNewRequest(ctx, http.MethodGet, fmt.Sprintf( + "/api/v1/repos/%s/%s/git/commits/%s", + url.PathEscape(owner), + url.PathEscape(repo), + url.PathEscape(ref)), nil) + if err != nil { + return "", err + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return "", err + } + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return "", err + } + + commit := struct { + SHA string `json:"sha"` + }{} + if resp.StatusCode != http.StatusOK { + return "", errors.New(resp.Status) + } else if err := json.Unmarshal(body, &commit); err != nil { + return "", err + } + return commit.SHA, nil +} + +func rpcEnqueue(ctx context.Context, + w io.Writer, fs *flag.FlagSet, args []string) error { + if err := fs.Parse(args); err != nil { + return err + } + if fs.NArg() < 3 { + return errWrongUsage + } + + owner, repo, ref := fs.Arg(0), fs.Arg(1), fs.Arg(2) + hash, err := giteaResolveRef(ctx, owner, repo, ref) + if err != nil { + return fmt.Errorf("%s: %w", ref, err) + } + + project, ok := gConfig.Projects[owner+"/"+repo] + if !ok { + return fmt.Errorf("project configuration not found") + } + + runners := fs.Args()[3:] + if len(runners) == 0 { + for runner := range project.Runners { + runners = append(runners, runner) + } + } + sort.Strings(runners) + + for _, runner := range runners { + if _, ok := project.Runners[runner]; !ok { + return fmt.Errorf("project not configured for runner %s", runner) + } + } + for _, runner := range runners { + err := rpcEnqueueOne(ctx, owner, repo, hash, runner) + if err != nil { + fmt.Fprintf(w, "runner %s: %s\n", runner, err) + } + } + return nil +} + func rpcRestart(ctx context.Context, w io.Writer, fs *flag.FlagSet, args []string) error { if err := fs.Parse(args); err != nil { @@ -378,29 +506,17 @@ func rpcRestart(ctx context.Context, } ids = append(ids, id) } - - gRunningMutex.Lock() - defer gRunningMutex.Unlock() - for _, id := range ids { - if _, ok := gRunning[id]; ok { - fmt.Fprintf(w, "%d: not restarting running tasks\n", id) - continue - } - - // The executor bumps to "running" after inserting into gRunning, - // so we should not need to exclude that state here. - result, err := gDB.ExecContext(ctx, `UPDATE task - SET state = ?, detail = '', notified = 0 WHERE id = ?`, - taskStateNew, id) - if err != nil { - fmt.Fprintf(w, "%d: %s\n", id, err) - } else if n, _ := result.RowsAffected(); n != 1 { - fmt.Fprintf(w, "%d: no such task\n", id) + if err := rpcRestartOne(ctx, id); err != nil { + fmt.Fprintln(w, err) } } - notifierAwaken() - executorAwaken() + + // Mainly to allow scripts to touch the database directly. + if len(ids) == 0 { + notifierAwaken() + executorAwaken() + } return nil } @@ -412,7 +528,9 @@ var rpcCommands = map[string]struct { usage string function string }{ - "restart": {rpcRestart, "ID...", + "enqueue": {rpcEnqueue, "OWNER REPO REF [RUNNER]...", + "Create or restart tasks for the given reference."}, + "restart": {rpcRestart, "[ID]...", "Schedule tasks with the given IDs to be rerun."}, } @@ -565,18 +683,16 @@ func notifierNotify(ctx context.Context, task Task) error { task.ID, task.FullName(), task.Hash, payload.Context, payload.State, payload.Description) - uri := fmt.Sprintf("%s/api/v1/repos/%s/%s/statuses/%s", - gConfig.Gitea, task.Owner, task.Repo, task.Hash) - req, err := http.NewRequestWithContext(ctx, http.MethodPost, uri, - bytes.NewReader(body)) + req, err := giteaNewRequest(ctx, http.MethodPost, fmt.Sprintf( + "/api/v1/repos/%s/%s/statuses/%s", + url.PathEscape(task.Owner), + url.PathEscape(task.Repo), + url.PathEscape(task.Hash)), bytes.NewReader(body)) if err != nil { return err } - req.Header.Set("Accept", "application/json") - req.Header.Set("Authorization", "token "+gConfig.Token) req.Header.Set("Content-Type", "application/json") - resp, err := http.DefaultClient.Do(req) if err != nil { return err