Fix warping into local project, add tests

This commit is contained in:
James Long
2026-05-04 15:19:14 -04:00
parent b7b57a0172
commit 71406ae27d
11 changed files with 461 additions and 107 deletions

View File

@@ -46,7 +46,19 @@ export function DialogSessionList() {
const workspace = project.workspace.get(session.workspaceID!)
const list = () => dialog.replace(() => <DialogSessionList />)
const warp = async (selection: WorkspaceSelection) => {
if (selection.type === "none") return
if (selection.type === "none") {
await warpWorkspaceSession({
dialog,
sdk,
sync,
project,
toast,
workspaceID: null,
sessionID: session.id,
done: list,
})
return
}
const workspaceID = await (async () => {
if (selection.type === "existing") return selection.workspaceID
const result = await sdk.client.experimental.workspace

View File

@@ -71,17 +71,21 @@ export async function warpWorkspaceSession(input: {
sync: ReturnType<typeof useSync>
project: ReturnType<typeof useProject>
toast: ReturnType<typeof useToast>
workspaceID: string
workspaceID: string | null
sessionID: string
done?: () => void
showSuccessToast?: boolean
}): Promise<boolean> {
const result = await input.sdk.client.experimental.workspace
.warp({
id: input.workspaceID,
sessionID: input.sessionID,
})
.catch(() => undefined)
const result = await (input.workspaceID === null
? input.sdk.client.experimental.workspace.detach({
workspaceID: null,
sessionID: input.sessionID,
})
: input.sdk.client.experimental.workspace.warp({
id: input.workspaceID,
sessionID: input.sessionID,
})
).catch(() => undefined)
if (!result || result.error) {
input.toast.show({
message: `Failed to warp session: ${errorMessage(result?.error ?? "no response")}`,
@@ -98,7 +102,7 @@ export async function warpWorkspaceSession(input: {
if (input.showSuccessToast !== false) {
input.toast.show({
message: "Session warped into the new workspace",
message: input.workspaceID === null ? "Session moved to the local project" : "Session warped into the new workspace",
variant: "success",
})
}

View File

@@ -212,16 +212,13 @@ export function Prompt(props: PromptProps) {
if (selection.type === "new") void createWorkspace(selection)
return
}
if (selection.type === "none") {
dialog.clear()
return
}
selectWorkspace(selection)
dialog.clear()
const workspace =
selection.type === "existing"
selection.type === "none"
? { id: null, name: "local project" }
: selection.type === "existing"
? { id: selection.workspaceID, name: selection.workspaceName }
: await createWorkspace(selection)
if (!workspace) return
@@ -1474,86 +1471,84 @@ export function Prompt(props: PromptProps) {
<box width="100%" flexDirection="row" justifyContent="space-between">
<Switch>
<Match when={status().type !== "idle"}>
<Show when={true}>
<box
flexDirection="row"
gap={1}
flexGrow={1}
justifyContent={status().type === "retry" ? "space-between" : "flex-start"}
>
<box flexShrink={0} flexDirection="row" gap={1}>
<box marginLeft={1}>
<Show when={kv.get("animations_enabled", true)} fallback={<text fg={theme.textMuted}>[]</text>}>
<spinner color={spinnerDef().color} frames={spinnerDef().frames} interval={40} />
</Show>
</box>
<box flexDirection="row" gap={1} flexShrink={0}>
{(() => {
const retry = createMemo(() => {
const s = status()
if (s.type !== "retry") return
return s
})
const message = createMemo(() => {
const r = retry()
if (!r) return
if (r.message.includes("exceeded your current quota") && r.message.includes("gemini"))
return "gemini is way too hot right now"
if (r.message.length > 80) return r.message.slice(0, 80) + "..."
return r.message
})
const isTruncated = createMemo(() => {
const r = retry()
if (!r) return false
return r.message.length > 120
})
const [seconds, setSeconds] = createSignal(0)
onMount(() => {
const timer = setInterval(() => {
const next = retry()?.next
if (next) setSeconds(Math.round((next - Date.now()) / 1000))
}, 1000)
onCleanup(() => {
clearInterval(timer)
})
})
const handleMessageClick = () => {
const r = retry()
if (!r) return
if (isTruncated()) {
void DialogAlert.show(dialog, "Retry Error", r.message)
}
}
const retryText = () => {
const r = retry()
if (!r) return ""
const baseMessage = message()
const truncatedHint = isTruncated() ? " (click to expand)" : ""
const duration = formatDuration(seconds())
const retryInfo = ` [retrying ${duration ? `in ${duration} ` : ""}attempt #${r.attempt}]`
return baseMessage + truncatedHint + retryInfo
}
return (
<Show when={retry()}>
<box onMouseUp={handleMessageClick}>
<text fg={theme.error}>{retryText()}</text>
</box>
</Show>
)
})()}
</box>
<box
flexDirection="row"
gap={1}
flexGrow={1}
justifyContent={status().type === "retry" ? "space-between" : "flex-start"}
>
<box flexShrink={0} flexDirection="row" gap={1}>
<box marginLeft={1}>
<Show when={kv.get("animations_enabled", true)} fallback={<text fg={theme.textMuted}>[]</text>}>
<spinner color={spinnerDef().color} frames={spinnerDef().frames} interval={40} />
</Show>
</box>
<box flexDirection="row" gap={1} flexShrink={0}>
{(() => {
const retry = createMemo(() => {
const s = status()
if (s.type !== "retry") return
return s
})
const message = createMemo(() => {
const r = retry()
if (!r) return
if (r.message.includes("exceeded your current quota") && r.message.includes("gemini"))
return "gemini is way too hot right now"
if (r.message.length > 80) return r.message.slice(0, 80) + "..."
return r.message
})
const isTruncated = createMemo(() => {
const r = retry()
if (!r) return false
return r.message.length > 120
})
const [seconds, setSeconds] = createSignal(0)
onMount(() => {
const timer = setInterval(() => {
const next = retry()?.next
if (next) setSeconds(Math.round((next - Date.now()) / 1000))
}, 1000)
onCleanup(() => {
clearInterval(timer)
})
})
const handleMessageClick = () => {
const r = retry()
if (!r) return
if (isTruncated()) {
void DialogAlert.show(dialog, "Retry Error", r.message)
}
}
const retryText = () => {
const r = retry()
if (!r) return ""
const baseMessage = message()
const truncatedHint = isTruncated() ? " (click to expand)" : ""
const duration = formatDuration(seconds())
const retryInfo = ` [retrying ${duration ? `in ${duration} ` : ""}attempt #${r.attempt}]`
return baseMessage + truncatedHint + retryInfo
}
return (
<Show when={retry()}>
<box onMouseUp={handleMessageClick}>
<text fg={theme.error}>{retryText()}</text>
</box>
</Show>
)
})()}
</box>
<text fg={store.interrupt > 0 ? theme.primary : theme.text}>
esc{" "}
<span style={{ fg: store.interrupt > 0 ? theme.primary : theme.textMuted }}>
{store.interrupt > 0 ? "again to interrupt" : "interrupt"}
</span>
</text>
</box>
</Show>
<text fg={store.interrupt > 0 ? theme.primary : theme.text}>
esc{" "}
<span style={{ fg: store.interrupt > 0 ? theme.primary : theme.textMuted }}>
{store.interrupt > 0 ? "again to interrupt" : "interrupt"}
</span>
</text>
</box>
</Match>
<Match when={warpNotice()}>
{(notice) => (

View File

@@ -6,6 +6,7 @@ import { asc } from "drizzle-orm"
import { eq } from "drizzle-orm"
import { inArray } from "drizzle-orm"
import { Project } from "@/project/project"
import { Instance } from "@/project/instance"
import { BusEvent } from "@/bus/bus-event"
import { GlobalBus } from "@/bus/global"
import { Auth } from "@/auth"
@@ -83,7 +84,7 @@ export const CreateInput = Schema.Struct({
export type CreateInput = Schema.Schema.Type<typeof CreateInput>
export const SessionWarpInput = Schema.Struct({
workspaceID: WorkspaceID,
workspaceID: Schema.NullOr(WorkspaceID),
sessionID: SessionID,
}).pipe(withStatics((s) => ({ zod: effectZod(s), zodObject: zodObject(s) })))
export type SessionWarpInput = Schema.Schema.Type<typeof SessionWarpInput>
@@ -504,13 +505,6 @@ export const layer = Layer.effect(
sessionID: input.sessionID,
})
const space = yield* get(input.workspaceID)
if (!space)
return yield* new WorkspaceNotFoundError({
message: `Workspace not found: ${input.workspaceID}`,
workspaceID: input.workspaceID,
})
const current = yield* db((db) =>
db
.select({ workspaceID: SessionTable.workspace_id })
@@ -541,10 +535,36 @@ export const layer = Layer.effect(
// "claim" this session so any future events coming from
// the old workspace are ignored
SyncEvent.claim(input.sessionID, input.workspaceID)
SyncEvent.claim(input.sessionID, input.workspaceID ?? Instance.project.id)
}
}
if (input.workspaceID === null) {
yield* Effect.sync(() =>
SyncEvent.run(Session.Event.Updated, {
sessionID: input.sessionID,
info: {
workspaceID: null,
},
}),
)
log.info("session warp complete", {
workspaceID: input.workspaceID,
sessionID: input.sessionID,
target: "local",
})
return
}
const workspaceID = input.workspaceID
const space = yield* get(workspaceID)
if (!space)
return yield* new WorkspaceNotFoundError({
message: `Workspace not found: ${workspaceID}`,
workspaceID,
})
const adaptor = getAdaptor(space.projectID, space.type)
const target = yield* Effect.promise(() => Promise.resolve(adaptor.target(space)))
@@ -624,8 +644,8 @@ export const layer = Layer.effect(
body,
})
return yield* new SessionWarpHttpError({
message: `Failed to warp session ${input.sessionID} into workspace ${input.workspaceID}: HTTP ${response.status} ${body}`,
workspaceID: input.workspaceID,
message: `Failed to warp session ${input.sessionID} into workspace ${workspaceID}: HTTP ${response.status} ${body}`,
workspaceID,
sessionID: input.sessionID,
status: response.status,
body,
@@ -658,8 +678,8 @@ export const layer = Layer.effect(
body,
})
return yield* new SessionWarpHttpError({
message: `Failed to steal session ${input.sessionID} into workspace ${input.workspaceID}: HTTP ${response.status} ${body}`,
workspaceID: input.workspaceID,
message: `Failed to steal session ${input.sessionID} into workspace ${workspaceID}: HTTP ${response.status} ${body}`,
workspaceID,
sessionID: input.sessionID,
status: response.status,
body,

View File

@@ -137,6 +137,25 @@ export const WorkspaceRoutes = lazy(() =>
return c.json(await Workspace.remove(id))
},
)
.post(
"/warp",
describeRoute({
summary: "Warp session into workspace",
description: "Move a session's sync history into the target workspace, or detach it to the local project.",
operationId: "experimental.workspace.detach",
responses: {
204: {
description: "Session warped",
},
...errors(400),
},
}),
validator("json", Workspace.SessionWarpInput.zodObject),
async (c) => {
await Workspace.sessionWarp(c.req.valid("json") as Workspace.SessionWarpInput)
return c.body(null, 204)
},
)
.post(
"/:id/warp",
describeRoute({

View File

@@ -16,6 +16,7 @@ export const WorkspacePaths = {
list: root,
status: `${root}/status`,
remove: `${root}/:id`,
warpLocal: `${root}/warp`,
warp: `${root}/:id/warp`,
} as const
@@ -72,6 +73,17 @@ export const WorkspaceApi = HttpApi.make("workspace")
description: "Remove an existing workspace.",
}),
),
HttpApiEndpoint.post("warpLocal", WorkspacePaths.warpLocal, {
payload: Workspace.SessionWarpInput,
success: described(HttpApiSchema.NoContent, "Session warped"),
error: HttpApiError.BadRequest,
}).annotateMerge(
OpenApi.annotations({
identifier: "experimental.workspace.detach",
summary: "Warp session into workspace",
description: "Move a session's sync history into the target workspace, or detach it to the local project.",
}),
),
HttpApiEndpoint.post("warp", WorkspacePaths.warp, {
params: { id: Workspace.Info.fields.id },
payload: WarpPayload,

View File

@@ -56,12 +56,21 @@ export const workspaceHandlers = HttpApiBuilder.group(InstanceHttpApi, "workspac
return HttpApiSchema.NoContent.make()
})
const warpLocal = Effect.fn("WorkspaceHttpApi.warpLocal")(function* (ctx: {
payload: typeof Workspace.SessionWarpInput.Type
}) {
const instance = yield* InstanceState.context
yield* Effect.promise(() => Instance.restore(instance, () => Workspace.sessionWarp(ctx.payload)))
return HttpApiSchema.NoContent.make()
})
return handlers
.handle("adaptors", adaptors)
.handle("list", list)
.handle("create", create)
.handle("status", status)
.handle("remove", remove)
.handle("warpLocal", warpLocal)
.handle("warp", warp)
}),
)

View File

@@ -297,6 +297,16 @@ function sessionSequence(sessionID: SessionID) {
)?.seq
}
function sessionSequenceOwner(sessionID: SessionID) {
return Database.use((db) =>
db
.select({ ownerID: EventSequenceTable.owner_id })
.from(EventSequenceTable)
.where(eq(EventSequenceTable.aggregate_id, sessionID))
.get(),
)?.ownerID
}
function sessionUpdatedType() {
return SyncEvent.versionedType(SessionNs.Event.Updated.type, SessionNs.Event.Updated.version)
}
@@ -586,6 +596,134 @@ describe("workspace-old CRUD", () => {
expect(await WorkspaceOld.get(info.id)).toBeUndefined()
})
})
test("sessionWarp moves a session into a local workspace and claims ownership", async () => {
await withInstance(async (dir) => {
const previousType = unique("warp-prev-local")
const targetType = unique("warp-target-local")
const previous = workspaceInfo(Instance.project.id, previousType)
const target = workspaceInfo(Instance.project.id, targetType)
insertWorkspace(previous)
insertWorkspace(target)
registerAdaptor(Instance.project.id, previousType, localAdaptor(path.join(dir, "warp-prev-local")).adaptor)
registerAdaptor(Instance.project.id, targetType, localAdaptor(path.join(dir, "warp-target-local")).adaptor)
const session = await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create({})))
attachSessionToWorkspace(session.id, previous.id)
await WorkspaceOld.sessionWarp({ workspaceID: target.id, sessionID: session.id })
expect(
Database.use((db) =>
db.select({ workspaceID: SessionTable.workspace_id }).from(SessionTable).where(eq(SessionTable.id, session.id)).get(),
)?.workspaceID,
).toBe(target.id)
expect(sessionSequenceOwner(session.id)).toBe(target.id)
})
})
test("sessionWarp detaches a session to the local project and claims project ownership", async () => {
await withInstance(async (dir) => {
const previousType = unique("warp-detach-local")
const previous = workspaceInfo(Instance.project.id, previousType)
insertWorkspace(previous)
registerAdaptor(Instance.project.id, previousType, localAdaptor(path.join(dir, "warp-detach-local")).adaptor)
const session = await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create({})))
attachSessionToWorkspace(session.id, previous.id)
await WorkspaceOld.sessionWarp({ workspaceID: null, sessionID: session.id })
expect(
Database.use((db) =>
db.select({ workspaceID: SessionTable.workspace_id }).from(SessionTable).where(eq(SessionTable.id, session.id)).get(),
)?.workspaceID,
).toBeNull()
expect(sessionSequenceOwner(session.id)).toBe(Instance.project.id)
})
})
it.live("sessionWarp syncs previous remote history, replays it, steals, and claims the sequence", () => {
const calls: FetchCall[] = []
let historySessionID: SessionID | undefined
let historyNextSeq = 0
return Effect.gen(function* () {
yield* HttpServer.serveEffect()(
Effect.gen(function* () {
const req = yield* HttpServerRequest.HttpServerRequest
const bodyText = yield* req.text
const call = {
url: new URL(req.url, "http://localhost"),
method: req.method,
headers: new Headers(req.headers),
bodyText,
json: bodyText ? JSON.parse(bodyText) : undefined,
}
calls.push(call)
if (call.url.pathname === "/warp-source/sync/history") {
return yield* HttpServerResponse.json([
{
id: `evt_${unique("warp-source-history")}`,
aggregate_id: historySessionID!,
seq: historyNextSeq,
type: sessionUpdatedType(),
data: { sessionID: historySessionID!, info: { title: "from source history" } },
},
])
}
if (call.url.pathname === "/warp-target/sync/replay") return yield* HttpServerResponse.json({ sessionID: "ok" })
if (call.url.pathname === "/warp-target/sync/steal") return yield* HttpServerResponse.json({ sessionID: "ok" })
return HttpServerResponse.text("unexpected", { status: 500 })
}),
)
const url = yield* serverUrl()
yield* provideTmpdirInstance(
() =>
Effect.gen(function* () {
const workspace = yield* WorkspaceOld.Service
const sessionSvc = yield* SessionNs.Service
const previousType = unique("warp-remote-source")
const targetType = unique("warp-remote-target")
const previous = workspaceInfo(Instance.project.id, previousType)
const target = workspaceInfo(Instance.project.id, targetType, { directory: "remote-target-dir" })
insertWorkspace(previous)
insertWorkspace(target)
registerAdaptor(Instance.project.id, previousType, remoteAdaptor(`${url}/warp-source`).adaptor)
registerAdaptor(Instance.project.id, targetType, remoteAdaptor(`${url}/warp-target`).adaptor)
const session = yield* sessionSvc.create({})
attachSessionToWorkspace(session.id, previous.id)
historySessionID = session.id
historyNextSeq = (sessionSequence(session.id) ?? -1) + 1
yield* workspace.sessionWarp({ workspaceID: target.id, sessionID: session.id })
expect(calls.map((call) => `${call.method} ${call.url.pathname}`)).toEqual([
"POST /warp-source/sync/history",
"POST /warp-target/sync/replay",
"POST /warp-target/sync/steal",
])
expect(calls[0].json).toEqual({ [session.id]: historyNextSeq - 1 })
expect(calls[1].json).toMatchObject({
directory: "remote-target-dir",
events: [
{
aggregateID: session.id,
seq: 0,
type: SyncEvent.versionedType(SessionNs.Event.Created.type, SessionNs.Event.Created.version),
},
{
aggregateID: session.id,
seq: historyNextSeq,
type: sessionUpdatedType(),
},
],
})
expect(calls[2].json).toEqual({ sessionID: session.id })
expect((yield* sessionSvc.get(session.id)).title).toBe("from source history")
expect(sessionSequenceOwner(session.id)).toBe(target.id)
}),
{ git: true },
)
})
})
})
describe("workspace-old sync state", () => {

View File

@@ -5,7 +5,7 @@ import { Bus } from "../../src/bus"
import { Instance } from "../../src/project/instance"
import { SyncEvent } from "../../src/sync"
import { Database } from "@/storage/db"
import { EventTable } from "../../src/sync/event.sql"
import { EventSequenceTable, EventTable } from "../../src/sync/event.sql"
import { Identifier } from "../../src/id/id"
import { Flag } from "@opencode-ai/core/flag/flag"
import { initProjectors } from "../../src/server/projectors"
@@ -233,5 +233,72 @@ describe("SyncEvent", () => {
expect(rows.map((row) => row.seq)).toEqual([0, 1, 2, 3])
}),
)
test(
"claims unowned event sequence on replay with ownerID",
withInstance(() => {
const { Created } = setup()
const id = Identifier.descending("message")
SyncEvent.replay(
{
id: "evt_1",
type: SyncEvent.versionedType(Created.type, Created.version),
seq: 0,
aggregateID: id,
data: { id, name: "owned" },
},
{ publish: false, ownerID: "owner-1" },
)
const row = Database.use((db) =>
db
.select({ seq: EventSequenceTable.seq, ownerID: EventSequenceTable.owner_id })
.from(EventSequenceTable)
.get(),
)
expect(row).toEqual({ seq: 0, ownerID: "owner-1" })
}),
)
test(
"ignores replay from a different owner after sequence is claimed",
withInstance(() => {
const { Created } = setup()
const id = Identifier.descending("message")
SyncEvent.replay(
{
id: "evt_1",
type: SyncEvent.versionedType(Created.type, Created.version),
seq: 0,
aggregateID: id,
data: { id, name: "first" },
},
{ publish: false, ownerID: "owner-1" },
)
SyncEvent.replay(
{
id: "evt_2",
type: SyncEvent.versionedType(Created.type, Created.version),
seq: 1,
aggregateID: id,
data: { id, name: "ignored" },
},
{ publish: false, ownerID: "owner-2" },
)
const events = Database.use((db) => db.select().from(EventTable).all())
const sequence = Database.use((db) =>
db
.select({ seq: EventSequenceTable.seq, ownerID: EventSequenceTable.owner_id })
.from(EventSequenceTable)
.get(),
)
expect(events).toHaveLength(1)
expect(events[0].id).toBe("evt_1")
expect(sequence).toEqual({ seq: 0, ownerID: "owner-1" })
}),
)
})
})

View File

@@ -32,6 +32,8 @@ import type {
ExperimentalWorkspaceAdaptorListResponses,
ExperimentalWorkspaceCreateErrors,
ExperimentalWorkspaceCreateResponses,
ExperimentalWorkspaceDetachErrors,
ExperimentalWorkspaceDetachResponses,
ExperimentalWorkspaceListResponses,
ExperimentalWorkspaceRemoveErrors,
ExperimentalWorkspaceRemoveResponses,
@@ -654,6 +656,49 @@ export class Workspace extends HeyApiClient {
})
}
/**
* Warp session into workspace
*
* Move a session's sync history into the target workspace, or detach it to the local project.
*/
public detach<ThrowOnError extends boolean = false>(
parameters?: {
directory?: string
workspace?: string
workspaceID?: string | null
sessionID?: string
},
options?: Options<never, ThrowOnError>,
) {
const params = buildClientParams(
[parameters],
[
{
args: [
{ in: "query", key: "directory" },
{ in: "query", key: "workspace" },
{ in: "body", key: "workspaceID" },
{ in: "body", key: "sessionID" },
],
},
],
)
return (options?.client ?? this.client).post<
ExperimentalWorkspaceDetachResponses,
ExperimentalWorkspaceDetachErrors,
ThrowOnError
>({
url: "/experimental/workspace/warp",
...options,
...params,
headers: {
"Content-Type": "application/json",
...options?.headers,
...params.headers,
},
})
}
/**
* Remove workspace
*

View File

@@ -2520,6 +2520,39 @@ export type ExperimentalWorkspaceStatusResponses = {
export type ExperimentalWorkspaceStatusResponse =
ExperimentalWorkspaceStatusResponses[keyof ExperimentalWorkspaceStatusResponses]
export type ExperimentalWorkspaceDetachData = {
body?: {
workspaceID: string | null
sessionID: string
}
path?: never
query?: {
directory?: string
workspace?: string
}
url: "/experimental/workspace/warp"
}
export type ExperimentalWorkspaceDetachErrors = {
/**
* Bad request
*/
400: BadRequestError
}
export type ExperimentalWorkspaceDetachError =
ExperimentalWorkspaceDetachErrors[keyof ExperimentalWorkspaceDetachErrors]
export type ExperimentalWorkspaceDetachResponses = {
/**
* Session warped
*/
204: void
}
export type ExperimentalWorkspaceDetachResponse =
ExperimentalWorkspaceDetachResponses[keyof ExperimentalWorkspaceDetachResponses]
export type ExperimentalWorkspaceRemoveData = {
body?: never
path: {