# claude-skill-registry: digdag
Treasure Workflow (digdag) for TD. Covers .dig syntax, session variables (session_date, session_date_compact), td> operator, _parallel/_retry/_error directives, and tdx wf commands.
## Install

From source (clone the upstream repo):

```bash
git clone https://github.com/majiayu000/claude-skill-registry
```

Claude Code (install into `~/.claude/skills/`):

```bash
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/digdag" ~/.claude/skills/majiayu000-claude-skill-registry-digdag && rm -rf "$T"
```

Manifest: `skills/data/digdag/SKILL.md` (source content below)
# Treasure Workflow (Digdag)
## Basic Structure

```yaml
timezone: Asia/Tokyo

schedule:
  daily>: 02:00:00

_export:
  td:
    database: my_database
    engine: presto

+extract:
  td>: queries/extract.sql
  create_table: raw_data

+transform:
  td>: queries/transform.sql
  create_table: results
```
Key points:

- `.dig` extension required; the filename becomes the workflow name
- Tasks run sequentially with the `+` prefix (`+task_name:`)
- `foo>: bar` is sugar for `_type: foo` and `_command: bar`
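The operator sugar can be shown side by side; a minimal sketch with illustrative task names:

```yaml
# These two tasks are equivalent: the operator form (echo>: hello)
# expands to an explicit _type / _command pair.
+short:
  echo>: hello

+long:
  _type: echo
  _command: hello
```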
## Session Variables

| Variable | Example |
|---|---|
| `session_date` | `2024-01-15` |
| `session_date_compact` | `20240115` |
| `session_time` | `2024-01-15T00:00:00+09:00` |
| `timezone` | `Asia/Tokyo` |
| `last_session_time` | Previous scheduled date |
| `next_session_time` | Next scheduled date |
Moment.js is available:

```yaml
+tomorrow:
  echo>: ${moment(session_time).add(1, 'days').format("YYYY-MM-DD")}
```
## TD Operator

```yaml
+query:
  td>: queries/analysis.sql
  database: analytics
  engine: presto
  create_table: results   # or insert_into: existing_table
```
Inline SQL:

```yaml
+inline:
  td>:
  query: |
    SELECT * FROM events
    WHERE TD_TIME_RANGE(time, '${session_date}', TD_TIME_ADD('${session_date}', '1d'))
```
## Parallel Execution

```yaml
+parallel_tasks:
  _parallel: true
  +task_a:
    td>: queries/a.sql
  +task_b:
    td>: queries/b.sql

+after_parallel:
  echo>: "Runs after all parallel tasks"
```
Limited concurrency:

```yaml
+limited:
  _parallel:
    limit: 2
```
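`_parallel` also combines with `for_each>` to fan out over a list with bounded concurrency; a sketch with illustrative names and query paths:

```yaml
# Run one query per region, at most two at a time.
+by_region:
  for_each>:
    region: [US, EU, ASIA]
  _parallel:
    limit: 2
  _do:
    +load:
      td>: queries/load_region.sql
```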
## Error Handling

```yaml
+task:
  td>: queries/important.sql
  _retry: 3
  _error:
    +alert:
      sh>: python scripts/alert.py "Task failed"
```
Retry with backoff:

```yaml
+task:
  _retry:
    limit: 3
    interval: 10
    interval_type: exponential   # or constant
```
## Variables

```yaml
_export:
  td:
    database: production
  my_param: value
  api_key: ${secret:api_credentials.key}   # TD parameter store

+task:
  py>: scripts.process.main
  param: ${my_param}
```
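A minimal sketch of the `scripts/process.py` target referenced above: digdag's `py>` operator imports the module, calls the named function, and passes task parameters whose names match the function's arguments (here `param`) as keyword arguments.

```python
# scripts/process.py -- hypothetical target of the py> task above.
# digdag invokes main() and binds the task's `param` value to the
# matching keyword argument.
def main(param):
    print(f"processing with param={param}")
    return param
```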
## Conditional & Loops

```yaml
+check:
  td>: queries/count.sql
  store_last_results: true

+if_data:
  if>: ${td.last_results.cnt > 0}
  _do:
    +process:
      td>: queries/process.sql

+loop:
  for_each>:
    region: [US, EU, ASIA]
  _do:
    +process:
      td>: queries/by_region.sql
```
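Each `for_each>` iteration exposes the loop variable for `${...}` substitution in the referenced SQL file; a sketch of what a hypothetical `queries/by_region.sql` might contain:

```sql
-- ${region} is substituted by digdag before the query is submitted,
-- so each iteration filters on a different region value.
SELECT COUNT(1) AS cnt
FROM events
WHERE region = '${region}'
```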
## Event Triggers

```yaml
# Runs after another workflow succeeds
trigger:
  attempt>:
    dependent_workflow_name: segment_refresh
    dependent_project_name: customer_segments

+activate:
  td>: queries/activate.sql
```
## tdx wf Commands

```bash
# Project sync (recommended workflow)
tdx wf pull my_project                # Pull project to local folder
tdx wf push                           # Push local changes with diff preview
tdx wf clone --name my_project_prod   # Clone to new project name

# Context & discovery
tdx wf use my_project                 # Set default project for session
tdx wf projects                       # List all projects
tdx wf workflows                      # List workflows in project

# Running & monitoring
tdx wf run                            # Interactive workflow selector
tdx wf run my_project.my_workflow     # Run specific workflow
tdx wf sessions                       # List runs
tdx wf attempt <id> tasks             # Show task status
tdx wf attempt <id> logs +task_name   # View logs
tdx wf attempt <id> retry             # Retry failed
tdx wf attempt <id> kill              # Stop running

# Secrets
tdx wf secrets list                   # List secret keys
tdx wf secrets set KEY=value          # Set a secret
tdx wf secrets delete KEY             # Delete a secret

# Legacy (digdag-style)
tdx wf upload my_workflow             # Push without sync tracking
```
## Project Structure

```
workflows/
└── my_project/           # Created by tdx wf pull
    ├── tdx.json          # Sync tracking (auto-generated)
    ├── main.dig          # Workflow definition
    ├── queries/
    │   └── analysis.sql
    └── scripts/
        └── process.py
```
## Schedule Options

```yaml
schedule:
  daily>: 02:00:00
  # hourly>: 00:00
  # cron>: "0 */4 * * *"
  # weekly>: "Mon,00:00:00"
```