CSV处理器Skill csv-processor

CSV处理器是一种高级数据处理工具,专门用于解析、转换和分析CSV文件。它支持自动检测分隔符、处理多种编码、数据清洗、统计分析等功能,适用于数据预处理、ETL流程、数据质量检查和商业智能应用。关键词:CSV处理,数据解析,数据变换,数据分析,数据清洗,ETL,数据工程。

数据工程 0 次安装 0 次浏览 更新于 3/11/2026

name: csv-processor description: 使用高级数据处理能力解析、转换和分析CSV文件。

CSV 处理器技能

使用高级数据处理能力解析、转换和分析CSV文件。

指令

您是一个CSV处理专家。当被调用时:

  1. 解析CSV文件

    • 自动检测分隔符(逗号、制表符、分号、竖线)
    • 处理不同编码(UTF-8、Latin-1、Windows-1252)
    • 处理引用字段和转义字符
    • 正确处理多行字段
    • 检测并使用标题行
  2. 转换数据

    • 基于条件过滤行
    • 选择特定列
    • 排序和分组数据
    • 合并多个CSV文件
    • 将大文件分割成较小块
    • 透视和逆透视数据
  3. 清洗数据

    • 移除重复项
    • 处理缺失值
    • 修剪空白字符
    • 规范化数据格式
    • 修复编码问题
    • 验证数据类型
  4. 分析数据

    • 生成统计信息(总和、平均值、最小值、最大值、计数)
    • 识别数据质量问题
    • 检测异常值
    • 分析列数据类型
    • 计算分布

使用示例

@csv-processor data.csv
@csv-processor --filter "age > 30"
@csv-processor --select "name,email,age"
@csv-processor --merge file1.csv file2.csv
@csv-processor --stats
@csv-processor --clean --remove-duplicates

基本CSV操作

读取CSV文件

Python (pandas)

import pandas as pd

# 基本读取
df = pd.read_csv('data.csv')

# 自定义分隔符
df = pd.read_csv('data.tsv', delimiter='\t')

# 指定编码
df = pd.read_csv('data.csv', encoding='latin-1')

# 跳过行
df = pd.read_csv('data.csv', skiprows=2)

# 选择特定列
df = pd.read_csv('data.csv', usecols=['name', 'email', 'age'])

# 解析日期
df = pd.read_csv('data.csv', parse_dates=['created_at', 'updated_at'])

# 处理缺失值
df = pd.read_csv('data.csv', na_values=['NA', 'N/A', 'null', ''])

# 指定数据类型
df = pd.read_csv('data.csv', dtype={
    'user_id': int,
    'age': int,
    'score': float,
    'active': bool
})

JavaScript (csv-parser)

const fs = require('fs');
const csv = require('csv-parser');

// 基本解析
const results = [];
fs.createReadStream('data.csv')
  .pipe(csv())
  .on('data', (row) => {
    results.push(row);
  })
  .on('end', () => {
    console.log(`Processed ${results.length} rows`);
  });

// 使用自定义选项
const Papa = require('papaparse');

Papa.parse(fs.createReadStream('data.csv'), {
  header: true,
  delimiter: ',',
  skipEmptyLines: true,
  transformHeader: (header) => header.trim().toLowerCase(),
  complete: (results) => {
    console.log('Parsed:', results.data);
  }
});

Python (csv模块)

import csv

# 基本读取
with open('data.csv', 'r', encoding='utf-8') as file:
    reader = csv.DictReader(file)
    for row in reader:
        print(row['name'], row['age'])

# 自定义分隔符
with open('data.csv', 'r') as file:
    reader = csv.reader(file, delimiter='\t')
    for row in reader:
        print(row)

# 处理不同方言
with open('data.csv', 'r') as file:
    dialect = csv.Sniffer().sniff(file.read(1024))
    file.seek(0)
    reader = csv.reader(file, dialect)
    for row in reader:
        print(row)

写入CSV文件

Python (pandas)

# 基本写入
df.to_csv('output.csv', index=False)

# 自定义分隔符
df.to_csv('output.tsv', sep='\t', index=False)

# 指定编码
df.to_csv('output.csv', encoding='utf-8-sig', index=False)

# 仅写入特定列
df[['name', 'email']].to_csv('output.csv', index=False)

# 追加到现有文件
df.to_csv('output.csv', mode='a', header=False, index=False)

# 引用所有字段
df.to_csv('output.csv', quoting=csv.QUOTE_ALL, index=False)

JavaScript (csv-writer)

const createCsvWriter = require('csv-writer').createObjectCsvWriter;

const csvWriter = createCsvWriter({
  path: 'output.csv',
  header: [
    {id: 'name', title: 'Name'},
    {id: 'email', title: 'Email'},
    {id: 'age', title: 'Age'}
  ]
});

const records = [
  {name: 'John Doe', email: 'john@example.com', age: 30},
  {name: 'Jane Smith', email: 'jane@example.com', age: 25}
];

csvWriter.writeRecords(records)
  .then(() => console.log('CSV file written successfully'));

数据转换模式

过滤行

Python (pandas)

# 单条件
filtered = df[df['age'] > 30]

# 多条件(AND)
filtered = df[(df['age'] > 30) & (df['country'] == 'USA')]

# 多条件(OR)
filtered = df[(df['age'] < 18) | (df['age'] > 65)]

# 字符串操作
filtered = df[df['email'].str.contains('@gmail.com')]
filtered = df[df['name'].str.startswith('John')]

# 在列表中
filtered = df[df['country'].isin(['USA', 'Canada', 'Mexico'])]

# 非空值
filtered = df[df['email'].notna()]

# 复杂条件
filtered = df.query('age > 30 and country == "USA" and active == True')

JavaScript

// 使用箭头函数过滤
const filtered = data.filter(row => row.age > 30);

// 多条件
const filtered = data.filter(row =>
  row.age > 30 && row.country === 'USA'
);

// 字符串操作
const filtered = data.filter(row =>
  row.email.includes('@gmail.com')
);

// 复杂过滤
const filtered = data.filter(row => {
  const age = parseInt(row.age);
  return age >= 18 && age <= 65 && row.active === 'true';
});

选择列

Python (pandas)

# 选择单列
names = df['name']

# 选择多列
subset = df[['name', 'email', 'age']]

# 按列类型选择
numeric_cols = df.select_dtypes(include=['int64', 'float64'])
string_cols = df.select_dtypes(include=['object'])

# 选择匹配模式的列
email_cols = df.filter(regex='.*email.*')

# 删除列
df_without = df.drop(['temporary', 'unused'], axis=1)

# 重命名列
df_renamed = df.rename(columns={
    'old_name': 'new_name',
    'email_address': 'email'
})

JavaScript

// 映射以选择列
const subset = data.map(row => ({
  name: row.name,
  email: row.email,
  age: row.age
}));

// 解构
const subset = data.map(({name, email, age}) => ({name, email, age}));

// 动态列选择
const columns = ['name', 'email', 'age'];
const subset = data.map(row =>
  Object.fromEntries(
    columns.map(col => [col, row[col]])
  )
);

排序数据

Python (pandas)

# 按单列排序
sorted_df = df.sort_values('age')

# 降序排序
sorted_df = df.sort_values('age', ascending=False)

# 按多列排序
sorted_df = df.sort_values(['country', 'age'], ascending=[True, False])

# 按索引排序
sorted_df = df.sort_index()

JavaScript

// 按单字段排序
const sorted = data.sort((a, b) => a.age - b.age);

// 降序排序
const sorted = data.sort((a, b) => b.age - a.age);

// 按字符串排序
const sorted = data.sort((a, b) => a.name.localeCompare(b.name));

// 按多字段排序
const sorted = data.sort((a, b) => {
  if (a.country !== b.country) {
    return a.country.localeCompare(b.country);
  }
  return b.age - a.age;
});

分组和聚合

Python (pandas)

# 按单列分组
grouped = df.groupby('country')

# 按组计数
counts = df.groupby('country').size()

# 多聚合
stats = df.groupby('country').agg({
    'age': ['mean', 'min', 'max'],
    'salary': ['sum', 'mean'],
    'user_id': 'count'
})

# 按多列分组
grouped = df.groupby(['country', 'city']).agg({
    'revenue': 'sum',
    'user_id': 'count'
})

# 自定义聚合
df.groupby('country').apply(lambda x: x['salary'].max() - x['salary'].min())

# 透视表
pivot = df.pivot_table(
    values='revenue',
    index='country',
    columns='year',
    aggfunc='sum',
    fill_value=0
)

JavaScript (lodash)

const _ = require('lodash');

// 按字段分组
const grouped = _.groupBy(data, 'country');

// 按组计数
const counts = _.mapValues(
  _.groupBy(data, 'country'),
  group => group.length
);

// 按组求和
const sums = _.mapValues(
  _.groupBy(data, 'country'),
  group => _.sumBy(group, row => parseFloat(row.salary))
);

// 多聚合
const stats = Object.entries(_.groupBy(data, 'country')).map(([country, rows]) => ({
  country,
  count: rows.length,
  avgAge: _.meanBy(rows, row => parseInt(row.age)),
  totalSalary: _.sumBy(rows, row => parseFloat(row.salary))
}));

合并CSV文件

Python (pandas)

# 垂直连接(堆叠行)
df1 = pd.read_csv('file1.csv')
df2 = pd.read_csv('file2.csv')
combined = pd.concat([df1, df2], ignore_index=True)

# 连接(类似SQL合并)
users = pd.read_csv('users.csv')
orders = pd.read_csv('orders.csv')

# 内连接
merged = pd.merge(users, orders, on='user_id', how='inner')

# 左连接
merged = pd.merge(users, orders, on='user_id', how='left')

# 多键
merged = pd.merge(
    users, orders,
    left_on='id',
    right_on='user_id',
    how='left'
)

# 合并不同列名
merged = pd.merge(
    users, orders,
    left_on='user_id',
    right_on='customer_id',
    how='inner'
)

JavaScript

// 连接数组
const file1 = parseCSV('file1.csv');
const file2 = parseCSV('file2.csv');
const combined = [...file1, ...file2];

// 连接数组(类似SQL)
function leftJoin(left, right, leftKey, rightKey) {
  return left.map(leftRow => {
    const rightRow = right.find(r => r[rightKey] === leftRow[leftKey]);
    return {...leftRow, ...rightRow};
  });
}

const merged = leftJoin(users, orders, 'id', 'user_id');

数据清洗操作

移除重复项

Python (pandas)

# 移除重复行
df_unique = df.drop_duplicates()

# 基于特定列
df_unique = df.drop_duplicates(subset=['email'])

# 保留第一次或最后一次出现
df_unique = df.drop_duplicates(subset=['email'], keep='first')
df_unique = df.drop_duplicates(subset=['email'], keep='last')

# 识别重复项
duplicates = df[df.duplicated()]
duplicate_emails = df[df.duplicated(subset=['email'])]

处理缺失值

Python (pandas)

# 检查缺失值
missing_count = df.isnull().sum()
missing_percent = (df.isnull().sum() / len(df)) * 100

# 删除任何缺失值的行
df_clean = df.dropna()

# 删除特定列缺失的行
df_clean = df.dropna(subset=['email'])

# 删除缺失值过多的列
df_clean = df.dropna(axis=1, thresh=len(df)*0.7)

# 填充缺失值
df_filled = df.fillna(0)
df_filled = df.fillna({'age': 0, 'country': 'Unknown'})

# 前向填充
df_filled = df.fillna(method='ffill')

# 用均值/中位数填充
df['age'].fillna(df['age'].mean(), inplace=True)
df['age'].fillna(df['age'].median(), inplace=True)

# 插值
df['value'].interpolate(method='linear', inplace=True)

JavaScript

// 过滤掉缺失值的行
const cleaned = data.filter(row =>
  row.email && row.name && row.age
);

// 填充缺失值
const filled = data.map(row => ({
  ...row,
  age: row.age || 0,
  country: row.country || 'Unknown'
}));

数据验证

Python (pandas)

# 验证电子邮件格式
import re
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
df['email_valid'] = df['email'].str.match(email_pattern)

# 验证年龄范围
df['age_valid'] = df['age'].between(0, 120)

# 验证必填字段
df['valid'] = df[['name', 'email', 'age']].notna().all(axis=1)

# 检查数据类型
def validate_types(df):
    errors = []

    # 检查数值列
    for col in ['age', 'salary', 'score']:
        if col in df.columns:
            if not pd.api.types.is_numeric_dtype(df[col]):
                errors.append(f"{col} 应为数值型")

    # 检查日期列
    for col in ['created_at', 'updated_at']:
        if col in df.columns:
            try:
                pd.to_datetime(df[col])
            except:
                errors.append(f"{col} 有无效日期")

    return errors

# 移除无效行
df_valid = df[df['email_valid'] & df['age_valid']]

数据规范化

Python (pandas)

# 修剪空白字符
df['name'] = df['name'].str.strip()
df['email'] = df['email'].str.strip()

# 转换为小写
df['email'] = df['email'].str.lower()

# 标准化电话号码
df['phone'] = df['phone'].str.replace(r'[^0-9]', '', regex=True)

# 标准化日期
df['created_at'] = pd.to_datetime(df['created_at'])

# 标准化国家名称
country_mapping = {
    'USA': '美国',
    'US': '美国',
    'United States of America': '美国',
    'UK': '英国'
}
df['country'] = df['country'].replace(country_mapping)

# 转换数据类型
df['age'] = pd.to_numeric(df['age'], errors='coerce')
df['active'] = df['active'].astype(bool)
df['score'] = df['score'].astype(float)

数据分析操作

统计摘要

Python (pandas)

# 基本统计
print(df.describe())

# 所有列的统计(包括非数值)
print(df.describe(include='all'))

# 特定统计
print(f"平均年龄: {df['age'].mean()}")
print(f"中位年龄: {df['age'].median()}")
print(f"标准差: {df['age'].std()}")
print(f"最小值: {df['age'].min()}")
print(f"最大值: {df['age'].max()}")

# 计数值
print(df['country'].value_counts())

# 百分比分布
print(df['country'].value_counts(normalize=True) * 100)

# 交叉表
cross_tab = pd.crosstab(df['country'], df['active'])

# 相关性矩阵
correlation = df[['age', 'salary', 'score']].corr()

数据剖析

Python (pandas)

def profile_dataframe(df):
    """生成全面的数据剖析"""

    profile = {
        'shape': df.shape,
        'columns': list(df.columns),
        'dtypes': df.dtypes.to_dict(),
        'memory_usage': df.memory_usage(deep=True).sum() / 1024**2,  # MB
        'missing_values': df.isnull().sum().to_dict(),
        'missing_percent': (df.isnull().sum() / len(df) * 100).to_dict(),
        'duplicates': df.duplicated().sum(),
        'numeric_summary': df.describe().to_dict(),
        'unique_counts': df.nunique().to_dict()
    }

    # 列特定分析
    for col in df.columns:
        profile[f'{col}_sample'] = df[col].head(5).tolist()

        if df[col].dtype == 'object':
            profile[f'{col}_top_values'] = df[col].value_counts().head(10).to_dict()

        if pd.api.types.is_numeric_dtype(df[col]):
            profile[f'{col}_outliers'] = detect_outliers(df[col])

    return profile

def detect_outliers(series):
    """使用IQR方法检测异常值"""
    Q1 = series.quantile(0.25)
    Q3 = series.quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR

    outliers = series[(series < lower_bound) | (series > upper_bound)]
    return {
        'count': len(outliers),
        'percent': (len(outliers) / len(series)) * 100,
        'values': outliers.tolist()
    }

生成报告

def generate_csv_report(df, filename='report.md'):
    """生成全面分析报告"""

    report = f"""# CSV分析报告
生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

## 数据集概述
- **行数**: {len(df):,}
- **列数**: {len(df.columns)}
- **内存使用**: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB
- **重复项**: {df.duplicated().sum():,}

## 列摘要

| 列名 | 类型 | 非空数 | 唯一值 | 缺失率 |
|--------|------|----------|--------|-----------|
"""

    for col in df.columns:
        dtype = str(df[col].dtype)
        non_null = df[col].count()
        unique = df[col].nunique()
        missing_pct = (df[col].isnull().sum() / len(df)) * 100

        report += f"| {col} | {dtype} | {non_null:,} | {unique:,} | {missing_pct:.1f}% |
"

    report += "
## 数值列统计

"
    report += df.describe().to_markdown()

    report += "

## 数据质量问题

"

    # 缺失值
    missing = df.isnull().sum()
    if missing.sum() > 0:
        report += "### 缺失值
"
        for col, count in missing[missing > 0].items():
            pct = (count / len(df)) * 100
            report += f"- **{col}**: {count:,} ({pct:.1f}%)
"

    # 重复项
    if df.duplicated().sum() > 0:
        report += f"
### 重复项
"
        report += f"- 发现 {df.duplicated().sum():,} 重复行
"

    # 写入报告
    with open(filename, 'w') as f:
        f.write(report)

    print(f"报告生成: {filename}")

高级操作

分割大CSV文件

def split_csv(input_file, rows_per_file=10000):
    """将大CSV分割成较小块"""

    chunk_num = 0

    for chunk in pd.read_csv(input_file, chunksize=rows_per_file):
        output_file = f"{input_file.rsplit('.', 1)[0]}_part{chunk_num}.csv"
        chunk.to_csv(output_file, index=False)
        print(f"创建 {output_file} 包含 {len(chunk)} 行")
        chunk_num += 1

透视和逆透视

# 透视(宽格式)
pivot = df.pivot_table(
    values='revenue',
    index='product',
    columns='month',
    aggfunc='sum'
)

# 逆透视(长格式)
melted = df.melt(
    id_vars=['product', 'category'],
    value_vars=['jan', 'feb', 'mar'],
    var_name='month',
    value_name='revenue'
)

数据类型转换

# 转换列
df['age'] = pd.to_numeric(df['age'], errors='coerce')
df['created_at'] = pd.to_datetime(df['created_at'])
df['active'] = df['active'].astype(bool)

# 解析自定义日期格式
df['date'] = pd.to_datetime(df['date'], format='%d/%m/%Y')

# 处理混合类型
df['mixed'] = df['mixed'].astype(str)

性能优化

高效读取大文件

# 分块读取
chunk_size = 10000
chunks = []

for chunk in pd.read_csv('large_file.csv', chunksize=chunk_size):
    # 处理块
    processed = chunk[chunk['active'] == True]
    chunks.append(processed)

result = pd.concat(chunks, ignore_index=True)

# 仅读取所需列
df = pd.read_csv('large_file.csv', usecols=['name', 'email', 'age'])

# 使用适当数据类型
df = pd.read_csv('large_file.csv', dtype={
    'id': 'int32',  # 替代int64
    'age': 'int8',  # 小整数
    'category': 'category'  # 分类数据
})

写入大文件

# 分块写入
chunk_size = 10000

for i in range(0, len(df), chunk_size):
    chunk = df.iloc[i:i+chunk_size]
    mode = 'w' if i == 0 else 'a'
    header = i == 0
    chunk.to_csv('output.csv', mode=mode, header=header, index=False)

命令行工具

使用csvkit

# 查看CSV结构
csvcut -n data.csv

# 过滤列
csvcut -c name,email,age data.csv > subset.csv

# 过滤行
csvgrep -c age -r "^[3-9][0-9]$" data.csv > age_30plus.csv

# 转换为JSON
csvjson data.csv > data.json

# 统计
csvstat data.csv

# CSV上的SQL查询
csvsql --query "SELECT country, COUNT(*) FROM data GROUP BY country" data.csv

使用awk

# 打印特定列
awk -F',' '{print $1, $3}' data.csv

# 过滤行
awk -F',' '$3 > 30' data.csv

# 求列和
awk -F',' '{sum+=$3} END {print sum}' data.csv

最佳实践

  1. 始终在处理前验证数据
  2. 使用适当数据类型以节省内存
  3. 尽早处理编码问题
  4. 首先剖析数据以了解结构
  5. 对大文件使用分块
  6. 转换前备份原始文件
  7. 记录转换以实现可重复性
  8. 处理后验证输出
  9. 对CSV处理脚本使用版本控制
  10. 处理完整数据集前用样本数据测试

常见问题和解决方案

问题:编码错误

# 尝试不同编码
for encoding in ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1']:
    try:
        df = pd.read_csv('data.csv', encoding=encoding)
        print(f"成功使用编码: {encoding}")
        break
    except UnicodeDecodeError:
        continue

问题:分隔符检测

# 自动检测分隔符
with open('data.csv', 'r') as file:
    sample = file.read(1024)
    sniffer = csv.Sniffer()
    delimiter = sniffer.sniff(sample).delimiter

df = pd.read_csv('data.csv', delimiter=delimiter)

问题:内存错误

# 使用分块
chunks = []
for chunk in pd.read_csv('large.csv', chunksize=10000):
    # 处理和过滤
    processed = chunk[chunk['keep'] == True]
    chunks.append(processed)

df = pd.concat(chunks, ignore_index=True)

备注

  • 处理前始终检查CSV结构
  • 首先在小样本上测试转换
  • 对极大数据集考虑使用数据库
  • 记录列含义和数据类型
  • 使用一致的日期和数字格式
  • 定期验证数据质量
  • 保持处理脚本版本受控