Skills › prefect
install
source · Clone the upstream repo
git clone https://github.com/TerminalSkills/skills
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/TerminalSkills/skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/prefect" ~/.claude/skills/terminalskills-skills-prefect && rm -rf "$T"
manifest:
skills/prefect/SKILL.md
safety · automated scan (medium risk)
This is a pattern-based risk scan, not a security review. Our crawler flagged:
- pip install
- references API keys
Always read a skill's source content before installing. Patterns alone don't mean the skill is malicious — but they warrant attention.
source content
Prefect
Prefect turns Python functions into observable, schedulable workflows with minimal boilerplate. Add @flow and @task decorators to get retries, logging, caching, and a monitoring UI.
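A minimal sketch of that pattern (illustrative names; installation and fuller examples follow below):

from prefect import flow, task

@task(retries=2)
def say_hello(name: str) -> str:
    return f"Hello, {name}!"

@flow
def hello_flow(name: str = "world"):
    print(say_hello(name))

if __name__ == "__main__":
    hello_flow()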
Installation
# Install Prefect
pip install prefect

# Start the local Prefect server (UI + API)
prefect server start
# UI at http://localhost:4200

# Or use Prefect Cloud (managed)
prefect cloud login
Basic Flow
# flows/hello.py: Simple flow with tasks
from datetime import timedelta

from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash

@task(retries=3, retry_delay_seconds=10)
def fetch_data(url: str) -> dict:
    import httpx

    logger = get_run_logger()
    logger.info(f"Fetching {url}")
    response = httpx.get(url)
    response.raise_for_status()
    return response.json()

# cache_expiration only takes effect when a cache_key_fn is set
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def transform(data: dict) -> list:
    return [
        {"id": item["id"], "value": item["amount"] * 100}
        for item in data["results"]
    ]

@task
def load(records: list) -> int:
    logger = get_run_logger()
    logger.info(f"Loading {len(records)} records")
    # Insert into database...
    return len(records)

@flow(name="etl-pipeline", log_prints=True)
def etl_pipeline(api_url: str = "https://api.example.com/data"):
    raw = fetch_data(api_url)
    cleaned = transform(raw)
    count = load(cleaned)
    print(f"Processed {count} records")
    return count

if __name__ == "__main__":
    etl_pipeline()
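For quick local tests, a flow is an ordinary Python callable, and a task's wrapped function can be invoked directly through its .fn attribute, bypassing the orchestration engine entirely; the sample payload below is made up:

# Run the flow as a normal function call
etl_pipeline(api_url="https://api.example.com/data")

# Call the task's underlying function without retries, caching, or logging
transform.fn({"results": [{"id": 1, "amount": 2}]})  # -> [{"id": 1, "value": 200}]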
Scheduling and Deployments
# flows/deploy.py: Create a deployment with a schedule
from prefect import flow

@flow
def daily_report(param1: str = "value1"):
    print("Generating daily report...")

if __name__ == "__main__":
    # Deploy via Python; .serve() registers the deployment and
    # keeps a lightweight process running to execute scheduled runs
    daily_report.serve(
        name="daily-report-deployment",
        cron="0 8 * * *",  # Every day at 8 AM
        tags=["reporting"],
        parameters={"param1": "value1"},
    )
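For deployments that pull code from a remote repository instead of serving from the current process, recent Prefect versions (2.14+) expose flow.from_source; the repository URL below is a placeholder:

# deploy_from_git.py: sketch of a source-based deployment (placeholder repo URL)
from prefect import flow

if __name__ == "__main__":
    flow.from_source(
        source="https://github.com/your-org/your-repo",  # placeholder
        entrypoint="flows/hello.py:etl_pipeline",
    ).deploy(
        name="etl-from-git",
        work_pool_name="default-agent-pool",
        cron="0 8 * * *",
    )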
# deploy.sh: Deploy and manage via CLI

# Create a deployment from a flow file
prefect deploy flows/hello.py:etl_pipeline \
  --name etl-prod \
  --pool default-agent-pool \
  --cron "*/30 * * * *"

# Start a worker to execute deployments
prefect worker start --pool default-agent-pool

# Trigger a deployment run
prefect deployment run "etl-pipeline/etl-prod" --param api_url=https://api.example.com
Error Handling and Concurrency
# flows/advanced.py: Concurrent tasks, error handling, and sub-flows
import asyncio

from prefect import flow, task
from prefect.tasks import task_input_hash

@task(
    retries=2,
    retry_delay_seconds=[10, 60],  # Custom backoff: 10s after the first failure, 60s after the second
    cache_key_fn=task_input_hash,
    timeout_seconds=300,
)
def process_item(item_id: int) -> dict:
    # Process a single item
    return {"id": item_id, "status": "done"}

@flow
def batch_process(item_ids: list[int]):
    # Submit tasks concurrently
    futures = [process_item.submit(item_id) for item_id in item_ids]
    results = [f.result() for f in futures]
    succeeded = [r for r in results if r["status"] == "done"]
    print(f"Processed {len(succeeded)}/{len(item_ids)} items")

@task
async def fetch_from_api(source: str) -> dict:
    # Placeholder for an I/O-bound fetch
    await asyncio.sleep(1)
    return {"source": source}

@flow
async def async_pipeline():
    # Async flow for I/O-bound work
    results = await asyncio.gather(
        fetch_from_api("source_a"),
        fetch_from_api("source_b"),
    )
    return results
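As an alternative to the manual submit loop, Prefect tasks expose a .map helper that fans out over an iterable; a short sketch reusing process_item from above:

@flow
def batch_process_mapped(item_ids: list[int]):
    # One task run per element, submitted concurrently
    futures = process_item.map(item_ids)
    return [f.result() for f in futures]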
Blocks and Infrastructure
# flows/blocks.py: Use blocks for reusable configuration
from prefect import flow
from prefect.blocks.system import Secret, JSON
from prefect_sqlalchemy import SqlAlchemyConnector

# Register block types, then configure instances via UI or CLI:
#   prefect block register -m prefect_sqlalchemy
# Then configure in the UI at http://localhost:4200/blocks

# Use in flows
@flow
def db_flow():
    api_key = Secret.load("my-api-key").get()
    config = JSON.load("pipeline-config").value
    with SqlAlchemyConnector.load("prod-db") as conn:
        result = conn.fetch_all("SELECT count(*) FROM users")
        print(result)
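Blocks can also be created programmatically rather than through the UI; a sketch, with placeholder values:

# save_blocks.py: create block documents from code (values are placeholders)
from prefect.blocks.system import Secret, JSON

Secret(value="sk-placeholder").save("my-api-key", overwrite=True)
JSON(value={"batch_size": 500}).save("pipeline-config", overwrite=True)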
Notifications
# flows/notifications.py: Send alerts on failure
from prefect import flow
from prefect.blocks.notifications import SlackWebhook

@flow
def monitored_flow():
    try:
        # ... do work
        pass
    except Exception as e:
        slack = SlackWebhook.load("alerts-channel")
        slack.notify(f"❌ Pipeline failed: {e}")
        raise

# Or use automations in the Prefect UI:
# Automations → Create → Trigger: Flow run failed → Action: Send Slack notification
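The alerts-channel block referenced above has to exist before the flow runs; one way is to save it from code (the webhook URL is a placeholder):

# save_webhook.py: create the Slack webhook block (placeholder URL)
from prefect.blocks.notifications import SlackWebhook

SlackWebhook(
    url="https://hooks.slack.com/services/XXX/YYY/ZZZ",  # placeholder
).save("alerts-channel", overwrite=True)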
CLI Reference
# cli.sh: Common Prefect CLI commands

# Check connection
prefect version
prefect config view

# List flow runs and deployments
prefect flow-run ls
prefect deployment ls

# View logs
prefect flow-run logs <flow-run-id>

# Manage work pools
prefect work-pool create my-pool --type process
prefect work-pool ls