anth-core-workflow-b (from claude-code-plugins-plus)

Install

Clone the upstream repo:

```shell
git clone https://github.com/jeremylongshore/claude-code-plugins-plus-skills
```

Claude Code: install into ~/.claude/skills/

```shell
T=$(mktemp -d) \
  && git clone --depth=1 https://github.com/jeremylongshore/claude-code-plugins-plus-skills "$T" \
  && mkdir -p ~/.claude/skills \
  && cp -r "$T/plugins/saas-packs/anthropic-pack/skills/anth-core-workflow-b" ~/.claude/skills/jeremylongshore-claude-code-plugins-plus-anth-core-workflow-b \
  && rm -rf "$T"
```

Manifest: plugins/saas-packs/anthropic-pack/skills/anth-core-workflow-b/SKILL.md
Anthropic Core Workflow B — Streaming & Batches
Overview
Two complementary patterns: real-time streaming for interactive UIs (SSE events via
POST /v1/messages with stream: true) and the Message Batches API (POST /v1/messages/batches) for processing up to 100,000 requests asynchronously at 50% cost reduction.
Prerequisites
- Completed setup: anth-install-auth
- Familiarity with anth-core-workflow-a (Messages API basics)
- For batches: understanding of async/polling patterns
Instructions
Streaming — Python SDK
```python
import anthropic

client = anthropic.Anthropic()

# Method 1: High-level streaming (recommended)
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=2048,
    messages=[{"role": "user", "content": "Write a short story about a robot."}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
    # After stream completes, access full message
    final_message = stream.get_final_message()
    print(f"\nUsage: {final_message.usage.input_tokens}+{final_message.usage.output_tokens}")

# Method 2: Event-level streaming (for custom event handling)
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=2048,
    messages=[{"role": "user", "content": "Explain REST APIs."}],
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                print(event.delta.text, end="")
        elif event.type == "message_stop":
            print("\n[Stream complete]")
```
Streaming — TypeScript SDK
```typescript
import Anthropic from '@anthropic-ai/sdk';

const client = new Anthropic();

// High-level streaming
const stream = client.messages.stream({
  model: 'claude-sonnet-4-20250514',
  max_tokens: 2048,
  messages: [{ role: 'user', content: 'Write a haiku about code.' }],
});

stream.on('text', (text) => process.stdout.write(text));
stream.on('finalMessage', (msg) => {
  console.log(`\nTokens: ${msg.usage.input_tokens}+${msg.usage.output_tokens}`);
});

await stream.finalMessage();
```
Streaming with Tool Use
```python
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    tools=tools,  # Same tools array from core-workflow-a
    messages=[{"role": "user", "content": "What's the weather?"}],
) as stream:
    for event in stream:
        if event.type == "content_block_start":
            if event.content_block.type == "tool_use":
                print(f"Tool call: {event.content_block.name}")
        elif event.type == "content_block_delta":
            if event.delta.type == "input_json_delta":
                print(event.delta.partial_json, end="")  # Tool input arrives incrementally
```
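The `input_json_delta` fragments are not valid JSON on their own: each block's fragments must be concatenated and parsed only once the block finishes. A minimal sketch of that accumulation, using simulated event objects rather than the SDK's own classes:

```python
import json
from types import SimpleNamespace as NS

def accumulate_tool_inputs(events):
    """Collect input_json_delta fragments per content block and parse
    each block's JSON once its content_block_stop arrives."""
    buffers, parsed = {}, {}
    for event in events:
        if event.type == "content_block_delta" and event.delta.type == "input_json_delta":
            buffers[event.index] = buffers.get(event.index, "") + event.delta.partial_json
        elif event.type == "content_block_stop" and event.index in buffers:
            parsed[event.index] = json.loads(buffers[event.index])
    return parsed

# Simulated events: the tool input {"city": "Paris"} arrives in three fragments
events = [
    NS(type="content_block_delta", index=1, delta=NS(type="input_json_delta", partial_json='{"city"')),
    NS(type="content_block_delta", index=1, delta=NS(type="input_json_delta", partial_json=': "Par')),
    NS(type="content_block_delta", index=1, delta=NS(type="input_json_delta", partial_json='is"}')),
    NS(type="content_block_stop", index=1),
]
print(accumulate_tool_inputs(events))  # {1: {'city': 'Paris'}}
```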
Message Batches API — Bulk Processing
```python
# Create a batch of up to 100,000 requests (50% cost savings)
batch = client.messages.batches.create(
    requests=[
        {
            "custom_id": "req-001",
            "params": {
                "model": "claude-sonnet-4-20250514",
                "max_tokens": 1024,
                "messages": [{"role": "user", "content": "Summarize: ...article1..."}],
            },
        },
        {
            "custom_id": "req-002",
            "params": {
                "model": "claude-sonnet-4-20250514",
                "max_tokens": 1024,
                "messages": [{"role": "user", "content": "Summarize: ...article2..."}],
            },
        },
        # ... up to 100,000 requests
    ]
)
print(f"Batch ID: {batch.id}")               # msgbatch_01HBMt...
print(f"Status: {batch.processing_status}")  # in_progress
print(f"Counts: {batch.request_counts}")     # {processing: 2, succeeded: 0, ...}
```
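Writing the requests list by hand does not scale to thousands of inputs. A small helper (hypothetical, not part of the SDK) that generates batch entries with stable, unique `custom_id` values:

```python
def build_batch_requests(prompts, model="claude-sonnet-4-20250514", max_tokens=1024):
    """Turn a list of prompt strings into Message Batches request entries.
    custom_id must be unique within a batch; an index-based id is simplest."""
    return [
        {
            "custom_id": f"req-{i:03d}",
            "params": {
                "model": model,
                "max_tokens": max_tokens,
                "messages": [{"role": "user", "content": prompt}],
            },
        }
        for i, prompt in enumerate(prompts, start=1)
    ]

requests = build_batch_requests(["Summarize: ...article1...", "Summarize: ...article2..."])
print(requests[0]["custom_id"])  # req-001
# batch = client.messages.batches.create(requests=requests)
```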
Poll for Batch Completion
```python
import time

while True:
    batch_status = client.messages.batches.retrieve(batch.id)
    if batch_status.processing_status == "ended":
        break
    print(f"Processing... {batch_status.request_counts}")
    time.sleep(30)

# Stream results (returned as JSONL)
for result in client.messages.batches.results(batch.id):
    if result.result.type == "succeeded":
        text = result.result.message.content[0].text
        print(f"[{result.custom_id}]: {text[:100]}...")
    elif result.result.type == "errored":
        print(f"[{result.custom_id}] ERROR: {result.result.error}")
```
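Batch results are not guaranteed to come back in submission order, so matching them positionally against the original requests is unsafe. A sketch that keys results by `custom_id` instead (result objects simulated here, with a shape simplified from the SDK's):

```python
from types import SimpleNamespace as NS

def index_results_by_custom_id(results):
    """Key batch results by custom_id so each one can be matched back
    to its original request regardless of return order."""
    return {r.custom_id: r.result for r in results}

# Simulated results arriving out of order
results = [
    NS(custom_id="req-002", result=NS(type="succeeded")),
    NS(custom_id="req-001", result=NS(type="errored")),
]
by_id = index_results_by_custom_id(results)
print(by_id["req-001"].type)  # errored
```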
SSE Event Types Reference
| Event | Description | Key Fields |
|---|---|---|
| `message_start` | Stream begins | `message.id`, `message.model`, `message.usage` |
| `content_block_start` | New content block | `index`, `content_block` (text/tool_use) |
| `content_block_delta` | Incremental content | `text_delta` or `input_json_delta` |
| `content_block_stop` | Block complete | `index` |
| `message_delta` | Message-level update | `stop_reason`, `usage` |
| `message_stop` | Stream complete | (empty) |
| `ping` | Keepalive | (empty) |
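On the wire, these arrive as standard server-sent events: an `event:` line naming the type, followed by a `data:` line carrying a JSON payload. A minimal parser sketch for that framing (the sample lines below are illustrative, not captured API output):

```python
import json

def parse_sse(lines):
    """Pair each 'event:' line with its following 'data:' JSON payload."""
    events, current = [], None
    for line in lines:
        if line.startswith("event: "):
            current = line[len("event: "):]
        elif line.startswith("data: ") and current is not None:
            events.append((current, json.loads(line[len("data: "):])))
            current = None
    return events

raw = [
    'event: message_start',
    'data: {"type": "message_start"}',
    'event: content_block_delta',
    'data: {"type": "content_block_delta", "delta": {"type": "text_delta", "text": "Hi"}}',
    'event: message_stop',
    'data: {"type": "message_stop"}',
]
for name, payload in parse_sse(raw):
    print(name)
```

The SDK's `stream` helpers do this parsing for you; a hand-rolled parser like this only matters when consuming the raw HTTP response directly.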
Error Handling
| Error | Cause | Solution |
|---|---|---|
| Stream disconnects mid-response | Network timeout | Implement reconnection with partial-content recovery |
| Batch status `expired` | Not processed within 24h | Resubmit the batch |
| `errored` results in batch | Individual request invalid | Check the `error` field for each failed request |
| 429 on batch creation | Too many concurrent batches | Wait; limit is ~100 concurrent batches |
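For the disconnect case, one sketch of a retry loop with exponential backoff (the flaky generator below stands in for a real stream; all names are hypothetical):

```python
import time

def stream_with_retry(make_stream, max_attempts=3, base_delay=1.0):
    """Consume a text stream, retrying from scratch on ConnectionError.
    A production version would feed accumulated text back as context
    instead of discarding it; this sketch simply restarts."""
    for attempt in range(1, max_attempts + 1):
        chunks = []
        try:
            for text in make_stream():
                chunks.append(text)
            return "".join(chunks)
        except ConnectionError:
            if attempt == max_attempts:
                raise
            time.sleep(base_delay * 2 ** (attempt - 1))  # exponential backoff

calls = {"n": 0}
def flaky_stream():
    calls["n"] += 1
    yield "Hello, "
    if calls["n"] < 2:  # first attempt drops mid-response
        raise ConnectionError("simulated network timeout")
    yield "world"

print(stream_with_retry(flaky_stream, base_delay=0))  # Hello, world
```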
Next Steps
For common errors, see anth-common-errors.