序列化技能(Serialization Skill)

序列化技能用于将数据结构转换为可存储或传输的格式,支持JSON、Protocol Buffers、MessagePack等多种格式,适用于数据交换、API开发、存储优化和跨平台通信。关键词:数据序列化,JSON序列化,Protobuf,MessagePack,API开发,数据交换,编码解码,模式进化。

架构设计 0 次安装 0 次浏览 更新于 3/24/2026

名称: 序列化 描述: 跨格式的数据序列化与反序列化模式。在实现数据交换、API载荷、存储格式、编码/解码或跨语言通信时使用。关键词:序列化,反序列化,JSON,YAML,TOML,XML,Protocol Buffers,protobuf,MessagePack,CBOR,serde,编码,解码,模式,模式进化,版本控制,向后兼容,向前兼容,二进制格式,文本格式,数据交换,gRPC,API契约。

序列化

概述

序列化是将数据结构转换为可以存储或传输并在以后重建的格式的过程。这个技能涵盖了JSON最佳实践、像Protocol Buffers和MessagePack这样的二进制格式、模式进化策略和性能考虑。

关键概念

JSON序列化最佳实践

一致的命名约定:

// camelCase field names, the convention for JavaScript/TypeScript APIs.
interface UserResponse {
  userId: string;
  firstName: string;
  lastName: string;
  emailAddress: string;
  createdAt: string;
}

// snake_case field names, the convention for Python/Ruby APIs.
interface UserResponseSnake {
  user_id: string;
  first_name: string;
  last_name: string;
  email_address: string;
  created_at: string;
}

// Case-conversion helpers.

/**
 * Rewrites every ASCII uppercase letter as "_" followed by its lowercase
 * form. A leading capital therefore produces a leading underscore.
 */
function toSnakeCase(str: string): string {
  const underscoreAndLower = (upper: string): string =>
    "_" + upper.toLowerCase();
  return str.replace(/[A-Z]/g, underscoreAndLower);
}

/**
 * Removes each underscore that is directly followed by an ASCII lowercase
 * letter and uppercases that letter. Other underscores are left untouched.
 */
function toCamelCase(str: string): string {
  const upperAfterUnderscore = (_match: string, lower: string): string =>
    lower.toUpperCase();
  return str.replace(/_([a-z])/g, upperAfterUnderscore);
}

/**
 * Recursively renames the keys of plain objects using `converter`.
 *
 * Arrays are mapped element by element. Only plain objects (prototype is
 * `Object.prototype` or `null`) are rebuilt; every other value — primitives,
 * `null`, and non-plain objects such as `Date`, `Map`, `Set` or class
 * instances — is returned unchanged, so such values are no longer flattened
 * into `{}`.
 *
 * @param obj - Value to convert.
 * @param converter - Maps an original key to its new name.
 * @returns A new structure with converted keys; the input is not mutated.
 */
function convertKeys(obj: any, converter: (key: string) => string): any {
  if (Array.isArray(obj)) {
    return obj.map((item) => convertKeys(item, converter));
  }
  if (obj === null || typeof obj !== "object") {
    return obj;
  }

  // Only rebuild plain objects; Object.entries() on a Date/Map/Set yields
  // nothing useful and would silently drop the value.
  const proto = Object.getPrototypeOf(obj);
  if (proto !== null && proto !== Object.prototype) {
    return obj;
  }

  return Object.fromEntries(
    Object.entries(obj).map(([key, value]) => [
      converter(key),
      convertKeys(value, converter),
    ]),
  );
}

日期/时间处理:

// Always use ISO 8601 formats.
const dateFormats = {
  // Preferred: full ISO 8601 with time zone
  iso8601: "2024-12-19T14:30:00.000Z",

  // Date only
  dateOnly: "2024-12-19",

  // With time zone offset
  withOffset: "2024-12-19T14:30:00+00:00",

  // Unix timestamp (seconds) - for precise timing
  unixSeconds: 1703000000,

  // Unix timestamp (milliseconds)
  unixMillis: 1703000000000,
};

/** Static helpers for moving `Date` values in and out of JSON payloads. */
class DateSerializer {
  /** Returns the ISO 8601 (UTC) representation of `date`. */
  static toJSON(date: Date): string {
    return date.toISOString();
  }

  /**
   * Parses a date from a JSON value.
   *
   * Strings are handed to the `Date` constructor. Numbers are treated as
   * Unix seconds when their magnitude is below 1e12 and as milliseconds
   * otherwise. The magnitude (not the signed value) is compared so that
   * large negative millisecond timestamps are not mistaken for seconds.
   *
   * The heuristic is inherently ambiguous: a millisecond timestamp within
   * roughly 31.7 years of the epoch (|ms| < 1e12) is read as seconds.
   *
   * @returns The parsed date; an unparseable input yields an Invalid Date.
   */
  static fromJSON(value: string | number): Date {
    if (typeof value === "number") {
      const looksLikeSeconds = Math.abs(value) < 1e12;
      return new Date(looksLikeSeconds ? value * 1000 : value);
    }
    return new Date(value);
  }

  /** Returns whole Unix seconds for `date`, rounded towards negative infinity. */
  static toUnix(date: Date): number {
    return Math.floor(date.getTime() / 1000);
  }
}

空值、未定义与省略:

// Illustrates the three presence states a field can have in a payload.
interface ApiResponse {
  // Required field - always present
  id: string;

  // Optional field - may be omitted
  nickname?: string;

  // Nullable field - present but may be null
  deletedAt: string | null;
}

// Serialization strategies for undefined/null handling.
const serializationStrategies = {
  // Strategy 1: a JSON round trip drops undefined properties and keeps null.
  omitUndefined: (obj: any) => {
    const text = JSON.stringify(obj);
    return JSON.parse(text);
  },

  // Strategy 2: a JSON round trip whose replacer turns undefined into null.
  undefinedToNull: (obj: any) => {
    const nullIfUndefined = (_key: string, v: any) =>
      v === undefined ? null : v;
    return JSON.parse(JSON.stringify(obj, nullIfUndefined));
  },

  // Strategy 3: explicit shallow copy of the own enumerable properties whose
  // value is not undefined (nested objects are copied by reference).
  explicit: (obj: any) => {
    const kept = Object.entries(obj).filter(
      ([, value]) => value !== undefined,
    );
    return kept.reduce((acc: any, [key, value]) => {
      acc[key] = value;
      return acc;
    }, {});
  },
};

自定义JSON序列化:

/**
 * JSON serializer that round-trips values plain JSON cannot represent.
 *
 * Such values are written as `{ __type, value }` wrapper objects. `Date`,
 * `Map`, `Set` and `bigint` are built in. Further types can be added with
 * `registerType`; on serialization they are matched by the constructor name
 * of the instance.
 */
class CustomSerializer {
  private serializers: Map<string, (value: any) => any> = new Map();
  private deserializers: Map<string, (value: any) => any> = new Map();

  /**
   * Registers a custom type.
   *
   * @param typeName - Tag written to `__type`; must equal the constructor
   *   name of the instances to serialize.
   * @param serialize - Converts an instance to a JSON-compatible value.
   * @param deserialize - Rebuilds an instance from that value.
   */
  registerType<T>(
    typeName: string,
    serialize: (value: T) => any,
    deserialize: (value: any) => T,
  ): void {
    this.serializers.set(typeName, serialize);
    this.deserializers.set(typeName, deserialize);
  }

  /** Serializes `value` to a JSON string, wrapping special types. */
  serialize(value: any): string {
    const registered = this.serializers;

    // A `function` replacer is required: JSON.stringify calls toJSON()
    // before the replacer, so a Date arrives here as a string. The original
    // value is still reachable through the holder object (`this`).
    return JSON.stringify(value, function (this: any, key: string, val: any) {
      const raw = this[key];

      if (raw instanceof Date) {
        // An invalid Date has no ISO form; null is restored as Invalid Date.
        const iso = Number.isNaN(raw.getTime()) ? null : raw.toISOString();
        return { __type: "Date", value: iso };
      }
      if (val instanceof Map) {
        return { __type: "Map", value: Array.from(val.entries()) };
      }
      if (val instanceof Set) {
        return { __type: "Set", value: Array.from(val) };
      }
      // bigint is a primitive, so `instanceof BigInt` never matches it.
      if (typeof val === "bigint") {
        return { __type: "BigInt", value: val.toString() };
      }

      // Registered types: only non-plain objects are considered, so the
      // wrapper objects produced here are never re-wrapped.
      if (raw !== null && typeof raw === "object" && !Array.isArray(raw)) {
        const proto = Object.getPrototypeOf(raw);
        if (proto !== null && proto !== Object.prototype) {
          const typeName: string | undefined = raw.constructor?.name;
          const custom = typeName ? registered.get(typeName) : undefined;
          if (custom) {
            return { __type: typeName, value: custom(raw) };
          }
        }
      }
      return val;
    });
  }

  /**
   * Parses `json` and rebuilds every `{ __type, value }` wrapper. Wrappers
   * with an unknown `__type` are returned unchanged.
   */
  deserialize<T>(json: string): T {
    return JSON.parse(json, (key, val) => {
      if (val && typeof val === "object" && "__type" in val) {
        switch (val.__type) {
          case "Date":
            return val.value === null ? new Date(NaN) : new Date(val.value);
          case "Map":
            return new Map(val.value);
          case "Set":
            return new Set(val.value);
          case "BigInt":
            return BigInt(val.value);
          default: {
            const custom = this.deserializers.get(val.__type);
            if (custom) {
              return custom(val.value);
            }
          }
        }
      }
      return val;
    });
  }
}

// Usage: serialize an object containing a Date, a Set and a Map, then
// restore it from the resulting JSON string.
const serializer = new CustomSerializer();
const data = {
  id: 1,
  created: new Date(),
  tags: new Set(["a", "b"]),
  metadata: new Map([["key", "value"]]),
};

const json = serializer.serialize(data);
const restored = serializer.deserialize(json);

Protocol Buffers (Protobuf)

模式定义 (.proto):

syntax = "proto3";

package myapp;

// Required for the google.protobuf.Timestamp fields used below.
import "google/protobuf/timestamp.proto";

// Enum definition
enum OrderStatus {
  ORDER_STATUS_UNSPECIFIED = 0;
  ORDER_STATUS_PENDING = 1;
  ORDER_STATUS_CONFIRMED = 2;
  ORDER_STATUS_SHIPPED = 3;
  ORDER_STATUS_DELIVERED = 4;
  ORDER_STATUS_CANCELLED = 5;
}

// Message definitions
message User {
  string id = 1;
  string email = 2;
  string name = 3;
  optional string phone = 4;
  repeated string roles = 5;
  map<string, string> metadata = 6;
  google.protobuf.Timestamp created_at = 7;
}

message Address {
  string street = 1;
  string city = 2;
  string state = 3;
  string postal_code = 4;
  string country = 5;
}

message Order {
  string id = 1;
  string user_id = 2;
  OrderStatus status = 3;
  repeated OrderItem items = 4;
  Address shipping_address = 5;
  // Monetary amounts are carried as integer cents.
  int64 total_cents = 6;
  string currency = 7;
  google.protobuf.Timestamp created_at = 8;
  google.protobuf.Timestamp updated_at = 9;
}

message OrderItem {
  string product_id = 1;
  string name = 2;
  int32 quantity = 3;
  int64 price_cents = 4;
}

// Service definition (for gRPC)
service OrderService {
  rpc CreateOrder(CreateOrderRequest) returns (Order);
  rpc GetOrder(GetOrderRequest) returns (Order);
  rpc ListOrders(ListOrdersRequest) returns (ListOrdersResponse);
  rpc UpdateOrderStatus(UpdateOrderStatusRequest) returns (Order);
}

message CreateOrderRequest {
  string user_id = 1;
  repeated OrderItem items = 2;
  Address shipping_address = 3;
}

message GetOrderRequest {
  string order_id = 1;
}

message ListOrdersRequest {
  string user_id = 1;
  int32 page_size = 2;
  string page_token = 3;
}

message ListOrdersResponse {
  repeated Order orders = 1;
  string next_page_token = 2;
}

message UpdateOrderStatusRequest {
  string order_id = 1;
  OrderStatus status = 2;
}

TypeScript使用 protobufjs:

import * as protobuf from "protobufjs";

/**
 * Thin wrapper around a protobufjs `Root` that encodes and decodes plain
 * objects by fully-qualified message type name.
 */
class ProtobufSerializer {
  private root: protobuf.Root | undefined;

  /** Loads and parses the .proto file; must complete before encode/decode. */
  async load(protoPath: string): Promise<void> {
    this.root = await protobuf.load(protoPath);
  }

  /**
   * Encodes a plain object as the given message type.
   *
   * The payload goes through `fromObject`, which converts JSON-style input
   * (enum names given as strings, 64-bit values given as strings or numbers)
   * into a runtime message. Calling `verify` on the raw payload would reject
   * enum names, because it only accepts numeric enum values.
   *
   * @throws Error if `load()` has not completed, the type is unknown, or the
   *   converted message fails verification.
   */
  encode<T>(typeName: string, payload: T): Uint8Array {
    const MessageType = this.lookup(typeName);

    const message = MessageType.fromObject(payload as { [k: string]: any });
    const errMsg = MessageType.verify(message);
    if (errMsg) throw new Error(errMsg);

    return MessageType.encode(message).finish();
  }

  /**
   * Decodes a buffer into a plain object. 64-bit integers and enums are
   * returned as strings, and unset fields are filled with their defaults.
   *
   * @throws Error if `load()` has not completed or the type is unknown.
   */
  decode<T>(typeName: string, buffer: Uint8Array): T {
    const MessageType = this.lookup(typeName);
    const message = MessageType.decode(buffer);
    return MessageType.toObject(message, {
      longs: String,
      enums: String,
      defaults: true,
    }) as T;
  }

  /** Resolves a message type, failing clearly when no schema is loaded. */
  private lookup(typeName: string): protobuf.Type {
    if (!this.root) {
      throw new Error(
        "ProtobufSerializer: load() must complete before encode/decode",
      );
    }
    return this.root.lookupType(typeName);
  }
}

// Usage: load the schema, then encode an order and decode it again.
const serializer = new ProtobufSerializer();
await serializer.load("./schema.proto");

const order = {
  id: "ord_123",
  userId: "usr_456",
  status: "ORDER_STATUS_PENDING",
  items: [
    { productId: "prod_789", name: "Widget", quantity: 2, priceCents: 1999 },
  ],
  totalCents: 3998,
  currency: "USD",
};

const buffer = serializer.encode("myapp.Order", order);
const decoded = serializer.decode<typeof order>("myapp.Order", buffer);

Python使用:

from google.protobuf import json_format
import myapp_pb2

# Create a message
order = myapp_pb2.Order(
    id='ord_123',
    user_id='usr_456',
    status=myapp_pb2.ORDER_STATUS_PENDING,
    total_cents=3998,
    currency='USD'
)

# Add an entry to the repeated field
item = order.items.add()
item.product_id = 'prod_789'
item.name = 'Widget'
item.quantity = 2
item.price_cents = 1999

# Serialize to the binary wire format
binary_data = order.SerializeToString()

# Deserialize
parsed_order = myapp_pb2.Order()
parsed_order.ParseFromString(binary_data)

# Convert to/from JSON
json_str = json_format.MessageToJson(order)
from_json = json_format.Parse(json_str, myapp_pb2.Order())

MessagePack

基本使用:

import * as msgpack from "@msgpack/msgpack";

// Simple encode/decode
const data = {
  name: "Alice",
  age: 30,
  tags: ["developer", "typescript"],
  active: true,
  metadata: { key: "value" },
};

const encoded = msgpack.encode(data);
const decoded = msgpack.decode(encoded);

// With options: a custom extension codec, and undefined values ignored
// when encoding.
const encoder = new msgpack.Encoder({
  extensionCodec: createCustomCodec(),
  ignoreUndefined: true,
});

const decoder = new msgpack.Decoder({
  extensionCodec: createCustomCodec(),
});

自定义扩展类型:

import { ExtensionCodec } from "@msgpack/msgpack";

/**
 * Builds an `ExtensionCodec` with two extension types:
 * type 0 for `Date` (8-byte signed millisecond timestamp) and
 * type 1 for `bigint` (UTF-8 decimal string).
 */
function createCustomCodec(): ExtensionCodec {
  const codec = new ExtensionCodec();

  // Date extension (type 0)
  codec.register({
    type: 0,
    encode: (value: unknown): Uint8Array | null => {
      if (value instanceof Date) {
        const ms = value.getTime();
        const buffer = new ArrayBuffer(8);
        new DataView(buffer).setBigInt64(0, BigInt(ms));
        return new Uint8Array(buffer);
      }
      // null tells the codec this encoder does not handle the value.
      return null;
    },
    decode: (data: Uint8Array): Date => {
      // `data` may be a view into a larger buffer, so the DataView must
      // honour its offset and length rather than start at byte 0 of the
      // underlying ArrayBuffer.
      const view = new DataView(data.buffer, data.byteOffset, data.byteLength);
      return new Date(Number(view.getBigInt64(0)));
    },
  });

  // BigInt extension (type 1)
  codec.register({
    type: 1,
    encode: (value: unknown): Uint8Array | null => {
      if (typeof value === "bigint") {
        return new TextEncoder().encode(value.toString());
      }
      return null;
    },
    decode: (data: Uint8Array): bigint => {
      return BigInt(new TextDecoder().decode(data));
    },
  });

  return codec;
}

流式编码器/解码器:

import { Encoder, Decoder, decodeMultiStream } from "@msgpack/msgpack";

/**
 * Encodes each message with one shared `Encoder` and writes the results to
 * `stream` in order, awaiting every write, then closes the stream.
 */
async function encodeStream(
  messages: any[],
  stream: WritableStream<Uint8Array>,
): Promise<void> {
  const packer = new Encoder();
  const sink = stream.getWriter();

  for (let index = 0; index < messages.length; index++) {
    const bytes = packer.encode(messages[index]);
    await sink.write(bytes);
  }

  await sink.close();
}

/**
 * Lazily yields every MessagePack value decoded from `stream`, asserted to
 * be of type `T` (no runtime validation is performed).
 */
async function* decodeStream<T>(
  stream: ReadableStream<Uint8Array>,
): AsyncIterable<T> {
  const decodedValues = decodeMultiStream(stream);
  for await (const value of decodedValues) {
    yield value as T;
  }
}

Rust Serde模式

基本Serde使用:

use serde::{Deserialize, Serialize};

// Derives serde's Serialize/Deserialize (plus Debug) for the struct.
#[derive(Serialize, Deserialize, Debug)]
struct User {
    id: String,
    email: String,
    name: String,
    // Left out of the serialized output when the value is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    phone: Option<String>,
    tags: Vec<String>,
}

// Serialize to JSON
let user = User {
    id: "usr_123".to_string(),
    email: "user@example.com".to_string(),
    name: "Alice".to_string(),
    phone: None,
    tags: vec!["admin".to_string()],
};

let json = serde_json::to_string(&user)?;
let pretty = serde_json::to_string_pretty(&user)?;

// Deserialize from JSON
let parsed: User = serde_json::from_str(&json)?;

带Serde属性的自定义序列化:

use serde::{Deserialize, Serialize};
use std::time::{Duration, SystemTime};

// Order demonstrating the common serde field attributes.
#[derive(Serialize, Deserialize)]
struct Order {
    #[serde(rename = "orderId")]
    id: String,

    #[serde(rename = "userId")]
    user_id: String,

    // Skip serializing when empty; use the default when the field is absent
    #[serde(skip_serializing_if = "Vec::is_empty", default)]
    items: Vec<OrderItem>,

    // Custom serialization for the timestamp
    #[serde(with = "timestamp_serde")]
    created_at: SystemTime,

    // Flatten the nested struct into the parent
    #[serde(flatten)]
    metadata: OrderMetadata,

    // Skip the field entirely
    #[serde(skip)]
    internal_state: String,
}

// Line item of an order.
#[derive(Serialize, Deserialize)]
struct OrderItem {
    product_id: String,
    quantity: u32,
    price_cents: i64,
}

// Flattened into `Order` via `#[serde(flatten)]`.
#[derive(Serialize, Deserialize)]
struct OrderMetadata {
    source: String,
    campaign: Option<String>,
}

// Custom timestamp serialization module, used through
// `#[serde(with = "timestamp_serde")]`.
mod timestamp_serde {
    use serde::{Deserialize, Deserializer, Serializer};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Writes the time as whole seconds since the Unix epoch (sub-second
    // precision is dropped). Times before the epoch produce a
    // serialization error.
    pub fn serialize<S>(time: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let duration = time.duration_since(UNIX_EPOCH)
            .map_err(serde::ser::Error::custom)?;
        serializer.serialize_u64(duration.as_secs())
    }

    // Reads a u64 number of seconds and converts it back to a SystemTime.
    pub fn deserialize<'de, D>(deserializer: D) -> Result<SystemTime, D::Error>
    where
        D: Deserializer<'de>,
    {
        let secs = u64::deserialize(deserializer)?;
        Ok(UNIX_EPOCH + std::time::Duration::from_secs(secs))
    }
}

Serde与多种格式:

use serde::{Deserialize, Serialize};

// One derive makes the struct usable with every serde-based format below.
#[derive(Serialize, Deserialize)]
struct Config {
    name: String,
    port: u16,
    database_url: String,
    features: Vec<String>,
}

// NOTE(review): `config` is not defined in this snippet; it is presumably an
// existing `Config` value.

// JSON
let json_str = serde_json::to_string(&config)?;
let from_json: Config = serde_json::from_str(&json_str)?;

// YAML
let yaml_str = serde_yaml::to_string(&config)?;
let from_yaml: Config = serde_yaml::from_str(&yaml_str)?;

// TOML
let toml_str = toml::to_string(&config)?;
let from_toml: Config = toml::from_str(&toml_str)?;

// MessagePack
let msgpack_bytes = rmp_serde::to_vec(&config)?;
let from_msgpack: Config = rmp_serde::from_slice(&msgpack_bytes)?;

// Bincode (binary)
let bincode_bytes = bincode::serialize(&config)?;
let from_bincode: Config = bincode::deserialize(&bincode_bytes)?;

自定义序列化/反序列化实现:

use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::fmt;

// Numeric user id whose serialized form is the string "usr_<number>".
struct UserId(u64);

impl Serialize for UserId {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        // Serialize as a prefixed string
        serializer.serialize_str(&format!("usr_{}", self.0))
    }
}

impl<'de> Deserialize<'de> for UserId {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        // Visitor that accepts only strings of the form "usr_<u64>".
        struct UserIdVisitor;

        impl<'de> serde::de::Visitor<'de> for UserIdVisitor {
            type Value = UserId;

            // Description used in error messages when the input has the wrong type.
            fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
                formatter.write_str("像'usr_123'这样的用户ID字符串")
            }

            fn visit_str<E>(self, value: &str) -> Result<UserId, E>
            where
                E: serde::de::Error,
            {
                // Strip the mandatory prefix, then parse the remainder as u64.
                if let Some(id_str) = value.strip_prefix("usr_") {
                    id_str.parse::<u64>()
                        .map(UserId)
                        .map_err(|_| E::custom("无效的用户ID数字"))
                } else {
                    Err(E::custom("用户ID必须以'usr_'开头"))
                }
            }
        }

        deserializer.deserialize_str(UserIdVisitor)
    }
}

枚举序列化策略:

use serde::{Deserialize, Serialize};

// Externally tagged (default)
#[derive(Serialize, Deserialize)]
enum Message {
    Text(String),
    Image { url: String, width: u32, height: u32 },
}
// JSON: {"Text": "hello"} or {"Image": {"url": "...", "width": 100, "height": 100}}

// Internally tagged
#[derive(Serialize, Deserialize)]
#[serde(tag = "type")]
enum MessageInternal {
    Text { content: String },
    Image { url: String, width: u32, height: u32 },
}
// JSON: {"type": "Text", "content": "hello"}

// Adjacently tagged
#[derive(Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
enum MessageAdjacent {
    Text(String),
    Image { url: String, width: u32, height: u32 },
}
// JSON: {"type": "Text", "data": "hello"}

// Untagged (variant determined by the shape of the data)
#[derive(Serialize, Deserialize)]
#[serde(untagged)]
enum Value {
    String(String),
    Number(i64),
    Boolean(bool),
}
// JSON: "hello" or 123 or true

用于gRPC的Protocol Buffers

带gRPC的服务定义:

syntax = "proto3";

package users.v1;

import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";

// User service for managing user accounts
service UserService {
  // Unary RPCs
  rpc GetUser(GetUserRequest) returns (User);
  rpc CreateUser(CreateUserRequest) returns (User);
  rpc UpdateUser(UpdateUserRequest) returns (User);
  rpc DeleteUser(DeleteUserRequest) returns (google.protobuf.Empty);

  // Server-streaming RPC
  rpc ListUsers(ListUsersRequest) returns (stream User);

  // Client-streaming RPC
  rpc BatchCreateUsers(stream CreateUserRequest) returns (BatchCreateUsersResponse);

  // Bidirectional-streaming RPC
  rpc SyncUsers(stream UserUpdate) returns (stream UserUpdate);
}

message User {
  string id = 1;
  string email = 2;
  string name = 3;
  optional string phone = 4;
  repeated string roles = 5;
  google.protobuf.Timestamp created_at = 6;
  google.protobuf.Timestamp updated_at = 7;
}

message GetUserRequest {
  string id = 1;
}

message CreateUserRequest {
  string email = 1;
  string name = 2;
  optional string phone = 3;
  repeated string roles = 4;
}

// Scalar fields are `optional` so that presence can be detected.
message UpdateUserRequest {
  string id = 1;
  optional string email = 2;
  optional string name = 3;
  optional string phone = 4;
  repeated string roles = 5;
}

message DeleteUserRequest {
  string id = 1;
}

message ListUsersRequest {
  int32 page_size = 1;
  string page_token = 2;
  optional string filter = 3;
}

message BatchCreateUsersResponse {
  repeated User users = 1;
  int32 success_count = 2;
  int32 failure_count = 3;
}

// Element of the SyncUsers stream: a change type plus the affected user.
message UserUpdate {
  enum UpdateType {
    UPDATE_TYPE_UNSPECIFIED = 0;
    UPDATE_TYPE_CREATED = 1;
    UPDATE_TYPE_UPDATED = 2;
    UPDATE_TYPE_DELETED = 3;
  }

  UpdateType type = 1;
  User user = 2;
}

Rust gRPC服务器实现(tonic):

use tonic::{transport::Server, Request, Response, Status};

pub mod users {
    tonic::include_proto!("users.v1");
}

use users::user_service_server::{UserService, UserServiceServer};
use users::{User, GetUserRequest, CreateUserRequest, ListUsersRequest};

// Stateless service implementation.
#[derive(Default)]
pub struct UserServiceImpl {}

// NOTE(review): the UserService definition in the .proto example also
// declares UpdateUser, DeleteUser, BatchCreateUsers and SyncUsers; only three
// handlers are shown here — confirm the remaining trait methods exist.
#[tonic::async_trait]
impl UserService for UserServiceImpl {
    // Unary RPC: returns a hard-coded user carrying the requested id.
    async fn get_user(
        &self,
        request: Request<GetUserRequest>,
    ) -> Result<Response<User>, Status> {
        let req = request.into_inner();

        // Business logic goes here
        let user = User {
            id: req.id,
            email: "user@example.com".to_string(),
            name: "Alice".to_string(),
            phone: None,
            roles: vec!["user".to_string()],
            created_at: Some(prost_types::Timestamp::from(std::time::SystemTime::now())),
            updated_at: Some(prost_types::Timestamp::from(std::time::SystemTime::now())),
        };

        Ok(Response::new(user))
    }

    // Unary RPC: rejects an empty email, otherwise builds a user with a
    // freshly generated UUID.
    async fn create_user(
        &self,
        request: Request<CreateUserRequest>,
    ) -> Result<Response<User>, Status> {
        let req = request.into_inner();

        // Validation
        if req.email.is_empty() {
            return Err(Status::invalid_argument("email是必填项"));
        }

        // User creation logic...
        let user = User {
            id: uuid::Uuid::new_v4().to_string(),
            email: req.email,
            name: req.name,
            phone: req.phone,
            roles: req.roles,
            created_at: Some(prost_types::Timestamp::from(std::time::SystemTime::now())),
            updated_at: Some(prost_types::Timestamp::from(std::time::SystemTime::now())),
        };

        Ok(Response::new(user))
    }

    // Stream type returned by the server-streaming ListUsers RPC.
    type ListUsersStream = tokio_stream::wrappers::ReceiverStream<Result<User, Status>>;

    // Server-streaming RPC: a spawned task pushes ten generated users into a
    // bounded channel whose receiving end is returned as the response stream.
    // The request contents are not read.
    async fn list_users(
        &self,
        request: Request<ListUsersRequest>,
    ) -> Result<Response<Self::ListUsersStream>, Status> {
        let (tx, rx) = tokio::sync::mpsc::channel(128);

        tokio::spawn(async move {
            // Stream users from the database
            for i in 0..10 {
                let user = User {
                    id: format!("usr_{}", i),
                    email: format!("user{}@example.com", i),
                    name: format!("User {}", i),
                    phone: None,
                    roles: vec![],
                    created_at: None,
                    updated_at: None,
                };

                // Stop producing once the receiver has been dropped.
                if tx.send(Ok(user)).await.is_err() {
                    break;
                }
            }
        });

        Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(rx)))
    }
}

// Starts the gRPC server on the IPv6 loopback address, port 50051, and
// serves until the server future completes.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "[::1]:50051".parse()?;
    let service = UserServiceImpl::default();

    Server::builder()
        .add_service(UserServiceServer::new(service))
        .serve(addr)
        .await?;

    Ok(())
}

Rust gRPC客户端:

use users::user_service_client::UserServiceClient;
use users::{GetUserRequest, CreateUserRequest};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = UserServiceClient::connect("http://[::1]:50051").await?;

    // 一元调用
    let request = tonic::Request::new(GetUserRequest {
        id: "usr_123".to_string(),
    });

    let response = client.get_user(request).await?;
    println!("User: {:?}", response.into_inner());

    // 创建用户
    let request = tonic::Request::new(CreateUserRequest {
        email: "new@example.com".to_string(),
        name: "New User".to_string(),
        phone: None,
        roles: vec!["user".to_string()],
    });

    let response = client.create_user(request).await?;
    println!("Created: {:?}", response.into_inner());

    // 流式调用
    let request = tonic::Request::new(ListUsersRequest {
        page_size: 10,
        page_token: String::new(),
        filter: None,
    });

    let mut stream = client.list_users(request).await?.into_inner();

    while let Some(user) = stream.message().await? {
        println!("Received user: {:?}", user);
    }

    Ok(())
}

模式进化与版本控制

字段编号策略(Protobuf):

message User {
  // Core fields: 1-15 (1-byte tag, most efficient)
  string id = 1;
  string email = 2;
  string name = 3;

  // Common fields: 16-100
  optional string phone = 16;
  optional string avatar_url = 17;

  // Reserved for future use: 101-200
  reserved 101 to 200;

  // Extension fields: 201+
  map<string, string> metadata = 201;

  // Deprecated fields (never reuse the numbers!)
  reserved 50, 51;
  reserved "old_field", "legacy_field";
}

JSON模式版本控制:

/** A registered schema version and the optional migration leading to it. */
interface SchemaVersion {
  version: number;
  schema: object;
  // Upgrades data from `fromVersion` (always `version - 1`) to `version`.
  migrate?: (data: any, fromVersion: number) => any;
}

/**
 * Wraps data in a `{ version, data }` envelope and upgrades older envelopes
 * to the newest registered version one step at a time.
 */
class VersionedSerializer {
  private versions: Map<number, SchemaVersion> = new Map();
  private currentVersion: number = 1;

  /** Registers a version; the highest registered number becomes current. */
  registerVersion(version: SchemaVersion): void {
    this.versions.set(version.version, version);
    if (version.version > this.currentVersion) {
      this.currentVersion = version.version;
    }
  }

  /** Tags `data` with the current version. The data itself is not copied. */
  serialize(data: any): { version: number; data: any } {
    return {
      version: this.currentVersion,
      data,
    };
  }

  /**
   * Returns the payload's data, migrated up to the current version.
   *
   * Steps whose target version has no `migrate` function (or is not
   * registered) leave the data unchanged. A payload newer than the current
   * version is returned as-is.
   *
   * @throws RangeError if `payload.version` is not a finite integer. Such a
   *   value could otherwise never line up with a registered version, and
   *   `-Infinity` would make the migration loop run forever.
   */
  deserialize(payload: { version: number; data: any }): any {
    let data = payload.data;
    let version = payload.version;

    if (!Number.isInteger(version)) {
      throw new RangeError(`Invalid payload version: ${String(version)}`);
    }

    // Migrate through the versions if needed
    while (version < this.currentVersion) {
      const nextVersion = version + 1;
      const schema = this.versions.get(nextVersion);

      if (schema?.migrate) {
        data = schema.migrate(data, version);
      }

      version = nextVersion;
    }

    return data;
  }
}

// Example usage: version 1 stores a single `name`; version 2 splits it into
// `firstName` and `lastName`.
const serializer = new VersionedSerializer();

serializer.registerVersion({
  version: 1,
  schema: { type: "object", properties: { name: { type: "string" } } },
});

serializer.registerVersion({
  version: 2,
  schema: {
    type: "object",
    properties: {
      firstName: { type: "string" },
      lastName: { type: "string" },
    },
  },
  migrate: (data, fromVersion) => {
    if (fromVersion === 1) {
      // The first space-separated token becomes firstName, the rest lastName.
      const [firstName, ...rest] = (data.name || "").split(" ");
      return {
        firstName,
        lastName: rest.join(" "),
      };
    }
    return data;
  },
});

向后/向前兼容性

兼容性规则:

// Rules for maintaining compatibility
// NOTE(review): "删除必填字段" (removing a required field) is listed under
// both backwardCompatible and breakingChanges — confirm which is intended.
const compatibilityRules = {
  // Backward compatible (new code reads old data)
  backwardCompatible: [
    "添加带默认值的可选字段",
    "添加新的枚举值(不在位置0)",
    "删除必填字段(视为可选)",
    "加宽数字类型(int32 -> int64)",
    "添加新的消息类型",
  ],

  // Forward compatible (old code reads new data)
  forwardCompatible: [
    "添加可选字段(旧代码忽略)",
    "添加新的枚举值(旧代码使用默认值)",
    "旧代码忽略未知字段",
  ],

  // Breaking changes (avoid!)
  breakingChanges: [
    "更改字段类型",
    "更改字段编号",
    "删除必填字段",
    "重命名字段(在JSON中)",
    "将字段从可选更改为必填",
  ],
};

防御性反序列化:

/**
 * Schema-driven JSON deserializer: checks that required fields are present,
 * applies defaults, and ignores unknown fields.
 */
class SafeDeserializer<T> {
  constructor(
    private schema: {
      required: string[];
      optional: string[];
      defaults: Partial<T>;
    },
  ) {}

  /**
   * Parses `json` into an object holding only the schema's fields.
   *
   * Defaults are copied shallowly, so object or array defaults are shared
   * between results.
   *
   * @throws DeserializationError if the text is not valid JSON, the top-level
   *   value is not a plain JSON object (arrays are rejected), or a required
   *   field is missing.
   */
  deserialize(json: string): T {
    let parsed: any;

    try {
      parsed = JSON.parse(json);
    } catch (e) {
      throw new DeserializationError("无效的JSON");
    }

    // Arrays are objects too, but can never satisfy a field schema.
    if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
      throw new DeserializationError("预期对象");
    }

    // Own-property check: `in` would also match inherited members such as
    // "toString" or "constructor".
    const has = (key: string): boolean =>
      Object.prototype.hasOwnProperty.call(parsed, key);

    // Check required fields
    for (const field of this.schema.required) {
      if (!has(field)) {
        throw new DeserializationError(`缺少必填字段: ${field}`);
      }
    }

    // Start from the defaults so missing optional fields get a value
    const result = { ...this.schema.defaults } as T;

    for (const key of [...this.schema.required, ...this.schema.optional]) {
      if (has(key)) {
        (result as any)[key] = parsed[key];
      }
    }

    // Unknown fields are ignored (forward compatibility)
    return result;
  }
}

/** Error thrown by `SafeDeserializer` for any rejected input. */
class DeserializationError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "DeserializationError";
  }
}

联合类型与鉴别器:

// Discriminated union keyed on `type`, which keeps the event set extensible.
type Event =
  | { type: "user.created"; payload: UserCreatedPayload }
  | { type: "user.updated"; payload: UserUpdatedPayload }
  | { type: "order.created"; payload: OrderCreatedPayload };

/**
 * Parses a JSON event envelope.
 *
 * @returns The event, or `null` (after a console warning) when the parsed
 *   value is not an object or its `type` is not recognised.
 * @throws SyntaxError if `json` is not valid JSON. Payloads are not validated.
 */
function deserializeEvent(json: string): Event | null {
  const data = JSON.parse(json);

  // JSON.parse may yield null or a primitive; reading `.type` from null
  // would throw, so treat such input like an unknown event.
  const type = data !== null && typeof data === "object" ? data.type : undefined;

  // Handle unknown event types gracefully
  switch (type) {
    case "user.created":
      return { type: "user.created", payload: data.payload };
    case "user.updated":
      return { type: "user.updated", payload: data.payload };
    case "order.created":
      return { type: "order.created", payload: data.payload };
    default:
      // Forward compatibility: ignore unknown types
      console.warn(`未知事件类型: ${type}`);
      return null;
  }
}

自定义序列化器

类型安全序列化器框架:

/** Converts a value of type `T` to and from its raw wire representation. */
interface Serializer<T> {
  serialize(value: T): any;
  deserialize(raw: any): T;
}

/** Name-keyed lookup table of serializers. */
class SerializerRegistry {
  private serializers: Map<string, Serializer<any>> = new Map();

  /** Stores `serializer` under `name`, replacing any earlier entry. */
  register<T>(name: string, serializer: Serializer<T>): void {
    this.serializers.set(name, serializer);
  }

  /**
   * Returns the serializer registered under `name`.
   * @throws Error if nothing is registered under that name.
   */
  get<T>(name: string): Serializer<T> {
    const found = this.serializers.get(name);
    if (found) {
      return found;
    }
    throw new Error(`没有为以下名称注册序列化器: ${name}`);
  }
}

// Built-in serializers

// Date <-> ISO 8601 string.
const dateSerializer: Serializer<Date> = {
  serialize(date) {
    return date.toISOString();
  },
  deserialize(raw) {
    return new Date(raw);
  },
};

// number <-> string with exactly two fraction digits.
const decimalSerializer: Serializer<number> = {
  serialize(num) {
    return num.toFixed(2);
  },
  deserialize(raw) {
    return parseFloat(raw);
  },
};

// Money amounts travel as integer minor units (amount * 100, rounded).
const moneySerializer: Serializer<{ amount: number; currency: string }> = {
  serialize(money) {
    const minorUnits = Math.round(money.amount * 100);
    return { amount: minorUnits, currency: money.currency };
  },
  deserialize(raw) {
    const majorUnits = raw.amount / 100;
    return { amount: majorUnits, currency: raw.currency };
  },
};

基于装饰器的序列化:

import "reflect-metadata";

// Metadata keys under which the decorators below store their data.
const SERIALIZABLE_KEY = Symbol("serializable");
const PROPERTY_KEY = Symbol("property");

// Per-property options accepted by the @Property decorator.
interface PropertyOptions {
  // Output key to use instead of the property name.
  name?: string;
  // Serializer applied to the property's value.
  serializer?: Serializer<any>;
  // When true, an undefined value is left out of the output.
  optional?: boolean;
  // NOTE(review): `default` is not read by the `serialize` function in this
  // snippet — presumably intended for deserialization; confirm.
  default?: any;
}

/**
 * Class decorator factory: stores the given options (or an empty object)
 * as metadata on the decorated constructor under SERIALIZABLE_KEY.
 */
function Serializable(options?: { discriminator?: string }) {
  return (constructor: Function): void => {
    const metadata = options || {};
    Reflect.defineMetadata(SERIALIZABLE_KEY, metadata, constructor);
  };
}

/**
 * Property decorator factory: appends `{ key, options }` for the decorated
 * property to the list stored under PROPERTY_KEY on the class prototype.
 */
function Property(options?: PropertyOptions) {
  return function (target: any, propertyKey: string) {
    // getMetadata also searches the prototype chain, so for a subclass this
    // may be the parent's array. Build a new array instead of pushing into
    // it; otherwise the subclass's properties would be added to the parent.
    const inherited = Reflect.getMetadata(PROPERTY_KEY, target) || [];
    const entries = [
      ...inherited,
      { key: propertyKey, options: options || {} },
    ];
    Reflect.defineMetadata(PROPERTY_KEY, entries, target);
  };
}

// Example model annotated for the decorator-driven `serialize` function.
@Serializable()
class User {
  @Property()
  id: string;

  // Written to the output under the key "email_address".
  @Property({ name: "email_address" })
  email: string;

  // Converted with dateSerializer.
  @Property({ serializer: dateSerializer })
  createdAt: Date;

  // Omitted from the output when undefined.
  @Property({ optional: true, default: [] })
  tags: string[];
}

/**
 * Builds a plain object from the properties registered under PROPERTY_KEY
 * on the instance's prototype.
 *
 * Each property is written under `options.name` (falling back to the
 * property key) and passed through `options.serializer` when one is set.
 * Undefined values of optional properties are skipped.
 */
function serialize<T>(instance: T): any {
  const registered =
    Reflect.getMetadata(PROPERTY_KEY, Object.getPrototypeOf(instance)) || [];
  const output: any = {};

  for (const entry of registered) {
    const { key, options } = entry;
    const current = (instance as any)[key];
    const targetKey = options.name || key;

    const omit = current === undefined && options.optional;
    if (!omit) {
      output[targetKey] = options.serializer
        ? options.serializer.serialize(current)
        : current;
    }
  }

  return output;
}

性能考虑

基准比较:

import Benchmark from "benchmark";
import * as msgpack from "@msgpack/msgpack";

// Sample payload shared by every benchmark case.
const testData = {
  id: "user_123456789",
  email: "user@example.com",
  name: "Test User",
  age: 30,
  active: true,
  roles: ["admin", "user"],
  metadata: {
    lastLogin: "2024-12-19T00:00:00Z",
    preferences: { theme: "dark", language: "en" },
  },
};

// Inputs for the parse/decode cases are prepared once, outside the timed
// callbacks, so those cases measure decoding only rather than encode + decode.
const jsonText = JSON.stringify(testData);
const msgpackBytes = msgpack.encode(testData);

const suite = new Benchmark.Suite();

suite
  .add("JSON.stringify", () => {
    JSON.stringify(testData);
  })
  .add("JSON.parse", () => {
    JSON.parse(jsonText);
  })
  .add("MessagePack encode", () => {
    msgpack.encode(testData);
  })
  .add("MessagePack decode", () => {
    msgpack.decode(msgpackBytes);
  })
  // Print each case's result line as it completes.
  .on("cycle", (event: any) => {
    console.log(String(event.target));
  })
  .run();

大小优化:

// Strategies for reducing payload size

// 1. Field-name shortening (with a mapping)
const fieldMap = {
  userId: "u",
  firstName: "fn",
  lastName: "ln",
  emailAddress: "e",
  createdAt: "ca",
};

/**
 * Recursively renames the keys of plain objects according to `map`.
 *
 * Keys absent from `map` are kept. Arrays stay arrays (elements are
 * processed recursively). Primitives, `null` and non-plain objects such as
 * `Date` are returned unchanged.
 *
 * @param obj - Value to compress.
 * @param map - Long key -> short key.
 * @returns A new structure; the input is not mutated.
 */
function compressKeys(obj: any, map: Record<string, string>): any {
  if (Array.isArray(obj)) {
    // Object.entries() on an array would turn it into an index-keyed object.
    return obj.map((item) => compressKeys(item, map));
  }
  if (obj === null || typeof obj !== "object") {
    return obj;
  }
  const proto = Object.getPrototypeOf(obj);
  if (proto !== null && proto !== Object.prototype) {
    return obj;
  }

  const result: any = {};
  for (const [key, value] of Object.entries(obj)) {
    // Own-property lookup: keys like "toString" or "constructor" must not
    // resolve to members inherited from Object.prototype.
    const mapped = Object.prototype.hasOwnProperty.call(map, key)
      ? map[key]
      : undefined;
    result[mapped || key] = compressKeys(value, map);
  }
  return result;
}

// 2. Array-based (positional) encoding for known schemas
interface UserTuple {
  0: string; // id
  1: string; // email
  2: string; // name
  3: number; // createdAt (Unix timestamp)
}

/**
 * Packs a user into positional form: [id, email, name, createdAt as epoch
 * milliseconds].
 */
function toTuple(user: User): UserTuple {
  const { id, email, name, createdAt } = user;
  return [id, email, name, createdAt.getTime()];
}

/**
 * Rebuilds a user object from its positional form; slot 3 is passed to the
 * `Date` constructor.
 */
function fromTuple(tuple: UserTuple): User {
  const { 0: id, 1: email, 2: name, 3: createdAtValue } = tuple;
  return { id, email, name, createdAt: new Date(createdAtValue) };
}

// 3. Delta encoding for updates

/**
 * Returns the top-level keys of `updated` whose JSON text differs from the
 * same key in `original`, together with their new values. Keys that exist
 * only in `original` are not reported.
 */
function createDelta(original: any, updated: any): any {
  const changedKeys = Object.keys(updated).filter(
    (key) => JSON.stringify(original[key]) !== JSON.stringify(updated[key]),
  );

  return changedKeys.reduce((delta: any, key) => {
    delta[key] = updated[key];
    return delta;
  }, {});
}

/**
 * Returns a new object holding the properties of `original` overlaid with
 * those of `delta` (shallow merge; neither input is modified).
 */
function applyDelta(original: any, delta: any): any {
  const merged = Object.assign({}, original, delta);
  return merged;
}

大型载荷的流式处理:

import { createReadStream, createWriteStream } from "fs";
import { Transform } from "stream";
import { StringDecoder } from "string_decoder";

/**
 * Object-mode transform that turns each incoming value into one line of
 * JSON text terminated by "\n" (JSON Lines).
 */
class JSONLineSerializer extends Transform {
  constructor() {
    super({ objectMode: true });
  }

  /**
   * Serializes one chunk. A value that cannot be stringified (for example a
   * circular structure) is reported through the callback as a stream error.
   */
  _transform(chunk: any, encoding: string, callback: Function): void {
    try {
      // The line terminator must be the "\n" escape; a raw line break inside
      // the string literal does not compile.
      const line = JSON.stringify(chunk) + "\n";
      callback(null, line);
    } catch (error) {
      callback(error);
    }
  }
}

/**
 * Transform that splits incoming text into lines and emits the parsed JSON
 * value of each non-blank line (JSON Lines).
 *
 * Malformed lines are deliberately skipped (best-effort parsing). A line
 * containing the JSON literal `null` is skipped as well, because pushing
 * `null` would signal end-of-stream.
 */
class JSONLineDeserializer extends Transform {
  // Text received so far that does not yet end in a line break.
  private buffer: string = "";
  // Keeps the bytes of a multi-byte UTF-8 character that is split across two
  // chunks until the character is complete.
  private decoder = new StringDecoder("utf8");

  constructor() {
    super({ objectMode: true });
  }

  /** Appends the chunk to the buffer and emits every complete line. */
  _transform(chunk: Buffer, encoding: string, callback: Function): void {
    this.buffer += this.decoder.write(chunk);
    const lines = this.buffer.split("\n");
    // The last piece is an incomplete line (or empty); keep it for later.
    this.buffer = lines.pop() || "";

    for (const line of lines) {
      this.emitLine(line);
    }

    callback();
  }

  /** Emits whatever remains after the final chunk. */
  _flush(callback: Function): void {
    this.buffer += this.decoder.end();
    this.emitLine(this.buffer);
    this.buffer = "";
    callback();
  }

  /** Parses one line and pushes the result; blank or bad lines are skipped. */
  private emitLine(line: string): void {
    if (!line.trim()) {
      return;
    }
    let value: unknown;
    try {
      value = JSON.parse(line);
    } catch {
      // Skip malformed lines
      return;
    }
    if (value !== null) {
      this.push(value);
    }
  }
}

最佳实践

JSON

  • 使用一致的命名约定(camelCase或snake_case)
  • 始终使用ISO 8601处理日期
  • 显式处理null/undefined
  • 保持载荷大小合理
  • 在处理前验证输入

Protocol Buffers

  • 为已弃用字段保留字段编号
  • 对可能缺失的字段使用optional
  • 避免更改字段类型
  • 为常见模式使用知名类型(well-known types,如Timestamp、Duration、Empty)
  • 使用包名版本化您的.proto文件(例如,myapp.v1)
  • 对于gRPC:设计具有清晰一元/流式语义的服务

Rust Serde

  • 对标准情况使用派生宏
  • 利用serde属性进行字段重命名和控制
  • 为复杂类型实现自定义序列化/反序列化
  • 使用skip_serializing_if省略可选字段
  • 根据JSON兼容性需求选择枚举标记策略
  • 以最少的代码支持多种格式(JSON、YAML、TOML)

模式进化

  • 从一开始就规划模式更改
  • 始终将新字段添加为可选
  • 永远不要重用字段编号或名称
  • 测试向后/向前兼容性
  • 记录破坏性变更
  • 在protobuf中使用版本化包(例如,users.v1、users.v2)
  • 为破坏性变更实现迁移逻辑

性能

  • 根据用例选择格式(JSON用于调试,二进制用于性能)
  • 对大型载荷使用流式处理
  • 考虑对大型JSON进行压缩
  • 在您的特定上下文中分析序列化
  • 缓存序列化器/反序列化器
  • 对于Rust:bincode最快,MessagePack平衡大小和速度

示例

完整序列化层

// Generic serialization layer supporting multiple formats.

/** A pluggable wire format: a name, its MIME type, and an encode/decode pair. */
interface SerializationFormat {
  name: string;
  contentType: string;
  encode<T>(data: T): Buffer;
  decode<T>(buffer: Buffer): T;
}

// JSON text held in a Buffer.
const jsonFormat: SerializationFormat = {
  name: "json",
  contentType: "application/json",
  encode(data) {
    const text = JSON.stringify(data);
    return Buffer.from(text);
  },
  decode(buffer) {
    const text = buffer.toString();
    return JSON.parse(text);
  },
};

// MessagePack bytes held in a Buffer.
const msgpackFormat: SerializationFormat = {
  name: "msgpack",
  contentType: "application/msgpack",
  encode(data) {
    const packed = msgpack.encode(data);
    return Buffer.from(packed);
  },
  decode(buffer) {
    return msgpack.decode(buffer) as any;
  },
};

/**
 * Registry of serialization formats with helpers for encoding, decoding by
 * Content-Type, and Accept-header negotiation. JSON and MessagePack are
 * registered by default; JSON is the default format.
 */
class SerializationService {
  private formats: Map<string, SerializationFormat> = new Map();
  private defaultFormat: string = "json";

  constructor() {
    this.registerFormat(jsonFormat);
    this.registerFormat(msgpackFormat);
  }

  /** Adds a format, replacing any format already registered under its name. */
  registerFormat(format: SerializationFormat): void {
    this.formats.set(format.name, format);
  }

  /**
   * Encodes `data` with the named format, or the default when none is given.
   * @throws Error if the format is not registered.
   */
  serialize<T>(
    data: T,
    formatName?: string,
  ): {
    buffer: Buffer;
    contentType: string;
  } {
    const format = this.formats.get(formatName || this.defaultFormat);
    if (!format) {
      throw new Error(`未知格式: ${formatName}`);
    }

    return {
      buffer: format.encode(data),
      contentType: format.contentType,
    };
  }

  /**
   * Decodes `buffer` using the format that matches `contentType`. Header
   * parameters (e.g. "; charset=utf-8") and letter case are ignored.
   * @throws Error if no registered format matches.
   */
  deserialize<T>(buffer: Buffer, contentType: string): T {
    const format = this.findByContentType(contentType);

    if (!format) {
      throw new Error(`未知内容类型: ${contentType}`);
    }

    return format.decode(buffer);
  }

  /**
   * Content-negotiation helper: returns the registered format for the most
   * preferred media type in `acceptHeader`.
   *
   * Entries are ranked by q-value (default 1), ties keep header order, and
   * entries with q=0 are excluded. Wildcards and unmatched types fall
   * through to the default format.
   */
  negotiate(acceptHeader: string): SerializationFormat {
    const ranked = acceptHeader
      .split(",")
      .map((entry, index) => {
        const [type, ...params] = entry.split(";");
        let q = 1;
        for (const param of params) {
          const [name, value] = param.split("=");
          if (name.trim().toLowerCase() === "q") {
            const parsed = Number.parseFloat(value ?? "");
            if (!Number.isNaN(parsed)) q = parsed;
          }
        }
        return { type, q, index };
      })
      .filter((entry) => entry.q > 0)
      .sort((a, b) => b.q - a.q || a.index - b.index);

    for (const { type } of ranked) {
      const format = this.findByContentType(type);
      if (format) return format;
    }

    return this.formats.get(this.defaultFormat)!;
  }

  /** Finds the format whose media type equals that of `header`. */
  private findByContentType(header: string): SerializationFormat | undefined {
    const wanted = SerializationService.mediaType(header);
    for (const format of this.formats.values()) {
      if (SerializationService.mediaType(format.contentType) === wanted) {
        return format;
      }
    }
    return undefined;
  }

  /** Strips parameters and whitespace from a media type and lowercases it. */
  private static mediaType(value: string): string {
    return value.split(";")[0].trim().toLowerCase();
  }
}

/**
 * Express middleware factory. For every request it negotiates a response
 * format from the Accept header (falling back to "application/json") and
 * replaces `res.json` so that the data is encoded with that format and sent
 * with the matching Content-Type.
 */
function serializationMiddleware(service: SerializationService) {
  return (
    req: express.Request,
    res: express.Response,
    next: express.NextFunction,
  ) => {
    // Determine the response format
    const format = service.negotiate(req.headers.accept || "application/json");

    // Override res.json; the original implementation is not needed because
    // the body is sent through res.send.
    res.json = (data: any) => {
      const { buffer, contentType } = service.serialize(data, format.name);
      res.contentType(contentType);
      res.send(buffer);
      return res;
    };

    next();
  };
}