refactor(server): simplify listener lifecycle

This commit is contained in:
Kit Langton
2026-05-13 21:19:00 -04:00
parent 3fc7486d15
commit 6fd0cda3e1
19 changed files with 292 additions and 215 deletions

View File

@@ -32,11 +32,6 @@
"bun": "./src/pty/pty.bun.ts",
"node": "./src/pty/pty.node.ts",
"default": "./src/pty/pty.bun.ts"
},
"#httpapi-server": {
"bun": "./src/server/httpapi-server.node.ts",
"node": "./src/server/httpapi-server.node.ts",
"default": "./src/server/httpapi-server.node.ts"
}
},
"devDependencies": {

View File

@@ -1,35 +0,0 @@
import { NodeHttpServer } from "@effect/platform-node"
import { Effect, Layer } from "effect"
import { createServer } from "node:http"
import { Service } from "./httpapi-server"
export { Service }
export const name = "node-http-server"
export type Opts = { port: number; hostname: string }
export const layer = (opts: Opts) => {
const server = createServer()
const serverRef = { closeStarted: false, forceStop: false }
const close = server.close.bind(server)
// Keep shutdown owned by NodeHttpServer, but honor listener.stop(true) by
// force-closing active HTTP sockets when its finalizer calls server.close().
server.close = ((callback?: Parameters<typeof server.close>[0]) => {
serverRef.closeStarted = true
const result = close(callback)
if (serverRef.forceStop) server.closeAllConnections()
return result
}) as typeof server.close
return Layer.mergeAll(
NodeHttpServer.layer(() => server, { port: opts.port, host: opts.hostname, gracefulShutdownTimeout: "1 second" }),
Layer.succeed(Service)(
Service.of({
closeAll: Effect.sync(() => {
serverRef.forceStop = true
if (serverRef.closeStarted) server.closeAllConnections()
}),
}),
),
)
}

View File

@@ -1,9 +0,0 @@
import { Context, Effect } from "effect"
export interface Interface {
readonly closeAll: Effect.Effect<void>
}
export class Service extends Context.Service<Service, Interface>()("@opencode/HttpApiServer") {}
export * as HttpApiServer from "./httpapi-server"

View File

@@ -24,5 +24,3 @@ export function initProjectors() {
},
})
}
initProjectors()

View File

@@ -1,4 +1,4 @@
import { Context, Effect, Layer } from "effect"
import { Config as EffectConfig, Context, Effect, Layer } from "effect"
import { HttpApiBuilder, OpenApi } from "effect/unstable/httpapi"
import {
FetchHttpClient,
@@ -88,15 +88,6 @@ import { schemaErrorLayer } from "./middleware/schema-error"
export const context = Context.makeUnsafe<unknown>(new Map())
const runtime = HttpRouter.middleware()(
Effect.succeed((effect) =>
Effect.gen(function* () {
yield* Effect.annotateCurrentSpan({ "opencode.server.backend": "effect-httpapi" })
return yield* effect
}),
),
).layer
const cors = (corsOptions?: CorsOptions) =>
HttpRouter.middleware(
HttpMiddleware.cors({
@@ -175,7 +166,16 @@ const uiRoute = HttpRouter.use((router) =>
}),
).pipe(Layer.provide(authOnlyRouterLayer))
export function createRoutes(corsOptions?: CorsOptions) {
type RouteRequirements =
| HttpRouter.HttpRouter
| HttpRouter.Request<"Error", unknown>
| HttpRouter.Request<"GlobalError", unknown>
| HttpRouter.Request<"Requires", unknown>
| HttpRouter.Request<"GlobalRequires", never>
export function createRoutes(
corsOptions?: CorsOptions,
): Layer.Layer<never, EffectConfig.ConfigError, RouteRequirements> {
return Layer.mergeAll(rootApiRoutes, eventApiRoutes, instanceRoutes, docRoute, uiRoute).pipe(
Layer.provide([
errorLayer,
@@ -183,7 +183,6 @@ export function createRoutes(corsOptions?: CorsOptions) {
corsVaryFix,
fenceLayer,
cors(corsOptions),
runtime,
Account.defaultLayer,
Agent.defaultLayer,
Auth.defaultLayer,
@@ -227,28 +226,19 @@ export function createRoutes(corsOptions?: CorsOptions) {
FetchHttpClient.layer,
HttpServer.layerServices,
]),
Layer.provideMerge(Layer.succeed(CorsConfig)(corsOptions)),
Layer.provideMerge(InstanceLayer.layer),
Layer.provideMerge(Observability.layer),
Layer.provide(Layer.succeed(CorsConfig)(corsOptions)),
Layer.provide(InstanceLayer.layer),
Layer.provide(Observability.layer),
)
}
export const routes = createRoutes()
const defaultWebHandler = lazy(() =>
export const webHandler = lazy(() =>
HttpRouter.toWebHandler(routes, {
memoMap,
middleware: disposeMiddleware,
}),
)
export function webHandler(corsOptions?: CorsOptions) {
if (!corsOptions?.cors?.length) return defaultWebHandler()
return HttpRouter.toWebHandler(createRoutes(corsOptions), {
// Server-level CORS options are dynamic; don't reuse the default route layer memoized without them.
memoMap: Layer.makeMemoMapUnsafe(),
middleware: disposeMiddleware,
})
}
export * as ExperimentalHttpApiServer from "./server"
export * as HttpApiApp from "./server"

View File

@@ -1,15 +1,17 @@
import { NodeHttpServer } from "@effect/platform-node"
import * as Log from "@opencode-ai/core/util/log"
import { ConfigProvider, Context, Effect, Exit, Layer, Scope } from "effect"
import { ConfigProvider, Context, Effect, Exit, Layer, Ref, Scope } from "effect"
import { HttpRouter, HttpServer } from "effect/unstable/http"
import { OpenApi } from "effect/unstable/httpapi"
import * as HttpApiServer from "#httpapi-server"
import { createServer } from "node:http"
import { MDNS } from "./mdns"
import { initProjectors } from "./projectors"
import { ExperimentalHttpApiServer } from "./routes/instance/httpapi/server"
import { HttpApiApp } from "./routes/instance/httpapi/server"
import { disposeMiddleware } from "./routes/instance/httpapi/lifecycle"
import { WebSocketTracker } from "./routes/instance/httpapi/websocket-tracker"
import { PublicApi } from "./routes/instance/httpapi/public"
import type { CorsOptions } from "./cors"
import { lazy } from "@/util/lazy"
// @ts-ignore This global is needed to prevent ai-sdk from logging warnings to stdout https://github.com/vercel/ai/blob/2dc67e0ef538307f21368db32d5a12345d98831b/packages/ai/src/logger/log-warnings.ts#L85
globalThis.AI_SDK_LOG_WARNINGS = false
@@ -36,19 +38,34 @@ type ListenOptions = CorsOptions & {
mdns?: boolean
mdnsDomain?: string
}
type ListenerState = {
scope: Scope.Scope
server: Context.Service.Shape<typeof HttpServer.HttpServer>
http: ListenerServer
websockets: WebSocketTracker.Interface
}
type EffectListener = Omit<Listener, "stop"> & {
stop: (close?: boolean) => Effect.Effect<void>
}
const defaultHttpApi = (() => {
const handler = ExperimentalHttpApiServer.webHandler().handler
interface ListenerServer {
readonly closeAll: Effect.Effect<void>
}
class ListenerServerService extends Context.Service<ListenerServerService, ListenerServer>()(
"@opencode/ListenerServer",
) {}
export const Default = lazy(() => {
const handler = HttpApiApp.webHandler().handler
const app: ServerApp = {
fetch: (request: Request) => handler(request, ExperimentalHttpApiServer.context),
fetch: (request: Request) => handler(request, HttpApiApp.context),
request(input, init) {
return app.fetch(input instanceof Request ? input : new Request(new URL(input, "http://localhost"), init))
},
}
return { app }
})()
export const Default = () => defaultHttpApi
})
export async function openapi() {
return OpenApi.fromApi(PublicApi)
@@ -57,102 +74,149 @@ export async function openapi() {
export let url: URL
export async function listen(opts: ListenOptions): Promise<Listener> {
log.info("server backend", { "opencode.server.runtime": HttpApiServer.name })
const buildLayer = (port: number) =>
HttpRouter.serve(ExperimentalHttpApiServer.createRoutes(opts), {
middleware: disposeMiddleware,
disableLogger: true,
disableListenLog: true,
}).pipe(
Layer.provideMerge(WebSocketTracker.layer),
Layer.provideMerge(HttpApiServer.layer({ port, hostname: opts.hostname })),
// Install a fresh `ConfigProvider` per listener so `Config.string(...)`
// reads reflect the current `process.env`. Effect's default
// `ConfigProvider` snapshots `process.env` on first read and caches the
// result on a module-singleton Reference; without overriding it here,
// every later `Server.listen()` keeps observing that initial snapshot.
Layer.provide(ConfigProvider.layer(ConfigProvider.fromEnv())),
)
const start = async (port: number) => {
const scope = Scope.makeUnsafe()
try {
const layer = buildLayer(port) as Layer.Layer<
HttpServer.HttpServer | WebSocketTracker.Service | HttpApiServer.Service,
unknown,
never
>
const ctx = await Effect.runPromise(Layer.buildWithMemoMap(layer, Layer.makeMemoMapUnsafe(), scope))
return { scope, ctx }
} catch (err) {
await Effect.runPromise(Scope.close(scope, Exit.void)).catch(() => undefined)
throw err
}
}
// Match the legacy adapter port-resolution behavior: explicit `0` prefers
// 4096 first, then any free port.
let resolved: Awaited<ReturnType<typeof start>> | undefined
if (opts.port === 0) {
resolved = await start(4096).catch(() => undefined)
if (!resolved) resolved = await start(0)
} else {
resolved = await start(opts.port)
}
if (!resolved) throw new Error(`Failed to start server on port ${opts.port}`)
const server = Context.get(resolved.ctx, HttpServer.HttpServer)
if (server.address._tag !== "TcpAddress") {
await Effect.runPromise(Scope.close(resolved.scope, Exit.void))
throw new Error(`Unexpected HttpServer address tag: ${server.address._tag}`)
}
const port = server.address.port
const innerUrl = new URL("http://localhost")
innerUrl.hostname = opts.hostname
innerUrl.port = String(port)
url = innerUrl
const mdns =
opts.mdns && port && opts.hostname !== "127.0.0.1" && opts.hostname !== "localhost" && opts.hostname !== "::1"
if (mdns) {
MDNS.publish(port, opts.mdnsDomain)
} else if (opts.mdns) {
log.warn("mDNS enabled but hostname is loopback; skipping mDNS publish")
}
let forceStopPromise: Promise<void> | undefined
let stopPromise: Promise<void> | undefined
let mdnsUnpublished = false
const unpublish = () => {
if (!mdns || mdnsUnpublished) return
mdnsUnpublished = true
MDNS.unpublish()
}
const forceStop = () => {
forceStopPromise ??= Effect.runPromiseExit(
Effect.gen(function* () {
yield* Context.get(resolved!.ctx, HttpApiServer.Service).closeAll
yield* Context.get(resolved!.ctx, WebSocketTracker.Service).closeAll
}),
).then(() => undefined)
return forceStopPromise
}
const listener = await Effect.runPromise(listenEffect(opts))
return {
hostname: opts.hostname,
port,
url: innerUrl,
stop: (close?: boolean) => {
unpublish()
const requested = close ? forceStop() : Promise.resolve()
stopPromise ??= requested
.then(() => Effect.runPromiseExit(Scope.close(resolved!.scope, Exit.void)))
.then(() => undefined)
return requested.then(() => stopPromise!)
},
hostname: listener.hostname,
port: listener.port,
url: listener.url,
stop: (close?: boolean) => Effect.runPromiseExit(listener.stop(close)).then(() => undefined),
}
}
const listenEffect: (opts: ListenOptions) => Effect.Effect<EffectListener, unknown> = Effect.fn("Server.listen")(
function* (opts: ListenOptions) {
const state = yield* startWithPortFallback(opts)
const address = yield* tcpAddress(state)
const listenerUrl = makeURL(opts.hostname, address.port)
url = listenerUrl
const unpublishMdns = yield* setupMdns(opts, address.port, state.scope)
return {
hostname: opts.hostname,
port: address.port,
url: listenerUrl,
stop: yield* makeStop(state, unpublishMdns),
}
},
)
function listenerLayer(opts: ListenOptions, port: number) {
return HttpRouter.serve(HttpApiApp.createRoutes(opts), {
middleware: disposeMiddleware,
disableLogger: true,
disableListenLog: true,
}).pipe(
Layer.provideMerge(WebSocketTracker.layer),
Layer.provideMerge(serverLayer({ port, hostname: opts.hostname })),
// Install a fresh `ConfigProvider` per listener so `Config.string(...)`
// reads reflect the current `process.env`. Effect's default
// `ConfigProvider` snapshots `process.env` on first read and caches the
// result on a module-singleton Reference; without overriding it here,
// every later `Server.listen()` keeps observing that initial snapshot.
Layer.provide(ConfigProvider.layer(ConfigProvider.fromEnv())),
)
}
function startWithPortFallback(opts: ListenOptions) {
if (opts.port !== 0) return startListener(opts, opts.port)
// Match the legacy adapter port-resolution behavior: explicit `0` prefers
// 4096 first, then any free port.
return startListener(opts, 4096).pipe(Effect.catch(() => startListener(opts, 0)))
}
function startListener(opts: ListenOptions, port: number) {
const scope = Scope.makeUnsafe()
return Layer.buildWithMemoMap(listenerLayer(opts, port), Layer.makeMemoMapUnsafe(), scope).pipe(
Effect.provide(HttpApiApp.context),
Effect.onError(() => Scope.close(scope, Exit.void).pipe(Effect.ignore)),
Effect.map(
(ctx): ListenerState => ({
scope,
server: Context.get(ctx, HttpServer.HttpServer),
http: Context.get(ctx, ListenerServerService),
websockets: Context.get(ctx, WebSocketTracker.Service),
}),
),
)
}
function tcpAddress(state: ListenerState) {
return Effect.gen(function* () {
if (state.server.address._tag === "TcpAddress") return state.server.address
yield* Scope.close(state.scope, Exit.void).pipe(Effect.ignore)
return yield* Effect.die(new Error(`Unexpected HttpServer address tag: ${state.server.address._tag}`))
})
}
function makeURL(hostname: string, port: number) {
const result = new URL("http://localhost")
result.hostname = hostname
result.port = String(port)
return result
}
function setupMdns(opts: ListenOptions, port: number, scope: Scope.Scope) {
return Effect.gen(function* () {
const publish =
opts.mdns && port && opts.hostname !== "127.0.0.1" && opts.hostname !== "localhost" && opts.hostname !== "::1"
if (publish) {
const unpublish = yield* Effect.cached(Effect.sync(() => MDNS.unpublish()))
yield* Effect.sync(() => MDNS.publish(port, opts.mdnsDomain))
yield* Scope.addFinalizer(scope, unpublish)
return unpublish
}
if (opts.mdns) log.warn("mDNS enabled but hostname is loopback; skipping mDNS publish")
return Effect.void
})
}
function makeStop(state: ListenerState, unpublishMdns: Effect.Effect<void>) {
return Effect.gen(function* () {
const forceRequested = yield* Ref.make(false)
const forceCloseOnce = yield* Effect.cached(forceClose(state).pipe(Effect.ignore))
const closeScopeOnce = yield* Effect.cached(Scope.close(state.scope, Exit.void).pipe(Effect.ignore))
return (close?: boolean) =>
Effect.gen(function* () {
if (close) yield* Ref.set(forceRequested, true)
yield* unpublishMdns
if (close) yield* forceCloseOnce
yield* closeScopeOnce
if (yield* Ref.get(forceRequested)) yield* forceCloseOnce
})
})
}
function forceClose(state: ListenerState) {
return Effect.all([state.http.closeAll, state.websockets.closeAll], { concurrency: "unbounded", discard: true })
}
function serverLayer(opts: { port: number; hostname: string }) {
const server = createServer()
const serverRef = { closeStarted: false, forceStop: false }
const close = server.close.bind(server)
// Keep shutdown owned by NodeHttpServer, but honor listener.stop(true) by
// force-closing active HTTP sockets when its finalizer calls server.close().
// oxlint-disable-next-line typescript-eslint/no-unsafe-type-assertion -- Node's overloads don't preserve a monkey-patched method assignment.
server.close = ((callback?: Parameters<typeof server.close>[0]) => {
serverRef.closeStarted = true
const result = close(callback)
if (serverRef.forceStop) server.closeAllConnections()
return result
}) as typeof server.close
return Layer.mergeAll(
NodeHttpServer.layer(() => server, { port: opts.port, host: opts.hostname, gracefulShutdownTimeout: "1 second" }),
Layer.succeed(ListenerServerService)(
ListenerServerService.of({
closeAll: Effect.sync(() => {
serverRef.forceStop = true
if (serverRef.closeStarted) server.closeAllConnections()
}),
}),
),
)
}
export * as Server from "./server"

View File

@@ -6,7 +6,7 @@ import { HttpClient, HttpClientRequest, HttpRouter, HttpServer } from "effect/un
import * as Socket from "effect/unstable/socket/Socket"
import { Server } from "../../src/server/server"
import { InstancePaths } from "../../src/server/routes/instance/httpapi/groups/instance"
import { ExperimentalHttpApiServer } from "../../src/server/routes/instance/httpapi/server"
import { HttpApiApp } from "../../src/server/routes/instance/httpapi/server"
import { resetDatabase } from "../fixture/db"
import { testEffect } from "../lib/effect"
@@ -27,7 +27,7 @@ const testStateLayer = Layer.effectDiscard(
)
const servedRoutes: Layer.Layer<never, Config.ConfigError, HttpServer.HttpServer> = HttpRouter.serve(
ExperimentalHttpApiServer.routes,
HttpApiApp.routes,
{ disableListenLog: true, disableLogger: true },
)
@@ -63,7 +63,7 @@ describe("HttpApi CORS", () => {
it.live("adds CORS headers to unauthorized responses", () =>
Effect.gen(function* () {
const handler = HttpRouter.toWebHandler(
ExperimentalHttpApiServer.createRoutes().pipe(
HttpApiApp.createRoutes().pipe(
Layer.provide(ConfigProvider.layer(ConfigProvider.fromUnknown({ OPENCODE_SERVER_PASSWORD: "secret" }))),
),
{ disableLogger: true },
@@ -73,7 +73,7 @@ describe("HttpApi CORS", () => {
new Request(new URL("/global/config", "http://localhost"), {
headers: { origin: "https://app.opencode.ai" },
}),
ExperimentalHttpApiServer.context,
HttpApiApp.context,
),
)

View File

@@ -49,7 +49,7 @@ function app(modules: Runtime, options: CallOptions) {
if (appCache[cacheKey]) return appCache[cacheKey]
const handler = HttpRouter.toWebHandler(
modules.ExperimentalHttpApiServer.routes.pipe(
modules.HttpApiApp.routes.pipe(
Layer.provide(
ConfigProvider.layer(
ConfigProvider.fromUnknown({ OPENCODE_SERVER_PASSWORD: password, OPENCODE_SERVER_USERNAME: username }),
@@ -62,7 +62,7 @@ function app(modules: Runtime, options: CallOptions) {
request(input: string | URL | Request, init?: RequestInit) {
return handler(
input instanceof Request ? input : new Request(new URL(input, "http://localhost"), init),
modules.ExperimentalHttpApiServer.context,
modules.HttpApiApp.context,
)
},
})

View File

@@ -1,6 +1,6 @@
export type Runtime = {
PublicApi: (typeof import("../../../src/server/routes/instance/httpapi/public"))["PublicApi"]
ExperimentalHttpApiServer: (typeof import("../../../src/server/routes/instance/httpapi/server"))["ExperimentalHttpApiServer"]
HttpApiApp: (typeof import("../../../src/server/routes/instance/httpapi/server"))["HttpApiApp"]
AppLayer: (typeof import("../../../src/effect/app-runtime"))["AppLayer"]
InstanceRef: (typeof import("../../../src/effect/instance-ref"))["InstanceRef"]
Instance: (typeof import("../../../src/project/instance"))["Instance"]
@@ -34,7 +34,7 @@ export function runtime() {
const db = await import("../../fixture/db")
return {
PublicApi: publicApi.PublicApi,
ExperimentalHttpApiServer: httpApiServer.ExperimentalHttpApiServer,
HttpApiApp: httpApiServer.HttpApiApp,
AppLayer: appRuntime.AppLayer,
InstanceRef: instanceRef.InstanceRef,
Instance: instance.Instance,

View File

@@ -1,7 +1,7 @@
import { afterEach, describe, expect, test } from "bun:test"
import { Context } from "effect"
import path from "path"
import { ExperimentalHttpApiServer } from "../../src/server/routes/instance/httpapi/server"
import { HttpApiApp } from "../../src/server/routes/instance/httpapi/server"
import { FilePaths } from "../../src/server/routes/instance/httpapi/groups/file"
import { Instance } from "../../src/project/instance"
import * as Log from "@opencode-ai/core/util/log"
@@ -17,7 +17,7 @@ function request(route: string, directory: string, query?: Record<string, string
for (const [key, value] of Object.entries(query ?? {})) {
url.searchParams.set(key, value)
}
return ExperimentalHttpApiServer.webHandler().handler(
return HttpApiApp.webHandler().handler(
new Request(url, {
headers: {
"x-opencode-directory": directory,

View File

@@ -8,7 +8,7 @@ import { WorkspaceID } from "../../src/control-plane/schema"
import { ControlPaths } from "../../src/server/routes/instance/httpapi/groups/control"
import { InstancePaths } from "../../src/server/routes/instance/httpapi/groups/instance"
import { SessionPaths } from "../../src/server/routes/instance/httpapi/groups/session"
import { ExperimentalHttpApiServer } from "../../src/server/routes/instance/httpapi/server"
import { HttpApiApp } from "../../src/server/routes/instance/httpapi/server"
import { HEADER as FenceHeader } from "../../src/server/shared/fence"
import { resetDatabase } from "../fixture/db"
import { tmpdirScoped } from "../fixture/fixture"
@@ -37,7 +37,7 @@ const testStateLayer = Layer.effectDiscard(
// 127.0.0.1:0 and a fetch-based HttpClient that prepends the server URL. This
// keeps the test wired directly through the same route layer production uses.
const servedRoutes: Layer.Layer<never, Config.ConfigError, HttpServer.HttpServer> = HttpRouter.serve(
ExperimentalHttpApiServer.routes,
HttpApiApp.routes,
{ disableListenLog: true, disableLogger: true },
)
@@ -122,7 +122,7 @@ describe("instance HttpApi", () => {
const dir = yield* tmpdirScoped({ git: true })
const request = (path: string, init?: RequestInit) =>
Effect.promise(() =>
ExperimentalHttpApiServer.webHandler().handler(
HttpApiApp.webHandler().handler(
new Request(`http://localhost${path}`, {
...init,
headers: { "x-opencode-directory": dir, "content-type": "application/json", ...init?.headers },

View File

@@ -156,6 +156,16 @@ function waitForMessage(ws: WebSocket, predicate: (message: string) => boolean)
})
}
async function openPtySocket(listener: Awaited<ReturnType<typeof startListener>>, dir: string) {
const info = await createCat(listener, dir)
const ticket = await connectTicket(listener, info.id, dir)
const ws = await openSocket(socketURL(listener, info.id, dir, ticket.ticket))
return {
ws,
closed: new Promise<void>((resolve) => ws.addEventListener("close", () => resolve(), { once: true })),
}
}
describe("HttpApi Server.listen", () => {
testPty("serves HTTP routes and upgrades PTY websocket through Server.listen", async () => {
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
@@ -208,6 +218,72 @@ describe("HttpApi Server.listen", () => {
}
})
testPty("stop(true) is safe when called concurrently and repeatedly", async () => {
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
const listener = await startListener()
let stopped = false
try {
const socket = await openPtySocket(listener, tmp.path)
await withTimeout(
Promise.all([listener.stop(true), listener.stop(true)]).then(() => undefined),
10_000,
"timed out waiting for concurrent listener.stop(true)",
)
await withTimeout(socket.closed, 5_000, "timed out waiting for websocket close after concurrent stop")
await withTimeout(listener.stop(true), 5_000, "timed out waiting for repeated listener.stop(true)")
stopped = true
} finally {
if (!stopped) await stop(listener, "timed out cleaning up concurrent stop listener").catch(() => undefined)
}
})
testPty("stop(true) can force a graceful stop already in progress", async () => {
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
const listener = await startListener()
let stopped = false
try {
const socket = await openPtySocket(listener, tmp.path)
const graceful = listener.stop()
const forced = listener.stop(true)
await withTimeout(
Promise.all([graceful, forced]).then(() => undefined),
10_000,
"timed out waiting for forced listener stop",
)
await withTimeout(socket.closed, 5_000, "timed out waiting for websocket close after forced stop")
stopped = true
} finally {
if (!stopped) await stop(listener, "timed out cleaning up forced stop listener").catch(() => undefined)
}
})
testPty("graceful stop waits for an overlapping forced stop", async () => {
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
const listener = await startListener()
let stopped = false
try {
const socket = await openPtySocket(listener, tmp.path)
const forced = listener.stop(true)
await withTimeout(listener.stop(), 10_000, "timed out waiting for graceful stop after forced stop")
stopped = true
await withTimeout(forced, 5_000, "timed out waiting for overlapping forced stop")
await withTimeout(socket.closed, 5_000, "timed out waiting for websocket close before graceful stop resolved")
} finally {
if (!stopped) await stop(listener, "timed out cleaning up overlapping stop listener").catch(() => undefined)
}
})
test("stop() gracefully closes an idle listener and is repeat-safe", async () => {
const listener = await startListener()
await withTimeout(listener.stop(), 10_000, "timed out waiting for graceful listener.stop()")
await withTimeout(listener.stop(), 5_000, "timed out waiting for repeated graceful listener.stop()")
await expect(
fetch(new URL(PtyPaths.shells, listener.url), { headers: { authorization: authorization() } }),
).rejects.toThrow()
})
testPty("rejects unsafe PTY ticket mint and connect requests", async () => {
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
const listener = await startListener()

View File

@@ -1,6 +1,6 @@
import { describe, expect } from "bun:test"
import { Context, Effect, Layer } from "effect"
import { ExperimentalHttpApiServer } from "../../src/server/routes/instance/httpapi/server"
import { HttpApiApp } from "../../src/server/routes/instance/httpapi/server"
import { McpPaths } from "../../src/server/routes/instance/httpapi/groups/mcp"
import { Server } from "../../src/server/server"
import * as Log from "@opencode-ai/core/util/log"
@@ -23,10 +23,10 @@ function app() {
return Server.Default().app
}
type TestApp = ReturnType<typeof app>
type TestHandler = ReturnType<typeof ExperimentalHttpApiServer.webHandler>
type TestHandler = ReturnType<typeof HttpApiApp.webHandler>
const handlerScoped = Effect.acquireRelease(
Effect.sync(() => ExperimentalHttpApiServer.webHandler()),
Effect.sync(() => HttpApiApp.webHandler()),
(handler) => Effect.promise(() => handler.dispose()).pipe(Effect.ignore),
)

View File

@@ -10,7 +10,7 @@ import { disposeAllInstances, tmpdir, tmpdirScoped } from "../fixture/fixture"
import { Config, Effect, Layer, Queue, Schema } from "effect"
import { HttpClient, HttpClientRequest, HttpRouter, HttpServer } from "effect/unstable/http"
import * as Socket from "effect/unstable/socket/Socket"
import { ExperimentalHttpApiServer } from "../../src/server/routes/instance/httpapi/server"
import { HttpApiApp } from "../../src/server/routes/instance/httpapi/server"
import { Pty } from "../../src/pty"
import { testEffect } from "../lib/effect"
@@ -30,7 +30,7 @@ const testStateLayer = Layer.effectDiscard(
)
const servedRoutes: Layer.Layer<never, Config.ConfigError, HttpServer.HttpServer> = HttpRouter.serve(
ExperimentalHttpApiServer.routes,
HttpApiApp.routes,
{ disableListenLog: true, disableLogger: true },
)

View File

@@ -4,7 +4,7 @@ import { HttpRouter } from "effect/unstable/http"
import { Instance } from "../../src/project/instance"
import { EventPaths } from "../../src/server/routes/instance/httpapi/event"
import { PtyPaths } from "../../src/server/routes/instance/httpapi/groups/pty"
import { ExperimentalHttpApiServer } from "../../src/server/routes/instance/httpapi/server"
import { HttpApiApp } from "../../src/server/routes/instance/httpapi/server"
import { PtyID } from "../../src/pty/schema"
import { resetDatabase } from "../fixture/db"
import { disposeAllInstances, tmpdir } from "../fixture/fixture"
@@ -14,7 +14,7 @@ void Log.init({ print: false })
function app(input: { password?: string; username?: string }) {
const handler = HttpRouter.toWebHandler(
ExperimentalHttpApiServer.routes.pipe(
HttpApiApp.routes.pipe(
Layer.provide(
ConfigProvider.layer(
ConfigProvider.fromUnknown({
@@ -28,7 +28,7 @@ function app(input: { password?: string; username?: string }) {
).handler
return {
fetch: (request: Request) => handler(request, ExperimentalHttpApiServer.context),
fetch: (request: Request) => handler(request, HttpApiApp.context),
request(input: string | URL | Request, init?: RequestInit) {
return this.fetch(input instanceof Request ? input : new Request(new URL(input, "http://localhost"), init))
},

View File

@@ -10,7 +10,7 @@ import { createOpencodeClient } from "@opencode-ai/sdk/v2"
import { validateSession } from "../../src/cli/cmd/tui/validate-session"
import { InstanceBootstrap } from "../../src/project/bootstrap-service"
import { InstanceStore } from "../../src/project/instance-store"
import { ExperimentalHttpApiServer } from "../../src/server/routes/instance/httpapi/server"
import { HttpApiApp } from "../../src/server/routes/instance/httpapi/server"
import { Server } from "../../src/server/server"
import { MessageID, PartID, SessionID } from "../../src/session/schema"
import { MessageV2 } from "../../src/session/message-v2"
@@ -53,7 +53,7 @@ function app(serverPath: ServerPath, input?: { password?: string; username?: str
if (serverPath === "default") return Server.Default().app
const handler = HttpRouter.toWebHandler(
ExperimentalHttpApiServer.routes.pipe(
HttpApiApp.routes.pipe(
Layer.provide(
ConfigProvider.layer(
ConfigProvider.fromUnknown({
@@ -66,7 +66,7 @@ function app(serverPath: ServerPath, input?: { password?: string; username?: str
{ disableLogger: true },
).handler
return {
fetch: (request: Request) => handler(request, ExperimentalHttpApiServer.context),
fetch: (request: Request) => handler(request, HttpApiApp.context),
request(input: string | URL | Request, init?: RequestInit) {
return this.fetch(input instanceof Request ? input : new Request(new URL(input, "http://localhost"), init))
},

View File

@@ -3,7 +3,7 @@ import { Context, Effect } from "effect"
import { Flag } from "@opencode-ai/core/flag/flag"
import { Server } from "../../src/server/server"
import { SyncPaths } from "../../src/server/routes/instance/httpapi/groups/sync"
import { ExperimentalHttpApiServer } from "../../src/server/routes/instance/httpapi/server"
import { HttpApiApp } from "../../src/server/routes/instance/httpapi/server"
import { Session } from "@/session/session"
import * as Log from "@opencode-ai/core/util/log"
import { resetDatabase } from "../fixture/db"
@@ -144,7 +144,7 @@ describe("sync HttpApi", () => {
Effect.gen(function* () {
const tmp = yield* TestInstance
const response = yield* Effect.promise(() =>
ExperimentalHttpApiServer.webHandler().handler(
HttpApiApp.webHandler().handler(
new Request(`http://localhost${SyncPaths.history}`, {
method: "POST",
headers: { "x-opencode-directory": tmp.directory, "content-type": "application/json" },

View File

@@ -15,7 +15,7 @@ import {
import { AppFileSystem } from "@opencode-ai/core/filesystem"
import { ServerAuth } from "../../src/server/auth"
import { authorizationRouterMiddleware } from "../../src/server/routes/instance/httpapi/middleware/authorization"
import { ExperimentalHttpApiServer } from "../../src/server/routes/instance/httpapi/server"
import { HttpApiApp } from "../../src/server/routes/instance/httpapi/server"
import { serveEmbeddedUIEffect, serveUIEffect } from "../../src/server/shared/ui"
import { Server } from "../../src/server/server"
import { testEffect } from "../lib/effect"
@@ -56,7 +56,7 @@ function restoreEnv(key: string, value: string | undefined) {
function app(input?: { password?: string; username?: string }) {
const handler = HttpRouter.toWebHandler(
ExperimentalHttpApiServer.routes.pipe(
HttpApiApp.routes.pipe(
Layer.provide(
ConfigProvider.layer(
ConfigProvider.fromUnknown({
@@ -74,7 +74,7 @@ function app(input?: { password?: string; username?: string }) {
Promise.resolve(
handler(
input instanceof Request ? input : new Request(new URL(input, "http://localhost"), init),
ExperimentalHttpApiServer.context,
HttpApiApp.context,
),
),
)
@@ -112,7 +112,7 @@ function uiApp(input?: { password?: string; username?: string; client?: Layer.La
Promise.resolve(
handler(
input instanceof Request ? input : new Request(new URL(input, "http://localhost"), init),
ExperimentalHttpApiServer.context,
HttpApiApp.context,
),
),
)

View File

@@ -4,7 +4,7 @@ import { HttpRouter } from "effect/unstable/http"
import { Flag } from "@opencode-ai/core/flag/flag"
import { GlobalBus, type GlobalEvent } from "@/bus/global"
import { Worktree } from "@/worktree"
import { ExperimentalHttpApiServer } from "../../src/server/routes/instance/httpapi/server"
import { HttpApiApp } from "../../src/server/routes/instance/httpapi/server"
import { ExperimentalPaths } from "../../src/server/routes/instance/httpapi/groups/experimental"
import { WorkspacePaths } from "../../src/server/routes/instance/httpapi/groups/workspace"
import { resetDatabase } from "../fixture/db"
@@ -36,15 +36,13 @@ type ScopedWorktree = { directory: string; body: CreatedWorktree; ready: Effect.
function serverScoped() {
return Effect.acquireRelease(
Effect.sync(() => HttpRouter.toWebHandler(ExperimentalHttpApiServer.routes, { disableLogger: true })),
Effect.sync(() => HttpRouter.toWebHandler(HttpApiApp.routes, { disableLogger: true })),
(server) => Effect.promise(() => server.dispose()).pipe(Effect.ignore),
)
}
function request(server: TestServer, input: string, init?: RequestInit) {
return Effect.promise(() =>
server.handler(new Request(new URL(input, "http://localhost"), init), ExperimentalHttpApiServer.context),
)
return Effect.promise(() => server.handler(new Request(new URL(input, "http://localhost"), init), HttpApiApp.context))
}
function withRequestTimeout(effect: Effect.Effect<Response>, label: string, ms = 5_000) {