帕累托分析器Skill pareto-analyzer

帕累托分析器是一个专门用于识别关键少数原因和优先改进工作的技能,通过AI技术进行数据分析,包括基本帕累托图创建、多级帕累托分析、加权帕累托分析等,关键词包括帕累托分析、数据分析、关键少数、改进优先级。

数据分析 0 次安装 0 次浏览 更新于 2/25/2026

以下是对“pareto-analyzer”技能的中文翻译和描述:


name: pareto-analyzer description: 帕累托分析技能,用于识别关键的少数原因和优先改进工作。 allowed-tools: Bash(*) Read Write Edit Glob Grep WebFetch metadata: author: babysitter-sdk version: "1.0.0" category: continuous-improvement backlog-id: SK-IE-038

pareto-analyzer

你是 pareto-analyzer - 一个专门用于帕累托分析的技能,以识别关键的少数原因和优先改进工作。

概览

这项技能使AI能够进行帕累托分析,包括:

  • 基本帕累托图创建
  • 多级帕累托分析
  • 加权帕累托分析
  • 前后比较
  • 多维度帕累托
  • 统计验证
  • 关键少数识别
  • 改进优先级

能力

1. 基本帕累托分析

import pandas as pd
import numpy as np

def pareto_analysis(data: pd.DataFrame, category_col: str, value_col: str):
    """
    Perform a basic Pareto analysis.

    data: DataFrame containing the categories and values
    category_col: name of the category column
    value_col: name of the value column (counts, costs, etc.)

    Returns a dict with the full ranked breakdown, the "vital few"
    categories that account for ~80% of the total, the remaining
    "trivial many", and a summary ratio string.
    """
    # Aggregate by category
    summary = data.groupby(category_col)[value_col].sum().reset_index()
    summary.columns = ['category', 'value']

    # Sort descending by value
    summary = summary.sort_values('value', ascending=False).reset_index(drop=True)

    # Percentages and cumulative totals
    total = summary['value'].sum()
    summary['percentage'] = summary['value'] / total * 100
    summary['cumulative_value'] = summary['value'].cumsum()
    summary['cumulative_percentage'] = summary['cumulative_value'] / total * 100

    # Vital few: the smallest prefix whose cumulative share reaches 80%
    # (always at least one category). This replaces the original
    # three-branch selection, which re-ran the same <= 80 filter twice
    # and indexed into it again with .iloc[-1].
    reached_80 = summary['cumulative_percentage'] >= 80
    cutoff = int(reached_80.idxmax()) + 1 if reached_80.any() else 1
    vital_few = summary.head(cutoff)

    trivial_many = summary[~summary['category'].isin(vital_few['category'])]

    return {
        "analysis": summary.to_dict('records'),
        "total_value": total,
        "vital_few": {
            "categories": vital_few['category'].tolist(),
            "count": len(vital_few),
            "value": vital_few['value'].sum(),
            "percentage": round(vital_few['value'].sum() / total * 100, 1)
        },
        "trivial_many": {
            "categories": trivial_many['category'].tolist(),
            "count": len(trivial_many),
            "value": trivial_many['value'].sum(),
            "percentage": round(trivial_many['value'].sum() / total * 100, 1)
        },
        "pareto_ratio": f"{len(vital_few)}/{len(summary)} 类别导致 {round(vital_few['value'].sum() / total * 100)}% 的影响"
    }

2. 多级帕累托

def multi_level_pareto(data: pd.DataFrame, levels: list, value_col: str):
    """
    Hierarchical (drill-down) Pareto analysis.

    levels: column names analyzed from top to bottom,
            e.g. ['department', 'defect_type', 'root_cause']
    value_col: name of the value column to aggregate

    Returns a dict keyed 'level_1', 'level_2', ... where level 1 holds the
    top-level Pareto and each deeper level holds one sub-analysis per
    vital-few parent category.
    """
    output = {}

    # Level 1: top-level Pareto over the first dimension
    top = pareto_analysis(data, levels[0], value_col)
    output['level_1'] = {
        'dimension': levels[0],
        'analysis': top
    }

    # Deeper levels: run a Pareto inside each vital-few parent category.
    # NOTE(review): each subset matches only the immediate parent column,
    # not the full ancestor chain — identically-named categories under
    # different grandparents get pooled; confirm this is intended.
    if len(levels) > 1:
        parents = top['vital_few']['categories']

        for depth, dim in enumerate(levels[1:], start=1):
            drill_downs = []

            for parent in parents:
                subset = data[data[levels[depth - 1]] == parent]

                if len(subset) > 0:
                    drill_downs.append({
                        'parent_category': parent,
                        'analysis': pareto_analysis(subset, dim, value_col)
                    })

            output[f'level_{depth + 1}'] = {
                'dimension': dim,
                'sub_analyses': drill_downs
            }

            # The vital few found at this level seed the next drill-down
            parents = [c
                       for entry in drill_downs
                       for c in entry['analysis']['vital_few']['categories']]

    return output

3. 加权帕累托分析

def weighted_pareto(data: pd.DataFrame, category_col: str,
                   frequency_col: str, severity_col: str = None,
                   cost_col: str = None):
    """
    Weighted Pareto analysis that can factor in more than raw frequency.

    The weighting score is frequency × mean severity when severity_col is
    given, total cost when cost_col is given (severity takes precedence if
    both are passed), and plain frequency otherwise. Also reports how each
    category's rank shifts between the frequency and weighted orderings.
    """
    summary = data.groupby(category_col).agg({
        frequency_col: 'sum'
    }).reset_index()
    summary.columns = ['category', 'frequency']

    # Attach the weighting score according to which columns were supplied
    if severity_col:
        sev = data.groupby(category_col)[severity_col].mean().reset_index()
        sev.columns = ['category', 'avg_severity']
        summary = summary.merge(sev, on='category')
        summary['weighted_score'] = summary['frequency'] * summary['avg_severity']
    elif cost_col:
        cost = data.groupby(category_col)[cost_col].sum().reset_index()
        cost.columns = ['category', 'total_cost']
        summary = summary.merge(cost, on='category')
        summary['weighted_score'] = summary['total_cost']
    else:
        summary['weighted_score'] = summary['frequency']

    # Rank by the weighted score
    summary = summary.sort_values('weighted_score', ascending=False).reset_index(drop=True)

    # Shares and cumulative shares of the weighted score
    total = summary['weighted_score'].sum()
    summary['percentage'] = summary['weighted_score'] / total * 100
    summary['cumulative_pct'] = summary['percentage'].cumsum()

    # Compare the frequency-only ranking against the weighted ranking
    by_frequency = summary.sort_values('frequency', ascending=False)['category'].tolist()
    rank_comparison = [
        {
            'category': name,
            'weighted_rank': pos + 1,
            'frequency_rank': by_frequency.index(name) + 1,
            # positive = category moved up under weighting
            'rank_change': by_frequency.index(name) - pos
        }
        for pos, name in enumerate(summary['category'].tolist())
    ]

    method = "severity" if severity_col else "cost" if cost_col else "frequency"
    return {
        "weighted_analysis": summary.to_dict('records'),
        "rank_comparison": rank_comparison,
        "weighting_method": method,
        "insight": identify_rank_changes(rank_comparison)
    }

def identify_rank_changes(comparisons):
    """Summarize whether any category's rank moved noticeably (>= 2 places) under weighting."""
    significant = sum(1 for entry in comparisons if abs(entry['rank_change']) >= 2)
    if significant:
        return f"{significant} 类别在加权时排名变化显著"
    return "频率和加权分析之间的排名一致"

4. 前后帕累托比较

def compare_pareto_periods(before_data: pd.DataFrame, after_data: pd.DataFrame,
                          category_col: str, value_col: str):
    """
    Compare Pareto analyses across two periods (e.g. before/after a change).

    Returns both period analyses, a per-category comparison with absolute
    and percentage change, the top 5 improvements and deteriorations, and
    the shift in the vital-few category set.
    """
    before = pareto_analysis(before_data, category_col, value_col)
    after = pareto_analysis(after_data, category_col, value_col)

    # Build the per-category comparison. The outer join keeps categories
    # that appear in only one period; their missing values become 0.
    before_df = pd.DataFrame(before['analysis'])
    after_df = pd.DataFrame(after['analysis'])

    comparison = before_df.merge(
        after_df,
        on='category',
        how='outer',
        suffixes=('_before', '_after')
    )

    comparison = comparison.fillna(0)
    comparison['change'] = comparison['value_after'] - comparison['value_before']
    # BUG FIX: the fallback branch was `100 if comparison['value_after'] > 0
    # else 0`, a Python conditional on a whole Series, which raises
    # "truth value of a Series is ambiguous" for any comparison with more
    # than one row. Use a nested element-wise np.where instead: categories
    # new in the after period show +100%, categories absent in both show 0%.
    comparison['change_pct'] = np.where(
        comparison['value_before'] > 0,
        (comparison['change'] / comparison['value_before']) * 100,
        np.where(comparison['value_after'] > 0, 100, 0)
    )

    # Headline totals
    total_before = before['total_value']
    total_after = after['total_value']

    # Improvements are decreases (negative change); deteriorations, increases
    improved = comparison[comparison['change'] < 0].sort_values('change')
    deteriorated = comparison[comparison['change'] > 0].sort_values('change', ascending=False)

    return {
        "before_period": before,
        "after_period": after,
        "comparison": comparison.to_dict('records'),
        "summary": {
            "total_before": total_before,
            "total_after": total_after,
            "total_change": total_after - total_before,
            # NOTE(review): divides by total_before — an all-zero "before"
            # period will fail here; confirm callers never pass one
            "total_change_pct": round((total_after - total_before) / total_before * 100, 1)
        },
        "improvements": improved[['category', 'change', 'change_pct']].head(5).to_dict('records'),
        "deteriorations": deteriorated[['category', 'change', 'change_pct']].head(5).to_dict('records'),
        "vital_few_change": compare_vital_few(before, after)
    }

def compare_vital_few(before, after):
    """Diff the vital-few category sets between two period analyses."""
    previous = set(before['vital_few']['categories'])
    current = set(after['vital_few']['categories'])

    return {
        "added": list(current - previous),
        "removed": list(previous - current),
        "unchanged": list(previous & current)
    }

5. 帕累托图数据生成

def generate_pareto_chart_data(pareto_result: dict, chart_options: dict = None):
    """
    Build the data structure a Pareto chart renderer expects.

    pareto_result: output of pareto_analysis (needs 'analysis' records and
    'vital_few'); chart_options: optional overrides for title, axis labels
    and colors.
    """
    opts = chart_options or {}
    records = pareto_result['analysis']
    vital = pareto_result['vital_few']

    return {
        "chart_type": "pareto",
        "title": opts.get('title', '帕累托分析'),
        "x_axis": {
            "label": opts.get('x_label', '类别'),
            "values": [row['category'] for row in records]
        },
        "bars": {
            "label": opts.get('bar_label', '值'),
            "values": [row['value'] for row in records],
            "color": opts.get('bar_color', '#4472C4')
        },
        "line": {
            "label": "累积百分比",
            "values": [row['cumulative_percentage'] for row in records],
            "color": opts.get('line_color', '#ED7D31')
        },
        "reference_lines": [
            {"y": 80, "label": "80% 线", "style": "dashed"}
        ],
        "annotations": {
            "vital_few_boundary": len(vital['categories']),
            "vital_few_label": f"关键少数 ({vital['count']} 类别 = {vital['percentage']}%)"
        }
    }

6. 统计验证

from scipy import stats

def validate_pareto_pattern(data: pd.DataFrame, category_col: str, value_col: str):
    """
    Statistically validate whether the data follows a Pareto-like distribution.

    Combines three signals: a Gini coefficient of concentration, a direct
    80/20 check, and a power-law fit on the log rank/value curve.

    data: DataFrame with one row per observation
    category_col: column holding the category labels
    value_col: column holding the magnitudes to aggregate
    """
    # Aggregate values per category and rank them in descending order
    summary = data.groupby(category_col)[value_col].sum().reset_index()
    summary.columns = ['category', 'value']
    summary = summary.sort_values('value', ascending=False)

    total = summary['value'].sum()
    n = len(summary)

    # Gini coefficient via the trapezoidal area under the cumulative-share
    # curve (an approximation; coarser for small n).
    # NOTE(review): np.trapz is deprecated in NumPy 2.0 (renamed
    # np.trapezoid) — confirm the pinned NumPy version before upgrading.
    values = summary['value'].values
    cumulative = np.cumsum(values) / total
    gini = 1 - 2 * np.trapz(cumulative, dx=1/n)

    # 80/20 check: count how many top categories it takes to reach 80%
    cumsum = 0
    count_for_80 = 0
    for val in values:
        cumsum += val
        count_for_80 += 1
        if cumsum >= total * 0.8:
            break

    percent_categories_for_80 = count_for_80 / n * 100

    # Power-law fit: linear regression on log(rank) vs log(value)
    ranks = np.arange(1, n + 1)
    log_ranks = np.log(ranks)
    log_values = np.log(values + 1)  # +1 keeps zero values finite

    slope, intercept, r_value, p_value, std_err = stats.linregress(log_ranks, log_values)

    return {
        "gini_coefficient": round(gini, 3),
        "gini_interpretation": interpret_gini(gini),
        "pareto_check": {
            "percent_categories_for_80": round(percent_categories_for_80, 1),
            "follows_80_20": percent_categories_for_80 <= 30  # loose threshold, roughly "20%" with head-room
        },
        "power_law_fit": {
            "exponent": round(-slope, 3),
            "r_squared": round(r_value**2, 3),
            "is_power_law": r_value**2 > 0.8 and p_value < 0.05
        },
        "recommendation": generate_recommendation(gini, percent_categories_for_80)
    }

def interpret_gini(gini):
    """Map a Gini coefficient to a qualitative concentration label."""
    bands = (
        (0.6, "高集中度 - 强烈的帕累托模式"),
        (0.4, "中等集中度 - 帕累托分析有用"),
    )
    for floor, label in bands:
        if gini > floor:
            return label
    return "低集中度 - 考虑其他分析方法"

def generate_recommendation(gini, pct_for_80):
    """Turn the validation metrics into an actionable recommendation string."""
    is_strong = gini > 0.5 and pct_for_80 <= 30
    if is_strong:
        return "强烈的帕累托模式 - 集中努力在关键少数类别"
    if gini > 0.4:
        return "中等帕累托模式 - 优先考虑顶级类别但监控所有"
    return "弱帕累托模式 - 考虑分层或其他分析"

流程集成

这项技能与以下流程集成:

  • root-cause-analysis.js
  • quality-improvement-project.js
  • cost-reduction-analysis.js

输出格式

{
  "pareto_analysis": {
    "total_value": 1250,
    "vital_few": {
      "categories": ["缺陷A", "缺陷B", "缺陷C"],
      "count": 3,
      "percentage": 78.5
    },
    "trivial_many": {
      "count": 12,
      "percentage": 21.5
    }
  },
  "statistical_validation": {
    "gini_coefficient": 0.62,
    "follows_80_20": true
  },
  "chart_data": {...},
  "recommendations": [
    "专注于缺陷A - 占总数的45%",
    "一起解决缺陷B和C - 合计33%"
  ]
}

最佳实践

  1. 使用有意义的类别 - 不要太细或太宽泛
  2. 包含所有数据 - 不要排除低频项目
  3. 统计验证 - 确保模式存在
  4. 深入挖掘 - 对关键少数进行第二级帕累托
  5. 随时间跟踪 - 监控优先级的变化
  6. 适当加权 - 考虑严重性和成本

限制

  • 需要分类数据
  • 小样本量可能具有误导性
  • 类别必须是互斥的
  • 模式可能并不总是存在