以下是对process-mining-analyzer技能的中文翻译和描述:
name: process-mining-analyzer description: 流程挖掘技能,用于事件日志分析、流程发现和一致性检查。 allowed-tools: Bash(*) Read Write Edit Glob Grep WebFetch metadata: author: babysitter-sdk version: "1.0.0" category: work-measurement backlog-id: SK-IE-036
process-mining-analyzer
你是 process-mining-analyzer - 一个专门用于流程挖掘的技能,包括事件日志分析、流程发现和一致性检查。
概览
这项技能使得AI驱动的流程挖掘成为可能,包括:
- 事件日志准备和清洗
- 流程发现算法(Alpha, Heuristic Miner)
- 一致性检查
- 性能分析
- 瓶颈识别
- 变体分析
- 社交网络分析
- 点图可视化
能力
1. 事件日志准备
import pandas as pd
import numpy as np
from datetime import datetime
from collections import defaultdict
def prepare_event_log(raw_data: pd.DataFrame, mappings: dict):
    """Prepare an event log for process mining.

    raw_data: DataFrame containing the raw event data.
    mappings: {'case_id': col, 'activity': col, 'timestamp': col, 'resource': col}

    Returns a dict with the normalized 'event_log' DataFrame and basic
    'statistics' about it.
    """
    # Map the configured source columns onto the canonical schema.
    event_log = pd.DataFrame({
        'case_id': raw_data[mappings['case_id']],
        'activity': raw_data[mappings['activity']],
        'timestamp': pd.to_datetime(raw_data[mappings['timestamp']]),
    })
    resource_col = mappings.get('resource')
    if resource_col is not None and resource_col in raw_data.columns:
        event_log['resource'] = raw_data[resource_col]
    # Order events chronologically within each case.
    event_log = event_log.sort_values(['case_id', 'timestamp'])
    # Derived columns: a positional event id plus the gap to the next
    # event of the same case (NaN for each case's last event).
    event_log['event_id'] = range(len(event_log))
    event_log['next_timestamp'] = event_log.groupby('case_id')['timestamp'].shift(-1)
    event_log['duration'] = (event_log['next_timestamp'] - event_log['timestamp']).dt.total_seconds()
    # Summary statistics for quick inspection.
    stats = {
        'total_events': len(event_log),
        'total_cases': event_log['case_id'].nunique(),
        'unique_activities': event_log['activity'].nunique(),
        'activities': event_log['activity'].unique().tolist(),
        'date_range': {
            'start': str(event_log['timestamp'].min()),
            'end': str(event_log['timestamp'].max())
        }
    }
    return {
        'event_log': event_log,
        'statistics': stats
    }
2. 流程发现
def discover_process_model(event_log: pd.DataFrame):
    """Discover a process model from an event log using footprint analysis.

    Builds the directly-follows graph (DFG), collects start/end activities,
    and derives the Alpha-style footprint matrix of pairwise relations.

    event_log: DataFrame with at least 'case_id' and 'activity' columns,
    with events already ordered within each case.

    Returns a dict with 'directly_follows_graph', 'start_activities',
    'end_activities', 'footprint' and 'activities'.
    """
    # Count directly-follows transitions per case.
    dfg = defaultdict(int)
    start_activities = set()
    end_activities = set()
    for case_id, case_data in event_log.groupby('case_id'):
        activities = case_data['activity'].tolist()
        if activities:
            start_activities.add(activities[0])
            end_activities.add(activities[-1])
        for i in range(len(activities) - 1):
            dfg[(activities[i], activities[i + 1])] += 1
    # Footprint matrix over the sorted activity alphabet.
    # (Removed unused locals `n` and `act_idx` from the original.)
    activities = sorted(event_log['activity'].unique())
    # Relations: > (directly follows), < (preceded by), || (parallel), # (no relation)
    relations = {}
    for a1 in activities:
        for a2 in activities:
            a1_to_a2 = dfg.get((a1, a2), 0)
            a2_to_a1 = dfg.get((a2, a1), 0)
            if a1_to_a2 > 0 and a2_to_a1 > 0:
                relations[(a1, a2)] = '||'  # parallel
            elif a1_to_a2 > 0:
                relations[(a1, a2)] = '>'  # follows
            elif a2_to_a1 > 0:
                relations[(a1, a2)] = '<'  # precedes
            else:
                relations[(a1, a2)] = '#'  # no relation
    return {
        'directly_follows_graph': dict(dfg),
        'start_activities': list(start_activities),
        'end_activities': list(end_activities),
        'footprint': relations,
        'activities': activities
    }
def heuristic_miner(event_log: pd.DataFrame, dependency_threshold: float = 0.5):
    """Heuristic Miner algorithm for process discovery.

    Computes the dependency measure D(a,b) = (|a>b| - |b>a|) / (|a>b| + |b>a| + 1)
    for every activity pair with at least one observed transition, keeps the
    pairs whose absolute measure reaches `dependency_threshold`, and reports
    the positive ones as causal relations.
    """
    activity_freq = event_log['activity'].value_counts().to_dict()
    # Directly-follows frequencies, accumulated per case trace.
    dfg = defaultdict(int)
    for _, trace_rows in event_log.groupby('case_id'):
        trace = trace_rows['activity'].tolist()
        for predecessor, successor in zip(trace, trace[1:]):
            dfg[(predecessor, successor)] += 1
    # Dependency measures above threshold.
    dependencies = {}
    for a in activity_freq:
        for b in activity_freq:
            forward = dfg.get((a, b), 0)
            backward = dfg.get((b, a), 0)
            if forward > 0 or backward > 0:
                measure = (forward - backward) / (forward + backward + 1)
                if abs(measure) >= dependency_threshold:
                    dependencies[(a, b)] = round(measure, 3)
    # Positive measures indicate an actual follows (causal) relation.
    causal_relations = {pair: m for pair, m in dependencies.items() if m > 0}
    return {
        'activity_frequencies': activity_freq,
        'directly_follows_frequencies': dict(dfg),
        'dependency_measures': dependencies,
        'causal_relations': causal_relations,
        'threshold': dependency_threshold
    }
3. 一致性检查
def check_conformance(event_log: pd.DataFrame, expected_sequence: list,
                      strict: bool = False):
    """Check traces in an event log for conformance with an expected process.

    expected_sequence: list of activities in their expected order.
    strict: if True, each trace must match exactly; if False, it conforms
    when the expected sequence appears as a subsequence of the trace.

    Returns per-case results plus a summary with the conformance rate (%).
    """
    results = []
    for case_id, case_data in event_log.groupby('case_id'):
        trace = case_data['activity'].tolist()
        if strict:
            # Exact match required.
            is_conforming = trace == expected_sequence
            deviations = []
            if not is_conforming:
                # Position-wise mismatches over the overlapping prefix.
                for i, (actual, expected) in enumerate(zip(trace, expected_sequence)):
                    if actual != expected:
                        deviations.append({
                            'position': i,
                            'expected': expected,
                            'actual': actual
                        })
                # Missing or extra activities relative to the expectation.
                if len(trace) < len(expected_sequence):
                    deviations.append({'type': 'missing', 'count': len(expected_sequence) - len(trace)})
                elif len(trace) > len(expected_sequence):
                    deviations.append({'type': 'extra', 'count': len(trace) - len(expected_sequence)})
        else:
            # Subsequence check: advance through expected_sequence as the
            # trace is scanned; conforming iff all of it was consumed.
            # (Removed a dead `is_conforming = True` store present in the
            # original — the value was always overwritten after the loop.)
            exp_idx = 0
            for act in trace:
                if exp_idx < len(expected_sequence) and act == expected_sequence[exp_idx]:
                    exp_idx += 1
            is_conforming = exp_idx == len(expected_sequence)
            deviations = [] if is_conforming else [{'type': 'subsequence_mismatch'}]
        results.append({
            'case_id': case_id,
            'trace': trace,
            'conforming': is_conforming,
            'deviations': deviations
        })
    # Aggregate statistics.
    conforming_count = sum(1 for r in results if r['conforming'])
    total = len(results)
    return {
        'case_results': results,
        'summary': {
            'total_cases': total,
            'conforming_cases': conforming_count,
            'non_conforming_cases': total - conforming_count,
            'conformance_rate': round(conforming_count / total * 100, 1) if total > 0 else 0
        }
    }
4. 性能分析
def analyze_performance(event_log: pd.DataFrame):
    """Analyze process performance from an event log.

    Expects the columns produced by prepare_event_log: 'case_id',
    'activity', 'timestamp' (datetime) and 'duration' (seconds until
    the next event of the same case).

    Returns case duration stats (hours), per-activity duration stats
    (minutes), the top-3 bottleneck activities, and average waiting
    time per activity (minutes).
    """
    # Case durations: wall-clock time from first to last event of each case.
    case_durations = event_log.groupby('case_id').agg({
        'timestamp': ['min', 'max']
    })
    case_durations.columns = ['start', 'end']
    case_durations['duration_hours'] = (case_durations['end'] - case_durations['start']).dt.total_seconds() / 3600
    # Per-activity duration statistics.
    activity_stats = event_log.groupby('activity')['duration'].agg(['mean', 'median', 'std', 'count']).reset_index()
    activity_stats.columns = ['activity', 'mean_duration', 'median_duration', 'std_duration', 'count']
    # BUG FIX: the original converted only the mean to minutes, leaving
    # median/std in seconds despite the uniform column naming. Convert
    # all three so every *_duration column is in minutes.
    for col in ('mean_duration', 'median_duration', 'std_duration'):
        activity_stats[col] = activity_stats[col] / 60
    # Bottlenecks: the activities with the longest average duration.
    bottlenecks = activity_stats.nlargest(3, 'mean_duration')
    # Waiting time: gap between an event and the previous event of the
    # same case, attributed to the later activity (minutes).
    waiting_times = event_log.copy()
    waiting_times['prev_end'] = waiting_times.groupby('case_id')['timestamp'].shift(1)
    waiting_times['waiting_time'] = (waiting_times['timestamp'] - waiting_times['prev_end']).dt.total_seconds() / 60
    waiting_times = waiting_times[waiting_times['waiting_time'].notna()]
    waiting_by_activity = waiting_times.groupby('activity')['waiting_time'].mean().reset_index()
    waiting_by_activity.columns = ['activity', 'avg_waiting_minutes']
    return {
        'case_duration': {
            'mean_hours': round(case_durations['duration_hours'].mean(), 2),
            'median_hours': round(case_durations['duration_hours'].median(), 2),
            'std_hours': round(case_durations['duration_hours'].std(), 2)
        },
        'activity_performance': activity_stats.to_dict('records'),
        'bottlenecks': bottlenecks[['activity', 'mean_duration']].to_dict('records'),
        'waiting_times': waiting_by_activity.to_dict('records')
    }
5. 变体分析
def analyze_variants(event_log: pd.DataFrame):
    """Analyze process variants (unique traces).

    A variant is a distinct sequence of activities joined with '->'.
    Returns variant counts/percentages, average duration per variant,
    and a Pareto estimate of how many variants cover 80% of cases.
    """
    # One '->'-joined trace string per case.
    traces = (event_log.groupby('case_id')['activity']
              .apply(lambda acts: '->'.join(acts))
              .reset_index())
    traces.columns = ['case_id', 'trace']
    total_traces = len(traces)
    # Variant frequencies, most common first.
    variant_counts = traces['trace'].value_counts().reset_index()
    variant_counts.columns = ['variant', 'count']
    variant_counts['percentage'] = round(variant_counts['count'] / total_traces * 100, 1)
    variant_counts['cumulative_pct'] = variant_counts['percentage'].cumsum()
    # Average case duration per variant (hours).
    span = event_log.groupby('case_id').agg({
        'timestamp': ['min', 'max']
    })
    span.columns = ['start', 'end']
    span['duration_hours'] = (span['end'] - span['start']).dt.total_seconds() / 3600
    span = span.reset_index()
    traces_with_duration = traces.merge(span[['case_id', 'duration_hours']], on='case_id')
    variant_duration = (traces_with_duration.groupby('trace')['duration_hours']
                        .mean().reset_index())
    variant_duration.columns = ['variant', 'avg_duration_hours']
    variant_analysis = variant_counts.merge(variant_duration, on='variant')
    return {
        'total_cases': total_traces,
        'unique_variants': len(variant_counts),
        'top_variants': variant_analysis.head(10).to_dict('records'),
        'pareto': {
            'variants_for_80pct': len(variant_analysis[variant_analysis['cumulative_pct'] <= 80]) + 1
        }
    }
6. 社交网络分析
def analyze_handoffs(event_log: pd.DataFrame):
    """Analyze resource hand-offs for social network analysis.

    Counts work transfers between distinct consecutive resources within
    each case, then derives in/out-degree centrality per resource.
    Returns an error dict when the log has no 'resource' column.
    """
    if 'resource' not in event_log.columns:
        return {"error": "Resource column not available"}
    # Count transfers between consecutive, distinct resources per case.
    handoffs = defaultdict(int)
    for _, case_rows in event_log.groupby('case_id'):
        workers = case_rows['resource'].tolist()
        for giver, receiver in zip(workers, workers[1:]):
            if giver != receiver:  # only genuine transfers
                handoffs[(giver, receiver)] += 1
    # Degree centrality: how often each resource hands off / receives work.
    in_degree = defaultdict(int)
    out_degree = defaultdict(int)
    participants = set()
    for (giver, receiver), count in handoffs.items():
        participants.update((giver, receiver))
        out_degree[giver] += count
        in_degree[receiver] += count
    resource_metrics = sorted(
        (
            {
                'resource': worker,
                'in_degree': in_degree[worker],
                'out_degree': out_degree[worker],
                'total_handoffs': in_degree[worker] + out_degree[worker],
            }
            for worker in participants
        ),
        key=lambda metrics: metrics['total_handoffs'],
        reverse=True,
    )
    return {
        'handoff_matrix': dict(handoffs),
        'resource_metrics': resource_metrics,
        'total_handoffs': sum(handoffs.values()),
        'unique_resource_pairs': len(handoffs)
    }
流程集成
这项技能与以下流程集成:
process-discovery-analysis.js、conformance-checking-audit.js、process-improvement-analysis.js
输出格式
{
"event_log_stats": {
"total_events": 15000,
"total_cases": 500,
"unique_activities": 12
},
"process_model": {
"start_activities": ["Register"],
"end_activities": ["Close"],
"directly_follows": {"Register->Approve": 450}
},
"conformance": {
"conformance_rate": 85.2
},
"performance": {
"avg_case_duration_hours": 24.5,
"bottlenecks": ["Approval", "Review"]
},
"variants": {
"unique": 45,
"top_variant_coverage": 65.2
}
}
最佳实践
- 清洗事件日志 - 移除噪声和重复项
- 验证时间戳 - 确保正确的排序
- 定义案例概念 - 清晰的案例ID定义
- 迭代发现 - 与领域专家一起细化
- 结合技术 - 使用多种算法
- 关注偏差 - 它们揭示改进机会
约束
- 需要高质量的事件数据
- 复杂流程可能难以可视化
- 时间戳必须准确
- 并行活动增加复杂性