配置Dapr发布订阅 configuring-dapr-pubsub

本技能详细讲解如何为事件驱动微服务架构配置Dapr发布/订阅组件,支持Kafka和Redis两种主流消息中间件。涵盖从基础组件配置、订阅模式(声明式与编程式)、事件发布到Kubernetes生产部署的全流程,特别适用于构建多智能体(Agent)通信系统、微服务间解耦以及云原生应用开发。关键词:Dapr发布订阅,微服务事件驱动,Kafka配置,Redis消息队列,云原生架构,智能体通信,Kubernetes部署,FastAPI集成。

微服务 0 次安装 0 次浏览 更新于 3/2/2026

name: 配置Dapr发布订阅 description: | 为基于事件驱动的微服务配置Dapr发布/订阅组件,支持Kafka或Redis后端。 适用于连接智能体间通信、设置事件订阅或集成Dapr边车。 涵盖组件配置、订阅模式、发布事件和Kubernetes部署。 不适用于直接使用Kafka客户端或非Dapr消息模式。

配置Dapr发布/订阅

使用Dapr发布/订阅连接基于事件驱动的微服务,支持Kafka或Redis后端。

快速开始

# components/pubsub.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    - name: brokers
      value: "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
    - name: authType
      value: "none"
    - name: disableTls
      value: "true"
# 应用组件
kubectl apply -f components/pubsub.yaml

# 使用Dapr CLI测试
dapr run --app-id publisher -- dapr publish --pubsub pubsub --topic test --data '{"msg":"hello"}'

组件配置

Kafka(生产环境)

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    # 必填项
    - name: brokers
      value: "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
    - name: authType
      value: "none"

    # 消费者设置
    - name: consumerGroup
      value: "{namespace}-{appId}"  # 按部署模板化
    - name: consumeRetryInterval
      value: "100ms"
    - name: heartbeatInterval
      value: "3s"
    - name: sessionTimeout
      value: "10s"

    # 性能设置
    - name: maxMessageBytes
      value: "1048576"  # 1MB
    - name: channelBufferSize
      value: "256"

带SASL认证的Kafka

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub-secure
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    - name: brokers
      value: "kafka.example.com:9093"
    - name: authType
      value: "password"
    - name: saslUsername
      value: "dapr-user"
    - name: saslPassword
      secretKeyRef:
        name: kafka-secrets
        key: password
    - name: saslMechanism
      value: "SCRAM-SHA-256"

Redis(开发/简单场景)

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: redis-pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
    - name: redisHost
      value: "redis-master.redis.svc.cluster.local:6379"
    - name: redisPassword
      secretKeyRef:
        name: redis-secrets
        key: password

订阅模式

声明式订阅(推荐)

# subscriptions/task-events.yaml
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: task-created-subscription
spec:
  pubsubname: pubsub
  topic: task-created
  routes:
    default: /dapr/task-created
  scopes:
    - triage-agent
    - concepts-agent

编程式订阅(FastAPI)

from fastapi import FastAPI, Request

app = FastAPI()

@app.get("/dapr/subscribe")
async def subscribe():
    """Dapr调用此接口发现订阅。"""
    return [
        {
            "pubsubname": "pubsub",
            "topic": "task-created",
            "route": "/dapr/task-created"
        },
        {
            "pubsubname": "pubsub",
            "topic": "task-completed",
            "route": "/dapr/task-completed"
        }
    ]

@app.post("/dapr/task-created")
async def handle_task_created(request: Request):
    """处理传入的CloudEvent。"""
    event = await request.json()

    # CloudEvent包装器 - 数据嵌套在内
    task_data = event.get("data", event)
    task_id = task_data.get("task_id")

    # 处理事件
    print(f"任务已创建: {task_id}")

    return {"status": "SUCCESS"}

发布事件

从FastAPI服务发布

import httpx

DAPR_URL = "http://localhost:3500"

async def publish_event(topic: str, data: dict):
    """通过Dapr边车发布事件。"""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{DAPR_URL}/v1.0/publish/pubsub/{topic}",
            json=data,
            headers={"Content-Type": "application/json"}
        )
        response.raise_for_status()

# 用法
await publish_event("task-created", {
    "task_id": "123",
    "title": "学习Python",
    "user_id": "user-456"
})

使用CloudEvent元数据

async def publish_cloudevent(topic: str, data: dict, event_type: str):
    """使用显式CloudEvent字段发布。"""
    async with httpx.AsyncClient() as client:
        await client.post(
            f"{DAPR_URL}/v1.0/publish/pubsub/{topic}",
            json=data,
            headers={
                "Content-Type": "application/cloudevents+json",
                "ce-specversion": "1.0",
                "ce-type": event_type,
                "ce-source": "triage-agent",
                "ce-id": str(uuid.uuid4())
            }
        )

Kubernetes部署

组件作用域限制

限制组件对特定应用的访问:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    - name: brokers
      value: "kafka:9092"
scopes:
  - triage-agent
  - concepts-agent
  - debug-agent

带Dapr边车的应用部署

apiVersion: apps/v1
kind: Deployment
metadata:
  name: triage-agent
spec:
  replicas: 2
  selector:
    matchLabels:
      app: triage-agent
  template:
    metadata:
      labels:
        app: triage-agent
      annotations:
        dapr.io/enabled: "true"
        dapr.io/app-id: "triage-agent"
        dapr.io/app-port: "8000"
        dapr.io/enable-api-logging: "true"
    spec:
      containers:
        - name: triage-agent
          image: myapp/triage-agent:latest
          ports:
            - containerPort: 8000
          env:
            - name: DAPR_HTTP_PORT
              value: "3500"

多智能体路由模式

分诊智能体 → 专业智能体

# triage_agent.py
from fastapi import FastAPI, Request
import httpx

app = FastAPI()
DAPR_URL = "http://localhost:3500"

@app.post("/api/question")
async def handle_question(request: Request):
    data = await request.json()
    question = data["question"]

    # 基于内容路由
    if "python" in question.lower() or "code" in question.lower():
        topic = "concepts-request"
    elif "error" in question.lower() or "bug" in question.lower():
        topic = "debug-request"
    else:
        topic = "concepts-request"  # 默认

    # 发布到相应的智能体
    async with httpx.AsyncClient() as client:
        await client.post(
            f"{DAPR_URL}/v1.0/publish/pubsub/{topic}",
            json={
                "question": question,
                "user_id": data["user_id"],
                "session_id": data["session_id"]
            }
        )

    return {"status": "routed", "topic": topic}

专业智能体处理器

# concepts_agent.py
from fastapi import FastAPI, Request
import httpx

app = FastAPI()
DAPR_URL = "http://localhost:3500"

@app.get("/dapr/subscribe")
async def subscribe():
    return [{"pubsubname": "pubsub", "topic": "concepts-request", "route": "/dapr/handle"}]

@app.post("/dapr/handle")
async def handle_concepts_request(request: Request):
    event = await request.json()
    data = event.get("data", event)

    # 使用LLM处理
    response = await process_with_llm(data["question"])

    # 发布响应
    async with httpx.AsyncClient() as client:
        await client.post(
            f"{DAPR_URL}/v1.0/publish/pubsub/response-ready",
            json={
                "session_id": data["session_id"],
                "response": response,
                "agent": "concepts"
            }
        )

    return {"status": "SUCCESS"}

本地开发

使用Dapr CLI运行

# 先启动订阅者
dapr run --app-id concepts-agent --app-port 8001 --dapr-http-port 3501 \
  --resources-path ./components -- uvicorn concepts:app --port 8001

# 启动发布者
dapr run --app-id triage-agent --app-port 8000 --dapr-http-port 3500 \
  --resources-path ./components -- uvicorn triage:app --port 8000

带Dapr的Docker Compose

version: "3.8"
services:
  triage-agent:
    build: ./services/triage
    ports:
      - "8000:8000"

  triage-agent-dapr:
    image: daprio/daprd:latest
    command: ["./daprd",
      "--app-id", "triage-agent",
      "--app-port", "8000",
      "--dapr-http-port", "3500",
      "--resources-path", "/components"
    ]
    volumes:
      - ./components:/components
    network_mode: "service:triage-agent"
    depends_on:
      - triage-agent

  kafka:
    image: confluentinc/cp-kafka:latest
    # ... kafka配置

故障排除

检查Dapr边车

# 查看边车日志
kubectl logs deploy/triage-agent -c daprd

# 检查组件注册
curl http://localhost:3500/v1.0/metadata

常见问题

错误 原因 修复方法
component not found 组件未加载 检查--resources-path或K8s命名空间
connection refused Kafka无法访问 验证组件中的broker地址
consumer group rebalance 多个实例 每个应用使用唯一的consumerGroup
event not received 主题/路由错误 检查订阅配置

调试事件流

# 发布测试事件
dapr publish --pubsub pubsub --topic test --data '{"test": true}'

# 检查消费者日志
kubectl logs deploy/my-app -c daprd | grep -i subscribe

验证

运行:python scripts/verify.py

相关技能

  • deploying-kafka-k8s - 使用Strimzi设置Kafka集群
  • scaffolding-fastapi-dapr - 带Dapr的FastAPI服务
  • scaffolding-openai-agents - 智能体编排模式