P1-03: Create base Process class

- Add src/shared/base-process.ts: stdin JSONL read, stdout JSONL write
- Integrate Zod validation for incoming/outgoing envelopes
- Handle malformed JSON via parseError event; override handleEnvelope/handleParseError
- Add scripts/test-base-process.ts extending BaseProcess for pipe test

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
larchanka
2026-02-16 23:29:46 +01:00
parent c3a2ff9ecb
commit 06cc448138
3 changed files with 133 additions and 10 deletions

View File

@@ -2,16 +2,6 @@
## To Do
### P1-03 Create base Process class
- tags: [pending]
- defaultExpanded: false
```md
Implement a base class or helper to standardize the way individual processes handle stdin/stdout communication, message parsing, and error handling.
Source: P1-03_BASE_PROCESS_CLASS.md
```
### P1-04 Implement Logger Service
- tags: [pending]
@@ -215,6 +205,16 @@
## In Progress
### P1-03 Create base Process class
- tags: [in-progress]
- defaultExpanded: false
```md
Implement a base class or helper to standardize the way individual processes handle stdin/stdout communication, message parsing, and error handling.
Source: P1-03_BASE_PROCESS_CLASS.md
```
## Done
### P1-01 Initialize project structure

View File

@@ -0,0 +1,30 @@
#!/usr/bin/env npx tsx
/**
* Test script for BaseProcess: receives a JSONL message on stdin and responds on stdout.
* Run: echo '{"id":"f47ac10b-58cc-4372-a567-0e02b2c3d479","from":"core","to":"test","type":"ping","version":"1.0","timestamp":1700000000000,"payload":{}}' | npx tsx scripts/test-base-process.ts
*/
import { BaseProcess } from "../src/shared/base-process.js";
import type { Envelope } from "../src/shared/protocol.js";
class EchoProcess extends BaseProcess {
constructor() {
super({ processName: "test" });
}
protected override handleEnvelope(envelope: Envelope): void {
const response: Envelope = {
...envelope,
id: crypto.randomUUID(),
from: this.processName,
to: envelope.from,
type: "response",
timestamp: Date.now(),
payload: { status: "success", result: { echoed: envelope.payload } },
};
this.send(response);
}
}
const proc = new EchoProcess();
proc.start();

View File

@@ -0,0 +1,93 @@
/**
* Base class for processes that communicate via stdin/stdout JSONL.
* Matches _docs/MESSAGE PROTOCOL SPEC.md transport (line-delimited JSON).
*/
import { EventEmitter } from "node:events";
import { createInterface } from "node:readline";
import type { Envelope } from "./protocol.js";
import { envelopeSchema } from "./protocol.js";
export interface BaseProcessOptions {
/** Process name used as default `from` in outgoing messages. */
processName: string;
}
export interface BaseProcessEvents {
message: (envelope: Envelope) => void;
parseError: (payload: { line: string; error: unknown }) => void;
}
/**
* Base process: reads JSONL from stdin, validates with Zod, emits messages;
* writes validated JSONL to stdout.
*/
export class BaseProcess extends EventEmitter {
readonly processName: string;
private readonly rl;
private running = false;
constructor(options: BaseProcessOptions) {
super();
this.processName = options.processName;
this.rl = createInterface({ input: process.stdin, terminal: false });
}
/**
* Start reading stdin. Call once after setting up message handler.
*/
start(): void {
if (this.running) return;
this.running = true;
this.rl.on("line", (line: string) => this.handleLine(line));
this.rl.on("close", () => this.handleClose());
}
/**
* Override to handle each valid envelope. Default emits "message" (for use with onMessage).
*/
protected handleEnvelope(envelope: Envelope): void {
this.emit("message", envelope);
}
/**
* Override to handle parse/validation errors. Default emits "parseError".
*/
protected handleParseError(line: string, error: unknown): void {
this.emit("parseError", { line, error });
}
private handleLine(line: string): void {
const trimmed = line.trim();
if (!trimmed) return;
try {
const raw = JSON.parse(trimmed) as unknown;
const envelope = envelopeSchema.parse(raw) as Envelope;
this.handleEnvelope(envelope);
} catch (error) {
this.handleParseError(line, error);
}
}
private handleClose(): void {
this.running = false;
}
/**
* Send an envelope to stdout. Validates with Zod before writing.
* @throws if envelope fails validation
*/
send(envelope: Envelope): void {
const parsed = envelopeSchema.parse(envelope) as Envelope;
const line = JSON.stringify(parsed) + "\n";
process.stdout.write(line);
}
/**
* Register a message handler. For subclass override, override handleEnvelope instead.
*/
onMessage(handler: (envelope: Envelope) => void): this {
return this.on("message", handler);
}
}