流程挖掘分析器 process-mining-analyzer

流程挖掘分析器是一个专门用于流程挖掘的技能,包括事件日志分析、流程发现和一致性检查,旨在通过AI技术优化业务流程,提高效率。关键词:流程挖掘、事件日志分析、流程优化。

数据分析 0 次安装 0 次浏览 更新于 2/25/2026

以下是 process-mining-analyzer 技能的完整描述:


name: process-mining-analyzer description: 流程挖掘技能,用于事件日志分析、流程发现和一致性检查。 allowed-tools: Bash(*) Read Write Edit Glob Grep WebFetch metadata: author: babysitter-sdk version: "1.0.0" category: work-measurement backlog-id: SK-IE-036

process-mining-analyzer

你是 process-mining-analyzer - 一个专门用于流程挖掘的技能,包括事件日志分析、流程发现和一致性检查。

概览

这项技能使得AI驱动的流程挖掘成为可能,包括:

  • 事件日志准备和清洗
  • 流程发现算法(Alpha, Heuristic Miner)
  • 一致性检查
  • 性能分析
  • 瓶颈识别
  • 变体分析
  • 社交网络分析
  • 点图可视化

能力

1. 事件日志准备

import pandas as pd
import numpy as np
from datetime import datetime
from collections import defaultdict

def prepare_event_log(raw_data: pd.DataFrame, mappings: dict) -> dict:
    """
    Prepare an event log for process mining.

    raw_data: DataFrame holding the raw event records.
    mappings: {'case_id': col, 'activity': col, 'timestamp': col, 'resource': col}
              ('resource' is optional; it is copied only if the mapped
              column exists in raw_data).

    Returns {'event_log': DataFrame, 'statistics': dict}.
    Raises KeyError if a required mapping key or source column is missing.
    """
    # Map the required columns into a fresh frame.
    event_log = pd.DataFrame()
    event_log['case_id'] = raw_data[mappings['case_id']]
    event_log['activity'] = raw_data[mappings['activity']]
    event_log['timestamp'] = pd.to_datetime(raw_data[mappings['timestamp']])

    if 'resource' in mappings and mappings['resource'] in raw_data.columns:
        event_log['resource'] = raw_data[mappings['resource']]

    # Sort by case and timestamp; reset the index so label-based and
    # positional access agree downstream (fix: the original kept the
    # shuffled index from the source frame after sorting).
    event_log = event_log.sort_values(['case_id', 'timestamp']).reset_index(drop=True)

    # Stable per-event identifier in chronological order.
    event_log['event_id'] = range(len(event_log))

    # Duration (seconds) until the next event of the same case;
    # NaN for the last event of each case.
    event_log['next_timestamp'] = event_log.groupby('case_id')['timestamp'].shift(-1)
    event_log['duration'] = (event_log['next_timestamp'] - event_log['timestamp']).dt.total_seconds()

    # Summary statistics for quick inspection.
    stats = {
        'total_events': len(event_log),
        'total_cases': event_log['case_id'].nunique(),
        'unique_activities': event_log['activity'].nunique(),
        'activities': event_log['activity'].unique().tolist(),
        'date_range': {
            'start': str(event_log['timestamp'].min()),
            'end': str(event_log['timestamp'].max())
        }
    }

    return {
        'event_log': event_log,
        'statistics': stats
    }

2. 流程发现

def discover_process_model(event_log: pd.DataFrame) -> dict:
    """
    Discover a process model from an event log via footprint analysis.

    Builds the directly-follows graph (DFG), collects start/end
    activities, and derives the footprint matrix with the classic
    relations: '>' (follows), '<' (precedes), '||' (parallel),
    '#' (no relation).

    Returns a dict with keys 'directly_follows_graph',
    'start_activities', 'end_activities', 'footprint', 'activities'.
    """
    # Build the directly-follows graph and record the first/last
    # activity of every case.
    dfg = defaultdict(int)
    start_activities = set()
    end_activities = set()

    for case_id, case_data in event_log.groupby('case_id'):
        activities = case_data['activity'].tolist()

        if activities:
            start_activities.add(activities[0])
            end_activities.add(activities[-1])

        for i in range(len(activities) - 1):
            dfg[(activities[i], activities[i + 1])] += 1

    # Footprint matrix over the sorted activity alphabet.
    # (Fix: removed unused locals `n` and `act_idx` from the original.)
    activities = sorted(event_log['activity'].unique())

    relations = {}

    for a1 in activities:
        for a2 in activities:
            a1_to_a2 = dfg.get((a1, a2), 0)
            a2_to_a1 = dfg.get((a2, a1), 0)

            if a1_to_a2 > 0 and a2_to_a1 > 0:
                relations[(a1, a2)] = '||'  # parallel (both directions observed)
            elif a1_to_a2 > 0:
                relations[(a1, a2)] = '>'   # a1 directly followed by a2
            elif a2_to_a1 > 0:
                relations[(a1, a2)] = '<'   # a1 preceded by a2
            else:
                relations[(a1, a2)] = '#'   # never adjacent

    return {
        'directly_follows_graph': dict(dfg),
        'start_activities': list(start_activities),
        'end_activities': list(end_activities),
        'footprint': relations,
        'activities': activities
    }

def heuristic_miner(event_log: pd.DataFrame, dependency_threshold: float = 0.5):
    """
    Heuristic Miner algorithm for process discovery.

    Computes the dependency measure
        D(a, b) = (|a>b| - |b>a|) / (|a>b| + |b>a| + 1)
    for every ordered activity pair and keeps pairs whose absolute
    measure reaches `dependency_threshold`. Positive measures form the
    causal (genuine follows) relations.
    """
    activity_freq = event_log['activity'].value_counts().to_dict()

    # Directly-follows frequencies, accumulated per case.
    dfg = defaultdict(int)
    for _, case_data in event_log.groupby('case_id'):
        trace = case_data['activity'].tolist()
        for src, dst in zip(trace, trace[1:]):
            dfg[(src, dst)] += 1

    # Dependency measures over all pairs with at least one observation.
    dependencies = {}
    names = list(activity_freq.keys())
    for src in names:
        for dst in names:
            forward = dfg.get((src, dst), 0)
            backward = dfg.get((dst, src), 0)
            if forward == 0 and backward == 0:
                continue
            measure = (forward - backward) / (forward + backward + 1)
            if abs(measure) >= dependency_threshold:
                dependencies[(src, dst)] = round(measure, 3)

    # Keep only positive measures: actual causal (follows) relations.
    causal_relations = {pair: m for pair, m in dependencies.items() if m > 0}

    return {
        'activity_frequencies': activity_freq,
        'directly_follows_frequencies': dict(dfg),
        'dependency_measures': dependencies,
        'causal_relations': causal_relations,
        'threshold': dependency_threshold
    }

3. 一致性检查

def check_conformance(event_log: pd.DataFrame, expected_sequence: list,
                     strict: bool = False):
    """
    Check each trace in the event log against an expected process.

    expected_sequence: list of activities in the expected order.
    strict: if True, a trace must match exactly; if False, it conforms
            when expected_sequence is a subsequence of the trace.

    Returns {'case_results': [...], 'summary': {...}} where the
    summary includes a conformance_rate percentage.
    """
    results = []

    for case_id, case_data in event_log.groupby('case_id'):
        trace = case_data['activity'].tolist()

        if strict:
            # Exact match required.
            is_conforming = trace == expected_sequence
            deviations = []

            if not is_conforming:
                # Positional mismatches over the overlapping prefix.
                for i, (actual, expected) in enumerate(zip(trace, expected_sequence)):
                    if actual != expected:
                        deviations.append({
                            'position': i,
                            'expected': expected,
                            'actual': actual
                        })

                # Length differences: missing or extra activities.
                if len(trace) < len(expected_sequence):
                    deviations.append({'type': 'missing', 'count': len(expected_sequence) - len(trace)})
                elif len(trace) > len(expected_sequence):
                    deviations.append({'type': 'extra', 'count': len(trace) - len(expected_sequence)})

        else:
            # Subsequence match: advance through expected_sequence as
            # its activities appear in order within the trace.
            # (Fix: removed the dead `is_conforming = True` assignment
            # that was immediately overwritten in the original.)
            exp_idx = 0

            for act in trace:
                if exp_idx < len(expected_sequence) and act == expected_sequence[exp_idx]:
                    exp_idx += 1

            is_conforming = exp_idx == len(expected_sequence)
            deviations = [] if is_conforming else [{'type': 'subsequence_mismatch'}]

        results.append({
            'case_id': case_id,
            'trace': trace,
            'conforming': is_conforming,
            'deviations': deviations
        })

    # Aggregate statistics.
    conforming_count = sum(1 for r in results if r['conforming'])
    total = len(results)

    return {
        'case_results': results,
        'summary': {
            'total_cases': total,
            'conforming_cases': conforming_count,
            'non_conforming_cases': total - conforming_count,
            'conformance_rate': round(conforming_count / total * 100, 1) if total > 0 else 0
        }
    }

4. 性能分析

def analyze_performance(event_log: pd.DataFrame) -> dict:
    """
    Analyze process performance from an event log.

    Expects the columns produced by `prepare_event_log`: 'case_id',
    'activity', 'timestamp', and 'duration' (seconds until the next
    event of the same case).

    Returns case-duration statistics (hours), per-activity duration
    statistics (minutes), the top-3 bottleneck activities, and mean
    waiting time per activity (minutes).
    """
    # Case durations: wall-clock span from first to last event per case.
    case_durations = event_log.groupby('case_id').agg({
        'timestamp': ['min', 'max']
    })
    case_durations.columns = ['start', 'end']
    case_durations['duration_hours'] = (case_durations['end'] - case_durations['start']).dt.total_seconds() / 3600

    # Per-activity duration statistics, all reported in minutes.
    activity_stats = event_log.groupby('activity')['duration'].agg(['mean', 'median', 'std', 'count']).reset_index()
    activity_stats.columns = ['activity', 'mean_duration', 'median_duration', 'std_duration', 'count']
    # Fix: convert every duration statistic from seconds to minutes —
    # the original converted only the mean, leaving median/std in
    # seconds (inconsistent units in the same record).
    for col in ('mean_duration', 'median_duration', 'std_duration'):
        activity_stats[col] = activity_stats[col] / 60

    # Bottlenecks: the three activities with the longest mean duration.
    bottlenecks = activity_stats.nlargest(3, 'mean_duration')

    # Waiting time: gap between an event and the previous event of the
    # same case, in minutes (the first event of each case has none).
    waiting_times = event_log.copy()
    waiting_times['prev_end'] = waiting_times.groupby('case_id')['timestamp'].shift(1)
    waiting_times['waiting_time'] = (waiting_times['timestamp'] - waiting_times['prev_end']).dt.total_seconds() / 60
    waiting_times = waiting_times[waiting_times['waiting_time'].notna()]

    waiting_by_activity = waiting_times.groupby('activity')['waiting_time'].mean().reset_index()
    waiting_by_activity.columns = ['activity', 'avg_waiting_minutes']

    return {
        'case_duration': {
            'mean_hours': round(case_durations['duration_hours'].mean(), 2),
            'median_hours': round(case_durations['duration_hours'].median(), 2),
            'std_hours': round(case_durations['duration_hours'].std(), 2)
        },
        'activity_performance': activity_stats.to_dict('records'),
        'bottlenecks': bottlenecks[['activity', 'mean_duration']].to_dict('records'),
        'waiting_times': waiting_by_activity.to_dict('records')
    }

5. 变体分析

def analyze_variants(event_log: pd.DataFrame):
    """
    Analyze process variants (distinct activity sequences).

    A variant is a trace rendered as 'A->B->C'; cases that share the
    same sequence belong to the same variant. Reports frequency,
    coverage percentages, mean duration per variant, and a Pareto
    estimate of how many variants cover ~80% of cases.
    """
    # One row per case: its trace as an arrow-joined string.
    traces = event_log.groupby('case_id')['activity'].apply('->'.join).reset_index()
    traces.columns = ['case_id', 'trace']
    total = len(traces)

    # Variant frequencies with percentage and cumulative coverage.
    variant_counts = traces['trace'].value_counts().reset_index()
    variant_counts.columns = ['variant', 'count']
    variant_counts['percentage'] = round(variant_counts['count'] / total * 100, 1)
    variant_counts['cumulative_pct'] = variant_counts['percentage'].cumsum()

    # Mean case duration (hours) per variant.
    span = event_log.groupby('case_id').agg({
        'timestamp': ['min', 'max']
    })
    span.columns = ['start', 'end']
    span['duration_hours'] = (span['end'] - span['start']).dt.total_seconds() / 3600
    span = span.reset_index()

    with_duration = traces.merge(span[['case_id', 'duration_hours']], on='case_id')
    per_variant = with_duration.groupby('trace')['duration_hours'].mean().reset_index()
    per_variant.columns = ['variant', 'avg_duration_hours']

    variant_analysis = variant_counts.merge(per_variant, on='variant')

    return {
        'total_cases': total,
        'unique_variants': len(variant_counts),
        'top_variants': variant_analysis.head(10).to_dict('records'),
        'pareto': {
            'variants_for_80pct': len(variant_analysis[variant_analysis['cumulative_pct'] <= 80]) + 1
        }
    }

6. 社交网络分析

def analyze_handoffs(event_log: pd.DataFrame):
    """
    Analyze resource hand-offs for social-network analysis.

    A hand-off is counted whenever two consecutive events in the same
    case are performed by different resources. Returns the hand-off
    matrix, per-resource degree metrics sorted by total hand-offs, and
    overall counts; returns an error dict when no 'resource' column
    is present.
    """
    if 'resource' not in event_log.columns:
        return {"error": "Resource column not available"}

    # Count directed hand-offs between distinct consecutive resources.
    handoffs = defaultdict(int)
    for _, case_data in event_log.groupby('case_id'):
        chain = case_data['resource'].tolist()
        for giver, taker in zip(chain, chain[1:]):
            if giver != taker:
                handoffs[(giver, taker)] += 1

    # Degree metrics: how often each resource hands work off (out)
    # and receives it (in).
    in_degree = defaultdict(int)
    out_degree = defaultdict(int)
    participants = set()
    for (giver, taker), count in handoffs.items():
        out_degree[giver] += count
        in_degree[taker] += count
        participants.update((giver, taker))

    resource_metrics = sorted(
        (
            {
                'resource': res,
                'in_degree': in_degree[res],
                'out_degree': out_degree[res],
                'total_handoffs': in_degree[res] + out_degree[res],
            }
            for res in participants
        ),
        key=lambda m: m['total_handoffs'],
        reverse=True,
    )

    return {
        'handoff_matrix': dict(handoffs),
        'resource_metrics': resource_metrics,
        'total_handoffs': sum(handoffs.values()),
        'unique_resource_pairs': len(handoffs)
    }

流程集成

这项技能与以下流程集成:

  • process-discovery-analysis.js
  • conformance-checking-audit.js
  • process-improvement-analysis.js

输出格式

{
  "event_log_stats": {
    "total_events": 15000,
    "total_cases": 500,
    "unique_activities": 12
  },
  "process_model": {
    "start_activities": ["Register"],
    "end_activities": ["Close"],
    "directly_follows": {"Register->Approve": 450}
  },
  "conformance": {
    "conformance_rate": 85.2
  },
  "performance": {
    "avg_case_duration_hours": 24.5,
    "bottlenecks": ["Approval", "Review"]
  },
  "variants": {
    "unique": 45,
    "top_variant_coverage": 65.2
  }
}

最佳实践

  1. 清洗事件日志 - 移除噪声和重复项
  2. 验证时间戳 - 确保正确的排序
  3. 定义案例概念 - 清晰的案例ID定义
  4. 迭代发现 - 与领域专家一起细化
  5. 结合技术 - 使用多种算法
  6. 关注偏差 - 它们揭示改进机会

约束

  • 需要高质量的事件数据
  • 复杂流程可能难以可视化
  • 时间戳必须准确
  • 并行活动增加复杂性