feat: add dogfood background daemon mode

This commit is contained in:
Nikhil Sonti
2026-04-27 16:17:53 -07:00
parent a11f9caa64
commit d4b56d4aa7
24 changed files with 2436 additions and 122 deletions

View File

@@ -83,6 +83,9 @@ The dev profile defaults to:
browseros-dogfood start
```
`start` runs inline. It holds the browseros-dogfood runtime lock until you press `Ctrl+C`, so
another inline or background environment cannot start at the same time.
Each start:
- Warns if the configured checkout has uncommitted changes.
@@ -130,6 +133,53 @@ To print the log directory and file paths:
browseros-dogfood logs
```
## Background Mode
```bash
browseros-dogfood start-background
```
`start-background` starts the same BrowserOS dogfooding environment under a
user-level background daemon, streams startup progress, waits for the local
server `/health` endpoint to report a CDP-connected BrowserOS, and then returns.
It does not install a root daemon or configure login startup.
The inline and background modes share one OS file lock. If either mode is already
running, a second `browseros-dogfood start` or `browseros-dogfood start-background`
exits with an error. Crash recovery is automatic: when the owning process exits,
macOS releases the lock; the next start cleans up stale socket and state files.
Background control commands:
```bash
browseros-dogfood status
browseros-dogfood stop
browseros-dogfood restart
browseros-dogfood restart --pull
browseros-dogfood restart --pull --force
browseros-dogfood logs tail
browseros-dogfood logs tail --filter daemon
browseros-dogfood logs tail --filter chromium
browseros-dogfood logs tail --filter server
```
- `status` shows daemon state, PID, uptime, ports, and the structured log path.
- `stop` stops the background daemon and its BrowserOS/server child processes.
- `restart` rebuilds from the current checkout, then restarts BrowserOS and the server.
- `restart --pull` refuses dirty checkouts, runs `git pull --ff-only`, rebuilds, and restarts.
- `restart --pull --force` is destructive: it runs `git fetch --prune`, resets hard
to the configured upstream branch, rebuilds, and restarts.
- `logs tail` follows grouped daemon/chromium/server logs from the background daemon.
Pressing Ctrl-C while `start-background` or `restart` is streaming startup logs
detaches from the monitor. It does not stop the daemon.
If no background daemon is running, control commands tell you to start one with:
```bash
browseros-dogfood start-background
```
## Update The Checkout
`browseros-dogfood start` intentionally does not pull. To update the configured repo:

View File

@@ -0,0 +1,401 @@
package cmd
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"strings"
"time"
"browseros-dogfood/ipc"
"browseros-dogfood/proc"
"browseros-dogfood/runlog"
dogfoodruntime "browseros-dogfood/runtime"
"github.com/spf13/cobra"
)
var restartPull bool
var restartForce bool
var logsFilter string
var logsLines int
const defaultMonitorPollInterval = 250 * time.Millisecond
type daemonMonitor struct {
Paths runPaths
Out io.Writer
Filter string
FromStart bool
Detach <-chan struct{}
Detached *bool
PollInterval time.Duration
Status func() (ipc.Response, error)
Follow func(context.Context, func(runlog.Entry)) error
}
var statusCmd = &cobra.Command{
Use: "status",
Short: "Show browseros-dogfood background daemon status",
GroupID: groupInspect,
RunE: func(cmd *cobra.Command, args []string) error {
resp, err := sendControl(ipc.Request{Command: ipc.CmdStatus})
if err != nil {
return err
}
printStatus(resp.Data)
return nil
},
}
var stopCmd = &cobra.Command{
Use: "stop",
Short: "Stop the browseros-dogfood background daemon",
GroupID: groupRun,
RunE: func(cmd *cobra.Command, args []string) error {
if _, err := sendControl(ipc.Request{Command: ipc.CmdStop}); err != nil {
return err
}
fmt.Printf("%s browseros-dogfood background daemon\n", successStyle.Sprint("Stopping:"))
return nil
},
}
var restartCmd = &cobra.Command{
Use: "restart",
Short: "Rebuild/restart current checkout; --pull updates, --pull --force resets",
GroupID: groupRun,
RunE: func(cmd *cobra.Command, args []string) error {
request, err := buildRestartRequest(restartPull, restartForce)
if err != nil {
return err
}
paths, err := defaultRunPaths()
if err != nil {
return err
}
if _, err := sendControlWithPaths(paths, request); err != nil {
return err
}
fmt.Fprintln(os.Stdout, successStyle.Sprint("Restart requested."))
detach, cleanup := newInterruptDetach()
defer cleanup()
detached := false
if err := monitorDaemonUntilRunning(cmd.Context(), daemonMonitor{
Paths: paths,
Out: os.Stdout,
Detach: detach,
Detached: &detached,
}); err != nil {
return err
}
if !detached {
fmt.Printf("%s browseros-dogfood background environment is healthy\n", successStyle.Sprint("Ready:"))
}
return nil
},
}
var logsTailCmd = &cobra.Command{
Use: "tail",
Short: "Tail daemon, chromium, and server logs from the background daemon",
RunE: func(cmd *cobra.Command, args []string) error {
paths, err := defaultRunPaths()
if err != nil {
return err
}
resp, err := sendControlWithPaths(paths, ipc.Request{Command: ipc.CmdStatus})
if err != nil {
return err
}
logPath := logPathFromStatusData(resp.Data, paths.Log)
entries, err := runlog.ReadLast(logPath, logsLines, logsFilter)
if err != nil && !os.IsNotExist(err) {
return err
}
for _, entry := range entries {
fmt.Println(formatRunLogEntry(entry))
}
return followRunLogFromEnd(cmd.Context(), logPath, logsFilter, func(entry runlog.Entry) {
fmt.Println(formatRunLogEntry(entry))
})
},
}
func init() {
restartCmd.Flags().BoolVar(&restartPull, "pull", false, "Pull latest upstream changes before rebuilding and restarting")
restartCmd.Flags().BoolVar(&restartForce, "force", false, "With --pull, reset to upstream before rebuilding and restarting")
logsTailCmd.Flags().StringVar(&logsFilter, "filter", "", "Only show daemon, chromium, or server logs")
logsTailCmd.Flags().IntVarP(&logsLines, "lines", "n", 80, "Number of existing log lines to show before following")
logsCmd.AddCommand(logsTailCmd)
rootCmd.AddCommand(statusCmd, stopCmd, restartCmd)
}
func buildRestartRequest(pull bool, force bool) (ipc.Request, error) {
if force && !pull {
return ipc.Request{}, fmt.Errorf("--force requires --pull")
}
request := ipc.Request{Command: ipc.CmdRestart}
if pull || force {
request.Args = map[string]string{}
}
if pull {
request.Args["pull"] = "true"
}
if force {
request.Args["force"] = "true"
}
return request, nil
}
func sendControl(req ipc.Request) (ipc.Response, error) {
paths, err := defaultRunPaths()
if err != nil {
return ipc.Response{}, err
}
return sendControlWithPaths(paths, req)
}
func sendControlWithPaths(paths runPaths, req ipc.Request) (ipc.Response, error) {
resp, err := ipc.NewClient(paths.Socket).Send(req)
if err != nil {
return ipc.Response{}, daemonUnavailableError(paths, err)
}
if resp.Error != "" {
return ipc.Response{}, errors.New(resp.Error)
}
return resp, nil
}
func daemonUnavailableError(paths runPaths, cause error) error {
lock, err := dogfoodruntime.AcquireLock(paths.Lock)
if err == nil {
_ = lock.Close()
_ = dogfoodruntime.CleanupStaleRunFiles(paths.State)
return cause
}
if errors.Is(err, dogfoodruntime.ErrAlreadyRunning) {
state, stateErr := dogfoodruntime.ReadRunState(paths.State)
if stateErr == nil && state.Mode == "foreground" {
return fmt.Errorf("browseros-dogfood is running in foreground mode (pid %d); background daemon commands are unavailable", state.PID)
}
return fmt.Errorf("browseros-dogfood background daemon is not responding; try `browseros-dogfood stop` if it is stuck, then `browseros-dogfood start-background`")
}
return err
}
func printStatus(data any) {
fmt.Print(formatStatus(data))
}
func monitorDaemonUntilRunning(ctx context.Context, monitor daemonMonitor) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
out := monitor.Out
if out == nil {
out = io.Discard
}
pollInterval := monitor.PollInterval
if pollInterval <= 0 {
pollInterval = defaultMonitorPollInterval
}
status := monitor.Status
if status == nil {
status = func() (ipc.Response, error) {
return sendControlWithPaths(monitor.Paths, ipc.Request{Command: ipc.CmdStatus})
}
}
follow := monitor.Follow
if follow == nil {
follow = func(ctx context.Context, onEntry func(runlog.Entry)) error {
return followRunLog(ctx, monitor.Paths.Log, monitor.Filter, monitor.FromStart, onEntry)
}
}
followErr := make(chan error, 1)
go func() {
followErr <- follow(ctx, func(entry runlog.Entry) {
fmt.Fprintln(out, formatRunLogEntry(entry))
})
}()
if done, err := daemonReachedTerminalState(status); done || err != nil {
return err
}
ticker := time.NewTicker(pollInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-monitor.Detach:
if monitor.Detached != nil {
*monitor.Detached = true
}
fmt.Fprintf(out, "%s daemon still running. Run %s to reattach.\n", warnStyle.Sprint("Detached;"), commandStyle.Sprint("browseros-dogfood logs tail"))
return nil
case err := <-followErr:
if err != nil && ctx.Err() == nil {
return err
}
case <-ticker.C:
if done, err := daemonReachedTerminalState(status); done || err != nil {
return err
}
}
}
}
func daemonReachedTerminalState(status func() (ipc.Response, error)) (bool, error) {
resp, err := status()
if err != nil {
return false, err
}
state, lastError := monitorStatus(resp.Data)
switch state {
case "running":
return true, nil
case "error":
if lastError == "" {
lastError = "daemon entered error state"
}
return true, fmt.Errorf("%s; run `browseros-dogfood logs tail` for details", lastError)
default:
return false, nil
}
}
func monitorStatus(data any) (string, string) {
status, ok := data.(map[string]any)
if !ok {
return "", ""
}
state, _ := stringValue(status["state"])
lastError, _ := stringValue(status["last_error"])
return state, lastError
}
func followRunLogFromStart(ctx context.Context, path string, filter string, onEntry func(runlog.Entry)) error {
return followRunLog(ctx, path, filter, true, onEntry)
}
func followRunLogFromEnd(ctx context.Context, path string, filter string, onEntry func(runlog.Entry)) error {
return followRunLog(ctx, path, filter, false, onEntry)
}
func followRunLog(ctx context.Context, path string, filter string, fromStart bool, onEntry func(runlog.Entry)) error {
for {
var err error
if fromStart {
err = runlog.FollowFromStartWithContext(ctx, path, filter, onEntry)
} else {
err = runlog.FollowWithContext(ctx, path, filter, onEntry)
}
if err == nil || !os.IsNotExist(err) {
return err
}
select {
case <-ctx.Done():
return nil
case <-time.After(100 * time.Millisecond):
}
}
}
func formatRunLogEntry(entry runlog.Entry) string {
tag := entry.Tag
if tag == "browser" {
tag = "chromium"
}
style := dimStyle
switch entry.Tag {
case "daemon":
style = warnStyle
case "browser":
style = proc.TagBrowser.Color
case "server":
style = proc.TagServer.Color
}
return fmt.Sprintf("%s %s %s", entry.Time.Format("15:04:05"), style.Sprintf("[%s]", tag), entry.Line)
}
func logPathFromStatusData(data any, fallback string) string {
status, ok := data.(map[string]any)
if !ok {
return fallback
}
logPath, ok := stringValue(status["log_path"])
if !ok || logPath == "" {
return fallback
}
return logPath
}
func formatStatus(data any) string {
status, ok := data.(map[string]any)
if !ok {
raw, err := json.MarshalIndent(data, "", " ")
if err != nil {
return fmt.Sprintf("%v\n", data)
}
return string(raw) + "\n"
}
var out strings.Builder
writeStringField(&out, "State", status["state"])
writeNumberField(&out, "PID", status["pid"])
writeStringField(&out, "Uptime", status["uptime"])
if operation, ok := stringValue(status["operation"]); ok && operation != "" {
fmt.Fprintf(&out, "%s %s\n", labelStyle.Sprint("Operation:"), operation)
}
if lastError, ok := stringValue(status["last_error"]); ok && lastError != "" {
fmt.Fprintf(&out, "%s %s\n", warnStyle.Sprint("Last error:"), lastError)
}
if ports, ok := status["ports"].(map[string]any); ok {
fmt.Fprintf(
&out,
"%s CDP=%d Server=%d Extension=%d\n",
labelStyle.Sprint("Ports:"),
intValue(ports["CDP"]),
intValue(ports["Server"]),
intValue(ports["Extension"]),
)
}
writeStringField(&out, "Logs", status["log_path"])
return out.String()
}
func writeStringField(out *strings.Builder, label string, value any) {
if s, ok := stringValue(value); ok && s != "" {
if label == "Logs" {
fmt.Fprintf(out, "%s %s\n", labelStyle.Sprintf("%s:", label), pathStyle.Sprint(s))
return
}
fmt.Fprintf(out, "%s %s\n", labelStyle.Sprintf("%s:", label), s)
}
}
func writeNumberField(out *strings.Builder, label string, value any) {
if n := intValue(value); n != 0 {
fmt.Fprintf(out, "%s %d\n", labelStyle.Sprintf("%s:", label), n)
}
}
func stringValue(value any) (string, bool) {
s, ok := value.(string)
return s, ok
}
func intValue(value any) int {
switch v := value.(type) {
case float64:
return int(v)
case int:
return v
default:
return 0
}
}

View File

@@ -0,0 +1,150 @@
package cmd
import (
"bytes"
"context"
"strings"
"testing"
"time"
"browseros-dogfood/ipc"
"browseros-dogfood/runlog"
)
func TestFormatStatusDataUsesHumanReadableSummary(t *testing.T) {
got := formatStatus(map[string]any{
"state": "running",
"pid": float64(123),
"uptime": "5s",
"log_path": "/tmp/browseros-dogfood/daemon.jsonl",
"ports": map[string]any{
"CDP": float64(9015),
"Server": float64(9115),
"Extension": float64(9315),
},
})
for _, want := range []string{
"State: running",
"PID: 123",
"Uptime: 5s",
"Ports: CDP=9015 Server=9115 Extension=9315",
"Logs: /tmp/browseros-dogfood/daemon.jsonl",
} {
if !strings.Contains(got, want) {
t.Fatalf("formatted status missing %q:\n%s", want, got)
}
}
}
func TestLogPathFromStatusDataPrefersDaemonValue(t *testing.T) {
got := logPathFromStatusData(map[string]any{"log_path": "/tmp/daemon.jsonl"}, "/tmp/local.jsonl")
if got != "/tmp/daemon.jsonl" {
t.Fatalf("got %q want daemon log path", got)
}
}
func TestLogPathFromStatusDataFallsBackToLocalPath(t *testing.T) {
got := logPathFromStatusData(map[string]any{}, "/tmp/local.jsonl")
if got != "/tmp/local.jsonl" {
t.Fatalf("got %q want fallback log path", got)
}
}
func TestBuildRestartRequestUsesPullAndForceArgs(t *testing.T) {
got, err := buildRestartRequest(true, true)
if err != nil {
t.Fatal(err)
}
if got.Command != ipc.CmdRestart {
t.Fatalf("command got %q want restart", got.Command)
}
if got.Args["pull"] != "true" || got.Args["force"] != "true" {
t.Fatalf("args got %#v", got.Args)
}
}
func TestBuildRestartRequestRejectsForceWithoutPull(t *testing.T) {
if _, err := buildRestartRequest(false, true); err == nil {
t.Fatal("expected force without pull to fail")
}
}
func TestRootUsageShowsRestartPullAndOmitsUpdate(t *testing.T) {
usage := stripANSI(rootCmd.UsageString())
if !strings.Contains(usage, "restart Rebuild/restart current checkout; --pull updates, --pull --force resets") {
t.Fatalf("missing restart pull hint in\n%s", usage)
}
if strings.Contains(usage, "\n update") {
t.Fatalf("update should not appear in root usage:\n%s", usage)
}
}
func TestMonitorDaemonUntilRunningPrintsEntriesAndStops(t *testing.T) {
var out bytes.Buffer
statusCalls := 0
err := monitorDaemonUntilRunning(context.Background(), daemonMonitor{
Out: &out,
PollInterval: time.Millisecond,
Status: func() (ipc.Response, error) {
statusCalls++
if statusCalls == 1 {
return ipc.Response{OK: true, Data: map[string]any{"state": "starting"}}, nil
}
return ipc.Response{OK: true, Data: map[string]any{"state": "running"}}, nil
},
Follow: func(ctx context.Context, onEntry func(runlog.Entry)) error {
onEntry(runlog.Entry{Tag: "daemon", Line: "building agent"})
<-ctx.Done()
return nil
},
})
if err != nil {
t.Fatalf("monitor: %v", err)
}
if !strings.Contains(stripANSI(out.String()), "[daemon] building agent") {
t.Fatalf("missing log entry in\n%s", out.String())
}
}
func TestMonitorDaemonUntilRunningReturnsDaemonError(t *testing.T) {
err := monitorDaemonUntilRunning(context.Background(), daemonMonitor{
PollInterval: time.Millisecond,
Status: func() (ipc.Response, error) {
return ipc.Response{OK: true, Data: map[string]any{
"state": "error",
"last_error": "server health check failed",
}}, nil
},
Follow: func(ctx context.Context, onEntry func(runlog.Entry)) error {
<-ctx.Done()
return nil
},
})
if err == nil || !strings.Contains(err.Error(), "server health check failed") {
t.Fatalf("error got %v", err)
}
}
func TestMonitorDaemonUntilRunningDetachesOnInterrupt(t *testing.T) {
var out bytes.Buffer
detach := make(chan struct{})
close(detach)
err := monitorDaemonUntilRunning(context.Background(), daemonMonitor{
Out: &out,
Detach: detach,
PollInterval: time.Hour,
Status: func() (ipc.Response, error) {
return ipc.Response{OK: true, Data: map[string]any{"state": "starting"}}, nil
},
Follow: func(ctx context.Context, onEntry func(runlog.Entry)) error {
<-ctx.Done()
return nil
},
})
if err != nil {
t.Fatalf("monitor: %v", err)
}
if !strings.Contains(stripANSI(out.String()), "Detached; daemon still running.") {
t.Fatalf("missing detach message in\n%s", out.String())
}
}

View File

@@ -0,0 +1,555 @@
package cmd
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"os"
"os/exec"
"os/signal"
"path/filepath"
"sync"
"syscall"
"time"
"browseros-dogfood/config"
"browseros-dogfood/ipc"
"browseros-dogfood/pipeline"
"browseros-dogfood/proc"
"browseros-dogfood/runlog"
dogfoodruntime "browseros-dogfood/runtime"
"github.com/spf13/cobra"
)
type runPaths struct {
Dir string
Lock string
State string
Socket string
Log string
RawLog string
}
var daemonHeadless bool
var daemonRefreshProfile bool
const (
serverHealthAttempts = 120
serverHealthInterval = 500 * time.Millisecond
)
var daemonCmd = &cobra.Command{
Use: "daemon",
Short: "Run the browseros-dogfood background daemon",
Hidden: true,
RunE: runDaemon,
}
func init() {
daemonCmd.Flags().BoolVar(&daemonHeadless, "headless", false, "Run BrowserOS headless")
daemonCmd.Flags().BoolVar(&daemonRefreshProfile, "refresh-profile", false, "Refresh copied BrowserOS profile before launch")
rootCmd.AddCommand(daemonCmd)
}
func newRunPaths(configPath string) runPaths {
dir := filepath.Dir(configPath)
return runPaths{
Dir: dir,
Lock: filepath.Join(dir, "run.lock"),
State: filepath.Join(dir, "state.json"),
Socket: filepath.Join(dir, "daemon.sock"),
Log: filepath.Join(dir, "daemon.jsonl"),
RawLog: filepath.Join(dir, "daemon.log"),
}
}
func defaultRunPaths() (runPaths, error) {
path, err := config.Path()
if err != nil {
return runPaths{}, err
}
return newRunPaths(path), nil
}
func daemonArgs(headless bool) []string {
args := []string{"daemon"}
if headless {
args = append(args, "--headless")
}
return args
}
func daemonArgsWithOptions(headless bool, refreshProfile bool) []string {
args := daemonArgs(headless)
if refreshProfile {
args = append(args, "--refresh-profile")
}
return args
}
func acquireRunLock(paths runPaths, mode string) (*dogfoodruntime.Lock, error) {
lock, err := dogfoodruntime.AcquireLock(paths.Lock)
if err != nil {
if errors.Is(err, dogfoodruntime.ErrAlreadyRunning) {
return nil, runningError(paths)
}
return nil, err
}
if err := dogfoodruntime.CleanupStaleRunFiles(paths.State); err != nil {
lock.Close()
return nil, err
}
socketPath := ""
logPath := ""
if mode == "background" {
socketPath = paths.Socket
logPath = paths.Log
}
if err := dogfoodruntime.WriteRunState(paths.State, dogfoodruntime.RunState{
PID: os.Getpid(),
Mode: mode,
StartedAt: time.Now(),
SocketPath: socketPath,
LogPath: logPath,
}); err != nil {
lock.Close()
return nil, err
}
return lock, nil
}
func runningError(paths runPaths) error {
state, err := dogfoodruntime.ReadRunState(paths.State)
if err == nil {
if state.Mode == "background" {
return fmt.Errorf("browseros-dogfood background daemon is already running (pid %d)", state.PID)
}
return fmt.Errorf("browseros-dogfood is already running in foreground mode (pid %d)", state.PID)
}
return fmt.Errorf("browseros-dogfood is already running")
}
func startBackgroundProcess(paths runPaths, headless bool, refreshProfile bool) error {
exe, err := os.Executable()
if err != nil {
return err
}
if resolved, err := filepath.EvalSymlinks(exe); err == nil {
exe = resolved
}
if err := os.MkdirAll(paths.Dir, 0755); err != nil {
return err
}
rawLog, err := os.OpenFile(paths.RawLog, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
return err
}
defer rawLog.Close()
cmd := exec.Command(exe, daemonArgsWithOptions(headless, refreshProfile)...)
cmd.Stdout = rawLog
cmd.Stderr = rawLog
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
if err := cmd.Start(); err != nil {
return err
}
done := make(chan error, 1)
go func() { done <- cmd.Wait() }()
deadline := time.After(5 * time.Second)
tick := time.NewTicker(100 * time.Millisecond)
defer tick.Stop()
for {
select {
case err := <-done:
if err != nil {
return fmt.Errorf("background daemon exited during startup: %w; see %s", err, paths.RawLog)
}
return fmt.Errorf("background daemon exited during startup; see %s", paths.RawLog)
case <-deadline:
return fmt.Errorf("background daemon did not open its control socket; see %s", paths.RawLog)
case <-tick.C:
if resp, err := ipc.NewClient(paths.Socket).Send(ipc.Request{Command: ipc.CmdStatus}); err == nil && resp.OK {
fmt.Printf("%s browseros-dogfood background daemon %s\n", successStyle.Sprint("Started:"), dimStyle.Sprintf("(pid %d)", cmd.Process.Pid))
fmt.Fprintln(os.Stdout, dimStyle.Sprint("Streaming startup logs until healthy..."))
detach, cleanup := newInterruptDetach()
defer cleanup()
detached := false
if err := monitorDaemonUntilRunning(context.Background(), daemonMonitor{
Paths: paths,
Out: os.Stdout,
FromStart: true,
Detach: detach,
Detached: &detached,
}); err != nil {
return err
}
if detached {
return nil
}
fmt.Printf("%s browseros-dogfood background environment is healthy\n", successStyle.Sprint("Ready:"))
fmt.Printf(" %s %s\n", labelStyle.Sprint("Status:"), commandStyle.Sprint("browseros-dogfood status"))
fmt.Printf(" %s %s\n", labelStyle.Sprint("Logs:"), commandStyle.Sprint("browseros-dogfood logs tail"))
fmt.Printf(" %s %s\n", labelStyle.Sprint("Stop:"), commandStyle.Sprint("browseros-dogfood stop"))
return nil
}
}
}
}
func newInterruptDetach() (<-chan struct{}, func()) {
sigCh := make(chan os.Signal, 1)
done := make(chan struct{})
detach := make(chan struct{})
var detachOnce sync.Once
signal.Notify(sigCh, os.Interrupt)
go func() {
select {
case <-sigCh:
detachOnce.Do(func() { close(detach) })
case <-done:
}
}()
return detach, func() {
signal.Stop(sigCh)
close(done)
}
}
type dogfoodDaemon struct {
ctx context.Context
cancel context.CancelFunc
paths runPaths
logWriter *runlog.Writer
opMu sync.Mutex
mu sync.RWMutex
env *environment
state string
operation string
lastError string
ports config.Ports
startedAt time.Time
headless bool
}
type daemonStatus struct {
State string `json:"state"`
Operation string `json:"operation,omitempty"`
LastError string `json:"last_error,omitempty"`
PID int `json:"pid"`
Uptime string `json:"uptime"`
Ports config.Ports `json:"ports"`
LogPath string `json:"log_path"`
}
type healthResponse struct {
CDPConnected *bool `json:"cdpConnected"`
}
func runDaemon(cmd *cobra.Command, args []string) error {
cfg, err := loadConfig()
if err != nil {
return err
}
paths, err := defaultRunPaths()
if err != nil {
return err
}
lock, err := acquireRunLock(paths, "background")
if err != nil {
return err
}
defer lock.Close()
defer dogfoodruntime.CleanupStaleRunFiles(paths.State)
if err := os.Remove(paths.Log); err != nil && !os.IsNotExist(err) {
return err
}
logWriter, err := runlog.NewWriter(paths.Log)
if err != nil {
return err
}
defer logWriter.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := &dogfoodDaemon{
ctx: ctx,
cancel: cancel,
paths: paths,
logWriter: logWriter,
state: "starting",
startedAt: time.Now(),
headless: daemonHeadless,
ports: cfg.Ports,
}
server := ipc.NewServer(paths.Socket, d)
if err := server.Start(); err != nil {
return err
}
defer server.Stop()
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
go func() {
select {
case <-sigCh:
cancel()
case <-ctx.Done():
}
}()
if err := d.startLockedOperation("starting", daemonRefreshProfile); err != nil {
proc.LogMsg(proc.TagInfo, proc.ErrorColor.Sprintf("Startup failed: %v", err))
}
<-ctx.Done()
d.stopEnvironment()
return nil
}
func (d *dogfoodDaemon) Handle(req ipc.Request) ipc.Response {
switch req.Command {
case ipc.CmdStatus:
return ipc.Response{OK: true, Data: d.status()}
case ipc.CmdStop:
go func() {
time.Sleep(100 * time.Millisecond)
d.cancel()
}()
return ipc.Response{OK: true, Data: map[string]string{"state": "stopping"}}
case ipc.CmdRestart:
pull := req.Args["pull"] == "true"
force := req.Args["force"] == "true"
if err := d.scheduleRestart(pull, force); err != nil {
return ipc.Response{Error: err.Error()}
}
return ipc.Response{OK: true, Data: map[string]string{"state": "restarting"}}
default:
return ipc.Response{Error: fmt.Sprintf("unknown command: %s", req.Command)}
}
}
func (d *dogfoodDaemon) status() daemonStatus {
d.mu.RLock()
defer d.mu.RUnlock()
return daemonStatus{
State: d.state,
Operation: d.operation,
LastError: d.lastError,
PID: os.Getpid(),
Uptime: time.Since(d.startedAt).Round(time.Second).String(),
Ports: d.ports,
LogPath: d.paths.Log,
}
}
func (d *dogfoodDaemon) scheduleRestart(pull bool, force bool) error {
if force && !pull {
return fmt.Errorf("--force requires --pull")
}
if pull && !force {
cfg, err := loadConfig()
if err != nil {
return err
}
runner := pipeline.ExecRunner{}
dirty, err := pipeline.Dirty(cfg.RepoPath, runner)
if err != nil {
return err
}
if dirty {
return fmt.Errorf("checkout has uncommitted changes; commit or stash them, or use --force to reset to upstream")
}
}
return d.scheduleOperation("restarting", func() error {
if pull {
cfg, err := loadConfig()
if err != nil {
return err
}
runner := pipeline.ExecRunner{}
if force {
if err := pipeline.Fetch(d.ctx, cfg.RepoPath, runner); err != nil {
return err
}
if err := pipeline.ResetHardToUpstream(d.ctx, cfg.RepoPath, runner); err != nil {
return err
}
} else if err := pipeline.Pull(d.ctx, cfg.RepoPath, runner); err != nil {
return err
}
}
return d.startLocked(false)
})
}
func (d *dogfoodDaemon) startLockedOperation(name string, refreshProfile bool) error {
return d.withOperation(name, func() error {
return d.startLocked(refreshProfile)
})
}
func (d *dogfoodDaemon) withOperation(name string, fn func() error) error {
if !d.opMu.TryLock() {
return fmt.Errorf("daemon is already %s", d.currentOperation())
}
defer d.opMu.Unlock()
d.setState(name, name, "")
err := fn()
if err != nil {
d.logLifecycle("%s failed: %v", name, err)
d.setState("error", "", err.Error())
return err
}
d.setState("running", "", "")
return nil
}
func (d *dogfoodDaemon) scheduleOperation(name string, fn func() error) error {
if !d.opMu.TryLock() {
return fmt.Errorf("daemon is already %s", d.currentOperation())
}
d.setState(name, name, "")
go func() {
defer d.opMu.Unlock()
err := fn()
if err != nil {
d.logLifecycle("%s failed: %v", name, err)
d.setState("error", "", err.Error())
return
}
d.setState("running", "", "")
}()
return nil
}
func (d *dogfoodDaemon) startLocked(refreshProfile bool) error {
cfg, err := loadConfig()
if err != nil {
return err
}
d.stopEnvironment()
opts := environmentOptions{
RefreshProfile: refreshProfile,
Headless: d.headless,
RestartBrowser: true,
Runner: pipeline.ExecRunner{},
Progress: func(message string) {
d.logLifecycle("%s", message)
},
LineHandler: func(tag proc.Tag, stream string, line string) {
_ = d.logWriter.Append(tag.Name, stream, line)
},
}
env, err := buildAndStartEnvironment(d.ctx, cfg, opts)
if err != nil {
return err
}
d.mu.Lock()
d.env = env
d.ports = env.cfg.Ports
d.mu.Unlock()
if err := d.waitUntilHealthy(env.cfg, serverHealthAttempts, serverHealthInterval); err != nil {
return err
}
return nil
}
func (d *dogfoodDaemon) stopEnvironment() {
d.mu.Lock()
env := d.env
d.env = nil
d.mu.Unlock()
if env == nil {
return
}
env.Stop()
done := make(chan struct{})
go func() {
env.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(10 * time.Second):
env.ForceKill()
}
}
func (d *dogfoodDaemon) setState(state string, operation string, lastError string) {
d.mu.Lock()
defer d.mu.Unlock()
d.state = state
d.operation = operation
d.lastError = lastError
}
func (d *dogfoodDaemon) currentOperation() string {
d.mu.RLock()
defer d.mu.RUnlock()
if d.operation == "" {
return "busy"
}
return d.operation
}
func (d *dogfoodDaemon) logLifecycle(format string, args ...any) {
if d == nil || d.logWriter == nil {
return
}
_ = d.logWriter.Append("daemon", "lifecycle", fmt.Sprintf(format, args...))
}
func (d *dogfoodDaemon) waitUntilHealthy(cfg config.Config, maxAttempts int, interval time.Duration) error {
d.logLifecycle("waiting for server health")
if err := waitForServerHealth(d.ctx, cfg.Ports.Server, maxAttempts, interval); err != nil {
return err
}
d.logLifecycle("server healthy")
return nil
}
func waitForServerHealth(ctx context.Context, port int, maxAttempts int, interval time.Duration) error {
client := &http.Client{Timeout: time.Second}
url := fmt.Sprintf("http://127.0.0.1:%d/health", port)
var lastErr error
for range maxAttempts {
if ctx.Err() != nil {
return ctx.Err()
}
resp, err := client.Get(url)
if err == nil {
var health healthResponse
decodeErr := json.NewDecoder(resp.Body).Decode(&health)
resp.Body.Close()
if resp.StatusCode == http.StatusOK && decodeErr == nil && (health.CDPConnected == nil || *health.CDPConnected) {
return nil
}
if decodeErr != nil {
lastErr = decodeErr
} else {
lastErr = fmt.Errorf("health endpoint not ready")
}
} else {
lastErr = err
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(interval):
}
}
if lastErr != nil {
return fmt.Errorf("server health check failed: %w", lastErr)
}
return fmt.Errorf("server health check failed")
}

View File

@@ -0,0 +1,174 @@
package cmd
import (
"context"
"errors"
"fmt"
"net"
"net/http"
"path/filepath"
"reflect"
"testing"
"time"
"browseros-dogfood/config"
"browseros-dogfood/runlog"
)
func TestRunPathsLiveBesideConfig(t *testing.T) {
paths := newRunPaths(filepath.Join("/tmp", "browseros-dogfood", "config.yaml"))
if paths.Lock != filepath.Join("/tmp", "browseros-dogfood", "run.lock") {
t.Fatalf("lock path got %q", paths.Lock)
}
if paths.Socket != filepath.Join("/tmp", "browseros-dogfood", "daemon.sock") {
t.Fatalf("socket path got %q", paths.Socket)
}
if paths.Log != filepath.Join("/tmp", "browseros-dogfood", "daemon.jsonl") {
t.Fatalf("log path got %q", paths.Log)
}
}
func TestDaemonArgsIncludeHeadlessWhenRequested(t *testing.T) {
got := daemonArgs(true)
want := []string{"daemon", "--headless"}
if !reflect.DeepEqual(got, want) {
t.Fatalf("got %#v want %#v", got, want)
}
}
func TestLogLifecycleWritesDaemonRunlogEntry(t *testing.T) {
path := filepath.Join(t.TempDir(), "daemon.jsonl")
writer, err := runlog.NewWriter(path)
if err != nil {
t.Fatal(err)
}
d := &dogfoodDaemon{logWriter: writer}
d.logLifecycle("building agent")
if err := writer.Close(); err != nil {
t.Fatal(err)
}
entries, err := runlog.ReadLast(path, 10, "daemon")
if err != nil {
t.Fatal(err)
}
if len(entries) != 1 {
t.Fatalf("entries len got %d want 1", len(entries))
}
if entries[0].Line != "building agent" {
t.Fatalf("entry line got %q", entries[0].Line)
}
}
func TestWaitForServerHealthRequiresCDPConnectedWhenPresent(t *testing.T) {
var requests int
port, shutdown := startHealthTestServer(t, func(w http.ResponseWriter, r *http.Request) {
requests++
if requests == 1 {
fmt.Fprint(w, `{"status":"ok","cdpConnected":false}`)
return
}
fmt.Fprint(w, `{"status":"ok","cdpConnected":true}`)
})
defer shutdown()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if err := waitForServerHealth(ctx, port, 5, 10*time.Millisecond); err != nil {
t.Fatalf("wait health: %v", err)
}
if requests < 2 {
t.Fatalf("requests got %d want at least 2", requests)
}
}
func TestWaitForServerHealthAcceptsMissingCDPConnected(t *testing.T) {
port, shutdown := startHealthTestServer(t, func(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, `{"status":"ok"}`)
})
defer shutdown()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if err := waitForServerHealth(ctx, port, 1, 10*time.Millisecond); err != nil {
t.Fatalf("wait health: %v", err)
}
}
func TestWaitUntilHealthyLogsHealthLifecycle(t *testing.T) {
port, shutdown := startHealthTestServer(t, func(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, `{"status":"ok","cdpConnected":true}`)
})
defer shutdown()
path := filepath.Join(t.TempDir(), "daemon.jsonl")
writer, err := runlog.NewWriter(path)
if err != nil {
t.Fatal(err)
}
d := &dogfoodDaemon{
ctx: context.Background(),
logWriter: writer,
}
if err := d.waitUntilHealthy(config.Config{Ports: config.Ports{Server: port}}, 1, 10*time.Millisecond); err != nil {
t.Fatalf("wait healthy: %v", err)
}
if err := writer.Close(); err != nil {
t.Fatal(err)
}
entries, err := runlog.ReadLast(path, 10, "daemon")
if err != nil {
t.Fatal(err)
}
got := make([]string, 0, len(entries))
for _, entry := range entries {
got = append(got, entry.Line)
}
want := []string{"waiting for server health", "server healthy"}
if !reflect.DeepEqual(got, want) {
t.Fatalf("lifecycle entries got %#v want %#v", got, want)
}
}
func TestWithOperationLogsFailureLifecycle(t *testing.T) {
path := filepath.Join(t.TempDir(), "daemon.jsonl")
writer, err := runlog.NewWriter(path)
if err != nil {
t.Fatal(err)
}
d := &dogfoodDaemon{logWriter: writer}
err = d.withOperation("starting", func() error {
return errors.New("boom")
})
if err == nil {
t.Fatal("expected operation error")
}
if err := writer.Close(); err != nil {
t.Fatal(err)
}
entries, err := runlog.ReadLast(path, 10, "daemon")
if err != nil {
t.Fatal(err)
}
if len(entries) != 1 || entries[0].Line != "starting failed: boom" {
t.Fatalf("entries got %#v", entries)
}
}
func startHealthTestServer(t *testing.T, handler http.HandlerFunc) (int, func()) {
t.Helper()
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
server := &http.Server{Handler: handler}
go func() {
_ = server.Serve(listener)
}()
return listener.Addr().(*net.TCPAddr).Port, func() {
_ = server.Close()
}
}

View File

@@ -3,6 +3,7 @@ package cmd
import (
"bufio"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
@@ -51,11 +52,18 @@ var initCmd = &cobra.Command{
if err := pipeline.WriteProductionEnvFiles(cfg.AgentRoot(), cfg); err != nil {
return err
}
fmt.Printf("%s %s\n%s %s\n", successStyle.Sprint("Config written:"), pathStyle.Sprint(path), labelStyle.Sprint("Run:"), commandStyle.Sprint("browseros-dogfood start"))
printInitNextSteps(cmd.OutOrStdout(), path)
return nil
},
}
func printInitNextSteps(out io.Writer, path string) {
fmt.Fprintf(out, "%s %s\n", successStyle.Sprint("Config written:"), pathStyle.Sprint(path))
fmt.Fprintln(out, labelStyle.Sprint("Start BrowserOS dogfood:"))
fmt.Fprintf(out, " %s %s\n", labelStyle.Sprint("Inline:"), commandStyle.Sprint("browseros-dogfood start"))
fmt.Fprintf(out, " %s %s\n", labelStyle.Sprint("Background:"), commandStyle.Sprint("browseros-dogfood start-background"))
}
func prompt(r *bufio.Reader, label string, current string) string {
fmt.Printf("%s [%s]: ", labelStyle.Sprint(label), pathStyle.Sprint(current))
line, _ := r.ReadString('\n')

View File

@@ -0,0 +1,23 @@
package cmd
import (
"bytes"
"strings"
"testing"
)
func TestPrintInitNextStepsShowsInlineAndBackgroundStart(t *testing.T) {
var out bytes.Buffer
printInitNextSteps(&out, "/tmp/config.yaml")
got := out.String()
for _, want := range []string{
"Config written: /tmp/config.yaml",
"Inline: browseros-dogfood start",
"Background: browseros-dogfood start-background",
} {
if !strings.Contains(got, want) {
t.Fatalf("missing %q in\n%s", want, got)
}
}
}

View File

@@ -38,7 +38,7 @@ var pullCmd = &cobra.Command{
if dirty && !pullForce {
return fmt.Errorf("checkout has uncommitted changes; commit/stash them or use --force")
}
if err := pipeline.Pull(cfg.RepoPath, runner); err != nil {
if err := pipeline.Pull(cmd.Context(), cfg.RepoPath, runner); err != nil {
return err
}
newHead, _ := pipeline.Head(cfg.RepoPath, runner)

View File

@@ -2,6 +2,7 @@ package cmd
import (
"context"
"errors"
"fmt"
"os"
"os/signal"
@@ -15,12 +16,15 @@ import (
"browseros-dogfood/pipeline"
"browseros-dogfood/proc"
"browseros-dogfood/profile"
dogfoodruntime "browseros-dogfood/runtime"
"github.com/spf13/cobra"
)
var startRefreshProfile bool
var startHeadless bool
var startBackgroundRefreshProfile bool
var startBackgroundHeadless bool
const (
serverLogName = "server.log"
@@ -30,7 +34,10 @@ const (
func init() {
startCmd.Flags().BoolVar(&startRefreshProfile, "refresh-profile", false, "Refresh copied BrowserOS profile before launch")
startCmd.Flags().BoolVar(&startHeadless, "headless", false, "Run BrowserOS headless")
startBackgroundCmd.Flags().BoolVar(&startBackgroundRefreshProfile, "refresh-profile", false, "Refresh copied BrowserOS profile before launch")
startBackgroundCmd.Flags().BoolVar(&startBackgroundHeadless, "headless", false, "Run BrowserOS headless")
rootCmd.AddCommand(startCmd)
rootCmd.AddCommand(startBackgroundCmd)
}
var startCmd = &cobra.Command{
@@ -42,98 +49,76 @@ var startCmd = &cobra.Command{
if err != nil {
return err
}
agentRoot := cfg.AgentRoot()
runner := pipeline.ExecRunner{}
if dirty, err := pipeline.Dirty(cfg.RepoPath, runner); err == nil && dirty {
fmt.Fprintln(os.Stderr, warnStyle.Sprint("warning: checkout has uncommitted changes; start will use current files"))
}
if startRefreshProfile || !exists(cfg.DevUserDataDir) {
if err := profile.Import(profile.ImportConfig{
SourceUserDataDir: cfg.SourceUserDataDir,
SourceProfileDir: cfg.SourceProfileDir,
DevUserDataDir: cfg.DevUserDataDir,
DevProfileDir: cfg.DevProfileDir,
}); err != nil {
return err
}
} else if err := profile.CleanupSingletons(cfg.DevUserDataDir); err != nil {
return err
}
if err := pipeline.WriteProductionEnvFiles(agentRoot, cfg); err != nil {
return err
}
resolvedPorts, changed, err := proc.ResolvePorts(cfg.Ports)
paths, err := defaultRunPaths()
if err != nil {
return err
}
cfg.Ports = resolvedPorts
if changed {
path, err := config.Path()
if err != nil {
return err
}
if err := config.Save(path, cfg); err != nil {
return err
}
proc.LogMsgf(proc.TagInfo, "Busy ports detected; using CDP=%d Server=%d Extension=%d", cfg.Ports.CDP, cfg.Ports.Server, cfg.Ports.Extension)
} else {
proc.LogMsgf(proc.TagInfo, "Using ports CDP=%d Server=%d Extension=%d", cfg.Ports.CDP, cfg.Ports.Server, cfg.Ports.Extension)
}
if err := pipeline.Build(agentRoot, runner); err != nil {
lock, err := acquireRunLock(paths, "foreground")
if err != nil {
return err
}
return runEnvironment(cfg, agentRoot)
defer lock.Close()
defer dogfoodruntime.CleanupStaleRunFiles(paths.State)
return runEnvironment(cfg, environmentOptions{
RefreshProfile: startRefreshProfile,
Headless: startHeadless,
RestartBrowser: false,
Runner: pipeline.ExecRunner{},
})
},
}
func runEnvironment(cfg config.Config, agentRoot string) error {
var startBackgroundCmd = &cobra.Command{
Use: "start-background",
Short: "Start BrowserOS dogfooding environment in the background",
GroupID: groupRun,
RunE: func(cmd *cobra.Command, args []string) error {
if _, err := loadConfig(); err != nil {
return err
}
paths, err := defaultRunPaths()
if err != nil {
return err
}
if lock, err := dogfoodruntime.AcquireLock(paths.Lock); err == nil {
_ = lock.Close()
if err := dogfoodruntime.CleanupStaleRunFiles(paths.State); err != nil {
return err
}
} else if errors.Is(err, dogfoodruntime.ErrAlreadyRunning) {
return runningError(paths)
} else {
return err
}
return startBackgroundProcess(paths, startBackgroundHeadless, startBackgroundRefreshProfile)
},
}
type environmentOptions struct {
RefreshProfile bool
Headless bool
RestartBrowser bool
LineHandler proc.LineHandler
Progress func(string)
Runner pipeline.Runner
}
type environment struct {
cancel context.CancelFunc
wg sync.WaitGroup
managed []*proc.ManagedProc
cfg config.Config
}
func runEnvironment(cfg config.Config, opts environmentOptions) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if err := os.MkdirAll(cfg.LogDir(), 0755); err != nil {
env, err := buildAndStartEnvironment(ctx, cfg, opts)
if err != nil {
return err
}
var wg sync.WaitGroup
var managed []*proc.ManagedProc
managed = append(managed, proc.StartManaged(ctx, &wg, proc.ProcConfig{
Tag: proc.TagBrowser,
Dir: agentRoot,
Restart: false,
LogPath: cfg.LogPath(chromiumLogName),
Cmd: browser.BuildArgs(browser.ArgsConfig{
Binary: cfg.BrowserOSAppPath,
AgentRoot: agentRoot,
UserDataDir: cfg.DevUserDataDir,
ProfileDir: cfg.DevProfileDir,
Ports: cfg.Ports,
Headless: startHeadless,
}),
}))
proc.LogMsg(proc.TagServer, "Waiting for CDP...")
if browser.WaitForCDP(ctx, cfg.Ports.CDP, 60) {
proc.LogMsg(proc.TagServer, "CDP ready")
} else {
proc.LogMsg(proc.TagServer, proc.WarnColor.Sprint("CDP not available, starting server anyway"))
}
env := os.Environ()
env = append(env,
"NODE_ENV=development",
fmt.Sprintf("BROWSEROS_CDP_PORT=%d", cfg.Ports.CDP),
fmt.Sprintf("BROWSEROS_SERVER_PORT=%d", cfg.Ports.Server),
fmt.Sprintf("BROWSEROS_EXTENSION_PORT=%d", cfg.Ports.Extension),
fmt.Sprintf("VITE_BROWSEROS_SERVER_PORT=%d", cfg.Ports.Server),
)
serverDir := filepath.Join(agentRoot, "apps/server")
managed = append(managed, proc.StartManaged(ctx, &wg, proc.ProcConfig{
Tag: proc.TagServer,
Dir: serverDir,
Env: env,
Restart: true,
LogPath: cfg.LogPath(serverLogName),
Cmd: serverCommand(),
}))
printSummary(cfg, agentRoot)
defer env.Stop()
sigCh := make(chan os.Signal, 2)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
@@ -143,32 +128,165 @@ func runEnvironment(cfg config.Config, agentRoot string) error {
cancel()
done := make(chan struct{})
go func() {
wg.Wait()
env.Wait()
close(done)
}()
go func() {
select {
case <-sigCh:
for _, p := range managed {
p.ForceKill()
}
env.ForceKill()
os.Exit(1)
case <-done:
}
}()
for _, p := range managed {
p.Stop()
}
env.Stop()
select {
case <-done:
case <-time.After(10 * time.Second):
for _, p := range managed {
p.ForceKill()
}
env.ForceKill()
}
return nil
}
func buildAndStartEnvironment(ctx context.Context, cfg config.Config, opts environmentOptions) (*environment, error) {
if opts.Runner == nil {
opts.Runner = pipeline.ExecRunner{}
}
agentRoot := cfg.AgentRoot()
reportProgress(opts, "checking repo")
if dirty, err := pipeline.Dirty(cfg.RepoPath, opts.Runner); err == nil && dirty {
fmt.Fprintln(os.Stderr, warnStyle.Sprint("warning: checkout has uncommitted changes; start will use current files"))
}
reportProgress(opts, "preparing profile")
if err := prepareEnvironment(&cfg, agentRoot, opts); err != nil {
return nil, err
}
reportProgress(opts, "building agent")
if err := pipeline.Build(ctx, agentRoot, opts.Runner); err != nil {
return nil, err
}
return startEnvironment(ctx, cfg, agentRoot, opts)
}
func prepareEnvironment(cfg *config.Config, agentRoot string, opts environmentOptions) error {
if opts.RefreshProfile || !exists(cfg.DevUserDataDir) {
if err := profile.Import(profile.ImportConfig{
SourceUserDataDir: cfg.SourceUserDataDir,
SourceProfileDir: cfg.SourceProfileDir,
DevUserDataDir: cfg.DevUserDataDir,
DevProfileDir: cfg.DevProfileDir,
}); err != nil {
return err
}
} else if err := profile.CleanupSingletons(cfg.DevUserDataDir); err != nil {
return err
}
if err := pipeline.WriteProductionEnvFiles(agentRoot, *cfg); err != nil {
return err
}
resolvedPorts, changed, err := proc.ResolvePorts(cfg.Ports)
if err != nil {
return err
}
cfg.Ports = resolvedPorts
if changed {
path, err := config.Path()
if err != nil {
return err
}
if err := config.Save(path, *cfg); err != nil {
return err
}
proc.LogMsgf(proc.TagInfo, "Busy ports detected; using CDP=%d Server=%d Extension=%d", cfg.Ports.CDP, cfg.Ports.Server, cfg.Ports.Extension)
} else {
proc.LogMsgf(proc.TagInfo, "Using ports CDP=%d Server=%d Extension=%d", cfg.Ports.CDP, cfg.Ports.Server, cfg.Ports.Extension)
}
return nil
}
func reportProgress(opts environmentOptions, message string) {
if opts.Progress != nil {
opts.Progress(message)
}
}
func startEnvironment(parent context.Context, cfg config.Config, agentRoot string, opts environmentOptions) (*environment, error) {
ctx, cancel := context.WithCancel(parent)
e := &environment{cancel: cancel, cfg: cfg}
reportProgress(opts, "launching Chromium")
e.managed = append(e.managed, proc.StartManaged(ctx, &e.wg, proc.ProcConfig{
Tag: proc.TagBrowser,
Dir: agentRoot,
Restart: opts.RestartBrowser,
LogPath: cfg.LogPath(chromiumLogName),
Cmd: browser.BuildArgs(browser.ArgsConfig{
Binary: cfg.BrowserOSAppPath,
AgentRoot: agentRoot,
UserDataDir: cfg.DevUserDataDir,
ProfileDir: cfg.DevProfileDir,
Ports: cfg.Ports,
Headless: opts.Headless,
}),
LineHandler: opts.LineHandler,
}))
reportProgress(opts, "waiting for CDP")
proc.LogMsg(proc.TagServer, "Waiting for CDP...")
if browser.WaitForCDP(ctx, cfg.Ports.CDP, 60) {
reportProgress(opts, "CDP ready")
proc.LogMsg(proc.TagServer, "CDP ready")
} else {
reportProgress(opts, "CDP not available, starting server anyway")
proc.LogMsg(proc.TagServer, proc.WarnColor.Sprint("CDP not available, starting server anyway"))
}
runtimeEnv := os.Environ()
runtimeEnv = append(runtimeEnv,
"NODE_ENV=development",
fmt.Sprintf("BROWSEROS_CDP_PORT=%d", cfg.Ports.CDP),
fmt.Sprintf("BROWSEROS_SERVER_PORT=%d", cfg.Ports.Server),
fmt.Sprintf("BROWSEROS_EXTENSION_PORT=%d", cfg.Ports.Extension),
fmt.Sprintf("VITE_BROWSEROS_SERVER_PORT=%d", cfg.Ports.Server),
)
serverDir := filepath.Join(agentRoot, "apps/server")
reportProgress(opts, "starting server")
e.managed = append(e.managed, proc.StartManaged(ctx, &e.wg, proc.ProcConfig{
Tag: proc.TagServer,
Dir: serverDir,
Env: runtimeEnv,
Restart: true,
LogPath: cfg.LogPath(serverLogName),
Cmd: serverCommand(),
LineHandler: opts.LineHandler,
}))
printSummary(cfg, agentRoot)
return e, nil
}
func (e *environment) Stop() {
if e == nil {
return
}
e.cancel()
for _, p := range e.managed {
p.Stop()
}
}
func (e *environment) Wait() {
if e == nil {
return
}
e.wg.Wait()
}
func (e *environment) ForceKill() {
if e == nil {
return
}
for _, p := range e.managed {
p.ForceKill()
}
}
func serverCommand() []string {
return []string{"bun", "--env-file=.env.development", "src/index.ts"}
}

View File

@@ -12,3 +12,17 @@ func TestServerCommandDoesNotWatchFiles(t *testing.T) {
t.Fatalf("server command got %#v want %#v", got, want)
}
}
func TestReportProgressInvokesConfiguredProgress(t *testing.T) {
var got []string
reportProgress(environmentOptions{
Progress: func(message string) {
got = append(got, message)
},
}, "checking repo")
want := []string{"checking repo"}
if !reflect.DeepEqual(got, want) {
t.Fatalf("progress got %#v want %#v", got, want)
}
}

View File

@@ -0,0 +1,168 @@
package ipc
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"net"
"os"
"path/filepath"
"time"
)
const (
CmdStatus = "status"
CmdStop = "stop"
CmdRestart = "restart"
serverReadTimeout = 5 * time.Second
defaultResponseTimeout = 15 * time.Minute
)
var ErrDaemonNotRunning = errors.New("browseros-dogfood background daemon is not running")
type Request struct {
Command string `json:"command"`
Args map[string]string `json:"args,omitempty"`
}
type Response struct {
OK bool `json:"ok"`
Data any `json:"data,omitempty"`
Error string `json:"error,omitempty"`
}
type Handler interface {
Handle(Request) Response
}
type HandlerFunc func(Request) Response
func (f HandlerFunc) Handle(req Request) Response {
return f(req)
}
type Server struct {
socketPath string
handler Handler
listener net.Listener
}
func NewServer(socketPath string, handler Handler) *Server {
return &Server{socketPath: socketPath, handler: handler}
}
func (s *Server) Start() error {
if err := os.MkdirAll(filepath.Dir(s.socketPath), 0755); err != nil {
return err
}
if _, err := os.Stat(s.socketPath); err == nil {
conn, dialErr := net.DialTimeout("unix", s.socketPath, 300*time.Millisecond)
if dialErr == nil {
conn.Close()
return fmt.Errorf("daemon socket is already active: %s", s.socketPath)
}
if err := os.Remove(s.socketPath); err != nil {
return err
}
}
listener, err := net.Listen("unix", s.socketPath)
if err != nil {
return err
}
s.listener = listener
if err := os.Chmod(s.socketPath, 0600); err != nil {
listener.Close()
return err
}
go s.accept()
return nil
}
func (s *Server) Stop() {
if s.listener != nil {
s.listener.Close()
}
os.Remove(s.socketPath)
}
func (s *Server) accept() {
for {
conn, err := s.listener.Accept()
if err != nil {
return
}
go s.handle(conn)
}
}
func (s *Server) handle(conn net.Conn) {
defer conn.Close()
_ = conn.SetReadDeadline(time.Now().Add(serverReadTimeout))
scanner := bufio.NewScanner(conn)
if !scanner.Scan() {
return
}
_ = conn.SetReadDeadline(time.Time{})
var req Request
if err := json.Unmarshal(scanner.Bytes(), &req); err != nil {
writeResponse(conn, Response{Error: "invalid request"})
return
}
writeResponse(conn, s.handler.Handle(req))
}
type Client struct {
socketPath string
responseTimeout time.Duration
}
func NewClient(socketPath string) *Client {
return NewClientWithTimeout(socketPath, defaultResponseTimeout)
}
func NewClientWithTimeout(socketPath string, responseTimeout time.Duration) *Client {
return &Client{socketPath: socketPath, responseTimeout: responseTimeout}
}
func (c *Client) Send(req Request) (Response, error) {
conn, err := net.DialTimeout("unix", c.socketPath, 700*time.Millisecond)
if err != nil {
return Response{}, fmt.Errorf("%w; start it with `browseros-dogfood start-background`", ErrDaemonNotRunning)
}
defer conn.Close()
data, err := json.Marshal(req)
if err != nil {
return Response{}, err
}
data = append(data, '\n')
if _, err := conn.Write(data); err != nil {
return Response{}, err
}
if c.responseTimeout > 0 {
_ = conn.SetReadDeadline(time.Now().Add(c.responseTimeout))
}
scanner := bufio.NewScanner(conn)
if !scanner.Scan() {
if err := scanner.Err(); err != nil {
return Response{}, err
}
return Response{}, errors.New("daemon closed connection without a response")
}
var resp Response
if err := json.Unmarshal(scanner.Bytes(), &resp); err != nil {
return Response{}, err
}
return resp, nil
}
func writeResponse(conn net.Conn, resp Response) {
data, err := json.Marshal(resp)
if err != nil {
data, _ = json.Marshal(Response{Error: "internal response error"})
}
data = append(data, '\n')
_, _ = conn.Write(data)
}

View File

@@ -0,0 +1,63 @@
package ipc
import (
"os"
"path/filepath"
"testing"
"time"
)
func TestServerHandlesRequest(t *testing.T) {
socketPath := filepath.Join(t.TempDir(), "daemon.sock")
server := NewServer(socketPath, HandlerFunc(func(req Request) Response {
if req.Command != CmdStatus {
return Response{Error: "wrong command"}
}
return Response{OK: true, Data: map[string]string{"state": "running"}}
}))
if err := server.Start(); err != nil {
t.Fatalf("start server: %v", err)
}
defer server.Stop()
resp, err := NewClient(socketPath).Send(Request{Command: CmdStatus})
if err != nil {
t.Fatalf("send: %v", err)
}
if !resp.OK {
t.Fatalf("response got %#v", resp)
}
data := resp.Data.(map[string]any)
if data["state"] != "running" {
t.Fatalf("data got %#v", data)
}
}
func TestClientReportsMissingDaemon(t *testing.T) {
_, err := NewClient(filepath.Join(t.TempDir(), "missing.sock")).Send(Request{Command: CmdStatus})
if err == nil {
t.Fatal("expected missing daemon error")
}
}
func TestClientSendTimesOutWhenDaemonDoesNotRespond(t *testing.T) {
dir, err := os.MkdirTemp("", "ipc")
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { _ = os.RemoveAll(dir) })
socketPath := filepath.Join(dir, "daemon.sock")
server := NewServer(socketPath, HandlerFunc(func(req Request) Response {
time.Sleep(50 * time.Millisecond)
return Response{OK: true}
}))
if err := server.Start(); err != nil {
t.Fatalf("start server: %v", err)
}
defer server.Stop()
_, err = NewClientWithTimeout(socketPath, 5*time.Millisecond).Send(Request{Command: CmdStatus})
if err == nil {
t.Fatal("expected response timeout")
}
}

View File

@@ -1,16 +1,18 @@
package pipeline
func Build(agentRoot string, r Runner) error {
if err := r.Run(agentRoot, "./tools/dev/setup.sh"); err != nil {
import "context"
func Build(ctx context.Context, agentRoot string, r Runner) error {
if err := r.Run(ctx, agentRoot, "./tools/dev/setup.sh"); err != nil {
return err
}
return r.Run(agentRoot, "bun", "--cwd", "apps/agent", "--env-file=.env.development", "wxt", "build", "--mode", "development")
return r.Run(ctx, agentRoot, "bun", "--cwd", "apps/agent", "--env-file=.env.development", "wxt", "build", "--mode", "development")
}
type ExecRunner struct{}
func (ExecRunner) Run(dir string, args ...string) error {
return runCommand(dir, args...)
func (ExecRunner) Run(ctx context.Context, dir string, args ...string) error {
return runCommand(ctx, dir, args...)
}
func (ExecRunner) OutputRun(dir string, args ...string) (string, error) {

View File

@@ -1,11 +1,14 @@
package pipeline
import "testing"
import (
"context"
"testing"
)
func TestBuildRunsExpectedCommands(t *testing.T) {
root := t.TempDir()
r := &FakeRunner{}
if err := Build(root, r); err != nil {
if err := Build(context.Background(), root, r); err != nil {
t.Fatal(err)
}
want := []string{

View File

@@ -1,12 +1,13 @@
package pipeline
import (
"context"
"os"
"os/exec"
)
func runCommand(dir string, args ...string) error {
cmd := exec.Command(args[0], args[1:]...)
func runCommand(ctx context.Context, dir string, args ...string) error {
cmd := exec.CommandContext(ctx, args[0], args[1:]...)
cmd.Dir = dir
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr

View File

@@ -1,9 +1,12 @@
package pipeline
import "strings"
import (
"context"
"strings"
)
type Runner interface {
Run(dir string, args ...string) error
Run(ctx context.Context, dir string, args ...string) error
OutputRun(dir string, args ...string) (string, error)
}
@@ -15,8 +18,16 @@ func Dirty(repoPath string, r Runner) (bool, error) {
return strings.TrimSpace(out) != "", nil
}
func Pull(repoPath string, r Runner) error {
return r.Run(repoPath, "git", "pull", "--ff-only")
func Pull(ctx context.Context, repoPath string, r Runner) error {
return r.Run(ctx, repoPath, "git", "pull", "--ff-only")
}
func Fetch(ctx context.Context, repoPath string, r Runner) error {
return r.Run(ctx, repoPath, "git", "fetch", "--prune")
}
func ResetHardToUpstream(ctx context.Context, repoPath string, r Runner) error {
return r.Run(ctx, repoPath, "git", "reset", "--hard", "@{upstream}")
}
func Head(repoPath string, r Runner) (string, error) {

View File

@@ -1,6 +1,9 @@
package pipeline
import "testing"
import (
"context"
"testing"
)
func TestDirtyStatus(t *testing.T) {
r := &FakeRunner{Output: map[string]string{
@@ -17,7 +20,7 @@ func TestDirtyStatus(t *testing.T) {
func TestPullRunsFastForwardOnly(t *testing.T) {
r := &FakeRunner{}
if err := Pull("/repo", r); err != nil {
if err := Pull(context.Background(), "/repo", r); err != nil {
t.Fatal(err)
}
if got := r.Commands[0]; got != "git pull --ff-only" {
@@ -25,12 +28,32 @@ func TestPullRunsFastForwardOnly(t *testing.T) {
}
}
func TestFetchRunsPrune(t *testing.T) {
r := &FakeRunner{}
if err := Fetch(context.Background(), "/repo", r); err != nil {
t.Fatal(err)
}
if got := r.Commands[0]; got != "git fetch --prune" {
t.Fatalf("got %q", got)
}
}
func TestResetHardToUpstream(t *testing.T) {
r := &FakeRunner{}
if err := ResetHardToUpstream(context.Background(), "/repo", r); err != nil {
t.Fatal(err)
}
if got := r.Commands[0]; got != "git reset --hard @{upstream}" {
t.Fatalf("got %q", got)
}
}
type FakeRunner struct {
Commands []string
Output map[string]string
}
func (f *FakeRunner) Run(dir string, args ...string) error {
func (f *FakeRunner) Run(ctx context.Context, dir string, args ...string) error {
f.Commands = append(f.Commands, join(args))
return nil
}

View File

@@ -29,6 +29,8 @@ type LogFile struct {
ModTime time.Time
}
type LineHandler func(tag Tag, stream string, line string)
var (
TagBuild = Tag{"build", color.New(color.FgYellow)}
TagAgent = Tag{"agent", color.New(color.FgMagenta)}
@@ -58,7 +60,11 @@ func LogMsgTee(t Tag, msg string, file io.Writer, fileMu *sync.Mutex) {
}
func StreamLines(r io.Reader, t Tag) {
streamLines(r, t, os.Stdout, nil, nil)
StreamLinesWithHandler(r, t, "", nil)
}
func StreamLinesWithHandler(r io.Reader, t Tag, stream string, handler LineHandler) {
streamLinesWithHandler(r, t, stream, os.Stdout, nil, nil, handler)
}
func OpenLogFile(logDir string, name string, now time.Time) (*os.File, string, error) {
@@ -129,12 +135,19 @@ func rotateLogIfNeeded(logPath string, now time.Time) error {
}
func streamLines(r io.Reader, t Tag, terminal io.Writer, file io.Writer, fileMu *sync.Mutex) {
streamLinesWithHandler(r, t, "", terminal, file, fileMu, nil)
}
func streamLinesWithHandler(r io.Reader, t Tag, stream string, terminal io.Writer, file io.Writer, fileMu *sync.Mutex, handler LineHandler) {
scanner := bufio.NewScanner(r)
scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
for scanner.Scan() {
line := scanner.Text()
if line != "" {
logMsg(t, line, terminal, file, fileMu)
if handler != nil {
handler(t, stream, line)
}
}
}
if err := scanner.Err(); err != nil {

View File

@@ -145,3 +145,20 @@ func TestStreamLinesLogsScannerErrors(t *testing.T) {
}
}
}
func TestStreamLinesWithHandlerSkipsEmptyLinesAndReportsStream(t *testing.T) {
var got []string
StreamLinesWithHandler(strings.NewReader("one\n\nthree\n"), TagServer, "stderr", func(tag Tag, stream string, line string) {
got = append(got, tag.Name+":"+stream+":"+line)
})
want := []string{"server:stderr:one", "server:stderr:three"}
if len(got) != len(want) {
t.Fatalf("got %#v want %#v", got, want)
}
for i := range want {
if got[i] != want[i] {
t.Fatalf("entry %d got %q want %q", i, got[i], want[i])
}
}
}

View File

@@ -18,8 +18,8 @@ type ProcConfig struct {
Env []string
Restart bool
Cmd []string
BeforeStart func() error
LogPath string
LineHandler LineHandler
}
type ManagedProc struct {
@@ -69,17 +69,6 @@ func (mp *ManagedProc) run(ctx context.Context) {
return
}
if mp.Cfg.BeforeStart != nil {
if err := mp.Cfg.BeforeStart(); err != nil {
log(ErrorColor.Sprintf("Pre-start failed: %v", err))
if !mp.Cfg.Restart || ctx.Err() != nil {
return
}
time.Sleep(time.Second)
continue
}
}
log(fmt.Sprintf("Starting: %s", DimColor.Sprint(strings.Join(mp.Cfg.Cmd, " "))))
cmd := exec.Command(mp.Cfg.Cmd[0], mp.Cfg.Cmd[1:]...)
@@ -113,8 +102,14 @@ func (mp *ManagedProc) run(ctx context.Context) {
var streamWg sync.WaitGroup
streamWg.Add(2)
go func() { defer streamWg.Done(); streamLines(stdout, mp.Cfg.Tag, os.Stdout, logFile, &logMu) }()
go func() { defer streamWg.Done(); streamLines(stderr, mp.Cfg.Tag, os.Stdout, logFile, &logMu) }()
go func() {
defer streamWg.Done()
streamLinesWithHandler(stdout, mp.Cfg.Tag, "stdout", os.Stdout, logFile, &logMu, mp.Cfg.LineHandler)
}()
go func() {
defer streamWg.Done()
streamLinesWithHandler(stderr, mp.Cfg.Tag, "stderr", os.Stdout, logFile, &logMu, mp.Cfg.LineHandler)
}()
streamWg.Wait()
_ = cmd.Wait()

View File

@@ -0,0 +1,210 @@
package runlog
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"sync"
"time"
)
type Entry struct {
Time time.Time `json:"time"`
Tag string `json:"tag"`
Stream string `json:"stream"`
Line string `json:"line"`
}
type Writer struct {
mu sync.Mutex
file *os.File
enc *json.Encoder
}
func NewWriter(path string) (*Writer, error) {
if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
return nil, err
}
file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
return nil, err
}
return &Writer{file: file, enc: json.NewEncoder(file)}, nil
}
func (w *Writer) Append(tag string, stream string, line string) error {
if w == nil {
return nil
}
w.mu.Lock()
defer w.mu.Unlock()
return w.enc.Encode(Entry{
Time: time.Now(),
Tag: tag,
Stream: stream,
Line: line,
})
}
func (w *Writer) Close() error {
if w == nil || w.file == nil {
return nil
}
return w.file.Close()
}
func ReadLast(path string, maxLines int, filter string) ([]Entry, error) {
normalized, err := NormalizeFilter(filter)
if err != nil {
return nil, err
}
file, err := os.Open(path)
if err != nil {
return nil, err
}
defer file.Close()
var entries []Entry
scanner := bufio.NewScanner(file)
scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
for scanner.Scan() {
entry, ok := parseLine(scanner.Text(), normalized)
if !ok {
continue
}
entries = append(entries, entry)
if maxLines > 0 && len(entries) > maxLines {
entries = entries[len(entries)-maxLines:]
}
}
if err := scanner.Err(); err != nil {
return nil, err
}
return entries, nil
}
func Follow(path string, filter string, onEntry func(Entry)) error {
return FollowWithContext(context.Background(), path, filter, onEntry)
}
func FollowWithContext(ctx context.Context, path string, filter string, onEntry func(Entry)) error {
return followWithContext(ctx, path, filter, true, onEntry)
}
func FollowFromStartWithContext(ctx context.Context, path string, filter string, onEntry func(Entry)) error {
return followWithContext(ctx, path, filter, false, onEntry)
}
func followWithContext(ctx context.Context, path string, filter string, seekEnd bool, onEntry func(Entry)) error {
normalized, err := NormalizeFilter(filter)
if err != nil {
return err
}
file, err := os.Open(path)
if err != nil {
return err
}
defer file.Close()
if seekEnd {
if _, err := file.Seek(0, io.SeekEnd); err != nil {
return err
}
}
reader := bufio.NewReader(file)
for {
if ctx.Err() != nil {
return nil
}
line, err := reader.ReadString('\n')
if err != nil {
if err == io.EOF {
nextFile, replaced, reopenErr := reopenIfReplaced(path, file)
if reopenErr != nil {
return reopenErr
}
if replaced {
file.Close()
file = nextFile
reader = bufio.NewReader(file)
continue
}
select {
case <-ctx.Done():
return nil
case <-time.After(200 * time.Millisecond):
}
continue
}
return err
}
entry, ok := parseLine(strings.TrimRight(line, "\r\n"), normalized)
if ok {
onEntry(entry)
}
}
}
func reopenIfReplaced(path string, current *os.File) (*os.File, bool, error) {
currentInfo, currentErr := current.Stat()
pathInfo, pathErr := os.Stat(path)
if os.IsNotExist(pathErr) {
return nil, false, nil
}
if pathErr != nil {
return nil, false, pathErr
}
if currentErr == nil && os.SameFile(currentInfo, pathInfo) {
return nil, false, nil
}
next, err := os.Open(path)
if err != nil {
return nil, false, err
}
return next, true, nil
}
func Format(entry Entry) string {
return fmt.Sprintf("%s [%s] %s", entry.Time.Format("15:04:05"), displayTag(entry.Tag), entry.Line)
}
func NormalizeFilter(filter string) (string, error) {
switch strings.ToLower(strings.TrimSpace(filter)) {
case "", "all":
return "", nil
case "browser", "chromium":
return "browser", nil
case "server":
return "server", nil
case "daemon":
return "daemon", nil
default:
return "", fmt.Errorf("unknown log filter %q; use daemon, chromium, or server", filter)
}
}
func parseLine(line string, filter string) (Entry, bool) {
var entry Entry
if strings.TrimSpace(line) == "" {
return Entry{}, false
}
if err := json.Unmarshal([]byte(line), &entry); err != nil {
return Entry{}, false
}
if filter != "" && entry.Tag != filter {
return Entry{}, false
}
return entry, true
}
func displayTag(tag string) string {
if tag == "browser" {
return "chromium"
}
return tag
}

View File

@@ -0,0 +1,139 @@
package runlog
import (
"context"
"os"
"path/filepath"
"strings"
"testing"
"time"
)
func TestWriterStoresJSONLinesAndFiltersByTag(t *testing.T) {
path := filepath.Join(t.TempDir(), "daemon.jsonl")
w, err := NewWriter(path)
if err != nil {
t.Fatalf("new writer: %v", err)
}
if err := w.Append("browser", "stdout", "chromium ready"); err != nil {
t.Fatalf("append browser: %v", err)
}
if err := w.Append("server", "stderr", "server ready"); err != nil {
t.Fatalf("append server: %v", err)
}
if err := w.Close(); err != nil {
t.Fatalf("close: %v", err)
}
entries, err := ReadLast(path, 10, "chromium")
if err != nil {
t.Fatalf("read last: %v", err)
}
if len(entries) != 1 {
t.Fatalf("entries len got %d want 1", len(entries))
}
if entries[0].Tag != "browser" {
t.Fatalf("tag got %q want browser", entries[0].Tag)
}
line := Format(entries[0])
if !strings.Contains(line, "[chromium] chromium ready") {
t.Fatalf("formatted line got %q", line)
}
}
func TestNormalizeFilterRejectsUnknownValues(t *testing.T) {
if got, err := NormalizeFilter("chromium"); err != nil || got != "browser" {
t.Fatalf("chromium filter got %q %v", got, err)
}
if got, err := NormalizeFilter("server"); err != nil || got != "server" {
t.Fatalf("server filter got %q %v", got, err)
}
if got, err := NormalizeFilter("daemon"); err != nil || got != "daemon" {
t.Fatalf("daemon filter got %q %v", got, err)
}
if _, err := NormalizeFilter("agent"); err == nil {
t.Fatal("expected invalid filter error")
}
}
func TestFollowFromStartReadsExistingEntries(t *testing.T) {
path := filepath.Join(t.TempDir(), "daemon.jsonl")
w, err := NewWriter(path)
if err != nil {
t.Fatal(err)
}
if err := w.Append("daemon", "lifecycle", "building agent"); err != nil {
t.Fatal(err)
}
if err := w.Close(); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithCancel(context.Background())
entries := make(chan Entry, 1)
errCh := make(chan error, 1)
go func() {
errCh <- FollowFromStartWithContext(ctx, path, "daemon", func(entry Entry) {
entries <- entry
cancel()
})
}()
select {
case entry := <-entries:
if entry.Line != "building agent" {
t.Fatalf("entry line got %q", entry.Line)
}
case <-time.After(time.Second):
t.Fatal("timed out waiting for existing entry")
}
if err := <-errCh; err != nil {
t.Fatalf("follow from start: %v", err)
}
}
func TestFollowReopensReplacedLogFile(t *testing.T) {
path := filepath.Join(t.TempDir(), "daemon.jsonl")
if err := os.WriteFile(path, []byte(""), 0644); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
entries := make(chan Entry, 1)
errCh := make(chan error, 1)
go func() {
errCh <- FollowWithContext(ctx, path, "", func(entry Entry) {
entries <- entry
})
}()
time.Sleep(20 * time.Millisecond)
if err := os.Remove(path); err != nil {
t.Fatal(err)
}
w, err := NewWriter(path)
if err != nil {
t.Fatal(err)
}
if err := w.Append("server", "stdout", "after restart"); err != nil {
t.Fatal(err)
}
if err := w.Close(); err != nil {
t.Fatal(err)
}
select {
case entry := <-entries:
if entry.Line != "after restart" {
t.Fatalf("entry line got %q", entry.Line)
}
case <-time.After(time.Second):
t.Fatal("timed out waiting for followed entry")
}
cancel()
if err := <-errCh; err != nil {
t.Fatalf("follow: %v", err)
}
}

View File

@@ -0,0 +1,101 @@
package runtime
import (
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"syscall"
"time"
)
var ErrAlreadyRunning = errors.New("browseros-dogfood is already running")
type Lock struct {
file *os.File
path string
}
type RunState struct {
PID int `json:"pid"`
Mode string `json:"mode"`
StartedAt time.Time `json:"started_at"`
SocketPath string `json:"socket_path"`
LogPath string `json:"log_path"`
}
func AcquireLock(path string) (*Lock, error) {
if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
return nil, err
}
file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
return nil, err
}
if err := syscall.Flock(int(file.Fd()), syscall.LOCK_EX|syscall.LOCK_NB); err != nil {
file.Close()
if errors.Is(err, syscall.EWOULDBLOCK) || errors.Is(err, syscall.EAGAIN) {
return nil, ErrAlreadyRunning
}
return nil, err
}
return &Lock{file: file, path: path}, nil
}
func (l *Lock) Close() error {
if l == nil || l.file == nil {
return nil
}
unlockErr := syscall.Flock(int(l.file.Fd()), syscall.LOCK_UN)
closeErr := l.file.Close()
l.file = nil
if unlockErr != nil {
return unlockErr
}
return closeErr
}
func WriteRunState(path string, state RunState) error {
if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
return err
}
data, err := json.MarshalIndent(state, "", " ")
if err != nil {
return err
}
data = append(data, '\n')
tmp := path + ".tmp"
if err := os.WriteFile(tmp, data, 0644); err != nil {
return err
}
return os.Rename(tmp, path)
}
func ReadRunState(path string) (RunState, error) {
data, err := os.ReadFile(path)
if err != nil {
return RunState{}, err
}
var state RunState
if err := json.Unmarshal(data, &state); err != nil {
return RunState{}, fmt.Errorf("parse run state: %w", err)
}
return state, nil
}
func CleanupStaleRunFiles(statePath string) error {
state, err := ReadRunState(statePath)
if err != nil && !os.IsNotExist(err) {
return err
}
if err == nil && state.SocketPath != "" {
if removeErr := os.Remove(state.SocketPath); removeErr != nil && !os.IsNotExist(removeErr) {
return removeErr
}
}
if err := os.Remove(statePath); err != nil && !os.IsNotExist(err) {
return err
}
return nil
}

View File

@@ -0,0 +1,75 @@
package runtime
import (
"errors"
"os"
"path/filepath"
"testing"
)
func TestLockExcludesSecondOwnerAndReleasesOnClose(t *testing.T) {
path := filepath.Join(t.TempDir(), "run.lock")
first, err := AcquireLock(path)
if err != nil {
t.Fatalf("first acquire: %v", err)
}
if _, err := AcquireLock(path); !errors.Is(err, ErrAlreadyRunning) {
t.Fatalf("second acquire got %v want ErrAlreadyRunning", err)
}
if err := first.Close(); err != nil {
t.Fatalf("close first lock: %v", err)
}
second, err := AcquireLock(path)
if err != nil {
t.Fatalf("second acquire after close: %v", err)
}
if err := second.Close(); err != nil {
t.Fatalf("close second lock: %v", err)
}
}
func TestRunStateRemovesStaleSocketWhenLockIsAcquired(t *testing.T) {
dir := t.TempDir()
socketPath := filepath.Join(dir, "daemon.sock")
statePath := filepath.Join(dir, "state.json")
if err := WriteRunState(statePath, RunState{
PID: 12345,
Mode: "background",
SocketPath: socketPath,
}); err != nil {
t.Fatalf("write state: %v", err)
}
if err := touch(socketPath); err != nil {
t.Fatalf("touch socket: %v", err)
}
if err := CleanupStaleRunFiles(statePath); err != nil {
t.Fatalf("cleanup: %v", err)
}
if exists(socketPath) {
t.Fatalf("stale socket still exists")
}
if exists(statePath) {
t.Fatalf("stale state still exists")
}
}
func touch(path string) error {
if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
return err
}
file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return err
}
return file.Close()
}
func exists(path string) bool {
_, err := os.Stat(path)
return err == nil
}