AzureCosmosDB技能 azure-cosmosdb

Azure Cosmos DB 是一个全球分布式、多模型数据库服务,提供低延迟、弹性可扩展性和多种一致性模型。掌握分区键设计、CRUD操作、查询优化、一致性级别选择和成本优化等关键技能。

Azure 0 次安装 1 次浏览 更新于 3/4/2026

Azure Cosmos DB 技能

加载方式:base.md + [typescript.md | python.md]

Azure Cosmos DB 是一个全球分布式、多模型数据库,具有保证的低延迟、弹性可扩展性和多种一致性模型。

来源: Cosmos DB 文档 | 分区 | SDK


核心原则

明智选择分区键,为您的访问模式设计,理解一致性权衡。

Cosmos DB 在分区之间分配数据。您的分区键选择决定了可扩展性、性能和成本。为均匀分布和查询效率而设计。


Cosmos DB API

API 用例
NoSQL (Core) 文档数据库,最灵活
MongoDB 兼容 MongoDB 协议
PostgreSQL 分布式 PostgreSQL (Citus)
Apache Cassandra 宽列存储
Apache Gremlin 图形数据库
Table 键值(与 Azure 表存储兼容)

这项技能侧重于 NoSQL (Core) API - 最常见的选择。


关键概念

概念 描述
容器 项目集合(类似表)
项目 单个文档/记录(JSON)
分区键 决定数据分布
逻辑分区 具有相同分区键的项目
物理分区 存储单元(最大 50GB,10K RU/s)
RU(请求单元) 吞吐量货币

分区键设计

好的分区键

// 高基数,均匀分布,用于查询

// 电子商务:用户数据的 userId
{ "id": "order-123", "userId": "user-456", ... }  // PK: /userId

// 多租户:租户信息的 tenantId
{ "id": "doc-1", "tenantId": "tenant-abc", ... }  // PK: /tenantId

// IoT:遥测的 deviceId
{ "id": "reading-1", "deviceId": "device-789", ... }  // PK: /deviceId

// 日志:合成键(日期+类别)
{ "id": "log-1", "partitionKey": "2024-01-15_errors", ... }  // PK: /partitionKey

层次分区键

// 用于多级分布(例如,租户 → 用户)
// 容器创建时:/tenantId, /userId

{
  "id": "order-123",
  "tenantId": "acme-corp",
  "userId": "user-456",
  "items": [...]
}

// 在租户和用户内高效查询

不好的分区键

// 避免:
// - 低基数(状态,类型,布尔值)
// - 单调递增(时间戳,自增)
// - 频繁更新的字段
// - 查询中未使用的字段

// 不好:只有 3 个值 → 热分区
{ "status": "pending" | "completed" | "cancelled" }

// 不好:所有写入都进入最新分区
{ "timestamp": "2024-01-15T10:30:00Z" }

SDK 设置(TypeScript)

安装

npm install @azure/cosmos

初始化客户端

// lib/cosmosdb.ts
import { CosmosClient, Database, Container } from '@azure/cosmos';

const endpoint = process.env.COSMOS_ENDPOINT!;
const key = process.env.COSMOS_KEY!;
const databaseId = process.env.COSMOS_DATABASE!;

const client = new CosmosClient({ endpoint, key });

// 或者使用连接字符串
// const client = new CosmosClient(process.env.COSMOS_CONNECTION_STRING!);

export const database: Database = client.database(databaseId);

export function getContainer(containerId: string): Container {
  return database.container(containerId);
}

类型定义

// types/cosmos.ts
export interface BaseItem {
  id: string;
  _ts?: number;      // 自动生成的时间戳
  _etag?: string;    // 用于乐观并发
}

export interface User extends BaseItem {
  userId: string;    // 分区键
  email: string;
  name: string;
  createdAt: string;
  updatedAt: string;
}

export interface Order extends BaseItem {
  userId: string;    // 分区键
  orderId: string;
  items: OrderItem[];
  total: number;
  status: 'pending' | 'paid' | 'shipped' | 'delivered';
  createdAt: string;
}

export interface OrderItem {
  productId: string;
  name: string;
  quantity: number;
  price: number;
}

CRUD 操作

创建项目

import { getContainer } from './cosmosdb';
import { User } from './types';

const usersContainer = getContainer('users');

async function createUser(data: Omit<User, 'id' | 'createdAt' | 'updatedAt'>): Promise<User> {
  const now = new Date().toISOString();
  const user: User = {
    id: crypto.randomUUID(),
    ...data,
    createdAt: now,
    updatedAt: now
  };

  const { resource } = await usersContainer.items.create(user);
  return resource as User;
}

读取项目(点读取)

// 最有效的读取 - 需要 id 和分区键
async function getUser(userId: string, id: string): Promise<User | null> {
  try {
    const { resource } = await usersContainer.item(id, userId).read<User>();
    return resource || null;
  } catch (error: any) {
    if (error.code === 404) return null;
    throw error;
  }
}

// 如果 id 等于分区键值
async function getUserById(userId: string): Promise<User | null> {
  try {
    const { resource } = await usersContainer.item(userId, userId).read<User>();
    return resource || null;
  } catch (error: any) {
    if (error.code === 404) return null;
    throw error;
  }
}

查询项目

// 分区内查询(高效)
async function getUserOrders(userId: string): Promise<Order[]> {
  const ordersContainer = getContainer('orders');

  const { resources } = await ordersContainer.items
    .query<Order>({
      query: 'SELECT * FROM c WHERE c.userId = @userId ORDER BY c.createdAt DESC',
      parameters: [{ name: '@userId', value: userId }]
    })
    .fetchAll();

  return resources;
}

// 跨分区查询(慎用)
async function getOrdersByStatus(status: string): Promise<Order[]> {
  const ordersContainer = getContainer('orders');

  const { resources } = await ordersContainer.items
    .query<Order>({
      query: 'SELECT * FROM c WHERE c.status = @status',
      parameters: [{ name: '@status', value: status }]
    })
    .fetchAll();

  return resources;
}

// 分页查询
async function getOrdersPaginated(
  userId: string,
  pageSize: number = 10,
  continuationToken?: string
): Promise<{ items: Order[]; continuationToken?: string }> {
  const ordersContainer = getContainer('orders');

  const queryIterator = ordersContainer.items.query<Order>(
    {
      query: 'SELECT * FROM c WHERE c.userId = @userId ORDER BY c.createdAt DESC',
      parameters: [{ name: '@userId', value: userId }]
    },
    {
      maxItemCount: pageSize,
      continuationToken
    }
  );

  const { resources, continuationToken: nextToken } = await queryIterator.fetchNext();

  return {
    items: resources,
    continuationToken: nextToken
  };
}

更新项目

// 替换整个项目
async function updateUser(userId: string, id: string, updates: Partial<User>): Promise<User> {
  const existing = await getUser(userId, id);
  if (!existing) throw new Error('User not found');

  const updated: User = {
    ...existing,
    ...updates,
    updatedAt: new Date().toISOString()
  };

  const { resource } = await usersContainer.item(id, userId).replace(updated);
  return resource as User;
}

// 部分更新(补丁操作)
async function patchUser(userId: string, id: string, operations: any[]): Promise<User> {
  const { resource } = await usersContainer.item(id, userId).patch(operations);
  return resource as User;
}

// 用法:
await patchUser('user-123', 'user-123', [
  { op: 'set', path: '/name', value: 'New Name' },
  { op: 'set', path: '/updatedAt', value: new Date().toISOString() },
  { op: 'incr', path: '/loginCount', value: 1 }
]);

删除项目

async function deleteUser(userId: string, id: string): Promise<void> {
  await usersContainer.item(id, userId).delete();
}

乐观并发(ETags)

async function updateUserWithETag(
  userId: string,
  id: string,
  updates: Partial<User>,
  etag: string
): Promise<User> {
  const existing = await getUser(userId, id);
  if (!existing) throw new Error('User not found');

  const updated: User = {
    ...existing,
    ...updates,
    updatedAt: new Date().toISOString()
  };

  try {
    const { resource } = await usersContainer.item(id, userId).replace(updated, {
      accessCondition: { type: 'IfMatch', condition: etag }
    });
    return resource as User;
  } catch (error: any) {
    if (error.code === 412) {
      throw new Error('Document was modified by another process');
    }
    throw error;
  }
}

一致性级别

级别 保证 延迟 用例
线性化读取 最高 财务,库存
有界陈旧 在边界内一致 排行榜,计数器
会话 读取您的写入 中等 用户会话(默认)
一致前缀 有序读取 社交 Feed
最终 无排序保证 最低 分析,日志

按请求设置一致性

// 覆盖默认一致性
const { resource } = await usersContainer.item(id, userId).read<User>({
  consistencyLevel: 'Strong'
});

// 对于查询
const { resources } = await container.items.query(
  { query: 'SELECT * FROM c' },
  { consistencyLevel: 'BoundedStaleness' }
).fetchAll();

批量操作

事务性批量(同分区)

async function createOrderWithItems(userId: string, order: Order, items: any[]): Promise<void> {
  const ordersContainer = getContainer('orders');

  const operations = [
    { operationType: 'Create' as const, resourceBody: order },
    ...items.map(item => ({
      operationType: 'Create' as const,
      resourceBody: { ...item, userId, orderId: order.orderId }
    }))
  ];

  const { result } = await ordersContainer.items.batch(operations, userId);

  // 检查任何操作是否失败
  if (result.some(r => r.statusCode >= 400)) {
    throw new Error('Batch operation failed');
  }
}

批量操作

// 适用于大规模导入(非事务性)
async function bulkImportUsers(users: User[]): Promise<void> {
  const operations = users.map(user => ({
    operationType: 'Create' as const,
    resourceBody: user,
    partitionKey: user.userId
  }));

  // 按块处理
  const chunkSize = 100;
  for (let i = 0; i < operations.length; i += chunkSize) {
    const chunk = operations.slice(i, i + chunkSize);
    await usersContainer.items.bulk(chunk);
  }
}

变更源

处理更改

import { ChangeFeedStartFrom } from '@azure/cosmos';

async function processChangeFeed(): Promise<void> {
  const container = getContainer('orders');

  const changeFeedIterator = container.items.changeFeed({
    changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
  });

  while (changeFeedIterator.hasMoreResults) {
    const { result: items, statusCode } = await changeFeedIterator.fetchNext();

    if (statusCode === 304) {
      // 无新更改
      await sleep(1000);
      continue;
    }

    for (const item of items) {
      console.log('Changed item:', item);
      // 处理更改...
    }
  }
}

// 生产中,使用带租约容器的变更源处理器

变更源处理器模式

async function startChangeFeedProcessor(): Promise<void> {
  const sourceContainer = getContainer('orders');
  const leaseContainer = getContainer('leases');

  const changeFeedProcessor = sourceContainer.items.changeFeed
    .for(item => {
      // 处理每个更改
      console.log('Processing:', item);
    })
    .withLeaseContainer(leaseContainer)
    .build();

  await changeFeedProcessor.start();
}

Python SDK

安装

pip install azure-cosmos

设置和操作

# cosmos_db.py
import os
from azure.cosmos import CosmosClient, PartitionKey
from azure.cosmos.exceptions import CosmosResourceNotFoundError
from typing import Optional, List
from datetime import datetime
import uuid

# 初始化客户端
endpoint = os.environ['COSMOS_ENDPOINT']
key = os.environ['COSMOS_KEY']
database_name = os.environ['COSMOS_DATABASE']

client = CosmosClient(endpoint, key)
database = client.get_database_client(database_name)


def get_container(container_name: str):
    return database.get_container_client(container_name)


# CRUD 操作
users_container = get_container('users')


def create_user(email: str, name: str, user_id: str = None) -> dict:
    user_id = user_id or str(uuid.uuid4())
    now = datetime.utcnow().isoformat()

    user = {
        'id': user_id,
        'userId': user_id,  # 分区键
        'email': email,
        'name': name,
        'createdAt': now,
        'updatedAt': now
    }

    return users_container.create_item(user)


def get_user(user_id: str) -> Optional[dict]:
    try:
        return users_container.read_item(item=user_id, partition_key=user_id)
    except CosmosResourceNotFoundError:
        return None


def query_users(email_domain: str) -> List[dict]:
    query = "SELECT * FROM c WHERE CONTAINS(c.email, @domain)"
    parameters = [{'name': '@domain', 'value': email_domain}]

    return list(users_container.query_items(
        query=query,
        parameters=parameters,
        enable_cross_partition_query=True
    ))


def update_user(user_id: str, **updates) -> dict:
    user = get_user(user_id)
    if not user:
        raise ValueError('User not found')

    user.update(updates)
    user['updatedAt'] = datetime.utcnow().isoformat()

    return users_container.replace_item(item=user_id, body=user)


def delete_user(user_id: str) -> None:
    users_container.delete_item(item=user_id, partition_key=user_id)


# 分页查询
def get_users_paginated(page_size: int = 10, continuation_token: str = None):
    query = "SELECT * FROM c ORDER BY c.createdAt DESC"

    items = users_container.query_items(
        query=query,
        enable_cross_partition_query=True,
        max_item_count=page_size,
        continuation_token=continuation_token
    )

    page = items.by_page()
    results = list(next(page))

    return {
        'items': results,
        'continuation_token': page.continuation_token
    }

索引

自定义索引策略

{
  "indexingMode": "consistent",
  "automatic": true,
  "includedPaths": [
    { "path": "/userId/?" },
    { "path": "/status/?" },
    { "path": "/createdAt/?" }
  ],
  "excludedPaths": [
    { "path": "/content/*" },
    { "path": "/_etag/?" }
  ],
  "compositeIndexes": [
    [
      { "path": "/userId", "order": "ascending" },
      { "path": "/createdAt", "order": "descending" }
    ]
  ]
}

创建带索引的容器

await database.containers.createIfNotExists({
  id: 'orders',
  partitionKey: { paths: ['/userId'] },
  indexingPolicy: {
    indexingMode: 'consistent',
    includedPaths: [
      { path: '/userId/?' },
      { path: '/status/?' },
      { path: '/createdAt/?' }
    ],
    excludedPaths: [
      { path: '/*' }  // 默认排除所有
    ]
  }
});

吞吐量管理

预配吞吐量

// 容器级别
await database.containers.createIfNotExists({
  id: 'orders',
  partitionKey: { paths: ['/userId'] },
  throughput: 1000  // RU/s
});

// 调整吞吐量
const container = database.container('orders');
await container.throughput.replace(2000);

自动扩展

await database.containers.createIfNotExists({
  id: 'orders',
  partitionKey: { paths: ['/userId'] },
  maxThroughput: 10000  // 自动扩展 10% 至 100%
});

无服务器

// 不需要配置吞吐量
// 按请求付费(适合开发/测试,间歇性工作负载)
await database.containers.createIfNotExists({
  id: 'orders',
  partitionKey: { paths: ['/userId'] }
  // 无吞吐量 = 无服务器
});

CLI 快速参考

# Azure CLI
az cosmosdb create --name myaccount --resource-group mygroup
az cosmosdb sql database create --account-name myaccount --name mydb --resource-group mygroup
az cosmosdb sql container create \
  --account-name myaccount \
  --database-name mydb \
  --name orders \
  --partition-key-path /userId \
  --throughput 400

# 查询
az cosmosdb sql query --account-name myaccount --database-name mydb \
  --container-name orders --query "SELECT * FROM c"

# 密钥
az cosmosdb keys list --name myaccount --resource-group mygroup
az cosmosdb keys list --name myaccount --resource-group mygroup --type connection-strings

成本优化

策略 影响
正确的分区键 避免热分区(浪费 RUs)
仅对您查询的内容建立索引 降低写入 RU 成本
使用点读取 1 RU 与 3+ RU 用于查询
开发/测试使用无服务器 按请求付费
生产使用自动扩展 在低流量期间缩小规模
临时数据的 TTL 自动删除旧项目

时效生活 (TTL)

// 在容器上启用 TTL
await database.containers.createIfNotExists({
  id: 'sessions',
  partitionKey: { paths: ['/userId'] },
  defaultTtl: 3600  // 1 小时
});

// 每个项目的 TTL
const session = {
  id: 'session-123',
  userId: 'user-456',
  ttl: 1800  // 覆盖:30 分钟
};

反模式

  • 不良分区键 - 低基数导致热分区
  • 跨分区查询 - 成本高昂;设计用于单分区查询
  • 过度索引 - 增加写入成本;仅对查询的路径建立索引
  • 大项目 - 最大 2MB;在 Azure Blob Storage 中存储 Blob
  • 忽略 RU 成本 - 监控和优化昂贵的查询
  • 到处使用强一致性 - 使用会话(默认)除非必需
  • 没有重试逻辑 - 使用指数退避处理 429(节流)
  • 缺少 TTL - 为临时/会话数据设置 TTL