Marketplace wap-ingestion

Ingest data from S3 into bauplan using the Write-Audit-Publish pattern for safe data loading. Use when loading new data from S3, performing safe data ingestion, or when the user mentions WAP, data ingestion, importing parquet/csv/jsonl files, or needs to safely load data with quality checks.

install
source · Clone the upstream repo
git clone https://github.com/aiskillstore/marketplace
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/aiskillstore/marketplace "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/bauplanlabs/wap-ingestion" ~/.claude/skills/aiskillstore-marketplace-wap-ingestion && rm -rf "$T"
manifest: skills/bauplanlabs/wap-ingestion/SKILL.md
source content

Write-Audit-Publish (WAP) Pattern

Implement WAP by writing a Python script using the

bauplan
SDK. Do NOT use CLI commands.

The three steps: Write (ingest to temp branch) → Audit (quality checks) → Publish (merge to main)

Branch safety: All operations happen on a temporary branch, NEVER on

main
. By default, branches are kept open for inspection after success or failure.

Atomic multi-table operations:

merge_branch
is atomic. You can create or modify multiple tables on a branch, and when you merge, either all changes apply to main or none do. This enables safe multi-table ingestion workflows.

Required User Input

Before writing the WAP script, you MUST ask the user for the following parameters:

  1. S3 path (required): The S3 URI pattern for the source data (e.g.,
    s3://bucket/path/*.parquet
    )
  2. Table name (required): The name for the target table
  3. On success behavior (optional):
    • inspect
      (default): Keep the branch open for user inspection before merging
    • merge
      : Automatically merge to main and delete the branch
  4. On failure behavior (optional):
    • keep
      (default): Leave the branch open for inspection/debugging
    • delete
      : Delete the failed branch

WAP Script Template

See wap_template.py for the complete template. Minimal usage:

from wap_template import wap_ingest

branch, success = wap_ingest(
    table_name="orders",
    s3_path="s3://my-bucket/data/*.parquet",
    namespace="bauplan",
    on_success="inspect",  # or "merge"
    on_failure="keep"      # or "delete"
)

Key SDK Methods

MethodDescription
bauplan.Client()
Initialize the bauplan client
client.info()
Get client info; access username via
.user.username
client.create_branch(name, from_ref="main")
Create a new branch from specified ref
client.has_branch(name)
Check if branch exists
client.delete_branch(name)
Delete a branch
client.create_table(table, search_uri, ...)
Create table with schema inferred from S3
client.import_data(table, search_uri, ...)
Import data from S3 into table
client.query(query, ref)
Run SQL query, returns PyArrow Table
client.merge_branch(source_ref, into_branch)
Merge branch into target
client.has_table(table, ref, namespace)
Check if table exists on branch

SDK Reference: For detailed method signatures, check https://docs.bauplanlabs.com/reference/bauplan

Workflow Checklist

Copy and track progress:

WAP Progress:
- [ ] Ask user for: S3 path, table name, on_success, on_failure
- [ ] Write script using wap_template.py
- [ ] Run script: python wap_script.py
- [ ] Verify output shows row count > 0
- [ ] If on_success="inspect": confirm branch ready for review
- [ ] If on_success="merge": confirm merge to main succeeded

Example Output

Successful run (on_success="inspect"):

$ python wap_script.py
Imported 15234 rows
WAP completed successfully. Branch 'alice.wap_orders_1704067200' ready for inspection.
To merge manually: client.merge_branch(source_ref='alice.wap_orders_1704067200', into_branch='main')

Successful run (on_success="merge"):

$ python wap_script.py
Imported 15234 rows
Successfully published orders to main
Cleaned up branch: alice.wap_orders_1704067200

Failed run (on_failure="keep"):

$ python wap_script.py
WAP failed: No data was imported
Branch 'alice.wap_orders_1704067200' preserved for inspection/debugging.

WAP on Existing Tables

To append data to an existing table, skip

create_table
and only call
import_data
:

# Table already exists on main - just import new data
client.import_data(
    table=table_name,
    search_uri=s3_path,
    namespace=namespace,
    branch=branch_name
)

This appends rows to the existing table schema. The audit and publish phases remain the same: the new rows are automatically sandboxed on the branch until merged.

CLI Merge After Inspection

When

on_success="inspect"
(default), the branch is left open for user review. If the user asks to merge after inspecting the data, use the CLI:

# 1. Checkout to main first (required before merging)
bauplan checkout main

# 2. Merge the WAP branch into main
bauplan branch merge <username>.wap_<table_name>_<timestamp>

# 3. Optionally delete the branch after successful merge
bauplan branch rm <username>.wap_<table_name>_<timestamp>

Note: You must be on

main
to run
bauplan branch merge
. The branch name is printed by the WAP script upon completion.