Compare commits
No commits in common. "fe81d713e1c59f2175974f0bc3eda5ff7a5f0749" and "013e7eba28013f1f3c998b06e777c8293badde87" have entirely different histories.
fe81d713e1
...
013e7eba28
@ -28,12 +28,8 @@ Options
|
|||||||
|
|
||||||
Commands
|
Commands
|
||||||
--------
|
--------
|
||||||
*enqueue* _OWNER_ _REPO_ _REF_ [_RUNNER_]...::
|
*restart* _ID_...::
|
||||||
Create new or restart existing tasks for the given reference,
|
|
||||||
which will be resolved to a full commit hash.
|
|
||||||
*restart* [_ID_]...::
|
|
||||||
Schedule tasks with the given IDs to be rerun.
|
Schedule tasks with the given IDs to be rerun.
|
||||||
Run this command without arguments to pick up external database changes.
|
|
||||||
|
|
||||||
Configuration
|
Configuration
|
||||||
-------------
|
-------------
|
||||||
|
272
acid.go
272
acid.go
@ -17,13 +17,11 @@ import (
|
|||||||
"log"
|
"log"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
|
||||||
"sync"
|
"sync"
|
||||||
"syscall"
|
"syscall"
|
||||||
ttemplate "text/template"
|
ttemplate "text/template"
|
||||||
@ -97,24 +95,7 @@ func parseConfig(path string) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- Utilities ---------------------------------------------------------------
|
// --- Task views --------------------------------------------------------------
|
||||||
|
|
||||||
func giteaSign(b []byte) string {
|
|
||||||
payloadHmac := hmac.New(sha256.New, []byte(gConfig.Secret))
|
|
||||||
payloadHmac.Write(b)
|
|
||||||
return hex.EncodeToString(payloadHmac.Sum(nil))
|
|
||||||
}
|
|
||||||
|
|
||||||
func giteaNewRequest(ctx context.Context, method, path string, body io.Reader) (
|
|
||||||
*http.Request, error) {
|
|
||||||
req, err := http.NewRequestWithContext(
|
|
||||||
ctx, method, gConfig.Gitea+path, body)
|
|
||||||
if req != nil {
|
|
||||||
req.Header.Set("Authorization", "token "+gConfig.Token)
|
|
||||||
req.Header.Set("Accept", "application/json")
|
|
||||||
}
|
|
||||||
return req, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func getTasks(ctx context.Context, query string, args ...any) ([]Task, error) {
|
func getTasks(ctx context.Context, query string, args ...any) ([]Task, error) {
|
||||||
rows, err := gDB.QueryContext(ctx, `
|
rows, err := gDB.QueryContext(ctx, `
|
||||||
@ -138,8 +119,6 @@ func getTasks(ctx context.Context, query string, args ...any) ([]Task, error) {
|
|||||||
return tasks, rows.Err()
|
return tasks, rows.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- Task views --------------------------------------------------------------
|
|
||||||
|
|
||||||
var templateTasks = template.Must(template.New("tasks").Parse(`
|
var templateTasks = template.Must(template.New("tasks").Parse(`
|
||||||
<!DOCTYPE html>
|
<!DOCTYPE html>
|
||||||
<html>
|
<html>
|
||||||
@ -321,6 +300,12 @@ func createTasks(ctx context.Context,
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func giteaSign(b []byte) string {
|
||||||
|
payloadHmac := hmac.New(sha256.New, []byte(gConfig.Secret))
|
||||||
|
payloadHmac.Write(b)
|
||||||
|
return hex.EncodeToString(payloadHmac.Sum(nil))
|
||||||
|
}
|
||||||
|
|
||||||
func handlePush(w http.ResponseWriter, r *http.Request) {
|
func handlePush(w http.ResponseWriter, r *http.Request) {
|
||||||
// X-Gitea-Delivery doesn't seem useful, pushes explode into multiple tasks.
|
// X-Gitea-Delivery doesn't seem useful, pushes explode into multiple tasks.
|
||||||
if r.Header.Get("X-Gitea-Event") != "push" {
|
if r.Header.Get("X-Gitea-Event") != "push" {
|
||||||
@ -376,178 +361,29 @@ func handlePush(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
const rpcHeaderSignature = "X-ACID-Signature"
|
const rpcHeaderSignature = "X-ACID-Signature"
|
||||||
|
|
||||||
var errWrongUsage = errors.New("wrong usage")
|
func rpcRestart(w io.Writer, ids []int64) {
|
||||||
|
|
||||||
func rpcRestartOne(ctx context.Context, id int64) error {
|
|
||||||
gRunningMutex.Lock()
|
gRunningMutex.Lock()
|
||||||
defer gRunningMutex.Unlock()
|
defer gRunningMutex.Unlock()
|
||||||
|
|
||||||
if _, ok := gRunning[id]; ok {
|
for _, id := range ids {
|
||||||
return fmt.Errorf("%d: not restarting running tasks", id)
|
if _, ok := gRunning[id]; ok {
|
||||||
}
|
fmt.Fprintf(w, "%d: not restarting running tasks\n", id)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
// The executor bumps to "running" after inserting into gRunning,
|
// The executor bumps to "running" after inserting into gRunning,
|
||||||
// so we should not need to exclude that state here.
|
// so we should not need to exclude that state here.
|
||||||
result, err := gDB.ExecContext(ctx, `UPDATE task
|
result, err := gDB.ExecContext(context.Background(), `UPDATE task
|
||||||
SET state = ?, detail = '', notified = 0 WHERE id = ?`,
|
SET state = ?, detail = '', notified = 0 WHERE id = ?`,
|
||||||
taskStateNew, id)
|
taskStateNew, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("%d: %w", id, err)
|
fmt.Fprintf(w, "%d: %s\n", id, err)
|
||||||
} else if n, _ := result.RowsAffected(); n != 1 {
|
} else if n, _ := result.RowsAffected(); n != 1 {
|
||||||
return fmt.Errorf("%d: no such task", id)
|
fmt.Fprintf(w, "%d: no such task\n", id)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
notifierAwaken()
|
notifierAwaken()
|
||||||
executorAwaken()
|
executorAwaken()
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func rpcEnqueueOne(ctx context.Context,
|
|
||||||
owner, repo, hash, runner string) error {
|
|
||||||
tasks, err := getTasks(ctx, `WHERE owner = ? AND repo = ? AND hash = ?
|
|
||||||
AND runner = ? ORDER BY id DESC LIMIT 1`, owner, repo, hash, runner)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(tasks) != 0 {
|
|
||||||
return rpcRestartOne(ctx, tasks[0].ID)
|
|
||||||
} else {
|
|
||||||
return createTasks(ctx, owner, repo, hash, []string{runner})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func giteaResolveRef(ctx context.Context, owner, repo, ref string) (
|
|
||||||
string, error) {
|
|
||||||
req, err := giteaNewRequest(ctx, http.MethodGet, fmt.Sprintf(
|
|
||||||
"/api/v1/repos/%s/%s/git/commits/%s",
|
|
||||||
url.PathEscape(owner),
|
|
||||||
url.PathEscape(repo),
|
|
||||||
url.PathEscape(ref)), nil)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
resp, err := http.DefaultClient.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
body, err := ioutil.ReadAll(resp.Body)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
commit := struct {
|
|
||||||
SHA string `json:"sha"`
|
|
||||||
}{}
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
|
||||||
return "", errors.New(resp.Status)
|
|
||||||
} else if err := json.Unmarshal(body, &commit); err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
return commit.SHA, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func rpcEnqueue(ctx context.Context,
|
|
||||||
w io.Writer, fs *flag.FlagSet, args []string) error {
|
|
||||||
if err := fs.Parse(args); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if fs.NArg() < 3 {
|
|
||||||
return errWrongUsage
|
|
||||||
}
|
|
||||||
|
|
||||||
owner, repo, ref := fs.Arg(0), fs.Arg(1), fs.Arg(2)
|
|
||||||
hash, err := giteaResolveRef(ctx, owner, repo, ref)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("%s: %w", ref, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
project, ok := gConfig.Projects[owner+"/"+repo]
|
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("project configuration not found")
|
|
||||||
}
|
|
||||||
|
|
||||||
runners := fs.Args()[3:]
|
|
||||||
if len(runners) == 0 {
|
|
||||||
for runner := range project.Runners {
|
|
||||||
runners = append(runners, runner)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
sort.Strings(runners)
|
|
||||||
|
|
||||||
for _, runner := range runners {
|
|
||||||
if _, ok := project.Runners[runner]; !ok {
|
|
||||||
return fmt.Errorf("project not configured for runner %s", runner)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for _, runner := range runners {
|
|
||||||
err := rpcEnqueueOne(ctx, owner, repo, hash, runner)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Fprintf(w, "runner %s: %s\n", runner, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func rpcRestart(ctx context.Context,
|
|
||||||
w io.Writer, fs *flag.FlagSet, args []string) error {
|
|
||||||
if err := fs.Parse(args); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
ids := []int64{}
|
|
||||||
for _, arg := range fs.Args() {
|
|
||||||
id, err := strconv.ParseInt(arg, 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("%w: %s", errWrongUsage, err)
|
|
||||||
}
|
|
||||||
ids = append(ids, id)
|
|
||||||
}
|
|
||||||
for _, id := range ids {
|
|
||||||
if err := rpcRestartOne(ctx, id); err != nil {
|
|
||||||
fmt.Fprintln(w, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Mainly to allow scripts to touch the database directly.
|
|
||||||
if len(ids) == 0 {
|
|
||||||
notifierAwaken()
|
|
||||||
executorAwaken()
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
|
||||||
|
|
||||||
var rpcCommands = map[string]struct {
|
|
||||||
// handler must not write anything when returning an error.
|
|
||||||
handler func(context.Context, io.Writer, *flag.FlagSet, []string) error
|
|
||||||
usage string
|
|
||||||
function string
|
|
||||||
}{
|
|
||||||
"enqueue": {rpcEnqueue, "OWNER REPO REF [RUNNER]...",
|
|
||||||
"Create or restart tasks for the given reference."},
|
|
||||||
"restart": {rpcRestart, "[ID]...",
|
|
||||||
"Schedule tasks with the given IDs to be rerun."},
|
|
||||||
}
|
|
||||||
|
|
||||||
func rpcPrintCommands(w io.Writer) {
|
|
||||||
// The alphabetic ordering is unfortunate, but tolerable.
|
|
||||||
keys := []string{}
|
|
||||||
for key := range rpcCommands {
|
|
||||||
keys = append(keys, key)
|
|
||||||
}
|
|
||||||
sort.Strings(keys)
|
|
||||||
|
|
||||||
fmt.Fprintf(w, "Commands:\n")
|
|
||||||
for _, key := range keys {
|
|
||||||
cmd := rpcCommands[key]
|
|
||||||
fmt.Fprintf(w, " %s [OPTION...] %s\n \t%s\n",
|
|
||||||
key, cmd.usage, cmd.function)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleRPC(w http.ResponseWriter, r *http.Request) {
|
func handleRPC(w http.ResponseWriter, r *http.Request) {
|
||||||
@ -574,43 +410,21 @@ func handleRPC(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Our handling closely follows what the flag package does internally.
|
|
||||||
|
|
||||||
command, args := args[0], args[1:]
|
command, args := args[0], args[1:]
|
||||||
cmd, ok := rpcCommands[command]
|
switch command {
|
||||||
if !ok {
|
case "restart":
|
||||||
http.Error(w, "unknown command: "+command, http.StatusBadRequest)
|
ids := []int64{}
|
||||||
rpcPrintCommands(w)
|
for _, arg := range args {
|
||||||
return
|
id, err := strconv.ParseInt(arg, 10, 64)
|
||||||
}
|
if err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||||
// If we redirected the FlagSet straight to the response,
|
return
|
||||||
// we would be unable to set our own HTTP status.
|
}
|
||||||
b := bytes.NewBuffer(nil)
|
ids = append(ids, id)
|
||||||
|
}
|
||||||
fs := flag.NewFlagSet(command, flag.ContinueOnError)
|
rpcRestart(w, ids)
|
||||||
fs.SetOutput(b)
|
default:
|
||||||
fs.Usage = func() {
|
http.Error(w, "Unknown command: "+command, http.StatusBadRequest)
|
||||||
fmt.Fprintf(fs.Output(),
|
|
||||||
"Usage: %s [OPTION...] %s\n%s\n",
|
|
||||||
fs.Name(), cmd.usage, cmd.function)
|
|
||||||
fs.PrintDefaults()
|
|
||||||
}
|
|
||||||
|
|
||||||
err = cmd.handler(r.Context(), w, fs, args)
|
|
||||||
|
|
||||||
// Wrap this error to make it as if fs.Parse discovered the issue.
|
|
||||||
if errors.Is(err, errWrongUsage) {
|
|
||||||
fmt.Fprintln(fs.Output(), err)
|
|
||||||
fs.Usage()
|
|
||||||
}
|
|
||||||
|
|
||||||
// The flag package first prints all errors that it returns.
|
|
||||||
// If the buffer ends up not being empty, flush it into the request.
|
|
||||||
if b.Len() != 0 {
|
|
||||||
http.Error(w, strings.TrimSpace(b.String()), http.StatusBadRequest)
|
|
||||||
} else if err != nil {
|
|
||||||
http.Error(w, err.Error(), http.StatusUnprocessableEntity)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -683,16 +497,18 @@ func notifierNotify(ctx context.Context, task Task) error {
|
|||||||
task.ID, task.FullName(), task.Hash,
|
task.ID, task.FullName(), task.Hash,
|
||||||
payload.Context, payload.State, payload.Description)
|
payload.Context, payload.State, payload.Description)
|
||||||
|
|
||||||
req, err := giteaNewRequest(ctx, http.MethodPost, fmt.Sprintf(
|
uri := fmt.Sprintf("%s/api/v1/repos/%s/%s/statuses/%s",
|
||||||
"/api/v1/repos/%s/%s/statuses/%s",
|
gConfig.Gitea, task.Owner, task.Repo, task.Hash)
|
||||||
url.PathEscape(task.Owner),
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, uri,
|
||||||
url.PathEscape(task.Repo),
|
bytes.NewReader(body))
|
||||||
url.PathEscape(task.Hash)), bytes.NewReader(body))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
req.Header.Set("Accept", "application/json")
|
||||||
|
req.Header.Set("Authorization", "token "+gConfig.Token)
|
||||||
req.Header.Set("Content-Type", "application/json")
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
|
||||||
resp, err := http.DefaultClient.Do(req)
|
resp, err := http.DefaultClient.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
Loading…
x
Reference in New Issue
Block a user