Compare commits

...

2 Commits

Author SHA1 Message Date
Nikhil Sonti
ad716fca78 fix(dev): address watch lock review comments 2026-04-30 11:44:07 -07:00
Nikhil Sonti
df8ff02b8f fix(dev): use run lock for watch cleanup 2026-04-30 11:29:00 -07:00
3 changed files with 448 additions and 107 deletions

View File

@@ -48,6 +48,26 @@ func runWatch(cmd *cobra.Command, args []string) error {
p := defaultPorts
var reservations *proc.PortReservations
userDataDir := "/tmp/browseros-dev"
mode := "watch"
if watchManual {
mode = "manual"
}
var runLock *proc.WatchRunLock
acquireRunLock := func(ports proc.Ports) error {
lock, stopped, err := proc.AcquireWatchRunLock(proc.WatchRunIdentity{
Mode: mode,
Profile: userDataDir,
Ports: ports,
}, 3*time.Second)
if err != nil {
return err
}
runLock = lock
if stopped {
proc.LogMsgf(proc.TagInfo, "Stopped existing dev watch for profile %s", userDataDir)
}
return nil
}
if watchNew {
proc.LogMsg(proc.TagInfo, "Selecting random available ports...")
@@ -62,17 +82,16 @@ func runWatch(cmd *cobra.Command, args []string) error {
}
userDataDir = dir
proc.LogMsgf(proc.TagInfo, "Created fresh profile: %s", userDataDir)
if err := acquireRunLock(p); err != nil {
return err
}
} else {
if err := os.MkdirAll(userDataDir, 0o755); err != nil {
return fmt.Errorf("creating user-data dir: %w", err)
}
stopped, err := proc.StopExistingWatchProcesses(3 * time.Second)
if err != nil {
if err := acquireRunLock(p); err != nil {
return err
}
if stopped > 0 {
proc.LogMsgf(proc.TagInfo, "Stopped %d existing dev watch process group(s)", stopped)
}
proc.LogMsg(proc.TagInfo, "Killing processes on preferred ports...")
if err := proc.KillPortsAndWait(defaultPorts, 3*time.Second); err != nil {
return err
@@ -89,13 +108,14 @@ func runWatch(cmd *cobra.Command, args []string) error {
p.CDP, p.Server, p.Extension)
}
}
defer func() {
if err := runLock.Close(); err != nil {
proc.LogMsgf(proc.TagInfo, "Warning: closing run lock: %v", err)
}
}()
defer reservations.ReleaseAll()
fmt.Println()
mode := "watch"
if watchManual {
mode = "manual"
}
proc.LogMsgf(proc.TagInfo, "Mode: %s", proc.BoldColor.Sprint(mode))
proc.LogMsgf(proc.TagInfo, "Ports: CDP=%d Server=%d Extension=%d", p.CDP, p.Server, p.Extension)
proc.LogMsgf(proc.TagInfo, "Profile: %s", userDataDir)

View File

@@ -1,113 +1,278 @@
package proc
import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"os/exec"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"syscall"
"time"
)
// StopExistingWatchProcesses terminates older default-profile watch supervisors.
// Port cleanup cannot see a previous watch process while it is still waiting
// for CDP, but that process will wake up later and race the new supervisor.
func StopExistingWatchProcesses(timeout time.Duration) (int, error) {
currentPGID, err := syscall.Getpgid(0)
var errWatchRunLocked = errors.New("dev watch run is already locked")
const maxTCPPort = 65535
type WatchRunIdentity struct {
Mode string `json:"mode"`
Profile string `json:"profile"`
Ports Ports `json:"ports"`
}
type WatchRunState struct {
PID int `json:"pid"`
PGID int `json:"pgid"`
StartedAt time.Time `json:"started_at"`
Identity WatchRunIdentity `json:"identity"`
}
type WatchRunLock struct {
file *os.File
statePath string
}
type watchRunPathsResult struct {
Lock string
State string
}
// AcquireWatchRunLock claims ownership of the current dev watch identity.
// If the same run identity is already active, it terminates the recorded
// process group from the state file and waits for the OS lock to be released.
func AcquireWatchRunLock(identity WatchRunIdentity, timeout time.Duration) (*WatchRunLock, bool, error) {
baseDir, err := DefaultWatchRunBaseDir()
if err != nil {
return 0, fmt.Errorf("reading current process group: %w", err)
return nil, false, err
}
return AcquireWatchRunLockInDir(baseDir, identity, timeout)
}
// AcquireWatchRunLockInDir is AcquireWatchRunLock with an explicit base
// directory so tests can exercise flock behavior without touching user state.
func AcquireWatchRunLockInDir(baseDir string, identity WatchRunIdentity, timeout time.Duration) (*WatchRunLock, bool, error) {
identity = normalizeWatchRunIdentity(identity)
if err := validateWatchRunIdentity(identity); err != nil {
return nil, false, err
}
if baseDir == "" {
return nil, false, fmt.Errorf("watch run base dir is empty")
}
groups, err := currentWatchProcessGroups(currentPGID)
if err != nil {
return 0, err
}
if len(groups) == 0 {
return 0, nil
}
for _, pgid := range groups {
if err := signalProcessGroup(pgid, syscall.SIGTERM); err != nil {
return 0, err
paths := watchRunPaths(baseDir, identity)
lock, err := tryAcquireWatchRunLock(paths.Lock, paths.State)
if err == nil {
if err := lock.writeState(identity); err != nil {
lock.Close()
return nil, false, err
}
return lock, false, nil
}
if !errors.Is(err, errWatchRunLocked) {
return nil, false, err
}
state, err := readWatchRunStateWithRetry(paths.State, 250*time.Millisecond)
if err != nil {
return nil, false, fmt.Errorf("dev watch lock is held but state is unreadable at %s: %w", paths.State, err)
}
if state.Identity != identity {
return nil, false, fmt.Errorf("dev watch lock state identity mismatch at %s", paths.State)
}
if state.PGID <= 0 {
return nil, false, fmt.Errorf("dev watch lock state is missing a process group at %s", paths.State)
}
if err := signalProcessGroup(state.PGID, syscall.SIGTERM); err != nil {
return nil, false, err
}
lock, err = waitForWatchRunLock(paths, identity, timeout)
if err == nil {
return lock, true, nil
}
if !errors.Is(err, errWatchRunLocked) {
return nil, false, err
}
if err := signalProcessGroup(state.PGID, syscall.SIGKILL); err != nil {
return nil, false, err
}
lock, err = waitForWatchRunLock(paths, identity, time.Second)
if err != nil {
if errors.Is(err, errWatchRunLocked) {
return nil, false, fmt.Errorf("previous dev watch process group %d did not exit after SIGKILL; inspect %s before retrying", state.PGID, paths.Lock)
}
return nil, false, err
}
return lock, true, nil
}
// DefaultWatchRunBaseDir returns the shared location for dev watch lock files.
// Individual runs are separated by a hash of profile, ports, and mode.
func DefaultWatchRunBaseDir() (string, error) {
home, err := os.UserHomeDir()
if err != nil {
return "", err
}
return filepath.Join(home, ".browseros-dev", "runs"), nil
}
func (l *WatchRunLock) Close() error {
if l == nil || l.file == nil {
return nil
}
// Keep the lock file path stable. Unlinking it during handoff can let
// another opener lock a different inode while an owner still holds this one.
removeErr := os.Remove(l.statePath)
unlockErr := syscall.Flock(int(l.file.Fd()), syscall.LOCK_UN)
closeErr := l.file.Close()
l.file = nil
if removeErr != nil && !os.IsNotExist(removeErr) {
return removeErr
}
if unlockErr != nil {
return unlockErr
}
return closeErr
}
// ReadWatchRunState reads the metadata used to terminate a previous owner.
// The state file is not the lock; it is only trusted after flock says a run is active.
func ReadWatchRunState(path string) (WatchRunState, error) {
data, err := os.ReadFile(path)
if err != nil {
return WatchRunState{}, err
}
var state WatchRunState
if err := json.Unmarshal(data, &state); err != nil {
return WatchRunState{}, fmt.Errorf("parse watch run state: %w", err)
}
return state, nil
}
func readWatchRunStateWithRetry(path string, timeout time.Duration) (WatchRunState, error) {
deadline := time.Now().Add(timeout)
var lastErr error
for {
state, err := ReadWatchRunState(path)
if err == nil {
return state, nil
}
lastErr = err
if time.Now().After(deadline) {
return WatchRunState{}, lastErr
}
time.Sleep(50 * time.Millisecond)
}
}
func watchRunPaths(baseDir string, identity WatchRunIdentity) watchRunPathsResult {
identity = normalizeWatchRunIdentity(identity)
sum := sha256.Sum256([]byte(fmt.Sprintf("%s\x00%s\x00%d\x00%d\x00%d",
identity.Mode,
identity.Profile,
identity.Ports.CDP,
identity.Ports.Server,
identity.Ports.Extension,
)))
key := hex.EncodeToString(sum[:])
return watchRunPathsResult{
Lock: filepath.Join(baseDir, "watch-"+key+".lock"),
State: filepath.Join(baseDir, "watch-"+key+".json"),
}
}
func normalizeWatchRunIdentity(identity WatchRunIdentity) WatchRunIdentity {
identity.Profile = filepath.Clean(identity.Profile)
return identity
}
func tryAcquireWatchRunLock(lockPath string, statePath string) (*WatchRunLock, error) {
if err := os.MkdirAll(filepath.Dir(lockPath), 0o755); err != nil {
return nil, err
}
file, err := os.OpenFile(lockPath, os.O_CREATE|os.O_RDWR, 0o644)
if err != nil {
return nil, err
}
if err := syscall.Flock(int(file.Fd()), syscall.LOCK_EX|syscall.LOCK_NB); err != nil {
file.Close()
if errors.Is(err, syscall.EWOULDBLOCK) || errors.Is(err, syscall.EAGAIN) {
return nil, errWatchRunLocked
}
return nil, err
}
return &WatchRunLock{file: file, statePath: statePath}, nil
}
func (l *WatchRunLock) writeState(identity WatchRunIdentity) error {
pgid, err := syscall.Getpgid(0)
if err != nil {
return fmt.Errorf("reading current process group: %w", err)
}
state := WatchRunState{
PID: os.Getpid(),
PGID: pgid,
StartedAt: time.Now(),
Identity: identity,
}
data, err := json.MarshalIndent(state, "", " ")
if err != nil {
return err
}
data = append(data, '\n')
tmp := l.statePath + ".tmp"
if err := os.WriteFile(tmp, data, 0o644); err != nil {
return err
}
return os.Rename(tmp, l.statePath)
}
func waitForWatchRunLock(paths watchRunPathsResult, identity WatchRunIdentity, timeout time.Duration) (*WatchRunLock, error) {
deadline := time.Now().Add(timeout)
for {
remaining, err := currentWatchProcessGroups(currentPGID)
if err != nil {
return 0, err
lock, err := tryAcquireWatchRunLock(paths.Lock, paths.State)
if err == nil {
if err := lock.writeState(identity); err != nil {
lock.Close()
return nil, err
}
return lock, nil
}
if len(remaining) == 0 {
return len(groups), nil
if !errors.Is(err, errWatchRunLocked) {
return nil, err
}
if time.Now().After(deadline) {
for _, pgid := range remaining {
if err := signalProcessGroup(pgid, syscall.SIGKILL); err != nil {
return 0, err
}
}
return len(groups), nil
return nil, errWatchRunLocked
}
time.Sleep(100 * time.Millisecond)
}
}
func currentWatchProcessGroups(currentPGID int) ([]int, error) {
output, err := exec.Command("ps", "-axo", "pid=,pgid=,command=").Output()
if err != nil {
return nil, fmt.Errorf("listing processes: %w", err)
func validateWatchRunIdentity(identity WatchRunIdentity) error {
if identity.Mode == "" {
return fmt.Errorf("watch run mode is empty")
}
return watchProcessGroupsFromPS(string(output), currentPGID), nil
if identity.Profile == "" {
return fmt.Errorf("watch run profile is empty")
}
if !isValidTCPPort(identity.Ports.CDP) || !isValidTCPPort(identity.Ports.Server) || !isValidTCPPort(identity.Ports.Extension) {
return fmt.Errorf("watch run ports are invalid: %+v", identity.Ports)
}
return nil
}
func watchProcessGroupsFromPS(output string, currentPGID int) []int {
seen := map[int]struct{}{}
for _, line := range strings.Split(output, "\n") {
fields := strings.Fields(line)
if len(fields) < 3 {
continue
}
pgid, err := strconv.Atoi(fields[1])
if err != nil || pgid == currentPGID {
continue
}
if isDefaultWatchCommand(fields[2:]) {
seen[pgid] = struct{}{}
}
}
groups := make([]int, 0, len(seen))
for pgid := range seen {
groups = append(groups, pgid)
}
sort.Ints(groups)
return groups
}
func isDefaultWatchCommand(commandFields []string) bool {
if len(commandFields) < 2 {
return false
}
if filepath.Base(commandFields[0]) != "browseros-dev" {
return false
}
if commandFields[1] != "watch" {
return false
}
for _, field := range commandFields[2:] {
if field == "--new" {
return false
}
}
return true
func isValidTCPPort(port int) bool {
return port > 0 && port <= maxTCPPort
}
func signalProcessGroup(pgid int, signal syscall.Signal) error {
if pgid <= 0 {
return nil
return fmt.Errorf("invalid process group %d", pgid)
}
if err := syscall.Kill(-pgid, signal); err != nil && err != syscall.ESRCH {
return fmt.Errorf("signaling process group %d: %w", pgid, err)

View File

@@ -1,32 +1,188 @@
package proc
import "testing"
import (
"encoding/json"
"os"
"os/exec"
"path/filepath"
"syscall"
"testing"
"time"
)
func TestWatchProcessGroupsFromPSSelectsOtherWatchGroups(t *testing.T) {
output := `
111 111 /tmp/one/browseros-dev watch
222 222 /tmp/two/browseros-dev watch --new
333 333 /tmp/one/browseros-dev cleanup
444 444 rg browseros-dev watch
555 555 bun run dev:watch
`
const watchLockHelperEnv = "BROWSEROS_DEV_WATCH_LOCK_HELPER"
groups := watchProcessGroupsFromPS(output, 999)
func TestMain(m *testing.M) {
if os.Getenv(watchLockHelperEnv) == "1" {
runWatchLockHelper()
return
}
os.Exit(m.Run())
}
if len(groups) != 1 || groups[0] != 111 {
t.Fatalf("expected only pgid 111, got %#v", groups)
func TestWatchRunPathsStableAndDistinct(t *testing.T) {
baseDir := t.TempDir()
identity := WatchRunIdentity{
Mode: "watch",
Profile: "/tmp/browseros-dev",
Ports: Ports{CDP: 9005, Server: 9105, Extension: 9305},
}
first := watchRunPaths(baseDir, identity)
second := watchRunPaths(baseDir, identity)
if first != second {
t.Fatalf("expected stable paths, got %#v and %#v", first, second)
}
withDifferentPort := identity
withDifferentPort.Ports.Server = 9106
third := watchRunPaths(baseDir, withDifferentPort)
if third.Lock == first.Lock || third.State == first.State {
t.Fatalf("expected distinct paths for different ports, got %#v and %#v", first, third)
}
}
func TestWatchProcessGroupsFromPSDedupesProcessGroups(t *testing.T) {
output := `
111 111 /tmp/one/browseros-dev watch
112 111 /tmp/one/browseros-dev watch
`
func TestAcquireWatchRunLockWritesStateAndReleases(t *testing.T) {
baseDir := t.TempDir()
identity := WatchRunIdentity{
Mode: "watch",
Profile: "/tmp/browseros-dev",
Ports: Ports{CDP: 9005, Server: 9105, Extension: 9305},
}
groups := watchProcessGroupsFromPS(output, 999)
lock, stopped, err := AcquireWatchRunLockInDir(baseDir, identity, time.Second)
if err != nil {
t.Fatalf("AcquireWatchRunLockInDir returned error: %v", err)
}
if stopped {
t.Fatal("expected first acquisition not to stop another run")
}
if len(groups) != 1 || groups[0] != 111 {
t.Fatalf("expected one pgid 111, got %#v", groups)
paths := watchRunPaths(baseDir, identity)
state, err := ReadWatchRunState(paths.State)
if err != nil {
t.Fatalf("ReadWatchRunState returned error: %v", err)
}
if state.PID != os.Getpid() {
t.Fatalf("expected state PID %d, got %d", os.Getpid(), state.PID)
}
if state.PGID <= 0 {
t.Fatalf("expected positive PGID, got %d", state.PGID)
}
if state.Identity != identity {
t.Fatalf("expected identity %#v, got %#v", identity, state.Identity)
}
if err := lock.Close(); err != nil {
t.Fatalf("closing lock: %v", err)
}
if _, err := os.Stat(paths.State); !os.IsNotExist(err) {
t.Fatalf("expected state file to be removed on close, got %v", err)
}
if _, err := os.Stat(paths.Lock); err != nil {
t.Fatalf("expected lock file path to remain reusable, got %v", err)
}
lock, stopped, err = AcquireWatchRunLockInDir(baseDir, identity, time.Second)
if err != nil {
t.Fatalf("reacquiring lock returned error: %v", err)
}
if stopped {
t.Fatal("expected reacquisition after close not to stop another run")
}
if err := lock.Close(); err != nil {
t.Fatalf("closing reacquired lock: %v", err)
}
}
func TestAcquireWatchRunLockRejectsInvalidPorts(t *testing.T) {
identity := WatchRunIdentity{
Mode: "watch",
Profile: "/tmp/browseros-dev",
Ports: Ports{CDP: 9005, Server: 65536, Extension: 9305},
}
if _, _, err := AcquireWatchRunLockInDir(t.TempDir(), identity, time.Second); err == nil {
t.Fatal("expected invalid port error")
}
}
func TestAcquireWatchRunLockStopsExistingOwnerByStatePGID(t *testing.T) {
baseDir := t.TempDir()
readyPath := filepath.Join(baseDir, "ready")
identity := WatchRunIdentity{
Mode: "watch",
Profile: "/tmp/browseros-dev",
Ports: Ports{CDP: 9005, Server: 9105, Extension: 9305},
}
identityJSON, err := json.Marshal(identity)
if err != nil {
t.Fatal(err)
}
cmd := exec.Command(os.Args[0], "-test.run=TestMain")
cmd.Env = append(os.Environ(),
watchLockHelperEnv+"=1",
"BROWSEROS_DEV_WATCH_LOCK_BASE="+baseDir,
"BROWSEROS_DEV_WATCH_LOCK_READY="+readyPath,
"BROWSEROS_DEV_WATCH_LOCK_IDENTITY="+string(identityJSON),
)
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
if err := cmd.Start(); err != nil {
t.Fatalf("starting helper: %v", err)
}
defer cmd.Process.Kill()
waitForFile(t, readyPath, 3*time.Second)
lock, stopped, err := AcquireWatchRunLockInDir(baseDir, identity, 3*time.Second)
if err != nil {
t.Fatalf("AcquireWatchRunLockInDir returned error: %v", err)
}
defer lock.Close()
if !stopped {
t.Fatal("expected takeover to stop existing owner")
}
done := make(chan error, 1)
go func() {
done <- cmd.Wait()
}()
select {
case <-done:
case <-time.After(3 * time.Second):
t.Fatal("expected helper process to exit after takeover")
}
}
func runWatchLockHelper() {
baseDir := os.Getenv("BROWSEROS_DEV_WATCH_LOCK_BASE")
readyPath := os.Getenv("BROWSEROS_DEV_WATCH_LOCK_READY")
var identity WatchRunIdentity
if err := json.Unmarshal([]byte(os.Getenv("BROWSEROS_DEV_WATCH_LOCK_IDENTITY")), &identity); err != nil {
os.Exit(2)
}
lock, _, err := AcquireWatchRunLockInDir(baseDir, identity, time.Second)
if err != nil {
os.Exit(3)
}
defer lock.Close()
if err := os.WriteFile(readyPath, []byte("ready\n"), 0o644); err != nil {
os.Exit(4)
}
time.Sleep(30 * time.Second)
}
func waitForFile(t *testing.T, path string, timeout time.Duration) {
t.Helper()
deadline := time.Now().Add(timeout)
for {
if _, err := os.Stat(path); err == nil {
return
}
if time.Now().After(deadline) {
t.Fatalf("timed out waiting for %s", path)
}
time.Sleep(50 * time.Millisecond)
}
}