2024-03-29 14:08:15 +01:00
|
|
|
|
package main
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"bytes"
|
|
|
|
|
"context"
|
|
|
|
|
"crypto/hmac"
|
|
|
|
|
"crypto/sha256"
|
|
|
|
|
"database/sql"
|
|
|
|
|
"encoding/hex"
|
|
|
|
|
"encoding/json"
|
|
|
|
|
"errors"
|
|
|
|
|
"flag"
|
|
|
|
|
"fmt"
|
|
|
|
|
"html/template"
|
|
|
|
|
"io"
|
|
|
|
|
"io/ioutil"
|
|
|
|
|
"log"
|
|
|
|
|
"net"
|
|
|
|
|
"net/http"
|
2024-04-14 22:24:33 +02:00
|
|
|
|
"net/url"
|
2024-03-29 14:08:15 +01:00
|
|
|
|
"os"
|
|
|
|
|
"os/exec"
|
|
|
|
|
"os/signal"
|
2024-12-22 09:00:02 +01:00
|
|
|
|
"path/filepath"
|
2024-03-29 14:08:15 +01:00
|
|
|
|
"sort"
|
|
|
|
|
"strconv"
|
2024-04-14 22:07:39 +02:00
|
|
|
|
"strings"
|
2024-03-29 14:08:15 +01:00
|
|
|
|
"sync"
|
|
|
|
|
"syscall"
|
|
|
|
|
ttemplate "text/template"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
_ "github.com/mattn/go-sqlite3"
|
2024-12-22 09:00:02 +01:00
|
|
|
|
"github.com/pkg/sftp"
|
2024-03-29 14:08:15 +01:00
|
|
|
|
"golang.org/x/crypto/ssh"
|
|
|
|
|
"gopkg.in/yaml.v3"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
projectName = "acid"
|
|
|
|
|
projectVersion = "?"
|
|
|
|
|
|
|
|
|
|
gConfig Config = Config{Listen: ":http"}
|
|
|
|
|
gNotifyScript *ttemplate.Template
|
|
|
|
|
gDB *sql.DB
|
|
|
|
|
|
|
|
|
|
gNotifierSignal = make(chan struct{}, 1)
|
|
|
|
|
gExecutorSignal = make(chan struct{}, 1)
|
|
|
|
|
|
|
|
|
|
// The mutex is at least supposed to lock over the tasks as well.
|
|
|
|
|
gRunningMutex sync.Mutex
|
|
|
|
|
gRunning = make(map[int64]*RunningTask)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// --- Config ------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
type Config struct {
|
|
|
|
|
DB string `yaml:"db"` // database file path
|
|
|
|
|
Listen string `yaml:"listen"` // HTTP listener address
|
|
|
|
|
Root string `yaml:"root"` // HTTP root URI
|
|
|
|
|
Gitea string `yaml:"gitea"` // Gitea base URL
|
|
|
|
|
Secret string `yaml:"secret"` // Gitea hook secret
|
|
|
|
|
Token string `yaml:"token"` // Gitea API token
|
|
|
|
|
Notify string `yaml:"notify"` // notifier script
|
|
|
|
|
|
|
|
|
|
Runners map[string]ConfigRunner `yaml:"runners"` // script runners
|
|
|
|
|
Projects map[string]ConfigProject `yaml:"projects"` // configured projects
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type ConfigRunner struct {
|
2024-12-21 14:38:11 +01:00
|
|
|
|
Name string `yaml:"name"` // descriptive name
|
|
|
|
|
Manual bool `yaml:"manual"` // only run on request
|
|
|
|
|
Run string `yaml:"run"` // runner executable
|
|
|
|
|
Setup string `yaml:"setup"` // runner setup script (SSH)
|
|
|
|
|
SSH struct {
|
2024-03-29 14:08:15 +01:00
|
|
|
|
User string `yaml:"user"` // remote username
|
|
|
|
|
Address string `yaml:"address"` // TCP host:port
|
|
|
|
|
Identity string `yaml:"identity"` // private key path
|
|
|
|
|
} `yaml:"ssh"` // shell access
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type ConfigProject struct {
|
|
|
|
|
Runners map[string]ConfigProjectRunner `yaml:"runners"`
|
|
|
|
|
}
|
|
|
|
|
|
2024-12-21 14:38:11 +01:00
|
|
|
|
func (cf *ConfigProject) AutomaticRunners() (runners []string) {
|
|
|
|
|
// We pass through unknown runner names,
|
|
|
|
|
// so that they can cause reference errors later.
|
|
|
|
|
for runner := range cf.Runners {
|
|
|
|
|
if r, _ := gConfig.Runners[runner]; !r.Manual {
|
|
|
|
|
runners = append(runners, runner)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
sort.Strings(runners)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
2024-03-29 14:08:15 +01:00
|
|
|
|
type ConfigProjectRunner struct {
|
2024-04-19 04:26:48 +02:00
|
|
|
|
Setup string `yaml:"setup"` // project setup script (SSH)
|
|
|
|
|
Build string `yaml:"build"` // project build script (SSH)
|
2024-12-22 09:00:02 +01:00
|
|
|
|
Deploy string `yaml:"deploy"` // project deploy script (local)
|
2024-04-19 04:26:48 +02:00
|
|
|
|
Timeout string `yaml:"timeout"` // timeout duration
|
2024-03-29 14:08:15 +01:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func parseConfig(path string) error {
|
|
|
|
|
if f, err := os.Open(path); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
} else if err = yaml.NewDecoder(f).Decode(&gConfig); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var err error
|
2024-04-16 07:38:23 +02:00
|
|
|
|
gNotifyScript, err =
|
|
|
|
|
ttemplate.New("notify").Funcs(shellFuncs).Parse(gConfig.Notify)
|
2024-03-29 14:08:15 +01:00
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
2024-04-16 07:38:23 +02:00
|
|
|
|
var shellFuncs = ttemplate.FuncMap{
|
|
|
|
|
"quote": func(word string) string {
|
|
|
|
|
// History expansion is annoying, don't let it cut us.
|
|
|
|
|
if strings.IndexRune(word, '!') >= 0 {
|
2024-04-17 01:44:57 +02:00
|
|
|
|
return "'" + strings.ReplaceAll(word, "'", `'\''`) + "'"
|
2024-04-16 07:38:23 +02:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const special = "$`\"\\"
|
|
|
|
|
quoted := []rune{'"'}
|
|
|
|
|
for _, r := range word {
|
|
|
|
|
if strings.IndexRune(special, r) >= 0 {
|
|
|
|
|
quoted = append(quoted, '\\')
|
|
|
|
|
}
|
|
|
|
|
quoted = append(quoted, r)
|
|
|
|
|
}
|
|
|
|
|
return string(append(quoted, '"'))
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
|
2024-04-14 22:24:33 +02:00
|
|
|
|
// --- Utilities ---------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
func giteaSign(b []byte) string {
|
|
|
|
|
payloadHmac := hmac.New(sha256.New, []byte(gConfig.Secret))
|
|
|
|
|
payloadHmac.Write(b)
|
|
|
|
|
return hex.EncodeToString(payloadHmac.Sum(nil))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func giteaNewRequest(ctx context.Context, method, path string, body io.Reader) (
|
|
|
|
|
*http.Request, error) {
|
|
|
|
|
req, err := http.NewRequestWithContext(
|
|
|
|
|
ctx, method, gConfig.Gitea+path, body)
|
|
|
|
|
if req != nil {
|
|
|
|
|
req.Header.Set("Authorization", "token "+gConfig.Token)
|
|
|
|
|
req.Header.Set("Accept", "application/json")
|
|
|
|
|
}
|
|
|
|
|
return req, err
|
|
|
|
|
}
|
2024-03-29 14:08:15 +01:00
|
|
|
|
|
|
|
|
|
func getTasks(ctx context.Context, query string, args ...any) ([]Task, error) {
|
|
|
|
|
rows, err := gDB.QueryContext(ctx, `
|
|
|
|
|
SELECT id, owner, repo, hash, runner,
|
2024-12-22 09:00:02 +01:00
|
|
|
|
state, detail, notified,
|
|
|
|
|
runlog, tasklog, deploylog FROM task `+query, args...)
|
2024-03-29 14:08:15 +01:00
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
defer rows.Close()
|
|
|
|
|
|
|
|
|
|
tasks := []Task{}
|
|
|
|
|
for rows.Next() {
|
|
|
|
|
var t Task
|
|
|
|
|
err := rows.Scan(&t.ID, &t.Owner, &t.Repo, &t.Hash, &t.Runner,
|
2024-12-22 09:00:02 +01:00
|
|
|
|
&t.State, &t.Detail, &t.Notified,
|
|
|
|
|
&t.RunLog, &t.TaskLog, &t.DeployLog)
|
2024-03-29 14:08:15 +01:00
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
tasks = append(tasks, t)
|
|
|
|
|
}
|
|
|
|
|
return tasks, rows.Err()
|
|
|
|
|
}
|
|
|
|
|
|
2024-04-14 22:24:33 +02:00
|
|
|
|
// --- Task views --------------------------------------------------------------
|
|
|
|
|
|
2024-03-29 14:08:15 +01:00
|
|
|
|
var templateTasks = template.Must(template.New("tasks").Parse(`
|
|
|
|
|
<!DOCTYPE html>
|
|
|
|
|
<html>
|
|
|
|
|
<head>
|
|
|
|
|
<meta charset="utf-8">
|
|
|
|
|
<title>Tasks</title>
|
|
|
|
|
</head>
|
|
|
|
|
<body>
|
|
|
|
|
<h1>Tasks</h1>
|
|
|
|
|
<table border="1">
|
|
|
|
|
<thead>
|
|
|
|
|
<tr>
|
|
|
|
|
<th>ID</th>
|
|
|
|
|
<th>Repository</th>
|
|
|
|
|
<th>Hash</th>
|
|
|
|
|
<th>Runner</th>
|
|
|
|
|
<th>State</th>
|
|
|
|
|
<th>Detail</th>
|
|
|
|
|
<th>Notified</th>
|
|
|
|
|
</tr>
|
|
|
|
|
</thead>
|
|
|
|
|
<tbody>
|
|
|
|
|
{{range .}}
|
|
|
|
|
<tr>
|
|
|
|
|
<td><a href="task/{{.ID}}">{{.ID}}</a></td>
|
|
|
|
|
<td><a href="{{.RepoURL}}">{{.FullName}}</a></td>
|
|
|
|
|
<td><a href="{{.CommitURL}}">{{.Hash}}</a></td>
|
|
|
|
|
<td>{{.RunnerName}}</td>
|
|
|
|
|
<td>{{.State}}</td>
|
|
|
|
|
<td>{{.Detail}}</td>
|
|
|
|
|
<td>{{.Notified}}</td>
|
|
|
|
|
</tr>
|
|
|
|
|
{{end}}
|
|
|
|
|
</tbody>
|
|
|
|
|
</table>
|
|
|
|
|
</body>
|
|
|
|
|
</html>
|
|
|
|
|
`))
|
|
|
|
|
|
|
|
|
|
func handleTasks(w http.ResponseWriter, r *http.Request) {
|
|
|
|
|
tasks, err := getTasks(r.Context(), `ORDER BY id DESC`)
|
|
|
|
|
if err != nil {
|
|
|
|
|
http.Error(w,
|
|
|
|
|
"Error retrieving tasks: "+err.Error(),
|
|
|
|
|
http.StatusInternalServerError)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if err := templateTasks.Execute(w, tasks); err != nil {
|
|
|
|
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var templateTask = template.Must(template.New("tasks").Parse(`
|
|
|
|
|
<!DOCTYPE html>
|
|
|
|
|
<html>
|
|
|
|
|
<head>
|
|
|
|
|
<title>Task {{.ID}}</title>
|
|
|
|
|
<meta charset="utf-8">
|
|
|
|
|
{{if .IsRunning}}
|
|
|
|
|
<meta http-equiv="refresh" content="5">
|
|
|
|
|
{{end}}
|
|
|
|
|
</head>
|
|
|
|
|
<body>
|
2024-04-08 02:49:12 +02:00
|
|
|
|
<h1><a href="..">Tasks</a> » {{.ID}}</h1>
|
2024-03-29 14:08:15 +01:00
|
|
|
|
<dl>
|
|
|
|
|
<dt>Project</dt>
|
|
|
|
|
<dd><a href="{{.RepoURL}}">{{.FullName}}</a></dd>
|
|
|
|
|
<dt>Commit</dt>
|
|
|
|
|
<dd><a href="{{.CommitURL}}">{{.Hash}}</a></dd>
|
|
|
|
|
<dt>Runner</dt>
|
|
|
|
|
<dd>{{.RunnerName}}</dd>
|
|
|
|
|
<dt>State</dt>
|
|
|
|
|
<dd>{{.State}}{{if .Detail}} ({{.Detail}}){{end}}</dd>
|
|
|
|
|
<dt>Notified</dt>
|
|
|
|
|
<dd>{{.Notified}}</dd>
|
|
|
|
|
</dl>
|
|
|
|
|
{{if .RunLog}}
|
|
|
|
|
<h2>Runner log</h2>
|
|
|
|
|
<pre>{{printf "%s" .RunLog}}</pre>
|
|
|
|
|
{{end}}
|
|
|
|
|
{{if .TaskLog}}
|
|
|
|
|
<h2>Task log</h2>
|
|
|
|
|
<pre>{{printf "%s" .TaskLog}}</pre>
|
|
|
|
|
{{end}}
|
2024-12-22 09:00:02 +01:00
|
|
|
|
{{if .DeployLog}}
|
|
|
|
|
<h2>Deploy log</h2>
|
|
|
|
|
<pre>{{printf "%s" .DeployLog}}</pre>
|
|
|
|
|
{{end}}
|
2024-03-29 14:08:15 +01:00
|
|
|
|
</table>
|
|
|
|
|
</body>
|
|
|
|
|
</html>
|
|
|
|
|
`))
|
|
|
|
|
|
|
|
|
|
func handleTask(w http.ResponseWriter, r *http.Request) {
|
|
|
|
|
id, err := strconv.Atoi(r.PathValue("id"))
|
|
|
|
|
if err != nil {
|
|
|
|
|
http.Error(w,
|
|
|
|
|
"Invalid ID: "+err.Error(), http.StatusBadRequest)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tasks, err := getTasks(r.Context(), `WHERE id = ?`, id)
|
|
|
|
|
if err != nil {
|
|
|
|
|
http.Error(w,
|
|
|
|
|
"Error retrieving task: "+err.Error(),
|
|
|
|
|
http.StatusInternalServerError)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if len(tasks) == 0 {
|
|
|
|
|
http.NotFound(w, r)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
task := struct {
|
|
|
|
|
Task
|
|
|
|
|
IsRunning bool
|
|
|
|
|
}{Task: tasks[0]}
|
|
|
|
|
func() {
|
|
|
|
|
gRunningMutex.Lock()
|
|
|
|
|
defer gRunningMutex.Unlock()
|
|
|
|
|
|
|
|
|
|
rt, ok := gRunning[task.ID]
|
|
|
|
|
task.IsRunning = ok
|
|
|
|
|
if !ok {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
2024-12-25 22:18:30 +01:00
|
|
|
|
rt.RunLog.Lock()
|
|
|
|
|
defer rt.RunLog.Unlock()
|
|
|
|
|
rt.TaskLog.Lock()
|
|
|
|
|
defer rt.TaskLog.Unlock()
|
|
|
|
|
rt.DeployLog.Lock()
|
|
|
|
|
defer rt.DeployLog.Unlock()
|
2024-03-29 14:08:15 +01:00
|
|
|
|
|
2024-12-25 22:18:30 +01:00
|
|
|
|
task.RunLog = rt.RunLog.Serialize(0)
|
|
|
|
|
task.TaskLog = rt.TaskLog.Serialize(0)
|
|
|
|
|
task.DeployLog = rt.DeployLog.Serialize(0)
|
2024-03-29 14:08:15 +01:00
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
if err := templateTask.Execute(w, &task); err != nil {
|
|
|
|
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// --- Push hook ---------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
type GiteaPushEvent struct {
|
|
|
|
|
HeadCommit struct {
|
|
|
|
|
ID string `json:"id"`
|
|
|
|
|
} `json:"head_commit"`
|
|
|
|
|
Repository struct {
|
|
|
|
|
Name string `json:"name"`
|
|
|
|
|
FullName string `json:"full_name"`
|
|
|
|
|
Owner struct {
|
|
|
|
|
Username string `json:"username"`
|
|
|
|
|
} `json:"owner"`
|
|
|
|
|
} `json:"repository"`
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func createTasks(ctx context.Context,
|
|
|
|
|
owner, repo, hash string, runners []string) error {
|
|
|
|
|
tx, err := gDB.BeginTx(ctx, nil)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
defer tx.Rollback()
|
|
|
|
|
|
|
|
|
|
stmt, err := tx.Prepare(`INSERT INTO task(owner, repo, hash, runner)
|
|
|
|
|
VALUES (?, ?, ?, ?)`)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, runner := range runners {
|
|
|
|
|
if _, err := stmt.Exec(owner, repo, hash, runner); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
notifierAwaken()
|
|
|
|
|
executorAwaken()
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func handlePush(w http.ResponseWriter, r *http.Request) {
|
|
|
|
|
// X-Gitea-Delivery doesn't seem useful, pushes explode into multiple tasks.
|
|
|
|
|
if r.Header.Get("X-Gitea-Event") != "push" {
|
|
|
|
|
http.Error(w,
|
|
|
|
|
"Expected a Gitea push event", http.StatusBadRequest)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
body, err := ioutil.ReadAll(r.Body)
|
|
|
|
|
if err != nil {
|
|
|
|
|
http.Error(w,
|
|
|
|
|
"Error reading request body", http.StatusInternalServerError)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if r.Header.Get("X-Gitea-Signature") != giteaSign(body) {
|
|
|
|
|
http.Error(w,
|
|
|
|
|
"Signature mismatch", http.StatusBadRequest)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var event GiteaPushEvent
|
|
|
|
|
if err := json.Unmarshal(body, &event); err != nil {
|
|
|
|
|
http.Error(w,
|
|
|
|
|
"Invalid request body: "+err.Error(), http.StatusBadRequest)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.Printf("received push: %s %s\n",
|
|
|
|
|
event.Repository.FullName, event.HeadCommit.ID)
|
|
|
|
|
|
|
|
|
|
project, ok := gConfig.Projects[event.Repository.FullName]
|
|
|
|
|
if !ok {
|
|
|
|
|
// This is okay, don't set any commit statuses.
|
|
|
|
|
fmt.Fprintf(w, "The project is not configured.")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
2024-12-21 14:38:11 +01:00
|
|
|
|
runners := project.AutomaticRunners()
|
2024-03-29 14:08:15 +01:00
|
|
|
|
if err := createTasks(r.Context(),
|
|
|
|
|
event.Repository.Owner.Username, event.Repository.Name,
|
|
|
|
|
event.HeadCommit.ID, runners); err != nil {
|
|
|
|
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// --- RPC ---------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
const rpcHeaderSignature = "X-ACID-Signature"
|
|
|
|
|
|
2024-04-14 22:07:39 +02:00
|
|
|
|
var errWrongUsage = errors.New("wrong usage")
|
|
|
|
|
|
2024-04-14 22:24:33 +02:00
|
|
|
|
func rpcRestartOne(ctx context.Context, id int64) error {
|
|
|
|
|
gRunningMutex.Lock()
|
|
|
|
|
defer gRunningMutex.Unlock()
|
|
|
|
|
|
|
|
|
|
if _, ok := gRunning[id]; ok {
|
|
|
|
|
return fmt.Errorf("%d: not restarting running tasks", id)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// The executor bumps to "running" after inserting into gRunning,
|
|
|
|
|
// so we should not need to exclude that state here.
|
|
|
|
|
result, err := gDB.ExecContext(ctx, `UPDATE task
|
|
|
|
|
SET state = ?, detail = '', notified = 0 WHERE id = ?`,
|
|
|
|
|
taskStateNew, id)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("%d: %w", id, err)
|
|
|
|
|
} else if n, _ := result.RowsAffected(); n != 1 {
|
|
|
|
|
return fmt.Errorf("%d: no such task", id)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
notifierAwaken()
|
|
|
|
|
executorAwaken()
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func rpcEnqueueOne(ctx context.Context,
|
|
|
|
|
owner, repo, hash, runner string) error {
|
|
|
|
|
tasks, err := getTasks(ctx, `WHERE owner = ? AND repo = ? AND hash = ?
|
|
|
|
|
AND runner = ? ORDER BY id DESC LIMIT 1`, owner, repo, hash, runner)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if len(tasks) != 0 {
|
|
|
|
|
return rpcRestartOne(ctx, tasks[0].ID)
|
|
|
|
|
} else {
|
|
|
|
|
return createTasks(ctx, owner, repo, hash, []string{runner})
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func giteaResolveRef(ctx context.Context, owner, repo, ref string) (
|
|
|
|
|
string, error) {
|
|
|
|
|
req, err := giteaNewRequest(ctx, http.MethodGet, fmt.Sprintf(
|
|
|
|
|
"/api/v1/repos/%s/%s/git/commits/%s",
|
|
|
|
|
url.PathEscape(owner),
|
|
|
|
|
url.PathEscape(repo),
|
|
|
|
|
url.PathEscape(ref)), nil)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return "", err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
resp, err := http.DefaultClient.Do(req)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return "", err
|
|
|
|
|
}
|
|
|
|
|
defer resp.Body.Close()
|
|
|
|
|
|
|
|
|
|
body, err := ioutil.ReadAll(resp.Body)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return "", err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
commit := struct {
|
|
|
|
|
SHA string `json:"sha"`
|
|
|
|
|
}{}
|
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
|
|
|
return "", errors.New(resp.Status)
|
|
|
|
|
} else if err := json.Unmarshal(body, &commit); err != nil {
|
|
|
|
|
return "", err
|
|
|
|
|
}
|
|
|
|
|
return commit.SHA, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func rpcEnqueue(ctx context.Context,
|
|
|
|
|
w io.Writer, fs *flag.FlagSet, args []string) error {
|
|
|
|
|
if err := fs.Parse(args); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if fs.NArg() < 3 {
|
|
|
|
|
return errWrongUsage
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
owner, repo, ref := fs.Arg(0), fs.Arg(1), fs.Arg(2)
|
|
|
|
|
hash, err := giteaResolveRef(ctx, owner, repo, ref)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("%s: %w", ref, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
project, ok := gConfig.Projects[owner+"/"+repo]
|
|
|
|
|
if !ok {
|
|
|
|
|
return fmt.Errorf("project configuration not found")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
runners := fs.Args()[3:]
|
|
|
|
|
if len(runners) == 0 {
|
2024-12-21 14:38:11 +01:00
|
|
|
|
runners = project.AutomaticRunners()
|
2024-04-14 22:24:33 +02:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, runner := range runners {
|
|
|
|
|
if _, ok := project.Runners[runner]; !ok {
|
|
|
|
|
return fmt.Errorf("project not configured for runner %s", runner)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
for _, runner := range runners {
|
|
|
|
|
err := rpcEnqueueOne(ctx, owner, repo, hash, runner)
|
|
|
|
|
if err != nil {
|
|
|
|
|
fmt.Fprintf(w, "runner %s: %s\n", runner, err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2024-04-14 22:07:39 +02:00
|
|
|
|
func rpcRestart(ctx context.Context,
|
|
|
|
|
w io.Writer, fs *flag.FlagSet, args []string) error {
|
|
|
|
|
if err := fs.Parse(args); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ids := []int64{}
|
|
|
|
|
for _, arg := range fs.Args() {
|
|
|
|
|
id, err := strconv.ParseInt(arg, 10, 64)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("%w: %s", errWrongUsage, err)
|
|
|
|
|
}
|
|
|
|
|
ids = append(ids, id)
|
|
|
|
|
}
|
2024-03-29 14:08:15 +01:00
|
|
|
|
for _, id := range ids {
|
2024-04-14 22:24:33 +02:00
|
|
|
|
if err := rpcRestartOne(ctx, id); err != nil {
|
|
|
|
|
fmt.Fprintln(w, err)
|
2024-03-29 14:08:15 +01:00
|
|
|
|
}
|
2024-04-14 22:24:33 +02:00
|
|
|
|
}
|
2024-03-29 14:08:15 +01:00
|
|
|
|
|
2024-04-14 22:24:33 +02:00
|
|
|
|
// Mainly to allow scripts to touch the database directly.
|
|
|
|
|
if len(ids) == 0 {
|
|
|
|
|
notifierAwaken()
|
|
|
|
|
executorAwaken()
|
2024-03-29 14:08:15 +01:00
|
|
|
|
}
|
2024-04-14 22:07:39 +02:00
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
|
|
|
|
|
|
|
|
|
var rpcCommands = map[string]struct {
|
|
|
|
|
// handler must not write anything when returning an error.
|
|
|
|
|
handler func(context.Context, io.Writer, *flag.FlagSet, []string) error
|
|
|
|
|
usage string
|
|
|
|
|
function string
|
|
|
|
|
}{
|
2024-04-14 22:24:33 +02:00
|
|
|
|
"enqueue": {rpcEnqueue, "OWNER REPO REF [RUNNER]...",
|
|
|
|
|
"Create or restart tasks for the given reference."},
|
|
|
|
|
"restart": {rpcRestart, "[ID]...",
|
2024-04-14 22:07:39 +02:00
|
|
|
|
"Schedule tasks with the given IDs to be rerun."},
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func rpcPrintCommands(w io.Writer) {
|
|
|
|
|
// The alphabetic ordering is unfortunate, but tolerable.
|
|
|
|
|
keys := []string{}
|
|
|
|
|
for key := range rpcCommands {
|
|
|
|
|
keys = append(keys, key)
|
|
|
|
|
}
|
|
|
|
|
sort.Strings(keys)
|
|
|
|
|
|
|
|
|
|
fmt.Fprintf(w, "Commands:\n")
|
|
|
|
|
for _, key := range keys {
|
|
|
|
|
cmd := rpcCommands[key]
|
|
|
|
|
fmt.Fprintf(w, " %s [OPTION...] %s\n \t%s\n",
|
|
|
|
|
key, cmd.usage, cmd.function)
|
|
|
|
|
}
|
2024-03-29 14:08:15 +01:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func handleRPC(w http.ResponseWriter, r *http.Request) {
|
|
|
|
|
body, err := ioutil.ReadAll(r.Body)
|
|
|
|
|
if err != nil {
|
|
|
|
|
http.Error(w,
|
|
|
|
|
"Error reading request body", http.StatusInternalServerError)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if r.Header.Get(rpcHeaderSignature) != giteaSign(body) {
|
|
|
|
|
http.Error(w,
|
|
|
|
|
"Signature mismatch", http.StatusBadRequest)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var args []string
|
|
|
|
|
if err := json.Unmarshal(body, &args); err != nil {
|
|
|
|
|
http.Error(w,
|
|
|
|
|
"Invalid request body: "+err.Error(), http.StatusBadRequest)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if len(args) == 0 {
|
|
|
|
|
http.Error(w, "Missing command", http.StatusBadRequest)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
2024-04-14 22:07:39 +02:00
|
|
|
|
// Our handling closely follows what the flag package does internally.
|
|
|
|
|
|
2024-03-29 14:08:15 +01:00
|
|
|
|
command, args := args[0], args[1:]
|
2024-04-14 22:07:39 +02:00
|
|
|
|
cmd, ok := rpcCommands[command]
|
|
|
|
|
if !ok {
|
|
|
|
|
http.Error(w, "unknown command: "+command, http.StatusBadRequest)
|
|
|
|
|
rpcPrintCommands(w)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// If we redirected the FlagSet straight to the response,
|
|
|
|
|
// we would be unable to set our own HTTP status.
|
|
|
|
|
b := bytes.NewBuffer(nil)
|
|
|
|
|
|
|
|
|
|
fs := flag.NewFlagSet(command, flag.ContinueOnError)
|
|
|
|
|
fs.SetOutput(b)
|
|
|
|
|
fs.Usage = func() {
|
|
|
|
|
fmt.Fprintf(fs.Output(),
|
|
|
|
|
"Usage: %s [OPTION...] %s\n%s\n",
|
|
|
|
|
fs.Name(), cmd.usage, cmd.function)
|
|
|
|
|
fs.PrintDefaults()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
err = cmd.handler(r.Context(), w, fs, args)
|
|
|
|
|
|
|
|
|
|
// Wrap this error to make it as if fs.Parse discovered the issue.
|
|
|
|
|
if errors.Is(err, errWrongUsage) {
|
|
|
|
|
fmt.Fprintln(fs.Output(), err)
|
|
|
|
|
fs.Usage()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// The flag package first prints all errors that it returns.
|
|
|
|
|
// If the buffer ends up not being empty, flush it into the request.
|
|
|
|
|
if b.Len() != 0 {
|
|
|
|
|
http.Error(w, strings.TrimSpace(b.String()), http.StatusBadRequest)
|
|
|
|
|
} else if err != nil {
|
|
|
|
|
http.Error(w, err.Error(), http.StatusUnprocessableEntity)
|
2024-03-29 14:08:15 +01:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// --- Notifier ----------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
func notifierRunCommand(ctx context.Context, task Task) {
|
|
|
|
|
script := bytes.NewBuffer(nil)
|
|
|
|
|
if err := gNotifyScript.Execute(script, &task); err != nil {
|
|
|
|
|
log.Printf("error: notify: %s", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
cmd := exec.CommandContext(ctx, "sh")
|
|
|
|
|
cmd.Stdin = script
|
|
|
|
|
cmd.Stdout = os.Stdout
|
|
|
|
|
cmd.Stderr = os.Stderr
|
|
|
|
|
if err := cmd.Run(); err != nil {
|
|
|
|
|
log.Printf("error: notify: %s", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func notifierNotify(ctx context.Context, task Task) error {
|
|
|
|
|
// Loosely assuming that this only runs on state changes.
|
|
|
|
|
if task.State != taskStateNew && task.State != taskStateRunning {
|
|
|
|
|
go notifierRunCommand(ctx, task)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
payload := struct {
|
|
|
|
|
Context string `json:"context"`
|
|
|
|
|
Description string `json:"description"`
|
|
|
|
|
State string `json:"state"`
|
|
|
|
|
TargetURL string `json:"target_url"`
|
|
|
|
|
}{}
|
|
|
|
|
|
|
|
|
|
runner, ok := gConfig.Runners[task.Runner]
|
|
|
|
|
if !ok {
|
|
|
|
|
log.Printf("task %d has an unknown runner %s\n", task.ID, task.Runner)
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
payload.Context = runner.Name
|
|
|
|
|
payload.TargetURL = fmt.Sprintf("%s/task/%d", gConfig.Root, task.ID)
|
|
|
|
|
|
|
|
|
|
switch task.State {
|
|
|
|
|
case taskStateNew:
|
|
|
|
|
payload.State, payload.Description = "pending", "Pending"
|
|
|
|
|
case taskStateRunning:
|
|
|
|
|
payload.State, payload.Description = "pending", "Running"
|
|
|
|
|
case taskStateError:
|
|
|
|
|
payload.State, payload.Description = "error", "Error"
|
|
|
|
|
case taskStateFailed:
|
|
|
|
|
payload.State, payload.Description = "failure", "Failure"
|
|
|
|
|
case taskStateSuccess:
|
|
|
|
|
payload.State, payload.Description = "success", "Success"
|
|
|
|
|
default:
|
|
|
|
|
log.Printf("task %d is in unknown state %d\n", task.ID, task.State)
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// We should only fill this in case we have some specific information.
|
|
|
|
|
if task.Detail != "" {
|
|
|
|
|
payload.Description = task.Detail
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
body, err := json.Marshal(payload)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.Printf("task %d for %s: notifying: %s: %s: %s (%s)\n",
|
|
|
|
|
task.ID, task.FullName(), task.Hash,
|
|
|
|
|
payload.Context, payload.State, payload.Description)
|
|
|
|
|
|
2024-04-14 22:24:33 +02:00
|
|
|
|
req, err := giteaNewRequest(ctx, http.MethodPost, fmt.Sprintf(
|
|
|
|
|
"/api/v1/repos/%s/%s/statuses/%s",
|
|
|
|
|
url.PathEscape(task.Owner),
|
|
|
|
|
url.PathEscape(task.Repo),
|
|
|
|
|
url.PathEscape(task.Hash)), bytes.NewReader(body))
|
2024-03-29 14:08:15 +01:00
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
req.Header.Set("Content-Type", "application/json")
|
|
|
|
|
resp, err := http.DefaultClient.Do(req)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
defer resp.Body.Close()
|
|
|
|
|
|
|
|
|
|
body, err = ioutil.ReadAll(resp.Body)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
_, err = gDB.ExecContext(ctx, `UPDATE task SET notified = 1
|
|
|
|
|
WHERE id = ? AND state = ? AND detail = ? AND notified = 0`,
|
|
|
|
|
task.ID, task.State, task.Detail)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func notifierRun(ctx context.Context) error {
|
|
|
|
|
tasks, err := getTasks(ctx, `WHERE notified = 0 ORDER BY id ASC`)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, task := range tasks {
|
|
|
|
|
if err := notifierNotify(ctx, task); err != nil {
|
|
|
|
|
return fmt.Errorf(
|
|
|
|
|
"task %d for %s: %w", task.ID, task.FullName(), err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func notifier(ctx context.Context) {
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-gNotifierSignal:
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if err := notifierRun(ctx); err != nil {
|
|
|
|
|
log.Printf("error: notifier: %s\n", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func notifierAwaken() {
|
|
|
|
|
select {
|
|
|
|
|
case gNotifierSignal <- struct{}{}:
|
|
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// --- Executor ----------------------------------------------------------------
|
2024-12-22 09:00:02 +01:00
|
|
|
|
// ~~~ Running task ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
2024-03-29 14:08:15 +01:00
|
|
|
|
|
2024-12-22 09:00:02 +01:00
|
|
|
|
// RunningTask stores all data pertaining to a currently running task.
|
2024-03-29 14:08:15 +01:00
|
|
|
|
type RunningTask struct {
|
|
|
|
|
DB Task
|
|
|
|
|
Runner ConfigRunner
|
|
|
|
|
ProjectRunner ConfigProjectRunner
|
|
|
|
|
|
2024-12-22 09:00:02 +01:00
|
|
|
|
RunLog terminalWriter
|
|
|
|
|
TaskLog terminalWriter
|
|
|
|
|
DeployLog terminalWriter
|
|
|
|
|
|
|
|
|
|
wd string // acid working directory
|
|
|
|
|
timeout time.Duration // time limit on task execution
|
|
|
|
|
signer ssh.Signer // SSH private key
|
|
|
|
|
tmplScript *ttemplate.Template // remote build script
|
|
|
|
|
tmplDeploy *ttemplate.Template // local deployment script
|
2024-03-29 14:08:15 +01:00
|
|
|
|
}
|
|
|
|
|
|
2024-12-22 09:00:02 +01:00
|
|
|
|
// newRunningTask prepares a task for running, without executing anything yet.
|
|
|
|
|
func newRunningTask(task Task) (*RunningTask, error) {
|
|
|
|
|
rt := &RunningTask{DB: task}
|
|
|
|
|
|
|
|
|
|
var ok bool
|
|
|
|
|
rt.Runner, ok = gConfig.Runners[rt.DB.Runner]
|
|
|
|
|
if !ok {
|
|
|
|
|
return nil, fmt.Errorf("unknown runner: %s", rt.DB.Runner)
|
|
|
|
|
}
|
|
|
|
|
project, ok := gConfig.Projects[rt.DB.FullName()]
|
|
|
|
|
if !ok {
|
|
|
|
|
return nil, fmt.Errorf("project configuration not found")
|
|
|
|
|
}
|
|
|
|
|
rt.ProjectRunner, ok = project.Runners[rt.DB.Runner]
|
|
|
|
|
if !ok {
|
|
|
|
|
return nil, fmt.Errorf(
|
|
|
|
|
"project not configured for runner %s", rt.DB.Runner)
|
2024-03-29 14:08:15 +01:00
|
|
|
|
}
|
|
|
|
|
|
2024-12-22 09:00:02 +01:00
|
|
|
|
var err error
|
|
|
|
|
if rt.wd, err = os.Getwd(); err != nil {
|
|
|
|
|
return nil, err
|
2024-03-29 14:08:15 +01:00
|
|
|
|
}
|
|
|
|
|
|
2024-12-22 09:00:02 +01:00
|
|
|
|
// Lenient or not, some kind of a time limit is desirable.
|
|
|
|
|
rt.timeout = time.Hour
|
|
|
|
|
if rt.ProjectRunner.Timeout != "" {
|
|
|
|
|
rt.timeout, err = time.ParseDuration(rt.ProjectRunner.Timeout)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("timeout: %w", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
privateKey, err := os.ReadFile(rt.Runner.SSH.Identity)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf(
|
|
|
|
|
"cannot read SSH identity for runner %s: %w", rt.DB.Runner, err)
|
|
|
|
|
}
|
|
|
|
|
rt.signer, err = ssh.ParsePrivateKey(privateKey)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf(
|
|
|
|
|
"cannot parse SSH identity for runner %s: %w", rt.DB.Runner, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// The runner setup script may change the working directory,
|
|
|
|
|
// so do everything in one go. However, this approach also makes it
|
|
|
|
|
// difficult to distinguish project-independent runner failures.
|
|
|
|
|
// (For that, we can start multiple ssh.Sessions.)
|
|
|
|
|
//
|
|
|
|
|
// We could pass variables through SSH environment variables,
|
|
|
|
|
// which would require enabling PermitUserEnvironment in sshd_config,
|
|
|
|
|
// or through prepending script lines, but templates are a bit simpler.
|
|
|
|
|
//
|
|
|
|
|
// We let the runner itself clone the repository:
|
|
|
|
|
// - it is a more flexible in that it can test AUR packages more normally,
|
|
|
|
|
// - we might have to clone submodules as well.
|
|
|
|
|
// Otherwise, we could download a source archive from Gitea,
|
|
|
|
|
// and use SFTP to upload it to the runner.
|
|
|
|
|
rt.tmplScript, err = ttemplate.New("script").Funcs(shellFuncs).
|
|
|
|
|
Parse(rt.Runner.Setup + "\n" +
|
|
|
|
|
rt.ProjectRunner.Setup + "\n" + rt.ProjectRunner.Build)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("script/build: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
rt.tmplDeploy, err = ttemplate.New("deploy").Funcs(shellFuncs).
|
|
|
|
|
Parse(rt.ProjectRunner.Deploy)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("script/deploy: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
2024-12-24 21:05:26 +01:00
|
|
|
|
if os.Getenv("ACID_TERMINAL_DEBUG") != "" {
|
|
|
|
|
base := filepath.Join(executorTmpDir("/tmp"),
|
|
|
|
|
fmt.Sprintf("acid-%d-%s-%s-%s-",
|
|
|
|
|
task.ID, task.Owner, task.Repo, task.Runner))
|
2024-12-25 22:18:30 +01:00
|
|
|
|
rt.RunLog.Tee, _ = os.Create(base + "runlog")
|
|
|
|
|
rt.TaskLog.Tee, _ = os.Create(base + "tasklog")
|
2024-12-24 21:05:26 +01:00
|
|
|
|
// The deployment log should not be interesting.
|
|
|
|
|
}
|
2024-12-22 09:00:02 +01:00
|
|
|
|
return rt, nil
|
|
|
|
|
}
|
|
|
|
|
|
2024-12-24 21:05:26 +01:00
|
|
|
|
func (rt *RunningTask) close() {
|
|
|
|
|
for _, tee := range []io.WriteCloser{
|
2024-12-25 22:18:30 +01:00
|
|
|
|
rt.RunLog.Tee, rt.TaskLog.Tee, rt.DeployLog.Tee} {
|
2024-12-24 21:05:26 +01:00
|
|
|
|
if tee != nil {
|
|
|
|
|
tee.Close()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2024-12-22 09:00:02 +01:00
|
|
|
|
// localEnv creates a process environment for locally run executables.
|
|
|
|
|
func (rt *RunningTask) localEnv() []string {
|
|
|
|
|
return append(os.Environ(),
|
|
|
|
|
"ACID_ROOT="+rt.wd,
|
|
|
|
|
"ACID_RUNNER="+rt.DB.Runner,
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// update stores the running task's state in the database.
|
|
|
|
|
func (rt *RunningTask) update() error {
|
|
|
|
|
for _, i := range []struct {
|
|
|
|
|
tw *terminalWriter
|
|
|
|
|
log *[]byte
|
|
|
|
|
}{
|
|
|
|
|
{&rt.RunLog, &rt.DB.RunLog},
|
|
|
|
|
{&rt.TaskLog, &rt.DB.TaskLog},
|
|
|
|
|
{&rt.DeployLog, &rt.DB.DeployLog},
|
|
|
|
|
} {
|
2024-12-25 22:18:30 +01:00
|
|
|
|
i.tw.Lock()
|
|
|
|
|
defer i.tw.Unlock()
|
|
|
|
|
if *i.log = i.tw.Serialize(0); *i.log == nil {
|
2024-12-22 09:00:02 +01:00
|
|
|
|
*i.log = []byte{}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return rt.DB.update()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ~~~ Deploy ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
|
|
|
|
|
|
func executorDownloadNode(sc *sftp.Client, remotePath, localPath string,
|
|
|
|
|
info os.FileInfo) error {
|
|
|
|
|
if info.IsDir() {
|
|
|
|
|
// Hoping that caller invokes us on parents first.
|
|
|
|
|
return os.MkdirAll(localPath, info.Mode().Perm())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
src, err := sc.Open(remotePath)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("failed to open remote file %s: %w", remotePath, err)
|
|
|
|
|
}
|
|
|
|
|
defer src.Close()
|
|
|
|
|
|
|
|
|
|
dst, err := os.OpenFile(
|
|
|
|
|
localPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, info.Mode().Perm())
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("failed to create local file: %w", err)
|
|
|
|
|
}
|
|
|
|
|
defer dst.Close()
|
|
|
|
|
|
|
|
|
|
if _, err = io.Copy(dst, src); err != nil {
|
|
|
|
|
return fmt.Errorf("failed to copy file from remote %s to local %s: %w",
|
|
|
|
|
remotePath, localPath, err)
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func executorDownload(client *ssh.Client, remoteRoot, localRoot string) error {
|
|
|
|
|
sc, err := sftp.NewClient(client)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
defer sc.Close()
|
|
|
|
|
|
|
|
|
|
walker := sc.Walk(remoteRoot)
|
|
|
|
|
for walker.Step() {
|
|
|
|
|
if walker.Err() != nil {
|
|
|
|
|
return walker.Err()
|
|
|
|
|
}
|
|
|
|
|
relativePath, err := filepath.Rel(remoteRoot, walker.Path())
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if err = executorDownloadNode(sc, walker.Path(),
|
|
|
|
|
filepath.Join(localRoot, relativePath), walker.Stat()); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func executorLocalShell() string {
|
|
|
|
|
if shell := os.Getenv("SHELL"); shell != "" {
|
|
|
|
|
return shell
|
|
|
|
|
}
|
|
|
|
|
// The os/user package doesn't store the parsed out shell field.
|
|
|
|
|
return "/bin/sh"
|
|
|
|
|
}
|
|
|
|
|
|
2024-12-24 21:05:26 +01:00
|
|
|
|
func executorTmpDir(fallback string) string {
|
|
|
|
|
// See also: https://systemd.io/TEMPORARY_DIRECTORIES/
|
|
|
|
|
if tmp := os.Getenv("TMPDIR"); tmp != "" {
|
|
|
|
|
return tmp
|
|
|
|
|
}
|
|
|
|
|
return fallback
|
|
|
|
|
}
|
|
|
|
|
|
2024-12-22 09:00:02 +01:00
|
|
|
|
func executorDeploy(
|
|
|
|
|
ctx context.Context, client *ssh.Client, rt *RunningTask) error {
|
|
|
|
|
script := bytes.NewBuffer(nil)
|
|
|
|
|
if err := rt.tmplDeploy.Execute(script, &rt.DB); err != nil {
|
|
|
|
|
return &executorError{"Deploy template failed", err}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Thus the deployment directory must exist iff the script is not empty.
|
|
|
|
|
if script.Len() == 0 {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// We expect the files to be moved elsewhere on the filesystem,
|
|
|
|
|
// and they may get very large, so avoid /tmp.
|
2024-12-24 21:05:26 +01:00
|
|
|
|
dir := filepath.Join(executorTmpDir("/var/tmp"), "acid-deploy")
|
2024-12-22 09:00:02 +01:00
|
|
|
|
if err := os.RemoveAll(dir); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if err := os.Mkdir(dir, 0755); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// The passed remoteRoot is relative to sc.Getwd.
|
|
|
|
|
if err := executorDownload(client, "acid-deploy", dir); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
cmd := exec.CommandContext(ctx, executorLocalShell(), "-c", script.String())
|
|
|
|
|
cmd.Env = rt.localEnv()
|
|
|
|
|
cmd.Dir = dir
|
|
|
|
|
cmd.Stdout = &rt.DeployLog
|
|
|
|
|
cmd.Stderr = &rt.DeployLog
|
|
|
|
|
return cmd.Run()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ~~~ Build ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
|
|
|
|
|
|
func executorBuild(
|
|
|
|
|
ctx context.Context, client *ssh.Client, rt *RunningTask) error {
|
|
|
|
|
// This is here to fail early, though logically it is misplaced.
|
|
|
|
|
script := bytes.NewBuffer(nil)
|
|
|
|
|
if err := rt.tmplScript.Execute(script, &rt.DB); err != nil {
|
|
|
|
|
return &executorError{"Script template failed", err}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
session, err := client.NewSession()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return &executorError{"SSH failure", err}
|
|
|
|
|
}
|
|
|
|
|
defer session.Close()
|
|
|
|
|
|
|
|
|
|
modes := ssh.TerminalModes{ssh.ECHO: 0}
|
|
|
|
|
if err := session.RequestPty("dumb", 24, 80, modes); err != nil {
|
|
|
|
|
return &executorError{"SSH failure", err}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.Printf("task %d for %s: connected\n", rt.DB.ID, rt.DB.FullName())
|
|
|
|
|
|
|
|
|
|
session.Stdout = &rt.TaskLog
|
|
|
|
|
session.Stderr = &rt.TaskLog
|
|
|
|
|
|
|
|
|
|
// Although passing the script directly takes away the option to specify
|
|
|
|
|
// a particular shell (barring here-documents!), it is simple and reliable.
|
|
|
|
|
//
|
|
|
|
|
// Passing the script over Stdin to sh tended to end up with commands
|
|
|
|
|
// eating the script during processing, and resulted in a hang,
|
|
|
|
|
// because closing the Stdin does not result in remote processes
|
|
|
|
|
// getting a stream of EOF.
|
|
|
|
|
//
|
|
|
|
|
// Piping the script into `cat | sh` while appending a ^D to the end of it
|
|
|
|
|
// appeared to work, but it seems likely that commands might still steal
|
|
|
|
|
// script bytes from the cat program if they choose to read from the tty
|
|
|
|
|
// and the script is longer than the buffer.
|
|
|
|
|
chSession := make(chan error, 1)
|
|
|
|
|
go func() {
|
|
|
|
|
chSession <- session.Run(script.String())
|
|
|
|
|
close(chSession)
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
// Either shutdown, or early runner termination.
|
|
|
|
|
// The runner is not supposed to finish before the session.
|
|
|
|
|
err = context.Cause(ctx)
|
|
|
|
|
case err = <-chSession:
|
|
|
|
|
// Killing a runner may perfectly well trigger this first,
|
|
|
|
|
// in particular when it's on the same machine.
|
2024-03-29 14:08:15 +01:00
|
|
|
|
}
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
2024-12-22 09:00:02 +01:00
|
|
|
|
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
|
|
|
|
|
|
|
|
|
// executorError describes a taskStateError.
|
|
|
|
|
type executorError struct {
|
|
|
|
|
Detail string
|
|
|
|
|
Err error
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (e *executorError) Unwrap() error { return e.Err }
|
|
|
|
|
func (e *executorError) Error() string {
|
|
|
|
|
return fmt.Sprintf("%s: %s", e.Detail, e.Err)
|
|
|
|
|
}
|
|
|
|
|
|
2024-03-29 14:08:15 +01:00
|
|
|
|
func executorConnect(
|
|
|
|
|
ctx context.Context, config *ssh.ClientConfig, address string) (
|
|
|
|
|
*ssh.Client, error) {
|
|
|
|
|
deadline := time.Now().Add(3 * time.Minute)
|
|
|
|
|
ctxDeadlined, cancel := context.WithDeadline(ctx, deadline)
|
|
|
|
|
defer cancel()
|
|
|
|
|
|
|
|
|
|
var d net.Dialer
|
|
|
|
|
for {
|
|
|
|
|
// net.DNSError eats the cause, as in it cannot be unwrapped
|
|
|
|
|
// and tested for a particular subtype.
|
|
|
|
|
conn, err := d.DialContext(ctxDeadlined, "tcp", address)
|
|
|
|
|
if e := ctxDeadlined.Err(); e != nil {
|
|
|
|
|
// This may provide a little bit more information.
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
return nil, e
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
time.Sleep(1 * time.Second)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// We ignore the parent context, but at least we try.
|
|
|
|
|
conn.SetDeadline(deadline)
|
|
|
|
|
sc, chans, reqs, err := ssh.NewClientConn(conn, address, config)
|
|
|
|
|
conn.SetDeadline(time.Time{})
|
|
|
|
|
|
|
|
|
|
// cloud-init-enabled machines, such as OpenBSD,
|
|
|
|
|
// may have a race condition between sshd starting for the first time,
|
|
|
|
|
// and having a configured user.
|
|
|
|
|
//
|
|
|
|
|
// Authentication may therefore regularly fail,
|
|
|
|
|
// and we need to ignore all errors whatsoever,
|
|
|
|
|
// not just spurious partial successes resulting in RST or FIN.
|
|
|
|
|
var neterr net.Error
|
|
|
|
|
if errors.As(err, &neterr) || errors.Is(err, io.EOF) || err != nil {
|
|
|
|
|
time.Sleep(1 * time.Second)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
return ssh.NewClient(sc, chans, reqs), nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func executorRunTask(ctx context.Context, task Task) error {
|
2024-12-22 09:00:02 +01:00
|
|
|
|
rt, err := newRunningTask(task)
|
2024-03-29 14:08:15 +01:00
|
|
|
|
if err != nil {
|
2024-12-22 09:00:02 +01:00
|
|
|
|
task.State, task.Detail = taskStateError, "Misconfigured"
|
|
|
|
|
task.Notified = 0
|
|
|
|
|
task.RunLog = []byte(err.Error())
|
|
|
|
|
task.TaskLog = []byte{}
|
|
|
|
|
task.DeployLog = []byte{}
|
|
|
|
|
return task.update()
|
2024-03-29 14:08:15 +01:00
|
|
|
|
}
|
2024-12-24 21:05:26 +01:00
|
|
|
|
defer rt.close()
|
2024-03-29 14:08:15 +01:00
|
|
|
|
|
2024-12-22 09:00:02 +01:00
|
|
|
|
ctx, cancelTimeout := context.WithTimeout(ctx, rt.timeout)
|
2024-04-19 04:26:48 +02:00
|
|
|
|
defer cancelTimeout()
|
|
|
|
|
|
2024-12-22 09:00:02 +01:00
|
|
|
|
// RunningTasks can be concurrently accessed by HTTP handlers.
|
|
|
|
|
locked := func(f func()) {
|
2024-03-29 14:08:15 +01:00
|
|
|
|
gRunningMutex.Lock()
|
|
|
|
|
defer gRunningMutex.Unlock()
|
2024-12-22 09:00:02 +01:00
|
|
|
|
f()
|
|
|
|
|
}
|
|
|
|
|
locked(func() {
|
2024-03-29 14:08:15 +01:00
|
|
|
|
rt.DB.State, rt.DB.Detail = taskStateRunning, ""
|
|
|
|
|
rt.DB.Notified = 0
|
|
|
|
|
rt.DB.RunLog = []byte{}
|
|
|
|
|
rt.DB.TaskLog = []byte{}
|
2024-12-22 09:00:02 +01:00
|
|
|
|
rt.DB.DeployLog = []byte{}
|
2024-03-29 14:08:15 +01:00
|
|
|
|
gRunning[rt.DB.ID] = rt
|
2024-12-22 09:00:02 +01:00
|
|
|
|
})
|
|
|
|
|
defer locked(func() {
|
|
|
|
|
delete(gRunning, rt.DB.ID)
|
|
|
|
|
})
|
|
|
|
|
if err := rt.update(); err != nil {
|
2024-03-29 14:08:15 +01:00
|
|
|
|
return fmt.Errorf("SQL: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Errors happening while trying to write an error are unfortunate,
|
|
|
|
|
// but not important enough to abort entirely.
|
|
|
|
|
setError := func(detail string) {
|
|
|
|
|
rt.DB.State, rt.DB.Detail = taskStateError, detail
|
2024-12-22 09:00:02 +01:00
|
|
|
|
if err := rt.update(); err != nil {
|
2024-03-29 14:08:15 +01:00
|
|
|
|
log.Printf("error: task %d for %s: SQL: %s",
|
|
|
|
|
rt.DB.ID, rt.DB.FullName(), err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
cmd := exec.CommandContext(ctx, rt.Runner.Run)
|
2024-12-22 09:00:02 +01:00
|
|
|
|
cmd.Env = rt.localEnv()
|
2024-03-29 14:08:15 +01:00
|
|
|
|
|
2024-04-08 03:22:51 +02:00
|
|
|
|
// Pushing the runner into a new process group that can be killed at once
|
|
|
|
|
// with all its children isn't bullet-proof, it messes with job control
|
|
|
|
|
// when acid is run from an interactive shell, and it also seems avoidable
|
|
|
|
|
// (use "exec" in runner scripts, so that VMs take over the process).
|
|
|
|
|
// Maybe this is something that could be opt-in.
|
|
|
|
|
/*
|
|
|
|
|
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
|
|
|
|
|
cmd.Cancel = func() error {
|
|
|
|
|
err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
|
|
|
|
|
if err == syscall.ESRCH {
|
|
|
|
|
return os.ErrProcessDone
|
|
|
|
|
}
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
*/
|
|
|
|
|
|
2024-03-29 14:08:15 +01:00
|
|
|
|
log.Printf("task %d for %s: starting %s\n",
|
|
|
|
|
rt.DB.ID, rt.DB.FullName(), rt.Runner.Name)
|
|
|
|
|
|
|
|
|
|
cmd.Stdout = &rt.RunLog
|
|
|
|
|
cmd.Stderr = &rt.RunLog
|
|
|
|
|
if err := cmd.Start(); err != nil {
|
2024-12-22 09:00:02 +01:00
|
|
|
|
fmt.Fprintf(&rt.TaskLog, "%s\n", err)
|
|
|
|
|
locked(func() { setError("Runner failed to start") })
|
2024-03-29 14:08:15 +01:00
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ctxRunner, cancelRunner := context.WithCancelCause(ctx)
|
|
|
|
|
defer cancelRunner(context.Canceled)
|
|
|
|
|
go func() {
|
|
|
|
|
if err := cmd.Wait(); err != nil {
|
|
|
|
|
cancelRunner(err)
|
|
|
|
|
} else {
|
|
|
|
|
cancelRunner(errors.New("runner exited successfully but early"))
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
defer func() {
|
|
|
|
|
_ = cmd.Process.Signal(os.Interrupt)
|
|
|
|
|
select {
|
|
|
|
|
case <-ctxRunner.Done():
|
|
|
|
|
// This doesn't leave the runner almost any time on our shutdown,
|
|
|
|
|
// but whatever--they're supposed to be ephemeral.
|
|
|
|
|
case <-time.After(5 * time.Second):
|
|
|
|
|
}
|
2024-04-08 03:22:51 +02:00
|
|
|
|
_ = cmd.Cancel()
|
2024-03-29 14:08:15 +01:00
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
client, err := executorConnect(ctxRunner, &ssh.ClientConfig{
|
|
|
|
|
User: rt.Runner.SSH.User,
|
2024-12-22 09:00:02 +01:00
|
|
|
|
Auth: []ssh.AuthMethod{ssh.PublicKeys(rt.signer)},
|
2024-03-29 14:08:15 +01:00
|
|
|
|
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
|
|
|
|
|
}, rt.Runner.SSH.Address)
|
|
|
|
|
if err != nil {
|
|
|
|
|
fmt.Fprintf(&rt.TaskLog, "%s\n", err)
|
2024-12-22 09:00:02 +01:00
|
|
|
|
locked(func() { setError("SSH failure") })
|
2024-03-29 14:08:15 +01:00
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
defer client.Close()
|
|
|
|
|
|
2024-12-22 09:00:02 +01:00
|
|
|
|
var (
|
|
|
|
|
ee1 *ssh.ExitError
|
|
|
|
|
ee2 *executorError
|
|
|
|
|
)
|
2024-03-29 14:08:15 +01:00
|
|
|
|
|
2024-12-22 09:00:02 +01:00
|
|
|
|
err = executorBuild(ctxRunner, client, rt)
|
|
|
|
|
if err != nil {
|
|
|
|
|
locked(func() {
|
|
|
|
|
if errors.As(err, &ee1) {
|
|
|
|
|
rt.DB.State, rt.DB.Detail = taskStateFailed, "Scripts failed"
|
|
|
|
|
fmt.Fprintf(&rt.TaskLog, "\n%s\n", err)
|
|
|
|
|
} else if errors.As(err, &ee2) {
|
|
|
|
|
rt.DB.State, rt.DB.Detail = taskStateError, ee2.Detail
|
|
|
|
|
fmt.Fprintf(&rt.TaskLog, "\n%s\n", ee2.Err)
|
|
|
|
|
} else {
|
|
|
|
|
rt.DB.State, rt.DB.Detail = taskStateError, ""
|
|
|
|
|
fmt.Fprintf(&rt.TaskLog, "\n%s\n", err)
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
return rt.update()
|
2024-03-29 14:08:15 +01:00
|
|
|
|
}
|
|
|
|
|
|
2024-12-22 09:00:02 +01:00
|
|
|
|
// This is so that it doesn't stay hanging within the sftp package,
|
|
|
|
|
// which uses context.Background() everywhere.
|
2024-03-29 14:08:15 +01:00
|
|
|
|
go func() {
|
2024-12-22 09:00:02 +01:00
|
|
|
|
<-ctxRunner.Done()
|
|
|
|
|
client.Close()
|
2024-03-29 14:08:15 +01:00
|
|
|
|
}()
|
|
|
|
|
|
2024-12-22 09:00:02 +01:00
|
|
|
|
err = executorDeploy(ctxRunner, client, rt)
|
|
|
|
|
locked(func() {
|
|
|
|
|
if err == nil {
|
|
|
|
|
rt.DB.State, rt.DB.Detail = taskStateSuccess, ""
|
|
|
|
|
} else if errors.As(err, &ee1) {
|
|
|
|
|
rt.DB.State, rt.DB.Detail = taskStateFailed, "Deployment failed"
|
|
|
|
|
fmt.Fprintf(&rt.DeployLog, "\n%s\n", err)
|
|
|
|
|
} else if errors.As(err, &ee2) {
|
|
|
|
|
rt.DB.State, rt.DB.Detail = taskStateError, ee2.Detail
|
|
|
|
|
fmt.Fprintf(&rt.DeployLog, "\n%s\n", ee2.Err)
|
|
|
|
|
} else {
|
|
|
|
|
rt.DB.State, rt.DB.Detail = taskStateError, ""
|
|
|
|
|
fmt.Fprintf(&rt.DeployLog, "\n%s\n", err)
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
return rt.update()
|
2024-03-29 14:08:15 +01:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func executorRun(ctx context.Context) error {
|
|
|
|
|
tasks, err := getTasks(ctx, `WHERE state = ? OR state = ? ORDER BY id ASC`,
|
|
|
|
|
taskStateNew, taskStateRunning)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, task := range tasks {
|
|
|
|
|
if err := executorRunTask(ctx, task); err != nil {
|
|
|
|
|
return fmt.Errorf("task %d for %s: %w",
|
|
|
|
|
task.ID, task.FullName(), err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func executor(ctx context.Context) {
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-gExecutorSignal:
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if err := executorRun(ctx); err != nil {
|
|
|
|
|
log.Printf("error: executor: %s\n", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func executorAwaken() {
|
|
|
|
|
select {
|
|
|
|
|
case gExecutorSignal <- struct{}{}:
|
|
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// --- Main --------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
type taskState int64
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
taskStateNew taskState = iota // → · pending (queued)
|
|
|
|
|
taskStateRunning // → · pending (running)
|
|
|
|
|
taskStateError // → ! error (internal issue)
|
|
|
|
|
taskStateFailed // → × failure (runner issue)
|
|
|
|
|
taskStateSuccess // → ✓ success (runner finished)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
func (ts taskState) String() string {
|
|
|
|
|
switch ts {
|
|
|
|
|
case taskStateNew:
|
|
|
|
|
return "New"
|
|
|
|
|
case taskStateRunning:
|
|
|
|
|
return "Running"
|
|
|
|
|
case taskStateError:
|
|
|
|
|
return "Error"
|
|
|
|
|
case taskStateFailed:
|
|
|
|
|
return "Failed"
|
|
|
|
|
case taskStateSuccess:
|
|
|
|
|
return "Success"
|
|
|
|
|
default:
|
|
|
|
|
return fmt.Sprintf("%d", ts)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Task mirrors SQL task table records, adding a few convenience methods.
|
|
|
|
|
type Task struct {
|
|
|
|
|
ID int64
|
|
|
|
|
|
|
|
|
|
Owner string
|
|
|
|
|
Repo string
|
|
|
|
|
Hash string
|
|
|
|
|
Runner string
|
|
|
|
|
|
2024-12-22 09:00:02 +01:00
|
|
|
|
State taskState
|
|
|
|
|
Detail string
|
|
|
|
|
Notified int64
|
|
|
|
|
RunLog []byte
|
|
|
|
|
TaskLog []byte
|
|
|
|
|
DeployLog []byte
|
2024-03-29 14:08:15 +01:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *Task) FullName() string { return t.Owner + "/" + t.Repo }
|
|
|
|
|
|
|
|
|
|
func (t *Task) RunnerName() string {
|
|
|
|
|
if runner, ok := gConfig.Runners[t.Runner]; !ok {
|
|
|
|
|
return t.Runner
|
|
|
|
|
} else {
|
|
|
|
|
return runner.Name
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *Task) URL() string {
|
|
|
|
|
return fmt.Sprintf("%s/task/%d", gConfig.Root, t.ID)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *Task) RepoURL() string {
|
|
|
|
|
return fmt.Sprintf("%s/%s/%s", gConfig.Gitea, t.Owner, t.Repo)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *Task) CommitURL() string {
|
|
|
|
|
return fmt.Sprintf("%s/%s/%s/commit/%s",
|
|
|
|
|
gConfig.Gitea, t.Owner, t.Repo, t.Hash)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *Task) CloneURL() string {
|
|
|
|
|
return fmt.Sprintf("%s/%s/%s.git", gConfig.Gitea, t.Owner, t.Repo)
|
|
|
|
|
}
|
|
|
|
|
|
2024-12-22 09:00:02 +01:00
|
|
|
|
func (t *Task) update() error {
|
|
|
|
|
_, err := gDB.ExecContext(context.Background(), `UPDATE task
|
|
|
|
|
SET state = ?, detail = ?, notified = ?,
|
|
|
|
|
runlog = ?, tasklog = ?, deploylog = ? WHERE id = ?`,
|
|
|
|
|
t.State, t.Detail, t.Notified,
|
|
|
|
|
t.RunLog, t.TaskLog, t.DeployLog, t.ID)
|
|
|
|
|
if err == nil {
|
|
|
|
|
notifierAwaken()
|
|
|
|
|
}
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
|
|
|
|
|
|
|
|
|
const initializeSQL = `
|
|
|
|
|
PRAGMA application_id = 0x61636964; -- "acid" in big endian
|
|
|
|
|
|
2024-03-29 14:08:15 +01:00
|
|
|
|
CREATE TABLE IF NOT EXISTS task(
|
2024-12-22 09:00:02 +01:00
|
|
|
|
id INTEGER NOT NULL, -- unique ID
|
2024-03-29 14:08:15 +01:00
|
|
|
|
|
2024-12-22 09:00:02 +01:00
|
|
|
|
owner TEXT NOT NULL, -- Gitea username
|
|
|
|
|
repo TEXT NOT NULL, -- Gitea repository name
|
|
|
|
|
hash TEXT NOT NULL, -- commit hash
|
|
|
|
|
runner TEXT NOT NULL, -- the runner to use
|
2024-03-29 14:08:15 +01:00
|
|
|
|
|
2024-12-22 09:00:02 +01:00
|
|
|
|
state INTEGER NOT NULL DEFAULT 0, -- task state
|
|
|
|
|
detail TEXT NOT NULL DEFAULT '', -- task state detail
|
|
|
|
|
notified INTEGER NOT NULL DEFAULT 0, -- Gitea knows the state
|
|
|
|
|
runlog BLOB NOT NULL DEFAULT x'', -- combined task runner output
|
|
|
|
|
tasklog BLOB NOT NULL DEFAULT x'', -- combined task SSH output
|
|
|
|
|
deploylog BLOB NOT NULL DEFAULT x'', -- deployment output
|
2024-03-29 14:08:15 +01:00
|
|
|
|
|
|
|
|
|
PRIMARY KEY (id)
|
|
|
|
|
) STRICT;
|
|
|
|
|
`
|
|
|
|
|
|
2024-12-22 09:00:02 +01:00
|
|
|
|
func dbEnsureColumn(tx *sql.Tx, table, column, definition string) error {
|
|
|
|
|
var count int64
|
|
|
|
|
if err := tx.QueryRow(
|
|
|
|
|
`SELECT count(*) FROM pragma_table_info(?) WHERE name = ?`,
|
|
|
|
|
table, column).Scan(&count); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
} else if count == 1 {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
_, err := tx.Exec(
|
|
|
|
|
`ALTER TABLE ` + table + ` ADD COLUMN ` + column + ` ` + definition)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func dbOpen(path string) error {
|
2024-03-29 14:08:15 +01:00
|
|
|
|
var err error
|
|
|
|
|
gDB, err = sql.Open("sqlite3",
|
|
|
|
|
"file:"+path+"?_foreign_keys=1&_busy_timeout=1000")
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
2024-12-22 09:00:02 +01:00
|
|
|
|
tx, err := gDB.BeginTx(context.Background(), nil)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
defer tx.Rollback()
|
|
|
|
|
|
|
|
|
|
var version int64
|
|
|
|
|
if err = tx.QueryRow(`PRAGMA user_version`).Scan(&version); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
switch version {
|
|
|
|
|
case 0:
|
|
|
|
|
if _, err = tx.Exec(initializeSQL); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// We had not initially set a database schema version,
|
|
|
|
|
// so we're stuck checking this column even on new databases.
|
|
|
|
|
if err = dbEnsureColumn(tx,
|
|
|
|
|
`task`, `deploylog`, `BLOB NOT NULL DEFAULT x''`); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
break
|
|
|
|
|
case 1:
|
|
|
|
|
// The next migration goes here, remember to increment the number below.
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if _, err = tx.Exec(
|
|
|
|
|
`PRAGMA user_version = ` + strconv.Itoa(1)); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return tx.Commit()
|
2024-03-29 14:08:15 +01:00
|
|
|
|
}
|
|
|
|
|
|
2024-12-22 09:00:02 +01:00
|
|
|
|
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
|
|
|
|
|
2024-03-29 14:08:15 +01:00
|
|
|
|
// callRPC forwards command line commands to a running server.
|
|
|
|
|
func callRPC(args []string) error {
|
|
|
|
|
body, err := json.Marshal(args)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
req, err := http.NewRequest(http.MethodPost,
|
|
|
|
|
fmt.Sprintf("%s/rpc", gConfig.Root), bytes.NewReader(body))
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
req.Header.Set(rpcHeaderSignature, giteaSign(body))
|
|
|
|
|
req.Header.Set("Content-Type", "application/json")
|
|
|
|
|
|
|
|
|
|
resp, err := http.DefaultClient.Do(req)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
defer resp.Body.Close()
|
|
|
|
|
|
|
|
|
|
if _, err = io.Copy(os.Stdout, resp.Body); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
|
|
|
|
os.Exit(1)
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2024-12-25 22:18:30 +01:00
|
|
|
|
// filterTTY exposes the internal virtual terminal filter.
|
|
|
|
|
func filterTTY(path string) {
|
|
|
|
|
var r io.Reader = os.Stdin
|
|
|
|
|
if path != "-" {
|
|
|
|
|
if f, err := os.Open(path); err != nil {
|
|
|
|
|
log.Println(err)
|
|
|
|
|
} else {
|
|
|
|
|
r = f
|
|
|
|
|
defer f.Close()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var tw terminalWriter
|
|
|
|
|
if _, err := io.Copy(&tw, r); err != nil {
|
|
|
|
|
log.Printf("%s: %s\n", path, err)
|
|
|
|
|
}
|
|
|
|
|
if _, err := os.Stdout.Write(tw.Serialize(0)); err != nil {
|
|
|
|
|
log.Printf("%s: %s\n", path, err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2024-03-29 14:08:15 +01:00
|
|
|
|
func main() {
|
|
|
|
|
version := flag.Bool("version", false, "show version and exit")
|
2024-12-25 22:18:30 +01:00
|
|
|
|
tty := flag.Bool("tty", false, "run the internal virtual terminal filter")
|
2024-03-29 14:08:15 +01:00
|
|
|
|
|
|
|
|
|
flag.Usage = func() {
|
|
|
|
|
f := flag.CommandLine.Output()
|
|
|
|
|
fmt.Fprintf(f,
|
|
|
|
|
"Usage: %s [OPTION]... CONFIG [COMMAND...]\n", os.Args[0])
|
|
|
|
|
flag.PrintDefaults()
|
|
|
|
|
}
|
|
|
|
|
flag.Parse()
|
|
|
|
|
if flag.NArg() < 1 {
|
|
|
|
|
flag.Usage()
|
|
|
|
|
os.Exit(2)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if *version {
|
|
|
|
|
fmt.Printf("%s %s\n", projectName, projectVersion)
|
|
|
|
|
return
|
|
|
|
|
}
|
2024-12-25 22:18:30 +01:00
|
|
|
|
if *tty {
|
|
|
|
|
for _, path := range flag.Args() {
|
|
|
|
|
filterTTY(path)
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
2024-03-29 14:08:15 +01:00
|
|
|
|
|
|
|
|
|
if err := parseConfig(flag.Arg(0)); err != nil {
|
|
|
|
|
log.Fatalln(err)
|
|
|
|
|
}
|
|
|
|
|
if flag.NArg() > 1 {
|
|
|
|
|
if err := callRPC(flag.Args()[1:]); err != nil {
|
|
|
|
|
log.Fatalln(err)
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
2024-12-22 09:00:02 +01:00
|
|
|
|
if err := dbOpen(gConfig.DB); err != nil {
|
2024-03-29 14:08:15 +01:00
|
|
|
|
log.Fatalln(err)
|
|
|
|
|
}
|
|
|
|
|
defer gDB.Close()
|
|
|
|
|
|
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
|
ctx, stop := signal.NotifyContext(
|
|
|
|
|
context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
|
|
|
|
|
|
|
|
|
server := &http.Server{Addr: gConfig.Listen}
|
|
|
|
|
http.HandleFunc("/{$}", handleTasks)
|
|
|
|
|
http.HandleFunc("/task/{id}", handleTask)
|
|
|
|
|
http.HandleFunc("/push", handlePush)
|
|
|
|
|
http.HandleFunc("/rpc", handleRPC)
|
|
|
|
|
|
|
|
|
|
ln, err := (&net.ListenConfig{}).Listen(ctx, "tcp", server.Addr)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Fatalln(err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
notifierAwaken()
|
|
|
|
|
wg.Add(1)
|
|
|
|
|
go func() {
|
|
|
|
|
defer wg.Done()
|
|
|
|
|
notifier(ctx)
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
executorAwaken()
|
|
|
|
|
wg.Add(1)
|
|
|
|
|
go func() {
|
|
|
|
|
defer wg.Done()
|
|
|
|
|
executor(ctx)
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
wg.Add(1)
|
|
|
|
|
go func() {
|
|
|
|
|
defer wg.Done()
|
|
|
|
|
defer stop()
|
|
|
|
|
if err := server.Serve(ln); err != http.ErrServerClosed {
|
|
|
|
|
log.Println(err)
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
// Wait until we either receive a signal, or get a server failure.
|
|
|
|
|
<-ctx.Done()
|
|
|
|
|
log.Println("shutting down")
|
|
|
|
|
|
|
|
|
|
wg.Add(1)
|
|
|
|
|
go func() {
|
|
|
|
|
defer wg.Done()
|
|
|
|
|
if err := server.Shutdown(context.Background()); err != nil {
|
|
|
|
|
log.Println(err)
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
// Repeated signal deliveries during shutdown assume default behaviour.
|
|
|
|
|
// This might or might not be desirable.
|
|
|
|
|
stop()
|
|
|
|
|
wg.Wait()
|
|
|
|
|
}
|