name: mongodb-app-development version: “2.1.0” description: 掌握在应用程序中使用Node.js、Python和Java驱动程序集成MongoDB。学习连接、事务、错误处理和最佳实践。在构建使用MongoDB的应用程序时使用。 sasmp_version: “1.3.0” bonded_agent: 07-mongodb-application-development bond_type: PRIMARY_BOND
生产级技能配置
capabilities:
- 驱动程序集成
- 连接管理
- 错误处理
- 事务实现
- 变更流
input_validation: required_context: - 编程语言 - 操作类型 optional_context: - 框架 - 异步需求 - 连接池大小
output_format: code_implementation: 字符串 error_handling: 字符串 connection_config: 对象 best_practices: 数组
error_handling: common_errors: - code: APP001 condition: “连接池耗尽” recovery: “增加maxPoolSize,检查连接泄漏” - code: APP002 condition: “网络超时” recovery: “增加socketTimeoutMS,检查网络稳定性” - code: APP003 condition: “驱动程序版本不兼容” recovery: “更新驱动程序到最新稳定版本”
prerequisites: mongodb_version: “4.0+” required_knowledge: - 所选语言基础 - mongodb-crud driver_requirements: - “目标语言的官方MongoDB驱动程序”
testing: unit_test_template: | // 集成测试 beforeAll(async () => { await client.connect() }) afterAll(async () => { await client.close() }) it(‘应该执行CRUD操作’, async () => { const result = await collection.insertOne({ test: true }) expect(result.insertedId).toBeDefined() })
MongoDB应用开发
掌握MongoDB在生产应用程序中的集成。
快速开始
Node.js驱动程序设置
const { MongoClient } = require('mongodb');
const client = new MongoClient('mongodb://localhost:27017', {
useNewUrlParser: true,
useUnifiedTopology: true,
maxPoolSize: 10
});
async function main() {
try {
await client.connect();
const db = client.db('myapp');
const collection = db.collection('users');
// 使用集合
} finally {
await client.close();
}
}
main().catch(console.error);
连接池
// 配置连接池
const client = new MongoClient(
'mongodb://localhost:27017',
{
maxPoolSize: 10, // 最大连接数
minPoolSize: 5, // 最小连接数
maxIdleTimeMS: 45000, // 空闲超时
serverSelectionTimeoutMS: 5000
}
);
事务
const session = client.startSession();
try {
await session.withTransaction(async () => {
// 多个操作原子性一起执行
await users.insertOne({ name: 'John' }, { session });
await accounts.insertOne({ userId: '...', balance: 100 }, { session });
// 如果任何失败,全部回滚
});
} catch (error) {
console.error('事务失败:', error);
} finally {
await session.endSession();
}
错误处理
try {
await collection.insertOne({ email: 'test@example.com' });
} catch (error) {
if (error.code === 11000) {
console.log('重复键错误');
} else if (error.name === 'MongoNetworkError') {
console.log('网络错误 - 重试');
} else if (error.name === 'MongoWriteConcernError') {
console.log('写入关注失败');
} else {
console.log('其他错误:', error);
}
}
批量操作
const bulk = collection.initializeUnorderedBulkOp();
bulk.insert({ name: 'John', age: 30 });
bulk.insert({ name: 'Jane', age: 28 });
bulk.find({ age: { $lt: 25 } }).update({ $set: { status: 'young' } });
bulk.find({ name: 'old_user' }).remove();
await bulk.execute();
变更流
// 监控集合变化
const changeStream = collection.watch();
changeStream.on('change', (change) => {
console.log('变化:', change.operationType);
console.log('文档:', change.fullDocument);
});
// 使用管道过滤
const pipeline = [
{ $match: { 'operationType': 'insert' } },
{ $match: { 'fullDocument.status': 'active' } }
];
const filteredStream = collection.watch(pipeline);
Python集成
from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError
import asyncio
# 同步
client = MongoClient('mongodb://localhost:27017')
db = client['myapp']
users = db['users']
# CRUD
user = users.insert_one({'name': 'John', 'email': 'john@example.com'})
found = users.find_one({'_id': user.inserted_id})
users.update_one({'_id': user.inserted_id}, {'$set': {'age': 30}})
users.delete_one({'_id': user.inserted_id})
# 事务
def transfer_funds(from_user_id, to_user_id, amount):
with client.start_session() as session:
with session.start_transaction():
users.update_one(
{'_id': from_user_id},
{'$inc': {'balance': -amount}},
session=session
)
users.update_one(
{'_id': to_user_id},
{'$inc': {'balance': amount}},
session=session
)
# 错误处理
try:
users.insert_one({'email': 'test@example.com'})
except DuplicateKeyError:
print('邮箱已存在')
Java集成
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.MongoCollection;
import org.bson.Document;
public class MongoDB {
public static void main(String[] args) {
// 连接
MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017");
MongoDatabase database = mongoClient.getDatabase("myapp");
MongoCollection<Document> users = database.getCollection("users");
// 插入
Document user = new Document("name", "John")
.append("email", "john@example.com")
.append("age", 30);
users.insertOne(user);
// 查找
Document found = users.find(new Document("email", "john@example.com")).first();
// 更新
users.updateOne(
new Document("email", "john@example.com"),
new Document("$set", new Document("age", 31))
);
// 删除
users.deleteOne(new Document("email", "john@example.com"));
// 关闭
mongoClient.close();
}
}
性能模式
// 分页
async function paginate(collection, page = 1, limit = 10) {
const skip = (page - 1) * limit;
return collection
.find({})
.skip(skip)
.limit(limit)
.toArray();
}
// 批处理
async function processBatch(collection, batchSize = 1000) {
const cursor = collection.find({});
let batch = [];
for (const doc of await cursor.toArray()) {
batch.push(doc);
if (batch.length === batchSize) {
await processBatch(batch);
batch = [];
}
}
if (batch.length > 0) {
await processBatch(batch);
}
}
// 懒加载
class LazyLoader {
constructor(collection, query) {
this.cursor = collection.find(query).batchSize(100);
}
async next() {
return this.cursor.hasNext() ? this.cursor.next() : null;
}
}
最佳实践
✅ 始终正确关闭连接 ✅ 使用连接池 ✅ 适当处理错误 ✅ 为多文档操作使用事务 ✅ 验证输入数据 ✅ 使用适当的写入关注 ✅ 监控连接状态 ✅ 实现重试逻辑 ✅ 为查询使用索引 ✅ 记录重要操作 ✅ 测试错误场景 ✅ 对Node.js使用async/await