Azure Cosmos DB 技能
加载方式:base.md + [typescript.md | python.md]
Azure Cosmos DB 是一个全球分布式、多模型数据库,具有保证的低延迟、弹性可扩展性和多种一致性模型。
来源: Cosmos DB 文档 | 分区 | SDK
核心原则
明智选择分区键,为您的访问模式设计,理解一致性权衡。
Cosmos DB 在分区之间分配数据。您的分区键选择决定了可扩展性、性能和成本。为均匀分布和查询效率而设计。
Cosmos DB API
| API | 用例 |
|---|---|
| NoSQL (Core) | 文档数据库,最灵活 |
| MongoDB | 兼容 MongoDB 协议 |
| PostgreSQL | 分布式 PostgreSQL (Citus) |
| Apache Cassandra | 宽列存储 |
| Apache Gremlin | 图形数据库 |
| Table | 键值(与 Azure 表存储兼容) |
这项技能侧重于 NoSQL (Core) API - 最常见的选择。
关键概念
| 概念 | 描述 |
|---|---|
| 容器 | 项目集合(类似表) |
| 项目 | 单个文档/记录(JSON) |
| 分区键 | 决定数据分布 |
| 逻辑分区 | 具有相同分区键的项目 |
| 物理分区 | 存储单元(最大 50GB,10K RU/s) |
| RU(请求单元) | 吞吐量货币 |
分区键设计
好的分区键
// 高基数,均匀分布,用于查询
// 电子商务:用户数据的 userId
{ "id": "order-123", "userId": "user-456", ... } // PK: /userId
// 多租户:租户信息的 tenantId
{ "id": "doc-1", "tenantId": "tenant-abc", ... } // PK: /tenantId
// IoT:遥测的 deviceId
{ "id": "reading-1", "deviceId": "device-789", ... } // PK: /deviceId
// 日志:合成键(日期+类别)
{ "id": "log-1", "partitionKey": "2024-01-15_errors", ... } // PK: /partitionKey
层次分区键
// 用于多级分布(例如,租户 → 用户)
// 容器创建时:/tenantId, /userId
{
"id": "order-123",
"tenantId": "acme-corp",
"userId": "user-456",
"items": [...]
}
// 在租户和用户内高效查询
不好的分区键
// 避免:
// - 低基数(状态,类型,布尔值)
// - 单调递增(时间戳,自增)
// - 频繁更新的字段
// - 查询中未使用的字段
// 不好:只有 3 个值 → 热分区
{ "status": "pending" | "completed" | "cancelled" }
// 不好:所有写入都进入最新分区
{ "timestamp": "2024-01-15T10:30:00Z" }
SDK 设置(TypeScript)
安装
npm install @azure/cosmos
初始化客户端
// lib/cosmosdb.ts
import { CosmosClient, Database, Container } from '@azure/cosmos';
const endpoint = process.env.COSMOS_ENDPOINT!;
const key = process.env.COSMOS_KEY!;
const databaseId = process.env.COSMOS_DATABASE!;
const client = new CosmosClient({ endpoint, key });
// 或者使用连接字符串
// const client = new CosmosClient(process.env.COSMOS_CONNECTION_STRING!);
export const database: Database = client.database(databaseId);
export function getContainer(containerId: string): Container {
return database.container(containerId);
}
类型定义
// types/cosmos.ts
export interface BaseItem {
id: string;
_ts?: number; // 自动生成的时间戳
_etag?: string; // 用于乐观并发
}
export interface User extends BaseItem {
userId: string; // 分区键
email: string;
name: string;
createdAt: string;
updatedAt: string;
}
export interface Order extends BaseItem {
userId: string; // 分区键
orderId: string;
items: OrderItem[];
total: number;
status: 'pending' | 'paid' | 'shipped' | 'delivered';
createdAt: string;
}
export interface OrderItem {
productId: string;
name: string;
quantity: number;
price: number;
}
CRUD 操作
创建项目
import { getContainer } from './cosmosdb';
import { User } from './types';
const usersContainer = getContainer('users');
async function createUser(data: Omit<User, 'id' | 'createdAt' | 'updatedAt'>): Promise<User> {
const now = new Date().toISOString();
const user: User = {
id: crypto.randomUUID(),
...data,
createdAt: now,
updatedAt: now
};
const { resource } = await usersContainer.items.create(user);
return resource as User;
}
读取项目(点读取)
// 最有效的读取 - 需要 id 和分区键
async function getUser(userId: string, id: string): Promise<User | null> {
try {
const { resource } = await usersContainer.item(id, userId).read<User>();
return resource || null;
} catch (error: any) {
if (error.code === 404) return null;
throw error;
}
}
// 如果 id 等于分区键值
async function getUserById(userId: string): Promise<User | null> {
try {
const { resource } = await usersContainer.item(userId, userId).read<User>();
return resource || null;
} catch (error: any) {
if (error.code === 404) return null;
throw error;
}
}
查询项目
// 分区内查询(高效)
async function getUserOrders(userId: string): Promise<Order[]> {
const ordersContainer = getContainer('orders');
const { resources } = await ordersContainer.items
.query<Order>({
query: 'SELECT * FROM c WHERE c.userId = @userId ORDER BY c.createdAt DESC',
parameters: [{ name: '@userId', value: userId }]
})
.fetchAll();
return resources;
}
// 跨分区查询(慎用)
async function getOrdersByStatus(status: string): Promise<Order[]> {
const ordersContainer = getContainer('orders');
const { resources } = await ordersContainer.items
.query<Order>({
query: 'SELECT * FROM c WHERE c.status = @status',
parameters: [{ name: '@status', value: status }]
})
.fetchAll();
return resources;
}
// 分页查询
async function getOrdersPaginated(
userId: string,
pageSize: number = 10,
continuationToken?: string
): Promise<{ items: Order[]; continuationToken?: string }> {
const ordersContainer = getContainer('orders');
const queryIterator = ordersContainer.items.query<Order>(
{
query: 'SELECT * FROM c WHERE c.userId = @userId ORDER BY c.createdAt DESC',
parameters: [{ name: '@userId', value: userId }]
},
{
maxItemCount: pageSize,
continuationToken
}
);
const { resources, continuationToken: nextToken } = await queryIterator.fetchNext();
return {
items: resources,
continuationToken: nextToken
};
}
更新项目
// 替换整个项目
async function updateUser(userId: string, id: string, updates: Partial<User>): Promise<User> {
const existing = await getUser(userId, id);
if (!existing) throw new Error('User not found');
const updated: User = {
...existing,
...updates,
updatedAt: new Date().toISOString()
};
const { resource } = await usersContainer.item(id, userId).replace(updated);
return resource as User;
}
// 部分更新(补丁操作)
async function patchUser(userId: string, id: string, operations: any[]): Promise<User> {
const { resource } = await usersContainer.item(id, userId).patch(operations);
return resource as User;
}
// 用法:
await patchUser('user-123', 'user-123', [
{ op: 'set', path: '/name', value: 'New Name' },
{ op: 'set', path: '/updatedAt', value: new Date().toISOString() },
{ op: 'incr', path: '/loginCount', value: 1 }
]);
删除项目
async function deleteUser(userId: string, id: string): Promise<void> {
await usersContainer.item(id, userId).delete();
}
乐观并发(ETags)
async function updateUserWithETag(
userId: string,
id: string,
updates: Partial<User>,
etag: string
): Promise<User> {
const existing = await getUser(userId, id);
if (!existing) throw new Error('User not found');
const updated: User = {
...existing,
...updates,
updatedAt: new Date().toISOString()
};
try {
const { resource } = await usersContainer.item(id, userId).replace(updated, {
accessCondition: { type: 'IfMatch', condition: etag }
});
return resource as User;
} catch (error: any) {
if (error.code === 412) {
throw new Error('Document was modified by another process');
}
throw error;
}
}
一致性级别
| 级别 | 保证 | 延迟 | 用例 |
|---|---|---|---|
| 强 | 线性化读取 | 最高 | 财务,库存 |
| 有界陈旧 | 在边界内一致 | 高 | 排行榜,计数器 |
| 会话 | 读取您的写入 | 中等 | 用户会话(默认) |
| 一致前缀 | 有序读取 | 低 | 社交 Feed |
| 最终 | 无排序保证 | 最低 | 分析,日志 |
按请求设置一致性
// 覆盖默认一致性
const { resource } = await usersContainer.item(id, userId).read<User>({
consistencyLevel: 'Strong'
});
// 对于查询
const { resources } = await container.items.query(
{ query: 'SELECT * FROM c' },
{ consistencyLevel: 'BoundedStaleness' }
).fetchAll();
批量操作
事务性批量(同分区)
async function createOrderWithItems(userId: string, order: Order, items: any[]): Promise<void> {
const ordersContainer = getContainer('orders');
const operations = [
{ operationType: 'Create' as const, resourceBody: order },
...items.map(item => ({
operationType: 'Create' as const,
resourceBody: { ...item, userId, orderId: order.orderId }
}))
];
const { result } = await ordersContainer.items.batch(operations, userId);
// 检查任何操作是否失败
if (result.some(r => r.statusCode >= 400)) {
throw new Error('Batch operation failed');
}
}
批量操作
// 适用于大规模导入(非事务性)
async function bulkImportUsers(users: User[]): Promise<void> {
const operations = users.map(user => ({
operationType: 'Create' as const,
resourceBody: user,
partitionKey: user.userId
}));
// 按块处理
const chunkSize = 100;
for (let i = 0; i < operations.length; i += chunkSize) {
const chunk = operations.slice(i, i + chunkSize);
await usersContainer.items.bulk(chunk);
}
}
变更源
处理更改
import { ChangeFeedStartFrom } from '@azure/cosmos';
async function processChangeFeed(): Promise<void> {
const container = getContainer('orders');
const changeFeedIterator = container.items.changeFeed({
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
});
while (changeFeedIterator.hasMoreResults) {
const { result: items, statusCode } = await changeFeedIterator.fetchNext();
if (statusCode === 304) {
// 无新更改
await sleep(1000);
continue;
}
for (const item of items) {
console.log('Changed item:', item);
// 处理更改...
}
}
}
// 生产中,使用带租约容器的变更源处理器
变更源处理器模式
async function startChangeFeedProcessor(): Promise<void> {
const sourceContainer = getContainer('orders');
const leaseContainer = getContainer('leases');
const changeFeedProcessor = sourceContainer.items.changeFeed
.for(item => {
// 处理每个更改
console.log('Processing:', item);
})
.withLeaseContainer(leaseContainer)
.build();
await changeFeedProcessor.start();
}
Python SDK
安装
pip install azure-cosmos
设置和操作
# cosmos_db.py
import os
from azure.cosmos import CosmosClient, PartitionKey
from azure.cosmos.exceptions import CosmosResourceNotFoundError
from typing import Optional, List
from datetime import datetime
import uuid
# 初始化客户端
endpoint = os.environ['COSMOS_ENDPOINT']
key = os.environ['COSMOS_KEY']
database_name = os.environ['COSMOS_DATABASE']
client = CosmosClient(endpoint, key)
database = client.get_database_client(database_name)
def get_container(container_name: str):
return database.get_container_client(container_name)
# CRUD 操作
users_container = get_container('users')
def create_user(email: str, name: str, user_id: str = None) -> dict:
user_id = user_id or str(uuid.uuid4())
now = datetime.utcnow().isoformat()
user = {
'id': user_id,
'userId': user_id, # 分区键
'email': email,
'name': name,
'createdAt': now,
'updatedAt': now
}
return users_container.create_item(user)
def get_user(user_id: str) -> Optional[dict]:
try:
return users_container.read_item(item=user_id, partition_key=user_id)
except CosmosResourceNotFoundError:
return None
def query_users(email_domain: str) -> List[dict]:
query = "SELECT * FROM c WHERE CONTAINS(c.email, @domain)"
parameters = [{'name': '@domain', 'value': email_domain}]
return list(users_container.query_items(
query=query,
parameters=parameters,
enable_cross_partition_query=True
))
def update_user(user_id: str, **updates) -> dict:
user = get_user(user_id)
if not user:
raise ValueError('User not found')
user.update(updates)
user['updatedAt'] = datetime.utcnow().isoformat()
return users_container.replace_item(item=user_id, body=user)
def delete_user(user_id: str) -> None:
users_container.delete_item(item=user_id, partition_key=user_id)
# 分页查询
def get_users_paginated(page_size: int = 10, continuation_token: str = None):
query = "SELECT * FROM c ORDER BY c.createdAt DESC"
items = users_container.query_items(
query=query,
enable_cross_partition_query=True,
max_item_count=page_size,
continuation_token=continuation_token
)
page = items.by_page()
results = list(next(page))
return {
'items': results,
'continuation_token': page.continuation_token
}
索引
自定义索引策略
{
"indexingMode": "consistent",
"automatic": true,
"includedPaths": [
{ "path": "/userId/?" },
{ "path": "/status/?" },
{ "path": "/createdAt/?" }
],
"excludedPaths": [
{ "path": "/content/*" },
{ "path": "/_etag/?" }
],
"compositeIndexes": [
[
{ "path": "/userId", "order": "ascending" },
{ "path": "/createdAt", "order": "descending" }
]
]
}
创建带索引的容器
await database.containers.createIfNotExists({
id: 'orders',
partitionKey: { paths: ['/userId'] },
indexingPolicy: {
indexingMode: 'consistent',
includedPaths: [
{ path: '/userId/?' },
{ path: '/status/?' },
{ path: '/createdAt/?' }
],
excludedPaths: [
{ path: '/*' } // 默认排除所有
]
}
});
吞吐量管理
预配吞吐量
// 容器级别
await database.containers.createIfNotExists({
id: 'orders',
partitionKey: { paths: ['/userId'] },
throughput: 1000 // RU/s
});
// 调整吞吐量
const container = database.container('orders');
await container.throughput.replace(2000);
自动扩展
await database.containers.createIfNotExists({
id: 'orders',
partitionKey: { paths: ['/userId'] },
maxThroughput: 10000 // 自动扩展 10% 至 100%
});
无服务器
// 不需要配置吞吐量
// 按请求付费(适合开发/测试,间歇性工作负载)
await database.containers.createIfNotExists({
id: 'orders',
partitionKey: { paths: ['/userId'] }
// 无吞吐量 = 无服务器
});
CLI 快速参考
# Azure CLI
az cosmosdb create --name myaccount --resource-group mygroup
az cosmosdb sql database create --account-name myaccount --name mydb --resource-group mygroup
az cosmosdb sql container create \
--account-name myaccount \
--database-name mydb \
--name orders \
--partition-key-path /userId \
--throughput 400
# 查询
az cosmosdb sql query --account-name myaccount --database-name mydb \
--container-name orders --query "SELECT * FROM c"
# 密钥
az cosmosdb keys list --name myaccount --resource-group mygroup
az cosmosdb keys list --name myaccount --resource-group mygroup --type connection-strings
成本优化
| 策略 | 影响 |
|---|---|
| 正确的分区键 | 避免热分区(浪费 RUs) |
| 仅对您查询的内容建立索引 | 降低写入 RU 成本 |
| 使用点读取 | 1 RU 与 3+ RU 用于查询 |
| 开发/测试使用无服务器 | 按请求付费 |
| 生产使用自动扩展 | 在低流量期间缩小规模 |
| 临时数据的 TTL | 自动删除旧项目 |
时效生活 (TTL)
// 在容器上启用 TTL
await database.containers.createIfNotExists({
id: 'sessions',
partitionKey: { paths: ['/userId'] },
defaultTtl: 3600 // 1 小时
});
// 每个项目的 TTL
const session = {
id: 'session-123',
userId: 'user-456',
ttl: 1800 // 覆盖:30 分钟
};
反模式
- 不良分区键 - 低基数导致热分区
- 跨分区查询 - 成本高昂;设计用于单分区查询
- 过度索引 - 增加写入成本;仅对查询的路径建立索引
- 大项目 - 最大 2MB;在 Azure Blob Storage 中存储 Blob
- 忽略 RU 成本 - 监控和优化昂贵的查询
- 到处使用强一致性 - 使用会话(默认)除非必需
- 没有重试逻辑 - 使用指数退避处理 429(节流)
- 缺少 TTL - 为临时/会话数据设置 TTL