Claude-skill-registry activity-logging

Follow these patterns when implementing activity emission and audit logging in OptAIC. Use for emitting ActivityEnvelopes on mutations (create, update, delete, execute), designing payloads, and ensuring audit compliance.

install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/activity-logging" ~/.claude/skills/majiayu000-claude-skill-registry-activity-logging && rm -rf "$T"
manifest: skills/data/activity-logging/SKILL.md
source content

Activity Logging Patterns

Guide for implementing audit-compliant activity emission in OptAIC services.

When to Use

Apply when:

  • Implementing service layer methods that mutate state
  • Adding new domain resource operations (CRUD)
  • Tracking execution events (runs, training, backtests)
  • Implementing approval/promotion workflows
  • Ensuring audit trail compliance

Core Rule

If it changes state, it MUST emit an activity.

All mutations must emit activities in the service layer (not API handlers or models).

ActivityEnvelope

ActivityEnvelope(
    tenant_id=UUID,
    actor_principal_id=UUID,
    resource_id=UUID,
    resource_type=str,           # "signal", "dataset", "portfolio"
    action=str,                  # "signal.created", "run.completed"
    visibility=str,              # "private"|"resource"|"scope"|"tenant"
    payload=dict,                # Action-specific data
    delivery_channels=list,      # Where to publish
    correlation_id=UUID          # Links related activities
)

Action Naming

Use pattern:

<resource>.<verb>

Core Resource Actions

signal.registered    signal.validated     signal.promoted
dataset.created      dataset.previewed    dataset.refresh_started
dataset.refresh_completed dataset.refresh_failed

Pipeline Actions

pipeline_def.submitted   pipeline_def.deployed
pipeline_instance.created pipeline.run_started pipeline.run_completed

Experiment Actions

experiment.created   experiment.updated
experiment.run_completed experiment.run_failed
expression.evaluated macro.saved

Run Lifecycle Actions

run.started          run.completed        run.failed          run.cancelled
backtest.started     backtest.completed   backtest.failed
training.started     training.completed   training.failed
inference.started    inference.completed  inference.failed
optimization.started optimization.completed optimization.failed
monitoring.started   monitoring.completed monitoring.alert

Portfolio Actions

portfolio.rebalanced portfolio.constraints_updated
portfolio.weights_computed portfolio.optimization_started

Promotion/Workflow Actions

promotion.requested  promotion.approved   promotion.merged    promotion.rejected
guardrails.validated guardrails.blocked   guardrails.warned

Monitoring Actions

monitoring.drift_detected    monitoring.performance_alert
monitoring.data_quality_alert monitoring.threshold_breach

Emission Patterns

Simple Emission

await record_activity_with_outbox(
    session=self.session,
    envelope=ActivityEnvelope(
        action="signal.created",
        actor_principal_id=self.actor_id,
        tenant_id=self.tenant_id,
        resource_id=resource.id,
        resource_type="signal",
        payload={"signal_type": dto.signal_type}
    )
)

Transaction Wrapper

from libs.core.activity import tx_activity

result, activity = await tx_activity(db, envelope, domain_fn)

Payload Guidelines

Include: Changed fields, related IDs, computed metrics, status transitions Exclude: Passwords, API keys, large blobs, PII beyond necessity

See references/payload-examples.md.

Correlation IDs

Link related activities in workflows:

correlation_id = uuid4()

# Use same correlation_id throughout workflow
await emit("promotion.requested", correlation_id=correlation_id)
await emit("guardrails.validated", correlation_id=correlation_id)
await emit("promotion.merged", correlation_id=correlation_id)

Real-time Notifications (Outbox Worker)

Activities are processed by the outbox worker which publishes to Centrifugo for real-time WebSocket delivery.

Notification Types

TypeWhoMechanismOpt-in?
ImplicitOwner + DelegatorsQuery Resource + RoleBindingAutomatic
ExplicitSubscribersQuery Subscription tableUser opts in

Watchers Build Flow

# Outbox worker builds watchers set:
watchers: set[UUID] = set()

# 1. Explicit subscribers (user opt-in)
watchers |= await _subscription_watchers(session, tenant_id, resource_id)

# 2. Resource owner (implicit - auto-notified)
owner_id = await _resource_owner(session, tenant_id, resource_id)
if owner_id:
    watchers.add(owner_id)

# 3. Delegators (owner/delegator roles on resource or ancestors)
watchers |= await _resource_delegators(session, tenant_id, resource_id)

# 4. CRITICAL: Exclude actor (don't notify yourself)
watchers.discard(actor_principal_id)

# 5. Filter by user notification preferences
watchers = await _filter_watchers_by_preference(session, tenant_id, watchers, action)

Notification Preferences

Users configure via

PUT /notifications/preferences
:

Filter ModeActions Notified
all
All activity types
mutations
(default)
resource.created/updated/deleted
,
transfer.*
,
promotion.*
custom
User-defined patterns (e.g.,
["resource.*", "chat.*"]
)
# Custom pattern matching uses fnmatch
await client.notifications.update_preferences(
    filter_mode="custom",
    custom_actions=["resource.*", "promotion.*"],
)

Anti-Patterns

Anti-PatternWhy It's WrongCorrect Approach
Notify actor about own actionNoisy, redundantAlways
watchers.discard(actor_id)
Hardcode notification targetsInflexibleBuild watchers dynamically
Skip preference filteringUsers can't control noiseAlways filter by preferences
Notify without checking rolesSecurity issueUse
_resource_delegators()

Subscription API (Explicit Opt-in)

# Subscribe to a resource
await client.subscriptions.create(
    resource_id=folder_id,
    scope="descendants",  # or "resource" for single resource
)

# List subscriptions
subs = await client.subscriptions.list()

# Unsubscribe
await client.subscriptions.revoke(subscription_id)

Reference Files