Cc-skills-golang golang-samber-ro

Reactive streams and event-driven programming in Golang using samber/ro — ReactiveX implementation with 150+ type-safe operators, cold/hot observables, 5 subject types (Publish, Behavior, Replay, Async, Unicast), declarative pipelines via Pipe, 40+ plugins (HTTP, cron, fsnotify, JSON, logging), automatic backpressure, error propagation, and Go context integration. Apply when using or adopting samber/ro, when the codebase imports github.com/samber/ro, or when building asynchronous event-driven pipelines, real-time data processing, streams, or reactive architectures in Go. Not for finite slice transforms (-> See golang-samber-lo skill).

install
source · Clone the upstream repo
git clone https://github.com/samber/cc-skills-golang
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/samber/cc-skills-golang "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/golang-samber-ro" ~/.claude/skills/samber-cc-skills-golang-golang-samber-ro && rm -rf "$T"
manifest: skills/golang-samber-ro/SKILL.md
source content

Persona: You are a Go engineer who reaches for reactive streams when data flows asynchronously or infinitely. You use samber/ro to build declarative pipelines instead of manual goroutine/channel wiring, but you know when a simple slice + samber/lo is enough.

Thinking mode: Use

ultrathink
when designing advanced reactive pipelines or choosing between cold/hot observables, subjects, and combining operators. Wrong architecture leads to resource leaks or missed events.

samber/ro — Reactive Streams for Go

Go implementation of ReactiveX. Generics-first, type-safe, composable pipelines for asynchronous data streams with automatic backpressure, error propagation, context integration, and resource cleanup. 150+ operators, 5 subject types, 40+ plugins.

Official Resources:

This skill is not exhaustive. Please refer to library documentation and code examples for more information. Context7 can help as a discoverability platform.

Why samber/ro (Streams vs Slices)

Go channels + goroutines become unwieldy for complex async pipelines: manual channel closures, verbose goroutine lifecycle, error propagation across nested selects, and no composable operators.

samber/ro
solves this with declarative, chainable stream operators.

When to use which tool:

ScenarioToolWhy
Transform a slice (map, filter, reduce)
samber/lo
Finite, synchronous, eager — no stream overhead needed
Simple goroutine fan-out with error handling
errgroup
Standard lib, lightweight, sufficient for bounded concurrency
Infinite event stream (WebSocket, tickers, file watcher)
samber/ro
Declarative pipeline with backpressure, retry, timeout, combine
Real-time data enrichment from multiple async sources
samber/ro
CombineLatest/Zip compose dependent streams without manual select
Pub/sub with multiple consumers sharing one source
samber/ro
Hot observables (Share/Subjects) handle multicast natively

Key differences: lo vs ro

Aspect
samber/lo
samber/ro
DataFinite slicesInfinite streams
ExecutionSynchronous, blockingAsynchronous, non-blocking
EvaluationEager (allocates intermediate slices)Lazy (processes items as they arrive)
TimingImmediateTime-aware (delay, throttle, interval, timeout)
Error modelReturn
(T, error)
per call
Error channel propagates through pipeline
Use caseCollection transformsEvent-driven, real-time, async pipelines

Installation

go get github.com/samber/ro

Core Concepts

Four building blocks:

  1. Observable — a data source that emits values over time. Cold by default: each subscriber triggers independent execution from scratch
  2. Observer — a consumer with three callbacks:
    onNext(T)
    ,
    onError(error)
    ,
    onComplete()
  3. Operator — a function that transforms an observable into another observable, chained via
    Pipe
  4. Subscription — the connection between observable and observer. Call
    .Wait()
    to block or
    .Unsubscribe()
    to cancel
observable := ro.Pipe2(
    ro.RangeWithInterval(0, 5, 1*time.Second),
    ro.Filter(func(x int) bool { return x%2 == 0 }),
    ro.Map(func(x int) string { return fmt.Sprintf("even-%d", x) }),
)

observable.Subscribe(ro.NewObserver(
    func(s string) { fmt.Println(s) },      // onNext
    func(err error) { log.Println(err) },    // onError
    func() { fmt.Println("Done!") },         // onComplete
))
// Output: "even-0", "even-2", "even-4", "Done!"

// Or collect synchronously:
values, err := ro.Collect(observable)

Cold vs Hot Observables

Cold (default): each

.Subscribe()
starts a new independent execution. Safe and predictable — use by default.

Hot: multiple subscribers share a single execution. Use when the source is expensive (WebSocket, DB poll) or subscribers must see the same events.

Convert withBehavior
Share()
Cold → hot with reference counting. Last unsubscribe tears down
ShareReplay(n)
Same as Share + buffers last N values for late subscribers
Connectable()
Cold → hot, but waits for explicit
.Connect()
call
SubjectsNatively hot — call
.Send()
,
.Error()
,
.Complete()
directly
SubjectConstructorReplay behavior
PublishSubject
NewPublishSubject[T]()
None — late subscribers miss past events
BehaviorSubject
NewBehaviorSubject[T](initial)
Replays last value to new subscribers
ReplaySubject
NewReplaySubject[T](bufferSize)
Replays last N values
AsyncSubject
NewAsyncSubject[T]()
Emits only last value, only on complete
UnicastSubject
NewUnicastSubject[T](bufferSize)
Single subscriber only

For subject details and hot observable patterns, see Subjects Guide.

Operator Quick Reference

CategoryKey operatorsPurpose
Creation
Just
,
FromSlice
,
FromChannel
,
Range
,
Interval
,
Defer
,
Future
Create observables from various sources
Transform
Map
,
MapErr
,
FlatMap
,
Scan
,
Reduce
,
GroupBy
Transform or accumulate stream values
Filter
Filter
,
Take
,
TakeLast
,
Skip
,
Distinct
,
Find
,
First
,
Last
Selectively emit values
Combine
Merge
,
Concat
,
Zip2
Zip6
,
CombineLatest2
CombineLatest5
,
Race
Merge multiple observables
Error
Catch
,
OnErrorReturn
,
OnErrorResumeNextWith
,
Retry
,
RetryWithConfig
Recover from errors
Timing
Delay
,
DelayEach
,
Timeout
,
ThrottleTime
,
SampleTime
,
BufferWithTime
Control emission timing
Side effect
Tap
/
Do
,
TapOnNext
,
TapOnError
,
TapOnComplete
Observe without altering stream
Terminal
Collect
,
ToSlice
,
ToChannel
,
ToMap
Consume stream into Go types

Use typed

Pipe2
,
Pipe3
...
Pipe25
for compile-time type safety across operator chains. The untyped
Pipe
uses
any
and loses type checking.

For the complete operator catalog (150+ operators with signatures), see Operators Guide.

Common Mistakes

MistakeWhy it failsFix
Using
ro.OnNext()
without error handler
Errors are silently dropped — bugs hide in productionUse
ro.NewObserver(onNext, onError, onComplete)
with all 3 callbacks
Using untyped
Pipe()
instead of
Pipe2
/
Pipe3
Loses compile-time type safety, errors surface at runtimeUse
Pipe2
,
Pipe3
...
Pipe25
for typed operator chains
Forgetting
.Unsubscribe()
on infinite streams
Goroutine leak — the observable runs foreverUse
TakeUntil(signal)
, context cancellation, or explicit
Unsubscribe()
Using
Share()
when cold is sufficient
Unnecessary complexity, harder to reason about lifecycleUse hot observables only when multiple consumers need the same stream
Using
samber/ro
for finite slice transforms
Stream overhead (goroutines, subscriptions) for a synchronous operationUse
samber/lo
— it's simpler, faster, and purpose-built for slices
Not propagating context for cancellationStreams ignore shutdown signals, causing resource leaks on terminationChain
ContextWithTimeout
or
ThrowOnContextCancel
in the pipeline

Best Practices

  1. Always handle all three events — use
    NewObserver(onNext, onError, onComplete)
    , not just
    OnNext
    . Unhandled errors cause silent data loss
  2. Use
    Collect()
    for synchronous consumption
    — when the stream is finite and you need
    []T
    ,
    Collect
    blocks until complete and returns the slice + error
  3. Prefer typed Pipe functions
    Pipe2
    ,
    Pipe3
    ...
    Pipe25
    catch type mismatches at compile time. Reserve untyped
    Pipe
    for dynamic operator chains
  4. Bound infinite streams — use
    Take(n)
    ,
    TakeUntil(signal)
    ,
    Timeout(d)
    , or context cancellation. Unbounded streams leak goroutines
  5. Use
    Tap
    /
    Do
    for observability
    — log, trace, or meter emissions without altering the stream. Chain
    TapOnError
    for error monitoring
  6. Prefer
    samber/lo
    for simple transforms
    — if the data is a finite slice and you need Map/Filter/Reduce, use
    lo
    . Reach for
    ro
    when data arrives over time, from multiple sources, or needs retry/timeout/backpressure

Plugin Ecosystem

40+ plugins extend ro with domain-specific operators:

CategoryPluginsImport path prefix
EncodingJSON, CSV, Base64, Gob
plugins/encoding/...
NetworkHTTP, I/O, FSNotify
plugins/http
,
plugins/io
,
plugins/fsnotify
SchedulingCron, ICS
plugins/cron
,
plugins/ics
ObservabilityZap, Slog, Zerolog, Logrus, Sentry, Oops
plugins/observability/...
,
plugins/samber/oops
Rate limitingNative, Ulule
plugins/ratelimit/...
DataBytes, Strings, Sort, Strconv, Regexp, Template
plugins/bytes
,
plugins/strings
, etc.
SystemProcess, Signal
plugins/proc
,
plugins/signal

For the full plugin catalog with import paths and usage examples, see Plugin Ecosystem.

For real-world reactive patterns (retry+timeout, WebSocket fan-out, graceful shutdown, stream combination), see Patterns.

If you encounter a bug or unexpected behavior in samber/ro, open an issue at github.com/samber/ro/issues.

Cross-References

  • → See
    samber/cc-skills-golang@golang-samber-lo
    skill for finite slice transforms (Map, Filter, Reduce, GroupBy) — use lo when data is already in a slice
  • → See
    samber/cc-skills-golang@golang-samber-mo
    skill for monadic types (Option, Result, Either) that compose with ro pipelines
  • → See
    samber/cc-skills-golang@golang-samber-hot
    skill for in-memory caching (also available as an ro plugin)
  • → See
    samber/cc-skills-golang@golang-concurrency
    skill for goroutine/channel patterns when reactive streams are overkill
  • → See
    samber/cc-skills-golang@golang-observability
    skill for monitoring reactive pipelines in production