name: line-balancer description: 装配线平衡技能,用于工作站设计和节拍时间优化。 allowed-tools: Bash(*) Read Write Edit Glob Grep WebFetch metadata: author: babysitter-sdk version: “1.0.0” category: production-planning backlog-id: SK-IE-030
line-balancer
您是 line-balancer - 一个专门用于装配线平衡的技能,包括工作站设计、任务分配和节拍时间优化。
概述
此技能支持AI驱动的装配线平衡,包括:
- 优先图分析
- 根据需求计算节拍时间
- 工作站分配算法
- 生产线效率计算
- 平衡延迟最小化
- 单模型和多模型线平衡
- 混合模型排序
- U型线平衡
功能
1. 优先图分析
import networkx as nx
import pandas as pd
from collections import defaultdict
def analyze_precedence(tasks: list, precedence: list):
"""
分析装配线平衡的优先关系
tasks: 任务列表,格式为 {'task_id': str, 'time': float, 'description': str}
precedence: 优先关系列表,格式为 (前驱任务, 后继任务) 元组
"""
# 构建有向图
G = nx.DiGraph()
task_dict = {t['task_id']: t for t in tasks}
for task in tasks:
G.add_node(task['task_id'], time=task['time'])
for pred, succ in precedence:
G.add_edge(pred, succ)
# 计算位置权重(任务时间加上所有后继任务的时间)
def positional_weight(node):
descendants = nx.descendants(G, node)
weight = task_dict[node]['time']
for d in descendants:
weight += task_dict[d]['time']
return weight
weights = {t['task_id']: positional_weight(t['task_id']) for t in tasks}
# 找到关键路径
total_time = sum(t['time'] for t in tasks)
# 找到直接前驱和后继
analysis = []
for task in tasks:
tid = task['task_id']
analysis.append({
'task_id': tid,
'time': task['time'],
'predecessors': list(G.predecessors(tid)),
'successors': list(G.successors(tid)),
'positional_weight': weights[tid]
})
return {
"total_work_content": total_time,
"task_analysis": pd.DataFrame(analysis).sort_values('positional_weight', ascending=False),
"graph": G
}
2. 节拍时间与工作站计算
def calculate_cycle_time(demand_per_shift: int, available_time_minutes: float,
efficiency: float = 0.95):
"""
根据需求计算所需节拍时间
返回理论节拍时间和实际节拍时间
"""
# 理论节拍时间
theoretical_ct = available_time_minutes / demand_per_shift
# 实际节拍时间(考虑效率因素)
practical_ct = theoretical_ct * efficiency
return {
"theoretical_cycle_time": round(theoretical_ct, 2),
"practical_cycle_time": round(practical_ct, 2),
"demand_per_shift": demand_per_shift,
"available_time": available_time_minutes,
"efficiency_factor": efficiency
}
def calculate_workstations(total_work_content: float, cycle_time: float):
"""
计算理论和实际工作站数量
"""
theoretical = total_work_content / cycle_time
minimum = int(np.ceil(theoretical))
return {
"theoretical_workstations": round(theoretical, 2),
"minimum_workstations": minimum,
"total_work_content": total_work_content,
"cycle_time": cycle_time
}
3. 最大候选规则 (LCR)
def largest_candidate_rule(tasks: list, precedence: list, cycle_time: float):
"""
使用最大候选规则进行装配线平衡
按任务时间从大到小优先分配到工作站
"""
# 构建优先图
G = nx.DiGraph()
for pred, succ in precedence:
G.add_edge(pred, succ)
task_dict = {t['task_id']: t['time'] for t in tasks}
# 按时间降序排序任务
sorted_tasks = sorted(tasks, key=lambda x: x['time'], reverse=True)
workstations = []
assigned = set()
current_station = 1
current_time = 0
current_tasks = []
while len(assigned) < len(tasks):
task_assigned = False
for task in sorted_tasks:
tid = task['task_id']
if tid in assigned:
continue
# 检查优先关系 - 所有前驱任务必须已分配
predecessors = set(G.predecessors(tid))
if not predecessors.issubset(assigned):
continue
# 检查任务是否适合当前工作站
if current_time + task['time'] <= cycle_time:
current_tasks.append(tid)
current_time += task['time']
assigned.add(tid)
task_assigned = True
break
if not task_assigned:
# 关闭当前工作站并开始新的工作站
if current_tasks:
workstations.append({
'station': current_station,
'tasks': current_tasks,
'total_time': current_time,
'idle_time': cycle_time - current_time
})
current_station += 1
current_time = 0
current_tasks = []
# 如果最后一个工作站不为空,则添加
if current_tasks:
workstations.append({
'station': current_station,
'tasks': current_tasks,
'total_time': current_time,
'idle_time': cycle_time - current_time
})
return {
"workstations": workstations,
"num_stations": len(workstations),
"cycle_time": cycle_time
}
4. 排序位置权重法 (RPW)
def ranked_positional_weight(tasks: list, precedence: list, cycle_time: float):
"""
使用排序位置权重法进行装配线平衡
比LCR更好,因为它同时考虑了任务时间和位置
"""
# 构建图并计算位置权重
G = nx.DiGraph()
for pred, succ in precedence:
G.add_edge(pred, succ)
task_dict = {t['task_id']: t for t in tasks}
def calc_rpw(task_id):
descendants = nx.descendants(G, task_id)
weight = task_dict[task_id]['time']
for d in descendants:
weight += task_dict[d]['time']
return weight
# 将RPW添加到任务并按RPW排序
for task in tasks:
task['rpw'] = calc_rpw(task['task_id'])
sorted_tasks = sorted(tasks, key=lambda x: x['rpw'], reverse=True)
# 分配到工作站
workstations = []
assigned = set()
current_station = 1
current_time = 0
current_tasks = []
while len(assigned) < len(tasks):
task_assigned = False
for task in sorted_tasks:
tid = task['task_id']
if tid in assigned:
continue
# 检查优先关系
predecessors = set(G.predecessors(tid))
if not predecessors.issubset(assigned):
continue
# 检查是否适合
if current_time + task['time'] <= cycle_time:
current_tasks.append({
'task_id': tid,
'time': task['time'],
'rpw': task['rpw']
})
current_time += task['time']
assigned.add(tid)
task_assigned = True
if not task_assigned:
if current_tasks:
workstations.append({
'station': current_station,
'tasks': current_tasks,
'total_time': current_time,
'idle_time': cycle_time - current_time,
'utilization': current_time / cycle_time * 100
})
current_station += 1
current_time = 0
current_tasks = []
if current_tasks:
workstations.append({
'station': current_station,
'tasks': current_tasks,
'total_time': current_time,
'idle_time': cycle_time - current_time,
'utilization': current_time / cycle_time * 100
})
return {
"workstations": workstations,
"num_stations": len(workstations),
"cycle_time": cycle_time,
"method": "RPW"
}
5. 生产线效率指标
def calculate_line_efficiency(workstations: list, cycle_time: float, total_work_content: float):
"""
计算装配线平衡效率指标
"""
num_stations = len(workstations)
# 生产线效率(平衡效率)
line_efficiency = (total_work_content / (num_stations * cycle_time)) * 100
# 平衡延迟
balance_delay = 100 - line_efficiency
# 平滑指数
station_times = [ws['total_time'] for ws in workstations]
mean_time = np.mean(station_times)
smoothness = np.sqrt(sum((t - mean_time)**2 for t in station_times))
# 工作站利用率
utilizations = [ws['total_time'] / cycle_time * 100 for ws in workstations]
return {
"line_efficiency": round(line_efficiency, 2),
"balance_delay": round(balance_delay, 2),
"smoothness_index": round(smoothness, 2),
"num_stations": num_stations,
"cycle_time": cycle_time,
"station_utilizations": utilizations,
"min_utilization": round(min(utilizations), 2),
"max_utilization": round(max(utilizations), 2),
"avg_utilization": round(np.mean(utilizations), 2)
}
6. 混合模型装配线平衡
def mixed_model_balance(models: list, tasks: dict, precedence: dict,
demand_ratio: dict, cycle_time: float):
"""
平衡混合模型装配线
models: 模型ID列表
tasks: {模型: [{'task_id': str, 'time': float}]}
precedence: {模型: [(前驱, 后继)]}
demand_ratio: {模型: 需求比例}
"""
# 计算加权平均任务时间
weighted_tasks = defaultdict(float)
for model in models:
ratio = demand_ratio[model]
for task in tasks[model]:
weighted_tasks[task['task_id']] += task['time'] * ratio
# 创建组合任务列表
combined_tasks = [
{'task_id': tid, 'time': time}
for tid, time in weighted_tasks.items()
]
# 组合优先关系
combined_precedence = set()
for model in models:
for pred, succ in precedence[model]:
combined_precedence.add((pred, succ))
# 使用加权时间进行平衡
result = ranked_positional_weight(
combined_tasks,
list(combined_precedence),
cycle_time
)
return {
"mixed_model_balance": result,
"models": models,
"demand_ratios": demand_ratio,
"weighted_work_content": sum(weighted_tasks.values())
}
流程集成
此技能与以下流程集成:
assembly-line-design.jsproduction-scheduling-optimization.jsworkstation-design-optimization.js
输出格式
{
"line_balance": {
"workstations": [
{"station": 1, "tasks": ["A", "B"], "total_time": 48, "idle_time": 2},
{"station": 2, "tasks": ["C", "D", "E"], "total_time": 47, "idle_time": 3}
],
"cycle_time": 50,
"method": "RPW"
},
"efficiency": {
"line_efficiency": 95.2,
"balance_delay": 4.8,
"smoothness_index": 2.1
},
"recommendations": [
"考虑合并任务B和C以改善平衡",
"工作站4是瓶颈 - 考虑任务拆分"
]
}
最佳实践
- 验证优先关系 - 确保捕获所有关系
- 考虑多种算法 - 比较LCR、RPW等方法
- 考虑可变性 - 实际中任务时间会变化
- 允许学习 - 新生产线会随时间改进
- 设计灵活性 - 适应未来模型变化
- 包含人机工程学 - 工作站设计很重要
约束
- 优先约束限制了分配选项
- 任务拆分可能不可行
- 并行工作站增加了复杂性
- 混合模型需要仔细排序