ApolloServerGraphQLAPI开发模式Skill apollo-server-patterns

这个技能专注于使用Apollo Server构建高效、可扩展的GraphQL API,包括模式设计、解析器实现、数据源管理、错误处理、订阅、指令、批处理、缓存、联邦和性能监控。适用于后端开发、微服务架构和实时应用。关键词:GraphQL, Apollo Server, API开发, 后端, 微服务, 数据源, 解析器, 联邦。

后端开发 0 次安装 0 次浏览 更新于 3/25/2026

名称: apollo-server-patterns 用户可调用: false 描述: 当使用Apollo Server构建需要解析器、数据源、模式设计和联邦的GraphQL API时使用。 允许工具:

  • 读取
  • 写入
  • 编辑
  • Grep
  • Glob
  • Bash

Apollo Server 模式

掌握Apollo Server以构建生产就绪的GraphQL API,具有正确的模式设计、高效的解析器和可扩展的架构。

概述

Apollo Server是一个符合规范的GraphQL服务器,可与任何GraphQL模式配合使用。它提供了模式拼接、联邦、数据源和生产GraphQL API的内置监控等功能。

安装与设置

安装Apollo Server

# 用于Express
npm install @apollo/server graphql express cors body-parser

# 用于独立服务器
npm install @apollo/server graphql

# 附加工具
npm install graphql-tag dataloader

基本服务器设置

// server.js
import { ApolloServer } from '@apollo/server';
import { startStandaloneServer } from '@apollo/server/standalone';
import { typeDefs } from './schema.js';
import { resolvers } from './resolvers.js';

const server = new ApolloServer({
  typeDefs,
  resolvers,
  formatError: (formattedError, error) => {
    // 自定义错误格式化
    if (formattedError.extensions?.code === 'INTERNAL_SERVER_ERROR') {
      return {
        ...formattedError,
        message: '发生内部错误'
      };
    }
    return formattedError;
  },
  plugins: [
    {
      async requestDidStart() {
        return {
          async willSendResponse({ response }) {
            console.log('响应已发送');
          }
        };
      }
    }
  ]
});

const { url } = await startStandaloneServer(server, {
  listen: { port: 4000 },
  context: async ({ req }) => {
    const token = req.headers.authorization || '';
    const user = await getUserFromToken(token);
    return { user };
  }
});

console.log(`服务器准备就绪于 ${url}`);

核心模式

1. 模式定义

// schema.js
import { gql } from 'graphql-tag';

export const typeDefs = gql`
  type User {
    id: ID!
    email: String!
    name: String!
    posts: [Post!]!
    createdAt: String!
  }

  type Post {
    id: ID!
    title: String!
    body: String!
    author: User!
    comments: [Comment!]!
    published: Boolean!
    createdAt: String!
    updatedAt: String!
  }

  type Comment {
    id: ID!
    body: String!
    author: User!
    post: Post!
    createdAt: String!
  }

  input CreatePostInput {
    title: String!
    body: String!
  }

  input UpdatePostInput {
    title: String
    body: String
    published: Boolean
  }

  type Query {
    me: User
    user(id: ID!): User
    users(limit: Int, offset: Int): [User!]!
    post(id: ID!): Post
    posts(published: Boolean, authorId: ID): [Post!]!
  }

  type Mutation {
    signup(email: String!, password: String!, name: String!): AuthPayload!
    login(email: String!, password: String!): AuthPayload!
    createPost(input: CreatePostInput!): Post!
    updatePost(id: ID!, input: UpdatePostInput!): Post!
    deletePost(id: ID!): Boolean!
    createComment(postId: ID!, body: String!): Comment!
  }

  type Subscription {
    postCreated: Post!
    commentAdded(postId: ID!): Comment!
  }

  type AuthPayload {
    token: String!
    user: User!
  }
`;

2. 解析器

// resolvers.js
export const resolvers = {
  Query: {
    me: (parent, args, context) => {
      if (!context.user) {
        throw new Error('未认证');
      }
      return context.user;
    },

    user: async (parent, { id }, { dataSources }) => {
      return dataSources.usersAPI.getUserById(id);
    },

    users: async (parent, { limit = 10, offset = 0 }, { dataSources }) => {
      return dataSources.usersAPI.getUsers({ limit, offset });
    },

    post: async (parent, { id }, { dataSources }) => {
      return dataSources.postsAPI.getPostById(id);
    },

    posts: async (parent, { published, authorId }, { dataSources }) => {
      return dataSources.postsAPI.getPosts({ published, authorId });
    }
  },

  Mutation: {
    signup: async (parent, { email, password, name }, { dataSources }) => {
      const user = await dataSources.usersAPI.createUser({
        email,
        password,
        name
      });
      const token = generateToken(user);
      return { token, user };
    },

    login: async (parent, { email, password }, { dataSources }) => {
      const user = await dataSources.usersAPI.authenticate(email, password);
      if (!user) {
        throw new Error('无效凭证');
      }
      const token = generateToken(user);
      return { token, user };
    },

    createPost: async (parent, { input }, { user, dataSources }) => {
      if (!user) {
        throw new Error('未认证');
      }
      return dataSources.postsAPI.createPost({
        ...input,
        authorId: user.id
      });
    },

    updatePost: async (parent, { id, input }, { user, dataSources }) => {
      const post = await dataSources.postsAPI.getPostById(id);
      if (post.authorId !== user.id) {
        throw new Error('未授权');
      }
      return dataSources.postsAPI.updatePost(id, input);
    },

    deletePost: async (parent, { id }, { user, dataSources }) => {
      const post = await dataSources.postsAPI.getPostById(id);
      if (post.authorId !== user.id) {
        throw new Error('未授权');
      }
      await dataSources.postsAPI.deletePost(id);
      return true;
    }
  },

  // 字段解析器
  User: {
    posts: async (parent, args, { dataSources }) => {
      return dataSources.postsAPI.getPostsByAuthorId(parent.id);
    }
  },

  Post: {
    author: async (parent, args, { dataSources }) => {
      return dataSources.usersAPI.getUserById(parent.authorId);
    },

    comments: async (parent, args, { dataSources }) => {
      return dataSources.commentsAPI.getCommentsByPostId(parent.id);
    }
  },

  Comment: {
    author: async (parent, args, { dataSources }) => {
      return dataSources.usersAPI.getUserById(parent.authorId);
    },

    post: async (parent, args, { dataSources }) => {
      return dataSources.postsAPI.getPostById(parent.postId);
    }
  }
};

3. 数据源

// dataSources/UsersAPI.js
import { RESTDataSource } from '@apollo/datasource-rest';

export class UsersAPI extends RESTDataSource {
  constructor() {
    super();
    this.baseURL = 'https://api.example.com/';
  }

  async getUserById(id) {
    return this.get(`users/${id}`);
  }

  async getUsers({ limit, offset }) {
    return this.get('users', {
      params: { limit, offset }
    });
  }

  async createUser({ email, password, name }) {
    return this.post('users', {
      body: { email, password, name }
    });
  }

  async authenticate(email, password) {
    try {
      const response = await this.post('auth/login', {
        body: { email, password }
      });
      return response.user;
    } catch (error) {
      return null;
    }
  }
}

// dataSources/PostsDB.js
import DataLoader from 'dataloader';

export class PostsDB {
  constructor(db) {
    this.db = db;
    this.loader = new DataLoader(this.batchGetPosts.bind(this));
  }

  async batchGetPosts(ids) {
    const posts = await this.db
      .select('*')
      .from('posts')
      .whereIn('id', ids);

    // 以相同顺序返回帖子
    return ids.map(id => posts.find(post => post.id === id));
  }

  async getPostById(id) {
    return this.loader.load(id);
  }

  async getPosts({ published, authorId }) {
    let query = this.db.select('*').from('posts');

    if (published !== undefined) {
      query = query.where('published', published);
    }

    if (authorId) {
      query = query.where('author_id', authorId);
    }

    return query;
  }

  async getPostsByAuthorId(authorId) {
    return this.db
      .select('*')
      .from('posts')
      .where('author_id', authorId);
  }

  async createPost({ title, body, authorId }) {
    const [post] = await this.db('posts')
      .insert({
        title,
        body,
        author_id: authorId,
        published: false,
        created_at: new Date(),
        updated_at: new Date()
      })
      .returning('*');

    return post;
  }

  async updatePost(id, updates) {
    const [post] = await this.db('posts')
      .where('id', id)
      .update({
        ...updates,
        updated_at: new Date()
      })
      .returning('*');

    return post;
  }

  async deletePost(id) {
    await this.db('posts').where('id', id).delete();
  }
}

4. 上下文与认证

// context.js
import jwt from 'jsonwebtoken';
import { UsersAPI } from './dataSources/UsersAPI.js';
import { PostsDB } from './dataSources/PostsDB.js';
import { CommentsDB } from './dataSources/CommentsDB.js';

export async function createContext({ req }) {
  // 从头部提取令牌
  const token = req.headers.authorization?.replace('Bearer ', '') || '';

  // 验证和解码令牌
  let user = null;
  if (token) {
    try {
      const decoded = jwt.verify(token, process.env.JWT_SECRET);
      user = await getUserById(decoded.userId);
    } catch (error) {
      console.error('无效令牌:', error);
    }
  }

  // 创建数据源
  const dataSources = {
    usersAPI: new UsersAPI(),
    postsDB: new PostsDB(db),
    commentsDB: new CommentsDB(db)
  };

  return {
    user,
    dataSources,
    db
  };
}

// 授权助手
export function requireAuth(user) {
  if (!user) {
    throw new Error('未认证');
  }
}

export function requireRole(user, role) {
  requireAuth(user);
  if (user.role !== role) {
    throw new Error('未授权');
  }
}

5. 错误处理

// errors.js
import { GraphQLError } from 'graphql';

export class AuthenticationError extends GraphQLError {
  constructor(message) {
    super(message, {
      extensions: {
        code: 'UNAUTHENTICATED',
        http: { status: 401 }
      }
    });
  }
}

export class ForbiddenError extends GraphQLError {
  constructor(message) {
    super(message, {
      extensions: {
        code: 'FORBIDDEN',
        http: { status: 403 }
      }
    });
  }
}

export class ValidationError extends GraphQLError {
  constructor(message, fields) {
    super(message, {
      extensions: {
        code: 'BAD_USER_INPUT',
        validationErrors: fields,
        http: { status: 400 }
      }
    });
  }
}

// 在解析器中使用
import { AuthenticationError, ForbiddenError } from './errors.js';

const resolvers = {
  Mutation: {
    deletePost: async (parent, { id }, { user, dataSources }) => {
      if (!user) {
        throw new AuthenticationError('必须登录');
      }

      const post = await dataSources.postsDB.getPostById(id);
      if (post.authorId !== user.id) {
        throw new ForbiddenError('只能删除自己的帖子');
      }

      await dataSources.postsDB.deletePost(id);
      return true;
    }
  }
};

6. 订阅

// server-with-subscriptions.js
import { ApolloServer } from '@apollo/server';
import { expressMiddleware } from '@apollo/server/express4';
import { ApolloServerPluginDrainHttpServer } from '@apollo/server/plugin/drainHttpServer';
import { createServer } from 'http';
import express from 'express';
import { WebSocketServer } from 'ws';
import { useServer } from 'graphql-ws/lib/use/ws';
import { makeExecutableSchema } from '@graphql-tools/schema';
import { PubSub } from 'graphql-subscriptions';

const pubsub = new PubSub();

const typeDefs = gql`
  type Subscription {
    postCreated: Post!
    commentAdded(postId: ID!): Comment!
  }
`;

const resolvers = {
  Mutation: {
    createPost: async (parent, { input }, { user, dataSources }) => {
      const post = await dataSources.postsDB.createPost({
        ...input,
        authorId: user.id
      });

      // 发布订阅事件
      pubsub.publish('POST_CREATED', { postCreated: post });

      return post;
    },

    createComment: async (parent, { postId, body }, { user, dataSources }) => {
      const comment = await dataSources.commentsDB.createComment({
        postId,
        body,
        authorId: user.id
      });

      pubsub.publish(`COMMENT_ADDED_${postId}`, { commentAdded: comment });

      return comment;
    }
  },

  Subscription: {
    postCreated: {
      subscribe: () => pubsub.asyncIterator(['POST_CREATED'])
    },

    commentAdded: {
      subscribe: (parent, { postId }) =>
        pubsub.asyncIterator([`COMMENT_ADDED_${postId}`])
    }
  }
};

// 创建模式
const schema = makeExecutableSchema({ typeDefs, resolvers });

// 创建HTTP服务器
const app = express();
const httpServer = createServer(app);

// 创建WebSocket服务器
const wsServer = new WebSocketServer({
  server: httpServer,
  path: '/graphql'
});

const serverCleanup = useServer({ schema }, wsServer);

// 创建Apollo Server
const server = new ApolloServer({
  schema,
  plugins: [
    ApolloServerPluginDrainHttpServer({ httpServer }),
    {
      async serverWillStart() {
        return {
          async drainServer() {
            await serverCleanup.dispose();
          }
        };
      }
    }
  ]
});

await server.start();

app.use(
  '/graphql',
  cors(),
  express.json(),
  expressMiddleware(server, {
    context: createContext
  })
);

httpServer.listen(4000, () => {
  console.log('服务器运行于 http://localhost:4000/graphql');
});

7. 模式指令

// directives.js
import { mapSchema, getDirective, MapperKind } from '@graphql-tools/utils';
import { defaultFieldResolver } from 'graphql';

// 在模式中定义指令
const typeDefs = gql`
  directive @auth(requires: Role = USER) on FIELD_DEFINITION | OBJECT

  enum Role {
    ADMIN
    USER
    GUEST
  }

  type Query {
    me: User @auth
    users: [User!]! @auth(requires: ADMIN)
  }
`;

// 实现指令
function authDirective(directiveName) {
  return {
    authDirectiveTypeDefs: `directive @${directiveName}(requires: Role = USER)
      on FIELD_DEFINITION | OBJECT`,
    authDirectiveTransformer: (schema) =>
      mapSchema(schema, {
        [MapperKind.OBJECT_FIELD]: (fieldConfig) => {
          const authDirective = getDirective(
            schema,
            fieldConfig,
            directiveName
          )?.[0];

          if (authDirective) {
            const { requires } = authDirective;
            const { resolve = defaultFieldResolver } = fieldConfig;

            fieldConfig.resolve = async function (source, args, context, info) {
              const { user } = context;

              if (!user) {
                throw new Error('未认证');
              }

              if (requires && user.role !== requires) {
                throw new Error(`需要${requires}角色`);
              }

              return resolve(source, args, context, info);
            };
          }

          return fieldConfig;
        }
      })
  };
}

// 应用到模式
const { authDirectiveTypeDefs, authDirectiveTransformer } = authDirective('auth');

let schema = makeExecutableSchema({
  typeDefs: [authDirectiveTypeDefs, typeDefs],
  resolvers
});

schema = authDirectiveTransformer(schema);

8. 使用DataLoader进行批处理和缓存

// loaders.js
import DataLoader from 'dataloader';

export function createLoaders(db) {
  // 批量加载用户
  const userLoader = new DataLoader(async (userIds) => {
    const users = await db
      .select('*')
      .from('users')
      .whereIn('id', userIds);

    return userIds.map(id => users.find(user => user.id === id));
  });

  // 批量加载帖子并缓存
  const postLoader = new DataLoader(
    async (postIds) => {
      const posts = await db
        .select('*')
        .from('posts')
        .whereIn('id', postIds);

      return postIds.map(id => posts.find(post => post.id === id));
    },
    {
      // 缓存5分钟
      cacheMap: new Map(),
      cacheKeyFn: (key) => key,
      batch: true,
      maxBatchSize: 100
    }
  );

  // 按帖子ID加载评论(一对多)
  const commentsByPostLoader = new DataLoader(async (postIds) => {
    const comments = await db
      .select('*')
      .from('comments')
      .whereIn('post_id', postIds);

    return postIds.map(postId =>
      comments.filter(comment => comment.post_id === postId)
    );
  });

  return {
    userLoader,
    postLoader,
    commentsByPostLoader
  };
}

// 在上下文中使用
export async function createContext({ req }) {
  const loaders = createLoaders(db);

  return {
    loaders,
    // ... 其他上下文
  };
}

// 在解析器中使用
const resolvers = {
  Post: {
    author: (parent, args, { loaders }) => {
      return loaders.userLoader.load(parent.authorId);
    },

    comments: (parent, args, { loaders }) => {
      return loaders.commentsByPostLoader.load(parent.id);
    }
  }
};

9. 联邦

// subgraph-users.js
import { ApolloServer } from '@apollo/server';
import { buildSubgraphSchema } from '@apollo/subgraph';
import gql from 'graphql-tag';

const typeDefs = gql`
  extend schema
    @link(url: "https://specs.apollo.dev/federation/v2.0",
          import: ["@key", "@shareable"])

  type User @key(fields: "id") {
    id: ID!
    email: String!
    name: String!
  }

  type Query {
    user(id: ID!): User
    users: [User!]!
  }
`;

const resolvers = {
  Query: {
    user: (parent, { id }, { dataSources }) => {
      return dataSources.usersDB.getUserById(id);
    },

    users: (parent, args, { dataSources }) => {
      return dataSources.usersDB.getUsers();
    }
  },

  User: {
    __resolveReference: (user, { dataSources }) => {
      return dataSources.usersDB.getUserById(user.id);
    }
  }
};

const server = new ApolloServer({
  schema: buildSubgraphSchema({ typeDefs, resolvers })
});

// subgraph-posts.js
const typeDefs = gql`
  extend schema
    @link(url: "https://specs.apollo.dev/federation/v2.0",
          import: ["@key"])

  type User @key(fields: "id") {
    id: ID!
    posts: [Post!]!
  }

  type Post @key(fields: "id") {
    id: ID!
    title: String!
    body: String!
    author: User!
  }

  type Query {
    post(id: ID!): Post
    posts: [Post!]!
  }
`;

const resolvers = {
  User: {
    posts: (user, args, { dataSources }) => {
      return dataSources.postsDB.getPostsByAuthorId(user.id);
    }
  },

  Post: {
    author: (post) => {
      return { __typename: 'User', id: post.authorId };
    }
  }
};

10. 性能监控

// plugins/monitoring.js
export const monitoringPlugin = {
  async requestDidStart() {
    const start = Date.now();

    return {
      async willSendResponse({ response, errors }) {
        const duration = Date.now() - start;

        console.log({
          duration,
          hasErrors: !!errors,
          operationName: request.operationName
        });

        // 发送到监控服务
        if (duration > 1000) {
          await metrics.recordSlowQuery({
            operation: request.operationName,
            duration
          });
        }
      },

      async didEncounterErrors({ errors }) {
        errors.forEach(error => {
          console.error('GraphQL 错误:', error);
          // 发送到错误跟踪服务
          errorTracker.captureException(error);
        });
      }
    };
  }
};

// 使用
const server = new ApolloServer({
  typeDefs,
  resolvers,
  plugins: [monitoringPlugin]
});

最佳实践

  1. 使用DataLoader - 批量化和缓存数据库查询
  2. 实现适当的认证 - 使用认证保护解析器
  3. 精心设计模式 - 首先考虑客户端需求
  4. 使用输入类型 - 正确验证突变输入
  5. 优雅处理错误 - 返回有意义的错误消息
  6. 实施监控 - 跟踪性能和错误
  7. 使用数据源 - 分离数据获取逻辑
  8. 利用联邦 - 将大型模式拆分为子图
  9. 适当缓存 - 使用Redis进行共享缓存
  10. 文档化模式 - 为类型和字段添加描述

常见陷阱

  1. N+1查询问题 - 未使用DataLoader进行批处理
  2. 解析器中的过度获取 - 加载不必要的数据
  3. 缺少错误处理 - 未捕获和格式化错误
  4. 模式设计不佳 - 未遵循GraphQL最佳实践
  5. 无认证 - 未认证就暴露敏感数据
  6. 阻塞操作 - 在解析器中执行同步操作
  7. 内存泄漏 - 未清理订阅
  8. 缺少验证 - 未验证输入数据
  9. 暴露内部细节 - 将数据库错误泄露给客户端
  10. 无限率限制 - 允许无限查询复杂性

使用场景

  • 构建GraphQL API
  • 使用联邦创建微服务
  • 开发实时应用
  • 构建移动后端
  • 创建统一API网关
  • 开发管理仪表板
  • 构建电商平台
  • 创建内容管理系统
  • 开发社交平台
  • 构建分析API

资源