SciAgent-Skills dask-parallel-computing
Distributed and parallel computing for larger-than-RAM datasets. Five components: DataFrames (parallel pandas), Arrays (parallel NumPy), Bags (unstructured data), Futures (task-based parallelism), Schedulers (threads/processes/distributed). Scales from laptops to HPC clusters. For in-memory speed on single machine use polars; for out-of-core analytics without cluster use vaex.
```bash
git clone https://github.com/jaechang-hits/SciAgent-Skills

# Or install just this skill into ~/.claude/skills
T=$(mktemp -d) && git clone --depth=1 https://github.com/jaechang-hits/SciAgent-Skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/scientific-computing/dask-parallel-computing" ~/.claude/skills/jaechang-hits-sciagent-skills-dask-parallel-computing && rm -rf "$T"
```
skills/scientific-computing/dask-parallel-computing/SKILL.md

Dask — Parallel & Distributed Computing
Overview
Dask is a Python library for parallel and distributed computing that scales familiar pandas/NumPy APIs to larger-than-memory datasets. It provides five main components (DataFrames, Arrays, Bags, Futures, Schedulers) and scales from single-machine multi-core to multi-node HPC clusters.
When to Use
- Processing datasets that exceed available RAM (10 GB–100 TB)
- Parallelizing pandas or NumPy operations across multiple cores
- Processing multiple files efficiently (CSV, Parquet, JSON, HDF5, Zarr)
- Building custom parallel workflows with task dependencies
- Distributing workloads across HPC clusters (SLURM, Kubernetes)
- Streaming/ETL pipelines for unstructured data (logs, JSON records)
- For in-memory single-machine speed: use polars instead
- For out-of-core single-machine analytics: use vaex instead
Prerequisites
```bash
pip install dask[complete]     # All components
pip install dask[dataframe]    # DataFrames only
pip install dask[distributed]  # Distributed scheduler + dashboard
pip install dask-jobqueue      # HPC cluster integration (SLURM, PBS)
```
Core API
1. DataFrames — Parallel Pandas
```python
import dask.dataframe as dd

# Read multiple files as a single DataFrame
ddf = dd.read_csv('data/2024-*.csv')
ddf = dd.read_parquet('data/', columns=['id', 'value', 'category'])

# Operations are lazy until .compute()
filtered = ddf[ddf['value'] > 100]
result = filtered.groupby('category').agg({'value': ['mean', 'sum']}).compute()
print(result.shape)  # (n_categories, 2)

# Custom operations via map_partitions (preferred over apply)
def normalize_partition(df):
    df['norm_value'] = (df['value'] - df['value'].mean()) / df['value'].std()
    return df

ddf = ddf.map_partitions(normalize_partition)

# Joins
ddf_merged = ddf.merge(lookup_ddf, on='category', how='left')

# Write results
ddf.to_parquet('output/', engine='pyarrow')
```
```python
# Repartitioning for optimal chunk sizes
ddf = ddf.repartition(npartitions=20)          # By count
ddf = ddf.repartition(partition_size='100MB')  # By size

# Index management for sorted operations
ddf = ddf.set_index('timestamp', sorted=True)

# Debugging
print(f"Partitions: {ddf.npartitions}")
print(f"Dtypes: {ddf.dtypes}")
sample = ddf.get_partition(0).compute()  # Inspect first partition
```
2. Arrays — Parallel NumPy
```python
import dask.array as da
import numpy as np

# Create from various sources
x = da.random.random((100000, 1000), chunks=(10000, 1000))
x = da.from_array(np_array, chunks=(10000, 1000))
x = da.from_zarr('large_dataset.zarr')

# Standard operations (lazy)
y = (x - x.mean(axis=0)) / x.std(axis=0)  # Normalize
z = da.dot(x.T, x)                        # Matrix multiply
u, s, v = da.linalg.svd(x)                # SVD

# Compute and persist
result = y.mean(axis=0).compute()
print(result.shape)  # (1000,)
```
```python
# Custom operations with map_blocks
def custom_filter(block):
    from scipy.ndimage import gaussian_filter
    return gaussian_filter(block, sigma=2)

filtered = da.map_blocks(custom_filter, x, dtype=x.dtype)

# Rechunking for different access patterns
x_rechunked = x.rechunk({0: 5000, 1: 500})

# Save to disk
da.to_zarr(y, 'normalized.zarr')
```
3. Bags — Unstructured Data Processing
```python
import dask.bag as db
import json

# Read unstructured data
bag = db.read_text('logs/*.json').map(json.loads)

# Functional operations
valid = bag.filter(lambda x: x['status'] == 'success')
ids = valid.pluck('user_id')
flat = bag.map(lambda x: x['tags']).flatten()

# Aggregation — use foldby instead of groupby (much faster)
counts = bag.foldby(
    key='category',
    binop=lambda total, x: total + x['amount'],
    initial=0,
    combine=lambda a, b: a + b,
    combine_initial=0,
).compute()

# Convert to DataFrame for structured analysis
ddf = valid.to_dataframe(meta={'user_id': 'str', 'amount': 'float64', 'category': 'str'})
```
4. Futures — Task-Based Parallelism
```python
from dask.distributed import Client

client = Client()  # Local cluster with all cores
print(client.dashboard_link)  # http://localhost:8787

# Submit individual tasks (executes immediately, not lazy)
def process(x, param):
    return x ** param

future = client.submit(process, 42, param=2)
print(future.result())  # 1764

# Map over many inputs
futures = client.map(process, range(100), param=2)
results = client.gather(futures)
print(len(results))  # 100
```
```python
# Scatter large data to workers (avoids repeated transfers)
import numpy as np

big_data = np.random.random((10000, 1000))
data_future = client.scatter(big_data, broadcast=True)

# Submit tasks using scattered data
futures = [client.submit(process_chunk, data_future, i) for i in range(10)]
results = client.gather(futures)

# Progressive result processing
from dask.distributed import as_completed

for future in as_completed(futures):
    result = future.result()
    print(f"Completed: {result}")

# Coordination primitives
from dask.distributed import Lock, Queue, Event

lock = Lock('resource-lock')
with lock:
    # Thread-safe operation across workers
    pass

client.close()
```
5. Schedulers & Configuration
```python
import dask

# Global scheduler setting
dask.config.set(scheduler='threads')      # Default: GIL-releasing numeric work
dask.config.set(scheduler='processes')    # Pure Python, GIL-bound work
dask.config.set(scheduler='synchronous')  # Debugging with pdb

# Context manager for a temporary change
with dask.config.set(scheduler='synchronous'):
    result = computation.compute()  # Can use pdb here

# Per-compute override
result = ddf.mean().compute(scheduler='processes')

# Distributed scheduler with resource control
from dask.distributed import Client

client = Client(n_workers=4, threads_per_worker=2, memory_limit='4GB')
print(client.dashboard_link)
```
```python
# HPC cluster integration
from dask_jobqueue import SLURMCluster
from dask.distributed import Client

cluster = SLURMCluster(
    cores=24,
    memory='100GB',
    walltime='02:00:00',
    queue='regular',
)
cluster.scale(jobs=10)  # Request 10 SLURM jobs
client = Client(cluster)

# Adaptive scaling
cluster.adapt(minimum=2, maximum=20)

result = computation.compute()
client.close()
```
Key Concepts
Component Selection Guide
| Data Type | Component | When to Use |
|---|---|---|
| Tabular (CSV, Parquet) | DataFrames | Standard pandas-like operations at scale |
| Numeric arrays (HDF5, Zarr) | Arrays | NumPy operations, linear algebra, image processing |
| Text, JSON, logs | Bags | ETL/cleaning → convert to DataFrame for analysis |
| Custom parallel tasks | Futures | Dynamic workflows, parameter sweeps, task dependencies |
| Any of above | Schedulers | Control execution backend (threads/processes/distributed) |
Control level: DataFrames/Arrays/Bags = high-level lazy API. Futures = low-level immediate execution.
Lazy Evaluation Model
All DataFrames, Arrays, and Bags build a task graph — nothing executes until `.compute()` or `.persist()`.
- `.compute()` — execute and return the result to local memory.
- `.persist()` — execute and keep the result on workers (for reuse across multiple computations).
- `dask.compute(a, b, c)` — compute multiple results in a single pass (shares intermediates).
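A minimal sketch of the batch-compute idiom, using a small deterministic array so the result is checkable; the data and variable names are illustrative, not from the skill itself:

```python
import numpy as np
import dask
import dask.array as da

# Three lazy results built from one shared intermediate; dask.compute
# evaluates all of them in a single pass over the task graph
x = da.from_array(np.arange(12, dtype="float64"), chunks=4)
shifted = x - x.mean()  # shared intermediate, computed once

low, high, total = shifted.min(), shifted.max(), shifted.sum()
lo, hi, tot = dask.compute(low, high, total)
print(lo, hi, tot)  # -5.5 5.5 0.0
```

Calling `.compute()` on each of the three results separately would rebuild and re-execute the `shifted` subgraph three times.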
Chunk Size Strategy
Target: ~100 MB per chunk (or 10 chunks per core in worker memory).
| Chunk Size | Effect |
|---|---|
| Too large (>1 GB) | Memory overflow, poor parallelization |
| Optimal (~100 MB) | Good parallelism, manageable memory |
| Too small (<1 MB) | Excessive scheduling overhead |
Example: 8 cores, 32 GB RAM → target ~400 MB per chunk (32 GB / 8 cores / 10).
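The rule of thumb above can be expressed as a tiny helper (hypothetical, not part of the Dask API):

```python
def target_chunk_mb(total_memory_gb, n_cores, chunks_per_core=10):
    """Target chunk size in MB: worker memory / cores / chunks-per-core."""
    return total_memory_gb * 1024 / n_cores / chunks_per_core

print(target_chunk_mb(32, 8))  # 409.6 -> round to ~400 MB chunks
```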
Scheduler Selection Guide
| Scheduler | Overhead | Best For | GIL |
|---|---|---|---|
| `threads` (default) | ~10 µs/task | NumPy, pandas, scikit-learn | Affected |
| `processes` | ~10 ms/task | Pure Python, text processing | Not affected |
| `synchronous` | ~1 µs/task | Debugging with pdb | N/A |
| `distributed` | ~1 ms/task | Dashboard, clusters, advanced features | Configurable |
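As a quick sketch of switching schedulers without changing the computation (the toy bag here is illustrative):

```python
import dask
import dask.bag as db

# The same lazy computation can target different schedulers
bag = db.from_sequence(range(6), npartitions=3).map(lambda x: x * x)

with dask.config.set(scheduler="synchronous"):  # single-threaded, pdb-friendly
    print(bag.compute())  # [0, 1, 4, 9, 16, 25]

print(bag.compute(scheduler="threads"))  # same result, multi-threaded
```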
Common Workflows
Workflow 1: Multi-File ETL Pipeline
```python
import dask.dataframe as dd
import dask

# Extract: Read all CSV files
ddf = dd.read_csv('raw_data/*.csv', dtype={'amount': 'float64'})

# Transform: Clean and process
ddf = ddf[ddf['status'] == 'valid']
ddf['amount'] = ddf['amount'].fillna(0)
ddf = ddf.dropna(subset=['category'])

# Aggregate
summary = ddf.groupby('category').agg({'amount': ['sum', 'mean', 'count']})

# Load: Save as Parquet (columnar, compressed)
summary.to_parquet('output/summary.parquet')
print(f"Processed {len(ddf)} rows across {ddf.npartitions} partitions")
```
Workflow 2: Large-Scale Array Processing
```python
import dask.array as da

# Load large scientific dataset
x = da.from_zarr('experiment_data.zarr')  # e.g., (50000, 50000) float64
print(f"Shape: {x.shape}, Chunks: {x.chunks}")

# Normalize per-column
x_norm = (x - x.mean(axis=0)) / x.std(axis=0)

# Compute covariance matrix
cov = da.dot(x_norm.T, x_norm) / (x_norm.shape[0] - 1)

# SVD for dimensionality reduction (top-k)
u, s, v = da.linalg.svd_compressed(x_norm, k=50)

# Save results
da.to_zarr(u, 'pca_components.zarr')
print(f"Explained variance (top 5): {(s[:5]**2 / (s**2).sum()).compute()}")
```
Workflow 3: Unstructured Data to Analysis
This workflow is a simple combination of Bags (Section 3) → DataFrames (Section 1): read JSON logs with Bags, filter/transform, convert to DataFrame for groupby analysis. Each step maps directly to Core API examples above.
Key Parameters
| Parameter | Module | Default | Description |
|---|---|---|---|
| `npartitions` | DataFrame | auto | Number of partitions (controls parallelism) |
| `partition_size` | DataFrame | — | Target size per partition (e.g., '100MB') |
| `chunks` | Array | required | Chunk dimensions (e.g., `(10000, 1000)`) |
| `blocksize` | Bag | '128 MiB' | File read block size |
| `scheduler` | All | 'threads' | Execution backend ('threads', 'processes', 'synchronous') |
| `n_workers` | Distributed | auto | Number of worker processes |
| `threads_per_worker` | Distributed | auto | Threads per worker |
| `memory_limit` | Distributed | auto | Per-worker memory limit (e.g., '4GB') |
| `sorted` | `set_index` | False | Whether data is pre-sorted (enables optimizations) |
| `meta` | `map_partitions` / `to_dataframe` | — | Output DataFrame/Series structure template |
Best Practices
-
Let Dask handle data loading — Never load data into pandas/numpy first then convert. Use
/dd.read_csv()
directly.da.from_zarr() -
Batch compute calls — Use
instead of callingdask.compute(a, b, c)
in loops. Allows sharing intermediates..compute() -
Use
overmap_partitions
—apply
creates one task per row.ddf.apply(func, axis=1)
creates one task per partition.ddf.map_partitions(func) -
Persist reused intermediates — Call
on data accessed multiple times, then.persist()
when done.del -
Use the dashboard —
shows task progress, memory usage, worker states. Essential for diagnosing performance issues.client.dashboard_link -
Anti-pattern — Excessively large task graphs: If
returns millions, increase chunk sizes or uselen(ddf.__dask_graph__())
/map_partitions
to fuse operations.map_blocks -
Anti-pattern — Wrong scheduler for workload: Using threads for pure Python text processing (GIL-bound) or processes for NumPy operations (unnecessary serialization overhead).
Common Recipes
Recipe: Dask-ML Integration
```python
from dask_ml.preprocessing import StandardScaler
from dask_ml.model_selection import train_test_split
import dask.array as da

X = da.random.random((100000, 50), chunks=(10000, 50))
y = da.random.randint(0, 2, size=100000, chunks=10000)

# Preprocessing
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Train/test split
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2)
print(f"Train: {X_train.shape}, Test: {X_test.shape}")
```
Recipe: Actors for Stateful Computation
```python
from dask.distributed import Client

client = Client()

class RunningStats:
    def __init__(self):
        self.count = 0
        self.total = 0.0

    def add(self, value):
        self.count += 1
        self.total += value
        return self.total / self.count

# Create actor on a worker
stats = client.submit(RunningStats, actor=True).result()

# Call methods (~1 ms roundtrip)
for v in [10, 20, 30]:
    mean = stats.add(v).result()
    print(f"Running mean: {mean}")

client.close()
```
Recipe: Kubernetes Cluster
```python
from dask_kubernetes import KubeCluster
from dask.distributed import Client

cluster = KubeCluster()
cluster.adapt(minimum=2, maximum=50)
client = Client(cluster)

# Run computation on the auto-scaling cluster
result = large_computation.compute()
client.close()
```
Troubleshooting
| Problem | Cause | Solution |
|---|---|---|
| `MemoryError` during `.compute()` | Result too large for local memory | Use `.persist()` or write to disk (`to_parquet` / `to_zarr`) instead of `.compute()` |
| Slow computation start | Task graph has millions of tasks | Increase chunk sizes; use `map_partitions` / `map_blocks` |
| Poor parallelization | GIL contention with threads scheduler | Switch to `processes` for Python-heavy code |
| `ValueError` in `map_partitions` | Missing or wrong `meta` parameter | Provide `meta=` explicitly |
| Workers killed (OOM) | Chunks exceed worker memory | Decrease chunk size; increase `memory_limit` |
| `KilledWorker` exception | Worker process crashed | Check worker logs; reduce memory per task; increase worker memory |
| Slow joins/merges | Data not pre-sorted on join key | Call `set_index()` on the join key before the join |
| `NotImplementedError` | Operation not supported by Dask | Use `map_partitions` with the pandas equivalent |
| Dashboard not accessible | Distributed client not started | Use `Client()` to enable the distributed scheduler |
| `dtype` mismatch across partitions | Inconsistent CSV files | Specify `dtype=` explicitly in `read_csv` |
Bundled Resources
- `references/collections_guide.md` — Detailed DataFrames, Arrays, and Bags guide with comprehensive code examples for reading, transforming, aggregating, and writing data. Covers `map_partitions` patterns, the `meta` parameter, chunking strategies, `map_blocks`, `foldby`, and collection conversion. Consolidated from the original `dataframes.md`, `arrays.md`, and `bags.md`. Original `best-practices.md` content relocated to the Best Practices section and Key Concepts inline.
- `references/distributed_computing.md` — Futures API, distributed coordination primitives (Locks, Queues, Events, Variables), Actors, scheduler configuration, HPC cluster setup (SLURM, Kubernetes), adaptive scaling, dashboard monitoring, and performance profiling. Consolidated from the original `futures.md` and `schedulers.md`.

Not migrated: the original had 6 reference files. `best-practices.md` content was consolidated into the Best Practices section and Key Concepts (chunk strategy, scheduler selection). The remaining content is organized into 2 reference files covering the 5 main components.
Related Skills
- polars-dataframes — In-memory single-machine DataFrame library; faster than Dask for data that fits in RAM
- zarr-python — Chunked array storage format; primary Dask Array persistence backend
- scikit-learn-machine-learning — ML library; Dask-ML provides distributed wrappers
References
- Official Documentation: https://docs.dask.org/
- Dask Best Practices: https://docs.dask.org/en/stable/best-practices.html
- Dask-ML: https://ml.dask.org/
- Dask-Jobqueue (HPC): https://jobqueue.dask.org/