Claude-skill-registry loading-insurance-data
加载并预处理保险保单周度数据,支持智能周期检测、多周数据加载、数据验证和清洗。在开始任何保险数据分析任务时使用。
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/loading-insurance-data" ~/.claude/skills/majiayu000-claude-skill-registry-loading-insurance-data && rm -rf "$T"
manifest:
skills/data/loading-insurance-data/SKILL.mdsource content
保险数据加载器
核心功能
处理保险业务周度CSV数据的完整加载流程:
- ✅ 智能检测可用周次
- ✅ 灵活周期范围设置
- ✅ 多年度数据支持(2024/2025)
- ✅ 数据质量验证
- ✅ 标准化预处理
立即使用
from pathlib import Path import pandas as pd # 1. 检测可用周次 def detect_available_weeks(data_folder="2025年保单"): import re available = set() for file in Path(data_folder).glob("*保单第*周*.csv"): match = re.search(r'第(\d+)周', file.name) if match: available.add(int(match.group(1))) return sorted(available) # 2. 加载单周数据 def load_week_data(week, data_folder="2025年保单"): pattern = f"*保单第{week}周*.csv" files = list(Path(data_folder).glob(pattern)) if not files: return None df = pd.read_csv(files[0], encoding='utf-8-sig') df['week_number'] = week return df # 3. 批量加载 weeks = detect_available_weeks() print(f"可用周次: {weeks}") data = {} for week in weeks: df = load_week_data(week) if df is not None: data[week] = df print(f"✅ 第{week}周: {len(df):,}行")
数据文件结构
文件命名规范
{YEAR}保单第{WEEK}周变动成本明细表.csv 示例: - 2025保单第28周变动成本明细表.csv - 2025保单第43周变动成本明细表.csv
关键字段
| 字段名 | 类型 | 说明 |
|---|---|---|
| int | 保单年度 |
| str | 三级机构 |
| str | 业务类型 |
| bool | 是否新能源车 |
| float | 签单保费 |
| float | 满期保费 |
| float | 已报告赔款 |
| float | 费用金额 |
| int | 赔案件数 |
| int | 保单件数 |
数据处理流程
步骤1: 周期检测
# 自动扫描目录 available_weeks = detect_available_weeks() # 确定分析周期 start_week = 28 end_week = 43 analysis_weeks = list(range(start_week, end_week + 1)) # 检查缺失 missing = [w for w in analysis_weeks if w not in available_weeks] if missing: print(f"⚠️ 缺失周次: {missing}")
步骤2: 数据加载
loaded_data = {} for week in analysis_weeks: if week in available_weeks: df = load_week_data(week) if df is not None: loaded_data[week] = df
步骤3: 数据清洗
def preprocess_data(df): """标准化数据处理""" # 过滤本部 df = df[df['third_level_organization'] != '本部'].copy() # 提取保单年度 df['policy_year'] = df['policy_start_year'].astype(str).str.extract(r'(202[45])')[0] # 数值型字段转换 numeric_cols = [ 'signed_premium_yuan', 'matured_premium_yuan', 'reported_claim_payment_yuan', 'expense_amount_yuan', 'claim_case_count', 'policy_count' ] for col in numeric_cols: if col in df.columns: df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0) return df # 应用清洗 for week in loaded_data: loaded_data[week] = preprocess_data(loaded_data[week])
步骤4: 数据验证
def validate_data(df, week): """数据质量检查""" issues = [] # 检查空数据 if len(df) == 0: issues.append("数据为空") # 检查关键字段 required = ['signed_premium_yuan', 'third_level_organization'] missing = [col for col in required if col not in df.columns] if missing: issues.append(f"缺失字段: {missing}") # 检查负值 if 'signed_premium_yuan' in df.columns: if (df['signed_premium_yuan'] < 0).any(): issues.append("存在负保费") if issues: print(f"⚠️ 第{week}周问题: {', '.join(issues)}") return False return True
数据输出结构
按年度分组
def group_by_year(loaded_data): """按保单年度分组""" grouped = {'2024': {}, '2025': {}} for week, df in loaded_data.items(): for year in ['2024', '2025']: year_df = df[df['policy_year'] == year] if len(year_df) > 0: grouped[year][week] = year_df return grouped # 使用示例 data_by_year = group_by_year(loaded_data) print(f"2024保单周次: {list(data_by_year['2024'].keys())}") print(f"2025保单周次: {list(data_by_year['2025'].keys())}")
常见问题
Q1: 文件编码错误
问题:
UnicodeDecodeError
解决: 使用
encoding='utf-8-sig'
df = pd.read_csv(file, encoding='utf-8-sig')
Q2: 周次文件缺失
问题: 第32周、第38周等文件不存在
解决: 自动跳过并记录
if week not in available_weeks: print(f"⚠️ 第{week}周: 文件不存在,跳过") continue
Q3: 数据类型错误
问题: 保费字段被识别为字符串
解决: 强制数值转换
df['signed_premium_yuan'] = pd.to_numeric( df['signed_premium_yuan'], errors='coerce' ).fillna(0)
Q4: 内存占用过大
问题: 加载多周数据内存不足
解决: 按需加载或只读取必要列
# 只读取需要的列 usecols = [ 'policy_year', 'third_level_organization', 'signed_premium_yuan', 'matured_premium_yuan', 'reported_claim_payment_yuan' ] df = pd.read_csv(file, usecols=usecols, encoding='utf-8-sig')
性能优化
批量加载优化
from concurrent.futures import ThreadPoolExecutor def load_weeks_parallel(weeks, data_folder): """并行加载多周数据""" with ThreadPoolExecutor(max_workers=4) as executor: futures = { executor.submit(load_week_data, week, data_folder): week for week in weeks } results = {} for future in futures: week = futures[future] try: df = future.result() if df is not None: results[week] = df except Exception as e: print(f"❌ 第{week}周加载失败: {e}") return results
内存管理
import gc # 加载和处理后释放内存 for week in weeks: df = load_week_data(week) processed = preprocess_data(df) # ... 使用数据 ... del df, processed gc.collect()
参考资源
脚本工具:
- 快速数据加载工具scripts/quick_load.py
- 数据质量检查工具scripts/data_validator.py
参考文档:
- 完整字段说明reference/data_schema.md
- 数据质量标准reference/data_quality_rules.md
更新日志
- v1.0 (2025-11-04): 初始版本
- 基础加载功能
- 智能周期检测
- 数据验证和清洗
- 多年度支持