Vibeship-spawner-skills monte-carlo

id: monte-carlo

install
source · Clone the upstream repo
git clone https://github.com/vibeforge1111/vibeship-spawner-skills
manifest: simulation/monte-carlo/skill.yaml
source content

id: monte-carlo
name: Monte Carlo Simulation
category: simulation
description: |
  Design and implement Monte Carlo methods for uncertainty quantification,
  risk analysis, and probabilistic simulations across scientific and
  financial domains.
version: 1.0.0

triggers:

  • "monte carlo"
  • "random sampling"
  • "uncertainty quantification"
  • "risk analysis"
  • "stochastic simulation"
  • "MCMC"
  • "variance reduction"
  • "probabilistic"

provides:

  • Monte Carlo simulation design
  • Variance reduction techniques
  • Convergence analysis
  • MCMC sampling methods
  • Sensitivity analysis
  • Risk quantification

patterns:

basic_monte_carlo:
  description: "Fundamental Monte Carlo estimation"
  example: |

  import numpy as np
  from typing import Callable, Tuple, List
  from dataclasses import dataclass

  @dataclass
  class MCResult:
      """Result of Monte Carlo simulation."""
      mean: float
      std_error: float
      confidence_interval: Tuple[float, float]
      n_samples: int
      converged: bool

  class MonteCarloEstimator:
      """
      Basic Monte Carlo estimator with convergence checking.
      """

      def __init__(
          self,
          target_std_error: float = 0.01,
          confidence_level: float = 0.95,
          max_samples: int = 1_000_000,
          batch_size: int = 10_000
      ):
          self.target_std_error = target_std_error
          self.confidence_level = confidence_level
          self.max_samples = max_samples
          self.batch_size = batch_size

          # z-score for confidence interval
          from scipy.stats import norm
          self.z = norm.ppf((1 + confidence_level) / 2)

      def estimate(
          self,
          sampler: Callable[[], np.ndarray],
          estimand: Callable[[np.ndarray], float]
      ) -> MCResult:
          """
          Estimate expected value via Monte Carlo.

          Args:
              sampler: Function that returns random samples
              estimand: Function to compute from samples

          Returns:
              MCResult with mean, uncertainty, and convergence info
          """
          running_sum = 0.0
          running_sum_sq = 0.0
          n_total = 0

          while n_total < self.max_samples:
              # Generate batch of samples
              samples = sampler()
              values = np.array([estimand(s) for s in samples])

              # Update running statistics
              batch_n = len(values)
              running_sum += np.sum(values)
              running_sum_sq += np.sum(values ** 2)
              n_total += batch_n

              # Compute current estimate
              mean = running_sum / n_total
              variance = (running_sum_sq / n_total) - mean ** 2
              std_error = np.sqrt(variance / n_total)

              # Check convergence
              if std_error < self.target_std_error:
                  return MCResult(
                      mean=mean,
                      std_error=std_error,
                      confidence_interval=(
                          mean - self.z * std_error,
                          mean + self.z * std_error
                      ),
                      n_samples=n_total,
                      converged=True
                  )

          # Max samples reached without convergence
          mean = running_sum / n_total
          variance = (running_sum_sq / n_total) - mean ** 2
          std_error = np.sqrt(variance / n_total)

          return MCResult(
              mean=mean,
              std_error=std_error,
              confidence_interval=(
                  mean - self.z * std_error,
                  mean + self.z * std_error
              ),
              n_samples=n_total,
              converged=False
          )

  # Example: Estimate pi via the area of the unit circle.
  # The estimand is applied per sample point, matching MonteCarloEstimator.estimate.
  def sample_unit_square():
      return np.random.uniform(-1, 1, size=(1000, 2))

  def inside_circle(point):
      # 4 * indicator that the point falls inside the unit circle
      return 4.0 * float(np.sum(point ** 2) <= 1)

  # target_std_error=0.002 is reachable within the default max_samples budget
  mc = MonteCarloEstimator(target_std_error=0.002)
  result = mc.estimate(sample_unit_square, inside_circle)
  print(f"Pi estimate: {result.mean:.6f} +/- {result.std_error:.6f}")

variance_reduction:
  description: "Techniques to reduce Monte Carlo variance"
  example: |

  import numpy as np
  from typing import Callable, Optional

  class VarianceReduction:
      """Collection of variance reduction techniques."""

      @staticmethod
      def antithetic_variates(
          sampler: Callable[[int], np.ndarray],
          estimand: Callable[[np.ndarray], float],
          n_samples: int
      ) -> tuple:
          """
          Antithetic variates: use negative correlation.

          For uniform U, also use 1-U.
          Reduces variance when estimand is monotonic.
          """
          # Generate primary samples
          u = sampler(n_samples // 2)
          # Generate antithetic samples (1 - u for uniform)
          u_anti = 1 - u

          # Evaluate on both
          y = np.array([estimand(s) for s in u])
          y_anti = np.array([estimand(s) for s in u_anti])

          # Average pairs
          y_combined = (y + y_anti) / 2

          mean = np.mean(y_combined)
          std_error = np.std(y_combined) / np.sqrt(len(y_combined))

          return mean, std_error

      @staticmethod
      def control_variates(
          samples: np.ndarray,
          y: np.ndarray,
          control: np.ndarray,
          control_mean: float
      ) -> tuple:
          """
          Control variates: use known-mean correlated variable.

          y_cv = y - c * (control - E[control])
          c chosen to minimize variance.
          """
          # Optimal coefficient
          cov_y_control = np.cov(y, control)[0, 1]
          var_control = np.var(control)
          c_optimal = cov_y_control / var_control

          # Adjusted estimator
          y_cv = y - c_optimal * (control - control_mean)

          mean = np.mean(y_cv)
          std_error = np.std(y_cv) / np.sqrt(len(y_cv))

          # Variance reduction ratio
          var_ratio = np.var(y_cv) / np.var(y)

          return mean, std_error, var_ratio

      @staticmethod
      def importance_sampling(
          target_density: Callable[[np.ndarray], float],
          proposal_sampler: Callable[[int], np.ndarray],
          proposal_density: Callable[[np.ndarray], float],
          estimand: Callable[[np.ndarray], float],
          n_samples: int
      ) -> tuple:
          """
          Importance sampling: sample from easier distribution.

          E_p[f(x)] = E_q[f(x) * p(x) / q(x)]

          Use when target is hard to sample but easy to evaluate.
          """
          # Sample from proposal
          samples = proposal_sampler(n_samples)

          # Compute importance weights
          weights = np.array([
              target_density(s) / proposal_density(s)
              for s in samples
          ])

          # Weighted estimator
          values = np.array([estimand(s) for s in samples])
          weighted_values = values * weights

          # Self-normalized importance sampling (more stable)
          mean = np.sum(weighted_values) / np.sum(weights)

          # Effective sample size (diagnostic)
          ess = np.sum(weights) ** 2 / np.sum(weights ** 2)

          return mean, ess

      @staticmethod
      def stratified_sampling(
          sampler: Callable[[float, float, int], np.ndarray],
          estimand: Callable[[np.ndarray], float],
          n_strata: int,
          samples_per_stratum: int
      ) -> tuple:
          """
          Stratified sampling: divide domain into strata.

          Reduces variance by ensuring coverage of entire domain.
          """
          stratum_means = []
          stratum_vars = []

          for i in range(n_strata):
              # Stratum boundaries
              low = i / n_strata
              high = (i + 1) / n_strata

              # Sample within stratum
              samples = sampler(low, high, samples_per_stratum)
              values = np.array([estimand(s) for s in samples])
              stratum_means.append(np.mean(values))
              stratum_vars.append(np.var(values, ddof=1))

          # Overall mean (equal stratum weights)
          mean = np.mean(stratum_means)

          # Standard error of the stratified estimator: combine the
          # within-stratum sampling variances, not the spread of stratum means
          std_error = np.sqrt(np.sum(stratum_vars) / samples_per_stratum) / n_strata

          return mean, std_error
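
  # Illustrative usage (not from the upstream manifest): antithetic variates
  # for E[exp(U)], U ~ Uniform(0, 1). The helper names below are assumptions
  # for this sketch; the true value is e - 1 ≈ 1.71828.
  def uniform_sampler(n):
      return np.random.uniform(size=n)

  mean_av, se_av = VarianceReduction.antithetic_variates(
      uniform_sampler, np.exp, n_samples=100_000
  )
  print(f"E[exp(U)] via antithetic variates: {mean_av:.5f} +/- {se_av:.5f}")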

mcmc_sampling:
  description: "Markov Chain Monte Carlo for complex distributions"
  example: |

  import numpy as np
  from typing import Callable, Optional, List
  from dataclasses import dataclass

  @dataclass
  class MCMCResult:
      """Result of MCMC sampling."""
      samples: np.ndarray
      acceptance_rate: float
      effective_sample_size: float
      r_hat: Optional[float] = None  # Gelman-Rubin diagnostic

  class MetropolisHastings:
      """
      Metropolis-Hastings MCMC sampler.

      For sampling from distributions known up to normalizing constant.
      """

      def __init__(
          self,
          log_target: Callable[[np.ndarray], float],
          proposal_std: float = 1.0,
          dim: int = 1
      ):
          self.log_target = log_target
          self.proposal_std = proposal_std
          self.dim = dim

      def sample(
          self,
          n_samples: int,
          initial: Optional[np.ndarray] = None,
          burn_in: int = 1000,
          thin: int = 1
      ) -> MCMCResult:
          """
          Generate samples using Metropolis-Hastings.

          Args:
              n_samples: Number of samples to return
              initial: Starting point (default: zeros)
              burn_in: Samples to discard for equilibration
              thin: Keep every thin-th sample
          """
          if initial is None:
              initial = np.zeros(self.dim)

          current = initial.copy()
          current_log_p = self.log_target(current)

          samples = []
          n_accepted = 0
          n_total = burn_in + n_samples * thin

          for i in range(n_total):
              # Propose new state
              proposal = current + np.random.normal(
                  0, self.proposal_std, size=self.dim
              )
              proposal_log_p = self.log_target(proposal)

              # Accept/reject
              log_alpha = proposal_log_p - current_log_p
              if np.log(np.random.uniform()) < log_alpha:
                  current = proposal
                  current_log_p = proposal_log_p
                  n_accepted += 1

              # Store after burn-in, with thinning
              if i >= burn_in and (i - burn_in) % thin == 0:
                  samples.append(current.copy())

          samples = np.array(samples)

          return MCMCResult(
              samples=samples,
              acceptance_rate=n_accepted / n_total,
              effective_sample_size=self._compute_ess(samples)
          )

      def _compute_ess(self, samples: np.ndarray) -> float:
          """Compute effective sample size from autocorrelation."""
          n = len(samples)
          if samples.ndim > 1:
              samples = samples[:, 0]  # Use first dimension

          # Compute autocorrelation
          mean = np.mean(samples)
          var = np.var(samples)
          if var == 0:
              return n

          autocorr = np.correlate(samples - mean, samples - mean, mode='full')
          autocorr = autocorr[n-1:] / (var * n)

          # Sum autocorrelations until negative
          tau = 1.0
          for k in range(1, n):
              if autocorr[k] < 0:
                  break
              tau += 2 * autocorr[k]

          return n / tau

  class HamiltonianMonteCarlo:
      """
      Hamiltonian Monte Carlo (HMC) sampler.

      Uses gradient information for efficient exploration.
      """

      def __init__(
          self,
          log_target: Callable[[np.ndarray], float],
          grad_log_target: Callable[[np.ndarray], np.ndarray],
          step_size: float = 0.1,
          n_leapfrog: int = 10,
          dim: int = 1
      ):
          self.log_target = log_target
          self.grad_log_target = grad_log_target
          self.step_size = step_size
          self.n_leapfrog = n_leapfrog
          self.dim = dim

      def sample(
          self,
          n_samples: int,
          initial: Optional[np.ndarray] = None,
          burn_in: int = 1000
      ) -> MCMCResult:
          """Generate samples using HMC."""
          if initial is None:
              initial = np.zeros(self.dim)

          current_q = initial.copy()
          samples = []
          n_accepted = 0

          for i in range(burn_in + n_samples):
              # Sample momentum
              current_p = np.random.normal(size=self.dim)

              # Leapfrog integration
              q = current_q.copy()
              p = current_p.copy()

              # Half step for momentum
              p += 0.5 * self.step_size * self.grad_log_target(q)

              # Full steps
              for _ in range(self.n_leapfrog - 1):
                  q += self.step_size * p
                  p += self.step_size * self.grad_log_target(q)

              # Final half step
              q += self.step_size * p
              p += 0.5 * self.step_size * self.grad_log_target(q)

              # Negate momentum for reversibility
              p = -p

              # Compute Hamiltonian
              current_H = -self.log_target(current_q) + 0.5 * np.sum(current_p ** 2)
              proposed_H = -self.log_target(q) + 0.5 * np.sum(p ** 2)

              # Accept/reject
              if np.log(np.random.uniform()) < current_H - proposed_H:
                  current_q = q
                  n_accepted += 1

              if i >= burn_in:
                  samples.append(current_q.copy())

          return MCMCResult(
              samples=np.array(samples),
              acceptance_rate=n_accepted / (burn_in + n_samples),
              effective_sample_size=len(samples)  # Rough placeholder; use an autocorrelation-based ESS in practice
          )
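
  # Illustrative usage (not part of the upstream manifest): sample a 1-D
  # standard normal with Metropolis-Hastings and inspect the diagnostics.
  mh = MetropolisHastings(
      log_target=lambda x: -0.5 * float(np.sum(x ** 2)),  # log N(0, 1) up to a constant
      proposal_std=1.0,
      dim=1
  )
  mh_result = mh.sample(n_samples=5000, burn_in=1000)
  print(f"Acceptance rate: {mh_result.acceptance_rate:.2f}")
  print(f"Effective sample size: {mh_result.effective_sample_size:.0f}")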

quasi_monte_carlo:
  description: "Low-discrepancy sequences for faster convergence"
  example: |

  import numpy as np
  from typing import Generator

  class QuasiMonteCarlo:
      """
      Quasi-Monte Carlo using low-discrepancy sequences.

      Error decreases roughly as O((log N)^d / N), versus O(1/sqrt(N)) for standard MC.
      """

      @staticmethod
      def halton_sequence(dim: int, n: int, skip: int = 0) -> np.ndarray:
          """
          Generate Halton sequence.

          Low-discrepancy sequence using prime bases.
          """
          def halton_single(index: int, base: int) -> float:
              result = 0.0
              f = 1.0 / base
              i = index
              while i > 0:
                  result += f * (i % base)
                  i //= base
                  f /= base
              return result

          # First dim primes
          primes = [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37][:dim]

          points = np.zeros((n, dim))
          for i in range(n):
              for d in range(dim):
                  points[i, d] = halton_single(i + skip + 1, primes[d])

          return points

      @staticmethod
      def sobol_sequence(dim: int, n: int) -> np.ndarray:
          """
          Generate Sobol sequence.

          Better uniformity than Halton for high dimensions.
          """
          from scipy.stats import qmc
          sampler = qmc.Sobol(d=dim, scramble=True)
          return sampler.random(n)

      @staticmethod
      def latin_hypercube(dim: int, n: int) -> np.ndarray:
          """
          Latin Hypercube Sampling.

          Ensures each dimension has uniform marginal coverage.
          """
          from scipy.stats import qmc
          sampler = qmc.LatinHypercube(d=dim)
          return sampler.random(n)

  # Comparison: QMC vs MC for integration
  def integrate_comparison(f, dim, n_points):
      # Standard Monte Carlo
      mc_samples = np.random.uniform(size=(n_points, dim))
      mc_estimate = np.mean([f(s) for s in mc_samples])

      # Quasi Monte Carlo (Sobol)
      qmc_samples = QuasiMonteCarlo.sobol_sequence(dim, n_points)
      qmc_estimate = np.mean([f(s) for s in qmc_samples])

      return mc_estimate, qmc_estimate
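
  # Illustrative usage (assumed test integrand, not in the upstream manifest):
  # integrate f(x) = prod(x) over the 3-D unit cube; the exact value is 1/8.
  mc_est, qmc_est = integrate_comparison(np.prod, dim=3, n_points=4096)
  print(f"MC: {mc_est:.5f}, QMC: {qmc_est:.5f}, exact: 0.125")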

sensitivity_analysis:
  description: "Global sensitivity analysis with Monte Carlo"
  example: |

  import numpy as np
  from typing import Callable, Dict, List
  from dataclasses import dataclass

  @dataclass
  class SobolIndices:
      """Sobol sensitivity indices."""
      first_order: Dict[str, float]   # S_i
      total_order: Dict[str, float]   # ST_i
      second_order: Dict[tuple, float]  # S_ij

  class SensitivityAnalysis:
      """
      Global sensitivity analysis using Sobol indices.

      Quantifies how much each input contributes to output variance.
      """

      def __init__(
          self,
          model: Callable[[np.ndarray], float],
          param_bounds: Dict[str, tuple],
          n_samples: int = 10000
      ):
          self.model = model
          self.param_names = list(param_bounds.keys())
          self.bounds = np.array(list(param_bounds.values()))
          self.n_samples = n_samples
          self.dim = len(self.param_names)

      def compute_sobol_indices(self) -> SobolIndices:
          """
          Compute first-order and total-order Sobol indices.

          Uses Saltelli's extension of Sobol sequence.
          """
          # Generate two independent sample matrices
          from scipy.stats import qmc
          sampler = qmc.Sobol(d=self.dim, scramble=True)

          A = sampler.random(self.n_samples)
          B = sampler.random(self.n_samples)

          # Scale to bounds
          A = self._scale_samples(A)
          B = self._scale_samples(B)

          # Evaluate model on base matrices
          f_A = np.array([self.model(a) for a in A])
          f_B = np.array([self.model(b) for b in B])

          # Compute indices for each parameter
          first_order = {}
          total_order = {}

          for i, name in enumerate(self.param_names):
              # Matrix with column i from B, rest from A
              AB_i = A.copy()
              AB_i[:, i] = B[:, i]
              f_AB_i = np.array([self.model(ab) for ab in AB_i])

              # First-order index
              var_total = np.var(np.concatenate([f_A, f_B]))
              first_order[name] = np.mean(f_B * (f_AB_i - f_A)) / var_total

              # Total-order index
              total_order[name] = 0.5 * np.mean((f_A - f_AB_i) ** 2) / var_total

          return SobolIndices(
              first_order=first_order,
              total_order=total_order,
              second_order={}  # Computed separately if needed
          )

      def _scale_samples(self, samples: np.ndarray) -> np.ndarray:
          """Scale [0,1] samples to parameter bounds."""
          return samples * (self.bounds[:, 1] - self.bounds[:, 0]) + self.bounds[:, 0]

      def morris_screening(self, n_trajectories: int = 10) -> Dict[str, tuple]:
          """
          Morris method for parameter screening.

          Cheaper than Sobol, identifies important vs unimportant parameters.
          """
          from scipy.stats import qmc

          delta = 1.0 / (2 * (n_trajectories - 1))
          elementary_effects = {name: [] for name in self.param_names}

          for _ in range(n_trajectories):
              # Random starting point
              x = np.random.uniform(size=self.dim) * (1 - delta)

              # Random permutation of parameters
              order = np.random.permutation(self.dim)

              x_scaled = self._scale_samples(x.reshape(1, -1))[0]
              f_current = self.model(x_scaled)

              for i in order:
                  x_new = x.copy()
                  x_new[i] += delta

                  x_new_scaled = self._scale_samples(x_new.reshape(1, -1))[0]
                  f_new = self.model(x_new_scaled)

                  # Elementary effect
                  ee = (f_new - f_current) / delta
                  elementary_effects[self.param_names[i]].append(ee)

                  x = x_new
                  f_current = f_new

          # Compute mean and std of elementary effects
          return {
              name: (np.mean(np.abs(ee)), np.std(ee))
              for name, ee in elementary_effects.items()
          }
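
  # Illustrative usage (assumed toy model, not in the upstream manifest):
  # the output depends strongly on x1, weakly on x2, and not at all on x3.
  def toy_model(x: np.ndarray) -> float:
      return 4.0 * x[0] + 0.5 * x[1]

  sa = SensitivityAnalysis(
      model=toy_model,
      param_bounds={"x1": (0.0, 1.0), "x2": (0.0, 1.0), "x3": (0.0, 1.0)},
      n_samples=1024
  )
  indices = sa.compute_sobol_indices()
  print("First-order:", indices.first_order)
  print("Total-order:", indices.total_order)

  morris = sa.morris_screening(n_trajectories=20)
  print("Morris (mean |EE|, std EE):", morris)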

anti_patterns:

  • pattern: "Fixed sample size without convergence check" problem: "May waste samples or stop before convergence" solution: "Use adaptive sampling with target error threshold"

  • pattern: "Single random seed for all runs" problem: "Results not reproducible, can't parallelize properly" solution: "Use separate streams: np.random.SeedSequence for spawning"

  • pattern: "Ignoring autocorrelation in MCMC" problem: "Effective samples much less than nominal" solution: "Compute ESS, use thinning or better sampler (HMC)"

  • pattern: "Uniform sampling for peaked functions" problem: "Most samples contribute nothing to estimate" solution: "Use importance sampling with proposal near peak"

  • pattern: "No burn-in for MCMC" problem: "Samples biased by initial state" solution: "Discard burn-in samples, check convergence diagnostics"

  • pattern: "High-dimensional Halton sequence" problem: "Correlations appear in high dimensions" solution: "Use Sobol with scrambling for d > 10"

handoffs:

  • to: physics-simulation
    when: "Monte Carlo applied to physics models"
    context: "Uncertainty propagation through simulation"

  • to: digital-twin
    when: "Probabilistic digital twin predictions"
    context: "Twin with uncertainty quantification"

  • to: agent-based-modeling
    when: "Stochastic agent behavior"
    context: "Random agent decisions and interactions"

  • to: statistical-analysis
    when: "Analysis of simulation results"
    context: "Statistical inference from MC samples"

ecosystem:

python_libraries:

  • "NumPy - Random sampling, vectorized operations"
  • "SciPy - QMC sequences, distributions"
  • "PyMC - Probabilistic programming, MCMC"
  • "emcee - Ensemble MCMC sampler"
  • "SALib - Sensitivity analysis"
  • "Chaospy - Polynomial chaos, UQ"

specialized:

  • "Stan - High-performance MCMC"
  • "JAGS - Bayesian MCMC"
  • "OpenTURNS - Uncertainty quantification"
  • "UQLab - MATLAB UQ toolkit"

hpc:

  • "DAKOTA - Design/uncertainty analysis"
  • "UM-Bridge - Uncertainty models"

references:

books:

  • "Monte Carlo Methods - Kroese et al."
  • "Monte Carlo Statistical Methods - Robert & Casella"
  • "Handbook of Monte Carlo Methods - Kroese et al."

papers:

  • "Equation of State Calculations by Fast Computing Machines - Metropolis et al."
  • "Monte Carlo Sampling Methods Using Markov Chains - Hastings"
  • "Global Sensitivity Indices for Nonlinear Models - Sobol"