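name: json-transformer
description: Transform, manipulate, and analyze JSON data structures, with support for advanced operations.
JSON Transformer Skill
Transform, manipulate, and analyze JSON data structures, with support for advanced operations.
Instructions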
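You are a JSON transformation expert. When invoked:
- Parse and validate JSON:
  - Parse JSON from files, strings, or APIs
  - Validate JSON structure and schemas
  - Handle malformed JSON gracefully
  - Pretty-print and format JSON
  - Detect and fix common JSON issues
- Transform data structures:
  - Reshape nested objects and arrays
  - Flatten and unflatten structures
  - Extract specific paths (JSONPath, JMESPath)
  - Merge and combine JSON documents
  - Filter and map data
- Advanced operations:
  - Convert between JSON and other formats (CSV, YAML, XML)
  - Apply transformations (jq-style operations)
  - Query and search JSON data
  - Diff JSON documents
  - Generate JSON from schemas
- Data manipulation:
  - Add, update, and delete properties
  - Rename keys
  - Convert data types
  - Sort and deduplicate
  - Compute aggregate values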
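The "handle malformed JSON gracefully" item above can be sketched with the standard library alone. This is a minimal illustration, not a prescribed implementation; `parse_json_safely` is a hypothetical helper name introduced here.

```python
import json

def parse_json_safely(text):
    """Return (data, None) on success or (None, message) on failure.

    Hypothetical helper for illustration. Note that the valid input
    'null' yields (None, None).
    """
    try:
        return json.loads(text), None
    except json.JSONDecodeError as exc:
        # JSONDecodeError records where the first problem was found
        return None, f"{exc.msg} at line {exc.lineno}, column {exc.colno}"

data, error = parse_json_safely('{"name": "John", "age": 30}')
bad_data, bad_error = parse_json_safely('{"name": "John",}')  # trailing comma
```

Returning the error message instead of raising lets the caller decide whether to report the problem, attempt a repair, or skip the input.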
Usage Examples
@json-transformer data.json
@json-transformer --flatten
@json-transformer --path "users[*].email"
@json-transformer --merge file1.json file2.json
@json-transformer --to-csv data.json
@json-transformer --validate schema.json
Basic JSON Operations
Parsing and Writing
Python
import json
# Parse a JSON string
data = json.loads('{"name": "John", "age": 30}')
# Parse from a file
with open('data.json', 'r') as f:
    data = json.load(f)
# Write JSON to a file
with open('output.json', 'w') as f:
    json.dump(data, f, indent=2)
# Pretty-print
print(json.dumps(data, indent=2, sort_keys=True))
# Compact output
compact = json.dumps(data, separators=(',', ':'))
# Handle types that json cannot serialize natively
from datetime import datetime
import decimal
def json_encoder(obj):
    if isinstance(obj, datetime):
        return obj.isoformat()
    if isinstance(obj, decimal.Decimal):
        # float() can lose precision; return str(obj) if exact decimals matter
        return float(obj)
    raise TypeError(f"Type {type(obj)} not serializable")
json.dumps(data, default=json_encoder)
JavaScript
// Parse a JSON string
const data = JSON.parse('{"name": "John", "age": 30}');
// Parse from a file (Node.js)
const fs = require('fs');
const fileData = JSON.parse(fs.readFileSync('data.json', 'utf8'));
// Write JSON to a file
fs.writeFileSync('output.json', JSON.stringify(data, null, 2));
// Pretty-print
console.log(JSON.stringify(data, null, 2));
// Custom serialization
// Date.prototype.toJSON runs before the replacer, so inspect the original
// value through `this[key]` (this requires a regular function, not an arrow).
const json = JSON.stringify(data, function (key, value) {
  if (this[key] instanceof Date) {
    return this[key].toISOString();
  }
  return value;
}, 2);
jq (command line)
# Pretty-print
cat data.json | jq '.'
# Compact output
cat data.json | jq -c '.'
# Sort keys
cat data.json | jq -S '.'
# Read from a file, write to a file
jq '.' input.json > output.json
Validation
Python (jsonschema)
from jsonschema import validate, ValidationError, FormatChecker
# Define the schema
schema = {
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "age": {"type": "number", "minimum": 0},
        "email": {"type": "string", "format": "email"}
    },
    "required": ["name", "email"]
}
# Validate the data
data = {"name": "John", "email": "john@example.com", "age": 30}
try:
    # "format" keywords are only enforced when a format checker is supplied
    validate(instance=data, schema=schema, format_checker=FormatChecker())
    print("Valid JSON")
except ValidationError as e:
    print(f"Invalid: {e.message}")
# Validate against a specific JSON Schema draft and collect every error
from jsonschema import Draft7Validator
validator = Draft7Validator(schema, format_checker=FormatChecker())
errors = list(validator.iter_errors(data))
for error in errors:
    print(f"Error at {'.'.join(str(p) for p in error.path)}: {error.message}")
JavaScript (ajv)
const Ajv = require('ajv');
// Assuming Ajv v8 or later, where formats such as "email" live in the
// separate ajv-formats package and must be registered explicitly.
const addFormats = require('ajv-formats');
const ajv = new Ajv();
addFormats(ajv);
const schema = {
  type: 'object',
  properties: {
    name: { type: 'string' },
    age: { type: 'number', minimum: 0 },
    email: { type: 'string', format: 'email' }
  },
  required: ['name', 'email']
};
const validate = ajv.compile(schema);
const data = { name: 'John', email: 'john@example.com', age: 30 };
if (validate(data)) {
  console.log('Valid JSON');
} else {
  console.log('Invalid:', validate.errors);
}
Data Extraction and Querying
JSONPath Queries
Python (jsonpath-ng)
# Filter expressions such as [?(...)] need the extended parser in jsonpath_ng.ext
from jsonpath_ng.ext import parse
data = {
    "users": [
        {"name": "John", "age": 30, "email": "john@example.com"},
        {"name": "Jane", "age": 25, "email": "jane@example.com"}
    ]
}
# Extract all user names
jsonpath_expr = parse('users[*].name')
names = [match.value for match in jsonpath_expr.find(data)]
# Result: ['John', 'Jane']
# Extract the emails of users older than 25
jsonpath_expr = parse('users[?(@.age > 25)].email')
emails = [match.value for match in jsonpath_expr.find(data)]
# Nested extraction
data = {
    "company": {
        "departments": [
            {
                "name": "Engineering",
                "employees": [
                    {"name": "Alice", "salary": 100000},
                    {"name": "Bob", "salary": 90000}
                ]
            }
        ]
    }
}
jsonpath_expr = parse('company.departments[*].employees[*].name')
names = [match.value for match in jsonpath_expr.find(data)]
jq
# Extract a field
echo '{"name": "John", "age": 30}' | jq '.name'
# Extract from an array
echo '[{"name": "John"}, {"name": "Jane"}]' | jq '.[].name'
# Filter an array
echo '[{"name": "John", "age": 30}, {"name": "Jane", "age": 25}]' | \
  jq '.[] | select(.age > 25)'
# Extract a nested field
cat data.json | jq '.users[].email'
# Multiple fields
cat data.json | jq '.users[] | {name: .name, email: .email}'
# Conditional extraction
cat data.json | jq '.users[] | select(.age > 25) | .email'
JMESPath Queries
Python (jmespath)
import jmespath
data = {
    "users": [
        {"name": "John", "age": 30, "tags": ["admin", "developer"]},
        {"name": "Jane", "age": 25, "tags": ["developer"]},
        {"name": "Bob", "age": 35, "tags": ["manager"]}
    ]
}
# Simple extraction
names = jmespath.search('users[*].name', data)
# Result: ['John', 'Jane', 'Bob']
# Filtering (string literals use single quotes; backticks hold JSON literals)
admins = jmespath.search("users[?contains(tags, 'admin')]", data)
# Multiple conditions
senior_devs = jmespath.search(
    "users[?age > `28` && contains(tags, 'developer')]",
    data
)
# Projection
result = jmespath.search('users[*].{name: name, age: age}', data)
# Nested query
data = {
    "departments": [
        {
            "name": "Engineering",
            "employees": [
                {"name": "Alice", "skills": ["Python", "Go"]},
                {"name": "Bob", "skills": ["JavaScript", "Python"]}
            ]
        }
    ]
}
python_devs = jmespath.search(
    "departments[*].employees[?contains(skills, 'Python')].name",
    data
)
Data Transformation
Flattening Nested JSON
Python
def flatten_json(nested_json, parent_key='', sep='.'):
    """
    Flatten a nested JSON structure.
    """
    items = []
    for key, value in nested_json.items():
        new_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            items.extend(flatten_json(value, new_key, sep=sep).items())
        elif isinstance(value, list):
            for i, item in enumerate(value):
                if isinstance(item, dict):
                    items.extend(flatten_json(item, f"{new_key}[{i}]", sep=sep).items())
                else:
                    items.append((f"{new_key}[{i}]", item))
        else:
            items.append((new_key, value))
    return dict(items)
# Example
nested = {
    "user": {
        "name": "John",
        "address": {
            "city": "New York",
            "zip": "10001"
        },
        "tags": ["admin", "developer"]
    }
}
flat = flatten_json(nested)
# Result: {
#   'user.name': 'John',
#   'user.address.city': 'New York',
#   'user.address.zip': '10001',
#   'user.tags[0]': 'admin',
#   'user.tags[1]': 'developer'
# }
JavaScript
function flattenJSON(obj, prefix = '', result = {}) {
  for (const [key, value] of Object.entries(obj)) {
    const newKey = prefix ? `${prefix}.${key}` : key;
    if (value && typeof value === 'object' && !Array.isArray(value)) {
      flattenJSON(value, newKey, result);
    } else if (Array.isArray(value)) {
      value.forEach((item, index) => {
        // Guard against null, whose typeof is also 'object'
        if (item && typeof item === 'object') {
          flattenJSON(item, `${newKey}[${index}]`, result);
        } else {
          result[`${newKey}[${index}]`] = item;
        }
      });
    } else {
      result[newKey] = value;
    }
  }
  return result;
}
Unflattening JSON
def unflatten_json(flat_json, sep='.'):
    """
    Rebuild a nested structure from flattened JSON.
    """
    result = {}
    for key, value in flat_json.items():
        parts = key.split(sep)
        current = result
        for i, part in enumerate(parts[:-1]):
            # Handle array notation such as "items[0]"
            if '[' in part:
                array_key, index = part.split('[')
                index = int(index.rstrip(']'))
                if array_key not in current:
                    current[array_key] = []
                # Extend the array if needed
                while len(current[array_key]) <= index:
                    current[array_key].append({})
                current = current[array_key][index]
            else:
                if part not in current:
                    current[part] = {}
                current = current[part]
        # Set the final value
        final_key = parts[-1]
        if '[' in final_key:
            array_key, index = final_key.split('[')
            index = int(index.rstrip(']'))
            if array_key not in current:
                current[array_key] = []
            while len(current[array_key]) <= index:
                current[array_key].append(None)
            current[array_key][index] = value
        else:
            current[final_key] = value
    return result
Merging JSON
Python
def deep_merge(dict1, dict2):
    """
    Deep-merge two dictionaries; values from dict2 win on conflict.
    """
    result = dict1.copy()
    for key, value in dict2.items():
        if key in result and isinstance(result[key], dict) and isinstance(value, dict):
            result[key] = deep_merge(result[key], value)
        else:
            result[key] = value
    return result
# Example
base = {
    "user": {"name": "John", "age": 30},
    "settings": {"theme": "dark"}
}
override = {
    "user": {"age": 31, "email": "john@example.com"},
    "settings": {"language": "en"}
}
merged = deep_merge(base, override)
# Result: {
#   'user': {'name': 'John', 'age': 31, 'email': 'john@example.com'},
#   'settings': {'theme': 'dark', 'language': 'en'}
# }
jq
# Merge two JSON files (the * operator merges objects recursively)
jq -s '.[0] * .[1]' file1.json file2.json
# Deep-merge any number of files
jq -s 'reduce .[] as $item ({}; . * $item)' file1.json file2.json
Transforming Keys
def transform_keys(obj, transform_fn):
    """
    Apply transform_fn to every key in a JSON structure.
    """
    if isinstance(obj, dict):
        return {transform_fn(k): transform_keys(v, transform_fn) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [transform_keys(item, transform_fn) for item in obj]
    else:
        return obj
# Convert to snake_case
import re
def to_snake_case(text):
    s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', text)
    return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()
data = {
    "firstName": "John",
    "lastName": "Doe",
    "userInfo": {
        "emailAddress": "john@example.com"
    }
}
snake_case_data = transform_keys(data, to_snake_case)
# Result: {
#   'first_name': 'John',
#   'last_name': 'Doe',
#   'user_info': {'email_address': 'john@example.com'}
# }
# Convert to camelCase
def to_camel_case(text):
    components = text.split('_')
    return components[0] + ''.join(x.title() for x in components[1:])
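Besides applying a function to every key, renaming is often driven by a fixed old-to-new mapping. The sketch below shows one way to do that recursively; `rename_keys` and the sample field names are hypothetical and exist only for illustration.

```python
def rename_keys(obj, mapping):
    """Recursively rename dict keys using `mapping` (old name -> new name).

    Keys that are not in the mapping are kept unchanged.
    """
    if isinstance(obj, dict):
        return {mapping.get(k, k): rename_keys(v, mapping) for k, v in obj.items()}
    if isinstance(obj, list):
        return [rename_keys(item, mapping) for item in obj]
    return obj

record = {
    "fname": "John",
    "contact": {"mail": "john@example.com"},
    "aliases": [{"fname": "Johnny"}],
}
renamed = rename_keys(record, {"fname": "first_name", "mail": "email"})
```

The function returns a new structure and leaves the input untouched, which keeps the rename safe to apply to shared data.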
Format Conversion
JSON to CSV
Python
import json
import csv
import pandas as pd
# Using pandas (recommended)
data = [
    {"name": "John", "age": 30, "email": "john@example.com"},
    {"name": "Jane", "age": 25, "email": "jane@example.com"}
]
df = pd.DataFrame(data)
df.to_csv('output.csv', index=False)
# Using the csv module
with open('output.csv', 'w', newline='') as csvfile:
    if data:
        writer = csv.DictWriter(csvfile, fieldnames=data[0].keys())
        writer.writeheader()
        writer.writerows(data)
# Handling nested JSON (relies on flatten_json from the flattening section)
def flatten_for_csv(data):
    """Flatten nested JSON for CSV export."""
    if isinstance(data, list):
        return [flatten_json(item) for item in data]
    return flatten_json(data)
flattened = flatten_for_csv(data)
pd.DataFrame(flattened).to_csv('output.csv', index=False)
jq
# Convert a JSON array to CSV
cat data.json | jq -r '.[] | [.name, .age, .email] | @csv'
# With a header row
cat data.json | jq -r '["name", "age", "email"], (.[] | [.name, .age, .email]) | @csv'
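The instructions call for converting between JSON and CSV in both directions, so the reverse step is sketched here with the standard library only. `csv_to_records` is a hypothetical helper name, and the sample text is made up for the example.

```python
import csv
import io
import json

def csv_to_records(csv_text):
    """Parse CSV text with a header row into a list of dicts.

    Every value stays a string; CSV carries no type information.
    """
    reader = csv.DictReader(io.StringIO(csv_text))
    return [dict(row) for row in reader]

csv_text = "name,age,email\nJohn,30,john@example.com\nJane,25,jane@example.com\n"
records = csv_to_records(csv_text)
json_text = json.dumps(records, indent=2)
```

Because ages come back as the strings "30" and "25", a separate type-conversion pass is needed if numeric values must survive the round trip.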
JSON to YAML
Python
import json
import yaml
# JSON to YAML
with open('data.json', 'r') as json_file:
    data = json.load(json_file)
with open('data.yaml', 'w') as yaml_file:
    yaml.dump(data, yaml_file, default_flow_style=False)
# YAML to JSON
with open('data.yaml', 'r') as yaml_file:
    data = yaml.safe_load(yaml_file)
with open('data.json', 'w') as json_file:
    json.dump(data, json_file, indent=2)
JSON to XML
Python
import json
import xml.etree.ElementTree as ET
def json_to_xml(json_obj, root_name='root'):
    """Convert JSON to XML."""
    def build_xml(parent, obj):
        if isinstance(obj, dict):
            for key, val in obj.items():
                elem = ET.SubElement(parent, key)
                build_xml(elem, val)
        elif isinstance(obj, list):
            for item in obj:
                elem = ET.SubElement(parent, 'item')
                build_xml(elem, item)
        else:
            parent.text = str(obj)
    root = ET.Element(root_name)
    build_xml(root, json_obj)
    return ET.tostring(root, encoding='unicode')
# Example
data = {"user": {"name": "John", "age": 30}}
xml_string = json_to_xml(data)
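Going back from XML to JSON can be sketched as the mirror image of the function above. This is a rough illustration under two assumptions that are not guaranteed for arbitrary XML: child tags are unique within a parent, and a parent whose children are all named `item` represents a list. `xml_to_json` is a hypothetical name.

```python
import xml.etree.ElementTree as ET

def xml_to_json(element):
    """Convert an Element into nested dicts and lists.

    Assumptions (illustrative only): children named 'item' form a list,
    other child tags are unique per parent, and leaf text stays a string.
    """
    children = list(element)
    if not children:
        return element.text
    if all(child.tag == 'item' for child in children):
        return [xml_to_json(child) for child in children]
    return {child.tag: xml_to_json(child) for child in children}

root = ET.fromstring(
    '<root><user><name>John</name><age>30</age></user>'
    '<tags><item>a</item><item>b</item></tags></root>'
)
result = xml_to_json(root)
```

The age comes back as the string "30", so the conversion is not lossless: the original JSON types have to be restored separately.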
Advanced Transformations
jq-Style Transformations
Python (pyjq)
import pyjq
data = {
    "users": [
        {"name": "John", "age": 30, "city": "New York"},
        {"name": "Jane", "age": 25, "city": "San Francisco"},
        {"name": "Bob", "age": 35, "city": "New York"}
    ]
}
# Select and transform
result = pyjq.all('.users[] | {name, age}', data)
# Filter and group (group_by needs an array, so select .users first)
result = pyjq.all('.users | group_by(.city) | map({city: .[0].city, count: length})', data)
# Complex transformation
result = pyjq.all('''
    .users
    | map(select(.age > 25))
    | sort_by(.age)
    | reverse
''', data)
jq Examples
# Map over an array
echo '[1,2,3,4,5]' | jq 'map(. * 2)'
# Filter and transform
cat users.json | jq '.users | map(select(.age > 25) | {name, email})'
# Group by a field
cat data.json | jq 'group_by(.category) | map({category: .[0].category, count: length})'
# Compute a sum
cat orders.json | jq '[.[] | .amount] | add'
# Build a new structure (users.json holds the array under .users, as above)
cat users.json | jq '.users | {
  total: length,
  users: [.[] | {name, email}],
  avgAge: ([.[] | .age] | add / length)
}'
# Conditional logic
cat data.json | jq '.[] | if .status == "active" then .name else empty end'
Complex Restructuring
def restructure_json(data):
    """
    Example: turn flat user/order records into a hierarchical structure.
    """
    # Input: [
    #   {"userId": 1, "name": "John", "orderId": 101, "product": "A"},
    #   {"userId": 1, "name": "John", "orderId": 102, "product": "B"},
    #   {"userId": 2, "name": "Jane", "orderId": 103, "product": "C"}
    # ]
    # Output: [
    #   {
    #     "userId": 1,
    #     "name": "John",
    #     "orders": [
    #       {"orderId": 101, "product": "A"},
    #       {"orderId": 102, "product": "B"}
    #     ]
    #   },
    #   {
    #     "userId": 2,
    #     "name": "Jane",
    #     "orders": [{"orderId": 103, "product": "C"}]
    #   }
    # ]
    from collections import defaultdict
    users = defaultdict(lambda: {"orders": []})
    for record in data:
        user_id = record["userId"]
        if "name" not in users[user_id]:
            users[user_id]["userId"] = user_id
            users[user_id]["name"] = record["name"]
        users[user_id]["orders"].append({
            "orderId": record["orderId"],
            "product": record["product"]
        })
    return list(users.values())
Array Operations
import json
def unique_by_key(array, key):
    """Remove duplicates based on a key, keeping the first occurrence."""
    seen = set()
    result = []
    for item in array:
        value = item.get(key)
        if value not in seen:
            seen.add(value)
            result.append(item)
    return result
def sort_by_key(array, key, reverse=False):
    """Sort an array by a key (missing keys sort as '', so this suits string fields)."""
    return sorted(array, key=lambda x: x.get(key, ''), reverse=reverse)
def group_by_key(array, key):
    """Group array elements by a key."""
    from collections import defaultdict
    groups = defaultdict(list)
    for item in array:
        groups[item.get(key)].append(item)
    return dict(groups)
# Example usage
users = [
    {"name": "John", "age": 30, "city": "New York"},
    {"name": "Jane", "age": 25, "city": "San Francisco"},
    {"name": "Bob", "age": 35, "city": "New York"},
    {"name": "Alice", "age": 28, "city": "San Francisco"}
]
# Sort by age
sorted_users = sort_by_key(users, 'age')
# Group by city
by_city = group_by_key(users, 'city')
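The instructions also list computing aggregate values, which the helpers above do not cover. A minimal sketch of per-group aggregation follows; `aggregate_by_key` is a hypothetical helper, and it assumes the aggregated field is numeric.

```python
from collections import defaultdict
from statistics import mean

def aggregate_by_key(array, group_key, value_key):
    """Return {group: {"count", "sum", "avg"}} for a numeric value_key."""
    buckets = defaultdict(list)
    for item in array:
        # Skip records missing either field rather than guessing a default
        if group_key in item and value_key in item:
            buckets[item[group_key]].append(item[value_key])
    return {
        group: {"count": len(values), "sum": sum(values), "avg": mean(values)}
        for group, values in buckets.items()
    }

users = [
    {"name": "John", "age": 30, "city": "New York"},
    {"name": "Jane", "age": 25, "city": "San Francisco"},
    {"name": "Bob", "age": 35, "city": "New York"},
    {"name": "Alice", "age": 28, "city": "San Francisco"},
]
stats = aggregate_by_key(users, "city", "age")
```

Skipping incomplete records is a design choice for this sketch; a stricter variant could raise an error instead.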
JSON Diff and Comparison
import json
from deepdiff import DeepDiff
def json_diff(obj1, obj2):
    """Compare two JSON objects and return the differences."""
    diff = DeepDiff(obj1, obj2, ignore_order=True)
    return diff
# Example
old = {
    "name": "John",
    "age": 30,
    "addresses": [{"city": "New York"}]
}
new = {
    "name": "John",
    "age": 31,
    "addresses": [{"city": "San Francisco"}]
}
diff = json_diff(old, new)
# A DeepDiff result can hold values json.dumps cannot serialize; use its own to_json()
print(diff.to_json(indent=2))
# Manual diff
def simple_diff(obj1, obj2, path=""):
    """A simple diff implementation."""
    diffs = []
    if type(obj1) != type(obj2):
        diffs.append(f"{path}: type changed from {type(obj1)} to {type(obj2)}")
        return diffs
    if isinstance(obj1, dict):
        all_keys = set(obj1.keys()) | set(obj2.keys())
        for key in all_keys:
            new_path = f"{path}.{key}" if path else key
            if key not in obj1:
                diffs.append(f"{new_path}: added")
            elif key not in obj2:
                diffs.append(f"{new_path}: removed")
            elif obj1[key] != obj2[key]:
                diffs.extend(simple_diff(obj1[key], obj2[key], new_path))
    elif isinstance(obj1, list):
        if len(obj1) != len(obj2):
            diffs.append(f"{path}: length changed from {len(obj1)} to {len(obj2)}")
        for i, (item1, item2) in enumerate(zip(obj1, obj2)):
            diffs.extend(simple_diff(item1, item2, f"{path}[{i}]"))
    elif obj1 != obj2:
        diffs.append(f"{path}: changed from {obj1} to {obj2}")
    return diffs
Schema Generation
import json
def generate_schema(data, name="root"):
    """
    Generate a JSON Schema from sample data.
    """
    if isinstance(data, dict):
        properties = {}
        required = []
        for key, value in data.items():
            properties[key] = generate_schema(value, key)
            if value is not None:
                required.append(key)
        schema = {
            "type": "object",
            "properties": properties
        }
        if required:
            schema["required"] = required
        return schema
    elif isinstance(data, list):
        if data:
            # Only the first element is inspected to infer the item schema
            return {
                "type": "array",
                "items": generate_schema(data[0], name)
            }
        return {"type": "array"}
    elif isinstance(data, bool):
        # bool is checked before int because bool is a subclass of int
        return {"type": "boolean"}
    elif isinstance(data, int):
        return {"type": "integer"}
    elif isinstance(data, float):
        return {"type": "number"}
    elif isinstance(data, str):
        return {"type": "string"}
    elif data is None:
        return {"type": "null"}
    return {}
# Example
sample_data = {
    "name": "John",
    "age": 30,
    "email": "john@example.com",
    "active": True,
    "tags": ["developer", "admin"],
    "address": {
        "city": "New York",
        "zip": "10001"
    }
}
schema = generate_schema(sample_data)
print(json.dumps(schema, indent=2))
Utility Functions
Pretty-Printing with Color
import json
from pygments import highlight
from pygments.lexers import JsonLexer
from pygments.formatters import TerminalFormatter
def pretty_print_json(data):
    """Print JSON with syntax highlighting."""
    json_str = json.dumps(data, indent=2, sort_keys=True)
    print(highlight(json_str, JsonLexer(), TerminalFormatter()))
Safe Access with Defaults
def safe_get(data, path, default=None):
    """
    Safely get a nested value from JSON.
    path: "user.address.city" or ["user", "address", "city"]
    """
    if isinstance(path, str):
        path = path.split('.')
    current = data
    for key in path:
        if isinstance(current, dict):
            current = current.get(key)
        elif isinstance(current, list) and key.isdigit():
            index = int(key)
            current = current[index] if 0 <= index < len(current) else None
        else:
            return default
        if current is None:
            return default
    return current
# Example
data = {"user": {"address": {"city": "New York"}}}
city = safe_get(data, "user.address.city")  # "New York"
country = safe_get(data, "user.address.country", "Unknown")  # "Unknown"
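Reading by path has natural counterparts for the "add, update, and delete properties" item in the instructions. The sketch below covers dict keys only (no array indices); `set_path` and `delete_path` are hypothetical helper names.

```python
def set_path(data, path, value):
    """Set a nested value by dotted path, creating intermediate dicts.

    Mutates and returns `data`. A non-dict value found along the path
    is replaced by a new dict.
    """
    keys = path.split('.')
    current = data
    for key in keys[:-1]:
        if not isinstance(current.get(key), dict):
            current[key] = {}
        current = current[key]
    current[keys[-1]] = value
    return data

def delete_path(data, path):
    """Delete a nested key by dotted path; return True if a key was removed."""
    keys = path.split('.')
    current = data
    for key in keys[:-1]:
        current = current.get(key)
        if not isinstance(current, dict):
            return False
    if keys[-1] in current:
        del current[keys[-1]]
        return True
    return False

doc = {"user": {"address": {"city": "New York"}}}
set_path(doc, "user.address.country", "USA")   # add a property
set_path(doc, "user.profile.lang", "en")       # creates "profile" on the way
removed = delete_path(doc, "user.address.city")
missing = delete_path(doc, "user.unknown.key")
```

Both helpers mutate their input in place, so copy the document first (for example with `copy.deepcopy`) when the original must be preserved.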
Command-Line Tools
Using jq
# Format JSON
cat messy.json | jq '.'
# Extract specific fields
cat data.json | jq '.users[] | {name, email}'
# Filter an array
cat data.json | jq '.[] | select(.age > 30)'
# Convert top-level keys to lowercase
cat data.json | jq 'with_entries(.key |= ascii_downcase)'
# Merge multiple JSON files (shallow: later files override top-level keys)
jq -s 'add' file1.json file2.json file3.json
# Convert to CSV
cat data.json | jq -r '.[] | [.name, .age, .email] | @csv'
Using Python (command line)
# Pretty-print
python -m json.tool input.json
# Compact output
python -c "import json; print(json.dumps(json.load(open('data.json')), separators=(',',':')))"
# Extract a field
python -c "import json; data=json.load(open('data.json')); print(data['users'][0]['name'])"
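Best Practices
- Always validate JSON before processing it
- Use schema validation for API contracts
- Handle errors gracefully (malformed JSON)
- Use the appropriate libraries (jq, jmespath, jsonpath)
- Preserve data types during transformations
- Document complex transformations
- Keep schema definitions under version control
- Test transformations against edge cases
- Consider memory usage for large files
- Use a streaming parser for very large JSON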
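The last two practices can be illustrated for one common case. The sketch assumes the input is newline-delimited JSON (one document per line), which is an assumption made for this example; a single very large JSON document would instead need a dedicated incremental parser such as the third-party ijson package. `iter_json_lines` is a hypothetical helper name.

```python
import io
import json

def iter_json_lines(stream):
    """Yield one parsed object per non-empty line.

    Only one line is held in memory at a time, so the input can be
    far larger than available RAM.
    """
    for line_number, line in enumerate(stream, start=1):
        line = line.strip()
        if not line:
            continue
        try:
            yield json.loads(line)
        except json.JSONDecodeError as exc:
            raise ValueError(f"Invalid JSON on line {line_number}: {exc.msg}") from exc

# io.StringIO stands in for an open file handle here
stream = io.StringIO('{"id": 1, "amount": 10}\n\n{"id": 2, "amount": 5}\n')
total = sum(record["amount"] for record in iter_json_lines(stream))
```

Because the function is a generator, aggregations such as the sum above run in a single pass without building an intermediate list.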
Common Patterns
API Response Transformation
def transform_api_response(response):
    """Transform an API response into the application's format."""
    return {
        "users": [
            {
                "id": user["userId"],
                "name": f"{user['firstName']} {user['lastName']}",
                "email": user["emailAddress"],
                "active": user["status"] == "active"
            }
            for user in response.get("data", {}).get("users", [])
        ],
        "pagination": {
            "page": response.get("page", 1),
            "total": response.get("totalResults", 0)
        }
    }
Configuration Merging
def merge_configs(base_config, user_config):
    """Merge the user configuration into the base configuration."""
    # Relies on deep_merge from the merging section
    result = deep_merge(base_config, user_config)
    # Check required fields
    required = ["database", "api_key"]
    for field in required:
        if field not in result:
            raise ValueError(f"Missing required field: {field}")
    return result
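Notes
- Always handle edge cases (null values, empty arrays, missing keys)
- Use the right tool for the job (jq for the CLI, pandas for data science)
- Consider performance for large JSON files
- Validate against schemas in production
- Keep transformations idempotent where possible
- Document the expected JSON structure
- Use TypeScript or JSON Schema for type safety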
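The idempotency note can be checked mechanically: applying a transformation twice should give the same result as applying it once. The sketch below does this for a snake_case key normalization; `normalize_keys` is a hypothetical wrapper, and the regexes mirror the `to_snake_case` approach used earlier in this document.

```python
import re

def to_snake_case(text):
    s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', text)
    return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()

def normalize_keys(obj):
    """Recursively convert every dict key to snake_case."""
    if isinstance(obj, dict):
        return {to_snake_case(k): normalize_keys(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [normalize_keys(item) for item in obj]
    return obj

data = {"firstName": "John", "userInfo": {"emailAddress": "john@example.com"}}
once = normalize_keys(data)
twice = normalize_keys(once)
# Idempotent: a second pass changes nothing
is_idempotent = once == twice
```

A check like this fits well in a test suite, since a non-idempotent transformation can silently corrupt data when a pipeline step is retried.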