install
source · Clone the upstream repo
git clone https://github.com/ComeOnOliver/skillshub
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/ComeOnOliver/skillshub "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/TerminalSkills/skills/ray" ~/.claude/skills/comeonoliver-skillshub-ray && rm -rf "$T"
manifest:
skills/TerminalSkills/skills/ray/SKILL.md · source content
Ray
Installation
# Install Ray with all components
pip install "ray[default]"

# Or specific components
pip install "ray[serve]"   # Model serving
pip install "ray[tune]"    # Hyperparameter tuning
pip install "ray[data]"    # Distributed data processing
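After installing, a quick way to verify the setup is to start a local Ray instance and inspect its resources. A minimal sketch (the reported totals depend on your machine):

# check_install.py — sanity-check a local Ray installation (illustrative)
import ray

ray.init()                      # Starts a single-node Ray instance locally
print(ray.cluster_resources())  # e.g. {'CPU': 8.0, 'memory': ..., 'object_store_memory': ...}
ray.shutdown()                  # Stop the local instance when done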
Ray Core — Distributed Functions
# ray_basics.py — Parallelize Python functions across CPUs/GPUs
import ray
import time

ray.init()  # Connects to or starts a local Ray cluster

@ray.remote
def process_item(item: int) -> int:
    time.sleep(1)  # Simulate work
    return item ** 2

# Run 10 tasks in parallel (takes ~1s instead of 10s)
futures = [process_item.remote(i) for i in range(10)]
results = ray.get(futures)
print(f"Results: {results}")

# GPU tasks
@ray.remote(num_gpus=1)
def train_on_gpu(data):
    import torch
    device = torch.device("cuda")
    tensor = torch.tensor(data, device=device)
    return tensor.sum().item()
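When task runtimes are uneven, you do not have to block on the whole list at once; ray.wait hands futures back as they complete. A minimal sketch reusing the process_item task above:

# Consume results as individual tasks finish instead of waiting for all of them
pending = [process_item.remote(i) for i in range(10)]
while pending:
    ready, pending = ray.wait(pending, num_returns=1)
    print(f"Finished: {ray.get(ready[0])}")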
Ray Actors — Stateful Workers
# ray_actors.py — Stateful distributed objects for maintaining state across calls
import ray

@ray.remote
class ModelServer:
    def __init__(self, model_name: str):
        from transformers import pipeline
        self.pipe = pipeline("sentiment-analysis", model=model_name)
        self.request_count = 0

    def predict(self, text: str) -> dict:
        self.request_count += 1
        return self.pipe(text)[0]

    def get_stats(self) -> dict:
        return {"requests": self.request_count}

# Create 3 actor replicas
servers = [
    ModelServer.remote("distilbert-base-uncased-finetuned-sst-2-english")
    for _ in range(3)
]

# Distribute requests across actors
texts = ["Great product!", "Terrible service", "It's okay"] * 10
futures = [servers[i % 3].predict.remote(text) for i, text in enumerate(texts)]
results = ray.get(futures)
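Actor method calls return futures just like tasks, so per-actor state such as the request counter can be read back the same way. A short usage sketch based on the servers created above:

# Read back each actor's request counter once the batch has been processed
stats = ray.get([server.get_stats.remote() for server in servers])
print(stats)  # With 30 texts spread over 3 actors, each should report ~10 requests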
Ray Serve — Model Serving
# serve_model.py — Deploy ML models as scalable HTTP endpoints
from ray import serve
from starlette.requests import Request

@serve.deployment(num_replicas=2, ray_actor_options={"num_gpus": 0.5})
class SentimentService:
    def __init__(self):
        from transformers import pipeline
        self.classifier = pipeline("sentiment-analysis")

    async def __call__(self, request: Request) -> dict:
        body = await request.json()
        text = body.get("text", "")
        result = self.classifier(text)[0]
        return {"label": result["label"], "score": result["score"]}

app = SentimentService.bind()
serve.start(http_options={"host": "0.0.0.0", "port": 8000})  # Configure the HTTP proxy
serve.run(app)
# Test the endpoint
curl -X POST http://localhost:8000 \
  -H "Content-Type: application/json" \
  -d '{"text": "Ray Serve is excellent!"}'
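Serve can also group concurrent requests into a single model call with the @serve.batch decorator, which typically improves GPU throughput. A hedged sketch of how the sentiment service above could be adapted (batch size and timeout values are illustrative):

# serve_batched.py — batch concurrent requests into one model call (illustrative sketch)
from typing import List

from ray import serve
from starlette.requests import Request

@serve.deployment(num_replicas=2)
class BatchedSentimentService:
    def __init__(self):
        from transformers import pipeline
        self.classifier = pipeline("sentiment-analysis")

    @serve.batch(max_batch_size=8, batch_wait_timeout_s=0.05)
    async def classify_batch(self, texts: List[str]) -> List[dict]:
        # Invoked once per batch; must return one result per input, in order
        return self.classifier(texts)

    async def __call__(self, request: Request) -> dict:
        body = await request.json()
        # Each caller passes a single text; Serve assembles the batch transparently
        return await self.classify_batch(body.get("text", ""))

app = BatchedSentimentService.bind()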
Ray Serve — Composition (Multi-Model Pipeline)
# serve_pipeline.py — Chain multiple models in a serving pipeline
from ray import serve
from starlette.requests import Request

@serve.deployment
class Preprocessor:
    def preprocess(self, text: str) -> str:
        return text.strip().lower()

@serve.deployment
class Classifier:
    def __init__(self):
        from transformers import pipeline
        self.pipe = pipeline("sentiment-analysis")

    def classify(self, text: str) -> dict:
        return self.pipe(text)[0]

@serve.deployment
class Pipeline:
    def __init__(self, preprocessor, classifier):
        self.preprocessor = preprocessor
        self.classifier = classifier

    async def __call__(self, request: Request) -> dict:
        body = await request.json()
        clean_text = await self.preprocessor.preprocess.remote(body["text"])
        result = await self.classifier.classify.remote(clean_text)
        return result

preprocessor = Preprocessor.bind()
classifier = Classifier.bind()
app = Pipeline.bind(preprocessor, classifier)
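As with the single-model deployment, the composed graph is started with serve.run. A brief usage sketch (the payload is illustrative):

# Run the composed pipeline and query it over HTTP (default port 8000)
serve.run(app)

# curl -X POST http://localhost:8000 \
#   -H "Content-Type: application/json" \
#   -d '{"text": "  Composed Pipelines Are Neat  "}'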
Ray Tune — Hyperparameter Optimization
# tune_experiment.py — Run hyperparameter search across a cluster
from ray import tune
from ray.tune.schedulers import ASHAScheduler

def train_model(config):
    import torch
    import torch.nn as nn
    model = nn.Sequential(
        nn.Linear(10, config["hidden_size"]),
        nn.ReLU(),
        nn.Linear(config["hidden_size"], 1),
    )
    optimizer = torch.optim.Adam(model.parameters(), lr=config["lr"])
    for epoch in range(20):
        x = torch.randn(64, 10)
        y = torch.randn(64, 1)
        loss = nn.MSELoss()(model(x), y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        tune.report({"loss": loss.item(), "epoch": epoch})

scheduler = ASHAScheduler(max_t=20, grace_period=5, reduction_factor=2)

tuner = tune.Tuner(
    tune.with_resources(train_model, {"cpu": 2, "gpu": 0}),
    param_space={
        "lr": tune.loguniform(1e-4, 1e-1),
        "hidden_size": tune.choice([32, 64, 128, 256]),
    },
    tune_config=tune.TuneConfig(
        num_samples=20,
        scheduler=scheduler,
        metric="loss",
        mode="min",
    ),
)
results = tuner.fit()

best = results.get_best_result()
print(f"Best config: {best.config}")
print(f"Best loss: {best.metrics['loss']:.4f}")
Ray Data — Distributed Processing
# ray_data.py — Process large datasets in parallel with Ray Data
import ray

# Read and process a large dataset
ds = ray.data.read_parquet("s3://my-bucket/data/")

# Map transformations in parallel
def preprocess(batch):
    batch["text_length"] = [len(t) for t in batch["text"]]
    return batch

processed = ds.map_batches(preprocess, batch_format="pandas")

# Filter
filtered = processed.filter(lambda row: row["text_length"] > 50)

# Write results
filtered.write_parquet("s3://my-bucket/processed/")
print(f"Processed {filtered.count()} records")
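For training, a Ray Dataset can also be consumed directly as an iterator of batches instead of being written back out. A minimal sketch (batch size and format are illustrative):

# Stream the processed dataset into a training loop, one batch at a time
for batch in filtered.iter_batches(batch_size=256, batch_format="pandas"):
    # Each batch is a pandas DataFrame with the columns produced above
    print(batch["text_length"].mean())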
Cluster Setup
# ray-cluster.yaml — Ray cluster configuration for Kubernetes
cluster_name: ml-cluster
max_workers: 4

provider:
  type: kubernetes
  namespace: ray

head_node_type:
  node_config:
    resources:
      cpu: "4"
      memory: "16Gi"

worker_node_types:
  - name: gpu-worker
    min_workers: 0
    max_workers: 4
    node_config:
      resources:
        cpu: "8"
        memory: "32Gi"
        nvidia.com/gpu: "1"
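Once a cluster is running (for example via the Ray cluster launcher or KubeRay), driver scripts attach to it instead of starting a local instance. A hedged sketch, assuming the default Ray Client port 10001 and using a placeholder head hostname:

# connect_cluster.py — attach a driver to an existing Ray cluster (illustrative)
import ray

# From a node or pod inside the cluster, attach to the running cluster:
# ray.init(address="auto")

# From outside the cluster via Ray Client (hostname is a placeholder):
ray.init(address="ray://ml-cluster-head:10001")
print(ray.cluster_resources())  # Should now report the aggregated cluster resources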
Key Concepts
- Ray Core: @ray.remote turns any function/class into a distributed task/actor
- Ray Serve: Production model serving with autoscaling, batching, and multi-model composition
- Ray Tune: Hyperparameter search with ASHA, Bayesian optimization, PBT, and more
- Ray Data: Distributed data loading and preprocessing for ML training pipelines
- Autoscaling: Automatically scales workers up/down based on demand
- Resource management: Specify CPU, GPU, and memory requirements per task (see the sketch below)
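The resource-management bullet maps to arguments on @ray.remote. A minimal sketch with illustrative values:

# resource_requests.py — declare CPU/GPU/memory needs per task or actor (illustrative values)
import ray

@ray.remote(num_cpus=2)  # Reserve 2 CPUs for each invocation of this task
def cpu_heavy(x):
    return x * 2

@ray.remote(num_gpus=0.25, memory=2 * 1024**3)  # Fractional GPU plus ~2 GiB of memory
class SmallGpuWorker:
    def ping(self):
        return "ok"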