MongoDB模式和最佳实践
概览
MongoDB是一个文档型NoSQL数据库,提供高性能、高可用性和易扩展性。这项技能涵盖了模式设计模式、查询优化、索引策略和构建MongoDB应用程序的最佳实践。
前提条件
- 理解NoSQL数据库概念
- 了解JavaScript/TypeScript
- 熟悉文档数据结构
- 基本了解数据库索引
关键概念
MongoDB架构
- 文档模型:数据存储为BSON文档(类似JSON)
- 集合:文档的集合(类似于SQL中的表)
- 索引:用于高效查询的B-tree索引
- 副本集:通过复制实现高可用性
- 分片:跨多个服务器的水平扩展
模式设计哲学
MongoDB的灵活模式允许:
- 嵌入式文档:单个文档中的相关数据
- 引用:文档之间的链接
- 混合方法:两者的结合
- 模式验证:在数据库级别强制数据结构
实施指南
模式设计原则
为读取性能而反规范化
// 好:为读取繁重的工作负载反规范化
interface PostWithAuthor {
_id: string
title: string
content: string
authorName: string // 为更快的读取而反规范化
authorEmail: string
createdAt: Date
}
// 坏:总是需要连接
interface Post {
_id: string
title: string
content: string
authorId: ObjectId // 需要连接以获取作者信息
createdAt: Date
}
嵌入与引用
// 好:对于1对少的关系嵌入
interface User {
_id: string
name: string
preferences: {
theme: 'light' | 'dark'
notifications: boolean
} // 嵌入 - 小型,很少变化
createdAt: Date
}
// 好:对于1对多或多对多引用
interface User {
_id: string
name: string
posts: ObjectId[] // 引用 - 许多帖子,可以增长
createdAt: Date
}
interface Post {
_id: string
title: string
authorId: ObjectId // 引用回用户
createdAt: Date
}
模式版本控制
interface User {
_id: string
name: string
email: string
version: number // 模式版本
createdAt: Date
}
// 迁移策略
async function migrateUserSchema() {
const users = await User.find({ version: { $lt: 2 } })
for (const user of users) {
await User.updateOne(
{ _id: user._id },
{
$set: {
'newField': 'default value',
version: 2
}
}
)
}
}
Mongoose设置
连接配置
// config/database.ts
import mongoose from 'mongoose'
const MONGODB_URI = process.env.MONGODB_URI || 'mongodb://localhost:27017/myapp'
export async function connectDatabase() {
try {
await mongoose.connect(MONGODB_URI)
console.log('Connected to MongoDB')
} catch (error) {
console.error('MongoDB connection error:', error)
process.exit(1)
}
}
export async function disconnectDatabase() {
await mongoose.disconnect()
console.log('Disconnected from MongoDB')
}
连接池
// config/database.ts
import mongoose from 'mongoose'
const options = {
serverSelectionTimeoutMS: 5000,
socketTimeoutMS: 45000,
maxPoolSize: 50,
minPoolSize: 5,
}
export async function connectDatabase() {
await mongoose.connect(process.env.MONGODB_URI, options)
}
多个连接
// config/database.ts
import mongoose from 'mongoose'
const connections: Record<string, mongoose.Connection> = {}
export function getConnection(name: string = 'default'): mongoose.Connection {
if (!connections[name]) {
connections[name] = mongoose.createConnection(name)
}
return connections[name]
}
export async function connectAll() {
await Promise.all(Object.values(connections).map(conn => conn.asPromise()))
}
export async function disconnectAll() {
await Promise.all(Object.values(connections).map(conn => conn.close()))
}
模型定义
基本模型
// models/User.ts
import mongoose, { Document, Schema } from 'mongoose'
interface IUser {
name: string
email: string
password: string
createdAt: Date
updatedAt: Date
}
const userSchema = new Schema<IUser>({
name: {
type: String,
required: true,
trim: true,
minlength: 2,
maxlength: 50
},
email: {
type: String,
required: true,
unique: true,
trim: true,
lowercase: true
},
password: {
type: String,
required: true,
minlength: 8
},
}, {
timestamps: true,
})
export const User = mongoose.model<IUser>('User', userSchema)
带方法的模型
// models/User.ts
import mongoose, { Document, Schema } from 'mongoose'
interface IUser {
name: string
email: string
password: string
comparePassword(candidatePassword: string): Promise<boolean>
}
const userSchema = new Schema<IUser>({
name: { type: String, required: true },
email: { type: String, required: true, unique: true },
password: { type: String, required: true },
})
userSchema.methods.comparePassword = async function(candidatePassword: string) {
const bcrypt = require('bcrypt')
return bcrypt.compare(candidatePassword, this.password)
}
export const User = mongoose.model<IUser>('User', userSchema)
带虚拟属性的模型
// models/User.ts
import mongoose, { Document, Schema } from 'mongoose'
interface IUser {
firstName: string
lastName: string
}
const userSchema = new Schema<IUser>({
firstName: { type: String, required: true },
lastName: { type: String, required: true },
})
userSchema.virtual('fullName').get(function() {
return `${this.firstName} ${this.lastName}`
})
userSchema.virtual('fullName').set(function(v) {
const parts = v.split(' ')
this.firstName = parts[0] || ''
this.lastName = parts[1] || ''
})
// 包含虚拟属性在JSON中
userSchema.set('toJSON', { virtuals: true })
export const User = mongoose.model<IUser>('User', userSchema)
带钩子的模型
// models/User.ts
import mongoose, { Schema } from 'mongoose'
const userSchema = new Schema({
name: { type: String, required: true },
email: { type: String, required: true },
})
// 预保存钩子
userSchema.pre('save', async function(next) {
console.log('Saving user:', this.name)
next()
})
// 保存后钩子
userSchema.post('save', async function(doc, next) {
console.log('User saved:', doc.name)
next()
})
// 预删除钩子
userSchema.pre('remove', async function(next) {
console.log('Removing user:', this.name)
// 清理相关数据
await Post.deleteMany({ authorId: this._id })
next()
})
// 删除后钩子
userSchema.post('remove', async function(doc, next) {
console.log('User removed:', doc.name)
next()
})
export const User = mongoose.model('User', userSchema)
查询模式
基本CRUD操作
// 创建
const user = await User.create({
name: 'John Doe',
email: 'john@example.com',
password: 'hashed_password'
})
// 读取 - 查找全部
const users = await User.find().sort({ createdAt: -1 })
// 读取 - 查找一个
const user = await User.findById(userId)
// 读取 - 根据条件查找
const user = await User.findOne({ email: 'john@example.com' })
// 更新
const updatedUser = await User.findByIdAndUpdate(
userId,
{ name: 'John Smith' },
{ new: true }
)
// 删除
await User.findByIdAndDelete(userId)
// 删除多个
await User.deleteMany({ status: 'inactive' })
高级过滤
// 字符串查询
const users = await User.find({
name: { $regex: /^John/i }, // 不区分大小写
email: { $in: ['test@example.com', 'admin@example.com'] },
})
// 数字查询
const products = await Product.find({
price: { $gte: 10, $lte: 100 },
quantity: { $gt: 0 }
})
// 日期查询
const posts = await Post.find({
createdAt: {
$gte: new Date('2024-01-01'),
$lte: new Date('2024-12-31')
}
})
// 数组查询
const users = await User.find({
tags: { $all: ['admin', 'moderator'] }, // 必须拥有全部标签
tags: { $in: ['admin'] }, // 必须至少拥有一个标签
tags: { $size: 2 }, // 必须恰好拥有2个标签
})
// 逻辑运算符
const users = await User.find({
$or: [
{ email: 'john@example.com' },
{ name: 'John Doe' }
],
$and: [
{ status: 'active' },
{ age: { $gte: 18 } }
]
})
// 否定
const users = await User.find({
email: { $ne: 'admin@example.com' }, // 不等于
status: { $ne: 'inactive' }
})
投影
// 选择特定字段
const users = await User.find({})
.select('name email createdAt')
// 排除字段
const users = await User.find({})
.select('-password -__v')
// 嵌套投影
const posts = await Post.find({})
.select('title author.name author.email')
分页
// 跳过和限制
const page = 1
const limit = 10
const users = await User.find({})
.skip((page - 1) * limit)
.limit(limit)
.sort({ createdAt: -1 })
// 获取分页总数
const total = await User.countDocuments()
const totalPages = Math.ceil(total / limit)
排序
// 单字段排序
const users = await User.find({}).sort({ name: 1 })
// 多字段排序
const users = await User.find({}).sort({ name: 1, createdAt: -1 })
// 复杂排序
const users = await User.find({}).sort({
name: 1,
createdAt: -1,
score: -1
})
聚合管道
基本聚合
// 按组分组并计数
const result = await User.aggregate([
{
$group: {
_id: '$status',
count: { $sum: 1 }
}
}
])
// 平均值计算
const result = await Product.aggregate([
{
$group: {
_id: '$category',
avgPrice: { $avg: '$price' }
}
}
])
查找和展开
// 将用户与其帖子连接
const result = await User.aggregate([
{
$lookup: {
from: 'posts',
localField: '_id',
foreignField: 'authorId',
as: 'userPosts'
}
},
{
$unwind: '$userPosts'
}
])
多面搜索
// 多面搜索及计数
const result = await Product.aggregate([
{
$match: {
category: category,
price: { $gte: minPrice, $lte: maxPrice },
tags: { $in: selectedTags }
}
},
{
$facet: {
categories: [
{ $group: { _id: '$category' } },
{ $group: { _id: '$brand' } }
],
priceRanges: [
{
$bucketAuto: {
buckets: [
{ to: 50, label: 'Under $50' },
{ from: 50, to: 100, label: '$50 - $100' },
{ from: 100, to: 200, label: '$100 - $200' },
{ from: 200, label: 'Over $200' }
],
outputPath: 'price'
}
}
]
}
}
])
时间序列聚合
// 每日聚合
const result = await Order.aggregate([
{
$match: {
createdAt: {
$gte: new Date('2024-01-01'),
$lte: new Date('2024-01-31')
}
}
},
{
$group: {
_id: {
year: { $year: '$createdAt' },
month: { $month: '$createdAt' },
day: { $dayOfMonth: '$createdAt' }
},
totalSales: { $sum: '$total' },
avgOrderValue: { $avg: '$total' }
}
},
{
$sort: {
'_id.year': 1,
'_id.month': 1,
'_id.day': 1
}
}
])
多阶段管道
const result = await Order.aggregate([
// 第1阶段:匹配文档
{
$match: {
status: 'completed'
}
},
// 第2阶段:按用户分组
{
$group: {
_id: '$userId',
totalSpent: { $sum: '$total' },
orderCount: { $sum: 1 }
}
},
// 第3阶段:按总花费排序
{
$sort: { totalSpent: -1 }
},
// 第4阶段:限制结果
{
$limit: 10
}
])
索引策略
单字段索引
// models/User.ts
import mongoose, { Schema } from 'mongoose'
const userSchema = new Schema({
email: { type: String, unique: true, index: true },
name: { type: String, index: true },
createdAt: { type: Date, index: true }
})
复合索引
// models/Product.ts
import mongoose, { Schema } from 'mongoose'
const productSchema = new Schema({
name: { type: String },
category: { type: String },
price: { type: Number },
stock: { type: Number },
})
// 为类别+价格创建复合索引
productSchema.index({ category: 1, price: -1 })
// 为库存+类别创建复合索引
productSchema.index({ stock: 1, category: 1 })
文本索引
// models/Article.ts
import mongoose, { Schema } from 'mongoose'
const articleSchema = new Schema({
title: { type: String },
content: { type: String },
tags: [String],
})
// 为全文搜索创建文本索引
articleSchema.index({ title: 'text', content: 'text' })
// 为标签创建文本索引
articleSchema.index({ tags: 1 })
地理空间索引
// models/Location.ts
import mongoose, { Schema } from 'mongoose'
const locationSchema = new Schema({
name: { type: String },
location: {
type: {
type: 'Point',
coordinates: [Number]
},
index: '2dsphere'
}
})
TTL索引
// models/Session.ts
import mongoose, { Schema } from 'mongoose'
const sessionSchema = new Schema({
userId: { type: String },
token: { type: String },
expiresAt: { type: Date, index: { expiresAt: 1 } }
})
// 为自动过期创建TTL索引
sessionSchema.index({ expiresAt: 1 }, { expireAfterSeconds: 3600 })
事务
基本事务
import mongoose from 'mongoose'
const session = await mongoose.startSession()
try {
const user = await User.create([{ name: 'John' }], { session })
const order = await Order.create([
{ userId: user[0]._id, total: 100 }
], { session })
await session.commitTransaction()
console.log('Transaction committed')
} catch (error) {
await session.abortTransaction()
console.error('Transaction aborted:', error)
}
多操作事务
const session = await mongoose.startSession()
try {
// 创建用户
const user = await User.create([{ name: 'John' }], { session })
// 创建帖子
await Order.create([
{ title: 'Post 1', authorId: user[0]._id },
{ title: 'Post 2', authorId: user[0]._id }
], { session })
// 更新用户统计信息
await User.findByIdAndUpdate(
user[0]._id,
{ postCount: 2 },
{ session }
)
await session.commitTransaction()
} catch (error) {
await session.abortTransaction()
throw error
}
带错误处理的事务
async function transferFunds(fromId: string, toId: string, amount: number) {
const session = await mongoose.startSession()
try {
// 获取两个用户
const [fromUser, toUser] = await User.find({
_id: { $in: [fromId, toId] }
}).session(session)
if (!fromUser || !toUser) {
throw new Error('User not found')
}
if (fromUser.balance < amount) {
throw new Error('Insufficient funds')
}
// 转移资金
await User.findByIdAndUpdate(
fromId,
{ $inc: { balance: -amount } },
{ session }
)
await User.findByIdAndUpdate(
toId,
{ $inc: { balance: amount } },
{ session }
)
// 创建交易记录
await Transaction.create([
{ fromId, toId, amount, type: 'transfer' }
], { session })
await session.commitTransaction()
return { success: true }
} catch (error) {
await session.abortTransaction()
throw error
}
}
变更流
观察更改
// models/User.ts
import mongoose, { Schema } from 'mongoose'
const userSchema = new Schema({
name: { type: String },
email: { type: String },
status: { type: String, enum: ['active', 'inactive'] }
})
// 观察更改
const changeStream = User.watch()
changeStream.on('change', (change) => {
console.log('Change detected:', change)
switch (change.operationType) {
case 'insert':
console.log('New user:', change.fullDocument)
break
case 'update':
console.log('User updated:', change.fullDocument)
break
case 'delete':
console.log('User deleted:', change.documentKey)
break
}
})
带管道的变更流
const userChangeStream = User.watch()
userChangeStream.on('change', async (change) => {
if (change.operationType === 'update') {
// 获取相关数据
const posts = await Post.find({ authorId: change.documentKey })
// 更新帖子中的非规范化数据
await Post.updateMany(
{ authorId: change.documentKey },
{ $set: { authorName: change.fullDocument.name } }
)
}
})
聚合变更流
const changeStream = User.watch()
changeStream.on('change', async (change) => {
// 聚合更改上的统计信息
const stats = await User.aggregate([
{ $group: { _id: '$status', count: { $sum: 1 } }
])
console.log('Updated stats:', stats)
})
性能优化
查询优化
// 好:对于只读操作使用lean()
const users = await User.find({}).lean()
// 坏:不需要时获取完整文档
const users = await User.find({})
// 好:只选择需要的字段
const users = await User.find({})
.select('name email')
// 坏:获取所有字段
const users = await User.find({})
大型文档的投影
// 好:对于大型文档使用投影
const posts = await Post.find({})
.select('title createdAt author.name')
// 坏:获取整个文档
const posts = await Post.find({})
基于游标的分页
// 好:对于大型数据集使用游标
const users = await User.find({})
.sort({ _id: 1 })
.cursor()
// 坏:对于大型数据集使用Skip/Limit
const users = await User.find({})
.skip(1000)
.limit(100)
批量操作
// 好:对于插入使用批量操作
const users = await User.insertMany([
{ name: 'User 1', email: 'user1@example.com' },
{ name: 'User 2', email: 'user2@example.com' },
{ name: 'User 3', email: 'user3@example.com' },
])
// 坏:多个单独插入
for (const userData of userDataArray) {
await User.create(userData)
}
索引使用
// 好:确保查询字段存在索引
const users = await User.find({ email: 'john@example.com' })
// 电子邮件具有唯一索引
const products = await Product.find({
category: 'electronics',
price: { $gte: 100, $lte: 500 }
})
// 类别和价格具有复合索引
数据验证
模式验证
// models/User.ts
import mongoose, { Schema } from 'mongoose'
const userSchema = new Schema({
name: {
type: String,
required: [true, 'Name is required'],
minlength: 2,
maxlength: 50
},
email: {
type: String,
required: true,
unique: true,
trim: true,
lowercase: true,
validate: {
validator: (v: string) => /^[^\w-\.]+@[a-zA-Z0-9]+\.[a-zA-Z]{2,}$/.test(v),
message: 'Invalid email format'
}
},
age: {
type: Number,
min: 18,
max: 120,
validate: {
validator: (v: number) => Number.isInteger(v) && v >= 18,
message: 'Age must be an integer >= 18'
}
}
})
自定义验证器
// validators/password.validator.ts
import { Schema } from 'mongoose'
export const passwordValidator = {
validator: (v: string) => {
if (v.length < 8) {
return false
}
if (!/[A-Z]/.test(v)) {
return false
}
if (!/[a-z]/.test(v)) {
return false
}
if (!/[0-9]/.test(v)) {
return false
}
return true
},
message: 'Password must contain uppercase, lowercase, and numbers'
}
// 在模式中的使用
const userSchema = new Schema({
password: {
type: String,
required: true,
validate: passwordValidator
}
})
异步验证
// validators/unique.validator.ts
import { Schema } from 'mongoose'
export const uniqueEmailValidator = {
validator: async (v: string) => {
const user = await User.findOne({ email: v })
return !user
},
message: 'Email already exists'
}
// 在模式中的使用
const userSchema = new Schema({
email: {
type: String,
required: true,
unique: true,
validate: uniqueEmailValidator
}
})
最佳实践
-
模式设计
- 对于1对少的关系使用嵌入
- 对于1对多或多对多使用引用
- 实施模式版本控制
- 为读取繁重的工作负载反规范化
-
查询优化
- 对于只读操作使用lean()
- 只选择需要的字段
- 使用适当的索引
- 避免大型结果集
-
索引策略
- 在频繁查询的字段上创建索引
- 对于多字段查询使用复合索引
- 监控并移除未使用的索引
- 考虑为基于时间的数据使用TTL索引
-
性能
- 使用连接池
- 对于大型数据集实现分页
- 适当时使用批量操作
- 监控查询性能
-
数据完整性
- 实施模式验证
- 对于多文档操作使用事务
- 优雅地处理写入冲突
- 实施适当的错误处理
-
变更流
- 对于实时更新使用变更流
- 实施适当的错误处理
- 考虑聚合统计信息
- 处理重新连接场景