git clone https://github.com/vibeforge1111/vibeship-spawner-skills
climate/climate-modeling/skill.yaml

id: climate-modeling
name: Climate Modeling & Analysis
category: climate
description: |
  Work with climate data, models, and projections for climate impact
  assessment, downscaling, and scenario analysis using CMIP6 and other
  climate datasets.
version: 1.0.0
triggers:
- "climate model"
- "climate projection"
- "CMIP6"
- "climate data"
- "downscaling"
- "climate scenario"
- "RCP"
- "SSP"
- "global warming"
provides:
- Climate data access and processing
- CMIP6 model ensemble analysis
- Statistical and dynamical downscaling
- Climate impact indicators
- Scenario comparison and uncertainty
- Bias correction methods
patterns:
  cmip6_data_access:
    description: "Access and process CMIP6 climate model data"
    example: |
      import xarray as xr
      import numpy as np
      from typing import List, Dict, Optional, Tuple
      from dataclasses import dataclass
      from pathlib import Path
      import cftime
      @dataclass
      class ClimateScenario:
          """CMIP6 scenario specification."""
          ssp: str  # e.g., 'ssp126', 'ssp245', 'ssp370', 'ssp585'
          description: str
          radiative_forcing_2100: float  # W/m^2

      SCENARIOS = {
          'ssp119': ClimateScenario('ssp119', 'Very low emissions, 1.5°C pathway', 1.9),
          'ssp126': ClimateScenario('ssp126', 'Low emissions, sustainable', 2.6),
          'ssp245': ClimateScenario('ssp245', 'Middle of the road', 4.5),
          'ssp370': ClimateScenario('ssp370', 'Regional rivalry, high emissions', 7.0),
          'ssp585': ClimateScenario('ssp585', 'Fossil-fueled development', 8.5),
      }

      class CMIP6DataLoader:
          """
          Load and process CMIP6 climate model data.

          Handles multiple models, scenarios, and variables.
          """

          def __init__(self, data_dir: str):
              self.data_dir = Path(data_dir)
              self.models_loaded: Dict[str, xr.Dataset] = {}

          def load_model(
              self,
              model: str,
              scenario: str,
              variable: str,
              experiment: str = 'historical'
          ) -> xr.Dataset:
              """
              Load single model output.

              Args:
                  model: Model name (e.g., 'CESM2', 'GFDL-ESM4')
                  scenario: SSP scenario (e.g., 'ssp245')
                  variable: Variable name (e.g., 'tas', 'pr', 'hurs')
                  experiment: Experiment type
              """
              # Construct file path based on CMIP6 DRS
              pattern = f"{variable}_*_{model}_{scenario}_*_*.nc"
              files = list(self.data_dir.glob(pattern))
              if not files:
                  raise FileNotFoundError(f"No files matching {pattern}")

              # Open and concatenate
              ds = xr.open_mfdataset(files, combine='by_coords')

              # Standardize time coordinate
              ds = self._standardize_time(ds)
              return ds

          def load_ensemble(
              self,
              models: List[str],
              scenario: str,
              variable: str
          ) -> xr.Dataset:
              """
              Load multi-model ensemble.

              Returns dataset with 'model' dimension.
              """
              datasets = []
              for model in models:
                  try:
                      ds = self.load_model(model, scenario, variable)
                      ds = ds.expand_dims({'model': [model]})
                      datasets.append(ds)
                  except FileNotFoundError:
                      print(f"Warning: {model} not found, skipping")

              if not datasets:
                  raise FileNotFoundError("No requested models could be loaded")

              # Combine along model dimension
              ensemble = xr.concat(datasets, dim='model')
              return ensemble

          def _standardize_time(self, ds: xr.Dataset) -> xr.Dataset:
              """Convert calendar to standard for comparison."""
              if 'time' in ds.dims:
                  # Handle non-standard calendars (360-day, noleap, ...)
                  try:
                      ds['time'] = ds.indexes['time'].to_datetimeindex()
                  except (AttributeError, ValueError):
                      # Keep as cftime if conversion fails
                      pass
              return ds

          def regrid(
              self,
              ds: xr.Dataset,
              target_grid: xr.Dataset,
              method: str = 'bilinear'
          ) -> xr.Dataset:
              """
              Regrid to common grid for model comparison.

              Use method='conservative' for precipitation and other fluxes.
              """
              import xesmf as xe
              regridder = xe.Regridder(ds, target_grid, method)
              return regridder(ds)

      def compute_climate_normals(
          ds: xr.Dataset,
          variable: str,
          baseline_period: Tuple[str, str] = ('1981', '2010')
      ) -> xr.DataArray:
          """Compute climatological normals (30-year monthly averages)."""
          baseline = ds[variable].sel(
              time=slice(baseline_period[0], baseline_period[1])
          )
          return baseline.groupby('time.month').mean('time')

      def compute_anomaly(
          ds: xr.Dataset,
          variable: str,
          baseline_period: Tuple[str, str] = ('1981', '2010')
      ) -> xr.DataArray:
          """Compute anomalies relative to baseline period."""
          normals = compute_climate_normals(ds, variable, baseline_period)
          anomaly = ds[variable].groupby('time.month') - normals
          return anomaly
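
      # Usage sketch (hedged): the data directory and model names below are
      # illustrative assumptions, not part of this skill's API; they must
      # match CMIP6 files actually present on disk.
      loader = CMIP6DataLoader('/data/cmip6')          # hypothetical path
      models = ['CESM2', 'GFDL-ESM4', 'UKESM1-0-LL']   # models available locally
      ensemble = loader.load_ensemble(models, scenario='ssp245', variable='tas')
      anomalies = compute_anomaly(ensemble, 'tas', baseline_period=('1981', '2010'))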
  bias_correction:
    description: "Statistical bias correction for climate projections"
    example: |
      import numpy as np
      import xarray as xr
      from scipy import stats
      from typing import Tuple
      class BiasCorrection:
          """
          Bias correction methods for climate model outputs.

          Corrects systematic biases relative to observations.
          """

          @staticmethod
          def delta_method(
              model_hist: xr.DataArray,
              model_fut: xr.DataArray,
              obs: xr.DataArray
          ) -> xr.DataArray:
              """
              Delta change method: add model change to observations.

              Simple but preserves observed variability.
              """
              # Compute model change (future mean - historical mean)
              hist_mean = model_hist.mean('time')
              fut_mean = model_fut.mean('time')
              delta = fut_mean - hist_mean

              # Apply to observed climatology
              obs_mean = obs.mean('time')
              corrected = obs_mean + delta
              return corrected

          @staticmethod
          def quantile_mapping(
              model: xr.DataArray,
              obs: xr.DataArray,
              n_quantiles: int = 100
          ) -> xr.DataArray:
              """
              Quantile mapping: match model distribution to observed.

              Corrects the entire distribution, not just the mean.
              """
              # Compute matching quantiles of both distributions (NaN-safe)
              quantiles = np.linspace(0, 1, n_quantiles)
              model_q = np.nanpercentile(model.values.flatten(), quantiles * 100)
              obs_q = np.nanpercentile(obs.values.flatten(), quantiles * 100)

              def map_quantiles(x):
                  # Find which model quantile x falls into, map to observed
                  idx = np.searchsorted(model_q, x)
                  idx = np.clip(idx, 0, len(obs_q) - 1)
                  return obs_q[idx]

              corrected = xr.apply_ufunc(map_quantiles, model, vectorize=True)
              return corrected

          @staticmethod
          def quantile_delta_mapping(
              model_hist: xr.DataArray,
              model_fut: xr.DataArray,
              obs: xr.DataArray,
              n_quantiles: int = 100
          ) -> xr.DataArray:
              """
              Quantile Delta Mapping (QDM): preserves projected changes.

              Better for extremes than standard QM. This multiplicative
              form suits precipitation; use additive deltas for temperature.
              """
              quantiles = np.linspace(0, 1, n_quantiles)

              # Get quantile values for each dataset
              hist_q = np.nanpercentile(model_hist.values.flatten(), quantiles * 100)
              fut_q = np.nanpercentile(model_fut.values.flatten(), quantiles * 100)
              obs_q = np.nanpercentile(obs.values.flatten(), quantiles * 100)

              # Compute delta ratio for each quantile
              delta = fut_q / (hist_q + 1e-10)  # avoid division by zero

              # Apply deltas to observed quantiles
              corrected_q = obs_q * delta

              def map_to_corrected(x):
                  # Map future values through the corrected quantiles
                  idx = np.searchsorted(fut_q, x)
                  idx = np.clip(idx, 0, len(corrected_q) - 1)
                  return corrected_q[idx]

              corrected = xr.apply_ufunc(map_to_corrected, model_fut, vectorize=True)
              return corrected
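
      # Usage sketch (hedged): synthetic gamma-distributed series stand in
      # for observed and modeled daily precipitation.
      rng = np.random.default_rng(0)
      obs = xr.DataArray(rng.gamma(2.0, 2.0, 3650), dims='time')
      model_hist = xr.DataArray(rng.gamma(2.0, 2.5, 3650), dims='time')  # wet bias
      model_fut = xr.DataArray(rng.gamma(2.0, 3.0, 3650), dims='time')   # wetter future
      corrected = BiasCorrection.quantile_delta_mapping(model_hist, model_fut, obs)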
  downscaling:
    description: "Statistical downscaling from GCM to local scale"
    example: |
      import numpy as np
      import xarray as xr
      from sklearn.linear_model import Ridge
      from sklearn.preprocessing import StandardScaler
      from typing import List, Tuple
      class StatisticalDownscaling:
          """
          Statistical downscaling from GCM to local scale.

          Uses the relationship between large-scale predictors and
          local climate.
          """

          def __init__(self, predictors: List[str]):
              self.predictors = predictors
              self.model = Ridge(alpha=1.0)
              self.scaler = StandardScaler()
              self.fitted = False

          def fit(
              self,
              gcm_data: xr.Dataset,
              obs_data: xr.DataArray,
              train_period: Tuple[str, str]
          ):
              """
              Fit downscaling model on historical data.

              Args:
                  gcm_data: GCM output with predictor variables
                  obs_data: Observed local data (target)
                  train_period: Training period (start, end)
              """
              # Extract predictors for the training window
              X = self._extract_predictors(
                  gcm_data.sel(time=slice(*train_period))
              )
              y = obs_data.sel(time=slice(*train_period)).values.flatten()

              # Remove rows with NaN
              mask = ~np.isnan(X).any(axis=1) & ~np.isnan(y)
              X = X[mask]
              y = y[mask]

              # Scale and fit
              X_scaled = self.scaler.fit_transform(X)
              self.model.fit(X_scaled, y)
              self.fitted = True

          def predict(self, gcm_data: xr.Dataset) -> xr.DataArray:
              """Apply downscaling to GCM data."""
              if not self.fitted:
                  raise RuntimeError("Model must be fitted first")

              X = self._extract_predictors(gcm_data)
              X_scaled = self.scaler.transform(X)
              predictions = self.model.predict(X_scaled)

              # Reconstruct as DataArray on the GCM time axis
              result = xr.DataArray(
                  predictions.reshape(gcm_data.time.shape),
                  dims=['time'],
                  coords={'time': gcm_data.time}
              )
              return result

          def _extract_predictors(self, ds: xr.Dataset) -> np.ndarray:
              """Extract predictor variables as a (time, features) matrix."""
              features = []
              for var in self.predictors:
                  if var in ds:
                      # Flatten spatial dimensions, keep time first
                      data = ds[var].values
                      if data.ndim > 1:
                          data = data.reshape(data.shape[0], -1)
                      else:
                          data = data.reshape(-1, 1)
                      features.append(data)
              return np.hstack(features) if features else np.empty((0, 0))

      class BCSD:
          """
          Bias Correction Spatial Disaggregation (BCSD).

          Common method for hydrological applications.
          """

          def __init__(self):
              self.obs_climatology = None
              self.model_climatology = None
              self.high_res_climatology = None

          def train(
              self,
              gcm_hist: xr.DataArray,
              obs: xr.DataArray,
              high_res_obs: xr.DataArray
          ):
              """
              Train BCSD on historical data.

              Args:
                  gcm_hist: Historical GCM data
                  obs: Observed data at GCM resolution
                  high_res_obs: High-resolution observed data
              """
              # Compute monthly climatologies
              self.obs_climatology = obs.groupby('time.month').mean('time')
              self.model_climatology = gcm_hist.groupby('time.month').mean('time')
              self.high_res_climatology = high_res_obs.groupby('time.month').mean('time')

          def downscale(self, gcm_fut: xr.DataArray) -> xr.DataArray:
              """Apply BCSD to future GCM data."""
              # Step 1: Bias correct at coarse resolution
              corrected = gcm_fut.groupby('time.month') - self.model_climatology
              corrected = corrected.groupby('time.month') + self.obs_climatology

              # Step 2: Compute monthly anomalies
              anomalies = corrected.groupby('time.month') - self.obs_climatology

              # Step 3: Interpolate anomalies to high resolution
              # (simplified - real BCSD uses conservative remapping)
              high_res_anomalies = anomalies.interp(
                  lat=self.high_res_climatology.lat,
                  lon=self.high_res_climatology.lon,
                  method='linear'
              )

              # Step 4: Add back the high-resolution climatology
              downscaled = high_res_anomalies.groupby('time.month') + self.high_res_climatology
              return downscaled
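
      # Usage sketch (hedged): synthetic fields stand in for GCM predictors
      # and a station series; grid shape and variable names are illustrative.
      import pandas as pd
      time = pd.date_range('1981-01-01', '2010-12-31', freq='MS')
      gcm = xr.Dataset(
          {v: (('time', 'lat', 'lon'), np.random.rand(len(time), 4, 4))
           for v in ['tas', 'psl']},
          coords={'time': time},
      )
      station = xr.DataArray(np.random.rand(len(time)), dims='time',
                             coords={'time': time})
      sd = StatisticalDownscaling(predictors=['tas', 'psl'])
      sd.fit(gcm, station, train_period=('1981', '2005'))
      local = sd.predict(gcm.sel(time=slice('2006', '2010')))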
  climate_indicators:
    description: "Compute climate impact indicators"
    example: |
      import numpy as np
      import xarray as xr
      from typing import Dict, Optional
      class ClimateIndicators:
          """Compute climate impact indicators from model/observation data."""

          @staticmethod
          def heating_degree_days(
              temperature: xr.DataArray,
              base_temp: float = 18.0
          ) -> xr.DataArray:
              """
              Heating Degree Days: energy demand indicator.

              HDD = sum(max(0, base_temp - T))
              """
              daily_hdd = (base_temp - temperature).clip(min=0)
              annual_hdd = daily_hdd.groupby('time.year').sum('time')
              return annual_hdd

          @staticmethod
          def cooling_degree_days(
              temperature: xr.DataArray,
              base_temp: float = 18.0
          ) -> xr.DataArray:
              """
              Cooling Degree Days: cooling demand indicator.

              CDD = sum(max(0, T - base_temp))
              """
              daily_cdd = (temperature - base_temp).clip(min=0)
              annual_cdd = daily_cdd.groupby('time.year').sum('time')
              return annual_cdd

          @staticmethod
          def extreme_heat_days(
              temperature: xr.DataArray,
              threshold: float = 35.0
          ) -> xr.DataArray:
              """Count of days exceeding heat threshold."""
              above_threshold = (temperature > threshold).astype(int)
              annual_count = above_threshold.groupby('time.year').sum('time')
              return annual_count

          @staticmethod
          def frost_days(temperature: xr.DataArray) -> xr.DataArray:
              """Count of days with minimum temperature below 0°C."""
              frost = (temperature < 0).astype(int)
              annual_count = frost.groupby('time.year').sum('time')
              return annual_count

          @staticmethod
          def precipitation_extremes(
              precipitation: xr.DataArray,
              percentile: float = 95
          ) -> Dict[str, xr.DataArray]:
              """Precipitation extreme indicators."""
              # Annual maximum daily precipitation
              rx1day = precipitation.groupby('time.year').max('time')

              # Days above percentile threshold
              threshold = precipitation.quantile(percentile / 100, dim='time')
              above_threshold = (precipitation > threshold).astype(int)
              r95p = above_threshold.groupby('time.year').sum('time')

              # Consecutive dry days (days < 1.0 mm) would need run-length
              # encoding; omitted in this simplified version
              return {'rx1day': rx1day, 'r95p': r95p}

          @staticmethod
          def growing_season_length(
              temperature: xr.DataArray,
              threshold: float = 5.0
          ) -> xr.DataArray:
              """Growing Season Length: days between first and last T > threshold."""
              above = temperature > threshold

              def gsl_year(year_data):
                  above_arr = year_data.values
                  if not above_arr.any():
                      return 0
                  first = np.argmax(above_arr)
                  last = len(above_arr) - np.argmax(above_arr[::-1]) - 1
                  return last - first

              # Group the boolean mask, not the raw temperatures
              gsl = above.groupby('time.year').map(gsl_year)
              return gsl

          @staticmethod
          def drought_index_spi(
              precipitation: xr.DataArray,
              scale: int = 3
          ) -> xr.DataArray:
              """
              Standardized Precipitation Index (SPI).

              scale: number of time steps (months) for accumulation.
              """
              from scipy.stats import gamma, norm

              # Rolling accumulation
              rolling_precip = precipitation.rolling(time=scale, center=False).sum()

              def fit_and_transform(data):
                  # Fit gamma distribution and transform to standard normal,
                  # preserving array length (NaNs stay NaN)
                  out = np.full_like(data, np.nan, dtype=float)
                  valid = ~np.isnan(data)
                  if valid.sum() < 30:
                      return out
                  shape, loc, scale_param = gamma.fit(data[valid], floc=0)
                  cdf = gamma.cdf(data[valid], shape, loc, scale_param)
                  out[valid] = norm.ppf(cdf)
                  return out

              # Apply per grid cell (simplified; real SPI treats the
              # zero-precipitation probability separately)
              spi = xr.apply_ufunc(
                  fit_and_transform,
                  rolling_precip,
                  input_core_dims=[['time']],
                  output_core_dims=[['time']],
                  vectorize=True
              )
              return spi
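
      # Usage sketch (hedged): a synthetic daily temperature series with a
      # seasonal cycle stands in for bias-corrected model output.
      import pandas as pd
      days = pd.date_range('2000-01-01', '2009-12-31', freq='D')
      doy = np.asarray(days.dayofyear)
      tas = xr.DataArray(
          12 + 10 * np.sin(2 * np.pi * (doy - 100) / 365.25)
          + np.random.randn(len(days)) * 3,
          dims='time', coords={'time': days},
      )
      hdd = ClimateIndicators.heating_degree_days(tas)
      cdd = ClimateIndicators.cooling_degree_days(tas)
      hot_days = ClimateIndicators.extreme_heat_days(tas, threshold=30.0)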
  ensemble_analysis:
    description: "Analyze multi-model ensemble uncertainty"
    example: |
      import numpy as np
      import xarray as xr
      from typing import Dict, Tuple
      class EnsembleAnalysis:
          """Analyze multi-model ensemble for uncertainty quantification."""

          @staticmethod
          def ensemble_statistics(
              ensemble: xr.Dataset,
              variable: str,
              model_dim: str = 'model'
          ) -> Dict[str, xr.DataArray]:
              """Compute ensemble statistics."""
              data = ensemble[variable]
              return {
                  'mean': data.mean(dim=model_dim),
                  'median': data.median(dim=model_dim),
                  'std': data.std(dim=model_dim),
                  'min': data.min(dim=model_dim),
                  'max': data.max(dim=model_dim),
                  'p10': data.quantile(0.1, dim=model_dim),
                  'p90': data.quantile(0.9, dim=model_dim)
              }

          @staticmethod
          def model_agreement(
              ensemble: xr.Dataset,
              variable: str,
              threshold: float,
              model_dim: str = 'model'
          ) -> xr.DataArray:
              """Compute fraction of models agreeing on sign of change."""
              data = ensemble[variable]
              n_models = len(data[model_dim])

              positive = (data > threshold).sum(dim=model_dim) / n_models
              negative = (data < -threshold).sum(dim=model_dim) / n_models

              # Return agreement fraction (max of positive/negative)
              agreement = xr.where(positive > negative, positive, negative)
              return agreement

          @staticmethod
          def robustness_assessment(
              ensemble: xr.Dataset,
              variable: str,
              model_dim: str = 'model'
          ) -> xr.DataArray:
              """
              Assess robustness using IPCC criteria.

              Robust: >66% of models agree on sign AND signal exceeds
              internal variability.
              """
              data = ensemble[variable]
              n_models = len(data[model_dim])

              # Model agreement on sign
              mean_change = data.mean(dim=model_dim)
              same_sign = xr.where(
                  mean_change > 0,
                  (data > 0).sum(dim=model_dim),
                  (data < 0).sum(dim=model_dim)
              )
              agreement = same_sign / n_models

              # Signal vs noise
              signal = np.abs(mean_change)
              noise = data.std(dim=model_dim)
              snr = signal / (noise + 1e-10)

              # Robust where agreement > 0.66 AND snr > 1
              robust = (agreement > 0.66) & (snr > 1)
              return robust.astype(int)

          @staticmethod
          def partition_uncertainty(
              historical: xr.Dataset,
              scenarios: Dict[str, xr.Dataset],
              variable: str
          ) -> Dict[str, xr.DataArray]:
              """
              Partition uncertainty into scenario, model, and internal
              variability, following Hawkins & Sutton (2009).
              """
              # Internal variability from historical simulations
              internal_var = historical[variable].var(dim='time').mean(dim='model')

              # Model uncertainty (across models, same scenario)
              model_vars = []
              for scenario, data in scenarios.items():
                  model_var = data[variable].mean(dim='time').var(dim='model')
                  model_vars.append(model_var)
              model_uncertainty = xr.concat(model_vars, dim='scenario').mean(dim='scenario')

              # Scenario uncertainty (across scenarios, same model)
              scenario_means = xr.concat([
                  data[variable].mean(dim='time').mean(dim='model')
                  for data in scenarios.values()
              ], dim='scenario')
              scenario_uncertainty = scenario_means.var(dim='scenario')

              total = internal_var + model_uncertainty + scenario_uncertainty
              return {
                  'internal': internal_var / total,
                  'model': model_uncertainty / total,
                  'scenario': scenario_uncertainty / total,
                  'total_variance': total
              }
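
      # Usage sketch (hedged): a toy three-model ensemble of warming
      # anomalies; any Dataset with a 'model' dimension works the same way.
      ens = xr.Dataset(
          {'tas_change': (('model', 'lat', 'lon'),
                          1.5 + np.random.randn(3, 10, 10) * 0.5)},
          coords={'model': ['A', 'B', 'C']},
      )
      stats = EnsembleAnalysis.ensemble_statistics(ens, 'tas_change')
      robust = EnsembleAnalysis.robustness_assessment(ens, 'tas_change')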
anti_patterns:
- pattern: "Single model for projections"
  problem: "Ignores structural uncertainty across models"
  solution: "Use multi-model ensemble, report uncertainty range"
- pattern: "Raw model output for impacts"
  problem: "Systematic biases propagate to impact estimates"
  solution: "Apply bias correction (quantile mapping, etc.)"
- pattern: "Ignoring calendar differences"
  problem: "360-day, no-leap, etc. cause date mismatches"
  solution: "Convert calendars before comparison (see sketch below)"
- pattern: "Interpolating precipitation directly"
  problem: "Creates unrealistic drizzle, loses intensity"
  solution: "Use conservative remapping for precipitation"
- pattern: "Treating SSP as probability"
  problem: "SSPs are scenarios, not forecasts"
  solution: "Present as 'what-if' scenarios, not predictions"
handoffs:
- to: carbon-accounting
  when: "Need emissions data driving scenarios"
  context: "SSP scenario emissions pathways"
- to: renewable-energy
  when: "Climate impacts on renewable resources"
  context: "Solar irradiance, wind speed projections"
- to: energy-systems
  when: "Climate impacts on energy demand/supply"
  context: "Heating/cooling degree days, extreme events"
- to: monte-carlo
  when: "Uncertainty propagation through impact models"
  context: "Ensemble uncertainty quantification"
ecosystem:
  data_sources:
    - "ESGF - Earth System Grid Federation (CMIP6)"
    - "Copernicus CDS - Climate Data Store"
    - "NCEP/NCAR Reanalysis"
    - "ERA5 - ECMWF Reanalysis"
    - "WorldClim - High-resolution climatology"
  python_tools:
    - "xarray - NetCDF, climate data handling"
    - "xesmf - Regridding"
    - "climpred - Prediction skill"
    - "xclim - Climate indicators"
    - "cf-xarray - CF conventions"
  packages:
    - "intake-esm - CMIP6 catalog search"
    - "cartopy - Map projections"
    - "regionmask - Region masking"
    - "climada - Climate risk"
references:
  frameworks:
    - "IPCC AR6 - Assessment methodology"
    - "CORDEX - Regional downscaling"
    - "ISIMIP - Impact model intercomparison"
  papers:
    - "Hawkins & Sutton (2009) - Uncertainty partitioning"
    - "Maraun (2016) - Bias correction review"
    - "Cannon et al. (2015) - QDM method"