Claude-skill-registry fal-optimization
Complete fal.ai optimization system. PROACTIVELY activate for: (1) Queue vs run performance, (2) Parallel request batching, (3) Streaming for real-time UI, (4) WebSocket for interactive apps, (5) Model cost comparison, (6) Image size optimization, (7) Inference step tuning, (8) Webhook vs polling, (9) Result caching by seed, (10) Serverless scaling config. Provides: Parallel patterns, cost strategies, caching examples, monitoring setup. Ensures optimal performance and cost-effective usage.
git clone https://github.com/majiayu000/claude-skill-registry
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/fal-optimization" ~/.claude/skills/majiayu000-claude-skill-registry-fal-optimization && rm -rf "$T"
skills/data/fal-optimization/SKILL.md
Quick Reference
| Optimization | Technique | Impact |
|---|---|---|
| Parallel requests | `Promise.all()` with batches | 5-10x throughput |
| Avoid polling | Use webhooks | Fewer API calls |
| Cache by seed | Store results | Avoid regeneration |
| Right-size images | Use needed resolution | Lower cost |
| Fewer steps | Reduce inference steps | Faster, cheaper |

| Model Tier | Development | Production |
|---|---|---|
| Image | FLUX Schnell | FLUX.2 Pro |
| Video | Runway Turbo | Kling 2.6 Pro |

| Serverless Config | Cost-Optimized | Latency-Optimized |
|---|---|---|
| `min_concurrency` | `0` | `1` |
| `keep_alive` | `120` | `600` |
| `machine_type` | Smallest viable | Higher tier |
When to Use This Skill
Use for performance and cost optimization:
- Reducing generation latency
- Lowering API costs
- Implementing parallel processing
- Choosing between polling and webhooks
- Configuring serverless scaling
Related skills:
- For API patterns: see `fal-api-reference`
- For model selection: see `fal-model-guide`
- For serverless config: see `fal-serverless-guide`
fal.ai Performance and Cost Optimization
Strategies for optimizing performance, reducing costs, and scaling fal.ai integrations.
Performance Optimization
Client-Side Optimizations
1. Use Queue-Based Execution
Prefer `subscribe()` over `run()` for generation tasks, and reserve `run()` for fast endpoints:

    import { fal } from "@fal-ai/client";

    // Recommended: queue-based with progress tracking
    const result = await fal.subscribe("fal-ai/flux/dev", {
      input: { prompt: "test" },
      logs: true,
      onQueueUpdate: (update) => {
        // Show progress to users
        if (update.status === "IN_PROGRESS") {
          console.log("Generating...");
        }
      }
    });

    // Only use run() for fast endpoints (< 30s)
    const quickResult = await fal.run("fal-ai/fast-sdxl", {
      input: { prompt: "quick test" }
    });
2. Parallel Requests
Process multiple requests concurrently:

    // TypeScript - Parallel execution
    async function generateBatch(prompts: string[]) {
      const results = await Promise.all(
        prompts.map(prompt =>
          fal.subscribe("fal-ai/flux/dev", { input: { prompt } })
        )
      );
      return results;
    }

    // With rate limiting
    async function generateBatchWithLimit(prompts: string[], limit = 5) {
      const results = [];

      for (let i = 0; i < prompts.length; i += limit) {
        const batch = prompts.slice(i, i + limit);
        const batchResults = await Promise.all(
          batch.map(prompt =>
            fal.subscribe("fal-ai/flux/dev", { input: { prompt } })
          )
        );
        results.push(...batchResults);

        // Small delay between batches
        if (i + limit < prompts.length) {
          await new Promise(r => setTimeout(r, 100));
        }
      }

      return results;
    }

    # Python - Async parallel
    import asyncio
    import fal_client

    async def generate_batch(prompts: list[str]) -> list[dict]:
        tasks = [
            fal_client.run_async("fal-ai/flux/dev", arguments={"prompt": p})
            for p in prompts
        ]
        return await asyncio.gather(*tasks)

    # With semaphore for rate limiting
    async def generate_batch_limited(prompts: list[str], limit: int = 5):
        semaphore = asyncio.Semaphore(limit)

        async def generate_one(prompt: str):
            async with semaphore:
                return await fal_client.run_async(
                    "fal-ai/flux/dev",
                    arguments={"prompt": prompt}
                )

        return await asyncio.gather(*[generate_one(p) for p in prompts])
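One limitation of the batch helpers above is that a single failed request makes `asyncio.gather` raise, and the results of the other requests are lost to the caller. The following is a minimal sketch of a bounded-concurrency helper that returns per-item failures alongside successes. `run_limited` and the injected `worker` coroutine are hypothetical names; in practice `worker` would wrap a call such as `fal_client.run_async`.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def run_limited(
    items: list[T],
    worker: Callable[[T], Awaitable[R]],
    limit: int = 5,
) -> list[R | BaseException]:
    """Run worker(item) for every item with at most `limit` calls in flight.

    Results are returned in input order. A failed item yields its exception
    object in that position instead of aborting the whole batch.
    """
    semaphore = asyncio.Semaphore(limit)

    async def guarded(item: T) -> R:
        # The semaphore caps how many workers run at the same time.
        async with semaphore:
            return await worker(item)

    return await asyncio.gather(
        *(guarded(item) for item in items),
        return_exceptions=True,
    )
```

Callers can then split the returned list with `isinstance(result, BaseException)` and retry only the failed prompts.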
3. Streaming for Real-Time Feedback
Use streaming for progressive output:

    // Show incremental progress
    const stream = await fal.stream("fal-ai/flux/dev", {
      input: { prompt: "A landscape" }
    });

    for await (const event of stream) {
      updateProgressUI(event);
    }

    const result = await stream.done();
4. WebSockets for Interactive Apps
For real-time applications with continuous input:

    const connection = fal.realtime.connect("fal-ai/lcm-sd15-i2i", {
      connectionKey: `user-${userId}`,
      throttleInterval: 128, // Throttle rapid inputs (milliseconds)
      onResult: (result) => {
        displayImage(result.images[0].url);
      }
    });

    // Send updates as user types/draws
    inputElement.addEventListener('input', (e) => {
      connection.send({
        prompt: e.target.value,
        image_url: currentImage
      });
    });
Server-Side Optimizations (Serverless)
1. Efficient Model Loading

    import fal

    class OptimizedApp(fal.App):
        machine_type = "GPU-A100"
        requirements = ["torch", "transformers", "accelerate"]
        volumes = {
            "/data": fal.Volume("model-cache")
        }

        def setup(self):
            import torch
            from transformers import AutoModelForCausalLM

            # Use fp16 for faster inference and less memory
            self.model = AutoModelForCausalLM.from_pretrained(
                "model-name",
                torch_dtype=torch.float16,
                device_map="auto",
                cache_dir="/data/models"  # Persistent cache
            )

            # Enable optimizations
            if hasattr(self.model, 'enable_attention_slicing'):
                self.model.enable_attention_slicing()
2. Reduce Cold Starts

    class WarmApp(fal.App):
        machine_type = "GPU-A100"
        keep_alive = 600     # 10 minutes warm
        min_concurrency = 1  # Always keep one ready

        # Use lightweight health check
        @fal.endpoint("/health")
        def health(self):
            return {"status": "ok"}
3. Memory Management

    class MemoryEfficientApp(fal.App):
        def setup(self):
            import torch

            # Use mixed precision (load_model is a placeholder for your own loader)
            self.model = load_model(torch_dtype=torch.float16)

            # Enable memory-efficient attention where the model exposes it
            # (this method comes from diffusers pipelines, not transformers)
            if hasattr(self.model, 'enable_xformers_memory_efficient_attention'):
                self.model.enable_xformers_memory_efficient_attention()

        def teardown(self):
            # Clean up GPU memory
            import torch
            if hasattr(self, 'model'):
                del self.model
            torch.cuda.empty_cache()

        @fal.endpoint("/generate")
        def generate(self, request):
            import torch
            with torch.inference_mode():  # Disable gradient tracking
                result = self.model(request.input)
            return result
Cost Optimization
1. Choose the Right Model
| Need | Cheaper Option | Premium Option |
|---|---|---|
| Quick iteration | FLUX Schnell ($) | FLUX.1 Dev ($$) |
| Production | FLUX.1 Dev ($$) | FLUX.2 Pro ($$$) |
| Video preview | Runway Turbo ($$) | Kling Pro ($$$) |

    // Development: Use fast/cheap models
    const preview = await fal.subscribe("fal-ai/flux/schnell", {
      input: { prompt: "test", num_inference_steps: 4 }
    });

    // Production: Use quality models
    const final = await fal.subscribe("fal-ai/flux-2-pro", {
      input: { prompt: "test" }
    });
2. Optimize Image Sizes
Generate at the size you need, not larger:

    // Don't generate larger than needed
    const result = await fal.subscribe("fal-ai/flux/dev", {
      input: {
        prompt: "test",
        // Use a preset size
        image_size: "square_hd" // 1024x1024
        // Or pass specific dimensions instead (only one image_size key is allowed):
        // image_size: { width: 800, height: 600 }
      }
    });
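To make the size trade-off concrete, the sketch below compares pixel counts for the two sizes in the example. It assumes, as a rough rule of thumb and not as a documented pricing formula, that generation time and cost grow with the number of output pixels. `megapixels` and `relative_pixels` are illustrative helpers.

```python
def megapixels(width: int, height: int) -> float:
    """Return the image area in millions of pixels."""
    return width * height / 1_000_000


def relative_pixels(size: tuple[int, int], baseline: tuple[int, int]) -> float:
    """Return the pixel count of `size` as a fraction of `baseline`."""
    return (size[0] * size[1]) / (baseline[0] * baseline[1])


print(megapixels(1024, 1024))                                # 1.048576
print(round(relative_pixels((800, 600), (1024, 1024)), 3))   # 0.458
```

Under that assumption, an 800x600 request produces less than half the pixels of `square_hd`, which is the kind of gap worth checking before defaulting to the largest preset.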
3. Reduce Inference Steps
Find the minimum steps for acceptable quality:

    // Quick previews: fewer steps
    const preview = await fal.subscribe("fal-ai/flux/dev", {
      input: {
        prompt: "test",
        num_inference_steps: 15 // Faster, slightly lower quality
      }
    });

    // Final render: more steps
    const final = await fal.subscribe("fal-ai/flux/dev", {
      input: {
        prompt: "test",
        num_inference_steps: 28 // Default, high quality
      }
    });
4. Use Webhooks for High Volume
Avoid polling overhead with webhooks:

    // Instead of polling
    const result = await fal.subscribe("fal-ai/flux/dev", {
      input: { prompt: "test" },
      pollInterval: 1000 // Polling = more API calls
    });

    // Use webhooks
    const { request_id } = await fal.queue.submit("fal-ai/flux/dev", {
      input: { prompt: "test" },
      webhookUrl: "https://your-server.com/webhook"
    });
    // No polling needed - result delivered to webhook
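The saving from webhooks can be estimated with simple arithmetic. This sketch assumes one status request per poll interval until a job finishes and one webhook delivery per job; real clients may retry or back off, so treat the numbers as an order-of-magnitude estimate. The function names are illustrative.

```python
import math


def status_requests_when_polling(
    jobs: int, avg_job_seconds: float, poll_interval_seconds: float
) -> int:
    """Estimate total status requests when every job is polled until done."""
    polls_per_job = math.ceil(avg_job_seconds / poll_interval_seconds)
    return jobs * polls_per_job


def webhook_deliveries(jobs: int) -> int:
    """With webhooks, each job results in a single inbound callback."""
    return jobs


# 1,000 jobs of about 20 s each, polled once per second (pollInterval: 1000)
print(status_requests_when_polling(1000, 20, 1.0))  # 20000
print(webhook_deliveries(1000))                     # 1000
```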
5. Cache Results
Use a fixed seed so that identical requests produce the same output and can be served from a cache:

    // Cache key based on prompt + seed
    async function generateCached(prompt, seed) {
      const cacheKey = `${prompt}-${seed}`;
      const cached = await cache.get(cacheKey);

      if (cached) {
        return cached;
      }

      const result = await fal.subscribe("fal-ai/flux/dev", {
        input: { prompt, seed }
      });

      await cache.set(cacheKey, result);
      return result;
    }
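A key built only from prompt and seed returns the wrong cached result once the model, image size, or step count changes. A minimal sketch of a key that covers the model ID and every input field, using only the Python standard library; `cache_key` is a hypothetical helper and the input values are placeholders.

```python
import hashlib
import json


def cache_key(model: str, inputs: dict) -> str:
    """Return a stable SHA-256 key for a (model, inputs) pair.

    Keys are sorted before hashing, so two dicts with the same content
    produce the same key regardless of insertion order.
    """
    canonical = json.dumps(
        {"model": model, "input": inputs},
        sort_keys=True,
        separators=(",", ":"),
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


key = cache_key("fal-ai/flux/dev", {"prompt": "test", "seed": 42})
```

Requests without an explicit seed should bypass the cache, since their output is not reproducible.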
6. Serverless Cost Optimization

    class CostOptimizedApp(fal.App):
        machine_type = "GPU-A10G"  # Cheaper than A100 if sufficient
        min_concurrency = 0        # Scale to zero when not used
        keep_alive = 120           # Shorter keep-alive

        # Use appropriate GPU for model size
        # T4: < 16GB VRAM models
        # A10G: 16-24GB VRAM models
        # A100: 24-80GB VRAM models
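The GPU sizing comments above can be turned into a small lookup. This is a sketch, with `pick_gpu` a hypothetical helper; the thresholds come from those comments, while the handling of the exact boundaries (16 GB and 24 GB) is an assumption. The figure passed in should include headroom for activations, not only the weights.

```python
def pick_gpu(required_vram_gb: float) -> str:
    """Return the smallest GPU class whose VRAM range covers the requirement."""
    if required_vram_gb <= 0:
        raise ValueError("required_vram_gb must be positive")
    if required_vram_gb < 16:
        return "T4"
    if required_vram_gb <= 24:
        return "A10G"
    if required_vram_gb <= 80:
        return "A100"
    raise ValueError("requirement exceeds a single 80GB GPU")


print(pick_gpu(12))  # T4
print(pick_gpu(20))  # A10G
print(pick_gpu(40))  # A100
```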
Scaling Strategies
1. Horizontal Scaling

    class ScalableApp(fal.App):
        machine_type = "GPU-A100"
        min_concurrency = 2   # Always have 2 instances
        max_concurrency = 20  # Scale up to 20

        # fal handles auto-scaling based on queue depth
2. Request Batching

    class BatchApp(fal.App):
        @fal.endpoint("/batch")
        def batch_generate(self, prompts: list[str]) -> list[dict]:
            # Process multiple prompts in one request
            # (assumes self.model was loaded in setup())
            results = []
            for prompt in prompts:
                result = self.model(prompt)
                results.append(result)
            return results
3. Priority Queues
Use different endpoints for different priorities:

    class PriorityApp(fal.App):
        machine_type = "GPU-A100"

        @fal.endpoint("/high-priority")
        def high_priority(self, request):
            # Separate endpoint for important requests
            return self.process(request)

        @fal.endpoint("/standard")
        def standard(self, request):
            # Standard processing
            return self.process(request)
Monitoring and Debugging
1. Add Logging

    import logging

    class MonitoredApp(fal.App):
        def setup(self):
            logging.basicConfig(level=logging.INFO)
            self.logger = logging.getLogger(__name__)
            self.logger.info("App starting up")

            # Load model
            self.logger.info("Model loaded successfully")

        @fal.endpoint("/generate")
        def generate(self, request):
            import time
            start = time.time()

            result = self.process(request)

            elapsed = time.time() - start
            self.logger.info(f"Request processed in {elapsed:.2f}s")
            return result
2. Track Metrics

    // Client-side timing
    const start = Date.now();

    const result = await fal.subscribe("fal-ai/flux/dev", {
      input: { prompt: "test" },
      onQueueUpdate: (update) => {
        if (update.status === "IN_QUEUE") {
          console.log(`Queue position: ${update.queue_position}`);
        }
      }
    });

    const elapsed = Date.now() - start;
    console.log(`Total time: ${elapsed}ms`);

    // Track in your analytics
    analytics.track("fal_generation", {
      model: "flux/dev",
      elapsed_ms: elapsed,
      queue_time_ms: result.timings?.queue,
      inference_time_ms: result.timings?.inference
    });
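Individual timings are noisy, so latency tracking is usually reported as percentiles over many requests. A minimal sketch using the nearest-rank method; `percentile` and `summarize_latencies` are illustrative names, and the sample values are made up.

```python
import math


def percentile(samples: list[float], pct: float) -> float:
    """Return the nearest-rank percentile (0 < pct <= 100) of the samples."""
    if not samples:
        raise ValueError("samples must not be empty")
    if not 0 < pct <= 100:
        raise ValueError("pct must be in (0, 100]")
    ordered = sorted(samples)
    rank = math.ceil(pct * len(ordered) / 100)  # 1-based rank
    return ordered[rank - 1]


def summarize_latencies(samples_ms: list[float]) -> dict:
    """Summarize request latencies as median, tail, and worst case."""
    return {
        "p50": percentile(samples_ms, 50),
        "p95": percentile(samples_ms, 95),
        "max": max(samples_ms),
    }


samples = [100, 200, 300, 400, 500, 600, 700, 800, 900, 1000]
print(summarize_latencies(samples))  # {'p50': 500, 'p95': 1000, 'max': 1000}
```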
3. Error Monitoring

    async function generateWithFallback() {
      try {
        return await fal.subscribe("fal-ai/flux/dev", {
          input: { prompt: "test" }
        });
      } catch (error) {
        // Log to error tracking service
        errorTracker.captureException(error, {
          tags: {
            model: "flux/dev",
            type: error.constructor.name
          },
          extra: {
            status: error.status,
            body: error.body
          }
        });

        // Handle gracefully
        return fallbackResult();
      }
    }
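Captured exceptions become more useful when they feed an error-rate signal. The sketch below keeps the outcome of the most recent requests in a fixed-size window and flags when the failure share exceeds a threshold. `ErrorRateWindow`, the window size, and the threshold are all illustrative choices, not values from fal.ai.

```python
from collections import deque


class ErrorRateWindow:
    """Track success/failure of the last `size` requests."""

    def __init__(self, size: int = 100, threshold: float = 0.1) -> None:
        self.outcomes: deque[bool] = deque(maxlen=size)  # True means success
        self.threshold = threshold

    def record(self, ok: bool) -> None:
        # Once the window is full, the oldest outcome is dropped automatically.
        self.outcomes.append(ok)

    @property
    def error_rate(self) -> float:
        if not self.outcomes:
            return 0.0
        return self.outcomes.count(False) / len(self.outcomes)

    def should_alert(self) -> bool:
        return self.error_rate > self.threshold
```

A caller would invoke `record(True)` after each successful generation and `record(False)` in the `catch` path, then check `should_alert()` before paging anyone.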
Checklist
Before Production
- Using queue-based execution (`subscribe`)
- Appropriate model selected for use case
- Image sizes optimized
- Error handling implemented
- Rate limiting in place
- Caching strategy defined
Serverless Deployment
- Correct machine type for model size
- Models loaded in `setup()`, not per-request
- Persistent volumes for large models
- Secrets properly configured
- Health check endpoint
- Logging enabled
Cost Management
- Scale-to-zero enabled (`min_concurrency = 0`)
- Appropriate `keep_alive` setting
- Using cheaper models for development
- Batch processing where possible
- Webhook callbacks instead of polling
Monitoring
- Latency tracking
- Error rate monitoring
- Cost tracking
- Queue depth alerts