MongoDB应用开发Skill mongodb-app-development

掌握MongoDB数据库在Node.js、Python和Java应用程序中的集成开发技术,包括连接管理、事务处理、错误处理和最佳实践。关键词:MongoDB, 应用开发, 数据库集成, Node.js, Python, Java, 后端开发。

后端开发 0 次安装 0 次浏览 更新于 3/14/2026

name: mongodb-app-development version: “2.1.0” description: 掌握在应用程序中使用Node.js、Python和Java驱动程序集成MongoDB。学习连接、事务、错误处理和最佳实践。在构建使用MongoDB的应用程序时使用。 sasmp_version: “1.3.0” bonded_agent: 07-mongodb-application-development bond_type: PRIMARY_BOND

生产级技能配置

capabilities:

  • 驱动程序集成
  • 连接管理
  • 错误处理
  • 事务实现
  • 变更流

input_validation: required_context: - 编程语言 - 操作类型 optional_context: - 框架 - 异步需求 - 连接池大小

output_format: code_implementation: 字符串 error_handling: 字符串 connection_config: 对象 best_practices: 数组

error_handling: common_errors: - code: APP001 condition: “连接池耗尽” recovery: “增加maxPoolSize,检查连接泄漏” - code: APP002 condition: “网络超时” recovery: “增加socketTimeoutMS,检查网络稳定性” - code: APP003 condition: “驱动程序版本不兼容” recovery: “更新驱动程序到最新稳定版本”

prerequisites: mongodb_version: “4.0+” required_knowledge: - 所选语言基础 - mongodb-crud driver_requirements: - “目标语言的官方MongoDB驱动程序”

testing: unit_test_template: | // 集成测试 beforeAll(async () => { await client.connect() }) afterAll(async () => { await client.close() }) it(‘应该执行CRUD操作’, async () => { const result = await collection.insertOne({ test: true }) expect(result.insertedId).toBeDefined() })

MongoDB应用开发

掌握MongoDB在生产应用程序中的集成。

快速开始

Node.js驱动程序设置

const { MongoClient } = require('mongodb');

const client = new MongoClient('mongodb://localhost:27017', {
  useNewUrlParser: true,
  useUnifiedTopology: true,
  maxPoolSize: 10
});

async function main() {
  try {
    await client.connect();
    const db = client.db('myapp');
    const collection = db.collection('users');

    // 使用集合

  } finally {
    await client.close();
  }
}

main().catch(console.error);

连接池

// 配置连接池
const client = new MongoClient(
  'mongodb://localhost:27017',
  {
    maxPoolSize: 10,        // 最大连接数
    minPoolSize: 5,         // 最小连接数
    maxIdleTimeMS: 45000,   // 空闲超时
    serverSelectionTimeoutMS: 5000
  }
);

事务

const session = client.startSession();

try {
  await session.withTransaction(async () => {
    // 多个操作原子性一起执行
    await users.insertOne({ name: 'John' }, { session });
    await accounts.insertOne({ userId: '...', balance: 100 }, { session });

    // 如果任何失败,全部回滚
  });
} catch (error) {
  console.error('事务失败:', error);
} finally {
  await session.endSession();
}

错误处理

try {
  await collection.insertOne({ email: 'test@example.com' });
} catch (error) {
  if (error.code === 11000) {
    console.log('重复键错误');
  } else if (error.name === 'MongoNetworkError') {
    console.log('网络错误 - 重试');
  } else if (error.name === 'MongoWriteConcernError') {
    console.log('写入关注失败');
  } else {
    console.log('其他错误:', error);
  }
}

批量操作

const bulk = collection.initializeUnorderedBulkOp();

bulk.insert({ name: 'John', age: 30 });
bulk.insert({ name: 'Jane', age: 28 });
bulk.find({ age: { $lt: 25 } }).update({ $set: { status: 'young' } });
bulk.find({ name: 'old_user' }).remove();

await bulk.execute();

变更流

// 监控集合变化
const changeStream = collection.watch();

changeStream.on('change', (change) => {
  console.log('变化:', change.operationType);
  console.log('文档:', change.fullDocument);
});

// 使用管道过滤
const pipeline = [
  { $match: { 'operationType': 'insert' } },
  { $match: { 'fullDocument.status': 'active' } }
];

const filteredStream = collection.watch(pipeline);

Python集成

from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError
import asyncio

# 同步
client = MongoClient('mongodb://localhost:27017')
db = client['myapp']
users = db['users']

# CRUD
user = users.insert_one({'name': 'John', 'email': 'john@example.com'})
found = users.find_one({'_id': user.inserted_id})
users.update_one({'_id': user.inserted_id}, {'$set': {'age': 30}})
users.delete_one({'_id': user.inserted_id})

# 事务
def transfer_funds(from_user_id, to_user_id, amount):
    with client.start_session() as session:
        with session.start_transaction():
            users.update_one(
                {'_id': from_user_id},
                {'$inc': {'balance': -amount}},
                session=session
            )
            users.update_one(
                {'_id': to_user_id},
                {'$inc': {'balance': amount}},
                session=session
            )

# 错误处理
try:
    users.insert_one({'email': 'test@example.com'})
except DuplicateKeyError:
    print('邮箱已存在')

Java集成

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.MongoCollection;
import org.bson.Document;

public class MongoDB {
    public static void main(String[] args) {
        // 连接
        MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017");
        MongoDatabase database = mongoClient.getDatabase("myapp");
        MongoCollection<Document> users = database.getCollection("users");

        // 插入
        Document user = new Document("name", "John")
            .append("email", "john@example.com")
            .append("age", 30);
        users.insertOne(user);

        // 查找
        Document found = users.find(new Document("email", "john@example.com")).first();

        // 更新
        users.updateOne(
            new Document("email", "john@example.com"),
            new Document("$set", new Document("age", 31))
        );

        // 删除
        users.deleteOne(new Document("email", "john@example.com"));

        // 关闭
        mongoClient.close();
    }
}

性能模式

// 分页
async function paginate(collection, page = 1, limit = 10) {
  const skip = (page - 1) * limit;
  return collection
    .find({})
    .skip(skip)
    .limit(limit)
    .toArray();
}

// 批处理
async function processBatch(collection, batchSize = 1000) {
  const cursor = collection.find({});

  let batch = [];
  for (const doc of await cursor.toArray()) {
    batch.push(doc);

    if (batch.length === batchSize) {
      await processBatch(batch);
      batch = [];
    }
  }

  if (batch.length > 0) {
    await processBatch(batch);
  }
}

// 懒加载
class LazyLoader {
  constructor(collection, query) {
    this.cursor = collection.find(query).batchSize(100);
  }

  async next() {
    return this.cursor.hasNext() ? this.cursor.next() : null;
  }
}

最佳实践

✅ 始终正确关闭连接 ✅ 使用连接池 ✅ 适当处理错误 ✅ 为多文档操作使用事务 ✅ 验证输入数据 ✅ 使用适当的写入关注 ✅ 监控连接状态 ✅ 实现重试逻辑 ✅ 为查询使用索引 ✅ 记录重要操作 ✅ 测试错误场景 ✅ 对Node.js使用async/await