GraphQL实施 graphql-implementation

设计和实现 GraphQL API,包括模式(schema)设计、解析器、查询、变更、订阅和最佳实践。适用于构建 GraphQL 服务器、设计 GraphQL schema 或从 REST 迁移到 GraphQL 的场景。

后端开发 0 次安装 0 次浏览 更新于 3/4/2026

GraphQL 实施

概述

通过合理的模式(schema)设计、解析器模式、错误处理和性能优化来构建 GraphQL API,使客户端与服务器之间的通信更加灵活。

何时使用

  • 设计新的 GraphQL APIs
  • 创建 GraphQL 模式和类型
  • 实现解析器和变更
  • 添加订阅以获取实时数据
  • 从 REST 迁移到 GraphQL
  • 优化 GraphQL 性能

实现步骤

1. GraphQL 模式设计

# A user account. Every field is non-null; `posts` always resolves to a list
# (possibly empty) whose items are never null.
type User {
  id: ID!
  email: String!
  firstName: String!
  lastName: String!
  role: UserRole!
  posts: [Post!]!
  createdAt: DateTime!
  updatedAt: DateTime!
}

# Closed set of roles a user can hold. Used by `User.role` and by the
# create/update user input types.
enum UserRole {
  ADMIN
  USER
  MODERATOR
}

# A post written by a user, with its comments.
# `publishedAt` is the only nullable field.
# NOTE(review): presumably null means the post is unpublished — confirm.
type Post {
  id: ID!
  title: String!
  content: String!
  author: User!
  comments: [Comment!]!
  publishedAt: DateTime
  createdAt: DateTime!
}

# A comment left by a user on a post. Both the author and the parent post are
# always present (non-null).
type Comment {
  id: ID!
  text: String!
  author: User!
  post: Post!
  createdAt: DateTime!
}

# Read entry points.
# - `user` / `post` are nullable, so a lookup may resolve to null.
# - `users` / `posts` take optional `limit` / `offset` arguments; the schema
#   declares no default values for them.
# - `search` returns a list of the SearchResult union.
type Query {
  user(id: ID!): User
  users(limit: Int, offset: Int): [User!]!
  post(id: ID!): Post
  posts(authorId: ID, limit: Int, offset: Int): [Post!]!
  search(query: String!): [SearchResult!]!
}

# Result type of `Query.search`: each item is exactly one of User, Post, or Comment.
union SearchResult = User | Post | Comment

# Write entry points. Create/update mutations return the affected object;
# delete mutations return a non-null Boolean.
# Unlike the user and post mutations, `createComment` takes its fields as bare
# arguments instead of a dedicated input type.
type Mutation {
  createUser(input: CreateUserInput!): User!
  updateUser(id: ID!, input: UpdateUserInput!): User!
  deleteUser(id: ID!): Boolean!
  createPost(input: CreatePostInput!): Post!
  updatePost(id: ID!, input: UpdatePostInput!): Post!
  deletePost(id: ID!): Boolean!
  createComment(postId: ID!, text: String!): Comment!
}

# Payload for `createUser`. All four fields are required.
input CreateUserInput {
  email: String!
  firstName: String!
  lastName: String!
  role: UserRole!
}

# Payload for `updateUser`. Every field is optional.
# NOTE(review): presumably omitted fields are left unchanged — confirm against
# the resolver.
input UpdateUserInput {
  email: String
  firstName: String
  lastName: String
  role: UserRole
}

# Payload for `createPost`. No author field is declared.
# NOTE(review): presumably the author comes from the authenticated request —
# confirm.
input CreatePostInput {
  title: String!
  content: String!
}

# Payload for `updatePost`. Every field is optional, including `publishedAt`.
input UpdatePostInput {
  title: String
  content: String
  publishedAt: DateTime
}

# Real-time event streams. `commentAdded` is scoped to a single post through
# its required `postId` argument; the other two take no arguments.
type Subscription {
  userCreated: User!
  postPublished: Post!
  commentAdded(postId: ID!): Comment!
}

# Custom scalar used for all timestamp fields. Its serialization format is not
# defined by this schema; the server must supply the scalar implementation.
scalar DateTime

2. Node.js Apollo Server 实施

const { ApolloServer, gql } = require('apollo-server-express');
const express = require('express');

/**
 * Schema for the Apollo Server example, written in GraphQL SDL and passed
 * through the `gql` template tag.
 *
 * Declares two queries (`user`, `users`), two mutations (`createUser`,
 * `createPost`) and the `User` / `Post` object types. Both mutations take
 * their fields as individual arguments rather than a single input object,
 * and `users` takes no pagination arguments.
 */
const typeDefs = gql`
  type Query {
    user(id: ID!): User
    users: [User!]!
  }

  type User {
    id: ID!
    email: String!
    firstName: String!
    lastName: String!
    posts: [Post!]!
  }

  type Post {
    id: ID!
    title: String!
    content: String!
    author: User!
  }

  type Mutation {
    createUser(email: String!, firstName: String!, lastName: String!): User!
    createPost(title: String!, content: String!): Post!
  }
`;

/**
 * Resolver map for the Apollo Server example.
 *
 * Every resolver reads the data layer from `context.db`; `createPost` also
 * reads the current user from `context.user`.
 *
 * Both mutations await the `save` call before returning, so the response is
 * only sent once persistence has finished and a failed save rejects the
 * mutation instead of becoming an unhandled promise rejection.
 */
const resolvers = {
  Query: {
    /** Look up one user by id; resolves to whatever `db.users.findById` yields. */
    user: async (_, { id }, { db }) => db.users.findById(id),

    /** List all users. */
    users: async (_, __, { db }) => db.users.findAll(),
  },

  User: {
    // Runs once per parent User, so a list of N users issues N post lookups.
    posts: async (user, _, { db }) => db.posts.findByAuthorId(user.id),
  },

  Post: {
    // Runs once per parent Post, so a list of N posts issues N user lookups.
    author: async (post, _, { db }) => db.users.findById(post.authorId),
  },

  Mutation: {
    /**
     * Create and persist a user.
     * @returns {Promise<object>} the newly created user record
     */
    createUser: async (_, { email, firstName, lastName }, { db }) => {
      // NOTE(review): millisecond-timestamp ids collide when two records are
      // created within the same millisecond.
      const user = { id: Date.now().toString(), email, firstName, lastName };
      await db.users.save(user);
      return user;
    },

    /**
     * Create and persist a post owned by the authenticated user.
     * @throws {Error} 'Unauthorized' when the context carries no user
     * @returns {Promise<object>} the newly created post record
     */
    createPost: async (_, { title, content }, { user, db }) => {
      if (!user) throw new Error('Unauthorized');
      const post = { id: Date.now().toString(), title, content, authorId: user.id };
      await db.posts.save(post);
      return post;
    },
  },
};

/**
 * Apollo Server instance built from `typeDefs` and `resolvers`.
 *
 * The context function runs per request and hands resolvers two values:
 *   - `user`: copied from `req.user`
 *   - `db`:   the module obtained from `require('./database')`
 *
 * NOTE(review): nothing in this snippet sets `req.user`; presumably an
 * authentication middleware registered elsewhere does — confirm.
 */
const server = new ApolloServer({
  typeDefs,
  resolvers,
  context: ({ req }) => {
    const { user } = req;
    const db = require('./database');
    return { user, db };
  },
});

const app = express();

/**
 * Start Apollo Server, mount it on the Express app, and begin listening on
 * port 4000.
 *
 * `server.start()` must settle before `applyMiddleware` is called, hence the
 * sequential awaits.
 */
async function startServer() {
  await server.start();
  server.applyMiddleware({ app });
  app.listen(4000, () => console.log('GraphQL 服务器在 4000 端口运行'));
}

// A startup failure is reported and reflected in the exit code instead of
// being left as an unhandled promise rejection.
startServer().catch((err) => {
  console.error('GraphQL 服务器启动失败:', err);
  process.exitCode = 1;
});

3. Python GraphQL 实施 (Graphene)

import graphene
from datetime import datetime
from typing import List

# GraphQL object type for a user: required id, email, and name fields plus a
# list of the user's posts.
class UserType(graphene.ObjectType):
    id = graphene.ID(required=True)
    email = graphene.String(required=True)
    first_name = graphene.String(required=True)
    last_name = graphene.String(required=True)
    # Wrapped in a lambda because PostType is defined later in this module.
    posts = graphene.List(lambda: PostType)

# GraphQL object type for a post. `author` is declared without required=True,
# so it is nullable; every other field is required.
class PostType(graphene.ObjectType):
    id = graphene.ID(required=True)
    title = graphene.String(required=True)
    content = graphene.String(required=True)
    author = graphene.Field(UserType)
    created_at = graphene.DateTime(required=True)

# Root query type: single-user lookup, user listing, and post listing with an
# optional author filter.
# NOTE(review): `User` and `Post` are not imported in this snippet; presumably
# Django models — confirm they are in scope.
class Query(graphene.ObjectType):
    user = graphene.Field(UserType, id=graphene.ID(required=True))
    users = graphene.List(UserType)
    posts = graphene.List(PostType, author_id=graphene.ID())

    def resolve_user(self, info, id):
        """Return the user whose primary key is ``id``, or ``None`` if absent.

        The ``user`` field is nullable, so a missing row resolves to null
        rather than raising ``User.DoesNotExist`` and surfacing as a GraphQL
        error.
        """
        return User.objects.filter(pk=id).first()

    def resolve_users(self, info):
        """Return all users."""
        return User.objects.all()

    def resolve_posts(self, info, author_id=None):
        """Return posts, restricted to ``author_id`` when one is supplied."""
        if author_id:
            return Post.objects.filter(author_id=author_id)
        return Post.objects.all()

# Mutation that creates a user from an email and a first/last name, returning
# the created user together with a success flag.
class CreateUserMutation(graphene.Mutation):
    class Arguments:
        email = graphene.String(required=True)
        first_name = graphene.String(required=True)
        last_name = graphene.String(required=True)

    # Payload fields returned to the client.
    user = graphene.Field(UserType)
    success = graphene.Boolean()

    def mutate(self, info, email, first_name, last_name):
        """Create the user record and wrap it in the mutation payload."""
        new_user_fields = {
            "email": email,
            "first_name": first_name,
            "last_name": last_name,
        }
        created = User.objects.create(**new_user_fields)
        return CreateUserMutation(user=created, success=True)

# Root mutation type exposing CreateUserMutation as the `create_user` field.
class Mutation(graphene.ObjectType):
    create_user = CreateUserMutation.Field()

# Schema object combining the root Query and Mutation types.
schema = graphene.Schema(query=Query, mutation=Mutation)

4. 查询示例

# Fetch a user together with their posts.
# The user id is hard-coded to "123" in this example rather than passed as a
# variable.
query GetUserWithPosts {
  user(id: "123") {
    id
    email
    firstName
    posts {
      id
      title
      createdAt
    }
  }
}

# Paginated user query. Both variables are optional and are forwarded
# unchanged to the `users` field.
query GetUsers($limit: Int, $offset: Int) {
  users(limit: $limit, offset: $offset) {
    id
    email
    firstName
  }
}

# Search across types.
# `search` returns the SearchResult union (User | Post | Comment), so every
# member needs its own inline fragment; without one, results of that type come
# back as empty objects. `__typename` lets the client tell which member each
# result is.
query Search($query: String!) {
  search(query: $query) {
    __typename
    ... on User {
      id
      email
    }
    ... on Post {
      id
      title
    }
    ... on Comment {
      id
      text
    }
  }
}

# Create-user mutation. Takes the whole payload as a single required
# `CreateUserInput` variable and selects three fields of the created user.
mutation CreateUser($input: CreateUserInput!) {
  createUser(input: $input) {
    id
    email
    firstName
  }
}

# Subscribe to new comments on one post, identified by the required `$postId`
# variable.
subscription OnCommentAdded($postId: ID!) {
  commentAdded(postId: $postId) {
    id
    text
    author {
      firstName
    }
  }
}

5. 错误处理

/**
 * Resolver map demonstrating structured error reporting.
 *
 * `Query.user` distinguishes two failure modes through `extensions.code`:
 *   - `NOT_FOUND`      – the lookup succeeded but returned no user.
 *   - `INTERNAL_ERROR` – the lookup itself threw; the cause is kept in
 *                        `originalError`.
 *
 * NOTE(review): `User` and `GraphQLError` are not defined in this snippet;
 * presumably the data model and the `graphql` package export — confirm both
 * are in scope.
 */
const resolvers = {
  Query: {
    user: async (_, { id }) => {
      let user;
      // Only the data-access call is guarded, so the NOT_FOUND error thrown
      // below cannot be caught here and re-labelled as a database failure.
      try {
        user = await User.findById(id);
      } catch (error) {
        throw new GraphQLError('数据库错误', {
          originalError: error,
          extensions: { code: 'INTERNAL_ERROR' },
        });
      }

      if (!user) {
        throw new GraphQLError('用户未找到', {
          extensions: {
            code: 'NOT_FOUND',
            userId: id,
          },
        });
      }
      return user;
    },
  },
};

/**
 * Shape every error sent to clients as `{ message, code, timestamp }`.
 *
 * Apollo Server takes its error formatter as a constructor option, so pass
 * this function in when creating the server:
 *
 *   new ApolloServer({ typeDefs, resolvers, formatError })
 *
 * Assigning it to a property of an already-constructed server does not
 * register it.
 *
 * NOTE(review): `code` and `timestamp` are emitted as top-level keys of the
 * error object rather than under `extensions` — confirm clients expect that.
 *
 * @param {{ message: string, extensions?: { code?: string } }} formattedError
 * @returns {{ message: string, code: string, timestamp: string }}
 */
const formatError = (formattedError) => ({
  message: formattedError.message,
  // Errors that carry no code are reported as INTERNAL_ERROR.
  code: formattedError.extensions?.code || 'INTERNAL_ERROR',
  timestamp: new Date().toISOString(),
});

最佳实践

✅ 应该做

  • 使用清晰、描述性的字段名称
  • 围绕客户端需求设计模式
  • 实施适当的错误处理
  • 为变更使用输入类型
  • 添加订阅以获取实时数据
  • 高效地缓存解析器的结果
  • 验证输入数据
  • 使用联合类型(union)提高可扩展性

❌ 不应该做

  • 过度嵌套查询
  • 暴露内部数据库 ID
  • 在没有授权的情况下返回敏感数据
  • 创建过于复杂的模式
  • 忘记处理空值
  • 忽略 N+1 查询问题
  • 省略错误信息,不向客户端返回有意义的错误

性能提示

  • 使用 DataLoader 批量处理数据库查询
  • 实施查询复杂性分析
  • 在解析器级别缓存
  • 使用连接(Connection)模式进行基于游标的分页
  • 监控查询执行时间