Vibeship-spawner-skills climate-modeling

id: climate-modeling

install
source · Clone the upstream repo
git clone https://github.com/vibeforge1111/vibeship-spawner-skills
manifest: climate/climate-modeling/skill.yaml
source content

id: climate-modeling name: Climate Modeling & Analysis category: climate description: | Work with climate data, models, and projections for climate impact assessment, downscaling, and scenario analysis using CMIP6 and other climate datasets. version: 1.0.0

triggers:

  • "climate model"
  • "climate projection"
  • "CMIP6"
  • "climate data"
  • "downscaling"
  • "climate scenario"
  • "RCP"
  • "SSP"
  • "global warming"

provides:

  • Climate data access and processing
  • CMIP6 model ensemble analysis
  • Statistical and dynamical downscaling
  • Climate impact indicators
  • Scenario comparison and uncertainty
  • Bias correction methods

patterns: cmip6_data_access: description: "Access and process CMIP6 climate model data" example: | import xarray as xr import numpy as np from typing import List, Dict, Optional, Tuple from dataclasses import dataclass from pathlib import Path import cftime

  @dataclass
  class ClimateScenario:
      """CMIP6 scenario specification."""
      ssp: str  # e.g., 'ssp126', 'ssp245', 'ssp370', 'ssp585'
      description: str
      radiative_forcing_2100: float  # W/m^2

  SCENARIOS = {
      'ssp119': ClimateScenario('ssp119', 'Very low emissions, 1.5°C pathway', 1.9),
      'ssp126': ClimateScenario('ssp126', 'Low emissions, sustainable', 2.6),
      'ssp245': ClimateScenario('ssp245', 'Middle of the road', 4.5),
      'ssp370': ClimateScenario('ssp370', 'Regional rivalry, high emissions', 7.0),
      'ssp585': ClimateScenario('ssp585', 'Fossil-fueled development', 8.5),
  }

  class CMIP6DataLoader:
      """
      Load and process CMIP6 climate model data.

      Handles multiple models, scenarios, and variables.
      """

      def __init__(self, data_dir: str):
          self.data_dir = Path(data_dir)
          self.models_loaded: Dict[str, xr.Dataset] = {}

      def load_model(
          self,
          model: str,
          scenario: str,
          variable: str,
          experiment: str = 'historical'
      ) -> xr.Dataset:
          """
          Load single model output.

          Args:
              model: Model name (e.g., 'CESM2', 'GFDL-ESM4')
              scenario: SSP scenario (e.g., 'ssp245')
              variable: Variable name (e.g., 'tas', 'pr', 'hurs')
              experiment: Experiment type
          """
          # Construct file path based on CMIP6 DRS
          pattern = f"{variable}_*_{model}_{scenario}_*_*.nc"
          files = list(self.data_dir.glob(pattern))

          if not files:
              raise FileNotFoundError(f"No files matching {pattern}")

          # Open and concatenate
          ds = xr.open_mfdataset(files, combine='by_coords')

          # Standardize time coordinate
          ds = self._standardize_time(ds)

          return ds

      def load_ensemble(
          self,
          models: List[str],
          scenario: str,
          variable: str
      ) -> xr.Dataset:
          """
          Load multi-model ensemble.

          Returns dataset with 'model' dimension.
          """
          datasets = []
          for model in models:
              try:
                  ds = self.load_model(model, scenario, variable)
                  ds = ds.expand_dims({'model': [model]})
                  datasets.append(ds)
              except FileNotFoundError:
                  print(f"Warning: {model} not found, skipping")

          # Combine along model dimension
          ensemble = xr.concat(datasets, dim='model')
          return ensemble

      def _standardize_time(self, ds: xr.Dataset) -> xr.Dataset:
          """Convert calendar to standard for comparison."""
          if 'time' in ds.dims:
              # Handle different calendars
              try:
                  ds['time'] = ds.indexes['time'].to_datetimeindex()
              except:
                  # Keep as cftime if conversion fails
                  pass
          return ds

      def regrid(
          self,
          ds: xr.Dataset,
          target_grid: xr.Dataset,
          method: str = 'bilinear'
      ) -> xr.Dataset:
          """
          Regrid to common grid for model comparison.
          """
          import xesmf as xe

          regridder = xe.Regridder(ds, target_grid, method)
          return regridder(ds)

  def compute_climate_normals(
      ds: xr.Dataset,
      variable: str,
      baseline_period: Tuple[str, str] = ('1981', '2010')
  ) -> xr.DataArray:
      """
      Compute climatological normals (30-year averages).
      """
      baseline = ds[variable].sel(
          time=slice(baseline_period[0], baseline_period[1])
      )
      return baseline.groupby('time.month').mean('time')

  def compute_anomaly(
      ds: xr.Dataset,
      variable: str,
      baseline_period: Tuple[str, str] = ('1981', '2010')
  ) -> xr.DataArray:
      """
      Compute anomalies relative to baseline period.
      """
      normals = compute_climate_normals(ds, variable, baseline_period)
      anomaly = ds[variable].groupby('time.month') - normals
      return anomaly

bias_correction: description: "Statistical bias correction for climate projections" example: | import numpy as np import xarray as xr from scipy import stats from typing import Tuple

  class BiasCorrection:
      """
      Bias correction methods for climate model outputs.

      Corrects systematic biases relative to observations.
      """

      @staticmethod
      def delta_method(
          model_hist: xr.DataArray,
          model_fut: xr.DataArray,
          obs: xr.DataArray
      ) -> xr.DataArray:
          """
          Delta change method: add model change to observations.

          Simple but preserves observed variability.
          """
          # Compute model change (future - historical mean)
          hist_mean = model_hist.mean('time')
          fut_mean = model_fut.mean('time')
          delta = fut_mean - hist_mean

          # Apply to observed climatology
          obs_mean = obs.mean('time')
          corrected = obs_mean + delta

          return corrected

      @staticmethod
      def quantile_mapping(
          model: xr.DataArray,
          obs: xr.DataArray,
          n_quantiles: int = 100
      ) -> xr.DataArray:
          """
          Quantile mapping: match model distribution to observed.

          Corrects entire distribution, not just mean.
          """
          # Compute quantiles
          quantiles = np.linspace(0, 1, n_quantiles)
          model_q = np.percentile(model.values.flatten(), quantiles * 100)
          obs_q = np.percentile(obs.values.flatten(), quantiles * 100)

          # Create mapping function
          def map_quantiles(x):
              # Find which quantile x falls into
              idx = np.searchsorted(model_q, x)
              idx = np.clip(idx, 0, len(obs_q) - 1)
              return obs_q[idx]

          corrected = xr.apply_ufunc(
              map_quantiles,
              model,
              vectorize=True
          )

          return corrected

      @staticmethod
      def quantile_delta_mapping(
          model_hist: xr.DataArray,
          model_fut: xr.DataArray,
          obs: xr.DataArray,
          n_quantiles: int = 100
      ) -> xr.DataArray:
          """
          Quantile Delta Mapping (QDM): preserves projected changes.

          Better for extremes than standard QM.
          """
          quantiles = np.linspace(0, 1, n_quantiles)

          # Get quantile values
          hist_q = np.percentile(model_hist.values.flatten(), quantiles * 100)
          fut_q = np.percentile(model_fut.values.flatten(), quantiles * 100)
          obs_q = np.percentile(obs.values.flatten(), quantiles * 100)

          # Compute delta ratio for each quantile
          delta = fut_q / (hist_q + 1e-10)  # Avoid division by zero

          # Apply to observed quantiles
          corrected_q = obs_q * delta

          # Map future values through corrected quantiles
          def map_to_corrected(x):
              idx = np.searchsorted(fut_q, x)
              idx = np.clip(idx, 0, len(corrected_q) - 1)
              return corrected_q[idx]

          corrected = xr.apply_ufunc(
              map_to_corrected,
              model_fut,
              vectorize=True
          )

          return corrected

downscaling: description: "Statistical downscaling from GCM to local scale" example: | import numpy as np import xarray as xr from sklearn.linear_model import Ridge from sklearn.preprocessing import StandardScaler from typing import List, Tuple

  class StatisticalDownscaling:
      """
      Statistical downscaling from GCM to local scale.

      Uses relationship between large-scale predictors and local climate.
      """

      def __init__(self, predictors: List[str]):
          self.predictors = predictors
          self.model = Ridge(alpha=1.0)
          self.scaler = StandardScaler()
          self.fitted = False

      def fit(
          self,
          gcm_data: xr.Dataset,
          obs_data: xr.DataArray,
          train_period: Tuple[str, str]
      ):
          """
          Fit downscaling model on historical data.

          Args:
              gcm_data: GCM output with predictor variables
              obs_data: Observed local data (target)
              train_period: Training period (start, end)
          """
          # Extract predictors
          X = self._extract_predictors(
              gcm_data.sel(time=slice(*train_period))
          )
          y = obs_data.sel(time=slice(*train_period)).values.flatten()

          # Remove NaN
          mask = ~np.isnan(X).any(axis=1) & ~np.isnan(y)
          X = X[mask]
          y = y[mask]

          # Scale and fit
          X_scaled = self.scaler.fit_transform(X)
          self.model.fit(X_scaled, y)
          self.fitted = True

      def predict(self, gcm_data: xr.Dataset) -> xr.DataArray:
          """
          Apply downscaling to GCM data.
          """
          if not self.fitted:
              raise RuntimeError("Model must be fitted first")

          X = self._extract_predictors(gcm_data)
          X_scaled = self.scaler.transform(X)
          predictions = self.model.predict(X_scaled)

          # Reconstruct as DataArray
          result = xr.DataArray(
              predictions.reshape(gcm_data.time.shape),
              dims=['time'],
              coords={'time': gcm_data.time}
          )

          return result

      def _extract_predictors(self, ds: xr.Dataset) -> np.ndarray:
          """Extract predictor variables as feature matrix."""
          features = []
          for var in self.predictors:
              if var in ds:
                  # Flatten spatial dimensions, keep time
                  data = ds[var].values
                  if data.ndim > 1:
                      data = data.reshape(data.shape[0], -1)
                  features.append(data)

          return np.hstack(features) if features else np.array([])

  class BCSD:
      """
      Bias Correction Spatial Disaggregation (BCSD).

      Common method for hydrological applications.
      """

      def __init__(self):
          self.obs_climatology = None
          self.model_climatology = None

      def train(
          self,
          gcm_hist: xr.DataArray,
          obs: xr.DataArray,
          high_res_obs: xr.DataArray
      ):
          """
          Train BCSD on historical data.

          Args:
              gcm_hist: Historical GCM data
              obs: Observed data at GCM resolution
              high_res_obs: High-resolution observed data
          """
          # Compute monthly climatologies
          self.obs_climatology = obs.groupby('time.month').mean('time')
          self.model_climatology = gcm_hist.groupby('time.month').mean('time')
          self.high_res_climatology = high_res_obs.groupby('time.month').mean('time')

      def downscale(
          self,
          gcm_fut: xr.DataArray,
          variable: str = 'temperature'
      ) -> xr.DataArray:
          """
          Apply BCSD to future GCM data.
          """
          # Step 1: Bias correct at coarse resolution
          corrected = gcm_fut.groupby('time.month') - self.model_climatology
          corrected = corrected.groupby('time.month') + self.obs_climatology

          # Step 2: Compute monthly anomalies
          anomalies = corrected.groupby('time.month') - self.obs_climatology

          # Step 3: Interpolate anomalies to high resolution
          # (simplified - real BCSD uses conservative remapping)
          high_res_anomalies = anomalies.interp(
              lat=self.high_res_climatology.lat,
              lon=self.high_res_climatology.lon,
              method='linear'
          )

          # Step 4: Add high-resolution climatology
          downscaled = high_res_anomalies.groupby('time.month') + self.high_res_climatology

          return downscaled

climate_indicators: description: "Compute climate impact indicators" example: | import numpy as np import xarray as xr from typing import Dict, Optional

  class ClimateIndicators:
      """
      Compute climate impact indicators from model/observation data.
      """

      @staticmethod
      def heating_degree_days(
          temperature: xr.DataArray,
          base_temp: float = 18.0
      ) -> xr.DataArray:
          """
          Heating Degree Days: energy demand indicator.

          HDD = sum(max(0, base_temp - T))
          """
          daily_hdd = (base_temp - temperature).clip(min=0)
          annual_hdd = daily_hdd.groupby('time.year').sum('time')
          return annual_hdd

      @staticmethod
      def cooling_degree_days(
          temperature: xr.DataArray,
          base_temp: float = 18.0
      ) -> xr.DataArray:
          """
          Cooling Degree Days: cooling demand indicator.

          CDD = sum(max(0, T - base_temp))
          """
          daily_cdd = (temperature - base_temp).clip(min=0)
          annual_cdd = daily_cdd.groupby('time.year').sum('time')
          return annual_cdd

      @staticmethod
      def extreme_heat_days(
          temperature: xr.DataArray,
          threshold: float = 35.0
      ) -> xr.DataArray:
          """
          Count of days exceeding heat threshold.
          """
          above_threshold = (temperature > threshold).astype(int)
          annual_count = above_threshold.groupby('time.year').sum('time')
          return annual_count

      @staticmethod
      def frost_days(
          temperature: xr.DataArray
      ) -> xr.DataArray:
          """
          Count of days with minimum temperature below 0°C.
          """
          frost = (temperature < 0).astype(int)
          annual_count = frost.groupby('time.year').sum('time')
          return annual_count

      @staticmethod
      def precipitation_extremes(
          precipitation: xr.DataArray,
          percentile: float = 95
      ) -> Dict[str, xr.DataArray]:
          """
          Precipitation extreme indicators.
          """
          # Annual maximum daily precipitation
          rx1day = precipitation.groupby('time.year').max('time')

          # Days above percentile threshold
          threshold = precipitation.quantile(percentile / 100, dim='time')
          above_threshold = (precipitation > threshold).astype(int)
          r95p = above_threshold.groupby('time.year').sum('time')

          # Consecutive dry days (CDD)
          dry = (precipitation < 1.0).astype(int)
          # (Simplified - full implementation needs run length encoding)

          return {
              'rx1day': rx1day,
              'r95p': r95p
          }

      @staticmethod
      def growing_season_length(
          temperature: xr.DataArray,
          threshold: float = 5.0
      ) -> xr.DataArray:
          """
          Growing Season Length: days between first and last T > threshold.
          """
          above = temperature > threshold

          def gsl_year(year_data):
              above_arr = year_data.values
              if not above_arr.any():
                  return 0
              first = np.argmax(above_arr)
              last = len(above_arr) - np.argmax(above_arr[::-1]) - 1
              return last - first

          gsl = temperature.groupby('time.year').apply(gsl_year)
          return gsl

      @staticmethod
      def drought_index_spi(
          precipitation: xr.DataArray,
          scale: int = 3
      ) -> xr.DataArray:
          """
          Standardized Precipitation Index (SPI).

          Scale: number of months for accumulation.
          """
          from scipy.stats import gamma, norm

          # Rolling sum
          rolling_precip = precipitation.rolling(time=scale, center=False).sum()

          # Fit gamma distribution, transform to standard normal
          def fit_and_transform(data):
              data = data[~np.isnan(data)]
              if len(data) < 30:
                  return np.nan * np.ones_like(data)

              # Fit gamma
              shape, loc, scale_param = gamma.fit(data, floc=0)

              # Transform to SPI
              cdf = gamma.cdf(data, shape, loc, scale_param)
              spi = norm.ppf(cdf)
              return spi

          # Apply per grid cell (simplified)
          spi = xr.apply_ufunc(
              fit_and_transform,
              rolling_precip,
              input_core_dims=[['time']],
              output_core_dims=[['time']],
              vectorize=True
          )

          return spi

ensemble_analysis: description: "Analyze multi-model ensemble uncertainty" example: | import numpy as np import xarray as xr from typing import Dict, Tuple

  class EnsembleAnalysis:
      """
      Analyze multi-model ensemble for uncertainty quantification.
      """

      @staticmethod
      def ensemble_statistics(
          ensemble: xr.Dataset,
          variable: str,
          model_dim: str = 'model'
      ) -> Dict[str, xr.DataArray]:
          """
          Compute ensemble statistics.
          """
          data = ensemble[variable]

          return {
              'mean': data.mean(dim=model_dim),
              'median': data.median(dim=model_dim),
              'std': data.std(dim=model_dim),
              'min': data.min(dim=model_dim),
              'max': data.max(dim=model_dim),
              'p10': data.quantile(0.1, dim=model_dim),
              'p90': data.quantile(0.9, dim=model_dim)
          }

      @staticmethod
      def model_agreement(
          ensemble: xr.Dataset,
          variable: str,
          threshold: float,
          model_dim: str = 'model'
      ) -> xr.DataArray:
          """
          Compute fraction of models agreeing on sign of change.
          """
          data = ensemble[variable]
          n_models = len(data[model_dim])

          positive = (data > threshold).sum(dim=model_dim) / n_models
          negative = (data < -threshold).sum(dim=model_dim) / n_models

          # Return agreement fraction (max of positive/negative)
          agreement = xr.where(positive > negative, positive, negative)
          return agreement

      @staticmethod
      def robustness_assessment(
          ensemble: xr.Dataset,
          variable: str,
          model_dim: str = 'model'
      ) -> xr.DataArray:
          """
          Assess robustness using IPCC criteria.

          Robust: >66% of models agree on sign AND
                  signal > internal variability
          """
          data = ensemble[variable]
          n_models = len(data[model_dim])

          # Model agreement on sign
          mean_change = data.mean(dim=model_dim)
          same_sign = xr.where(mean_change > 0,
                               (data > 0).sum(dim=model_dim),
                               (data < 0).sum(dim=model_dim))
          agreement = same_sign / n_models

          # Signal vs noise
          signal = np.abs(mean_change)
          noise = data.std(dim=model_dim)
          snr = signal / (noise + 1e-10)

          # Robust where agreement > 0.66 AND snr > 1
          robust = (agreement > 0.66) & (snr > 1)

          return robust.astype(int)

      @staticmethod
      def partition_uncertainty(
          historical: xr.Dataset,
          scenarios: Dict[str, xr.Dataset],
          variable: str
      ) -> Dict[str, xr.DataArray]:
          """
          Partition uncertainty into scenario, model, and internal variability.

          Based on Hawkins & Sutton (2009) method.
          """
          # Internal variability from historical
          internal_var = historical[variable].var(dim='time').mean(dim='model')

          # Model uncertainty (across models, same scenario)
          model_vars = []
          for scenario, data in scenarios.items():
              model_var = data[variable].mean(dim='time').var(dim='model')
              model_vars.append(model_var)
          model_uncertainty = xr.concat(model_vars, dim='scenario').mean(dim='scenario')

          # Scenario uncertainty (across scenarios, same model)
          scenario_means = xr.concat([
              data[variable].mean(dim='time').mean(dim='model')
              for data in scenarios.values()
          ], dim='scenario')
          scenario_uncertainty = scenario_means.var(dim='scenario')

          total = internal_var + model_uncertainty + scenario_uncertainty

          return {
              'internal': internal_var / total,
              'model': model_uncertainty / total,
              'scenario': scenario_uncertainty / total,
              'total_variance': total
          }

anti_patterns:

  • pattern: "Single model for projections" problem: "Ignores structural uncertainty across models" solution: "Use multi-model ensemble, report uncertainty range"

  • pattern: "Raw model output for impacts" problem: "Systematic biases propagate to impact estimates" solution: "Apply bias correction (quantile mapping, etc.)"

  • pattern: "Ignoring calendar differences" problem: "360-day, no-leap, etc. cause date mismatches" solution: "Convert calendars before comparison"

  • pattern: "Interpolating precipitation directly" problem: "Creates unrealistic drizzle, loses intensity" solution: "Use conservative remapping for precipitation"

  • pattern: "Treating SSP as probability" problem: "SSPs are scenarios, not forecasts" solution: "Present as 'what-if' scenarios, not predictions"

handoffs:

  • to: carbon-accounting when: "Need emissions data driving scenarios" context: "SSP scenario emissions pathways"

  • to: renewable-energy when: "Climate impacts on renewable resources" context: "Solar irradiance, wind speed projections"

  • to: energy-systems when: "Climate impacts on energy demand/supply" context: "Heating/cooling degree days, extreme events"

  • to: monte-carlo when: "Uncertainty propagation through impact models" context: "Ensemble uncertainty quantification"

ecosystem: data_sources: - "ESGF - Earth System Grid Federation (CMIP6)" - "Copernicus CDS - Climate Data Store" - "NCEP/NCAR Reanalysis" - "ERA5 - ECMWF Reanalysis" - "WorldClim - High-resolution climatology"

python_tools: - "xarray - NetCDF, climate data handling" - "xesmf - Regridding" - "climpred - Prediction skill" - "xclim - Climate indicators" - "cf-xarray - CF conventions"

packages: - "intake-esm - CMIP6 catalog search" - "cartopy - Map projections" - "regionmask - Region masking" - "climada - Climate risk"

references: frameworks: - "IPCC AR6 - Assessment methodology" - "CORDEX - Regional downscaling" - "ISIMIP - Impact model intercomparison"

papers: - "Hawkins & Sutton (2009) - Uncertainty partitioning" - "Maraun (2016) - Bias correction review" - "Cannon et al. (2015) - QDM method"