# Source: https://github.com/vibeforge1111/vibeship-spawner-skills
# simulation/monte-carlo/skill.yaml
---
id: monte-carlo
name: Monte Carlo Simulation
category: simulation
description: |
  Design and implement Monte Carlo methods for uncertainty quantification,
  risk analysis, and probabilistic simulations across scientific and
  financial domains.
version: "1.0.0"
triggers:
- "monte carlo"
- "random sampling"
- "uncertainty quantification"
- "risk analysis"
- "stochastic simulation"
- "MCMC"
- "variance reduction"
- "probabilistic"
provides:
- Monte Carlo simulation design
- Variance reduction techniques
- Convergence analysis
- MCMC sampling methods
- Sensitivity analysis
- Risk quantification
patterns:
  basic_monte_carlo:
    description: "Fundamental Monte Carlo estimation"
    example: |
      import numpy as np
      from typing import Callable, Tuple, List
      from dataclasses import dataclass

      @dataclass
      class MCResult:
          """Result of Monte Carlo simulation."""
          mean: float
          std_error: float
          confidence_interval: Tuple[float, float]
          n_samples: int
          converged: bool

      class MonteCarloEstimator:
          """
          Basic Monte Carlo estimator with convergence checking.
          """

          def __init__(
              self,
              target_std_error: float = 0.01,
              confidence_level: float = 0.95,
              max_samples: int = 1_000_000,
              batch_size: int = 10_000
          ):
              self.target_std_error = target_std_error
              self.confidence_level = confidence_level
              self.max_samples = max_samples
              self.batch_size = batch_size

              # z-score for confidence interval
              from scipy.stats import norm
              self.z = norm.ppf((1 + confidence_level) / 2)

          def estimate(
              self,
              sampler: Callable[[], np.ndarray],
              estimand: Callable[[np.ndarray], float]
          ) -> MCResult:
              """
              Estimate expected value via Monte Carlo.

              Args:
                  sampler: Function that returns random samples
                  estimand: Function to compute from samples

              Returns:
                  MCResult with mean, uncertainty, and convergence info
              """
              running_sum = 0.0
              running_sum_sq = 0.0
              n_total = 0

              while n_total < self.max_samples:
                  # Generate batch of samples
                  samples = sampler()
                  values = np.array([estimand(s) for s in samples])

                  # Update running statistics
                  batch_n = len(values)
                  running_sum += np.sum(values)
                  running_sum_sq += np.sum(values ** 2)
                  n_total += batch_n

                  # Compute current estimate
                  mean = running_sum / n_total
                  variance = (running_sum_sq / n_total) - mean ** 2
                  std_error = np.sqrt(variance / n_total)

                  # Check convergence
                  if std_error < self.target_std_error:
                      return MCResult(
                          mean=mean,
                          std_error=std_error,
                          confidence_interval=(
                              mean - self.z * std_error,
                              mean + self.z * std_error
                          ),
                          n_samples=n_total,
                          converged=True
                      )

              # Max samples reached without convergence
              mean = running_sum / n_total
              variance = (running_sum_sq / n_total) - mean ** 2
              std_error = np.sqrt(variance / n_total)

              return MCResult(
                  mean=mean,
                  std_error=std_error,
                  confidence_interval=(
                      mean - self.z * std_error,
                      mean + self.z * std_error
                  ),
                  n_samples=n_total,
                  converged=False
              )

      # Example: Estimate pi
      def sample_unit_square():
          return np.random.uniform(-1, 1, size=(1000, 2))

      def inside_circle(point):
          # Per-sample estimand: estimate() applies this to each row of the
          # batch, so it receives a single (x, y) point. 4 * indicator of the
          # unit disk has expectation pi over the square [-1, 1]^2.
          return 4.0 if np.sum(point ** 2) <= 1 else 0.0

      mc = MonteCarloEstimator(target_std_error=0.001)
      result = mc.estimate(sample_unit_square, inside_circle)
      print(f"Pi estimate: {result.mean:.6f} +/- {result.std_error:.6f}")

  variance_reduction:
    description: "Techniques to reduce Monte Carlo variance"
    example: |
      import numpy as np
      from typing import Callable, Optional

      class VarianceReduction:
          """Collection of variance reduction techniques."""

          @staticmethod
          def antithetic_variates(
              sampler: Callable[[int], np.ndarray],
              estimand: Callable[[np.ndarray], float],
              n_samples: int
          ) -> tuple:
              """
              Antithetic variates: use negative correlation.

              For uniform U, also use 1-U.
              Reduces variance when estimand is monotonic.
              """
              # Generate primary samples
              u = sampler(n_samples // 2)

              # Generate antithetic samples (1 - u for uniform)
              u_anti = 1 - u

              # Evaluate on both
              y = np.array([estimand(s) for s in u])
              y_anti = np.array([estimand(s) for s in u_anti])

              # Average pairs
              y_combined = (y + y_anti) / 2

              mean = np.mean(y_combined)
              std_error = np.std(y_combined) / np.sqrt(len(y_combined))

              return mean, std_error

          @staticmethod
          def control_variates(
              samples: np.ndarray,
              y: np.ndarray,
              control: np.ndarray,
              control_mean: float
          ) -> tuple:
              """
              Control variates: use known-mean correlated variable.

              y_cv = y - c * (control - E[control])
              c chosen to minimize variance.
              """
              # Optimal coefficient
              cov_y_control = np.cov(y, control)[0, 1]
              var_control = np.var(control)
              c_optimal = cov_y_control / var_control

              # Adjusted estimator
              y_cv = y - c_optimal * (control - control_mean)

              mean = np.mean(y_cv)
              std_error = np.std(y_cv) / np.sqrt(len(y_cv))

              # Variance reduction ratio
              var_ratio = np.var(y_cv) / np.var(y)

              return mean, std_error, var_ratio

          @staticmethod
          def importance_sampling(
              target_density: Callable[[np.ndarray], float],
              proposal_sampler: Callable[[int], np.ndarray],
              proposal_density: Callable[[np.ndarray], float],
              estimand: Callable[[np.ndarray], float],
              n_samples: int
          ) -> tuple:
              """
              Importance sampling: sample from easier distribution.

              E_p[f(x)] = E_q[f(x) * p(x) / q(x)]
              Use when target is hard to sample but easy to evaluate.
              """
              # Sample from proposal
              samples = proposal_sampler(n_samples)

              # Compute importance weights
              weights = np.array([
                  target_density(s) / proposal_density(s)
                  for s in samples
              ])

              # Weighted estimator
              values = np.array([estimand(s) for s in samples])
              weighted_values = values * weights

              # Self-normalized importance sampling (more stable)
              mean = np.sum(weighted_values) / np.sum(weights)

              # Effective sample size (diagnostic)
              ess = np.sum(weights) ** 2 / np.sum(weights ** 2)

              return mean, ess

          @staticmethod
          def stratified_sampling(
              sampler: Callable[[float, float, int], np.ndarray],
              estimand: Callable[[np.ndarray], float],
              n_strata: int,
              samples_per_stratum: int
          ) -> tuple:
              """
              Stratified sampling: divide domain into strata.

              Reduces variance by ensuring coverage of entire domain.
              """
              stratum_means = []

              for i in range(n_strata):
                  # Stratum boundaries
                  low = i / n_strata
                  high = (i + 1) / n_strata

                  # Sample within stratum
                  samples = sampler(low, high, samples_per_stratum)
                  values = np.array([estimand(s) for s in samples])
                  stratum_means.append(np.mean(values))

              # Overall mean (equal stratum weights)
              mean = np.mean(stratum_means)
              std_error = np.std(stratum_means) / np.sqrt(n_strata)

              return mean, std_error

  mcmc_sampling:
    description: "Markov Chain Monte Carlo for complex distributions"
    example: |
      import numpy as np
      from typing import Callable, Optional, List
      from dataclasses import dataclass

      @dataclass
      class MCMCResult:
          """Result of MCMC sampling."""
          samples: np.ndarray
          acceptance_rate: float
          effective_sample_size: float
          r_hat: Optional[float] = None  # Gelman-Rubin diagnostic

      class MetropolisHastings:
          """
          Metropolis-Hastings MCMC sampler.

          For sampling from distributions known up to normalizing constant.
          """

          def __init__(
              self,
              log_target: Callable[[np.ndarray], float],
              proposal_std: float = 1.0,
              dim: int = 1
          ):
              self.log_target = log_target
              self.proposal_std = proposal_std
              self.dim = dim

          def sample(
              self,
              n_samples: int,
              initial: Optional[np.ndarray] = None,
              burn_in: int = 1000,
              thin: int = 1
          ) -> MCMCResult:
              """
              Generate samples using Metropolis-Hastings.

              Args:
                  n_samples: Number of samples to return
                  initial: Starting point (default: zeros)
                  burn_in: Samples to discard for equilibration
                  thin: Keep every thin-th sample
              """
              if initial is None:
                  initial = np.zeros(self.dim)

              current = initial.copy()
              current_log_p = self.log_target(current)

              samples = []
              n_accepted = 0
              n_total = burn_in + n_samples * thin

              for i in range(n_total):
                  # Propose new state
                  proposal = current + np.random.normal(
                      0, self.proposal_std, size=self.dim
                  )
                  proposal_log_p = self.log_target(proposal)

                  # Accept/reject
                  log_alpha = proposal_log_p - current_log_p
                  if np.log(np.random.uniform()) < log_alpha:
                      current = proposal
                      current_log_p = proposal_log_p
                      n_accepted += 1

                  # Store after burn-in, with thinning
                  if i >= burn_in and (i - burn_in) % thin == 0:
                      samples.append(current.copy())

              samples = np.array(samples)

              return MCMCResult(
                  samples=samples,
                  acceptance_rate=n_accepted / n_total,
                  effective_sample_size=self._compute_ess(samples)
              )

          def _compute_ess(self, samples: np.ndarray) -> float:
              """Compute effective sample size from autocorrelation."""
              n = len(samples)
              if samples.ndim > 1:
                  samples = samples[:, 0]  # Use first dimension

              # Compute autocorrelation
              mean = np.mean(samples)
              var = np.var(samples)
              if var == 0:
                  return n

              autocorr = np.correlate(samples - mean, samples - mean, mode='full')
              autocorr = autocorr[n-1:] / (var * n)

              # Sum autocorrelations until negative
              tau = 1.0
              for k in range(1, n):
                  if autocorr[k] < 0:
                      break
                  tau += 2 * autocorr[k]

              return n / tau

      class HamiltonianMonteCarlo:
          """
          Hamiltonian Monte Carlo (HMC) sampler.

          Uses gradient information for efficient exploration.
          """

          def __init__(
              self,
              log_target: Callable[[np.ndarray], float],
              grad_log_target: Callable[[np.ndarray], np.ndarray],
              step_size: float = 0.1,
              n_leapfrog: int = 10,
              dim: int = 1
          ):
              self.log_target = log_target
              self.grad_log_target = grad_log_target
              self.step_size = step_size
              self.n_leapfrog = n_leapfrog
              self.dim = dim

          def sample(
              self,
              n_samples: int,
              initial: Optional[np.ndarray] = None,
              burn_in: int = 1000
          ) -> MCMCResult:
              """Generate samples using HMC."""
              if initial is None:
                  initial = np.zeros(self.dim)

              current_q = initial.copy()
              samples = []
              n_accepted = 0

              for i in range(burn_in + n_samples):
                  # Sample momentum
                  current_p = np.random.normal(size=self.dim)

                  # Leapfrog integration
                  q = current_q.copy()
                  p = current_p.copy()

                  # Half step for momentum
                  p += 0.5 * self.step_size * self.grad_log_target(q)

                  # Full steps
                  for _ in range(self.n_leapfrog - 1):
                      q += self.step_size * p
                      p += self.step_size * self.grad_log_target(q)

                  # Final half step
                  q += self.step_size * p
                  p += 0.5 * self.step_size * self.grad_log_target(q)

                  # Negate momentum for reversibility
                  p = -p

                  # Compute Hamiltonian
                  current_H = -self.log_target(current_q) + 0.5 * np.sum(current_p ** 2)
                  proposed_H = -self.log_target(q) + 0.5 * np.sum(p ** 2)

                  # Accept/reject
                  if np.log(np.random.uniform()) < current_H - proposed_H:
                      current_q = q
                      n_accepted += 1

                  if i >= burn_in:
                      samples.append(current_q.copy())

              return MCMCResult(
                  samples=np.array(samples),
                  acceptance_rate=n_accepted / (burn_in + n_samples),
                  effective_sample_size=len(samples)  # HMC typically has high ESS
              )

  quasi_monte_carlo:
    description: "Low-discrepancy sequences for faster convergence"
    example: |
      import numpy as np
      from typing import Generator

      class QuasiMonteCarlo:
          """
          Quasi-Monte Carlo using low-discrepancy sequences.

          Converges as O(1/N) vs O(1/sqrt(N)) for standard MC.
          """

          @staticmethod
          def halton_sequence(dim: int, n: int, skip: int = 0) -> np.ndarray:
              """
              Generate Halton sequence.

              Low-discrepancy sequence using prime bases.
              """
              def halton_single(index: int, base: int) -> float:
                  result = 0.0
                  f = 1.0 / base
                  i = index
                  while i > 0:
                      result += f * (i % base)
                      i //= base
                      f /= base
                  return result

              # First dim primes
              primes = [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37][:dim]

              points = np.zeros((n, dim))
              for i in range(n):
                  for d in range(dim):
                      points[i, d] = halton_single(i + skip + 1, primes[d])

              return points

          @staticmethod
          def sobol_sequence(dim: int, n: int) -> np.ndarray:
              """
              Generate Sobol sequence.

              Better uniformity than Halton for high dimensions.
              """
              from scipy.stats import qmc
              sampler = qmc.Sobol(d=dim, scramble=True)
              return sampler.random(n)

          @staticmethod
          def latin_hypercube(dim: int, n: int) -> np.ndarray:
              """
              Latin Hypercube Sampling.

              Ensures each dimension has uniform marginal coverage.
              """
              from scipy.stats import qmc
              sampler = qmc.LatinHypercube(d=dim)
              return sampler.random(n)

      # Comparison: QMC vs MC for integration
      def integrate_comparison(f, dim, n_points):
          # Standard Monte Carlo
          mc_samples = np.random.uniform(size=(n_points, dim))
          mc_estimate = np.mean([f(s) for s in mc_samples])

          # Quasi Monte Carlo (Sobol)
          qmc_samples = QuasiMonteCarlo.sobol_sequence(dim, n_points)
          qmc_estimate = np.mean([f(s) for s in qmc_samples])

          return mc_estimate, qmc_estimate

  sensitivity_analysis:
    description: "Global sensitivity analysis with Monte Carlo"
    example: |
      import numpy as np
      from typing import Callable, Dict, List
      from dataclasses import dataclass

      @dataclass
      class SobolIndices:
          """Sobol sensitivity indices."""
          first_order: Dict[str, float]     # S_i
          total_order: Dict[str, float]     # ST_i
          second_order: Dict[tuple, float]  # S_ij

      class SensitivityAnalysis:
          """
          Global sensitivity analysis using Sobol indices.

          Quantifies how much each input contributes to output variance.
          """

          def __init__(
              self,
              model: Callable[[np.ndarray], float],
              param_bounds: Dict[str, tuple],
              n_samples: int = 10000
          ):
              self.model = model
              self.param_names = list(param_bounds.keys())
              self.bounds = np.array(list(param_bounds.values()))
              self.n_samples = n_samples
              self.dim = len(self.param_names)

          def compute_sobol_indices(self) -> SobolIndices:
              """
              Compute first-order and total-order Sobol indices.

              Uses Saltelli's extension of Sobol sequence.
              """
              # Generate two independent sample matrices
              from scipy.stats import qmc
              sampler = qmc.Sobol(d=self.dim, scramble=True)
              A = sampler.random(self.n_samples)
              B = sampler.random(self.n_samples)

              # Scale to bounds
              A = self._scale_samples(A)
              B = self._scale_samples(B)

              # Evaluate model on base matrices
              f_A = np.array([self.model(a) for a in A])
              f_B = np.array([self.model(b) for b in B])

              # Compute indices for each parameter
              first_order = {}
              total_order = {}

              for i, name in enumerate(self.param_names):
                  # Matrix with column i from B, rest from A
                  AB_i = A.copy()
                  AB_i[:, i] = B[:, i]
                  f_AB_i = np.array([self.model(ab) for ab in AB_i])

                  # Matrix with column i from A, rest from B
                  BA_i = B.copy()
                  BA_i[:, i] = A[:, i]
                  f_BA_i = np.array([self.model(ba) for ba in BA_i])

                  # First-order index
                  var_total = np.var(np.concatenate([f_A, f_B]))
                  first_order[name] = np.mean(f_B * (f_AB_i - f_A)) / var_total

                  # Total-order index
                  total_order[name] = 0.5 * np.mean((f_A - f_AB_i) ** 2) / var_total

              return SobolIndices(
                  first_order=first_order,
                  total_order=total_order,
                  second_order={}  # Computed separately if needed
              )

          def _scale_samples(self, samples: np.ndarray) -> np.ndarray:
              """Scale [0,1] samples to parameter bounds."""
              return samples * (self.bounds[:, 1] - self.bounds[:, 0]) + self.bounds[:, 0]

          def morris_screening(self, n_trajectories: int = 10) -> Dict[str, tuple]:
              """
              Morris method for parameter screening.

              Cheaper than Sobol, identifies important vs unimportant parameters.
              """
              from scipy.stats import qmc

              delta = 1.0 / (2 * (n_trajectories - 1))
              elementary_effects = {name: [] for name in self.param_names}

              for _ in range(n_trajectories):
                  # Random starting point
                  x = np.random.uniform(size=self.dim) * (1 - delta)

                  # Random permutation of parameters
                  order = np.random.permutation(self.dim)

                  x_scaled = self._scale_samples(x.reshape(1, -1))[0]
                  f_current = self.model(x_scaled)

                  for i in order:
                      x_new = x.copy()
                      x_new[i] += delta
                      x_new_scaled = self._scale_samples(x_new.reshape(1, -1))[0]
                      f_new = self.model(x_new_scaled)

                      # Elementary effect
                      ee = (f_new - f_current) / delta
                      elementary_effects[self.param_names[i]].append(ee)

                      x = x_new
                      f_current = f_new

              # Compute mean and std of elementary effects
              return {
                  name: (np.mean(np.abs(ee)), np.std(ee))
                  for name, ee in elementary_effects.items()
              }
anti_patterns:
- pattern: "Fixed sample size without convergence check"
  problem: "May waste samples or stop before convergence"
  solution: "Use adaptive sampling with target error threshold"
- pattern: "Single random seed for all runs"
  problem: "Results not reproducible, can't parallelize properly"
  solution: "Use separate streams: np.random.SeedSequence for spawning"
- pattern: "Ignoring autocorrelation in MCMC"
  problem: "Effective samples much less than nominal"
  solution: "Compute ESS, use thinning or better sampler (HMC)"
- pattern: "Uniform sampling for peaked functions"
  problem: "Most samples contribute nothing to estimate"
  solution: "Use importance sampling with proposal near peak"
- pattern: "No burn-in for MCMC"
  problem: "Samples biased by initial state"
  solution: "Discard burn-in samples, check convergence diagnostics"
- pattern: "High-dimensional Halton sequence"
  problem: "Correlations appear in high dimensions"
  solution: "Use Sobol with scrambling for d > 10"
handoffs:
- to: physics-simulation
  when: "Monte Carlo applied to physics models"
  context: "Uncertainty propagation through simulation"
- to: digital-twin
  when: "Probabilistic digital twin predictions"
  context: "Twin with uncertainty quantification"
- to: agent-based-modeling
  when: "Stochastic agent behavior"
  context: "Random agent decisions and interactions"
- to: statistical-analysis
  when: "Analysis of simulation results"
  context: "Statistical inference from MC samples"
ecosystem:
  python_libraries:
  - "NumPy - Random sampling, vectorized operations"
  - "SciPy - QMC sequences, distributions"
  - "PyMC - Probabilistic programming, MCMC"
  - "emcee - Ensemble MCMC sampler"
  - "SALib - Sensitivity analysis"
  - "Chaospy - Polynomial chaos, UQ"
  specialized:
  - "Stan - High-performance MCMC"
  - "JAGS - Bayesian MCMC"
  - "OpenTURNS - Uncertainty quantification"
  - "UQLab - MATLAB UQ toolkit"
  hpc:
  - "DAKOTA - Design/uncertainty analysis"
  - "UM-Bridge - Uncertainty models"
references:
  books:
  - "Monte Carlo Methods - Kroese et al."
  - "Monte Carlo Statistical Methods - Robert & Casella"
  - "Handbook of Monte Carlo Methods - Kroese et al."
  papers:
  - "Equation of State Calculations by Fast Computing Machines - Metropolis et al."
  - "Monte Carlo Sampling Methods Using Markov Chains - Hastings"
  - "Global Sensitivity Indices for Nonlinear Models - Sobol"