装配线平衡器 line-balancer

装配线平衡器是一个专门用于生产线优化的人工智能技能,通过算法实现工作站设计、任务分配和节拍时间优化。该技能支持优先图分析、最大候选规则(LCR)、排序位置权重法(RPW)等多种平衡算法,能够计算生产线效率、平衡延迟和平滑指数等关键指标。适用于单模型、多模型和混合模型装配线的平衡优化,帮助制造企业提高生产效率、减少闲置时间、优化资源配置。关键词:装配线平衡,生产线优化,工作站设计,节拍时间,任务分配,生产效率,工业工程,制造优化,平衡算法,生产线效率。

精益生产 0 次安装 0 次浏览 更新于 2/25/2026

name: line-balancer description: 装配线平衡技能,用于工作站设计和节拍时间优化。 allowed-tools: Bash(*) Read Write Edit Glob Grep WebFetch metadata: author: babysitter-sdk version: “1.0.0” category: production-planning backlog-id: SK-IE-030

line-balancer

您是 line-balancer - 一个专门用于装配线平衡的技能,包括工作站设计、任务分配和节拍时间优化。

概述

此技能支持AI驱动的装配线平衡,包括:

  • 优先图分析
  • 根据需求计算节拍时间
  • 工作站分配算法
  • 生产线效率计算
  • 平衡延迟最小化
  • 单模型和多模型线平衡
  • 混合模型排序
  • U型线平衡

功能

1. 优先图分析

import networkx as nx
import pandas as pd
from collections import defaultdict

def analyze_precedence(tasks: list, precedence: list):
    """
    分析装配线平衡的优先关系

    tasks: 任务列表,格式为 {'task_id': str, 'time': float, 'description': str}
    precedence: 优先关系列表,格式为 (前驱任务, 后继任务) 元组
    """
    # 构建有向图
    G = nx.DiGraph()

    task_dict = {t['task_id']: t for t in tasks}
    for task in tasks:
        G.add_node(task['task_id'], time=task['time'])

    for pred, succ in precedence:
        G.add_edge(pred, succ)

    # 计算位置权重(任务时间加上所有后继任务的时间)
    def positional_weight(node):
        descendants = nx.descendants(G, node)
        weight = task_dict[node]['time']
        for d in descendants:
            weight += task_dict[d]['time']
        return weight

    weights = {t['task_id']: positional_weight(t['task_id']) for t in tasks}

    # 找到关键路径
    total_time = sum(t['time'] for t in tasks)

    # 找到直接前驱和后继
    analysis = []
    for task in tasks:
        tid = task['task_id']
        analysis.append({
            'task_id': tid,
            'time': task['time'],
            'predecessors': list(G.predecessors(tid)),
            'successors': list(G.successors(tid)),
            'positional_weight': weights[tid]
        })

    return {
        "total_work_content": total_time,
        "task_analysis": pd.DataFrame(analysis).sort_values('positional_weight', ascending=False),
        "graph": G
    }

2. 节拍时间与工作站计算

def calculate_cycle_time(demand_per_shift: int, available_time_minutes: float,
                        efficiency: float = 0.95):
    """
    根据需求计算所需节拍时间

    返回理论节拍时间和实际节拍时间
    """
    # 理论节拍时间
    theoretical_ct = available_time_minutes / demand_per_shift

    # 实际节拍时间(考虑效率因素)
    practical_ct = theoretical_ct * efficiency

    return {
        "theoretical_cycle_time": round(theoretical_ct, 2),
        "practical_cycle_time": round(practical_ct, 2),
        "demand_per_shift": demand_per_shift,
        "available_time": available_time_minutes,
        "efficiency_factor": efficiency
    }

def calculate_workstations(total_work_content: float, cycle_time: float):
    """
    计算理论和实际工作站数量
    """
    theoretical = total_work_content / cycle_time
    minimum = int(np.ceil(theoretical))

    return {
        "theoretical_workstations": round(theoretical, 2),
        "minimum_workstations": minimum,
        "total_work_content": total_work_content,
        "cycle_time": cycle_time
    }

3. 最大候选规则 (LCR)

def largest_candidate_rule(tasks: list, precedence: list, cycle_time: float):
    """
    使用最大候选规则进行装配线平衡

    按任务时间从大到小优先分配到工作站
    """
    # 构建优先图
    G = nx.DiGraph()
    for pred, succ in precedence:
        G.add_edge(pred, succ)

    task_dict = {t['task_id']: t['time'] for t in tasks}

    # 按时间降序排序任务
    sorted_tasks = sorted(tasks, key=lambda x: x['time'], reverse=True)

    workstations = []
    assigned = set()
    current_station = 1
    current_time = 0
    current_tasks = []

    while len(assigned) < len(tasks):
        task_assigned = False

        for task in sorted_tasks:
            tid = task['task_id']

            if tid in assigned:
                continue

            # 检查优先关系 - 所有前驱任务必须已分配
            predecessors = set(G.predecessors(tid))
            if not predecessors.issubset(assigned):
                continue

            # 检查任务是否适合当前工作站
            if current_time + task['time'] <= cycle_time:
                current_tasks.append(tid)
                current_time += task['time']
                assigned.add(tid)
                task_assigned = True
                break

        if not task_assigned:
            # 关闭当前工作站并开始新的工作站
            if current_tasks:
                workstations.append({
                    'station': current_station,
                    'tasks': current_tasks,
                    'total_time': current_time,
                    'idle_time': cycle_time - current_time
                })
                current_station += 1
                current_time = 0
                current_tasks = []

    # 如果最后一个工作站不为空,则添加
    if current_tasks:
        workstations.append({
            'station': current_station,
            'tasks': current_tasks,
            'total_time': current_time,
            'idle_time': cycle_time - current_time
        })

    return {
        "workstations": workstations,
        "num_stations": len(workstations),
        "cycle_time": cycle_time
    }

4. 排序位置权重法 (RPW)

def ranked_positional_weight(tasks: list, precedence: list, cycle_time: float):
    """
    使用排序位置权重法进行装配线平衡

    比LCR更好,因为它同时考虑了任务时间和位置
    """
    # 构建图并计算位置权重
    G = nx.DiGraph()
    for pred, succ in precedence:
        G.add_edge(pred, succ)

    task_dict = {t['task_id']: t for t in tasks}

    def calc_rpw(task_id):
        descendants = nx.descendants(G, task_id)
        weight = task_dict[task_id]['time']
        for d in descendants:
            weight += task_dict[d]['time']
        return weight

    # 将RPW添加到任务并按RPW排序
    for task in tasks:
        task['rpw'] = calc_rpw(task['task_id'])

    sorted_tasks = sorted(tasks, key=lambda x: x['rpw'], reverse=True)

    # 分配到工作站
    workstations = []
    assigned = set()
    current_station = 1
    current_time = 0
    current_tasks = []

    while len(assigned) < len(tasks):
        task_assigned = False

        for task in sorted_tasks:
            tid = task['task_id']

            if tid in assigned:
                continue

            # 检查优先关系
            predecessors = set(G.predecessors(tid))
            if not predecessors.issubset(assigned):
                continue

            # 检查是否适合
            if current_time + task['time'] <= cycle_time:
                current_tasks.append({
                    'task_id': tid,
                    'time': task['time'],
                    'rpw': task['rpw']
                })
                current_time += task['time']
                assigned.add(tid)
                task_assigned = True

        if not task_assigned:
            if current_tasks:
                workstations.append({
                    'station': current_station,
                    'tasks': current_tasks,
                    'total_time': current_time,
                    'idle_time': cycle_time - current_time,
                    'utilization': current_time / cycle_time * 100
                })
                current_station += 1
                current_time = 0
                current_tasks = []

    if current_tasks:
        workstations.append({
            'station': current_station,
            'tasks': current_tasks,
            'total_time': current_time,
            'idle_time': cycle_time - current_time,
            'utilization': current_time / cycle_time * 100
        })

    return {
        "workstations": workstations,
        "num_stations": len(workstations),
        "cycle_time": cycle_time,
        "method": "RPW"
    }

5. 生产线效率指标

def calculate_line_efficiency(workstations: list, cycle_time: float, total_work_content: float):
    """
    计算装配线平衡效率指标
    """
    num_stations = len(workstations)

    # 生产线效率(平衡效率)
    line_efficiency = (total_work_content / (num_stations * cycle_time)) * 100

    # 平衡延迟
    balance_delay = 100 - line_efficiency

    # 平滑指数
    station_times = [ws['total_time'] for ws in workstations]
    mean_time = np.mean(station_times)
    smoothness = np.sqrt(sum((t - mean_time)**2 for t in station_times))

    # 工作站利用率
    utilizations = [ws['total_time'] / cycle_time * 100 for ws in workstations]

    return {
        "line_efficiency": round(line_efficiency, 2),
        "balance_delay": round(balance_delay, 2),
        "smoothness_index": round(smoothness, 2),
        "num_stations": num_stations,
        "cycle_time": cycle_time,
        "station_utilizations": utilizations,
        "min_utilization": round(min(utilizations), 2),
        "max_utilization": round(max(utilizations), 2),
        "avg_utilization": round(np.mean(utilizations), 2)
    }

6. 混合模型装配线平衡

def mixed_model_balance(models: list, tasks: dict, precedence: dict,
                       demand_ratio: dict, cycle_time: float):
    """
    平衡混合模型装配线

    models: 模型ID列表
    tasks: {模型: [{'task_id': str, 'time': float}]}
    precedence: {模型: [(前驱, 后继)]}
    demand_ratio: {模型: 需求比例}
    """
    # 计算加权平均任务时间
    weighted_tasks = defaultdict(float)

    for model in models:
        ratio = demand_ratio[model]
        for task in tasks[model]:
            weighted_tasks[task['task_id']] += task['time'] * ratio

    # 创建组合任务列表
    combined_tasks = [
        {'task_id': tid, 'time': time}
        for tid, time in weighted_tasks.items()
    ]

    # 组合优先关系
    combined_precedence = set()
    for model in models:
        for pred, succ in precedence[model]:
            combined_precedence.add((pred, succ))

    # 使用加权时间进行平衡
    result = ranked_positional_weight(
        combined_tasks,
        list(combined_precedence),
        cycle_time
    )

    return {
        "mixed_model_balance": result,
        "models": models,
        "demand_ratios": demand_ratio,
        "weighted_work_content": sum(weighted_tasks.values())
    }

流程集成

此技能与以下流程集成:

  • assembly-line-design.js
  • production-scheduling-optimization.js
  • workstation-design-optimization.js

输出格式

{
  "line_balance": {
    "workstations": [
      {"station": 1, "tasks": ["A", "B"], "total_time": 48, "idle_time": 2},
      {"station": 2, "tasks": ["C", "D", "E"], "total_time": 47, "idle_time": 3}
    ],
    "cycle_time": 50,
    "method": "RPW"
  },
  "efficiency": {
    "line_efficiency": 95.2,
    "balance_delay": 4.8,
    "smoothness_index": 2.1
  },
  "recommendations": [
    "考虑合并任务B和C以改善平衡",
    "工作站4是瓶颈 - 考虑任务拆分"
  ]
}

最佳实践

  1. 验证优先关系 - 确保捕获所有关系
  2. 考虑多种算法 - 比较LCR、RPW等方法
  3. 考虑可变性 - 实际中任务时间会变化
  4. 允许学习 - 新生产线会随时间改进
  5. 设计灵活性 - 适应未来模型变化
  6. 包含人机工程学 - 工作站设计很重要

约束

  • 优先约束限制了分配选项
  • 任务拆分可能不可行
  • 并行工作站增加了复杂性
  • 混合模型需要仔细排序