name: 配置Dapr发布订阅 description: | 为基于事件驱动的微服务配置Dapr发布/订阅组件,支持Kafka或Redis后端。 适用于连接智能体间通信、设置事件订阅或集成Dapr边车。 涵盖组件配置、订阅模式、发布事件和Kubernetes部署。 不适用于直接使用Kafka客户端或非Dapr消息模式。
配置Dapr发布/订阅
使用Dapr发布/订阅连接基于事件驱动的微服务,支持Kafka或Redis后端。
快速开始
# components/pubsub.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.kafka
version: v1
metadata:
- name: brokers
value: "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
- name: authType
value: "none"
- name: disableTls
value: "true"
# 应用组件
kubectl apply -f components/pubsub.yaml
# 使用Dapr CLI测试
dapr run --app-id publisher -- dapr publish --pubsub pubsub --topic test --data '{"msg":"hello"}'
组件配置
Kafka(生产环境)
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: kafka-pubsub
spec:
type: pubsub.kafka
version: v1
metadata:
# 必填项
- name: brokers
value: "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
- name: authType
value: "none"
# 消费者设置
- name: consumerGroup
value: "{namespace}-{appId}" # 按部署模板化
- name: consumeRetryInterval
value: "100ms"
- name: heartbeatInterval
value: "3s"
- name: sessionTimeout
value: "10s"
# 性能设置
- name: maxMessageBytes
value: "1048576" # 1MB
- name: channelBufferSize
value: "256"
带SASL认证的Kafka
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: kafka-pubsub-secure
spec:
type: pubsub.kafka
version: v1
metadata:
- name: brokers
value: "kafka.example.com:9093"
- name: authType
value: "password"
- name: saslUsername
value: "dapr-user"
- name: saslPassword
secretKeyRef:
name: kafka-secrets
key: password
- name: saslMechanism
value: "SCRAM-SHA-256"
Redis(开发/简单场景)
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: redis-pubsub
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: "redis-master.redis.svc.cluster.local:6379"
- name: redisPassword
secretKeyRef:
name: redis-secrets
key: password
订阅模式
声明式订阅(推荐)
# subscriptions/task-events.yaml
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
name: task-created-subscription
spec:
pubsubname: pubsub
topic: task-created
routes:
default: /dapr/task-created
scopes:
- triage-agent
- concepts-agent
编程式订阅(FastAPI)
from fastapi import FastAPI, Request
app = FastAPI()
@app.get("/dapr/subscribe")
async def subscribe():
"""Dapr调用此接口发现订阅。"""
return [
{
"pubsubname": "pubsub",
"topic": "task-created",
"route": "/dapr/task-created"
},
{
"pubsubname": "pubsub",
"topic": "task-completed",
"route": "/dapr/task-completed"
}
]
@app.post("/dapr/task-created")
async def handle_task_created(request: Request):
"""处理传入的CloudEvent。"""
event = await request.json()
# CloudEvent包装器 - 数据嵌套在内
task_data = event.get("data", event)
task_id = task_data.get("task_id")
# 处理事件
print(f"任务已创建: {task_id}")
return {"status": "SUCCESS"}
发布事件
从FastAPI服务发布
import httpx
DAPR_URL = "http://localhost:3500"
async def publish_event(topic: str, data: dict):
"""通过Dapr边车发布事件。"""
async with httpx.AsyncClient() as client:
response = await client.post(
f"{DAPR_URL}/v1.0/publish/pubsub/{topic}",
json=data,
headers={"Content-Type": "application/json"}
)
response.raise_for_status()
# 用法
await publish_event("task-created", {
"task_id": "123",
"title": "学习Python",
"user_id": "user-456"
})
使用CloudEvent元数据
async def publish_cloudevent(topic: str, data: dict, event_type: str):
"""使用显式CloudEvent字段发布。"""
async with httpx.AsyncClient() as client:
await client.post(
f"{DAPR_URL}/v1.0/publish/pubsub/{topic}",
json=data,
headers={
"Content-Type": "application/cloudevents+json",
"ce-specversion": "1.0",
"ce-type": event_type,
"ce-source": "triage-agent",
"ce-id": str(uuid.uuid4())
}
)
Kubernetes部署
组件作用域限制
限制组件对特定应用的访问:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.kafka
version: v1
metadata:
- name: brokers
value: "kafka:9092"
scopes:
- triage-agent
- concepts-agent
- debug-agent
带Dapr边车的应用部署
apiVersion: apps/v1
kind: Deployment
metadata:
name: triage-agent
spec:
replicas: 2
selector:
matchLabels:
app: triage-agent
template:
metadata:
labels:
app: triage-agent
annotations:
dapr.io/enabled: "true"
dapr.io/app-id: "triage-agent"
dapr.io/app-port: "8000"
dapr.io/enable-api-logging: "true"
spec:
containers:
- name: triage-agent
image: myapp/triage-agent:latest
ports:
- containerPort: 8000
env:
- name: DAPR_HTTP_PORT
value: "3500"
多智能体路由模式
分诊智能体 → 专业智能体
# triage_agent.py
from fastapi import FastAPI, Request
import httpx
app = FastAPI()
DAPR_URL = "http://localhost:3500"
@app.post("/api/question")
async def handle_question(request: Request):
data = await request.json()
question = data["question"]
# 基于内容路由
if "python" in question.lower() or "code" in question.lower():
topic = "concepts-request"
elif "error" in question.lower() or "bug" in question.lower():
topic = "debug-request"
else:
topic = "concepts-request" # 默认
# 发布到相应的智能体
async with httpx.AsyncClient() as client:
await client.post(
f"{DAPR_URL}/v1.0/publish/pubsub/{topic}",
json={
"question": question,
"user_id": data["user_id"],
"session_id": data["session_id"]
}
)
return {"status": "routed", "topic": topic}
专业智能体处理器
# concepts_agent.py
from fastapi import FastAPI, Request
import httpx
app = FastAPI()
DAPR_URL = "http://localhost:3500"
@app.get("/dapr/subscribe")
async def subscribe():
return [{"pubsubname": "pubsub", "topic": "concepts-request", "route": "/dapr/handle"}]
@app.post("/dapr/handle")
async def handle_concepts_request(request: Request):
event = await request.json()
data = event.get("data", event)
# 使用LLM处理
response = await process_with_llm(data["question"])
# 发布响应
async with httpx.AsyncClient() as client:
await client.post(
f"{DAPR_URL}/v1.0/publish/pubsub/response-ready",
json={
"session_id": data["session_id"],
"response": response,
"agent": "concepts"
}
)
return {"status": "SUCCESS"}
本地开发
使用Dapr CLI运行
# 先启动订阅者
dapr run --app-id concepts-agent --app-port 8001 --dapr-http-port 3501 \
--resources-path ./components -- uvicorn concepts:app --port 8001
# 启动发布者
dapr run --app-id triage-agent --app-port 8000 --dapr-http-port 3500 \
--resources-path ./components -- uvicorn triage:app --port 8000
带Dapr的Docker Compose
version: "3.8"
services:
triage-agent:
build: ./services/triage
ports:
- "8000:8000"
triage-agent-dapr:
image: daprio/daprd:latest
command: ["./daprd",
"--app-id", "triage-agent",
"--app-port", "8000",
"--dapr-http-port", "3500",
"--resources-path", "/components"
]
volumes:
- ./components:/components
network_mode: "service:triage-agent"
depends_on:
- triage-agent
kafka:
image: confluentinc/cp-kafka:latest
# ... kafka配置
故障排除
检查Dapr边车
# 查看边车日志
kubectl logs deploy/triage-agent -c daprd
# 检查组件注册
curl http://localhost:3500/v1.0/metadata
常见问题
| 错误 | 原因 | 修复方法 |
|---|---|---|
component not found |
组件未加载 | 检查--resources-path或K8s命名空间 |
connection refused |
Kafka无法访问 | 验证组件中的broker地址 |
consumer group rebalance |
多个实例 | 每个应用使用唯一的consumerGroup |
event not received |
主题/路由错误 | 检查订阅配置 |
调试事件流
# 发布测试事件
dapr publish --pubsub pubsub --topic test --data '{"test": true}'
# 检查消费者日志
kubectl logs deploy/my-app -c daprd | grep -i subscribe
验证
运行:python scripts/verify.py
相关技能
deploying-kafka-k8s- 使用Strimzi设置Kafka集群scaffolding-fastapi-dapr- 带Dapr的FastAPI服务scaffolding-openai-agents- 智能体编排模式