API分页
概述
实现可扩展的分页策略,以便在处理大型数据集时实现高效查询、便捷导航和性能优化。
何时使用
- 返回大量资源集合
- 实现搜索结果分页
- 构建无限滚动界面
- 优化大数据集查询
- 管理客户端应用程序内存
- 提高API响应时间
指令
1. Offset/Limit分页
// Node.js offset/limit implementation
/**
 * GET /api/users — offset/limit pagination over the User collection.
 *
 * Query parameters:
 *   page  - 1-based page number; missing, non-numeric or < 1 values become 1.
 *   limit - page size; defaults to 20 and is clamped to the range 1..100.
 *
 * Responds with `{ data, pagination, links }`. When a query fails it responds
 * with status 500 and `{ error }` carrying the error message.
 */
app.get('/api/users', async (req, res) => {
  // Clamp both values: a zero or negative page/limit would otherwise produce
  // a negative skip()/limit() argument.
  const page = Math.max(parseInt(req.query.page, 10) || 1, 1);
  const limit = Math.min(Math.max(parseInt(req.query.limit, 10) || 20, 1), 100); // max 100
  const offset = (page - 1) * limit;
  const pageUrl = (n) => `/api/users?page=${n}&limit=${limit}`;

  try {
    // The page query and the count are independent, so run them concurrently.
    const [users, total] = await Promise.all([
      User.find()
        .sort({ _id: 1 }) // explicit order so consecutive pages do not overlap
        .skip(offset)
        .limit(limit)
        .select('id email firstName lastName createdAt'),
      User.countDocuments()
    ]);
    const totalPages = Math.ceil(total / limit);

    res.json({
      data: users,
      pagination: {
        page,
        limit,
        total,
        totalPages,
        hasNext: page < totalPages,
        hasPrev: page > 1
      },
      links: {
        self: pageUrl(page),
        first: pageUrl(1),
        // An empty collection gives totalPages === 0; keep "last" on page 1.
        last: pageUrl(Math.max(totalPages, 1)),
        ...(page > 1 && { prev: pageUrl(page - 1) }),
        ...(page < totalPages && { next: pageUrl(page + 1) })
      }
    });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
# Python offset/limit
from flask import jsonify, request
from sqlalchemy import func
@app.route('/api/users', methods=['GET'])
def list_users():
    """Return one page of users using offset/limit pagination.

    Query parameters:
        page: 1-based page number (default 1). Values below 1 are treated
            as 1; a value that cannot be parsed as an int falls back to
            the default.
        limit: Page size (default 20), clamped to the range 1..100.

    Returns:
        A ``(response, 200)`` tuple whose JSON body has ``data`` (the
        serialized users) and ``pagination`` (page, limit, total,
        totalPages, hasNext, hasPrev).
    """
    # Lower bounds matter: limit=0 would divide by zero in the ceiling
    # division below, and page<1 would yield a negative offset.
    page = max(request.args.get('page', 1, type=int), 1)
    limit = max(min(request.args.get('limit', 20, type=int), 100), 1)
    offset = (page - 1) * limit

    total = db.session.query(func.count(User.id)).scalar()
    users = (
        db.session.query(User)
        .order_by(User.id)  # explicit order so pages do not overlap
        .offset(offset)
        .limit(limit)
        .all()
    )
    # Integer ceiling division: number of pages needed to hold `total` rows.
    total_pages = (total + limit - 1) // limit

    return jsonify({
        'data': [u.to_dict() for u in users],
        'pagination': {
            'page': page,
            'limit': limit,
            'total': total,
            'totalPages': total_pages,
            'hasNext': page < total_pages,
            'hasPrev': page > 1,
        },
    }), 200
2. 基于游标的分页
// Cursor-based pagination for better performance
/**
 * Static helpers that turn a resource into an opaque cursor string and back.
 * A cursor is the base64 form of "<id>:<createdAt in epoch milliseconds>".
 */
class CursorPagination {
  /** Base64-encode the string form of `value`. */
  static encode(value) {
    const text = String(value);
    return Buffer.from(text).toString('base64');
  }

  /** Decode a base64 cursor back into its UTF-8 text. */
  static decode(cursor) {
    const bytes = Buffer.from(cursor, 'base64');
    return bytes.toString('utf-8');
  }

  /** Build the cursor for `resource` from its `id` and `createdAt`. */
  static generateCursor(resource) {
    const id = resource.id;
    const millis = resource.createdAt.getTime();
    return this.encode(`${id}:${millis}`);
  }

  /**
   * Split a cursor into `{ id, timestamp }`.
   * Returns null when `cursor` is empty or missing. `timestamp` is NaN when
   * the decoded text has no numeric part after the first colon.
   */
  static parseCursor(cursor) {
    if (cursor) {
      const parts = this.decode(cursor).split(':');
      return { id: parts[0], timestamp: parseInt(parts[1]) };
    }
    return null;
  }
}
/**
 * GET /api/users/cursor — cursor-based pagination, sorted by
 * (createdAt, _id) descending.
 *
 * Query parameters:
 *   limit - page size; defaults to 20 and is clamped to the range 1..100.
 *   after - opaque cursor taken from a previous response's `endCursor`.
 *
 * Responds with `{ data, pageInfo, links }`. Responds 400 when `after` is
 * not a usable cursor and 500 with the error message when a query fails.
 */
app.get('/api/users/cursor', async (req, res) => {
  const limit = Math.min(Math.max(parseInt(req.query.limit, 10) || 20, 1), 100);
  const rawCursor = req.query.after;

  // Only a string can be decoded; a repeated or nested query parameter
  // arrives as an array or object.
  if (rawCursor && typeof rawCursor !== 'string') {
    return res.status(400).json({ error: 'Invalid cursor' });
  }
  const after = rawCursor ? CursorPagination.parseCursor(rawCursor) : null;

  // A cursor that does not decode to "<id>:<timestamp>" cannot be turned
  // into a query boundary, so report it as a client error up front.
  if (after && (!after.id || Number.isNaN(after.timestamp))) {
    return res.status(400).json({ error: 'Invalid cursor' });
  }

  try {
    const query = {};
    if (after) {
      const boundary = new Date(after.timestamp);
      // The sort key is (createdAt, _id) descending, so the next page is
      // every row strictly older than the cursor row plus rows with the
      // same createdAt and a smaller _id. Filtering on createdAt alone
      // would skip rows that share the cursor row's timestamp.
      query.$or = [
        { createdAt: { $lt: boundary } },
        { createdAt: boundary, _id: { $lt: after.id } }
      ];
    }

    // Fetch one extra row to learn whether another page exists.
    const users = await User.find(query)
      .sort({ createdAt: -1, _id: -1 })
      .limit(limit + 1)
      .select('id email firstName lastName createdAt');

    const hasMore = users.length > limit;
    const data = hasMore ? users.slice(0, limit) : users;
    const nextCursor = hasMore
      ? CursorPagination.generateCursor(data[data.length - 1])
      : null;
    const totalCount = await User.countDocuments();

    // Base64 may contain characters that are special in a query string,
    // so cursors are percent-encoded when placed in a link.
    const selfLink = rawCursor
      ? `/api/users/cursor?limit=${limit}&after=${encodeURIComponent(rawCursor)}`
      : `/api/users/cursor?limit=${limit}`;

    res.json({
      data,
      pageInfo: {
        hasNextPage: hasMore,
        endCursor: nextCursor,
        totalCount
      },
      links: {
        self: selfLink,
        next: nextCursor
          ? `/api/users/cursor?limit=${limit}&after=${encodeURIComponent(nextCursor)}`
          : null
      }
    });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
3. Keyset分页
// Keyset pagination (most efficient for large datasets)
/**
 * GET /api/products/keyset — keyset pagination sorted by (sort field, _id)
 * descending.
 *
 * Query parameters:
 *   limit  - page size; defaults to 20 and is clamped to the range 1..100.
 *   sort   - 'price' (default) or 'createdAt'.
 *   lastId - _id of the last product on the previous page.
 *
 * Responds with `{ data, pageInfo, links }`. Responds 400 for an unsupported
 * sort field or a `lastId` that matches no product, and 500 with the error
 * message when a query fails.
 */
app.get('/api/products/keyset', async (req, res) => {
  const limit = Math.min(Math.max(parseInt(req.query.limit, 10) || 20, 1), 100);
  const lastId = req.query.lastId;
  const sortBy = req.query.sort || 'price'; // price or createdAt

  // The range filter and the sort must use the same field, so only the
  // fields this endpoint knows how to page on are accepted.
  if (sortBy !== 'price' && sortBy !== 'createdAt') {
    return res.status(400).json({ error: 'Invalid sort field' });
  }

  try {
    const query = {};

    // Build the range filter from the sort value of the previous page's
    // last row.
    if (lastId) {
      const lastProduct = await Product.findById(lastId);
      if (!lastProduct) {
        // Without the anchor row there is no sort value to continue from.
        return res.status(400).json({ error: 'Invalid lastId' });
      }
      const lastValue = lastProduct[sortBy];
      query.$or = [
        { [sortBy]: { $lt: lastValue } },
        // Same sort value: fall back to _id as the tiebreaker.
        { [sortBy]: lastValue, _id: { $lt: lastId } }
      ];
    }

    // Fetch one extra row to learn whether another page exists.
    const products = await Product.find(query)
      .sort({ [sortBy]: -1, _id: -1 })
      .limit(limit + 1);

    const hasMore = products.length > limit;
    const data = hasMore ? products.slice(0, limit) : products;
    const tailId = data.length > 0 ? data[data.length - 1]._id : null;

    res.json({
      data,
      pageInfo: {
        hasMore,
        lastId: tailId
      },
      links: {
        next: hasMore && data.length > 0
          ? `/api/products/keyset?lastId=${tailId}&sort=${sortBy}&limit=${limit}`
          : null
      }
    });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
4. 搜索分页
// Full-text search with pagination
/**
 * GET /api/search — offset/limit pagination over MongoDB text-search results.
 *
 * Query parameters:
 *   q     - search text (required, must be a single string).
 *   page  - 1-based page number; missing, non-numeric or < 1 values become 1.
 *   limit - page size; defaults to 20 and is clamped to the range 1..100.
 *
 * Responds with `{ query, results, pagination }`. Responds 400 when `q` is
 * missing or not a string, and 500 with the error message when a query fails.
 */
app.get('/api/search', async (req, res) => {
  const query = req.query.q;
  // Clamp both values so skip()/limit() never receive a negative number.
  const page = Math.max(parseInt(req.query.page, 10) || 1, 1);
  const limit = Math.min(Math.max(parseInt(req.query.limit, 10) || 20, 1), 100);
  const offset = (page - 1) * limit;

  // A repeated or nested `q` parameter arrives as an array or object, which
  // is not valid search text.
  if (!query || typeof query !== 'string') {
    return res.status(400).json({ error: '搜索查询必需' });
  }

  try {
    // MongoDB text search example
    const filter = { $text: { $search: query } };
    const [results, total] = await Promise.all([
      Product.find(filter, { score: { $meta: 'textScore' } })
        // _id breaks ties between equal scores so page boundaries are stable.
        .sort({ score: { $meta: 'textScore' }, _id: 1 })
        .skip(offset)
        .limit(limit),
      Product.countDocuments(filter)
    ]);
    const totalPages = Math.ceil(total / limit);

    res.json({
      query,
      results,
      pagination: {
        page,
        limit,
        total,
        totalPages
      }
    });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
// Elasticsearch pagination
/**
 * Run a multi_match search against the 'products' index and return one page.
 *
 * @param {string} query - search text matched against name (boosted x2),
 *   description and category.
 * @param {number} [page=1] - 1-based page number; values that are not
 *   numbers >= 1 are treated as 1.
 * @param {number} [limit=20] - page size; values that are not numbers >= 1
 *   fall back to 20 (non-numeric) or 1 (too small).
 * @returns {Promise<{results: object[], pagination: object}>} the `_source`
 *   of each hit plus `{ page, limit, total, totalPages }`.
 */
async function searchElasticsearch(query, page = 1, limit = 20) {
  // Default parameters only cover `undefined`; normalise everything else so
  // `from` is never negative or NaN.
  const safePage = Math.max(Math.trunc(Number(page)) || 1, 1);
  const safeLimit = Math.max(Math.trunc(Number(limit)) || 20, 1);
  const from = (safePage - 1) * safeLimit;

  const response = await esClient.search({
    index: 'products',
    body: {
      from,
      size: safeLimit,
      query: {
        multi_match: {
          query,
          fields: ['name^2', 'description', 'category']
        }
      }
    }
  });

  // hits.total is `{ value, relation }` on Elasticsearch 7+, but a plain
  // number on older clusters; accept both.
  // NOTE(review): assumes the client returns the response body directly
  // (as `response.hits` implies) — confirm against the client version in use.
  const rawTotal = response.hits.total;
  const total = typeof rawTotal === 'number' ? rawTotal : rawTotal.value;

  return {
    results: response.hits.hits.map((hit) => hit._source),
    pagination: {
      page: safePage,
      limit: safeLimit,
      total,
      totalPages: Math.ceil(total / safeLimit)
    }
  };
}
5. 分页响应格式
// Offset/Limit响应
{
"data": [...],
"pagination": {
"page": 2,
"limit": 20,
"total": 145,
"totalPages": 8,
"hasNext": true,
"hasPrev": true
},
"links": {
"self": "/api/users?page=2&limit=20",
"first": "/api/users?page=1&limit=20",
"prev": "/api/users?page=1&limit=20",
"next": "/api/users?page=3&limit=20",
"last": "/api/users?page=8&limit=20"
}
}
// 基于游标的响应
{
"data": [...],
"pageInfo": {
"hasNextPage": true,
"endCursor": "Y3JlYXRlZEF0OjE2NzA4ODA2MzU3NQ==",
"totalCount": 1250
},
"links": {
"next": "/api/users/cursor?limit=20&after=Y3JlYXRlZEF0OjE2NzA4ODA2MzU3NQ=="
}
}
// Keyset响应
{
"data": [...],
"pageInfo": {
"hasMore": true,
"lastId": "507f1f77bcf86cd799439011"
},
"links": {
"next": "/api/products/keyset?lastId=507f1f77bcf86cd799439011&sort=price&limit=20"
}
}
6. Python分页(SQLAlchemy)
from flask import request, jsonify
from flask_sqlalchemy import Pagination
@app.route('/api/users', methods=['GET'])
def list_users():
    """Return one page of users via Flask-SQLAlchemy's ``paginate``.

    Query parameters:
        page: 1-based page number (default 1); values below 1 are treated
            as 1.
        per_page: Page size (default 20), clamped to the range 1..100.

    Returns:
        A ``(response, 200)`` tuple whose JSON body has ``data`` (the
        serialized users) and ``pagination`` (page, per_page, total, pages,
        has_next, has_prev) copied from the pagination object.
    """
    page = max(request.args.get('page', 1, type=int), 1)
    per_page = max(min(request.args.get('per_page', 20, type=int), 100), 1)

    # Order explicitly so consecutive pages cannot overlap or skip rows.
    # error_out=False asks paginate() not to abort the request on
    # out-of-range input.
    pagination: Pagination = User.query.order_by(User.id).paginate(
        page=page,
        per_page=per_page,
        error_out=False,
    )

    return jsonify({
        'data': [user.to_dict() for user in pagination.items],
        'pagination': {
            'page': pagination.page,
            'per_page': pagination.per_page,
            'total': pagination.total,
            'pages': pagination.pages,
            'has_next': pagination.has_next,
            'has_prev': pagination.has_prev,
        },
    }), 200
# Cursor-based pagination with graphene
#
# Relay node type for the User model.
# NOTE(review): `Meta.model` is the option used by graphene-sqlalchemy's
# SQLAlchemyObjectType (with `interfaces = (relay.Node,)`); confirm that
# subclassing relay.Node directly is what was intended here.
class UserNode(relay.Node):
    class Meta:
        model = User

    @classmethod
    def get_node(cls, info, id):
        """Fetch the user for a node lookup via ``User.query.get(id)``."""
        return User.query.get(id)
# Root query type exposing `users` as a Relay connection field.
# NOTE(review): graphene's relay.ConnectionField is documented as taking a
# relay.Connection subclass rather than a node type — confirm this
# declaration against the graphene version in use.
class Query(graphene.ObjectType):
    users = relay.ConnectionField(UserNode)

    def resolve_users(self, info, **kwargs):
        """Return every user; connection arguments in ``kwargs`` are unused here."""
        # NOTE(review): .all() loads the whole table before any slicing
        # happens — presumably acceptable only for small tables; verify.
        return User.query.all()
最佳实践
✅ 应该做
- 对于大数据集使用基于游标的分页
- 设置合理的最大限制(例如,100)
- 在可行的情况下包含总数
- 提供导航链接
- 文档化分页策略
- 对排序使用的字段使用索引
- 在适当的时候缓存分页结果
- 处理边缘情况(空结果)
- 实施一致的分页格式
- 对于非常大的数据集使用keyset
❌ 不应该做
- 使用偏移量处理数十亿行数据
- 允许无限制的页面大小
- 对每个请求都执行行数统计(COUNT)
- 无排序的分页
- 分页过程中更改排序顺序
- 不使用游标进行深度分页
- 跳过大数据集的分页
- 直接暴露数据库分页
- 混合分页策略
- 忽略性能影响
性能提示
- 对排序使用的字段建立索引
- 使用数据库原生分页
- 在应用层面实现缓存
- 监控查询性能
- 对大数据集使用基于游标的分页
- 尽可能避免COUNT查询
- 考虑对频繁访问的数据进行反规范化(denormalization)