名称: 本地LLM路由器 描述: 在空气隔离网络中路由AI编码查询到本地LLMs。集成Serena MCP进行语义代码理解。在离线工作、使用本地模型(Ollama、LM Studio、Jan、OpenWebUI)或安全/封闭环境中使用。触发词:本地LLM、Ollama、LM Studio、Jan、空气隔离、离线AI、Serena、本地推理、封闭网络、模型路由、防御网络、安全编码。
空气隔离网络的本地LLM路由器
智能路由AI编码查询到本地LLMs,集成Serena LSP,用于安全、离线能力的开发环境。
先决条件(关键)
使用此技能前,确保:
- Serena MCP服务器 已安装并运行(主要工具)
- 至少一个本地LLM服务 运行(Ollama、LM Studio、Jan等)
# 安装Serena(必需)
pip install serena
# 或通过uvx
uvx --from git+https://github.com/oraios/serena serena start-mcp-server
# 验证本地LLM服务
curl http://localhost:11434/api/version # Ollama
curl http://localhost:1234/v1/models # LM Studio
curl http://localhost:1337/v1/models # Jan
快速开始
import httpx
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import Optional
class TaskCategory(Enum):
    """Task categories used to route a query to an appropriate local model."""
    CODING = "coding"
    REASONING = "reasoning"
    ANALYSIS = "analysis"
    DOCUMENTATION = "documentation"
@dataclass
class RouterConfig:
    """Configuration for the local LLM router."""
    ollama_url: str = "http://localhost:11434"   # Ollama (native API)
    lmstudio_url: str = "http://localhost:1234"  # LM Studio (OpenAI-style API)
    jan_url: str = "http://localhost:1337"       # Jan (OpenAI-style API)
    serena_enabled: bool = True  # whether Serena MCP enrichment is attempted
    timeout: int = 30            # request timeout -- presumably seconds; confirm against client usage
async def quick_route(query: str, config: Optional[RouterConfig] = None):
    """Quick-route example: detect services and route *query* to a model.

    Args:
        query: The natural-language coding query to route.
        config: Router configuration; a fresh ``RouterConfig()`` is created
            per call when omitted.

    Raises:
        RuntimeError: when no local LLM service is reachable.
    """
    # Do NOT use `config=RouterConfig()` as the default: a default argument
    # is evaluated once at definition time and the instance would be shared
    # (and mutable) across every call.
    if config is None:
        config = RouterConfig()
    # 1. Detect available services.
    services = await discover_services(config)
    if not services:
        raise RuntimeError("无本地LLM服务可用")
    # 2. Classify the task.
    category = classify_task(query)
    # 3. Pick the best model for the task.
    model = select_model(category, services)
    # 4. Execute the query on the first discovered service.
    return await execute_query(query, model, services[0])
# Example usage
async def main():
    """Demo entry point: route one coding prompt and print the reply."""
    answer = await quick_route("编写一个安全解析JSON的函数")
    print(answer)

asyncio.run(main())
Serena集成(主要工具)
关键: Serena MCP 必须首先调用所有代码相关任务。这提供了代码库的语义理解,然后路由到LLM。
为何首先使用Serena?
- 令牌效率: Serena 仅提取相关代码上下文
- 准确性: 符号级操作 vs grep风格搜索
- 代码库意识: 理解类型、引用、调用层次
- 编辑精度: 在符号级别应用更改,而非字符串匹配
Serena MCP 设置
import subprocess
import json
from typing import Any
class SerenaMCP:
    """Minimal JSON-RPC-over-stdio client for a Serena MCP server."""

    def __init__(self, workspace_root: str):
        self.workspace = workspace_root
        self.process = None   # child process handle, set by start()
        self._next_id = 0     # monotonically increasing JSON-RPC request id

    async def start(self):
        """Start the Serena MCP server as a child process."""
        self.process = subprocess.Popen(
            ["serena", "start-mcp-server", "--workspace", self.workspace],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )

    async def call(self, method: str, params: dict) -> Any:
        """Send one JSON-RPC request and return the decoded response line.

        Raises:
            RuntimeError: if the server process has not been started.
        """
        if self.process is None:
            raise RuntimeError("Serena MCP server not running; call start() first")
        # Use a fresh id per request instead of a constant 1 so responses
        # can be correlated with requests.
        self._next_id += 1
        request = {
            "jsonrpc": "2.0",
            "id": self._next_id,
            "method": method,
            "params": params
        }
        # Framing is one JSON object per line.  (The original source had a
        # literal newline embedded inside the bytes literal -- a syntax
        # error; it must be the escape sequence b"\n".)
        self.process.stdin.write(json.dumps(request).encode() + b"\n")
        self.process.stdin.flush()
        response = self.process.stdout.readline()
        return json.loads(response)

    async def find_symbol(self, name: str) -> dict:
        """Find a symbol definition by name."""
        return await self.call("find_symbol", {"name": name})

    async def get_references(self, file: str, line: int, char: int) -> list:
        """Get all references to the symbol at a position."""
        return await self.call("get_references", {
            "file": file,
            "line": line,
            "character": char
        })

    async def get_hover_info(self, file: str, line: int, char: int) -> dict:
        """Get type/documentation info for a position."""
        return await self.call("get_hover_info", {
            "file": file,
            "line": line,
            "character": char
        })

    async def get_diagnostics(self, file: str) -> list:
        """Get errors/warnings for a file."""
        return await self.call("get_diagnostics", {"file": file})

    async def apply_edit(self, file: str, edits: list) -> bool:
        """Apply code edits to a file."""
        return await self.call("apply_edit", {"file": file, "edits": edits})
# Serena tools ordered by priority (always prefer the highest priority first).
# `use_for` values are user-facing Chinese labels and are runtime data.
SERENA_TOOLS = {
    # Priority 1: symbol-level operations (highest)
    "find_symbol": {"priority": 1, "use_for": ["导航", "定义"]},
    "get_references": {"priority": 1, "use_for": ["重构", "影响分析"]},
    "get_hover_info": {"priority": 1, "use_for": ["类型信息", "文档"]},
    # Priority 2: code navigation
    "go_to_definition": {"priority": 2, "use_for": ["导航"]},
    "go_to_type_definition": {"priority": 2, "use_for": ["类型导航"]},
    "go_to_implementation": {"priority": 2, "use_for": ["接口实现"]},
    # Priority 3: code understanding
    "get_document_symbols": {"priority": 3, "use_for": ["文件结构"]},
    "get_workspace_symbols": {"priority": 3, "use_for": ["代码库搜索"]},
    "get_call_hierarchy": {"priority": 3, "use_for": ["调用分析"]},
    # Priority 4: code modification
    "apply_edit": {"priority": 4, "use_for": ["编辑"]},
    "rename_symbol": {"priority": 4, "use_for": ["重构"]},
    # Priority 5: diagnostics
    "get_diagnostics": {"priority": 5, "use_for": ["错误", "警告"]},
    "get_code_actions": {"priority": 5, "use_for": ["快速修复"]},
}
首先使用Serena的请求处理器
async def handle_code_request(
    query: str,
    file_context: Optional[dict] = None,
    serena: SerenaMCP = None,
    router: "LLMRouter" = None
):
    """Handle a code request with the Serena-first pattern.

    CRITICAL: Serena is always consulted first for code tasks.

    Args:
        query: The user's request.
        file_context: Optional ``{"file": ..., "position": {"line", "character"}}``.
        serena: Connected Serena MCP client, if available.
        router: LLM router used for model selection and execution.
    """
    # Step 1: classify the task.  classify_task returns a
    # ClassificationResult; the enum itself lives on .category (the
    # original compared the whole result object against TaskCategory
    # members, which could never match).
    classification = classify_task(query)
    category = classification.category
    # Step 2: always gather semantic context from Serena when possible.
    serena_context = {}
    if serena and file_context:
        if file_context.get("file") and file_context.get("position"):
            file = file_context["file"]
            line = file_context["position"]["line"]
            char = file_context["position"]["character"]
            # Hover info: type signature and docs at the cursor.
            serena_context["hover"] = await serena.get_hover_info(file, line, char)
            # For refactor/rename-flavored requests, collect references.
            if category in (TaskCategory.ANALYSIS, TaskCategory.CODING):
                if "refactor" in query.lower() or "rename" in query.lower():
                    serena_context["references"] = await serena.get_references(
                        file, line, char
                    )
            # Always fetch current diagnostics for the file.
            serena_context["diagnostics"] = await serena.get_diagnostics(file)
    # Step 3: build the enriched prompt with the Serena context.
    enriched_query = build_enriched_query(query, serena_context)
    # Step 4: select and route to an appropriate LLM.
    model = router.select_model(category)
    response = await router.execute(enriched_query, model)
    # Step 5: if the response contains edits, apply them via Serena.
    # Guard on file_context too -- the original indexed it even when None.
    if serena and file_context and contains_code_edit(response):
        edits = parse_code_edits(response)
        await serena.apply_edit(file_context["file"], edits)
    return response
def build_enriched_query(query: str, serena_context: dict) -> str:
    """Build the final prompt from *query* plus gathered Serena context.

    The original literals were corrupted (newlines embedded inside single
    f-strings); they are reconstructed here with explicit ``\n`` escapes.
    Section headings are Chinese runtime strings and are preserved.
    """
    parts = [query]
    if serena_context.get("hover"):
        hover = serena_context["hover"]
        parts.append(f"\n## 类型信息\n{hover}")
    if serena_context.get("references"):
        refs = serena_context["references"]
        parts.append(f"\n## 引用(找到{len(refs)}个)\n")
        for ref in refs[:10]:  # cap at the first 10 references
            parts.append(f"- {ref['file']}:{ref['line']}")
    if serena_context.get("diagnostics"):
        diags = serena_context["diagnostics"]
        if diags:
            parts.append(f"\n## 当前问题({len(diags)}个)\n")
            for diag in diags[:5]:  # cap at the first 5 diagnostics
                parts.append(f"- 行 {diag['line']}: {diag['message']}")
    return "\n".join(parts)
服务发现
支持的服务
| 服务 | 默认端点 | 健康检查 | 模型端点 | 聊天端点 | API风格 |
|---|---|---|---|---|---|
| Ollama | localhost:11434 | /api/version | /api/tags | /api/chat | 原生 |
| LM Studio | localhost:1234 | /v1/models | /v1/models | /v1/chat/completions | OpenAI |
| Jan | localhost:1337 | /v1/models | /v1/models | /v1/chat/completions | OpenAI |
| OpenWebUI | localhost:3000 | /api/health | /api/models | /api/chat | 自定义 |
| LocalAI | localhost:8080 | /readyz | /v1/models | /v1/chat/completions | OpenAI |
| vLLM | localhost:8000 | /health | /v1/models | /v1/chat/completions | OpenAI |
| llama.cpp | localhost:8080 | /health | /v1/models | /v1/chat/completions | OpenAI |
| Kobold.cpp | localhost:5001 | /api/v1/info | /api/v1/model | /api/v1/generate | 自定义 |
| GPT4All | localhost:4891 | /v1/models | /v1/models | /v1/chat/completions | OpenAI |
| text-generation-webui | localhost:5000 | /api/v1/model | /api/v1/models | /api/v1/chat | 自定义 |
OS检测
import sys
import os
import platform
from dataclasses import dataclass
@dataclass
class OSInfo:
    """Detected operating system and runtime environment facts."""
    platform: str  # 'windows', 'linux', 'darwin'
    release: str
    arch: str  # 'x64', 'arm64'
    is_wsl: bool        # running under Windows Subsystem for Linux
    is_container: bool  # running inside Docker / Kubernetes
def detect_os() -> OSInfo:
    """Detect the operating system and runtime environment (WSL/container).

    Returns:
        OSInfo with a normalized platform name plus WSL/container flags.
    """
    plat = sys.platform
    # Normalize to 'windows' / 'darwin' / 'linux' (the original had a
    # redundant `elif plat == 'darwin': plat = 'darwin'` branch).
    if plat == 'win32':
        plat = 'windows'
    elif plat != 'darwin':
        plat = 'linux'
    # WSL detection: the kernel version string mentions Microsoft, or the
    # WSL_DISTRO_NAME environment variable is set.
    is_wsl = False
    if plat == 'linux':
        try:
            with open('/proc/version', 'r') as f:
                is_wsl = 'microsoft' in f.read().lower()
        except OSError:  # broader than FileNotFoundError: /proc may be unreadable
            pass
    is_wsl = is_wsl or os.environ.get('WSL_DISTRO_NAME') is not None
    # Container detection: docker marker file, Kubernetes env var, or
    # container hints in PID 1's cgroup.
    is_container = (
        os.path.exists('/.dockerenv') or
        os.environ.get('KUBERNETES_SERVICE_HOST') is not None
    )
    if not is_container and plat == 'linux':
        try:
            with open('/proc/1/cgroup', 'r') as f:
                content = f.read()
            is_container = 'docker' in content or 'kubepods' in content
        except OSError:  # /proc/1/cgroup often raises PermissionError unprivileged
            pass
    return OSInfo(
        platform=plat,
        release=platform.release(),
        arch=platform.machine(),
        is_wsl=is_wsl,
        is_container=is_container
    )
def adjust_endpoint_for_os(endpoint: str, os_info: OSInfo) -> str:
    """Rewrite localhost endpoints for WSL/container environments.

    Inside WSL or a container, services bound on the host machine are
    reached via the special ``host.docker.internal`` name, not localhost.
    """
    if not (os_info.is_wsl or os_info.is_container):
        return endpoint
    return endpoint.replace('localhost', 'host.docker.internal')
服务发现实现
import httpx
import asyncio
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
@dataclass
class DiscoveredModel:
    """A model advertised by a local LLM service."""
    id: str
    name: str
    size: int = 0  # reported by Ollama; presumably bytes on disk -- confirm
    family: Optional[str] = None  # model family, e.g. as reported by Ollama details
    context_length: int = 4096    # context window; inferred from name for Ollama
    quantization: Optional[str] = None
@dataclass
class LLMService:
    """A configured or discovered local LLM service endpoint."""
    name: str
    type: str  # 'ollama', 'lmstudio', 'jan', 'openwebui', 'custom'
    endpoint: str
    status: str = 'unknown'  # 'online', 'offline', 'unknown'
    models: list = field(default_factory=list)
    last_checked: Optional[datetime] = None  # set when a health check succeeds
    api_style: str = 'openai'  # 'openai', 'native'
    # Endpoint paths (OpenAI-style defaults; overridden per service type)
    health_path: str = '/v1/models'
    models_path: str = '/v1/models'
    chat_path: str = '/v1/chat/completions'
# Default configurations for well-known local LLM services.
# Endpoint paths mirror each service's published API surface.
SERVICE_DEFAULTS = {
    'ollama': LLMService(
        name='Ollama',
        type='ollama',
        endpoint='http://localhost:11434',
        health_path='/api/version',
        models_path='/api/tags',
        chat_path='/api/chat',
        api_style='native'
    ),
    'lmstudio': LLMService(
        name='LM Studio',
        type='lmstudio',
        endpoint='http://localhost:1234',
        health_path='/v1/models',
        models_path='/v1/models',
        chat_path='/v1/chat/completions',
        api_style='openai'
    ),
    'jan': LLMService(
        name='Jan',
        type='jan',
        endpoint='http://localhost:1337',
        health_path='/v1/models',
        models_path='/v1/models',
        chat_path='/v1/chat/completions',
        api_style='openai'
    ),
    'openwebui': LLMService(
        name='Open WebUI',
        type='openwebui',
        endpoint='http://localhost:3000',
        health_path='/api/health',
        models_path='/api/models',
        chat_path='/api/chat',
        api_style='custom'
    ),
    'localai': LLMService(
        name='LocalAI',
        type='localai',
        endpoint='http://localhost:8080',
        health_path='/readyz',
        models_path='/v1/models',
        chat_path='/v1/chat/completions',
        api_style='openai'
    ),
    'vllm': LLMService(
        name='vLLM',
        type='vllm',
        endpoint='http://localhost:8000',
        health_path='/health',
        models_path='/v1/models',
        chat_path='/v1/chat/completions',
        api_style='openai'
    ),
    # NOTE: llama.cpp shares port 8080 with LocalAI; only one can be probed
    # successfully at a time on default ports.
    'llamacpp': LLMService(
        name='llama.cpp',
        type='llamacpp',
        endpoint='http://localhost:8080',
        health_path='/health',
        models_path='/v1/models',
        chat_path='/v1/chat/completions',
        api_style='openai'
    ),
    'koboldcpp': LLMService(
        name='Kobold.cpp',
        type='koboldcpp',
        endpoint='http://localhost:5001',
        health_path='/api/v1/info',
        models_path='/api/v1/model',
        chat_path='/api/v1/generate',
        api_style='custom'
    ),
    'gpt4all': LLMService(
        name='GPT4All',
        type='gpt4all',
        endpoint='http://localhost:4891',
        health_path='/v1/models',
        models_path='/v1/models',
        chat_path='/v1/chat/completions',
        api_style='openai'
    ),
}
class ServiceDiscovery:
    """Discover and monitor local LLM services."""

    def __init__(self, custom_endpoints: list = None):
        # Online services keyed by service type ('ollama', 'lmstudio', ...).
        self.services: dict[str, LLMService] = {}
        self.os_info = detect_os()
        self.custom_endpoints = custom_endpoints or []
        # Short timeout so probes of absent services fail fast.
        # NOTE(review): the client is never closed; consider an aclose() hook.
        self._client = httpx.AsyncClient(timeout=5.0)

    async def discover_all(self) -> list[LLMService]:
        """Probe all default and custom endpoints; return the online ones."""
        discovered = []
        # Probe the well-known default services concurrently.
        tasks = []
        for key, default in SERVICE_DEFAULTS.items():
            service = LLMService(
                name=default.name,
                type=default.type,
                endpoint=adjust_endpoint_for_os(default.endpoint, self.os_info),
                health_path=default.health_path,
                models_path=default.models_path,
                chat_path=default.chat_path,
                api_style=default.api_style
            )
            tasks.append(self._check_service(service))
        # Probe user-supplied custom endpoints.
        for custom in self.custom_endpoints:
            service = LLMService(
                name=custom.get('name', '自定义'),
                type='custom',
                endpoint=custom['endpoint'],
                health_path=custom.get('health_path', '/v1/models'),
                models_path=custom.get('models_path', '/v1/models'),
                chat_path=custom.get('chat_path', '/v1/chat/completions'),
                api_style=custom.get('api_style', 'openai')
            )
            tasks.append(self._check_service(service))
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for result in results:
            if isinstance(result, LLMService) and result.status == 'online':
                discovered.append(result)
                # NOTE(review): keyed by type, so multiple 'custom' services
                # overwrite one another here.
                self.services[result.type] = result
        return discovered

    async def _check_service(self, service: LLMService) -> LLMService:
        """Health-check one service and, if online, discover its models."""
        try:
            # Health check
            response = await self._client.get(
                f"{service.endpoint}{service.health_path}"
            )
            if response.status_code == 200:
                service.status = 'online'
                service.last_checked = datetime.now()
                # Discover models
                service.models = await self._discover_models(service)
            else:
                service.status = 'offline'
        except (httpx.ConnectError, httpx.TimeoutException):
            service.status = 'offline'
        return service

    async def _discover_models(self, service: LLMService) -> list[DiscoveredModel]:
        """Fetch and parse the service's model list; empty list on any error."""
        try:
            response = await self._client.get(
                f"{service.endpoint}{service.models_path}"
            )
            data = response.json()
            # Parse according to the service's API flavor.
            if service.type == 'ollama':
                return [
                    DiscoveredModel(
                        id=m['name'],
                        name=m['name'],
                        size=m.get('size', 0),
                        family=m.get('details', {}).get('family'),
                        context_length=self._infer_context_length(m['name'])
                    )
                    for m in data.get('models', [])
                ]
            else:  # OpenAI-style
                return [
                    DiscoveredModel(
                        id=m['id'],
                        name=m['id'],
                        context_length=m.get('context_length', 4096)
                    )
                    for m in data.get('data', [])
                ]
        except Exception:
            # Deliberate best-effort: a misbehaving service just has no models.
            return []

    def _infer_context_length(self, model_name: str) -> int:
        """Best-effort context-window guess from the model's name."""
        name_lower = model_name.lower()
        # Explicit context markers in the name win.
        if '128k' in name_lower or '131k' in name_lower:
            return 131072
        if '64k' in name_lower:
            return 65536
        if '32k' in name_lower:
            return 32768
        if '16k' in name_lower:
            return 16384
        # Model-family defaults.
        if 'qwen' in name_lower:
            return 131072  # Qwen models typically support 128K+
        if 'deepseek' in name_lower:
            return 128000
        if 'llama-3' in name_lower or 'llama3' in name_lower:
            return 128000
        if 'codellama' in name_lower:
            return 100000
        if 'mixtral' in name_lower:
            return 65536
        return 8192  # conservative default
任务分类
分类系统
import re
from enum import Enum
from dataclasses import dataclass
class TaskCategory(Enum):
    """Task categories for routing.

    Re-declared in this section (identical to the earlier definition) so the
    classification section is self-contained when copied standalone.
    """
    CODING = "coding"
    REASONING = "reasoning"
    ANALYSIS = "analysis"
    DOCUMENTATION = "documentation"
@dataclass
class ClassificationResult:
    """Outcome of classifying a query into a task category."""
    category: TaskCategory
    confidence: float  # 0.0 - 1.0
    requires_serena: bool         # True when LSP-grade code context is needed
    keywords_matched: list[str]   # keywords that contributed to the score
# Task patterns (regular expressions).  Matched case-insensitively against
# the lower-cased query; each hit adds a fixed score in classify_task.
TASK_PATTERNS = {
    TaskCategory.CODING: [
        r"(?:write|create|implement|code|generate)\s+(?:a\s+)?(?:function|class|method|component)",
        r"(?:fix|debug|solve)\s+(?:this|the)\s+(?:bug|error|issue)",
        r"refactor\s+(?:this|the)",
        r"add\s+(?:error\s+handling|validation|logging|tests?)",
        r"complete\s+(?:this|the)\s+code",
        r"(?:convert|translate)\s+(?:this|the)\s+code",
        r"(?:optimize|improve)\s+(?:this|the)\s+(?:function|code|performance)",
    ],
    TaskCategory.REASONING: [
        r"(?:design|architect|plan)\s+(?:a|the)\s+(?:system|architecture|solution)",
        r"how\s+should\s+(?:I|we)\s+(?:approach|structure|implement)",
        r"what\s+(?:is|would\s+be)\s+the\s+best\s+(?:way|approach|pattern)",
        r"explain\s+the\s+(?:logic|reasoning|algorithm)",
        r"compare\s+(?:and\s+contrast|between)",
        r"(?:recommend|suggest)\s+(?:an?\s+)?(?:approach|solution|pattern)",
        r"trade-?offs?\s+(?:between|of)",
    ],
    TaskCategory.ANALYSIS: [
        r"(?:review|analyze|audit)\s+(?:this|the)\s+code",
        r"find\s+(?:potential\s+)?(?:issues|vulnerabilities|bugs|problems)",
        r"(?:security|performance)\s+(?:review|analysis|audit)",
        r"what\s+(?:could|might)\s+go\s+wrong",
        r"identify\s+(?:problems|improvements|issues)",
        r"(?:check|scan)\s+for\s+(?:vulnerabilities|issues)",
    ],
    TaskCategory.DOCUMENTATION: [
        r"(?:write|create|generate)\s+(?:documentation|docs|docstring)",
        r"(?:add|write)\s+(?:comments|jsdoc|docstring|type\s+hints)",
        r"(?:document|explain)\s+(?:this|the)\s+(?:code|function|api)",
        r"(?:create|write)\s+(?:a\s+)?readme",
        r"(?:generate|write)\s+(?:api\s+)?documentation",
        r"describe\s+(?:what|how)\s+(?:this|the)",
    ],
}
# Keyword weights for scoring: word -> (category, weight in [0, 1]).
KEYWORD_WEIGHTS = {
    # Coding
    "function": (TaskCategory.CODING, 0.3),
    "implement": (TaskCategory.CODING, 0.4),
    "code": (TaskCategory.CODING, 0.2),
    "debug": (TaskCategory.CODING, 0.5),
    "refactor": (TaskCategory.CODING, 0.6),
    "fix": (TaskCategory.CODING, 0.4),
    "test": (TaskCategory.CODING, 0.3),
    "bug": (TaskCategory.CODING, 0.5),
    # Reasoning
    "architecture": (TaskCategory.REASONING, 0.6),
    "design": (TaskCategory.REASONING, 0.4),
    "approach": (TaskCategory.REASONING, 0.3),
    "strategy": (TaskCategory.REASONING, 0.5),
    "tradeoff": (TaskCategory.REASONING, 0.5),
    "compare": (TaskCategory.REASONING, 0.4),
    "recommend": (TaskCategory.REASONING, 0.4),
    # Analysis
    "review": (TaskCategory.ANALYSIS, 0.5),
    "analyze": (TaskCategory.ANALYSIS, 0.6),
    "security": (TaskCategory.ANALYSIS, 0.4),
    "vulnerability": (TaskCategory.ANALYSIS, 0.7),
    "performance": (TaskCategory.ANALYSIS, 0.3),
    "audit": (TaskCategory.ANALYSIS, 0.6),
    # Documentation
    "document": (TaskCategory.DOCUMENTATION, 0.6),
    "readme": (TaskCategory.DOCUMENTATION, 0.8),
    "docstring": (TaskCategory.DOCUMENTATION, 0.8),
    "comment": (TaskCategory.DOCUMENTATION, 0.4),
    "explain": (TaskCategory.DOCUMENTATION, 0.3),
}
def classify_task(query: str) -> ClassificationResult:
    """Classify a natural-language query into a TaskCategory.

    Regex pattern hits and weighted keywords each contribute half of the
    score; a weak overall signal falls back to CODING.
    """
    q = query.lower()
    scores = dict.fromkeys(TaskCategory, 0.0)
    matched = []
    # Pattern matching contributes a flat 0.5 per matching regex.
    for cat, patterns in TASK_PATTERNS.items():
        for pat in patterns:
            if re.search(pat, q):
                scores[cat] += 0.5
    # Keywords contribute half their configured weight.
    for token in re.findall(r'\w+', q):
        if token in KEYWORD_WEIGHTS:
            cat, weight = KEYWORD_WEIGHTS[token]
            scores[cat] += weight * 0.5
            matched.append(token)
    # Highest-scoring category wins; cap confidence at 1.0.
    best = max(scores, key=scores.get)
    confidence = min(scores[best], 1.0)
    # No clear signal: default to CODING at moderate confidence.
    if confidence < 0.2:
        best, confidence = TaskCategory.CODING, 0.5
    # Serena is required for analysis, or for symbol/navigation phrasing.
    navigation_phrases = (
        'definition', 'reference', 'symbol', 'rename',
        'where is', 'find all', 'go to', 'jump to'
    )
    needs_serena = best == TaskCategory.ANALYSIS or any(
        phrase in q for phrase in navigation_phrases
    )
    return ClassificationResult(
        category=best,
        confidence=confidence,
        requires_serena=needs_serena,
        keywords_matched=matched,
    )
模型选择
模型能力矩阵
from dataclasses import dataclass
from typing import Optional
@dataclass
class ModelCapability:
    """Static capability/performance record for one known local model."""
    id: str
    family: str  # model family key; also selects the token counter
    context_window: int
    vram_gb: float  # approximate VRAM required to serve the model
    categories: list[TaskCategory]  # categories this model is recommended for
    performance_scores: dict[TaskCategory, int]  # 0-100
    tier: int  # 1=best, 2=good, 3=basic
    quantization: Optional[str] = None
# Comprehensive model database - updated January 2025.
#
# Rebuilt with the private _model() helper to remove per-entry boilerplate.
# FIX: the original literal declared "gemma-2-27b" twice; in a dict literal
# the later value silently wins while the key keeps its first insertion
# position.  The duplicate is removed here, keeping the effective (later)
# values at the original insertion position so behavior is unchanged.
_C = TaskCategory.CODING
_R = TaskCategory.REASONING
_A = TaskCategory.ANALYSIS
_D = TaskCategory.DOCUMENTATION


def _model(mid: str, family: str, ctx: int, vram: float, cats: list,
           coding: int, reasoning: int, analysis: int, documentation: int,
           tier: int) -> ModelCapability:
    """Build one ModelCapability; scores are given in C/R/A/D order."""
    return ModelCapability(
        id=mid,
        family=family,
        context_window=ctx,
        vram_gb=vram,
        categories=cats,
        performance_scores={_C: coding, _R: reasoning,
                            _A: analysis, _D: documentation},
        tier=tier,
    )


MODEL_DATABASE: dict[str, ModelCapability] = {
    # === Coding specialists (Tier 1) ===
    "deepseek-v3": _model("deepseek-v3", "deepseek", 128000, 48,  # MoE: 685B total, 37B active
                          [_C, _R, _A], 99, 97, 96, 92, tier=1),
    "qwen2.5-coder-32b": _model("qwen2.5-coder-32b", "qwen", 131072, 22,
                                [_C, _A], 96, 82, 92, 88, tier=1),
    "deepseek-coder-v2": _model("deepseek-coder-v2", "deepseek", 128000, 48,  # MoE: 236B total, 21B active
                                [_C, _A, _R], 95, 88, 92, 80, tier=1),
    "codellama-70b": _model("codellama-70b", "llama", 100000, 40,
                            [_C], 90, 70, 85, 75, tier=1),
    "codellama-34b": _model("codellama-34b", "llama", 100000, 20,
                            [_C], 85, 65, 80, 70, tier=2),
    "qwen2.5-coder-14b": _model("qwen2.5-coder-14b", "qwen", 131072, 10,
                                [_C], 82, 60, 75, 70, tier=2),
    "starcoder2-15b": _model("starcoder2-15b", "starcoder", 16384, 10,
                             [_C], 80, 50, 70, 60, tier=2),
    "deepseek-coder-6.7b": _model("deepseek-coder-6.7b", "deepseek", 16384, 5,
                                  [_C], 75, 50, 65, 55, tier=3),
    "codellama-7b": _model("codellama-7b", "llama", 16384, 5,
                           [_C], 70, 45, 60, 50, tier=3),
    # === Reasoning specialists ===
    "deepseek-r1": _model("deepseek-r1", "deepseek", 128000, 160,  # 671B total
                          [_R, _C], 92, 99, 95, 90, tier=1),
    "deepseek-r1-distill-70b": _model("deepseek-r1-distill-70b", "deepseek", 128000, 42,
                                      [_R, _C], 88, 94, 90, 86, tier=1),
    "qwen2.5-72b-instruct": _model("qwen2.5-72b-instruct", "qwen", 131072, 48,
                                   [_R, _C], 88, 95, 92, 94, tier=1),
    "llama-3.3-70b-instruct": _model("llama-3.3-70b-instruct", "llama", 128000, 42,
                                     [_R, _C], 85, 92, 88, 90, tier=1),
    "deepseek-r1-distill-32b": _model("deepseek-r1-distill-32b", "deepseek", 128000, 22,
                                      [_R, _C], 82, 90, 85, 82, tier=2),
    "mistral-small-24b": _model("mistral-small-24b", "mistral", 32768, 16,
                                [_R, _C], 80, 85, 82, 84, tier=2),
    "qwen2.5-32b-instruct": _model("qwen2.5-32b-instruct", "qwen", 131072, 22,
                                   [_R, _D], 78, 86, 82, 88, tier=2),
    "phi-4": _model("phi-4", "phi", 16384, 10,
                    [_R, _C], 82, 88, 80, 78, tier=2),
    "deepseek-r1-distill-14b": _model("deepseek-r1-distill-14b", "deepseek", 128000, 10,
                                      [_R], 75, 85, 78, 76, tier=2),
    "llama-3.2-11b-vision": _model("llama-3.2-11b-vision", "llama", 128000, 8,
                                   [_R, _D], 68, 78, 75, 80, tier=2),
    # Deduplicated entry: values are those of the later duplicate literal,
    # which is what the original dict effectively contained.
    "gemma-2-27b": _model("gemma-2-27b", "gemma", 8192, 18,
                          [_C, _R], 78, 80, 75, 78, tier=2),
    "deepseek-r1-distill-8b": _model("deepseek-r1-distill-8b", "deepseek", 128000, 6,
                                     [_R], 68, 78, 70, 68, tier=3),
    "gemma-2-9b": _model("gemma-2-9b", "gemma", 8192, 7,
                         [_R, _C], 72, 75, 70, 74, tier=3),
    "llama-3.2-3b": _model("llama-3.2-3b", "llama", 128000, 3,
                           [_R], 55, 65, 58, 65, tier=3),
    # === Analysis specialists (require Serena) ===
    "codellama-34b-instruct": _model("codellama-34b-instruct", "llama", 100000, 20,
                                     [_A], 80, 70, 88, 75, tier=2),
    # === Documentation specialists ===
    "mistral-nemo-12b": _model("mistral-nemo-12b", "mistral", 128000, 8,
                               [_D], 65, 70, 65, 82, tier=2),
    "mistral-7b": _model("mistral-7b", "mistral", 32768, 5,
                         [_D], 55, 60, 55, 72, tier=3),
    # === Additional models ===
    "phi-3-medium": _model("phi-3-medium", "phi", 128000, 8,
                           [_C, _R], 72, 75, 68, 70, tier=2),
    "yi-34b": _model("yi-34b", "yi", 200000, 20,
                     [_R, _D], 72, 82, 75, 80, tier=2),
    "command-r-plus": _model("command-r-plus", "cohere", 128000, 48,
                             [_R, _D], 70, 85, 78, 88, tier=1),
    "wizardcoder-33b": _model("wizardcoder-33b", "wizard", 16384, 20,
                              [_C], 85, 60, 75, 65, tier=2),
    "magicoder-7b": _model("magicoder-7b", "magicoder", 16384, 5,
                           [_C], 78, 50, 65, 55, tier=3),
    "dolphin-mixtral-8x7b": _model("dolphin-mixtral-8x7b", "dolphin", 32768, 28,
                                   [_C, _R], 75, 78, 72, 75, tier=2),
    "nous-hermes-2-mixtral": _model("nous-hermes-2-mixtral", "nous", 32768, 28,
                                    [_R], 72, 82, 75, 78, tier=2),
    "solar-10.7b": _model("solar-10.7b", "solar", 4096, 7,
                          [_R, _D], 60, 72, 65, 75, tier=3),
}
# Task -> ordered model preference (updated January 2025).
TASK_MODEL_PRIORITY = {
    TaskCategory.CODING: [
        # Tier 1 - best
        "deepseek-v3", "qwen2.5-coder-32b", "deepseek-coder-v2",
        # Tier 2 - good
        "codellama-70b", "qwen2.5-coder-14b", "codellama-34b",
        "starcoder2-15b", "phi-4",
        # Tier 3 - basic
        # NOTE(review): "qwen2.5-coder-7b" has no MODEL_DATABASE entry, so
        # ModelSelector.select() skips it even when available -- confirm.
        "qwen2.5-coder-7b", "codellama-7b", "deepseek-coder-6.7b"
    ],
    TaskCategory.REASONING: [
        # Tier 1 - best
        "deepseek-r1", "deepseek-v3", "deepseek-r1-distill-70b",
        "qwen2.5-72b-instruct", "llama-3.3-70b-instruct",
        # Tier 2 - good
        "deepseek-r1-distill-32b", "mistral-small-24b", "qwen2.5-32b-instruct",
        "phi-4", "gemma-2-27b",
        # Tier 3 - basic
        "deepseek-r1-distill-14b", "deepseek-r1-distill-8b", "gemma-2-9b"
    ],
    TaskCategory.ANALYSIS: [
        # Requires Serena LSP context
        "deepseek-v3", "qwen2.5-coder-32b", "deepseek-coder-v2",
        "codellama-34b-instruct", "qwen2.5-72b-instruct"
    ],
    TaskCategory.DOCUMENTATION: [
        "qwen2.5-72b-instruct", "llama-3.3-70b-instruct", "qwen2.5-32b-instruct",
        "mistral-small-24b", "mistral-nemo-12b", "gemma-2-27b"
    ],
}
模型选择逻辑
from typing import Optional
class ModelSelector:
    """Pick the best available model for a task, honoring constraints."""

    def __init__(self, available_models: list[str]):
        # Normalized (lower-cased) set of model ids reported by services.
        self.available = {name.lower() for name in available_models}

    def select(
        self,
        category: TaskCategory,
        required_context: int = 0,
        max_vram_gb: Optional[float] = None
    ) -> Optional[str]:
        """Return the best available model id for *category*, or None.

        Walks the category's priority list, skipping models that are
        unavailable, unknown, or that violate the context/VRAM constraints.
        """
        for model_id in TASK_MODEL_PRIORITY.get(category, []):
            if not self._is_available(model_id):
                continue
            cap = MODEL_DATABASE.get(model_id)
            if cap is None:
                continue
            if required_context > 0 and cap.context_window < required_context:
                continue
            if max_vram_gb and cap.vram_gb > max_vram_gb:
                continue
            return model_id
        # Last resort: any known model that happens to be available
        # (constraints are deliberately not applied here, as before).
        for model_id in MODEL_DATABASE:
            if self._is_available(model_id):
                return model_id
        return None

    def _is_available(self, model_id: str) -> bool:
        """Fuzzy availability check: exact, or substring match either way."""
        target = model_id.lower()
        if target in self.available:
            return True
        return any(target in avail or avail in target for avail in self.available)

    def get_fallback_models(self, category: TaskCategory) -> list[str]:
        """Available tier-2/3 models for *category*, in priority order."""
        fallbacks = []
        for model_id in TASK_MODEL_PRIORITY.get(category, []):
            if not self._is_available(model_id):
                continue
            cap = MODEL_DATABASE.get(model_id)
            if cap and cap.tier >= 2:
                fallbacks.append(model_id)
        return fallbacks
上下文管理
令牌计数
from abc import ABC, abstractmethod
import re
class TokenCounter(ABC):
    """Base class for token counting."""
    @abstractmethod
    def count(self, text: str) -> int:
        # Return the estimated number of tokens in `text`.
        pass

class EstimationCounter(TokenCounter):
    """Estimation-based token counter (no external dependencies)."""
    def __init__(self, chars_per_token: float = 4.0):
        self.chars_per_token = chars_per_token
    def count(self, text: str) -> int:
        # Rough heuristic: roughly 4 characters per token for most tokenizers.
        return int(len(text) / self.chars_per_token)

class QwenCounter(TokenCounter):
    """Token counter for Qwen models."""
    def count(self, text: str) -> int:
        # Qwen uses slightly different tokenization (~3.5 chars/token).
        return int(len(text) / 3.5)

class LlamaCounter(TokenCounter):
    """Token counter for Llama models."""
    def count(self, text: str) -> int:
        # Llama uses SentencePiece (~3.8 chars/token on average).
        return int(len(text) / 3.8)
# 模型系列到计数器的映射
TOKEN_COUNTERS = {
"qwen": QwenCounter(),
"deepseek": EstimationCounter(4.0),
"llama": LlamaCounter(),
"mistral": EstimationCounter(4.0),
"mixtral": EstimationCounter(4.0),
"default": EstimationCounter(4.0),
}
def get_token_counter(model_id: str) -> TokenCounter:
"""获取模型的适当令牌计数器。"""
capability = MODEL_DATABASE.get(model_id)
if capability:
return TOKEN_COUNTERS.get(capability.family, TOKEN_COUNTERS["default"])
return TOKEN_COUNTERS["default"]
上下文管理器
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
@dataclass
class Message:
    """One message in a conversation transcript."""
    role: str  # 'system', 'user', 'assistant', 'tool'
    content: str
    timestamp: datetime = field(default_factory=datetime.now)
    token_count: int = 0  # token estimate for `content` (0 until counted)
    metadata: dict = field(default_factory=dict)  # free-form extras, e.g. 'tool_name'
@dataclass
class ConversationContext:
    """Mutable conversation state for one routing session."""
    session_id: str
    messages: list[Message] = field(default_factory=list)
    total_tokens: int = 0  # system prompt tokens + all message tokens
    system_prompt: str = ""
    system_prompt_tokens: int = 0
    active_model: str = ""  # model the context is currently counted for
    model_history: list[str] = field(default_factory=list)  # previously active models
    compaction_count: int = 0  # number of times this context was compacted
class ContextManager:
    """Manage a conversation's message history with automatic compaction.

    Tracks per-message token counts with a model-specific counter,
    truncates oversized tool outputs, and summarizes older messages when
    usage approaches the active model's context window.
    """

    def __init__(
        self,
        session_id: str,
        system_prompt: str = "",
        compaction_threshold: float = 0.8,  # compact when above 80% of the window
        compaction_target: float = 0.5,     # compact down to 50% of the window
        preserve_recent: int = 10           # never summarize the last N messages
    ):
        self.context = ConversationContext(
            session_id=session_id,
            system_prompt=system_prompt
        )
        self.compaction_threshold = compaction_threshold
        self.compaction_target = compaction_target
        self.preserve_recent = preserve_recent
        # Set lazily by set_model(); until then token counts default to 0.
        self._counter: Optional["TokenCounter"] = None

    def set_model(self, model_id: str):
        """Switch the active model and re-count tokens with its counter."""
        if self.context.active_model:
            self.context.model_history.append(self.context.active_model)
        self.context.active_model = model_id
        self._counter = get_token_counter(model_id)
        # Token estimates are family-specific, so recount everything.
        self._recount_tokens()

    def add_message(self, role: str, content: str, metadata: Optional[dict] = None):
        """Append a message and update the running token total."""
        token_count = self._counter.count(content) if self._counter else 0
        message = Message(
            role=role,
            content=content,
            token_count=token_count,
            metadata=metadata or {}
        )
        self.context.messages.append(message)
        self.context.total_tokens += token_count

    def check_and_compact(self, max_tokens: int) -> bool:
        """Compact if usage exceeds the threshold; return True if compacted."""
        threshold = int(max_tokens * self.compaction_threshold)
        if self.context.total_tokens > threshold:
            self._compact(max_tokens)
            return True
        return False

    def _compact(self, max_tokens: int):
        """Shrink the context toward the configured target size."""
        target = int(max_tokens * self.compaction_target)
        # Step 1: truncate large tool outputs in place.
        for msg in self.context.messages:
            if msg.role == 'tool' and msg.token_count > 500:
                original = msg.token_count
                msg.content = f"[工具输出截断 - {msg.metadata.get('tool_name', 'unknown')}]"
                # BUG FIX: guard against a missing counter — previously this
                # crashed if _compact ran before set_model() was ever called.
                msg.token_count = self._counter.count(msg.content) if self._counter else 0
                msg.metadata['truncated'] = True
                msg.metadata['original_tokens'] = original
        self._recalculate_total()
        if self.context.total_tokens <= target:
            return
        # Step 2: summarize older messages, keeping the most recent ones intact.
        if len(self.context.messages) > self.preserve_recent:
            older = self.context.messages[:-self.preserve_recent]
            recent = self.context.messages[-self.preserve_recent:]
            summary = self._create_summary(older)
            # BUG FIX: the original f-string literal was split by a raw line
            # break (a syntax error); header and summary are joined with '\n'.
            summary_msg = Message(
                role='system',
                content=f"[先前对话摘要]\n{summary}",
                token_count=self._counter.count(summary) if self._counter else 0,
                metadata={'compacted': True}
            )
            self.context.messages = [summary_msg] + recent
            self.context.compaction_count += 1
            self._recalculate_total()

    def _create_summary(self, messages: list) -> str:
        """Produce a crude extractive summary (production would use a light LLM)."""
        key_points = []
        for msg in messages:
            if msg.role == 'user':
                # First sentence of each user query, capped at 100 chars.
                first_sentence = msg.content.split('.')[0][:100]
                key_points.append(f"- 用户询问: {first_sentence}")
            elif msg.role == 'assistant' and len(key_points) < 10:
                # Keep only lines that record a key decision/outcome.
                if 'created' in msg.content.lower() or 'implemented' in msg.content.lower():
                    first_sentence = msg.content.split('.')[0][:100]
                    key_points.append(f"- 助手: {first_sentence}")
        # BUG FIX: '\n'.join — the original string literal contained a raw newline.
        return "\n".join(key_points[:10])

    def _recount_tokens(self):
        """Re-count the system prompt and every message with the current counter."""
        if not self._counter:
            return
        self.context.system_prompt_tokens = self._counter.count(self.context.system_prompt)
        for msg in self.context.messages:
            msg.token_count = self._counter.count(msg.content)
        self._recalculate_total()

    def _recalculate_total(self):
        """Recompute total_tokens from the system prompt plus all messages."""
        self.context.total_tokens = (
            self.context.system_prompt_tokens +
            sum(m.token_count for m in self.context.messages)
        )

    def export_for_api(self) -> list[dict]:
        """Export messages as OpenAI-style chat dicts (system prompt first)."""
        messages = []
        if self.context.system_prompt:
            messages.append({
                "role": "system",
                "content": self.context.system_prompt
            })
        for msg in self.context.messages:
            messages.append({
                "role": msg.role,
                "content": msg.content
            })
        return messages

    def prepare_handoff(self, new_model: str) -> "ContextManager":
        """Prepare this context for a model switch; returns self for chaining."""
        self.set_model(new_model)
        return self
配置
内联配置模式
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class ServiceConfig:
    """Configuration for a single LLM service endpoint."""
    enabled: bool = True
    endpoint: str = ""  # base URL, e.g. "http://localhost:11434"
    priority: int = 1  # lower value = tried earlier
    timeout: int = 30000  # request timeout in milliseconds (divided by 1000 at call sites)
    max_retries: int = 3
    api_style: str = "openai"  # "openai" (OpenAI-compatible) or "native" (e.g. Ollama)
@dataclass
class TaskRoutingConfig:
    """Routing preferences for one task category."""
    primary_models: list[str] = field(default_factory=list)  # tried first, in order
    fallback_models: list[str] = field(default_factory=list)  # tried when all primaries fail
    min_context: int = 8192  # minimum context window (tokens) required for this category
    require_serena: bool = False  # category needs Serena context — NOTE(review): verify consumer
@dataclass
class SecurityConfig:
    """Security configuration for air-gapped networks."""
    allow_external: bool = False  # when True, endpoint validation is skipped entirely
    allowed_hosts: list[str] = field(default_factory=lambda: [
        "localhost", "127.0.0.1", "host.docker.internal"
    ])
    allowed_cidrs: list[str] = field(default_factory=lambda: [
        "192.168.0.0/16", "10.0.0.0/8", "172.16.0.0/12"
    ])  # RFC 1918 private ranges
    audit_enabled: bool = True
    audit_log_path: str = "./audit.log"
    log_queries: bool = True  # queries are logged as hashes, not plaintext
    log_responses: bool = False  # do not log potentially sensitive responses
    verify_checksums: bool = True
@dataclass
class ContextConfig:
    """Context-management (compaction) configuration."""
    compaction_threshold: float = 0.8  # compact when usage exceeds 80% of the window
    compaction_target: float = 0.5  # compact down to 50% of the window
    preserve_recent_messages: int = 10  # never summarize the last N messages
    preserve_recent_tool_calls: int = 5
    max_tool_output_tokens: int = 500  # tool outputs above this are truncated
@dataclass
class RouterConfig:
    """Complete router configuration."""
    # Services (lower priority value = tried earlier)
    ollama: ServiceConfig = field(default_factory=lambda: ServiceConfig(
        endpoint="http://localhost:11434",
        priority=1
    ))
    lmstudio: ServiceConfig = field(default_factory=lambda: ServiceConfig(
        endpoint="http://localhost:1234",
        priority=2
    ))
    jan: ServiceConfig = field(default_factory=lambda: ServiceConfig(
        endpoint="http://localhost:1337",
        priority=3
    ))
    custom_endpoints: list[dict] = field(default_factory=list)
    # Task routing (model lists updated January 2025)
    coding: TaskRoutingConfig = field(default_factory=lambda: TaskRoutingConfig(
        primary_models=["deepseek-v3", "qwen2.5-coder-32b", "deepseek-coder-v2"],
        fallback_models=["codellama-34b", "qwen2.5-coder-14b", "phi-4"],
        min_context=8192
    ))
    reasoning: TaskRoutingConfig = field(default_factory=lambda: TaskRoutingConfig(
        primary_models=["deepseek-r1", "deepseek-v3", "qwen2.5-72b-instruct"],
        fallback_models=["deepseek-r1-distill-32b", "mistral-small-24b"],
        min_context=16384
    ))
    analysis: TaskRoutingConfig = field(default_factory=lambda: TaskRoutingConfig(
        primary_models=["deepseek-v3", "qwen2.5-coder-32b"],
        fallback_models=["codellama-34b-instruct", "qwen2.5-72b-instruct"],
        min_context=16384,
        require_serena=True
    ))
    documentation: TaskRoutingConfig = field(default_factory=lambda: TaskRoutingConfig(
        primary_models=["qwen2.5-72b-instruct", "llama-3.3-70b-instruct"],
        fallback_models=["qwen2.5-32b-instruct", "mistral-nemo-12b"],
        min_context=8192
    ))
    # Serena
    serena_enabled: bool = True
    serena_priority: str = "always_first"  # Serena is consulted before any LLM call
    # Context management
    context: ContextConfig = field(default_factory=ContextConfig)
    # Security
    security: SecurityConfig = field(default_factory=SecurityConfig)
# Shared default configuration instance (module-level singleton).
DEFAULT_CONFIG = RouterConfig()
def load_config_from_dict(data: dict) -> RouterConfig:
    """Build a RouterConfig from a plain dict (e.g. parsed YAML).

    Unknown service names and missing sections are silently ignored;
    anything not present keeps its default value.
    """
    config = RouterConfig()
    # Per-service settings.
    for service_name, service_data in data.get('services', {}).items():
        if hasattr(config, service_name):
            setattr(config, service_name, ServiceConfig(**service_data))
    # Per-category task routing.
    routing = data.get('task_routing', {})
    for category in ('coding', 'reasoning', 'analysis', 'documentation'):
        if category in routing:
            setattr(config, category, TaskRoutingConfig(**routing[category]))
    # Security section.
    if 'security' in data:
        config.security = SecurityConfig(**data['security'])
    return config
示例YAML配置(供参考)
# local-llm-router.yaml
# 复制到您的项目并自定义
version: "1.0"
environment: "air-gapped"
services:
ollama:
enabled: true
endpoint: "http://localhost:11434"
priority: 1
timeout: 30000
lmstudio:
enabled: true
endpoint: "http://localhost:1234"
priority: 2
jan:
enabled: false
endpoint: "http://localhost:1337"
priority: 3
custom_endpoints:
- name: "internal-gpu-server"
endpoint: "http://192.168.1.100:8000"
priority: 0
api_style: "openai"
task_routing:
coding:
primary_models:
- "deepseek-v3"
- "qwen2.5-coder-32b"
- "deepseek-coder-v2"
fallback_models:
- "codellama-34b"
- "qwen2.5-coder-14b"
- "phi-4"
min_context: 8192
reasoning:
primary_models:
- "deepseek-r1"
- "deepseek-v3"
- "qwen2.5-72b-instruct"
fallback_models:
- "deepseek-r1-distill-32b"
- "mistral-small-24b"
min_context: 16384
analysis:
primary_models:
- "deepseek-v3"
- "qwen2.5-coder-32b"
require_serena: true
documentation:
primary_models:
- "qwen2.5-72b-instruct"
- "llama-3.3-70b-instruct"
fallback_models:
- "mistral-nemo-12b"
serena:
enabled: true
priority: "always_first"
workspace: "${WORKSPACE_ROOT}"
context:
compaction_threshold: 0.8
preserve_recent_messages: 10
security:
allow_external: false
allowed_hosts:
- "localhost"
- "127.0.0.1"
- "192.168.0.0/16"
audit_enabled: true
audit_log_path: "./llm-router-audit.log"
后备策略
优雅降级
from enum import IntEnum
from dataclasses import dataclass
from typing import Optional, Any
class FallbackLevel(IntEnum):
    """Escalating fallback stages; lower value = earlier, preferred stage."""
    PRIMARY = 0  # a primary model succeeded
    FALLBACK_MODELS = 1  # a configured fallback model succeeded
    REDUCED_CONTEXT = 2  # succeeded only after compacting the context
    SMALLEST_MODEL = 3  # last resort: smallest available model succeeded
    FAILED = 4  # every strategy exhausted
@dataclass
class ExecutionResult:
    """Outcome of one execution attempt (or of the whole fallback chain)."""
    success: bool
    model: Optional[str] = None  # model that produced the response
    service: Optional[str] = None  # name of the service the model ran on
    response: Any = None  # response text on success
    fallback_level: FallbackLevel = FallbackLevel.PRIMARY  # stage that produced this result
    error: Optional[str] = None  # error description on failure
class FallbackExecutor:
    """Execute queries with a multi-level fallback strategy.

    Escalation order (see FallbackLevel): primary models -> configured
    fallback models -> retry after context compaction -> smallest
    available model -> failure.
    """

    def __init__(
        self,
        discovery: ServiceDiscovery,
        context_manager: ContextManager,
        config: RouterConfig
    ):
        self.discovery = discovery
        self.context = context_manager
        self.config = config

    async def execute_with_fallback(
        self,
        query: str,
        category: TaskCategory
    ) -> ExecutionResult:
        """Run *query* against models for *category*, escalating on failure."""
        # Model lists come from the per-category routing config
        # (config.coding / config.reasoning / ...).
        task_config = getattr(self.config, category.value)
        primary_models = task_config.primary_models
        fallback_models = task_config.fallback_models
        # Level 0: primary models, in configured order.
        for model in primary_models:
            result = await self._try_model(model, query)
            if result.success:
                result.fallback_level = FallbackLevel.PRIMARY
                return result
        # Level 1: configured fallback models.
        for model in fallback_models:
            result = await self._try_model(model, query)
            if result.success:
                result.fallback_level = FallbackLevel.FALLBACK_MODELS
                return result
        # Level 2: compact the context down and retry everything.
        # NOTE(review): reaches into ContextManager's private _compact.
        self.context._compact(task_config.min_context)
        for model in primary_models + fallback_models:
            result = await self._try_model(model, query)
            if result.success:
                result.fallback_level = FallbackLevel.REDUCED_CONTEXT
                return result
        # Level 3: smallest model (by known VRAM) as a last resort.
        smallest = await self._find_smallest_model()
        if smallest:
            result = await self._try_model(smallest, query)
            if result.success:
                result.fallback_level = FallbackLevel.SMALLEST_MODEL
                return result
        # Level 4: everything failed.
        return ExecutionResult(
            success=False,
            fallback_level=FallbackLevel.FAILED,
            error="所有后备策略耗尽"
        )

    async def _try_model(self, model_id: str, query: str) -> ExecutionResult:
        """Attempt *query* on one model; never raises — returns a result object."""
        # Find a service that advertises this model.
        service = await self._find_service_with_model(model_id)
        if not service:
            return ExecutionResult(
                success=False,
                error=f"模型 {model_id} 不可用"
            )
        try:
            response = await self._execute_on_service(service, model_id, query)
            return ExecutionResult(
                success=True,
                model=model_id,
                service=service.name,
                response=response
            )
        except Exception as e:
            # Swallow transport/parse errors so the caller can escalate.
            return ExecutionResult(
                success=False,
                error=str(e)
            )

    async def _find_service_with_model(self, model_id: str) -> Optional[LLMService]:
        """Return the highest-priority service hosting *model_id* (fuzzy match)."""
        services = list(self.discovery.services.values())
        # Sort by the per-service priority configured on the router.
        services.sort(key=lambda s: getattr(self.config, s.type, ServiceConfig()).priority)
        for service in services:
            for model in service.models:
                # Substring match in either direction, case-insensitive.
                if model_id.lower() in model.id.lower() or model.id.lower() in model_id.lower():
                    return service
        return None

    async def _find_smallest_model(self) -> Optional[str]:
        """Pick the available model with the smallest known VRAM footprint."""
        smallest = None
        smallest_vram = float('inf')
        for service in self.discovery.services.values():
            for model in service.models:
                capability = MODEL_DATABASE.get(model.id)
                if capability and capability.vram_gb < smallest_vram:
                    smallest = model.id
                    smallest_vram = capability.vram_gb
        return smallest

    async def _execute_on_service(
        self,
        service: LLMService,
        model_id: str,
        query: str
    ) -> str:
        """POST the chat request to *service* and return the response text."""
        import httpx
        messages = self.context.export_for_api()
        messages.append({"role": "user", "content": query})
        async with httpx.AsyncClient() as client:
            if service.api_style == 'native' and service.type == 'ollama':
                # Ollama native API
                response = await client.post(
                    f"{service.endpoint}{service.chat_path}",
                    json={
                        "model": model_id,
                        "messages": messages,
                        "stream": False
                    },
                    timeout=self.config.ollama.timeout / 1000  # config value is in ms
                )
                data = response.json()
                return data.get('message', {}).get('content', '')
            else:
                # OpenAI-compatible API
                response = await client.post(
                    f"{service.endpoint}{service.chat_path}",
                    json={
                        "model": model_id,
                        "messages": messages,
                        "stream": False
                    },
                    # NOTE(review): hardcoded 30s here, while the native branch
                    # uses the configured (ms) timeout — confirm this is intended.
                    timeout=30
                )
                data = response.json()
                return data.get('choices', [{}])[0].get('message', {}).get('content', '')
安全(空气隔离)
网络隔离
import hashlib
import json
from datetime import datetime
from dataclasses import dataclass
from typing import Optional
import ipaddress
import logging
@dataclass
class AuditLogEntry:
    """One audit-log record, serialized as a JSON line."""
    timestamp: str  # ISO-8601 timestamp
    event_type: str  # e.g. 'query'
    session_id: Optional[str] = None
    model: Optional[str] = None
    service: Optional[str] = None
    query_hash: Optional[str] = None  # SHA-256 prefix of the query — never plaintext
    tokens_in: int = 0
    tokens_out: int = 0
    success: bool = True
    error: Optional[str] = None
class SecurityModule:
    """Enforce network isolation and audit logging for air-gapped deployments."""

    def __init__(self, config: SecurityConfig):
        self.config = config
        self._allowed_ips = self._parse_allowed_networks()
        self._logger = self._setup_audit_logger()

    def _parse_allowed_networks(self) -> list:
        """Parse allowed hosts and CIDRs into ipaddress network objects."""
        networks = []
        for host in self.config.allowed_hosts:
            if '/' in host:
                # CIDR notation
                networks.append(ipaddress.ip_network(host, strict=False))
                continue
            try:
                ip = ipaddress.ip_address(host)
            except ValueError:
                # Hostname such as 'localhost'
                if host == 'localhost':
                    networks.append(ipaddress.ip_network("127.0.0.0/8"))
                elif host == 'host.docker.internal':
                    # Allow the common Docker host range
                    networks.append(ipaddress.ip_network("172.17.0.0/16"))
            else:
                # BUG FIX: the original appended f"{ip}/32", which raises for
                # IPv6 hosts (e.g. '::1') and silently dropped them;
                # ip_network(str(ip)) yields /32 for IPv4 and /128 for IPv6.
                networks.append(ipaddress.ip_network(str(ip)))
        for cidr in self.config.allowed_cidrs:
            networks.append(ipaddress.ip_network(cidr, strict=False))
        return networks

    def _setup_audit_logger(self) -> logging.Logger:
        """Configure the audit logger (file handler, raw JSON-line messages)."""
        logger = logging.getLogger('llm-router-audit')
        logger.setLevel(logging.INFO)
        # Guard against duplicate handlers when SecurityModule is re-created.
        if self.config.audit_enabled and not logger.handlers:
            handler = logging.FileHandler(self.config.audit_log_path)
            handler.setFormatter(logging.Formatter('%(message)s'))
            logger.addHandler(handler)
        return logger

    def validate_endpoint(self, url: str) -> bool:
        """Return True when *url* points at an allowed (internal) host."""
        if self.config.allow_external:
            return True
        try:
            from urllib.parse import urlparse
            parsed = urlparse(url)
            host = parsed.hostname
            # Loopback shortcuts
            if host in ['localhost', '127.0.0.1', '::1']:
                return True
            # Literal IP inside an allowed network?
            try:
                ip = ipaddress.ip_address(host)
                for network in self._allowed_ips:
                    if ip in network:
                        return True
            except ValueError:
                # Hostname — only a fixed allow-list is accepted.
                return host in ['localhost', 'host.docker.internal']
            return False
        except Exception:
            # Fail closed on any parsing error.
            return False

    def log_query(
        self,
        session_id: str,
        model: str,
        service: str,
        query: str,
        tokens_in: int,
        tokens_out: int,
        success: bool,
        error: Optional[str] = None
    ):
        """Write one audit-trail entry for a routed query."""
        if not self.config.audit_enabled:
            return
        entry = AuditLogEntry(
            timestamp=datetime.now().isoformat(),
            event_type='query',
            session_id=session_id,
            model=model,
            service=service,
            # Store a hash, never the plaintext query.
            query_hash=self._hash_content(query) if self.config.log_queries else None,
            tokens_in=tokens_in,
            tokens_out=tokens_out,
            success=success,
            error=error
        )
        self._logger.info(json.dumps(entry.__dict__))

    def log_security_event(self, event_type: str, details: dict):
        """Record a security-relevant event (e.g. a blocked endpoint)."""
        if not self.config.audit_enabled:
            return
        entry = {
            'timestamp': datetime.now().isoformat(),
            'event_type': f'security:{event_type}',
            **details
        }
        self._logger.warning(json.dumps(entry))

    def _hash_content(self, content: str) -> str:
        """Short SHA-256 digest of *content* for privacy-preserving audit logs."""
        return hashlib.sha256(content.encode()).hexdigest()[:16]
# Security checklist for air-gapped deployments. The markdown body below is
# user-facing content and is kept verbatim (Chinese).
AIR_GAPPED_CHECKLIST = """
## 空气隔离部署清单
### 网络
- [ ] 验证无外部DNS解析
- [ ] 在防火墙阻止所有出口流量
- [ ] 仅白名单内部IP范围
- [ ] 如不需要则禁用IPv6
### 模型验证
- [ ] 预下载所有所需模型
- [ ] 为所有模型生成SHA256校验和
- [ ] 将校验和存储在防篡改位置
- [ ] 在加载模型前验证校验和
### 访问控制
- [ ] 实施基于角色的LLM服务访问
- [ ] 所有端点需要身份验证
- [ ] 使用短期令牌进行API访问
- [ ] 记录所有访问尝试
### 审计
- [ ] 启用全面审计记录
- [ ] 记录查询(哈希,非明文)
- [ ] 记录模型使用模式
- [ ] 记录所有安全事件
- [ ] 实施日志轮换和保留
"""
编码代理检测
检测活动编码代理
import os
import sys
from dataclasses import dataclass
from typing import Optional
@dataclass
class CodingAgentInfo:
    """Identity of the coding agent driving the router."""
    name: str  # canonical agent name, e.g. 'aider'
    type: str  # 'cli', 'ide', 'gui', 'mcp', 'detected', or 'unknown'
    version: Optional[str] = None  # version string when the marker carries one
    config_path: Optional[str] = None


# Environment-variable markers identifying the calling agent.
AGENT_ENV_MARKERS = {
    # CLI-based agents
    'QWEN_CLI_VERSION': ('qwen-cli', 'cli'),
    'OPENCODE_SESSION': ('opencode', 'cli'),
    'AIDER_SESSION': ('aider', 'cli'),
    'CODEX_SESSION': ('codex', 'cli'),
    'GEMINI_CLI_SESSION': ('gemini-cli', 'cli'),
    # IDE extensions
    'CONTINUE_SESSION': ('continue', 'ide'),
    'CLINE_SESSION': ('cline', 'ide'),
    'ROO_CODE_SESSION': ('roo-code', 'ide'),
    'CURSOR_SESSION': ('cursor', 'ide'),
    # Local GUI apps
    'OPENWEBUI_SESSION': ('openwebui', 'gui'),
    'JAN_SESSION': ('jan', 'gui'),
    'AGNO_SESSION': ('agno', 'gui'),
    # Generic markers
    'LLM_AGENT': ('generic', 'unknown'),
}


def detect_coding_agent() -> CodingAgentInfo:
    """Identify which coding agent is invoking the router."""
    # 1) Explicit environment-variable markers (first match wins).
    for env_var, (name, agent_type) in AGENT_ENV_MARKERS.items():
        marker = os.environ.get(env_var)
        if not marker:
            continue
        return CodingAgentInfo(
            name=name,
            type=agent_type,
            # A bare '1' flag carries no version information.
            version=None if marker == '1' else marker,
        )
    # 2) Parent-process name, best effort (psutil is optional).
    try:
        import psutil
        parent_name = psutil.Process(os.getppid()).name().lower()
        known_fragments = {
            'qwen': 'qwen-cli',
            'aider': 'aider',
            'codex': 'codex',
            'continue': 'continue',
            'cursor': 'cursor',
        }
        for fragment, agent_name in known_fragments.items():
            if fragment in parent_name:
                return CodingAgentInfo(name=agent_name, type='detected')
    except ImportError:
        pass  # psutil not installed
    # 3) Generic MCP client marker.
    mcp_client = os.environ.get('MCP_CLIENT')
    if mcp_client:
        return CodingAgentInfo(name=mcp_client, type='mcp')
    # Nothing matched.
    return CodingAgentInfo(name='unknown', type='unknown')
def get_agent_specific_config(agent: CodingAgentInfo) -> dict:
    """Return per-agent configuration overrides (empty dict when unknown)."""
    overrides = {
        'qwen-cli': {
            'default_model_preference': 'qwen',
            'context_format': 'qwen',
        },
        'aider': {
            'default_model_preference': 'gpt',
            'context_format': 'openai',
        },
        'cursor': {
            'default_model_preference': 'claude',
            'context_format': 'anthropic',
        },
        'continue': {
            'supports_streaming': True,
            'context_format': 'openai',
        },
    }
    return overrides.get(agent.name, {})
完整路由器实现
class LocalLLMRouter:
    """
    Complete local LLM router with Serena integration.

    Usage:
        router = LocalLLMRouter(workspace="/path/to/project")
        await router.initialize()
        response = await router.route("实现一个二分搜索函数")
        print(response)
    """

    def __init__(
        self,
        workspace: str,
        config: Optional[RouterConfig] = None,
        session_id: Optional[str] = None
    ):
        self.workspace = workspace
        self.config = config or DEFAULT_CONFIG
        self.session_id = session_id or self._generate_session_id()
        # Components — populated by initialize()
        self.serena: Optional[SerenaMCP] = None
        self.discovery: Optional[ServiceDiscovery] = None
        self.context: Optional[ContextManager] = None
        self.security: Optional[SecurityModule] = None
        self.selector: Optional[ModelSelector] = None
        self.fallback: Optional[FallbackExecutor] = None
        # Environment state
        self.os_info = detect_os()
        self.coding_agent = detect_coding_agent()
        self._initialized = False

    async def initialize(self):
        """Initialize all router components; raises if no LLM service is up."""
        # Security module
        self.security = SecurityModule(self.config.security)
        # Service discovery
        self.discovery = ServiceDiscovery(self.config.custom_endpoints)
        services = await self.discovery.discover_all()
        if not services:
            raise RuntimeError("无本地LLM服务可用")
        # Model selector over every model exposed by any discovered service
        all_models = []
        for service in services:
            all_models.extend(m.id for m in service.models)
        self.selector = ModelSelector(all_models)
        # Context manager
        self.context = ContextManager(
            session_id=self.session_id,
            system_prompt=self._build_system_prompt(),
            compaction_threshold=self.config.context.compaction_threshold,
            compaction_target=self.config.context.compaction_target,
            preserve_recent=self.config.context.preserve_recent_messages
        )
        # Serena MCP (optional — the router degrades gracefully without it)
        if self.config.serena_enabled:
            self.serena = SerenaMCP(self.workspace)
            try:
                await self.serena.start()
            except Exception as e:
                logging.warning(f"Serena MCP 启动失败: {e}")
                self.serena = None
        # Fallback executor
        self.fallback = FallbackExecutor(
            self.discovery,
            self.context,
            self.config
        )
        self._initialized = True

    async def route(
        self,
        query: str,
        file_context: Optional[dict] = None
    ) -> str:
        """
        Route a query to the appropriate local LLM.

        Args:
            query: The user's query.
            file_context: Optional dict with 'file' and 'position' keys
                used for code-aware context gathering.

        Returns:
            The LLM response string.

        Raises:
            RuntimeError: when no suitable model exists or the query fails.
        """
        if not self._initialized:
            await self.initialize()
        # Step 1: classify the task
        classification = classify_task(query)
        # Step 2: consult Serena first for code-related work
        serena_context = {}
        if self.serena and (classification.requires_serena or file_context):
            serena_context = await self._gather_serena_context(
                query, file_context, classification
            )
        # Step 3: build the enriched query
        enriched_query = self._build_enriched_query(query, serena_context)
        # Step 4: select a model
        # NOTE(review): len(query) // 4 is a rough ~4-chars/token estimate —
        # confirm it agrees with the family-specific counters used elsewhere.
        model = self.selector.select(
            classification.category,
            required_context=self.context.context.total_tokens + len(query) // 4
        )
        if not model:
            raise RuntimeError("无合适模型可用")
        # Step 5: point the context manager at the chosen model
        self.context.set_model(model)
        # Step 6: compact the context if the model's window demands it
        model_capability = MODEL_DATABASE.get(model)
        if model_capability:
            self.context.check_and_compact(model_capability.context_window)
        # Step 7: execute with fallback
        result = await self.fallback.execute_with_fallback(
            enriched_query,
            classification.category
        )
        # Step 8: audit log (query is hashed by the security module)
        self.security.log_query(
            session_id=self.session_id,
            model=result.model or model,
            service=result.service or 'unknown',
            query=query,
            tokens_in=len(query) // 4,
            tokens_out=len(result.response or '') // 4,
            success=result.success,
            error=result.error
        )
        if not result.success:
            raise RuntimeError(f"查询失败: {result.error}")
        # Step 9: record the exchange in the conversation context
        self.context.add_message('user', query)
        self.context.add_message('assistant', result.response)
        # Step 10: apply edits through Serena when the response contains code
        if self.serena and file_context and contains_code_edit(result.response):
            await self._apply_serena_edits(result.response, file_context)
        return result.response

    async def _gather_serena_context(
        self,
        query: str,
        file_context: Optional[dict],
        classification: ClassificationResult
    ) -> dict:
        """Collect code context from Serena (hover, references, diagnostics)."""
        context = {}
        if not file_context:
            return context
        file = file_context.get('file')
        position = file_context.get('position', {})
        line = position.get('line', 0)
        char = position.get('character', 0)
        try:
            # Always fetch hover info for the cursor position
            context['hover'] = await self.serena.get_hover_info(file, line, char)
            # References are only needed for refactor/rename work
            if 'refactor' in query.lower() or 'rename' in query.lower():
                context['references'] = await self.serena.get_references(file, line, char)
            # Diagnostics for analysis tasks
            if classification.category == TaskCategory.ANALYSIS:
                context['diagnostics'] = await self.serena.get_diagnostics(file)
        except Exception as e:
            # Serena is best-effort: routing proceeds without its context.
            logging.warning(f"Serena上下文收集失败: {e}")
        return context

    def _build_enriched_query(self, query: str, serena_context: dict) -> str:
        """Delegate to the module-level enrichment helper."""
        return build_enriched_query(query, serena_context)

    async def _apply_serena_edits(self, response: str, file_context: dict):
        """Apply code edits found in *response* via Serena."""
        edits = parse_code_edits(response)
        if edits:
            await self.serena.apply_edit(file_context['file'], edits)

    def _build_system_prompt(self) -> str:
        """Build the system prompt describing the runtime environment."""
        return f"""You are a coding assistant running in a local, air-gapped environment.
环境:
- 操作系统: {self.os_info.platform} ({self.os_info.arch})
- 编码代理: {self.coding_agent.name}
- Serena LSP: {'enabled' if self.config.serena_enabled else 'disabled'}
指南:
- 提供简洁、准确的代码
- 使用Serena的语义信息当提供时
- 尊重安全约束(无外部调用)
- 专注于手头特定任务
"""

    def _generate_session_id(self) -> str:
        """Generate a short unique session id (first 8 chars of a UUID4)."""
        import uuid
        return str(uuid.uuid4())[:8]
# Utility functions
def contains_code_edit(response: str) -> bool:
    """Heuristically detect whether *response* contains code to apply."""
    for marker in ('```', 'def ', 'class ', 'function ', 'const ', 'let ', 'var '):
        if marker in response:
            return True
    return False
def parse_code_edits(response: str) -> list:
    """Extract fenced code blocks from an LLM response.

    Returns a list of {'content': <stripped code>} dicts, one per
    triple-backtick block (the optional language tag is discarded).
    """
    # Simple implementation — extract ``` fenced blocks.
    import re
    # BUG FIX: the original raw-string pattern was split by a literal line
    # break (a syntax error); the newline after the fence is matched with \n.
    code_blocks = re.findall(r'```(?:\w+)?\n(.*?)```', response, re.DOTALL)
    return [{'content': block.strip()} for block in code_blocks]
资源
- Serena MCP: https://github.com/oraios/serena
- Serena 文档: https://github.com/oraios/serena#user-guide
- Ollama API: https://github.com/ollama/ollama/blob/main/docs/api.md
- LM Studio: https://lmstudio.ai/docs/developer
- Jan AI: https://jan.ai/docs/desktop/api-server
- OpenWebUI: https://docs.openwebui.com/
- LocalAI: https://localai.io/basics/getting_started/