Compare commits

...

2 Commits

2 changed files with 234 additions and 46 deletions

View File

@ -28,8 +28,12 @@ Options
Commands Commands
-------- --------
*restart* _ID_...:: *enqueue* _OWNER_ _REPO_ _REF_ [_RUNNER_]...::
Create new or restart existing tasks for the given reference,
which will be resolved to a full commit hash.
*restart* [_ID_]...::
Schedule tasks with the given IDs to be rerun. Schedule tasks with the given IDs to be rerun.
Run this command without arguments to pick up external database changes.
Configuration Configuration
------------- -------------

274
acid.go
View File

@ -17,11 +17,13 @@ import (
"log" "log"
"net" "net"
"net/http" "net/http"
"net/url"
"os" "os"
"os/exec" "os/exec"
"os/signal" "os/signal"
"sort" "sort"
"strconv" "strconv"
"strings"
"sync" "sync"
"syscall" "syscall"
ttemplate "text/template" ttemplate "text/template"
@ -95,7 +97,24 @@ func parseConfig(path string) error {
return err return err
} }
// --- Task views -------------------------------------------------------------- // --- Utilities ---------------------------------------------------------------
func giteaSign(b []byte) string {
payloadHmac := hmac.New(sha256.New, []byte(gConfig.Secret))
payloadHmac.Write(b)
return hex.EncodeToString(payloadHmac.Sum(nil))
}
func giteaNewRequest(ctx context.Context, method, path string, body io.Reader) (
*http.Request, error) {
req, err := http.NewRequestWithContext(
ctx, method, gConfig.Gitea+path, body)
if req != nil {
req.Header.Set("Authorization", "token "+gConfig.Token)
req.Header.Set("Accept", "application/json")
}
return req, err
}
func getTasks(ctx context.Context, query string, args ...any) ([]Task, error) { func getTasks(ctx context.Context, query string, args ...any) ([]Task, error) {
rows, err := gDB.QueryContext(ctx, ` rows, err := gDB.QueryContext(ctx, `
@ -119,6 +138,8 @@ func getTasks(ctx context.Context, query string, args ...any) ([]Task, error) {
return tasks, rows.Err() return tasks, rows.Err()
} }
// --- Task views --------------------------------------------------------------
var templateTasks = template.Must(template.New("tasks").Parse(` var templateTasks = template.Must(template.New("tasks").Parse(`
<!DOCTYPE html> <!DOCTYPE html>
<html> <html>
@ -300,12 +321,6 @@ func createTasks(ctx context.Context,
return nil return nil
} }
func giteaSign(b []byte) string {
payloadHmac := hmac.New(sha256.New, []byte(gConfig.Secret))
payloadHmac.Write(b)
return hex.EncodeToString(payloadHmac.Sum(nil))
}
func handlePush(w http.ResponseWriter, r *http.Request) { func handlePush(w http.ResponseWriter, r *http.Request) {
// X-Gitea-Delivery doesn't seem useful, pushes explode into multiple tasks. // X-Gitea-Delivery doesn't seem useful, pushes explode into multiple tasks.
if r.Header.Get("X-Gitea-Event") != "push" { if r.Header.Get("X-Gitea-Event") != "push" {
@ -361,29 +376,178 @@ func handlePush(w http.ResponseWriter, r *http.Request) {
const rpcHeaderSignature = "X-ACID-Signature" const rpcHeaderSignature = "X-ACID-Signature"
func rpcRestart(w io.Writer, ids []int64) { var errWrongUsage = errors.New("wrong usage")
func rpcRestartOne(ctx context.Context, id int64) error {
gRunningMutex.Lock() gRunningMutex.Lock()
defer gRunningMutex.Unlock() defer gRunningMutex.Unlock()
for _, id := range ids { if _, ok := gRunning[id]; ok {
if _, ok := gRunning[id]; ok { return fmt.Errorf("%d: not restarting running tasks", id)
fmt.Fprintf(w, "%d: not restarting running tasks\n", id)
continue
}
// The executor bumps to "running" after inserting into gRunning,
// so we should not need to exclude that state here.
result, err := gDB.ExecContext(context.Background(), `UPDATE task
SET state = ?, detail = '', notified = 0 WHERE id = ?`,
taskStateNew, id)
if err != nil {
fmt.Fprintf(w, "%d: %s\n", id, err)
} else if n, _ := result.RowsAffected(); n != 1 {
fmt.Fprintf(w, "%d: no such task\n", id)
}
} }
// The executor bumps to "running" after inserting into gRunning,
// so we should not need to exclude that state here.
result, err := gDB.ExecContext(ctx, `UPDATE task
SET state = ?, detail = '', notified = 0 WHERE id = ?`,
taskStateNew, id)
if err != nil {
return fmt.Errorf("%d: %w", id, err)
} else if n, _ := result.RowsAffected(); n != 1 {
return fmt.Errorf("%d: no such task", id)
}
notifierAwaken() notifierAwaken()
executorAwaken() executorAwaken()
return nil
}
func rpcEnqueueOne(ctx context.Context,
owner, repo, hash, runner string) error {
tasks, err := getTasks(ctx, `WHERE owner = ? AND repo = ? AND hash = ?
AND runner = ? ORDER BY id DESC LIMIT 1`, owner, repo, hash, runner)
if err != nil {
return err
}
if len(tasks) != 0 {
return rpcRestartOne(ctx, tasks[0].ID)
} else {
return createTasks(ctx, owner, repo, hash, []string{runner})
}
}
func giteaResolveRef(ctx context.Context, owner, repo, ref string) (
string, error) {
req, err := giteaNewRequest(ctx, http.MethodGet, fmt.Sprintf(
"/api/v1/repos/%s/%s/git/commits/%s",
url.PathEscape(owner),
url.PathEscape(repo),
url.PathEscape(ref)), nil)
if err != nil {
return "", err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return "", err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", err
}
commit := struct {
SHA string `json:"sha"`
}{}
if resp.StatusCode != http.StatusOK {
return "", errors.New(resp.Status)
} else if err := json.Unmarshal(body, &commit); err != nil {
return "", err
}
return commit.SHA, nil
}
func rpcEnqueue(ctx context.Context,
w io.Writer, fs *flag.FlagSet, args []string) error {
if err := fs.Parse(args); err != nil {
return err
}
if fs.NArg() < 3 {
return errWrongUsage
}
owner, repo, ref := fs.Arg(0), fs.Arg(1), fs.Arg(2)
hash, err := giteaResolveRef(ctx, owner, repo, ref)
if err != nil {
return fmt.Errorf("%s: %w", ref, err)
}
project, ok := gConfig.Projects[owner+"/"+repo]
if !ok {
return fmt.Errorf("project configuration not found")
}
runners := fs.Args()[3:]
if len(runners) == 0 {
for runner := range project.Runners {
runners = append(runners, runner)
}
}
sort.Strings(runners)
for _, runner := range runners {
if _, ok := project.Runners[runner]; !ok {
return fmt.Errorf("project not configured for runner %s", runner)
}
}
for _, runner := range runners {
err := rpcEnqueueOne(ctx, owner, repo, hash, runner)
if err != nil {
fmt.Fprintf(w, "runner %s: %s\n", runner, err)
}
}
return nil
}
func rpcRestart(ctx context.Context,
w io.Writer, fs *flag.FlagSet, args []string) error {
if err := fs.Parse(args); err != nil {
return err
}
ids := []int64{}
for _, arg := range fs.Args() {
id, err := strconv.ParseInt(arg, 10, 64)
if err != nil {
return fmt.Errorf("%w: %s", errWrongUsage, err)
}
ids = append(ids, id)
}
for _, id := range ids {
if err := rpcRestartOne(ctx, id); err != nil {
fmt.Fprintln(w, err)
}
}
// Mainly to allow scripts to touch the database directly.
if len(ids) == 0 {
notifierAwaken()
executorAwaken()
}
return nil
}
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
var rpcCommands = map[string]struct {
// handler must not write anything when returning an error.
handler func(context.Context, io.Writer, *flag.FlagSet, []string) error
usage string
function string
}{
"enqueue": {rpcEnqueue, "OWNER REPO REF [RUNNER]...",
"Create or restart tasks for the given reference."},
"restart": {rpcRestart, "[ID]...",
"Schedule tasks with the given IDs to be rerun."},
}
func rpcPrintCommands(w io.Writer) {
// The alphabetic ordering is unfortunate, but tolerable.
keys := []string{}
for key := range rpcCommands {
keys = append(keys, key)
}
sort.Strings(keys)
fmt.Fprintf(w, "Commands:\n")
for _, key := range keys {
cmd := rpcCommands[key]
fmt.Fprintf(w, " %s [OPTION...] %s\n \t%s\n",
key, cmd.usage, cmd.function)
}
} }
func handleRPC(w http.ResponseWriter, r *http.Request) { func handleRPC(w http.ResponseWriter, r *http.Request) {
@ -410,21 +574,43 @@ func handleRPC(w http.ResponseWriter, r *http.Request) {
return return
} }
// Our handling closely follows what the flag package does internally.
command, args := args[0], args[1:] command, args := args[0], args[1:]
switch command { cmd, ok := rpcCommands[command]
case "restart": if !ok {
ids := []int64{} http.Error(w, "unknown command: "+command, http.StatusBadRequest)
for _, arg := range args { rpcPrintCommands(w)
id, err := strconv.ParseInt(arg, 10, 64) return
if err != nil { }
http.Error(w, err.Error(), http.StatusBadRequest)
return // If we redirected the FlagSet straight to the response,
} // we would be unable to set our own HTTP status.
ids = append(ids, id) b := bytes.NewBuffer(nil)
}
rpcRestart(w, ids) fs := flag.NewFlagSet(command, flag.ContinueOnError)
default: fs.SetOutput(b)
http.Error(w, "Unknown command: "+command, http.StatusBadRequest) fs.Usage = func() {
fmt.Fprintf(fs.Output(),
"Usage: %s [OPTION...] %s\n%s\n",
fs.Name(), cmd.usage, cmd.function)
fs.PrintDefaults()
}
err = cmd.handler(r.Context(), w, fs, args)
// Wrap this error to make it as if fs.Parse discovered the issue.
if errors.Is(err, errWrongUsage) {
fmt.Fprintln(fs.Output(), err)
fs.Usage()
}
// The flag package first prints all errors that it returns.
// If the buffer ends up not being empty, flush it into the request.
if b.Len() != 0 {
http.Error(w, strings.TrimSpace(b.String()), http.StatusBadRequest)
} else if err != nil {
http.Error(w, err.Error(), http.StatusUnprocessableEntity)
} }
} }
@ -497,18 +683,16 @@ func notifierNotify(ctx context.Context, task Task) error {
task.ID, task.FullName(), task.Hash, task.ID, task.FullName(), task.Hash,
payload.Context, payload.State, payload.Description) payload.Context, payload.State, payload.Description)
uri := fmt.Sprintf("%s/api/v1/repos/%s/%s/statuses/%s", req, err := giteaNewRequest(ctx, http.MethodPost, fmt.Sprintf(
gConfig.Gitea, task.Owner, task.Repo, task.Hash) "/api/v1/repos/%s/%s/statuses/%s",
req, err := http.NewRequestWithContext(ctx, http.MethodPost, uri, url.PathEscape(task.Owner),
bytes.NewReader(body)) url.PathEscape(task.Repo),
url.PathEscape(task.Hash)), bytes.NewReader(body))
if err != nil { if err != nil {
return err return err
} }
req.Header.Set("Accept", "application/json")
req.Header.Set("Authorization", "token "+gConfig.Token)
req.Header.Set("Content-Type", "application/json") req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req) resp, err := http.DefaultClient.Do(req)
if err != nil { if err != nil {
return err return err