Awesome-omni-skills monte-carlo-push-ingestion
Monte Carlo Push Ingestion workflow skill. Use this skill when the user needs an expert guide for pushing metadata, lineage, and query logs to Monte Carlo from any data warehouse, and the operator should preserve the upstream workflow, copied support files, and provenance before merging or handing off.
git clone https://github.com/diegosouzapw/awesome-omni-skills
T=$(mktemp -d) && git clone --depth=1 https://github.com/diegosouzapw/awesome-omni-skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/monte-carlo-push-ingestion" ~/.claude/skills/diegosouzapw-awesome-omni-skills-monte-carlo-push-ingestion && rm -rf "$T"
skills/monte-carlo-push-ingestion/SKILL.md — Monte Carlo Push Ingestion
Overview
This public intake copy packages plugins/antigravity-awesome-skills-claude/skills/monte-carlo-push-ingestion from https://github.com/sickn33/antigravity-awesome-skills into the native Omni Skills editorial shape without hiding its origin.
Use it when the operator needs the upstream workflow, support files, and repository context to stay intact while the public validator and private enhancer continue their normal downstream flow.
This intake keeps the copied upstream files intact and uses metadata.json plus ORIGIN.md as the provenance anchor for review.
Monte Carlo Push Ingestion

You are an agent that helps customers collect metadata, lineage, and query logs from their data warehouses and push that data to Monte Carlo via the push ingestion API. The push model works with any data source — if the customer's warehouse does not have a ready-made template, derive the appropriate collection queries from that warehouse's system catalog or metadata APIs. The push format and pycarlo SDK calls are the same regardless of source.

Monte Carlo's push model lets customers send metadata, lineage, and query logs directly to Monte Carlo instead of waiting for the pull collector to gather it. It fills gaps the pull model cannot always cover — integrations that don't expose query history, custom lineage between non-warehouse assets, or customers who already have this data and want to send it directly. Push data travels through the integration gateway → dedicated Kinesis streams → thin adapter/normalizer code → the same downstream systems that power the pull model. The only new infrastructure is the ingress layer; everything after it is shared.
Imported source sections that did not map cleanly to the public headings are still preserved below or in the support files. Notable imported sections: MANDATORY — Always start from templates, Environment variable conventions, What this skill can build for you, Prerequisites — read this first, What you can push, Custom lineage nodes and edges.
When to Use This Skill
Use this section as the trigger filter. It should make the activation boundary explicit before the operator loads files, runs commands, or opens a pull request.
| Reference file | Load when… |
|---|---|
| references/prerequisites.md | Customer is setting up for the first time, has auth errors, or needs help creating API keys |
| references/push-metadata.md | Building or debugging a metadata collection script |
| references/push-lineage.md | Building or debugging a lineage collection script |
| references/push-query-logs.md | Building or debugging a query log collection script |
| references/custom-lineage.md | Customer needs custom lineage nodes or edges via GraphQL |
Operating Table
| Situation | Start here | Why it matters |
|---|---|---|
| First-time use | | Confirms repository, branch, commit, and imported path before touching the copied workflow |
| Provenance review | | Gives reviewers a plain-language audit trail for the imported source |
| Workflow execution | | Starts with the smallest copied file that materially changes execution |
| Supporting context | | Adds the next most relevant copied source file without loading the entire package |
| Handoff decision | | Helps the operator switch to a stronger native skill when the task drifts |
Workflow
This workflow is intentionally editorial and operational at the same time. It keeps the imported source useful to the operator while still satisfying the public intake standards that feed the downstream enhancer flow.
Per flow:
- Metadata (schema + volume + freshness): references/push-metadata.md
- Table and column lineage: references/push-lineage.md
- Query logs: references/push-query-logs.md

Timing expectations after a push:
- Metadata: visible within a few minutes
- Table lineage: visible within seconds to a few minutes (fast direct path to Neo4j)
- Column lineage: a few minutes
- Query logs: at least 15-20 minutes (async processing pipeline)
Imported Workflow Notes
Imported: Step 1 — Generate your collection scripts
Ask Claude to build the script for your warehouse:
"Build me a metadata collection script for Snowflake. My MC resource UUID is
."abc-123
The script templates in `**/push-ingestion/scripts/templates/` (Snowflake, BigQuery, BigQuery Iceberg, Databricks, Redshift, Hive) are the mandatory starting point for script generation — they contain the correct pycarlo imports, model constructors, and SDK calls. They are not an exhaustive list. If the customer's warehouse is not listed, use the templates as a guide and determine the appropriate queries or file-collection approach for their platform. For file-based sources (like Hive Metastore logs), provide the command to retrieve the file, parse it, and transform it into the format required by the push APIs. The push format and SDK calls are identical regardless of source; only the collection queries change.
Batching: For large payloads, split events into batches. Use a batch size of 50 assets per push call. The pycarlo HTTP client has a hardcoded 10-second read timeout that cannot be overridden (`Session` and `Client` do not accept a timeout parameter) — larger batches (200+) will time out on warehouses with thousands of tables. The compressed request body must also not exceed 1MB (Kinesis limit). All push endpoints support batching.
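A minimal batching sketch under those limits, assuming the `IngestionService.send_metadata` and `extract_invocation_id` calls documented in the canonical pycarlo reference later in this skill; the `assets` list and resource identifiers are placeholders.

```python
from pycarlo.features.ingestion import IngestionService

BATCH_SIZE = 50  # recommended assets per push call; keeps well under the 1MB Kinesis limit


def push_metadata_in_batches(service: IngestionService, resource_uuid: str,
                             resource_type: str, assets: list) -> list:
    """Push RelationalAsset events in batches and return the invocation IDs."""
    invocation_ids = []
    for start in range(0, len(assets), BATCH_SIZE):
        result = service.send_metadata(
            resource_uuid=resource_uuid,
            resource_type=resource_type,
            events=assets[start:start + BATCH_SIZE],
        )
        invocation_ids.append(service.extract_invocation_id(result))
    return invocation_ids
```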
Push frequency: Push at most once per hour. Sub-hourly pushes produce unpredictable anomaly detector behavior because the training pipeline aggregates into hourly buckets.
Per flow, see:
- Metadata (schema + volume + freshness): references/push-metadata.md
- Table and column lineage: references/push-lineage.md
- Query logs: references/push-query-logs.md
Imported: Step 2 — Validate pushed data
After pushing, verify data is visible in Monte Carlo using the GraphQL API (GraphQL API key).
→ references/validation.md — all verification queries (getTable, getMetricsV4, getTableLineage, getDerivedTablesPartialLineage, getAggregatedQueries)
Timing expectations:
- Metadata: visible within a few minutes
- Table lineage: visible within seconds to a few minutes (fast direct path to Neo4j)
- Column lineage: a few minutes
- Query logs: at least 15-20 minutes (async processing pipeline)
Imported: Step 3 — Anomaly detection (optional)
If you want Monte Carlo's freshness and volume detectors to fire on pushed data, you need to push consistently over time — detectors require historical data to train.
→ references/anomaly-detection.md — recommended push frequency, minimum samples, training windows, and what to tell customers who ask why detectors aren't activating
Imported: MANDATORY — Always start from templates
When generating any push-ingestion script, you MUST:
- Read the corresponding template before writing any code. Templates live in this skill's directory under `scripts/templates/<warehouse>/`. To find them, glob for `**/push-ingestion/scripts/templates/<warehouse>/*.py` — this works regardless of where the skill is installed. Do NOT search from the current working directory alone.
- Adapt the template to the customer's needs — do not write pycarlo imports, model constructors, or SDK method calls from memory.
- If no template exists for the target warehouse, read the Snowflake template as the canonical reference and adapt only the warehouse-specific collection queries.
Template files follow this naming pattern:
- `collect_<flow>.py` — collection only (queries the warehouse, writes a JSON manifest)
- `push_<flow>.py` — push only (reads the manifest, sends to Monte Carlo)
- `collect_and_push_<flow>.py` — combined (imports from both, runs in sequence)
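To make the collect/push split concrete, here is a minimal sketch of how the two halves hand off through a JSON manifest. The manifest layout, file names, and helper bodies are illustrative assumptions — the real templates are the authoritative source.

```python
import json

MANIFEST_PATH = "metadata_manifest.json"  # illustrative name, not the template's actual path


def collect(manifest_path: str = MANIFEST_PATH) -> None:
    """collect_<flow>.py half: query the warehouse and write a JSON manifest."""
    # Placeholder — a real template runs warehouse-specific catalog queries here.
    rows = [{"database": "analytics", "schema": "public", "table": "my_table"}]
    with open(manifest_path, "w") as fh:
        json.dump({"assets": rows}, fh)


def push(manifest_path: str = MANIFEST_PATH) -> None:
    """push_<flow>.py half: read the manifest and send it to Monte Carlo."""
    with open(manifest_path) as fh:
        manifest = json.load(fh)
    # Placeholder — a real template builds pycarlo RelationalAsset objects from the rows,
    # calls service.send_metadata(...), and records the returned invocation_id.
    print(f"would push {len(manifest['assets'])} assets")


if __name__ == "__main__":
    # collect_and_push_<flow>.py half: run both in sequence.
    collect()
    push()
```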
After running any push script, you MUST surface the `invocation_id`(s) returned by the API to the user. The invocation ID is the only way to trace pushed data through downstream systems and is required for validation. Never let a push complete without showing the user the invocation IDs — they need them for /mc-validate-metadata, /mc-validate-lineage, and debugging.
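A minimal sketch of that rule, using the `extract_invocation_id` helper from the canonical pycarlo reference below; the manifest path and message format are illustrative assumptions.

```python
import json


def record_invocation(service, result, manifest_path: str = "push_output.json") -> str:
    """Extract the invocation ID from a push response, persist it, and surface it to the user."""
    invocation_id = service.extract_invocation_id(result)
    with open(manifest_path, "w") as fh:
        json.dump({"invocation_id": invocation_id}, fh)
    # Always print the ID — it is the only tracing handle once the request leaves the SDK.
    print(f"Push accepted. invocation_id: {invocation_id}")
    return invocation_id
```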
Examples
Example 1: Ask for the upstream workflow directly
Use @monte-carlo-push-ingestion to handle <task>. Start from the copied upstream workflow, load only the files that change the outcome, and keep provenance visible in the answer.
Explanation: This is the safest starting point when the operator needs the imported workflow, but not the entire repository.
Example 2: Ask for a provenance-grounded review
Review @monte-carlo-push-ingestion against metadata.json and ORIGIN.md, then explain which copied upstream files you would load first and why.
Explanation: Use this before review or troubleshooting when you need a precise, auditable explanation of origin and file selection.
Example 3: Narrow the copied support files before execution
Use @monte-carlo-push-ingestion for <task>. Load only the copied references, examples, or scripts that change the outcome, and name the files explicitly before proceeding.
Explanation: This keeps the skill aligned with progressive disclosure instead of loading the whole copied package by default.
Example 4: Build a reviewer packet
Review @monte-carlo-push-ingestion using the copied upstream files plus provenance, then summarize any gaps before merge.
Explanation: This is useful when the PR is waiting for human review and you want a repeatable audit packet.
Imported Usage Notes
Imported: Available slash commands
Customers can invoke these explicitly instead of describing their intent in prose:
| Command | Purpose |
|---|---|
| | Generate a metadata collection script |
| | Generate a lineage collection script |
| | Generate a query log collection script |
| /mc-validate-metadata | Verify pushed metadata via the GraphQL API |
| /mc-validate-lineage | Verify pushed lineage via the GraphQL API |
| | Verify pushed query logs via the GraphQL API |
| | Create a custom lineage node |
| | Create a custom lineage edge |
| | Delete a custom lineage node |
| | Delete push-ingested tables |
Best Practices
Treat the generated public skill as a reviewable packaging layer around the upstream repository. The goal is to keep provenance explicit and load only the copied source material that materially improves execution.
- Keep the imported skill grounded in the upstream repository; do not invent steps that the source material cannot support.
- Prefer the smallest useful set of support files so the workflow stays auditable and fast to review.
- Keep provenance, source commit, and imported file paths visible in notes and PR descriptions.
- Point directly at the copied upstream files that justify the workflow instead of relying on generic review boilerplate.
- Treat generated examples as scaffolding; adapt them to the concrete task before execution.
- Route to a stronger native skill when architecture, debugging, design, or security concerns become dominant.
Troubleshooting
Problem: The operator skipped the imported context and answered too generically
Symptoms: The result ignores the upstream workflow in plugins/antigravity-awesome-skills-claude/skills/monte-carlo-push-ingestion, fails to mention provenance, or does not use any copied source files at all.
Solution: Re-open metadata.json, ORIGIN.md, and the most relevant copied upstream files. Load only the files that materially change the answer, then restate the provenance before continuing.
Problem: The imported workflow feels incomplete during review
Symptoms: Reviewers can see the generated SKILL.md, but they cannot quickly tell which references, examples, or scripts matter for the current task.
Solution: Point at the exact copied references, examples, scripts, or assets that justify the path you took. If the gap is still real, record it in the PR instead of hiding it.
Problem: The task drifted into a different specialization
Symptoms: The imported skill starts in the right place, but the work turns into debugging, architecture, design, security, or release orchestration that a native skill handles better.
Solution: Use the related skills section to hand off deliberately. Keep the imported provenance visible so the next skill inherits the right context instead of starting blind.
Related Skills
- @monte-carlo-monitor-creation — Use when the work is better handled by that native specialization after this imported skill establishes context.
- @monte-carlo-prevent — Use when the work is better handled by that native specialization after this imported skill establishes context.
- @monte-carlo-validation-notebook — Use when the work is better handled by that native specialization after this imported skill establishes context.
- @moodle-external-api-development — Use when the work is better handled by that native specialization after this imported skill establishes context.
Additional Resources
Use this support matrix and the linked files below as the operator packet for this imported skill. They should reflect real copied source material, not generic scaffolding.
| Resource family | What it gives the reviewer | Example path |
|---|---|---|
| | copied reference notes, guides, or background material from upstream | |
| | worked examples or reusable prompts copied from upstream | |
| | upstream helper scripts that change execution or validation | |
| | routing or delegation notes that are genuinely part of the imported package | |
| | supporting assets or schemas copied from the source package | |
- anomaly-detection.md
- custom-lineage.md
- direct-http-api.md
- prerequisites.md
- sample_verify.py
- templates/bigquery/collect_and_push_lineage.py
- templates/bigquery/collect_and_push_metadata.py
- templates/bigquery/collect_and_push_query_logs.py
Imported Reference Notes
Imported: Canonical pycarlo API — authoritative reference
The following imports, classes, and method signatures are the ONLY correct pycarlo API for push ingestion. If your training data suggests different names, it is wrong. Use exactly what is listed here.
Imports and client setup
```python
from pycarlo.core import Client, Session
from pycarlo.features.ingestion import IngestionService
from pycarlo.features.ingestion.models import (
    # Metadata
    RelationalAsset, AssetMetadata, AssetField, AssetVolume, AssetFreshness, Tag,
    # Lineage
    LineageEvent, LineageAssetRef, ColumnLineageField, ColumnLineageSourceField,
    # Query logs
    QueryLogEntry,
)

client = Client(session=Session(mcd_id=key_id, mcd_token=key_token, scope="Ingestion"))
service = IngestionService(mc_client=client)
```
Method signatures
```python
# Metadata
service.send_metadata(resource_uuid=..., resource_type=..., events=[RelationalAsset(...)])

# Lineage (table or column)
service.send_lineage(resource_uuid=..., resource_type=..., events=[LineageEvent(...)])

# Query logs — note: log_type, NOT resource_type
service.send_query_logs(resource_uuid=..., log_type=..., events=[QueryLogEntry(...)])

# Extract invocation ID from any response
service.extract_invocation_id(result)
```
RelationalAsset structure (nested, NOT flat)
```python
RelationalAsset(
    type="TABLE",  # ONLY "TABLE" or "VIEW" (uppercase) — normalize warehouse-native values
    metadata=AssetMetadata(
        name="my_table",
        database="analytics",
        schema="public",
        description="optional description",
    ),
    fields=[
        AssetField(name="id", type="INTEGER", description=None),
        AssetField(name="amount", type="DECIMAL(10,2)"),
    ],
    volume=AssetVolume(row_count=1000000, byte_count=111111111),  # optional
    freshness=AssetFreshness(last_update_time="2026-03-12T14:30:00Z"),  # optional
)
```
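Putting the pieces above together, here is a minimal end-to-end metadata push sketch. The environment variable names, resource UUID, and the `data-lake` resource type value are placeholder assumptions — substitute the exact conventions documented in this skill.

```python
import os

from pycarlo.core import Client, Session
from pycarlo.features.ingestion import IngestionService
from pycarlo.features.ingestion.models import AssetField, AssetMetadata, AssetVolume, RelationalAsset

# Placeholder env var names — use the exact names from the environment variable
# conventions section below, and an API key created with scope=Ingestion.
client = Client(session=Session(
    mcd_id=os.environ["INGESTION_KEY_ID"],
    mcd_token=os.environ["INGESTION_KEY_SECRET"],
    scope="Ingestion",
))
service = IngestionService(mc_client=client)

asset = RelationalAsset(
    type="TABLE",
    metadata=AssetMetadata(name="my_table", database="analytics", schema="public"),
    fields=[AssetField(name="id", type="INTEGER")],
    volume=AssetVolume(row_count=1_000_000, byte_count=111_111_111),
)

result = service.send_metadata(
    resource_uuid=os.environ["RESOURCE_UUID"],  # placeholder — the warehouse resource UUID
    resource_type="data-lake",                  # example value from the known gotchas section
    events=[asset],
)
print("invocation_id:", service.extract_invocation_id(result))
```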
Imported: Environment variable conventions
All generated scripts MUST use these exact variable names. Do NOT invent alternatives like MCD_KEY_ID, MC_TOKEN, MONTE_CARLO_KEY, etc.
| Variable | Purpose | Used by |
|---|---|---|
| | Ingestion key ID (scope=Ingestion) | push scripts |
| | Ingestion key secret | push scripts |
| | GraphQL API key ID | verification scripts |
| | GraphQL API key secret | verification scripts |
| | Warehouse resource UUID | all scripts |
Imported: What this skill can build for you
Tell Claude your warehouse or data platform and Monte Carlo resource UUID and this skill will generate a ready-to-run Python script that:
- Connects to your warehouse using the idiomatic driver for that platform
- Discovers databases, schemas, and tables
- Extracts the right columns — names, types, row counts, byte counts, last modified time, descriptions
- Builds the correct pycarlo `RelationalAsset`, `LineageEvent`, or `QueryLogEntry` objects
- Pushes to Monte Carlo and saves an output manifest with the `invocation_id` for tracing
Templates are available for common warehouses (Snowflake, BigQuery, BigQuery Iceberg, Databricks, Redshift, Hive). For any other platform, Claude will derive the appropriate collection queries from the warehouse's system catalog or metadata APIs and generate an equivalent script.
Ready-to-run examples
Production-ready example scripts built from these templates are published in the mcd-public-resources repo:
- BigQuery Iceberg (BigLake) tables — metadata and query log collection for BigQuery Iceberg tables that are invisible to Monte Carlo's standard pull collector (which uses `__TABLES__`). Includes an `--only-freshness-and-volume` flag for fast periodic pushes that skip the schema/fields query — useful for hourly cron jobs after the initial full metadata push.
Imported: Prerequisites — read this first
→ Load references/prerequisites.md
Two separate API keys are required. This is the most common setup stumbling block:
- Ingestion key (scope=Ingestion) — for pushing data
- GraphQL API key — for verification queries
Both use the same `x-mcd-id` / `x-mcd-token` headers but point to different endpoints.
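As a sketch of the two-key setup — assuming pycarlo's `Session` accepts the GraphQL API key without the `Ingestion` scope, and using placeholder environment variable names — the two clients might be constructed like this:

```python
import os
from pycarlo.core import Client, Session

# Ingestion key: used only by push scripts (must be created with scope=Ingestion).
ingestion_client = Client(session=Session(
    mcd_id=os.environ["INGESTION_KEY_ID"],        # placeholder name — follow the documented conventions
    mcd_token=os.environ["INGESTION_KEY_SECRET"],
    scope="Ingestion",
))

# GraphQL API key: used only by verification queries (no Ingestion scope).
graphql_client = Client(session=Session(
    mcd_id=os.environ["GRAPHQL_KEY_ID"],          # placeholder name
    mcd_token=os.environ["GRAPHQL_KEY_SECRET"],
))
```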
Imported: What you can push
| Flow | pycarlo method | Push endpoint | Type field | Expiration |
|---|---|---|---|---|
| Table metadata | `send_metadata` | | `resource_type` (e.g. `data-lake`) | Never expires |
| Table lineage | `send_lineage` | | `resource_type` (same as metadata) | Never expires |
| Column lineage | `send_lineage` (events include `ColumnLineageField`) | | `resource_type` (same as metadata) | Expires after 10 days |
| Query logs | `send_query_logs` | | `log_type` (not `resource_type`!) | Same as pulled |
| Custom lineage | GraphQL mutations | | N/A — uses GraphQL API key | 7 days default; set `expireAt` for permanent |
Important: Query logs use `log_type` instead of `resource_type`. This is the only push endpoint where the field name differs. See references/push-query-logs.md for the full list of supported `log_type` values.
The pycarlo SDK is optional — you can also call the push APIs directly via HTTP/curl. See references/direct-http-api.md for examples.
Every push returns an `invocation_id` — save it. It is your primary debugging handle across all downstream systems.
Imported: Custom lineage nodes and edges
For non-warehouse assets (dbt models, Airflow DAGs, custom ETL pipelines) or cross-resource lineage, use the GraphQL mutations directly:
→ references/custom-lineage.md — createOrUpdateLineageNode, createOrUpdateLineageEdge, deleteLineageNode, and the critical `expireAt: "9999-12-31"` rule
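Only the mutation names and the `expireAt: "9999-12-31"` rule are confirmed by this skill; the argument names and selection set in the sketch below are assumptions — take the exact shape from references/custom-lineage.md. It also assumes the pycarlo client accepts raw GraphQL strings and is authenticated with the GraphQL API key.

```python
# Hypothetical argument and field names — verify against references/custom-lineage.md.
CREATE_NODE = """
mutation {
  createOrUpdateLineageNode(
    name: "orders_etl"            # assumed argument
    objectId: "orders_etl"        # assumed argument
    objectType: "custom-etl"      # assumed argument
    expireAt: "9999-12-31"        # confirmed rule: required for permanent nodes (default expiry is 7 days)
  ) {
    node { mcon }                 # assumed selection set
  }
}
"""

response = graphql_client(CREATE_NODE)  # assumes raw-string GraphQL calls are supported
print(response)
```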
Imported: Deleting push-ingested tables
Push tables are excluded from the normal pull-based deletion flow (intentionally). To delete them explicitly, use `deletePushIngestedTables` — covered in references/validation.md under "Table management operations".
Imported: Debugging checkpoints
When pushed data isn't appearing, work through these five checkpoints in order:
1. Did the SDK return a `202` and an `invocation_id`? If not, the gateway rejected the request — check auth headers and `resource.uuid`.
2. Is the integration key the right type? It must be scope `Ingestion`, created via `montecarlo integrations create-key --scope Ingestion`. A standard GraphQL API key will not work for push.
3. Is `resource.uuid` correct and authorized? The key can be scoped to specific warehouse UUIDs. If the UUID doesn't match, you get a `403`.
4. Did the normalizer process it? Use the `invocation_id` to search CloudWatch logs for the relevant Lambda. For query logs, check the `log_type` — Hive requires `"hive-s3"`, not `"hive"`.
5. Did the downstream system pick it up?
   - Metadata: query `getTable` in GraphQL
   - Table lineage: check Neo4j within seconds–minutes (fast path via PushLineageProcessor)
   - Query logs: wait at least 15-20 minutes; check `getAggregatedQueries`
Imported: Known gotchas
- `log_type` vs `resource_type`: metadata and lineage use `resource_type` (e.g. `"data-lake"`); query logs use `log_type` — the only endpoint where the field name differs. Wrong value → `Unsupported ingest query-log log_type` error.
- `invocation_id` must be saved: every output manifest should include it — it's your only tracing handle once the request leaves the SDK.
- Query log async delay: at least 15-20 minutes. `getAggregatedQueries` will return 0 until processing completes — this is expected, not a bug.
- Custom lineage `expireAt` defaults to 7 days: nodes vanish silently unless you set `expireAt: "9999-12-31"` for permanent nodes.
- Push tables are never auto-deleted: the periodic cleanup job excludes them by default (`exclude_push_tables=True`). Delete them explicitly via `deletePushIngestedTables` (max 1,000 MCONs per call; also deletes lineage nodes and all edges touching those nodes).
- Anomaly detectors need history: pushing once is not enough. Freshness needs 7+ pushes over ~2 weeks; volume needs 10–48 samples over ~42 days. Push at most once per hour.
- Batching required for large payloads: the compressed request body must not exceed 1MB. Split large event lists into batches.
- Column lineage expires after 10 days: unlike table metadata and table lineage (which never expire), column lineage has a 10-day TTL, same as pulled column lineage.
- Quote SQL identifiers in warehouse queries: database, schema, and table names must be quoted to handle mixed-case or special characters. The quoting syntax varies by warehouse — Snowflake and Redshift use double quotes (`"{db}"`), BigQuery/Databricks/Hive use backticks (`` `db` ``). The templates already handle this correctly for each warehouse — follow the same quoting pattern when adapting (see the sketch below).
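A minimal quoting sketch under the conventions in the bullet above; the function name and warehouse labels are illustrative, and identifiers containing the quote character itself need extra care.

```python
def quote_identifier(name: str, warehouse: str) -> str:
    """Quote a database/schema/table identifier for the given warehouse family."""
    if warehouse in ("snowflake", "redshift"):
        return '"{}"'.format(name.replace('"', '""'))  # double quotes, doubled if embedded
    if warehouse in ("bigquery", "databricks", "hive"):
        return f"`{name}`"  # backticks; backticks inside names are not handled here
    raise ValueError(f"unknown warehouse: {warehouse}")


# Example: a fully qualified, safely quoted table reference for Snowflake.
fq_table = ".".join(quote_identifier(p, "snowflake") for p in ("Analytics", "Public", "My Table"))
```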
Imported: Memory safety
Generated scripts must include a startup memory check. The collection phase loads query history rows into memory for parsing — on large warehouses with long lookback windows, this can exhaust available RAM and cause the process to be silently killed (SIGKILL / exit 137) with no traceback.
Add this pattern near the top of every generated script, after imports:
```python
import os


def _check_available_memory(min_gb: float = 2.0) -> None:
    """Warn if available memory is below the threshold."""
    try:
        if hasattr(os, "sysconf"):  # Linux / macOS
            page_size = os.sysconf("SC_PAGE_SIZE")
            avail_pages = os.sysconf("SC_AVPHYS_PAGES")
            avail_gb = (page_size * avail_pages) / (1024 ** 3)
        else:
            return  # Windows — skip check
    except (ValueError, OSError):
        return
    if avail_gb < min_gb:
        print(
            f"WARNING: Only {avail_gb:.1f} GB of memory available "
            f"(minimum recommended: {min_gb:.1f} GB). "
            f"Consider reducing the lookback window or increasing available memory."
        )
```
Call `_check_available_memory()` before connecting to the warehouse.
Additionally, when fetching query history:
- Use `cursor.fetchmany(batch_size)` in a loop instead of `cursor.fetchall()` when possible (see the sketch below)
- For very large result sets, consider adding a LIMIT clause and processing in windows
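A minimal DB-API sketch of the fetchmany pattern referenced above; the batch size and row handling are placeholders to adapt to the warehouse driver in use.

```python
BATCH_SIZE = 10_000  # placeholder; tune to available memory and row width


def iter_query_history(cursor, sql: str):
    """Yield query-history rows in bounded batches instead of materializing them all at once."""
    cursor.execute(sql)
    while True:
        rows = cursor.fetchmany(BATCH_SIZE)
        if not rows:
            break
        for row in rows:
            yield row  # parse/transform one row at a time to keep memory flat
```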
Imported: Limitations
- Use this skill only when the task clearly matches the scope described above.
- Do not treat the output as a substitute for environment-specific validation, testing, or expert review.
- Stop and ask for clarification if required inputs, permissions, safety boundaries, or success criteria are missing.