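JSON Transformer Skill (json-transformer)

The JSON Transformer skill is a tool for working with JSON data. It supports parsing, validation, transformation, merging, and extraction, and is used in data engineering, backend development, and data analysis for data processing and API integration. Keywords: JSON processing, data transformation, data analysis, data cleaning, programming tools, API data, format conversion, data extraction.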

Category: Data Engineering. Updated 3/11/2026.

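name: json-transformer
description: Transform, manipulate, and analyze JSON data structures with advanced operations.

JSON Transformer Skill

Transform, manipulate, and analyze JSON data structures with advanced operations.

Instructions

You are a JSON transformation expert. When invoked:

  1. Parse and validate JSON

    • Parse JSON from files, strings, or APIs
    • Validate JSON structure and schema
    • Handle malformed JSON gracefully
    • Pretty-print and format JSON
    • Detect and repair common JSON issues
  2. Transform data structures

    • Reshape nested objects and arrays
    • Flatten and unflatten structures
    • Extract specific paths (JSONPath, JMESPath)
    • Merge and combine JSON documents
    • Filter and map data
  3. Advanced operations

    • Convert between JSON and other formats (CSV, YAML, XML)
    • Apply transformations (jq-style operations)
    • Query and search JSON data
    • Diff JSON documents
    • Generate JSON from schemas
  4. Data manipulation

    • Add, update, and delete properties
    • Rename keys
    • Convert data types
    • Sort and deduplicate
    • Compute aggregates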
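
The parsing guidance above (handle malformed JSON gracefully, detect and repair common issues) can be sketched with the standard library alone. The helper name `parse_json_lenient` and the choice of repairs (strip a UTF-8 byte-order mark, drop trailing commas) are assumptions made for illustration, not part of the skill. The regex-based comma repair would also alter string values that happen to contain `,}` or `,]`, so treat it as a last resort.

```python
import json
import re

# Hypothetical helper: try strict parsing first, then a small set of repairs.
TRAILING_COMMA = re.compile(r",\s*([}\]])")

def parse_json_lenient(text):
    """Return (data, repaired) or raise ValueError with the error location."""
    try:
        return json.loads(text), False
    except json.JSONDecodeError:
        pass  # fall through to the repair attempt

    # Repair 1: strip a leading byte-order mark. Repair 2: drop trailing commas.
    candidate = TRAILING_COMMA.sub(r"\1", text.lstrip("\ufeff"))
    try:
        return json.loads(candidate), True
    except json.JSONDecodeError as err:
        raise ValueError(
            f"Malformed JSON at line {err.lineno}, column {err.colno}: {err.msg}"
        ) from err

data, repaired = parse_json_lenient('{"name": "John", "tags": ["a", "b",],}')
print(data, repaired)
```

Returning a `repaired` flag lets the caller log or reject documents that only parsed after modification.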

Usage Examples

@json-transformer data.json
@json-transformer --flatten
@json-transformer --path "users[*].email"
@json-transformer --merge file1.json file2.json
@json-transformer --to-csv data.json
@json-transformer --validate schema.json

Basic JSON Operations

Parsing and Writing

Python

import json

# Parse a JSON string
data = json.loads('{"name": "John", "age": 30}')

# Parse from a file
with open('data.json', 'r') as f:
    data = json.load(f)

# Write JSON to a file
with open('output.json', 'w') as f:
    json.dump(data, f, indent=2)

# Pretty-print
print(json.dumps(data, indent=2, sort_keys=True))

# Compact output
compact = json.dumps(data, separators=(',', ':'))

# Handle special types
from datetime import datetime
import decimal

def json_encoder(obj):
    if isinstance(obj, datetime):
        return obj.isoformat()
    if isinstance(obj, decimal.Decimal):
        return float(obj)
    raise TypeError(f"Type {type(obj)} not serializable")

json.dumps(data, default=json_encoder)
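
The encoder above converts `Decimal` to `float`, which can lose precision for values such as monetary amounts. Below is a minimal sketch of one alternative, assuming consumers accept decimals encoded as JSON strings; the function name `encode_decimal_as_str` and the sample payloads are made up for illustration. On the reading side, `json.loads` accepts `parse_float`, which builds `Decimal` objects directly from the number text.

```python
import json
from decimal import Decimal

def encode_decimal_as_str(obj):
    # Assumption: downstream consumers accept decimals encoded as JSON strings.
    if isinstance(obj, Decimal):
        return str(obj)
    raise TypeError(f"Type {type(obj)} not serializable")

order = {"id": 1, "total": Decimal("19.99")}
text = json.dumps(order, default=encode_decimal_as_str)
print(text)

# parse_float is called with the raw text of every JSON number that has a
# fraction or exponent, so no binary float rounding happens on the way in.
parsed = json.loads('{"price": 0.1, "qty": 3}', parse_float=Decimal)
print(parsed)
```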

JavaScript

// Parse a JSON string
const data = JSON.parse('{"name": "John", "age": 30}');

// Parse from a file (Node.js)
const fs = require('fs');
const fileData = JSON.parse(fs.readFileSync('data.json', 'utf8'));

// Write JSON to a file
fs.writeFileSync('output.json', JSON.stringify(data, null, 2));

// Pretty-print
console.log(JSON.stringify(data, null, 2));

// Custom serialization. Date.prototype.toJSON runs before the replacer, so
// inspect the original value through this[key] (needs a regular function).
const json = JSON.stringify(data, function (key, value) {
  if (this[key] instanceof Date) {
    return this[key].toISOString();
  }
  return value;
}, 2);

jq (command line)

# Pretty-print
cat data.json | jq '.'

# Compact output
cat data.json | jq -c '.'

# Sort keys
cat data.json | jq -S '.'

# Read from a file, write to a file
jq '.' input.json > output.json

Validation

Python (jsonschema)

from jsonschema import validate, ValidationError

# Define the schema
schema = {
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "age": {"type": "number", "minimum": 0},
        "email": {"type": "string", "format": "email"}
    },
    "required": ["name", "email"]
}

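# Validate data
# Note: "format" keywords such as "email" are only enforced when a
# format_checker is passed to validate(); by default they are not checked.
data = {"name": "John", "email": "john@example.com", "age": 30}

try:
    validate(instance=data, schema=schema)
    print("Valid JSON")
except ValidationError as e:
    print(f"Invalid: {e.message}")

# Validate against a specific JSON Schema draft and collect every error
from jsonschema import Draft7Validator

validator = Draft7Validator(schema)
errors = list(validator.iter_errors(data))
for error in errors:
    print(f"Error at {'.'.join(str(p) for p in error.path)}: {error.message}")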

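JavaScript (ajv)

const Ajv = require('ajv');
const addFormats = require('ajv-formats');

const ajv = new Ajv();
addFormats(ajv); // Ajv v8 needs ajv-formats for "format" keywords such as "email"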

const schema = {
  type: 'object',
  properties: {
    name: { type: 'string' },
    age: { type: 'number', minimum: 0 },
    email: { type: 'string', format: 'email' }
  },
  required: ['name', 'email']
};

const validate = ajv.compile(schema);

const data = { name: 'John', email: 'john@example.com', age: 30 };

if (validate(data)) {
  console.log('Valid JSON');
} else {
  console.log('Invalid:', validate.errors);
}

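Data Extraction and Querying

JSONPath Queries

Python (jsonpath-ng)

# Filter expressions such as [?(...)] need the extended parser in jsonpath_ng.ext
from jsonpath_ng.ext import parse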

data = {
    "users": [
        {"name": "John", "age": 30, "email": "john@example.com"},
        {"name": "Jane", "age": 25, "email": "jane@example.com"}
    ]
}

# Extract all user names
jsonpath_expr = parse('users[*].name')
names = [match.value for match in jsonpath_expr.find(data)]
# Result: ['John', 'Jane']

# Extract the emails of users older than 25
jsonpath_expr = parse('users[?(@.age > 25)].email')
emails = [match.value for match in jsonpath_expr.find(data)]

# Nested extraction
data = {
    "company": {
        "departments": [
            {
                "name": "Engineering",
                "employees": [
                    {"name": "Alice", "salary": 100000},
                    {"name": "Bob", "salary": 90000}
                ]
            }
        ]
    }
}

jsonpath_expr = parse('company.departments[*].employees[*].name')
names = [match.value for match in jsonpath_expr.find(data)]

jq

# Extract a field
echo '{"name": "John", "age": 30}' | jq '.name'

# Extract from an array
echo '[{"name": "John"}, {"name": "Jane"}]' | jq '.[].name'

# Filter an array
echo '[{"name": "John", "age": 30}, {"name": "Jane", "age": 25}]' | \
  jq '.[] | select(.age > 25)'

# Extract a nested field
cat data.json | jq '.users[].email'

# Multiple fields
cat data.json | jq '.users[] | {name: .name, email: .email}'

# Conditional extraction
cat data.json | jq '.users[] | select(.age > 25) | .email'

JMESPath Queries

Python (jmespath)

import jmespath

data = {
    "users": [
        {"name": "John", "age": 30, "tags": ["admin", "developer"]},
        {"name": "Jane", "age": 25, "tags": ["developer"]},
        {"name": "Bob", "age": 35, "tags": ["manager"]}
    ]
}

# Simple extraction
names = jmespath.search('users[*].name', data)
# Result: ['John', 'Jane', 'Bob']

# Filtering (string literals use single quotes; backticks hold JSON literals)
admins = jmespath.search("users[?contains(tags, 'admin')]", data)

# Multiple conditions
senior_devs = jmespath.search(
    "users[?age > `28` && contains(tags, 'developer')]",
    data
)

# Projection
result = jmespath.search('users[*].{name: name, age: age}', data)

# Nested query
data = {
    "departments": [
        {
            "name": "Engineering",
            "employees": [
                {"name": "Alice", "skills": ["Python", "Go"]},
                {"name": "Bob", "skills": ["JavaScript", "Python"]}
            ]
        }
    ]
}

python_devs = jmespath.search(
    "departments[*].employees[?contains(skills, 'Python')].name",
    data
)

Data Transformation

Flattening Nested JSON

Python

def flatten_json(nested_json, parent_key='', sep='.'):
    """
    Flatten a nested JSON structure into a single-level dict.
    """
    items = []

    for key, value in nested_json.items():
        new_key = f"{parent_key}{sep}{key}" if parent_key else key

        if isinstance(value, dict):
            items.extend(flatten_json(value, new_key, sep=sep).items())
        elif isinstance(value, list):
            for i, item in enumerate(value):
                if isinstance(item, dict):
                    items.extend(flatten_json(item, f"{new_key}[{i}]", sep=sep).items())
                else:
                    items.append((f"{new_key}[{i}]", item))
        else:
            items.append((new_key, value))

    return dict(items)

# Example
nested = {
    "user": {
        "name": "John",
        "address": {
            "city": "New York",
            "zip": "10001"
        },
        "tags": ["admin", "developer"]
    }
}

flat = flatten_json(nested)
# Result: {
#     'user.name': 'John',
#     'user.address.city': 'New York',
#     'user.address.zip': '10001',
#     'user.tags[0]': 'admin',
#     'user.tags[1]': 'developer'
# }

JavaScript

function flattenJSON(obj, prefix = '', result = {}) {
  for (const [key, value] of Object.entries(obj)) {
    const newKey = prefix ? `${prefix}.${key}` : key;

    if (value && typeof value === 'object' && !Array.isArray(value)) {
      flattenJSON(value, newKey, result);
    } else if (Array.isArray(value)) {
      value.forEach((item, index) => {
        if (item && typeof item === 'object') {
          flattenJSON(item, `${newKey}[${index}]`, result);
        } else {
          result[`${newKey}[${index}]`] = item;
        }
      });
    } else {
      result[newKey] = value;
    }
  }

  return result;
}

Unflattening JSON

def unflatten_json(flat_json, sep='.'):
    """
    Rebuild a nested structure from a flattened JSON dict.
    """
    result = {}

    for key, value in flat_json.items():
        parts = key.split(sep)
        current = result

        for i, part in enumerate(parts[:-1]):
            # Handle array notation such as "tags[0]"
            if '[' in part:
                array_key, index = part.split('[')
                index = int(index.rstrip(']'))

                if array_key not in current:
                    current[array_key] = []

                # Extend the array if needed
                while len(current[array_key]) <= index:
                    current[array_key].append({})

                current = current[array_key][index]
            else:
                if part not in current:
                    current[part] = {}
                current = current[part]

        # Set the final value
        final_key = parts[-1]
        if '[' in final_key:
            array_key, index = final_key.split('[')
            index = int(index.rstrip(']'))

            if array_key not in current:
                current[array_key] = []

            while len(current[array_key]) <= index:
                current[array_key].append(None)

            current[array_key][index] = value
        else:
            current[final_key] = value

    return result

Merging JSON

Python

def deep_merge(dict1, dict2):
    """
    Deep-merge two dicts; values from dict2 win on conflicts.
    """
    result = dict1.copy()

    for key, value in dict2.items():
        if key in result and isinstance(result[key], dict) and isinstance(value, dict):
            result[key] = deep_merge(result[key], value)
        else:
            result[key] = value

    return result

# Example
base = {
    "user": {"name": "John", "age": 30},
    "settings": {"theme": "dark"}
}

override = {
    "user": {"age": 31, "email": "john@example.com"},
    "settings": {"language": "en"}
}

merged = deep_merge(base, override)
# Result: {
#     'user': {'name': 'John', 'age': 31, 'email': 'john@example.com'},
#     'settings': {'theme': 'dark', 'language': 'en'}
# }

jq

# Merge two JSON files (the * operator merges objects recursively)
jq -s '.[0] * .[1]' file1.json file2.json

# Recursively merge any number of files
jq -s 'reduce .[] as $item ({}; . * $item)' file1.json file2.json

Transforming Keys

def transform_keys(obj, transform_fn):
    """
    Apply transform_fn to every key in a JSON structure.
    """
    if isinstance(obj, dict):
        return {transform_fn(k): transform_keys(v, transform_fn) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [transform_keys(item, transform_fn) for item in obj]
    else:
        return obj

# Convert to snake_case
import re

def to_snake_case(text):
    s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', text)
    return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()

data = {
    "firstName": "John",
    "lastName": "Doe",
    "userInfo": {
        "emailAddress": "john@example.com"
    }
}

snake_case_data = transform_keys(data, to_snake_case)
# Result: {
#     'first_name': 'John',
#     'last_name': 'Doe',
#     'user_info': {'email_address': 'john@example.com'}
# }

# Convert to camelCase
def to_camel_case(text):
    components = text.split('_')
    return components[0] + ''.join(x.title() for x in components[1:])

Format Conversion

JSON to CSV

Python

import json
import csv
import pandas as pd

# Using pandas (recommended)
data = [
    {"name": "John", "age": 30, "email": "john@example.com"},
    {"name": "Jane", "age": 25, "email": "jane@example.com"}
]

df = pd.DataFrame(data)
df.to_csv('output.csv', index=False)

# Using the csv module
with open('output.csv', 'w', newline='') as csvfile:
    if data:
        writer = csv.DictWriter(csvfile, fieldnames=data[0].keys())
        writer.writeheader()
        writer.writerows(data)

# Handle nested JSON (uses flatten_json from the flattening section)
def flatten_for_csv(data):
    """Flatten nested JSON for CSV export"""
    if isinstance(data, list):
        return [flatten_json(item) for item in data]
    return flatten_json(data)

flattened = flatten_for_csv(data)
pd.DataFrame(flattened).to_csv('output.csv', index=False)
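
The `csv` module example above takes its header from `data[0].keys()`, so `DictWriter` raises `ValueError` as soon as a later record carries a key the first one lacks. A minimal sketch of one way around that, assuming blank cells are acceptable for missing values: collect the union of keys first and let `restval` fill the gaps. The sample records are made up.

```python
import csv
import io

records = [
    {"name": "John", "age": 30},
    {"name": "Jane", "email": "jane@example.com"},
]

# Union of all keys, preserving first-seen order.
fieldnames = list(dict.fromkeys(key for row in records for key in row))

# io.StringIO stands in for an open file; restval fills missing columns.
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=fieldnames, restval="")
writer.writeheader()
writer.writerows(records)

csv_text = buffer.getvalue()
print(csv_text)
```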

jq

# Convert a JSON array to CSV
cat data.json | jq -r '.[] | [.name, .age, .email] | @csv'

# With a header row
cat data.json | jq -r '["name", "age", "email"], (.[] | [.name, .age, .email]) | @csv'

JSON to YAML

Python

import json
import yaml

# JSON to YAML
with open('data.json', 'r') as json_file:
    data = json.load(json_file)

with open('data.yaml', 'w') as yaml_file:
    yaml.dump(data, yaml_file, default_flow_style=False)

# YAML to JSON
with open('data.yaml', 'r') as yaml_file:
    data = yaml.safe_load(yaml_file)

with open('data.json', 'w') as json_file:
    json.dump(data, json_file, indent=2)

JSON to XML

Python

import json
import xml.etree.ElementTree as ET

def json_to_xml(json_obj, root_name='root'):
    """Convert JSON to XML"""

    def build_xml(parent, obj):
        if isinstance(obj, dict):
            for key, val in obj.items():
                elem = ET.SubElement(parent, key)
                build_xml(elem, val)
        elif isinstance(obj, list):
            for item in obj:
                elem = ET.SubElement(parent, 'item')
                build_xml(elem, item)
        else:
            parent.text = str(obj)

    root = ET.Element(root_name)
    build_xml(root, json_obj)

    return ET.tostring(root, encoding='unicode')

# Example
data = {"user": {"name": "John", "age": 30}}
xml_string = json_to_xml(data)
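
Going the other way is lossy, because XML text carries no type information. The sketch below is a hypothetical inverse (`xml_to_json` is not part of the original) that assumes the layout produced by `json_to_xml` above: a node whose children are all `<item>` came from a list, and every leaf comes back as a string. Sibling elements that share any other tag name would overwrite each other in the resulting dict.

```python
import xml.etree.ElementTree as ET

def xml_to_json(elem):
    """Hypothetical inverse of json_to_xml; leaf values are returned as strings."""
    children = list(elem)
    if not children:
        return elem.text or ""
    # Assumption: a node whose children are all <item> was produced from a list.
    if all(child.tag == "item" for child in children):
        return [xml_to_json(child) for child in children]
    return {child.tag: xml_to_json(child) for child in children}

root = ET.fromstring("<root><user><name>John</name><age>30</age></user></root>")
restored = xml_to_json(root)
print(restored)
```

Note that `age` comes back as the string `"30"`; restoring numeric types needs a schema or an explicit conversion step.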

Advanced Transformations

jq-Style Transformations

Python (pyjq)

import pyjq

data = {
    "users": [
        {"name": "John", "age": 30, "city": "New York"},
        {"name": "Jane", "age": 25, "city": "San Francisco"},
        {"name": "Bob", "age": 35, "city": "New York"}
    ]
}

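# Select and transform
result = pyjq.all('.users[] | {name, age}', data)

# Filter and group (group_by needs an array, so select .users first)
result = pyjq.all('.users | group_by(.city) | map({city: .[0].city, count: length})', data)

# Complex transformation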
result = pyjq.all('''
    .users
    | map(select(.age > 25))
    | sort_by(.age)
    | reverse
''', data)

jq Examples

# Map over an array
echo '[1,2,3,4,5]' | jq 'map(. * 2)'

# Filter and transform
cat users.json | jq '.users | map(select(.age > 25) | {name, email})'

# Group by a field
cat data.json | jq 'group_by(.category) | map({category: .[0].category, count: length})'

# Compute a sum
cat orders.json | jq '[.[] | .amount] | add'

# Build a new structure (assumes the input is a top-level array of users)
cat users.json | jq '{
  total: length,
  users: [.[] | {name, email}],
  avgAge: ([.[] | .age] | add / length)
}'

# Conditional logic
cat data.json | jq '.[] | if .status == "active" then .name else empty end'

Complex Restructuring

def restructure_json(data):
    """
    Example: convert flat user records into a hierarchy
    """
    # Input: [
    #   {"userId": 1, "name": "John", "orderId": 101, "product": "A"},
    #   {"userId": 1, "name": "John", "orderId": 102, "product": "B"},
    #   {"userId": 2, "name": "Jane", "orderId": 103, "product": "C"}
    # ]

    # Output: [
    #   {
    #     "userId": 1,
    #     "name": "John",
    #     "orders": [
    #       {"orderId": 101, "product": "A"},
    #       {"orderId": 102, "product": "B"}
    #     ]
    #   },
    #   {
    #     "userId": 2,
    #     "name": "Jane",
    #     "orders": [{"orderId": 103, "product": "C"}]
    #   }
    # ]

    from collections import defaultdict

    users = defaultdict(lambda: {"orders": []})

    for record in data:
        user_id = record["userId"]

        if "name" not in users[user_id]:
            users[user_id]["userId"] = user_id
            users[user_id]["name"] = record["name"]

        users[user_id]["orders"].append({
            "orderId": record["orderId"],
            "product": record["product"]
        })

    return list(users.values())

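Array Operations

import json

def unique_by_key(array, key):
    """Remove duplicates based on a key, keeping the first occurrence"""
    seen = set()
    result = []

    for item in array:
        value = item.get(key)
        if value not in seen:
            seen.add(value)
            result.append(item)

    return result

def sort_by_key(array, key, reverse=False):
    """Sort an array by key; items missing the key sort last in ascending order"""
    # A '' default would raise TypeError when compared with numbers in Python 3
    return sorted(array, key=lambda x: (x.get(key) is None, x.get(key)), reverse=reverse)

def group_by_key(array, key):
    """Group array elements by key"""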
    from collections import defaultdict

    groups = defaultdict(list)
    for item in array:
        groups[item.get(key)].append(item)

    return dict(groups)

# Example usage
users = [
    {"name": "John", "age": 30, "city": "New York"},
    {"name": "Jane", "age": 25, "city": "San Francisco"},
    {"name": "Bob", "age": 35, "city": "New York"},
    {"name": "Alice", "age": 28, "city": "San Francisco"}
]

# Sort by age
sorted_users = sort_by_key(users, 'age')

# Group by city
by_city = group_by_key(users, 'city')
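
The instructions also list computing aggregates, which the helpers above stop short of. A minimal sketch using only the standard library, reusing the same sample users; the summary field names (`count`, `avg_age`, `max_age`) are arbitrary choices for illustration.

```python
from collections import defaultdict
from statistics import mean

users = [
    {"name": "John", "age": 30, "city": "New York"},
    {"name": "Jane", "age": 25, "city": "San Francisco"},
    {"name": "Bob", "age": 35, "city": "New York"},
    {"name": "Alice", "age": 28, "city": "San Francisco"},
]

# Collect the ages for each city, then reduce each group to summary values.
ages_by_city = defaultdict(list)
for user in users:
    ages_by_city[user["city"]].append(user["age"])

summary = {
    city: {"count": len(ages), "avg_age": mean(ages), "max_age": max(ages)}
    for city, ages in ages_by_city.items()
}
print(summary)
```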

JSON Diff and Comparison

import json
from deepdiff import DeepDiff

def json_diff(obj1, obj2):
    """Compare two JSON objects and return the differences"""
    diff = DeepDiff(obj1, obj2, ignore_order=True)
    return diff

# Example
old = {
    "name": "John",
    "age": 30,
    "addresses": [{"city": "New York"}]
}

new = {
    "name": "John",
    "age": 31,
    "addresses": [{"city": "San Francisco"}]
}

diff = json_diff(old, new)
# DeepDiff results can hold values json.dumps cannot serialize; to_json() handles them
print(diff.to_json())

# Manual diff
def simple_diff(obj1, obj2, path=""):
    """Simple diff implementation"""
    diffs = []

    if type(obj1) != type(obj2):
        diffs.append(f"{path}: type changed from {type(obj1)} to {type(obj2)}")
        return diffs

    if isinstance(obj1, dict):
        all_keys = set(obj1.keys()) | set(obj2.keys())

        for key in all_keys:
            new_path = f"{path}.{key}" if path else key

            if key not in obj1:
                diffs.append(f"{new_path}: added")
            elif key not in obj2:
                diffs.append(f"{new_path}: removed")
            elif obj1[key] != obj2[key]:
                diffs.extend(simple_diff(obj1[key], obj2[key], new_path))

    elif isinstance(obj1, list):
        if len(obj1) != len(obj2):
            diffs.append(f"{path}: length changed from {len(obj1)} to {len(obj2)}")

        # zip stops at the shorter list, so extra items are only reported via length
        for i, (item1, item2) in enumerate(zip(obj1, obj2)):
            diffs.extend(simple_diff(item1, item2, f"{path}[{i}]"))

    elif obj1 != obj2:
        diffs.append(f"{path}: changed from {obj1} to {obj2}")

    return diffs

Schema Generation

import json

def generate_schema(data, name="root"):
    """
    Generate a JSON Schema from sample data
    """
    if isinstance(data, dict):
        properties = {}
        required = []

        for key, value in data.items():
            properties[key] = generate_schema(value, key)
            if value is not None:
                required.append(key)

        schema = {
            "type": "object",
            "properties": properties
        }

        if required:
            schema["required"] = required

        return schema

    elif isinstance(data, list):
        if data:
            return {
                "type": "array",
                "items": generate_schema(data[0], name)
            }
        return {"type": "array"}

    elif isinstance(data, bool):
        return {"type": "boolean"}

    elif isinstance(data, int):
        return {"type": "integer"}

    elif isinstance(data, float):
        return {"type": "number"}

    elif isinstance(data, str):
        return {"type": "string"}

    elif data is None:
        return {"type": "null"}

    return {}

# Example
sample_data = {
    "name": "John",
    "age": 30,
    "email": "john@example.com",
    "active": True,
    "tags": ["developer", "admin"],
    "address": {
        "city": "New York",
        "zip": "10001"
    }
}

schema = generate_schema(sample_data)
print(json.dumps(schema, indent=2))
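
The instructions also mention the reverse direction, generating JSON from a schema. Below is a rough sketch of that idea for a small subset of JSON Schema (`type`, `properties`, `items`, `default`, `enum`); the function name `sample_from_schema` and the placeholder values are assumptions, and keywords such as `oneOf`, `$ref`, or `minimum` are ignored.

```python
def sample_from_schema(schema):
    """Build a placeholder instance for a small subset of JSON Schema."""
    if "default" in schema:
        return schema["default"]
    if "enum" in schema:
        return schema["enum"][0]

    schema_type = schema.get("type")
    if schema_type == "object":
        return {
            key: sample_from_schema(sub_schema)
            for key, sub_schema in schema.get("properties", {}).items()
        }
    if schema_type == "array":
        items = schema.get("items")
        return [sample_from_schema(items)] if items else []

    # Arbitrary placeholder per primitive type; unknown types yield None.
    placeholders = {"string": "", "integer": 0, "number": 0.0, "boolean": False, "null": None}
    return placeholders.get(schema_type)

user_schema = {
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "age": {"type": "integer", "default": 18},
        "tags": {"type": "array", "items": {"type": "string"}},
        "role": {"enum": ["user", "admin"]},
    },
}
sample = sample_from_schema(user_schema)
print(sample)
```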

Utility Functions

Pretty-Printing with Color

import json

from pygments import highlight
from pygments.lexers import JsonLexer
from pygments.formatters import TerminalFormatter

def pretty_print_json(data):
    """Print JSON with syntax highlighting"""
    json_str = json.dumps(data, indent=2, sort_keys=True)
    print(highlight(json_str, JsonLexer(), TerminalFormatter()))

Safe Access with Defaults

def safe_get(data, path, default=None):
    """
    Safely get a nested value from JSON
    path: "user.address.city" or ["user", "address", "city"]
    """
    if isinstance(path, str):
        path = path.split('.')

    current = data
    for key in path:
        if isinstance(current, dict):
            current = current.get(key)
        elif isinstance(current, list) and key.isdigit():
            index = int(key)
            current = current[index] if 0 <= index < len(current) else None
        else:
            return default

        if current is None:
            return default

    return current

# Example
data = {"user": {"address": {"city": "New York"}}}
city = safe_get(data, "user.address.city")  # "New York"
country = safe_get(data, "user.address.country", "Unknown")  # "Unknown"
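
Adding, updating, and deleting properties by path follows the same pattern as the safe read above. A minimal sketch, assuming dotted paths that traverse dicts only (no list indices); `set_path` and `delete_path` are hypothetical helper names, and `set_path` will fail if an intermediate value exists but is not a dict.

```python
_MISSING = object()  # sentinel so that stored None values still count as present

def set_path(data, path, value):
    """Set a nested value, creating intermediate dicts as needed."""
    keys = path.split(".")
    current = data
    for key in keys[:-1]:
        current = current.setdefault(key, {})
    current[keys[-1]] = value
    return data

def delete_path(data, path):
    """Delete a nested key; return True if something was removed."""
    keys = path.split(".")
    current = data
    for key in keys[:-1]:
        current = current.get(key)
        if not isinstance(current, dict):
            return False
    return current.pop(keys[-1], _MISSING) is not _MISSING

doc = {"user": {"address": {"city": "New York"}}}
set_path(doc, "user.address.country", "US")   # add
set_path(doc, "user.profile.lang", "en")      # add with a new intermediate dict
removed = delete_path(doc, "user.address.city")
print(doc, removed)
```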

Command-Line Tools

Using jq

# Format JSON
cat messy.json | jq '.'

# Extract specific fields
cat data.json | jq '.users[] | {name, email}'

# Filter an array
cat data.json | jq '.[] | select(.age > 30)'

# Lowercase the top-level keys
cat data.json | jq 'with_entries(.key |= ascii_downcase)'

# Merge multiple JSON files (shallow: later files overwrite top-level keys)
jq -s 'add' file1.json file2.json file3.json

# Convert to CSV
cat data.json | jq -r '.[] | [.name, .age, .email] | @csv'

Using Python (command line)

# Pretty-print
python -m json.tool input.json

# Compact output
python -c "import json; print(json.dumps(json.load(open('data.json')), separators=(',',':')))"

# Extract a field
python -c "import json; data=json.load(open('data.json')); print(data['users'][0]['name'])"

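Best Practices

  1. Always validate JSON before processing it
  2. Use schema validation for API contracts
  3. Handle errors (malformed JSON) gracefully
  4. Use appropriate libraries (jq, jmespath, jsonpath)
  5. Preserve data types during transformation
  6. Document complex transformations
  7. Keep schema definitions under version control
  8. Test transformations against edge cases
  9. Consider memory usage for large files
  10. Use streaming parsers for very large JSON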
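
Python's built-in `json` module loads a whole document at once, so streaming a single very large document needs an incremental parser such as the third-party `ijson` package. When the data can be stored as JSON Lines (one record per line), the standard library is enough. The sketch below assumes that layout; `iter_json_lines` is a hypothetical helper name and the sample stream is made up.

```python
import io
import json

def iter_json_lines(stream):
    """Yield one parsed record per non-empty line of a JSON Lines stream."""
    for line_number, line in enumerate(stream, start=1):
        line = line.strip()
        if not line:
            continue
        try:
            yield json.loads(line)
        except json.JSONDecodeError as err:
            raise ValueError(f"line {line_number}: {err.msg}") from err

# io.StringIO stands in for an open file handle; only one record is held in
# memory at a time while the generator is consumed.
stream = io.StringIO('{"amount": 10}\n\n{"amount": 32}\n')
total = sum(record["amount"] for record in iter_json_lines(stream))
print(total)  # 42
```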

Common Patterns

API Response Transformation

def transform_api_response(response):
    """Convert an API response into the application format"""
    return {
        "users": [
            {
                "id": user["userId"],
                "name": f"{user['firstName']} {user['lastName']}",
                "email": user["emailAddress"],
                "active": user["status"] == "active"
            }
            for user in response.get("data", {}).get("users", [])
        ],
        "pagination": {
            "page": response.get("page", 1),
            "total": response.get("totalResults", 0)
        }
    }

Configuration Merging

def merge_configs(base_config, user_config):
    """Merge user configuration into the base configuration"""
    result = deep_merge(base_config, user_config)

    # Verify required fields
    required = ["database", "api_key"]
    for field in required:
        if field not in result:
            raise ValueError(f"Missing required field: {field}")

    return result

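Notes

  • Always handle edge cases (null values, empty arrays, missing keys)
  • Use the appropriate tool (jq for the CLI, pandas for data science)
  • Consider performance for large JSON files
  • Validate schemas in production
  • Keep transformations idempotent where possible
  • Document the expected JSON structure
  • Use TypeScript or JSON Schema for type safety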
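
The idempotency note can be checked mechanically: applying a transformation twice should give the same result as applying it once. The sketch below repeats compact copies of the `to_snake_case` and `transform_keys` functions from the key-transformation section so it runs on its own; `is_idempotent`, `normalize`, and `add_prefix` are hypothetical names introduced for illustration.

```python
import re

def to_snake_case(text):
    s1 = re.sub(r"(.)([A-Z][a-z]+)", r"\1_\2", text)
    return re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", s1).lower()

def transform_keys(obj, transform_fn):
    if isinstance(obj, dict):
        return {transform_fn(k): transform_keys(v, transform_fn) for k, v in obj.items()}
    if isinstance(obj, list):
        return [transform_keys(item, transform_fn) for item in obj]
    return obj

def is_idempotent(transform, sample):
    """Return True if applying the transform twice equals applying it once."""
    once = transform(sample)
    return transform(once) == once

def normalize(doc):
    return transform_keys(doc, to_snake_case)

def add_prefix(doc):
    # Counterexample: every run prepends another "x_" to each key.
    return transform_keys(doc, lambda key: "x_" + key)

raw = {"firstName": "John", "userInfo": {"emailAddress": "john@example.com"}}
print(is_idempotent(normalize, raw))
print(is_idempotent(add_prefix, raw))
```

A check like this only covers the sample it is given, so it fits best in a test suite that runs over representative documents.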