# Untether jsonl-subprocess-runner

## Install

Source · Clone the upstream repo:

```shell
git clone https://github.com/littlebearapps/untether
```

Claude Code · Install into `~/.claude/skills/`:

```shell
T=$(mktemp -d) && git clone --depth=1 https://github.com/littlebearapps/untether "$T" && mkdir -p ~/.claude/skills && cp -r "$T/.claude/skills/jsonl-subprocess-runner" ~/.claude/skills/littlebearapps-untether-jsonl-subprocess-runner && rm -rf "$T"
```

Manifest: `.claude/skills/jsonl-subprocess-runner/SKILL.md`
# JSONL Subprocess Runner Framework

All Untether engine runners (Claude, Codex, OpenCode, Pi) extend `JsonlSubprocessRunner`, which manages subprocess lifecycle, JSONL parsing, session locking, and error handling.
## Key components

| Component | Purpose |
|---|---|
| `JsonlSubprocessRunner`, `Runner` protocol, `BaseRunner`, `SessionLockMixin` | Base class, protocol, and mixins |
| `StartedEvent`, `ActionEvent`, `CompletedEvent`, `Action`, `ResumeToken` | Event and token types |
| `EventFactory` | Consistent event creation per engine |
| `JsonlStreamState` | Per-run stream-state dataclass |
| `ProgressTracker` | Aggregates actions for UI rendering |
| | Connects runners to transport |
| `ClaudeRunner`, `CodexRunner`, `OpenCodeRunner`, `PiRunner` | Engine-specific runner implementations |
## Class hierarchy

```
Runner (Protocol)
└── BaseRunner (SessionLockMixin)
    └── JsonlSubprocessRunner
        ├── CodexRunner
        ├── OpenCodeRunner
        ├── PiRunner
        └── ClaudeRunner (overrides run_impl for PTY support)
```
## Template methods to override

When creating a new runner, override these methods on `JsonlSubprocessRunner`:

```python
class MyRunner(JsonlSubprocessRunner):
    engine: EngineId = "myengine"

    def command(self) -> str:
        """CLI binary name (e.g. 'codex', 'opencode', 'pi')."""

    def build_args(self, prompt, resume, *, state) -> list[str]:
        """Build CLI arguments. Called once per run."""

    def translate(self, data, *, state, resume, found_session) -> list[UntetherEvent]:
        """Translate a decoded JSON dict to Untether events. Core logic."""

    def new_state(self, prompt, resume) -> Any:
        """Create per-run state (e.g. EventFactory, accumulators)."""

    # Optional overrides:
    def stdin_payload(self, prompt, resume, *, state) -> bytes | None:
        """Data to send on stdin. Default: prompt.encode()."""

    def env(self, *, state) -> dict[str, str] | None:
        """Extra environment variables for the subprocess."""

    def decode_jsonl(self, *, line: bytes) -> Any | None:
        """Custom JSON decoder. Default: json.loads."""
```
## Event model (3 events)

Every run emits exactly this sequence:

1. `StartedEvent` — emitted once, when the session ID is known
2. `ActionEvent`(s) — zero or more, with phase: started → completed
3. `CompletedEvent` — emitted exactly once, always last
### StartedEvent

```python
StartedEvent(engine="codex", resume=ResumeToken(engine="codex", value="thread_123"))
```

### ActionEvent

```python
ActionEvent(
    engine="codex",
    action=Action(id="item_5", kind="command", title="ls -la", detail={...}),
    phase="started",  # or "completed"
    ok=True,          # only on completed phase
)
```

### CompletedEvent

```python
CompletedEvent(engine="codex", ok=True, answer="Done.", resume=token, usage={...})
```
## ActionKind values

| Kind | When |
|---|---|
| `command` | Shell execution (Bash, shell) |
| | Generic tool call (Read, Grep, Glob, MCP tools) |
| | File edits (Edit, Write, MultiEdit, NotebookEdit) |
| | Web search/fetch (WebSearch, WebFetch) |
| | Commentary, reasoning, todos (TodoWrite, AskUserQuestion) |
| | Non-fatal errors, permission denials |
| | Turn markers (metadata-only, ignored by ProgressTracker) |
| | Usage/cost data (metadata-only) |
| | Subagent/Task invocations |
Session locking
class SessionLockMixin: session_locks: WeakValueDictionary[str, anyio.Semaphore] def lock_for(self, token: ResumeToken) -> anyio.Semaphore: # Key: "engine:session_id" # WeakValueDictionary auto-cleans when semaphore has no references
Locking rules:
- Resume runs: acquire lock immediately before spawning subprocess
- New runs: don't know session_id until
; acquire lock when firstStartedEvent
/system.init
/thread.started
arrives, before yielding the eventstep_start - Lock held until run completes (released in
)finally - Serialises concurrent runs on the same session
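The scheme above can be sketched in a dependency-free way. This stand-in uses `asyncio.Semaphore` where Untether uses anyio's, and takes a plain `session_id` string rather than a `ResumeToken`; both substitutions are simplifications for illustration:

```python
import asyncio
from weakref import WeakValueDictionary

class SessionLockMixin:
    """One semaphore per "engine:session_id" key, shared across runner instances."""
    _locks: "WeakValueDictionary[str, asyncio.Semaphore]" = WeakValueDictionary()
    engine = "codex"   # set by the concrete runner

    def lock_for(self, session_id: str) -> asyncio.Semaphore:
        key = f"{self.engine}:{session_id}"
        lock = self._locks.get(key)
        if lock is None:
            lock = asyncio.Semaphore(1)
            self._locks[key] = lock   # weak entry: vanishes once no run holds it
        return lock
```

A run then serialises itself with `async with runner.lock_for(session_id): ...`; because the dictionary holds only weak references, sessions with no active or pending runs cost nothing.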
## Process lifecycle (`run_impl`)

1. `new_state(prompt, resume)` → create per-run state
2. `build_args(prompt, resume, state)` → construct CLI command
3. `stdin_payload(prompt, resume, state)` → optional stdin data
4. `manage_subprocess(cmd, ...)` → spawn with PIPE for stdin/stdout/stderr
5. `_send_payload(proc, payload)` → send stdin, close stdin
6. `drain_stderr(proc.stderr)` → log stderr concurrently (task group)
7. `_iter_jsonl_events(proc.stdout)` → parse JSONL, call `translate()`
8. `proc.wait()` → wait for exit code
9. `stream_end_events()` or `process_error_events()` → emit `CompletedEvent` if not already emitted
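The steps above can be compressed into a runnable sketch. This is a simplification of the real `run_impl` (no task group, no concurrent stderr draining, plain dicts for events), shown here only to make the spawn → stdin → JSONL-parse → exit-code flow concrete:

```python
import asyncio
import json

async def run_impl(cmd: list[str], payload: bytes, translate):
    """Simplified lifecycle: spawn, send stdin, parse JSONL, report exit."""
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    proc.stdin.write(payload)               # _send_payload: send stdin,
    proc.stdin.close()                      # then close it
    events = []
    async for line in proc.stdout:          # _iter_jsonl_events
        line = line.strip()
        if not line:
            continue
        try:
            data = json.loads(line)
        except json.JSONDecodeError:
            # invalid JSON line -> warning event, keep going
            events.append({"kind": "warning", "line": line.decode()})
            continue
        events.extend(translate(data))
    rc = await proc.wait()
    # stream_end_events: synthesise the final CompletedEvent from the exit code
    events.append({"kind": "completed", "ok": rc == 0})
    return events
```

Using `cat` as the subprocess, a JSONL line fed to stdin comes straight back out and is translated, followed by the synthesised completion event.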
## JSONL stream state

```python
@dataclass
class JsonlStreamState:
    expected_session: ResumeToken | None  # from resume arg
    found_session: ResumeToken | None     # from stream
    did_emit_completed: bool              # guard: exactly one CompletedEvent
    ignored_after_completed: bool         # drop lines after CompletedEvent
    jsonl_seq: int                        # line counter for logging
```
Key invariants:

- Exactly one `CompletedEvent` per run — after emitting it, all subsequent lines are dropped
- Session verification — if `expected_session` is set and the stream yields a different session_id, raise `RuntimeError`
- Duplicate `StartedEvent` suppression — only the first `StartedEvent` is yielded
## Error handling

| Scenario | Behaviour |
|---|---|
| Invalid JSON line | Warning `ActionEvent`, continue |
| Decode error (msgspec) | Warning `ActionEvent`, continue |
| Translation error | Warning `ActionEvent`, continue |
| Non-zero exit code | `CompletedEvent(ok=False)` |
| Stream ends without result | `CompletedEvent(ok=False)` |
## Resume token mixin

```python
class ResumeTokenMixin:
    engine: EngineId
    resume_re: re.Pattern[str]                       # engine-specific regex

    def extract_resume(text) -> ResumeToken | None   # parse last match
    def format_resume(token) -> str                  # canonical resume line
    def is_resume_line(line) -> bool                 # for stripping from prompts
```
Resume line formats per engine:

- Claude: `claude --resume <session_id>`
- Codex: `codex resume <thread_id>`
- OpenCode: `opencode --session <ses_XXX>`
- Pi: `pi --session <token>`
## Engine backend registration

Each runner module exports a `BACKEND`:

```python
# src/untether/runners/myengine.py
BACKEND = EngineBackend(
    id="myengine",
    build_runner=_build_runner,
    cli_cmd="myengine",
    install_cmd="pip install myengine",
)
```
Registered in `pyproject.toml`:

```toml
[project.entry-points."untether.engine_backends"]
codex = "untether.runners.codex:BACKEND"
claude = "untether.runners.claude:BACKEND"
opencode = "untether.runners.opencode:BACKEND"
pi = "untether.runners.pi:BACKEND"
```

Discovery:

```python
importlib.metadata.entry_points(group="untether.engine_backends")
```
## EventFactory

Helper for consistent event creation:

```python
factory = EventFactory(engine="codex")
factory.started(token, title="Codex")
factory.action_started(action_id="item_1", kind="command", title="ls")
factory.action_completed(action_id="item_1", kind="command", title="ls", ok=True)
factory.completed_ok(answer="Done.", resume=token, usage={...})
factory.completed_error(error="timeout", resume=token)
```
## ClaudeRunner PTY exception

ClaudeRunner overrides `run_impl` entirely because it uses a PTY for the bidirectional control channel instead of standard PIPE stdin. It manages:

- `pty.openpty()` for stdin (prevents deadlock with persistent stdin)
- Session registries (`_SESSION_STDIN`, `_REQUEST_TO_SESSION`) for concurrent sessions
- Control request/response draining after every JSONL line

See `.claude/skills/claude-stream-json/SKILL.md` for Claude-specific details.
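The PTY wiring itself is standard library plumbing. A minimal sketch, with `cat` standing in for the Claude CLI: the slave end becomes the child's stdin, the parent writes control traffic to the master, and closing the master hangs up the terminal so the child exits:

```python
import os
import pty
import subprocess

master, slave = pty.openpty()            # master stays with us, slave goes to the child
proc = subprocess.Popen(
    ["cat"],                             # stand-in for the real CLI
    stdin=slave,                         # child reads its "terminal" here
    stdout=subprocess.PIPE,
    stderr=subprocess.DEVNULL,
)
os.close(slave)                          # parent keeps only the master end
os.write(master, b"hello\n")             # control-channel writes go to the master
out = proc.stdout.read(6)                # cat echoes the line back through the pipe
os.close(master)                         # hanging up the PTY ends the child's stdin
proc.wait()
```

Unlike a plain pipe, the master end can stay open and writable for the whole session without the child blocking on a full pipe buffer, which is what makes the persistent bidirectional control channel workable.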