Add an enqueue command

This commit is contained in:
Přemysl Eric Janouch 2024-04-14 22:24:33 +02:00
parent eda0f22f07
commit fe81d713e1
Signed by: p
GPG Key ID: A0420B94F92B9493
2 changed files with 156 additions and 36 deletions

View File

@ -28,8 +28,12 @@ Options
Commands Commands
-------- --------
*restart* _ID_...:: *enqueue* _OWNER_ _REPO_ _REF_ [_RUNNER_]...::
Create new or restart existing tasks for the given reference,
which will be resolved to a full commit hash.
*restart* [_ID_]...::
Schedule tasks with the given IDs to be rerun. Schedule tasks with the given IDs to be rerun.
Run this command without arguments to pick up external database changes.
Configuration Configuration
------------- -------------

182
acid.go
View File

@ -17,6 +17,7 @@ import (
"log" "log"
"net" "net"
"net/http" "net/http"
"net/url"
"os" "os"
"os/exec" "os/exec"
"os/signal" "os/signal"
@ -96,7 +97,24 @@ func parseConfig(path string) error {
return err return err
} }
// --- Task views -------------------------------------------------------------- // --- Utilities ---------------------------------------------------------------
func giteaSign(b []byte) string {
payloadHmac := hmac.New(sha256.New, []byte(gConfig.Secret))
payloadHmac.Write(b)
return hex.EncodeToString(payloadHmac.Sum(nil))
}
func giteaNewRequest(ctx context.Context, method, path string, body io.Reader) (
*http.Request, error) {
req, err := http.NewRequestWithContext(
ctx, method, gConfig.Gitea+path, body)
if req != nil {
req.Header.Set("Authorization", "token "+gConfig.Token)
req.Header.Set("Accept", "application/json")
}
return req, err
}
func getTasks(ctx context.Context, query string, args ...any) ([]Task, error) { func getTasks(ctx context.Context, query string, args ...any) ([]Task, error) {
rows, err := gDB.QueryContext(ctx, ` rows, err := gDB.QueryContext(ctx, `
@ -120,6 +138,8 @@ func getTasks(ctx context.Context, query string, args ...any) ([]Task, error) {
return tasks, rows.Err() return tasks, rows.Err()
} }
// --- Task views --------------------------------------------------------------
var templateTasks = template.Must(template.New("tasks").Parse(` var templateTasks = template.Must(template.New("tasks").Parse(`
<!DOCTYPE html> <!DOCTYPE html>
<html> <html>
@ -301,12 +321,6 @@ func createTasks(ctx context.Context,
return nil return nil
} }
func giteaSign(b []byte) string {
payloadHmac := hmac.New(sha256.New, []byte(gConfig.Secret))
payloadHmac.Write(b)
return hex.EncodeToString(payloadHmac.Sum(nil))
}
func handlePush(w http.ResponseWriter, r *http.Request) { func handlePush(w http.ResponseWriter, r *http.Request) {
// X-Gitea-Delivery doesn't seem useful, pushes explode into multiple tasks. // X-Gitea-Delivery doesn't seem useful, pushes explode into multiple tasks.
if r.Header.Get("X-Gitea-Event") != "push" { if r.Header.Get("X-Gitea-Event") != "push" {
@ -364,6 +378,120 @@ const rpcHeaderSignature = "X-ACID-Signature"
var errWrongUsage = errors.New("wrong usage") var errWrongUsage = errors.New("wrong usage")
func rpcRestartOne(ctx context.Context, id int64) error {
gRunningMutex.Lock()
defer gRunningMutex.Unlock()
if _, ok := gRunning[id]; ok {
return fmt.Errorf("%d: not restarting running tasks", id)
}
// The executor bumps to "running" after inserting into gRunning,
// so we should not need to exclude that state here.
result, err := gDB.ExecContext(ctx, `UPDATE task
SET state = ?, detail = '', notified = 0 WHERE id = ?`,
taskStateNew, id)
if err != nil {
return fmt.Errorf("%d: %w", id, err)
} else if n, _ := result.RowsAffected(); n != 1 {
return fmt.Errorf("%d: no such task", id)
}
notifierAwaken()
executorAwaken()
return nil
}
func rpcEnqueueOne(ctx context.Context,
owner, repo, hash, runner string) error {
tasks, err := getTasks(ctx, `WHERE owner = ? AND repo = ? AND hash = ?
AND runner = ? ORDER BY id DESC LIMIT 1`, owner, repo, hash, runner)
if err != nil {
return err
}
if len(tasks) != 0 {
return rpcRestartOne(ctx, tasks[0].ID)
} else {
return createTasks(ctx, owner, repo, hash, []string{runner})
}
}
func giteaResolveRef(ctx context.Context, owner, repo, ref string) (
string, error) {
req, err := giteaNewRequest(ctx, http.MethodGet, fmt.Sprintf(
"/api/v1/repos/%s/%s/git/commits/%s",
url.PathEscape(owner),
url.PathEscape(repo),
url.PathEscape(ref)), nil)
if err != nil {
return "", err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return "", err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", err
}
commit := struct {
SHA string `json:"sha"`
}{}
if resp.StatusCode != http.StatusOK {
return "", errors.New(resp.Status)
} else if err := json.Unmarshal(body, &commit); err != nil {
return "", err
}
return commit.SHA, nil
}
func rpcEnqueue(ctx context.Context,
w io.Writer, fs *flag.FlagSet, args []string) error {
if err := fs.Parse(args); err != nil {
return err
}
if fs.NArg() < 3 {
return errWrongUsage
}
owner, repo, ref := fs.Arg(0), fs.Arg(1), fs.Arg(2)
hash, err := giteaResolveRef(ctx, owner, repo, ref)
if err != nil {
return fmt.Errorf("%s: %w", ref, err)
}
project, ok := gConfig.Projects[owner+"/"+repo]
if !ok {
return fmt.Errorf("project configuration not found")
}
runners := fs.Args()[3:]
if len(runners) == 0 {
for runner := range project.Runners {
runners = append(runners, runner)
}
}
sort.Strings(runners)
for _, runner := range runners {
if _, ok := project.Runners[runner]; !ok {
return fmt.Errorf("project not configured for runner %s", runner)
}
}
for _, runner := range runners {
err := rpcEnqueueOne(ctx, owner, repo, hash, runner)
if err != nil {
fmt.Fprintf(w, "runner %s: %s\n", runner, err)
}
}
return nil
}
func rpcRestart(ctx context.Context, func rpcRestart(ctx context.Context,
w io.Writer, fs *flag.FlagSet, args []string) error { w io.Writer, fs *flag.FlagSet, args []string) error {
if err := fs.Parse(args); err != nil { if err := fs.Parse(args); err != nil {
@ -378,29 +506,17 @@ func rpcRestart(ctx context.Context,
} }
ids = append(ids, id) ids = append(ids, id)
} }
gRunningMutex.Lock()
defer gRunningMutex.Unlock()
for _, id := range ids { for _, id := range ids {
if _, ok := gRunning[id]; ok { if err := rpcRestartOne(ctx, id); err != nil {
fmt.Fprintf(w, "%d: not restarting running tasks\n", id) fmt.Fprintln(w, err)
continue }
} }
// The executor bumps to "running" after inserting into gRunning, // Mainly to allow scripts to touch the database directly.
// so we should not need to exclude that state here. if len(ids) == 0 {
result, err := gDB.ExecContext(ctx, `UPDATE task
SET state = ?, detail = '', notified = 0 WHERE id = ?`,
taskStateNew, id)
if err != nil {
fmt.Fprintf(w, "%d: %s\n", id, err)
} else if n, _ := result.RowsAffected(); n != 1 {
fmt.Fprintf(w, "%d: no such task\n", id)
}
}
notifierAwaken() notifierAwaken()
executorAwaken() executorAwaken()
}
return nil return nil
} }
@ -412,7 +528,9 @@ var rpcCommands = map[string]struct {
usage string usage string
function string function string
}{ }{
"restart": {rpcRestart, "ID...", "enqueue": {rpcEnqueue, "OWNER REPO REF [RUNNER]...",
"Create or restart tasks for the given reference."},
"restart": {rpcRestart, "[ID]...",
"Schedule tasks with the given IDs to be rerun."}, "Schedule tasks with the given IDs to be rerun."},
} }
@ -565,18 +683,16 @@ func notifierNotify(ctx context.Context, task Task) error {
task.ID, task.FullName(), task.Hash, task.ID, task.FullName(), task.Hash,
payload.Context, payload.State, payload.Description) payload.Context, payload.State, payload.Description)
uri := fmt.Sprintf("%s/api/v1/repos/%s/%s/statuses/%s", req, err := giteaNewRequest(ctx, http.MethodPost, fmt.Sprintf(
gConfig.Gitea, task.Owner, task.Repo, task.Hash) "/api/v1/repos/%s/%s/statuses/%s",
req, err := http.NewRequestWithContext(ctx, http.MethodPost, uri, url.PathEscape(task.Owner),
bytes.NewReader(body)) url.PathEscape(task.Repo),
url.PathEscape(task.Hash)), bytes.NewReader(body))
if err != nil { if err != nil {
return err return err
} }
req.Header.Set("Accept", "application/json")
req.Header.Set("Authorization", "token "+gConfig.Token)
req.Header.Set("Content-Type", "application/json") req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req) resp, err := http.DefaultClient.Do(req)
if err != nil { if err != nil {
return err return err