Cc-skills-golang golang-samber-ro
Reactive streams and event-driven programming in Golang using samber/ro — ReactiveX implementation with 150+ type-safe operators, cold/hot observables, 5 subject types (Publish, Behavior, Replay, Async, Unicast), declarative pipelines via Pipe, 40+ plugins (HTTP, cron, fsnotify, JSON, logging), automatic backpressure, error propagation, and Go context integration. Apply when using or adopting samber/ro, when the codebase imports github.com/samber/ro, or when building asynchronous event-driven pipelines, real-time data processing, streams, or reactive architectures in Go. Not for finite slice transforms (-> See golang-samber-lo skill).
git clone https://github.com/samber/cc-skills-golang
T=$(mktemp -d) && git clone --depth=1 https://github.com/samber/cc-skills-golang "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/golang-samber-ro" ~/.claude/skills/samber-cc-skills-golang-golang-samber-ro && rm -rf "$T"
skills/golang-samber-ro/SKILL.mdPersona: You are a Go engineer who reaches for reactive streams when data flows asynchronously or infinitely. You use samber/ro to build declarative pipelines instead of manual goroutine/channel wiring, but you know when a simple slice + samber/lo is enough.
Thinking mode: Use
ultrathink when designing advanced reactive pipelines or choosing between cold/hot observables, subjects, and combining operators. Wrong architecture leads to resource leaks or missed events.
samber/ro — Reactive Streams for Go
Go implementation of ReactiveX. Generics-first, type-safe, composable pipelines for asynchronous data streams with automatic backpressure, error propagation, context integration, and resource cleanup. 150+ operators, 5 subject types, 40+ plugins.
Official Resources:
This skill is not exhaustive. Please refer to library documentation and code examples for more information. Context7 can help as a discoverability platform.
Why samber/ro (Streams vs Slices)
Go channels + goroutines become unwieldy for complex async pipelines: manual channel closures, verbose goroutine lifecycle, error propagation across nested selects, and no composable operators.
samber/ro solves this with declarative, chainable stream operators.
When to use which tool:
| Scenario | Tool | Why |
|---|---|---|
| Transform a slice (map, filter, reduce) | | Finite, synchronous, eager — no stream overhead needed |
| Simple goroutine fan-out with error handling | | Standard lib, lightweight, sufficient for bounded concurrency |
| Infinite event stream (WebSocket, tickers, file watcher) | | Declarative pipeline with backpressure, retry, timeout, combine |
| Real-time data enrichment from multiple async sources | | CombineLatest/Zip compose dependent streams without manual select |
| Pub/sub with multiple consumers sharing one source | | Hot observables (Share/Subjects) handle multicast natively |
Key differences: lo vs ro
| Aspect | | |
|---|---|---|
| Data | Finite slices | Infinite streams |
| Execution | Synchronous, blocking | Asynchronous, non-blocking |
| Evaluation | Eager (allocates intermediate slices) | Lazy (processes items as they arrive) |
| Timing | Immediate | Time-aware (delay, throttle, interval, timeout) |
| Error model | Return per call | Error channel propagates through pipeline |
| Use case | Collection transforms | Event-driven, real-time, async pipelines |
Installation
go get github.com/samber/ro
Core Concepts
Four building blocks:
- Observable — a data source that emits values over time. Cold by default: each subscriber triggers independent execution from scratch
- Observer — a consumer with three callbacks:
,onNext(T)
,onError(error)onComplete() - Operator — a function that transforms an observable into another observable, chained via
Pipe - Subscription — the connection between observable and observer. Call
to block or.Wait()
to cancel.Unsubscribe()
observable := ro.Pipe2( ro.RangeWithInterval(0, 5, 1*time.Second), ro.Filter(func(x int) bool { return x%2 == 0 }), ro.Map(func(x int) string { return fmt.Sprintf("even-%d", x) }), ) observable.Subscribe(ro.NewObserver( func(s string) { fmt.Println(s) }, // onNext func(err error) { log.Println(err) }, // onError func() { fmt.Println("Done!") }, // onComplete )) // Output: "even-0", "even-2", "even-4", "Done!" // Or collect synchronously: values, err := ro.Collect(observable)
Cold vs Hot Observables
Cold (default): each
.Subscribe() starts a new independent execution. Safe and predictable — use by default.
Hot: multiple subscribers share a single execution. Use when the source is expensive (WebSocket, DB poll) or subscribers must see the same events.
| Convert with | Behavior |
|---|---|
| Cold → hot with reference counting. Last unsubscribe tears down |
| Same as Share + buffers last N values for late subscribers |
| Cold → hot, but waits for explicit call |
| Subjects | Natively hot — call , , directly |
| Subject | Constructor | Replay behavior |
|---|---|---|
| | None — late subscribers miss past events |
| | Replays last value to new subscribers |
| | Replays last N values |
| | Emits only last value, only on complete |
| | Single subscriber only |
For subject details and hot observable patterns, see Subjects Guide.
Operator Quick Reference
| Category | Key operators | Purpose |
|---|---|---|
| Creation | , , , , , , | Create observables from various sources |
| Transform | , , , , , | Transform or accumulate stream values |
| Filter | , , , , , , , | Selectively emit values |
| Combine | , , –, –, | Merge multiple observables |
| Error | , , , , | Recover from errors |
| Timing | , , , , , | Control emission timing |
| Side effect | /, , , | Observe without altering stream |
| Terminal | , , , | Consume stream into Go types |
Use typed
Pipe2, Pipe3 ... Pipe25 for compile-time type safety across operator chains. The untyped Pipe uses any and loses type checking.
For the complete operator catalog (150+ operators with signatures), see Operators Guide.
Common Mistakes
| Mistake | Why it fails | Fix |
|---|---|---|
Using without error handler | Errors are silently dropped — bugs hide in production | Use with all 3 callbacks |
Using untyped instead of / | Loses compile-time type safety, errors surface at runtime | Use , ... for typed operator chains |
Forgetting on infinite streams | Goroutine leak — the observable runs forever | Use , context cancellation, or explicit |
Using when cold is sufficient | Unnecessary complexity, harder to reason about lifecycle | Use hot observables only when multiple consumers need the same stream |
Using for finite slice transforms | Stream overhead (goroutines, subscriptions) for a synchronous operation | Use — it's simpler, faster, and purpose-built for slices |
| Not propagating context for cancellation | Streams ignore shutdown signals, causing resource leaks on termination | Chain or in the pipeline |
Best Practices
- Always handle all three events — use
, not justNewObserver(onNext, onError, onComplete)
. Unhandled errors cause silent data lossOnNext - Use
for synchronous consumption — when the stream is finite and you needCollect()
,[]T
blocks until complete and returns the slice + errorCollect - Prefer typed Pipe functions —
,Pipe2
...Pipe3
catch type mismatches at compile time. Reserve untypedPipe25
for dynamic operator chainsPipe - Bound infinite streams — use
,Take(n)
,TakeUntil(signal)
, or context cancellation. Unbounded streams leak goroutinesTimeout(d) - Use
/Tap
for observability — log, trace, or meter emissions without altering the stream. ChainDo
for error monitoringTapOnError - Prefer
for simple transforms — if the data is a finite slice and you need Map/Filter/Reduce, usesamber/lo
. Reach forlo
when data arrives over time, from multiple sources, or needs retry/timeout/backpressurero
Plugin Ecosystem
40+ plugins extend ro with domain-specific operators:
| Category | Plugins | Import path prefix |
|---|---|---|
| Encoding | JSON, CSV, Base64, Gob | |
| Network | HTTP, I/O, FSNotify | , , |
| Scheduling | Cron, ICS | , |
| Observability | Zap, Slog, Zerolog, Logrus, Sentry, Oops | , |
| Rate limiting | Native, Ulule | |
| Data | Bytes, Strings, Sort, Strconv, Regexp, Template | , , etc. |
| System | Process, Signal | , |
For the full plugin catalog with import paths and usage examples, see Plugin Ecosystem.
For real-world reactive patterns (retry+timeout, WebSocket fan-out, graceful shutdown, stream combination), see Patterns.
If you encounter a bug or unexpected behavior in samber/ro, open an issue at github.com/samber/ro/issues.
Cross-References
- → See
skill for finite slice transforms (Map, Filter, Reduce, GroupBy) — use lo when data is already in a slicesamber/cc-skills-golang@golang-samber-lo - → See
skill for monadic types (Option, Result, Either) that compose with ro pipelinessamber/cc-skills-golang@golang-samber-mo - → See
skill for in-memory caching (also available as an ro plugin)samber/cc-skills-golang@golang-samber-hot - → See
skill for goroutine/channel patterns when reactive streams are overkillsamber/cc-skills-golang@golang-concurrency - → See
skill for monitoring reactive pipelines in productionsamber/cc-skills-golang@golang-observability