GraphQL性能优化Skill graphql-performance

GraphQL性能优化技能专注于通过查询复杂性分析、深度限制、批处理与缓存策略、DataLoader优化、监控跟踪和数据库查询优化等技术,提高GraphQL API的响应速度和可扩展性。适用于后端开发、API设计和系统架构,帮助防止N+1查询问题、优化响应时间、保护服务器免受恶意查询。关键词:GraphQL、性能优化、API开发、查询分析、缓存策略、数据库优化、后端开发、架构设计。

后端开发 0 次安装 0 次浏览 更新于 3/25/2026

名称:graphql-performance 用户可调用:false 描述:用于通过查询复杂性分析、批处理、缓存策略、深度限制、监控和数据库优化来优化GraphQL API性能。 允许工具:[]

GraphQL 性能

应用 GraphQL 性能优化技术来创建高效、可扩展的 API。此技能涵盖查询复杂性分析、深度限制、批处理和缓存策略、DataLoader 优化、监控、跟踪和数据库查询优化。

查询复杂性分析

查询复杂性分析通过计算和限制计算成本来防止昂贵查询压倒服务器。

import { GraphQLError } from 'graphql';
import { ApolloServer } from '@apollo/server';

// 复杂性计算器
const getComplexity = (field, childComplexity, args) => {
  // 字段的基础复杂性
  let complexity = 1;

  // 基于限制参数的列表乘数
  if (args.limit) {
    complexity = args.limit;
  } else if (args.first) {
    complexity = args.first;
  }

  // 添加子复杂性
  return complexity + childComplexity;
};

// 基于指令的复杂性
const schema = `
  directive @complexity(
    value: Int!
    multipliers: [String!]
  ) on FIELD_DEFINITION

  type Query {
    user(id: ID!): User @complexity(value: 1)
    users(limit: Int): [User!]! @complexity(
      value: 1,
      multipliers: ["limit"]
    )
    posts(first: Int): [Post!]! @complexity(
      value: 5,
      multipliers: ["first"]
    )
  }

  type User {
    id: ID!
    posts: [Post!]! @complexity(value: 10)
  }
`;

// 复杂性验证插件
const complexityPlugin = {
  requestDidStart: () => ({
    async didResolveOperation({ request, document, operationName }) {
      const complexity = calculateComplexity({
        document,
        operationName,
        variables: request.variables
      });

      const maxComplexity = 1000;

      if (complexity > maxComplexity) {
        throw new GraphQLError(
          `查询过于复杂: ${complexity}. ` +
          `最大允许: ${maxComplexity}`,
          {
            extensions: {
              code: 'QUERY_TOO_COMPLEX',
              complexity,
              maxComplexity
            }
          }
        );
      }
    }
  })
};

// 手动复杂性计算
const calculateComplexity = ({ document, operationName, variables }) => {
  let totalComplexity = 0;

  const visit = (node, multiplier = 1) => {
    if (node.kind === 'Field') {
      // 从指令或默认获取字段复杂性
      const complexity = getFieldComplexity(node);

      // 处理来自参数的乘数
      const args = getArguments(node, variables);
      const fieldMultiplier = getMultiplier(args);

      totalComplexity += complexity * multiplier * fieldMultiplier;

      // 访问子字段
      if (node.selectionSet) {
        node.selectionSet.selections.forEach(child =>
          visit(child, multiplier * fieldMultiplier)
        );
      }
    }
  };

  visit(document);
  return totalComplexity;
};

深度限制

防止深度嵌套查询,这些查询可能导致性能问题和潜在的拒绝服务攻击。

import { ValidationContext, GraphQLError } from 'graphql';

const depthLimit = (maxDepth: number) => {
  return (validationContext: ValidationContext) => {
    return {
      Field(node, key, parent, path, ancestors) {
        const depth = ancestors.filter(
          ancestor => ancestor.kind === 'Field'
        ).length;

        if (depth > maxDepth) {
          validationContext.reportError(
            new GraphQLError(
              `查询超过最大深度 ${maxDepth}. ` +
              `发现深度为 ${depth}.`,
              {
                nodes: [node],
                extensions: {
                  code: 'DEPTH_LIMIT_EXCEEDED',
                  depth,
                  maxDepth
                }
              }
            )
          );
        }
      }
    };
  };
};

// 与 Apollo Server 一起使用
const server = new ApolloServer({
  typeDefs,
  resolvers,
  validationRules: [depthLimit(7)]
});

// 示例查询
// ✅ 允许 (深度: 4)
query {
  user {
    posts {
      comments {
        author {
          username
        }
      }
    }
  }
}

// ❌ 拒绝 (深度: 8)
query {
  user {
    friends {
      friends {
        friends {
          friends {
            friends {
              friends {
                friends {
                  username
                }
              }
            }
          }
        }
      }
    }
  }
}

查询成本分析

实施基于成本的速率限制,以保护免受昂贵查询的侵害。

interface CostConfig {
  objectCost: number;
  scalarCost: number;
  defaultListSize: number;
}

const calculateQueryCost = (
  document,
  variables,
  config: CostConfig
) => {
  let totalCost = 0;

  const visit = (node, multiplier = 1) => {
    if (node.kind === 'Field') {
      const fieldType = getFieldType(node);

      // 列表成本
      if (isListType(fieldType)) {
        const listSize = getListSize(node, variables) ||
          config.defaultListSize;
        multiplier *= listSize;
      }

      // 字段成本
      if (isObjectType(fieldType)) {
        totalCost += config.objectCost * multiplier;
      } else {
        totalCost += config.scalarCost * multiplier;
      }

      // 访问子节点
      if (node.selectionSet) {
        node.selectionSet.selections.forEach(child =>
          visit(child, multiplier)
        );
      }
    }
  };

  visit(document);
  return totalCost;
};

// 基于成本的速率限制
const costLimitPlugin = {
  requestDidStart: () => ({
    async didResolveOperation({ request, document, contextValue }) {
      const cost = calculateQueryCost(
        document,
        request.variables,
        { objectCost: 1, scalarCost: 0.1, defaultListSize: 10 }
      );

      // 检查用户的速率限制
      const limit = await getRateLimit(contextValue.user);
      const used = await getCostUsed(contextValue.user);

      if (used + cost > limit) {
        throw new GraphQLError('速率限制超出', {
          extensions: {
            code: 'RATE_LIMIT_EXCEEDED',
            cost,
            used,
            limit
          }
        });
      }

      // 跟踪成本使用
      await incrementCostUsed(contextValue.user, cost);
    }
  })
};

使用 DataLoader 进行批处理

通过将多个请求批处理为单个数据库查询来优化数据获取。

import DataLoader from 'dataloader';

// 基本 DataLoader 设置
const createUserLoader = (db) => {
  return new DataLoader<string, User>(
    async (userIds) => {
      // 所有用户的单个查询
      const users = await db.users.findByIds(userIds);

      // 映射以保持顺序
      const userMap = new Map(users.map(u => [u.id, u]));
      return userIds.map(id => userMap.get(id) || null);
    },
    {
      // 请求期间的缓存
      cache: true,
      // 最多批处理 100 个
      maxBatchSize: 100,
      // 在批处理前等待 10ms
      batchScheduleFn: callback => setTimeout(callback, 10)
    }
  );
};

// 带有连接的高级批处理
const createPostsLoader = (db) => {
  return new DataLoader<string, Post[]>(
    async (authorIds) => {
      // 包含所有作者 ID 的单个查询
      const posts = await db.posts.query()
        .whereIn('authorId', authorIds)
        .select();

      // 按作者 ID 分组
      const postsByAuthor = authorIds.map(authorId =>
        posts.filter(post => post.authorId === authorId)
      );

      return postsByAuthor;
    }
  );
};

// 多键加载器
interface PostKey {
  authorId: string;
  status: string;
}

const createFilteredPostsLoader = (db) => {
  return new DataLoader<PostKey, Post[]>(
    async (keys) => {
      // 提取唯一的作者 ID 和状态
      const authorIds = [...new Set(keys.map(k => k.authorId))];
      const statuses = [...new Set(keys.map(k => k.status))];

      // 所有组合的单个查询
      const posts = await db.posts.query()
        .whereIn('authorId', authorIds)
        .whereIn('status', statuses)
        .select();

      // 映射回原始键
      return keys.map(key =>
        posts.filter(post =>
          post.authorId === key.authorId &&
          post.status === key.status
        )
      );
    },
    {
      cacheKeyFn: (key) => `${key.authorId}:${key.status}`
    }
  );
};

// 带有自定义缓存的加载器
import { LRUCache } from 'lru-cache';

const createCachedLoader = (db) => {
  const cache = new LRUCache<string, User>({
    max: 500,
    ttl: 1000 * 60 * 5 // 5 分钟
  });

  return new DataLoader<string, User>(
    async (userIds) => {
      const users = await db.users.findByIds(userIds);
      const userMap = new Map(users.map(u => [u.id, u]));
      return userIds.map(id => userMap.get(id) || null);
    },
    {
      cacheMap: cache
    }
  );
};

响应缓存策略

实施多级缓存以实现最佳性能。

import { createHash } from 'crypto';

// 字段级缓存
const cacheControl = {
  User: {
    __cacheControl: { maxAge: 3600 }, // 1 小时

    posts: {
      __cacheControl: { maxAge: 300 } // 5 分钟
    }
  },

  Post: {
    __cacheControl: { maxAge: 600, scope: 'PUBLIC' }
  }
};

// Redis 缓存
import Redis from 'ioredis';

const redis = new Redis();

const cacheQuery = async (key: string, ttl: number, fn: () => any) => {
  // 尝试缓存
  const cached = await redis.get(key);
  if (cached) {
    return JSON.parse(cached);
  }

  // 执行并缓存
  const result = await fn();
  await redis.setex(key, ttl, JSON.stringify(result));

  return result;
};

const resolvers = {
  Query: {
    posts: async (_, args) => {
      const cacheKey = `posts:${JSON.stringify(args)}`;

      return cacheQuery(cacheKey, 300, async () => {
        return db.posts.find(args);
      });
    },

    // 带有哈希的自动缓存
    user: async (_, { id }) => {
      const cacheKey = `user:${id}`;

      return cacheQuery(cacheKey, 3600, async () => {
        return db.users.findById(id);
      });
    }
  }
};

// 带有 APQ 的 CDN 缓存
// 自动持久化查询减少带宽并启用 CDN 缓存
const server = new ApolloServer({
  typeDefs,
  resolvers,
  plugins: [
    {
      async requestDidStart() {
        return {
          async responseForOperation({ request, operation }) {
            // 仅当存在 APQ 哈希时缓存
            if (!request.extensions?.persistedQuery?.sha256Hash) {
              return null;
            }

            // 在 CDN 缓存 GET 请求
            return {
              http: {
                headers: new Map([
                  ['cache-control', 'public, max-age=300']
                ])
              }
            };
          }
        };
      }
    }
  ]
});

持久化查询和 APQ

实施自动持久化查询以减少负载大小并启用更好的缓存。

import { ApolloServer } from '@apollo/server';
import { KeyvAdapter } from '@apollo/utils.keyvadapter';
import Keyv from 'keyv';

// 带有 Redis 后端的 APQ
const server = new ApolloServer({
  typeDefs,
  resolvers,
  persistedQueries: {
    cache: new KeyvAdapter(new Keyv('redis://localhost:6379'))
  }
});

// 客户端发送哈希而不是完整查询
// 首次请求:
// POST /graphql
// {
//   "query": "query GetUser { user(id: \"1\") { id name } }",
//   "extensions": {
//     "persistedQuery": {
//       "version": 1,
//       "sha256Hash": "abc123..."
//     }
//   }
// }

// 后续请求 (减少 99%):
// GET /graphql?extensions={"persistedQuery":{"version":1,"sha256Hash":"abc123..."}}

// 查询白名单
const allowedQueries = new Map([
  ['getUser', 'query GetUser($id: ID!) { user(id: $id) { id name } }'],
  ['getPosts', 'query GetPosts { posts { id title } }']
]);

const whitelistPlugin = {
  requestDidStart: () => ({
    async didResolveSource({ source }) {
      const hash = source.extensions?.persistedQuery?.sha256Hash;

      if (!hash || !allowedQueries.has(hash)) {
        throw new GraphQLError('查询不在白名单中', {
          extensions: { code: 'FORBIDDEN' }
        });
      }
    }
  })
};

数据库查询优化

优化数据库查询以高效支持 GraphQL。

// 使用 info 参数进行选择性字段加载
import { GraphQLResolveInfo } from 'graphql';
import { parseResolveInfo } from 'graphql-parse-resolve-info';

const resolvers = {
  Query: {
    users: async (_, args, { db }, info: GraphQLResolveInfo) => {
      // 解析请求的字段
      const parsedInfo = parseResolveInfo(info);
      const fields = Object.keys(parsedInfo.fields);

      // 仅选择请求的字段
      return db.users.query().select(fields);
    },

    // 基于请求字段的条件连接
    posts: async (_, args, { db }, info: GraphQLResolveInfo) => {
      const parsedInfo = parseResolveInfo(info);

      let query = db.posts.query();

      // 仅当请求作者时连接
      if (parsedInfo.fields.author) {
        query = query.withGraphFetched('author');
      }

      // 仅当请求评论时连接
      if (parsedInfo.fields.comments) {
        query = query.withGraphFetched('comments');
      }

      return query;
    }
  }
};

// 优化的关系加载
const optimizedResolvers = {
  Query: {
    users: async (_, { limit, offset }, { db }) => {
      // 使用连接而不是 N 个查询
      return db.users.query()
        .limit(limit)
        .offset(offset)
        .withGraphFetched('[posts, profile]');
    }
  },

  User: {
    posts: async (parent, _, { db }) => {
      // 如果已经通过连接获取,返回它
      if (parent.posts) {
        return parent.posts;
      }

      // 否则,单独获取
      return db.posts.query().where('authorId', parent.id);
    }
  }
};

// GraphQL 查询的数据库索引
// CREATE INDEX idx_posts_author_id ON posts(author_id);
// CREATE INDEX idx_posts_status ON posts(status);
// CREATE INDEX idx_posts_created_at ON posts(created_at DESC);
// CREATE INDEX idx_posts_author_status ON posts(author_id, status);

监控和分析

实施全面监控以识别性能瓶颈。

import { ApolloServer } from '@apollo/server';

// 定时插件
const timingPlugin = {
  requestDidStart() {
    const start = Date.now();

    return {
      async willSendResponse({ response }) {
        const duration = Date.now() - start;

        // 添加定时到响应
        response.extensions = {
          ...response.extensions,
          timing: { duration }
        };
      }
    };
  }
};

// 详细的解析器定时
const detailedTimingPlugin = {
  requestDidStart() {
    const resolverTimings = {};

    return {
      async executionDidStart() {
        return {
          willResolveField({ info }) {
            const start = Date.now();

            return () => {
              const duration = Date.now() - start;
              const path = info.path.key;

              resolverTimings[path] = duration;
            };
          }
        };
      },

      async willSendResponse({ response }) {
        response.extensions = {
          ...response.extensions,
          resolverTimings
        };
      }
    };
  }
};

// 性能跟踪
const performancePlugin = {
  requestDidStart() {
    return {
      async didResolveOperation({ request, operation }) {
        // 跟踪操作指标
        trackMetric('graphql.operation', 1, {
          operation: operation.operation,
          name: operation.name?.value || 'anonymous'
        });
      },

      async didEncounterErrors({ errors }) {
        errors.forEach(error => {
          trackMetric('graphql.error', 1, {
            code: error.extensions?.code || 'UNKNOWN'
          });
        });
      },

      async willSendResponse({ response }) {
        const responseSize = JSON.stringify(response).length;

        trackMetric('graphql.response_size', responseSize);
      }
    };
  }
};

跟踪和可观测性

实施 GraphQL 操作的分布式跟踪。

import { trace, SpanStatusCode } from '@opentelemetry/api';

// OpenTelemetry 跟踪插件
const tracingPlugin = {
  requestDidStart() {
    const tracer = trace.getTracer('graphql-server');

    return {
      async didResolveOperation({ request, operation }) {
        const span = tracer.startSpan('graphql.operation', {
          attributes: {
            'graphql.operation.type': operation.operation,
            'graphql.operation.name': operation.name?.value
          }
        });

        return {
          async executionDidStart() {
            return {
              willResolveField({ info }) {
                const fieldSpan = tracer.startSpan(
                  `graphql.resolve.${info.fieldName}`,
                  { attributes: { 'graphql.field': info.fieldName } }
                );

                return () => {
                  fieldSpan.end();
                };
              }
            };
          },

          async willSendResponse({ errors }) {
            if (errors) {
              span.setStatus({
                code: SpanStatusCode.ERROR,
                message: errors[0].message
              });
            }

            span.end();
          }
        };
      }
    };
  }
};

// Apollo Studio 跟踪
const server = new ApolloServer({
  typeDefs,
  resolvers,
  plugins: [
    require('apollo-server-plugin-response-cache')(),
    {
      requestDidStart() {
        return {
          async willSendResponse({ response, metrics }) {
            // 发送到 Apollo Studio
            if (process.env.APOLLO_KEY) {
              sendToApolloStudio({
                operation: metrics.operationName,
                duration: metrics.duration,
                errors: response.errors
              });
            }
          }
        };
      }
    }
  ]
});

分页优化

为大型数据集实施高效的分页策略。

// 带有数据库优化的基于游标的分页
const resolvers = {
  Query: {
    posts: async (_, { first, after }, { db }) => {
      const limit = first || 10;

      let query = db.posts.query()
        .orderBy('createdAt', 'desc')
        .limit(limit + 1); // 多取一个以确定是否有下一页

      if (after) {
        const cursor = decodeCursor(after);
        query = query.where('createdAt', '<', cursor.createdAt);
      }

      const posts = await query;
      const hasNextPage = posts.length > limit;

      const edges = posts.slice(0, limit).map(post => ({
        cursor: encodeCursor({ createdAt: post.createdAt }),
        node: post
      }));

      return {
        edges,
        pageInfo: {
          hasNextPage,
          endCursor: edges[edges.length - 1]?.cursor
        }
      };
    }
  }
};

// 用于更好性能的键集分页
const keysetPagination = async (table, { after, limit }) => {
  let query = db(table)
    .orderBy([
      { column: 'createdAt', order: 'desc' },
      { column: 'id', order: 'desc' }
    ])
    .limit(limit + 1);

  if (after) {
    const cursor = JSON.parse(Buffer.from(after, 'base64').toString());
    query = query.where(function() {
      this.where('createdAt', '<', cursor.createdAt)
        .orWhere(function() {
          this.where('createdAt', '=', cursor.createdAt)
            .andWhere('id', '<', cursor.id);
        });
    });
  }

  return query;
};

最佳实践

  1. 实施查询复杂性限制:通过复杂性分析防止昂贵查询压倒服务器
  2. 使用深度限制:设置最大查询深度以防止导致性能问题的深度嵌套查询
  3. 使用 DataLoader 进行批处理:始终使用 DataLoader 处理相关数据,避免 N+1 查询问题
  4. 策略性缓存:基于数据波动性实施多级缓存(DataLoader、Redis、CDN)
  5. 监控性能:跟踪解析器定时、查询复杂性和错误率以识别瓶颈
  6. 优化数据库查询:基于请求字段使用选择性字段加载和条件连接
  7. 实施 APQ:使用自动持久化查询以减少负载大小并启用 CDN 缓存
  8. 使用游标分页:对于大型数据集,优先使用基于游标的分页而不是偏移
  9. 添加适当索引:为常见查询模式和筛选字段创建数据库索引
  10. 启用跟踪:使用 OpenTelemetry 或 Apollo Studio 进行分布式跟踪和调试

常见陷阱

  1. 无查询限制:允许无限制查询,可能导致拒绝服务
  2. 效率低下的解析器:编写不使用批处理或缓存的解析器,导致 N+1 问题
  3. 缺少索引:未为 GraphQL 查询模式创建数据库索引
  4. 过度缓存:缓存数据过于激进,导致提供过时数据
  5. 忽略 info 参数:不使用 GraphQLResolveInfo 来优化字段选择
  6. 无监控:部署时无性能监控,无法识别问题
  7. 阻塞操作:在解析器中使用同步操作,阻塞事件循环
  8. 效率低下的分页:对大型数据集使用基于偏移的分页
  9. 无速率限制:允许用户无基于成本的限制查询
  10. 缓存雪崩:未正确处理缓存过期,导致所有请求同时命中数据库

何时使用此技能

使用 GraphQL 性能优化技能当:

  • 构建需要扩展的新 GraphQL API
  • 经历慢查询响应时间
  • 在生产环境中调试 N+1 查询问题
  • 实施速率限制和查询成本分析
  • 添加缓存层以提高性能
  • 为 GraphQL 模式优化数据库查询
  • 设置监控和可观测性
  • 保护免受恶意或昂贵查询
  • 迁移到生产环境并需要性能调优
  • 识别和修复性能瓶颈

资源