SciAgent-Skills dask-parallel-computing

Distributed and parallel computing for larger-than-RAM datasets. Five components: DataFrames (parallel pandas), Arrays (parallel NumPy), Bags (unstructured data), Futures (task-based parallelism), Schedulers (threads/processes/distributed). Scales from laptops to HPC clusters. For in-memory speed on a single machine, use polars; for out-of-core analytics without a cluster, use vaex.

install
source · Clone the upstream repo
git clone https://github.com/jaechang-hits/SciAgent-Skills
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/jaechang-hits/SciAgent-Skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/scientific-computing/dask-parallel-computing" ~/.claude/skills/jaechang-hits-sciagent-skills-dask-parallel-computing && rm -rf "$T"
manifest: skills/scientific-computing/dask-parallel-computing/SKILL.md
source content

Dask — Parallel & Distributed Computing

Overview

Dask is a Python library for parallel and distributed computing that scales familiar pandas/NumPy APIs to larger-than-memory datasets. It provides five main components (DataFrames, Arrays, Bags, Futures, Schedulers) and scales from single-machine multi-core to multi-node HPC clusters.

When to Use

  • Processing datasets that exceed available RAM (10 GB–100 TB)
  • Parallelizing pandas or NumPy operations across multiple cores
  • Processing multiple files efficiently (CSV, Parquet, JSON, HDF5, Zarr)
  • Building custom parallel workflows with task dependencies
  • Distributing workloads across HPC clusters (SLURM, Kubernetes)
  • Streaming/ETL pipelines for unstructured data (logs, JSON records)
  • For in-memory single-machine speed: use polars instead
  • For out-of-core single-machine analytics: use vaex instead

Prerequisites

pip install dask[complete]        # All components
pip install dask[dataframe]       # DataFrames only
pip install dask[distributed]     # Distributed scheduler + dashboard
pip install dask-jobqueue          # HPC cluster integration (SLURM, PBS)

Core API

1. DataFrames — Parallel Pandas

import dask.dataframe as dd

# Read multiple files as a single DataFrame
ddf = dd.read_csv('data/2024-*.csv')
ddf = dd.read_parquet('data/', columns=['id', 'value', 'category'])

# Operations are lazy until .compute()
filtered = ddf[ddf['value'] > 100]
result = filtered.groupby('category').agg({'value': ['mean', 'sum']}).compute()
print(result.shape)  # (n_categories, 2)

# Custom operations via map_partitions (preferred over apply)
def normalize_partition(df):
    df['norm_value'] = (df['value'] - df['value'].mean()) / df['value'].std()
    return df

ddf = ddf.map_partitions(normalize_partition)

# Joins
ddf_merged = ddf.merge(lookup_ddf, on='category', how='left')

# Write results
ddf.to_parquet('output/', engine='pyarrow')
# Repartitioning for optimal chunk sizes
ddf = ddf.repartition(npartitions=20)         # By count
ddf = ddf.repartition(partition_size='100MB')  # By size

# Index management for sorted operations
ddf = ddf.set_index('timestamp', sorted=True)

# Debugging
print(f"Partitions: {ddf.npartitions}")
print(f"Dtypes: {ddf.dtypes}")
sample = ddf.get_partition(0).compute()  # Inspect first partition

2. Arrays — Parallel NumPy

import dask.array as da
import numpy as np

# Create from various sources
x = da.random.random((100000, 1000), chunks=(10000, 1000))
x = da.from_array(np_array, chunks=(10000, 1000))
x = da.from_zarr('large_dataset.zarr')

# Standard operations (lazy)
y = (x - x.mean(axis=0)) / x.std(axis=0)  # Normalize
z = da.dot(x.T, x)                          # Matrix multiply
u, s, v = da.linalg.svd(x)                  # SVD

# Compute and persist
result = y.mean(axis=0).compute()
print(result.shape)  # (1000,)
# Custom operations with map_blocks
def custom_filter(block):
    from scipy.ndimage import gaussian_filter
    return gaussian_filter(block, sigma=2)

filtered = da.map_blocks(custom_filter, x, dtype=x.dtype)

# Rechunking for different access patterns
x_rechunked = x.rechunk({0: 5000, 1: 500})

# Save to disk
da.to_zarr(y, 'normalized.zarr')

3. Bags — Unstructured Data Processing

import dask.bag as db
import json

# Read unstructured data
bag = db.read_text('logs/*.json').map(json.loads)

# Functional operations
valid = bag.filter(lambda x: x['status'] == 'success')
ids = valid.pluck('user_id')
flat = bag.map(lambda x: x['tags']).flatten()

# Aggregation — use foldby instead of groupby (much faster)
totals = bag.foldby(
    key='category',
    binop=lambda total, x: total + x['amount'],
    initial=0,
    combine=lambda a, b: a + b,
    combine_initial=0
).compute()

# Convert to DataFrame for structured analysis
ddf = valid.to_dataframe(meta={'user_id': 'str', 'amount': 'float64', 'category': 'str'})

4. Futures — Task-Based Parallelism

from dask.distributed import Client

client = Client()  # Local cluster with all cores
print(client.dashboard_link)  # http://localhost:8787

# Submit individual tasks (executes immediately, not lazy)
def process(x, param):
    return x ** param

future = client.submit(process, 42, param=2)
print(future.result())  # 1764

# Map over many inputs
futures = client.map(process, range(100), param=2)
results = client.gather(futures)
print(len(results))  # 100
# Scatter large data to workers (avoids repeated transfers)
import numpy as np
big_data = np.random.random((10000, 1000))
data_future = client.scatter(big_data, broadcast=True)

# Submit tasks using scattered data (process_chunk is an example chunk processor)
def process_chunk(data, i):
    return data[i * 1000:(i + 1) * 1000].sum()

futures = [client.submit(process_chunk, data_future, i) for i in range(10)]
results = client.gather(futures)

# Progressive result processing
from dask.distributed import as_completed
for future in as_completed(futures):
    result = future.result()
    print(f"Completed: {result}")

# Coordination primitives
from dask.distributed import Lock, Queue, Event
lock = Lock('resource-lock')
with lock:
    # Thread-safe operation across workers
    pass

client.close()

5. Schedulers & Configuration

import dask

# Global scheduler setting
dask.config.set(scheduler='threads')       # Default: GIL-releasing numeric work
dask.config.set(scheduler='processes')     # Pure Python, GIL-bound work
dask.config.set(scheduler='synchronous')   # Debugging with pdb

# Context manager for temporary change
with dask.config.set(scheduler='synchronous'):
    result = computation.compute()  # Can use pdb here

# Per-compute override
result = ddf.mean().compute(scheduler='processes')

# Distributed scheduler with resource control
from dask.distributed import Client
client = Client(n_workers=4, threads_per_worker=2, memory_limit='4GB')
print(client.dashboard_link)
# HPC cluster integration
from dask_jobqueue import SLURMCluster
from dask.distributed import Client

cluster = SLURMCluster(
    cores=24, memory='100GB',
    walltime='02:00:00', queue='regular'
)
cluster.scale(jobs=10)  # Request 10 SLURM jobs
client = Client(cluster)

# Adaptive scaling
cluster.adapt(minimum=2, maximum=20)

result = computation.compute()
client.close()

Key Concepts

Component Selection Guide

| Data Type | Component | When to Use |
|---|---|---|
| Tabular (CSV, Parquet) | DataFrames | Standard pandas-like operations at scale |
| Numeric arrays (HDF5, Zarr) | Arrays | NumPy operations, linear algebra, image processing |
| Text, JSON, logs | Bags | ETL/cleaning → convert to DataFrame for analysis |
| Custom parallel tasks | Futures | Dynamic workflows, parameter sweeps, task dependencies |
| Any of above | Schedulers | Control execution backend (threads/processes/distributed) |

Control level: DataFrames/Arrays/Bags = high-level lazy API. Futures = low-level immediate execution.
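
To make the split concrete, here is a minimal sketch contrasting the two levels of control; the data path and 'value' column are hypothetical:

import dask.dataframe as dd
from dask.distributed import Client

# High-level collection: builds a task graph, nothing runs yet
ddf = dd.read_parquet('data/')       # hypothetical dataset
lazy_mean = ddf['value'].mean()      # still lazy
result = lazy_mean.compute()         # executes here

# Futures: execution starts as soon as the task is submitted
client = Client()
future = client.submit(sum, range(1_000_000))
print(future.result())               # 499999500000
client.close()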

Lazy Evaluation Model

All DataFrames, Arrays, and Bags build a task graph — nothing executes until .compute() or .persist().

  • .compute() — execute and return the result to local memory
  • .persist() — execute and keep the result on workers (for reuse across multiple computations)
  • dask.compute(a, b, c) — compute multiple results in a single pass (shares intermediates)
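
A minimal sketch of the difference between repeated .compute() calls and one batched dask.compute(), assuming a DataFrame with 'value' and 'category' columns (names and path are illustrative):

import dask
import dask.dataframe as dd

ddf = dd.read_parquet('data/')  # hypothetical dataset

# Two separate .compute() calls traverse the graph (and re-read files) twice
mean_a = ddf['value'].mean().compute()
counts_a = ddf.groupby('category').size().compute()

# One batched call computes both results in a single pass, sharing intermediates
mean_b, counts_b = dask.compute(
    ddf['value'].mean(),
    ddf.groupby('category').size(),
)

# persist() materializes an intermediate on the workers for repeated reuse
filtered = ddf[ddf['value'] > 0].persist()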

Chunk Size Strategy

Target: ~100 MB per chunk (sized so that roughly 10 chunks per core fit in worker memory).

| Chunk Size | Effect |
|---|---|
| Too large (>1 GB) | Memory overflow, poor parallelization |
| Optimal (~100 MB) | Good parallelism, manageable memory |
| Too small (<1 MB) | Excessive scheduling overhead |

Example: 8 cores, 32 GB RAM → target ~400 MB per chunk (32 GB / 8 cores / 10).
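
A back-of-the-envelope sketch of the same rule of thumb; the machine numbers are just the example values above:

# ~10 chunks per core should fit in worker memory at once
worker_memory_gb = 32
cores = 8
chunks_per_core = 10

target_chunk_mb = worker_memory_gb * 1024 / (cores * chunks_per_core)
print(f"Target chunk size: ~{target_chunk_mb:.0f} MB")  # ~410 MB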

Scheduler Selection Guide

| Scheduler | Overhead | Best For | GIL |
|---|---|---|---|
| threads (default) | ~10 µs/task | NumPy, pandas, scikit-learn | Affected |
| processes | ~10 ms/task | Pure Python, text processing | Not affected |
| synchronous | ~1 µs/task | Debugging with pdb | N/A |
| distributed | ~1 ms/task | Dashboard, clusters, advanced features | Configurable |

Common Workflows

Workflow 1: Multi-File ETL Pipeline

import dask.dataframe as dd
import dask

# Extract: Read all CSV files
ddf = dd.read_csv('raw_data/*.csv', dtype={'amount': 'float64'})

# Transform: Clean and process
ddf = ddf[ddf['status'] == 'valid']
ddf['amount'] = ddf['amount'].fillna(0)
ddf = ddf.dropna(subset=['category'])

# Aggregate
summary = ddf.groupby('category').agg({'amount': ['sum', 'mean', 'count']})

# Load: Save as Parquet (columnar, compressed)
summary.to_parquet('output/summary.parquet')
print(f"Processed {len(ddf)} rows across {ddf.npartitions} partitions")

Workflow 2: Large-Scale Array Processing

import dask.array as da

# Load large scientific dataset
x = da.from_zarr('experiment_data.zarr')  # e.g., (50000, 50000) float64
print(f"Shape: {x.shape}, Chunks: {x.chunks}")

# Normalize per-column
x_norm = (x - x.mean(axis=0)) / x.std(axis=0)

# Compute covariance matrix
cov = da.dot(x_norm.T, x_norm) / (x_norm.shape[0] - 1)

# SVD for dimensionality reduction (top-k)
u, s, v = da.linalg.svd_compressed(x_norm, k=50)

# Save results
da.to_zarr(u, 'pca_components.zarr')
print(f"Explained variance (top 5): {(s[:5]**2 / (s**2).sum()).compute()}")

Workflow 3: Unstructured Data to Analysis

This workflow is a simple combination of Bags (Section 3) → DataFrames (Section 1): read JSON logs with Bags, filter/transform, convert to DataFrame for groupby analysis. Each step maps directly to Core API examples above.
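
A sketch of that combination; the log schema (status, user_id, amount, category) is hypothetical:

import json
import dask.bag as db

# Bags: read and clean semi-structured JSON logs
bag = db.read_text('logs/*.json').map(json.loads)
valid = bag.filter(lambda rec: rec.get('status') == 'success')
records = valid.map(lambda rec: {'user_id': rec['user_id'],
                                 'amount': float(rec['amount']),
                                 'category': rec['category']})

# DataFrames: structured groupby analysis on the cleaned records
ddf = records.to_dataframe(meta={'user_id': 'str', 'amount': 'float64', 'category': 'str'})
summary = ddf.groupby('category')['amount'].agg(['sum', 'mean']).compute()
print(summary.head())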

Key Parameters

| Parameter | Module | Default | Description |
|---|---|---|---|
| npartitions | DataFrame | auto | Number of partitions (controls parallelism) |
| partition_size | DataFrame | — | Target size per partition (e.g., '100MB') |
| chunks | Array | required | Chunk dimensions (e.g., (10000, 1000)) |
| blocksize | Bag | '128 MiB' | File read block size |
| scheduler | All | 'threads' | Execution backend ('threads', 'processes', 'synchronous') |
| n_workers | Distributed | auto | Number of worker processes |
| threads_per_worker | Distributed | auto | Threads per worker |
| memory_limit | Distributed | auto | Per-worker memory limit (e.g., '4GB') |
| sorted | set_index | False | Whether data is pre-sorted (enables optimizations) |
| meta | map_partitions | — | Output DataFrame/Series structure template |

Best Practices

  1. Let Dask handle data loading — Never load data into pandas/NumPy first and then convert; use dd.read_csv() / da.from_zarr() directly.

  2. Batch compute calls — Use dask.compute(a, b, c) instead of calling .compute() in loops; this lets Dask share intermediates.

  3. Use map_partitions over apply — ddf.apply(func, axis=1) creates one task per row; ddf.map_partitions(func) creates one task per partition.

  4. Persist reused intermediates — Call .persist() on data accessed multiple times, then del it when done (see the sketch after this list).

  5. Use the dashboard — client.dashboard_link shows task progress, memory usage, and worker states; essential for diagnosing performance issues.

  6. Anti-pattern — Excessively large task graphs: if len(ddf.__dask_graph__()) returns millions, increase chunk sizes or use map_partitions / map_blocks to fuse operations.

  7. Anti-pattern — Wrong scheduler for workload: using threads for pure Python text processing (GIL-bound), or processes for NumPy operations (unnecessary serialization overhead).
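
A short sketch combining items 4 and 6 above — checking graph size and persisting a reused intermediate; the data path and 'value' column are illustrative:

import dask.dataframe as dd

ddf = dd.read_parquet('data/')  # hypothetical dataset with a 'value' column

# Item 6: a graph with millions of tasks usually means chunks are too small
print(f"Tasks in graph: {len(ddf.__dask_graph__())}")

# Item 4: persist an intermediate that feeds several downstream computations
cleaned = ddf.dropna().persist()
mean_value = cleaned['value'].mean().compute()
std_value = cleaned['value'].std().compute()
del cleaned  # release worker memory once no references remain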

Common Recipes

Recipe: Dask-ML Integration

from dask_ml.preprocessing import StandardScaler
from dask_ml.model_selection import train_test_split
import dask.array as da

X = da.random.random((100000, 50), chunks=(10000, 50))
y = da.random.randint(0, 2, size=100000, chunks=10000)

# Preprocessing
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Train/test split
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2)
print(f"Train: {X_train.shape}, Test: {X_test.shape}")

Recipe: Actors for Stateful Computation

from dask.distributed import Client

client = Client()

class RunningStats:
    def __init__(self):
        self.count = 0
        self.total = 0.0

    def add(self, value):
        self.count += 1
        self.total += value
        return self.total / self.count

# Create actor on a worker
stats = client.submit(RunningStats, actor=True).result()

# Call methods (~1ms roundtrip)
for v in [10, 20, 30]:
    mean = stats.add(v).result()
    print(f"Running mean: {mean}")

client.close()

Recipe: Kubernetes Cluster

from dask_kubernetes import KubeCluster
from dask.distributed import Client

cluster = KubeCluster()
cluster.adapt(minimum=2, maximum=50)
client = Client(cluster)

# Run computation on auto-scaling cluster
result = large_computation.compute()
client.close()

Troubleshooting

| Problem | Cause | Solution |
|---|---|---|
| MemoryError during .compute() | Result too large for local memory | Use .to_parquet() or .persist() instead of .compute() |
| Slow computation start | Task graph has millions of tasks | Increase chunk sizes; use map_partitions / map_blocks |
| Poor parallelization | GIL contention with threads scheduler | Switch to scheduler='processes' for Python-heavy code |
| TypeError in map_partitions | Missing or wrong meta parameter | Provide meta=pd.DataFrame({'col': pd.Series(dtype='float64')}) |
| Workers killed (OOM) | Chunks exceed worker memory | Decrease chunk size; increase memory_limit |
| KilledWorker exception | Worker process crashed | Check worker logs; reduce memory per task; increase memory_limit |
| Slow joins/merges | Data not pre-sorted on join key | Call ddf.set_index('key', sorted=True) before the join |
| NotImplementedError | Operation not supported by Dask | Use map_partitions with the pandas equivalent |
| Dashboard not accessible | Distributed client not started | Use client = Client() to enable the distributed scheduler |
| Data type mismatch across partitions | Inconsistent CSV files | Specify dtype explicitly in dd.read_csv() |

Bundled Resources

  • references/collections_guide.md — Detailed DataFrames, Arrays, and Bags guide with comprehensive code examples for reading, transforming, aggregating, and writing data. Covers map_partitions patterns, the meta parameter, chunking strategies, map_blocks, foldby, and collection conversion. Consolidated from the original dataframes.md, arrays.md, and bags.md; the original best-practices.md content was relocated to the Best Practices section and Key Concepts inline.
  • references/distributed_computing.md — Futures API, distributed coordination primitives (Locks, Queues, Events, Variables), Actors, scheduler configuration, HPC cluster setup (SLURM, Kubernetes), adaptive scaling, dashboard monitoring, and performance profiling. Consolidated from the original futures.md and schedulers.md.

Not migrated: the original skill had 6 reference files. best-practices.md content was consolidated into the Best Practices section and Key Concepts (chunk strategy, scheduler selection); the remaining content is organized into the 2 reference files above, covering the 5 main components.

Related Skills

  • polars-dataframes — In-memory single-machine DataFrame library; faster than Dask for data that fits in RAM
  • zarr-python — Chunked array storage format; primary Dask Array persistence backend
  • scikit-learn-machine-learning — ML library; Dask-ML provides distributed wrappers
