ZarrPython库技能Skill zarr-python

Zarr Python是一个用于存储和处理大规模N维数组的Python库,支持分块、压缩、并行I/O和云存储集成,与NumPy、Dask、Xarray兼容,适用于科学计算、数据工程和机器学习流水线。关键词:Zarr Python, 数据存储, 分块数组, 压缩, 云存储, 科学计算, 并行处理, Dask, Xarray

数据工程 0 次安装 0 次浏览 更新于 3/16/2026

名称: zarr-python 描述: “用于云存储的分块N维数组。压缩数组、并行I/O、S3/GCS集成、与NumPy/Dask/Xarray兼容,适用于大规模科学计算流水线。”

Zarr Python

概述

Zarr 是一个Python库,用于存储大型N维数组,支持分块和压缩。应用此技能可实现高效并行I/O、云原生工作流,以及与NumPy、Dask和Xarray的无缝集成。

快速开始

安装

# 使用 pip
pip install zarr

# 使用 conda
conda install --channel conda-forge zarr

需要Python 3.11+。对于云存储支持,安装额外包:

pip install s3fs  # 用于S3
pip install gcsfs  # 用于Google Cloud Storage

基本数组创建

import zarr
import numpy as np

# 创建具有分块和压缩的2D数组
z = zarr.create_array(
    store="data/my_array.zarr",
    shape=(10000, 10000),
    chunks=(1000, 1000),
    dtype="f4"
)

# 使用NumPy风格索引写入数据
z[:, :] = np.random.random((10000, 10000))

# 读取数据
data = z[0:100, 0:100]  # 返回NumPy数组

核心操作

创建数组

Zarr 提供多种便利函数用于数组创建:

# 创建空数组
z = zarr.zeros(shape=(10000, 10000), chunks=(1000, 1000), dtype='f4',
               store='data.zarr')

# 创建填充数组
z = zarr.ones((5000, 5000), chunks=(500, 500))
z = zarr.full((1000, 1000), fill_value=42, chunks=(100, 100))

# 从现有数据创建
data = np.arange(10000).reshape(100, 100)
z = zarr.array(data, chunks=(10, 10), store='data.zarr')

# 创建类似数组
z2 = zarr.zeros_like(z)  # 匹配z的形状、分块和数据类型

打开现有数组

# 打开数组(默认可读写模式)
z = zarr.open_array('data.zarr', mode='r+')

# 只读模式
z = zarr.open_array('data.zarr', mode='r')

# open() 函数自动检测数组或组
z = zarr.open('data.zarr')  # 返回数组或组

读写数据

Zarr 数组支持类似NumPy的索引:

# 写入整个数组
z[:] = 42

# 写入切片
z[0, :] = np.arange(100)
z[10:20, 50:60] = np.random.random((10, 10))

# 读取数据(返回NumPy数组)
data = z[0:100, 0:100]
row = z[5, :]

# 高级索引
z.vindex[[0, 5, 10], [2, 8, 15]]  # 坐标索引
z.oindex[0:10, [5, 10, 15]]       # 正交索引
z.blocks[0, 0]                     # 块/分块索引

调整大小和追加

# 调整数组大小
z.resize(15000, 15000)  # 扩展或缩小维度

# 沿轴追加数据
z.append(np.random.random((1000, 10000)), axis=0)  # 添加行

分块策略

分块对性能至关重要。根据访问模式选择分块大小和形状。

分块大小指南

  • 最小分块大小:推荐1 MB以获得最佳性能
  • 平衡:较大分块 = 较少元数据操作;较小分块 = 更好的并行访问
  • 内存考虑:整个分块在压缩时必须能装入内存
# 配置分块大小(目标每块约1MB)
# 对于float32数据:1MB = 262,144个元素 = 512×512数组
z = zarr.zeros(
    shape=(10000, 10000),
    chunks=(512, 512),  # 约1MB分块
    dtype='f4'
)

分块形状与访问模式对齐

关键:分块形状根据数据访问方式显著影响性能。

# 如果频繁访问行(第一维度)
z = zarr.zeros((10000, 10000), chunks=(10, 10000))  # 分块跨列

# 如果频繁访问列(第二维度)
z = zarr.zeros((10000, 10000), chunks=(10000, 10))  # 分块跨行

# 对于混合访问模式(平衡方法)
z = zarr.zeros((10000, 10000), chunks=(1000, 1000))  # 方形分块

性能示例:对于(200, 200, 200)数组,沿第一维度读取:

  • 使用分块(1, 200, 200):约107ms
  • 使用分块(200, 200, 1):约1.65ms(快65倍!)

分片大规模存储

当数组有数百万个小分块时,使用分片将分块分组到更大的存储对象:

from zarr.codecs import ShardingCodec, BytesCodec
from zarr.codecs.blosc import BloscCodec

# 创建带分片的数组
z = zarr.create_array(
    store='data.zarr',
    shape=(100000, 100000),
    chunks=(100, 100),  # 小分块以便访问
    shards=(1000, 1000),  # 每组100个分块
    dtype='f4'
)

好处

  • 减少文件系统开销,避免数百万个小文件
  • 提高云存储性能(减少对象请求)
  • 防止文件系统块大小浪费

重要:写入前整个分片必须能装入内存。

压缩

Zarr 对每个分块应用压缩以减少存储空间,同时保持快速访问。

配置压缩

from zarr.codecs.blosc import BloscCodec
from zarr.codecs import GzipCodec, ZstdCodec

# 默认:Blosc 带Zstandard
z = zarr.zeros((1000, 1000), chunks=(100, 100))  # 使用默认压缩

# 配置Blosc编解码器
z = zarr.create_array(
    store='data.zarr',
    shape=(1000, 1000),
    chunks=(100, 100),
    dtype='f4',
    codecs=[BloscCodec(cname='zstd', clevel=5, shuffle='shuffle')]
)

# 可用Blosc压缩器:'blosclz', 'lz4', 'lz4hc', 'snappy', 'zlib', 'zstd'

# 使用Gzip压缩
z = zarr.create_array(
    store='data.zarr',
    shape=(1000, 1000),
    chunks=(100, 100),
    dtype='f4',
    codecs=[GzipCodec(level=6)]
)

# 禁用压缩
z = zarr.create_array(
    store='data.zarr',
    shape=(1000, 1000),
    chunks=(100, 100),
    dtype='f4',
    codecs=[BytesCodec()]  # 无压缩
)

压缩性能提示

  • Blosc(默认):快速压缩/解压缩,适合交互式工作负载
  • Zstandard:更好的压缩比,比LZ4稍慢
  • Gzip:最大压缩,性能较慢
  • LZ4:最快的压缩,压缩比较低
  • Shuffle:启用洗牌过滤器以提高数值数据压缩
# 对数值科学数据最优
codecs=[BloscCodec(cname='zstd', clevel=5, shuffle='shuffle')]

# 对速度最优
codecs=[BloscCodec(cname='lz4', clevel=1)]

# 对压缩比最优
codecs=[GzipCodec(level=9)]

存储后端

Zarr 通过灵活存储接口支持多种存储后端。

本地文件系统(默认)

from zarr.storage import LocalStore

# 显式存储创建
store = LocalStore('data/my_array.zarr')
z = zarr.open_array(store=store, mode='w', shape=(1000, 1000), chunks=(100, 100))

# 或使用字符串路径(自动创建LocalStore)
z = zarr.open_array('data/my_array.zarr', mode='w', shape=(1000, 1000),
                    chunks=(100, 100))

内存存储

from zarr.storage import MemoryStore

# 创建内存存储
store = MemoryStore()
z = zarr.open_array(store=store, mode='w', shape=(1000, 1000), chunks=(100, 100))

# 数据仅存在内存中,不持久化

ZIP文件存储

from zarr.storage import ZipStore

# 写入ZIP文件
store = ZipStore('data.zip', mode='w')
z = zarr.open_array(store=store, mode='w', shape=(1000, 1000), chunks=(100, 100))
z[:] = np.random.random((1000, 1000))
store.close()  # 重要:必须关闭ZipStore

# 从ZIP文件读取
store = ZipStore('data.zip', mode='r')
z = zarr.open_array(store=store)
data = z[:]
store.close()

云存储(S3、GCS)

import s3fs
import zarr

# S3存储
s3 = s3fs.S3FileSystem(anon=False)  # 使用凭证
store = s3fs.S3Map(root='my-bucket/path/to/array.zarr', s3=s3)
z = zarr.open_array(store=store, mode='w', shape=(1000, 1000), chunks=(100, 100))
z[:] = data

# Google Cloud Storage
import gcsfs
gcs = gcsfs.GCSFileSystem(project='my-project')
store = gcsfs.GCSMap(root='my-bucket/path/to/array.zarr', gcs=gcs)
z = zarr.open_array(store=store, mode='w', shape=(1000, 1000), chunks=(100, 100))

云存储最佳实践

  • 使用整合元数据减少延迟:zarr.consolidate_metadata(store)
  • 对齐分块大小与云对象大小(通常5-100 MB最优)
  • 使用Dask启用并行写入大规模数据
  • 考虑分片以减少对象数量

组和层次结构

组以层次方式组织多个数组,类似目录或HDF5组。

创建和使用组

# 创建根组
root = zarr.group(store='data/hierarchy.zarr')

# 创建子组
temperature = root.create_group('temperature')
precipitation = root.create_group('precipitation')

# 在组内创建数组
temp_array = temperature.create_array(
    name='t2m',
    shape=(365, 720, 1440),
    chunks=(1, 720, 1440),
    dtype='f4'
)

precip_array = precipitation.create_array(
    name='prcp',
    shape=(365, 720, 1440),
    chunks=(1, 720, 1440),
    dtype='f4'
)

# 使用路径访问
array = root['temperature/t2m']

# 可视化层次结构
print(root.tree())
# 输出:
# /
#  ├── temperature
#  │   └── t2m (365, 720, 1440) f4
#  └── precipitation
#      └── prcp (365, 720, 1440) f4

H5py兼容API

Zarr 为熟悉的HDF5用户提供h5py兼容接口:

# 使用h5py风格方法创建组
root = zarr.group('data.zarr')
dataset = root.create_dataset('my_data', shape=(1000, 1000), chunks=(100, 100),
                              dtype='f4')

# 类似h5py访问
grp = root.require_group('subgroup')
arr = grp.require_dataset('array', shape=(500, 500), chunks=(50, 50), dtype='i4')

属性和元数据

使用属性为数组和组附加自定义元数据:

# 向数组添加属性
z = zarr.zeros((1000, 1000), chunks=(100, 100))
z.attrs['description'] = '温度数据,单位为开尔文'
z.attrs['units'] = 'K'
z.attrs['created'] = '2024-01-15'
z.attrs['processing_version'] = 2.1

# 属性存储为JSON
print(z.attrs['units'])  # 输出:K

# 向组添加属性
root = zarr.group('data.zarr')
root.attrs['project'] = '气候分析'
root.attrs['institution'] = '研究所'

# 属性随数组/组持久化
z2 = zarr.open('data.zarr')
print(z2.attrs['description'])

重要:属性必须是JSON可序列化(字符串、数字、列表、字典、布尔值、空值)。

与NumPy、Dask和Xarray集成

NumPy集成

Zarr 数组实现NumPy数组接口:

import numpy as np
import zarr

z = zarr.zeros((1000, 1000), chunks=(100, 100))

# 直接使用NumPy函数
result = np.sum(z, axis=0)  # NumPy操作Zarr数组
mean = np.mean(z[:100, :100])

# 转换为NumPy数组
numpy_array = z[:]  # 加载整个数组到内存

Dask集成

Dask 提供对Zarr数组的惰性并行计算:

import dask.array as da
import zarr

# 创建大型Zarr数组
z = zarr.open('data.zarr', mode='w', shape=(100000, 100000),
              chunks=(1000, 1000), dtype='f4')

# 加载为Dask数组(惰性,不加载数据)
dask_array = da.from_zarr('data.zarr')

# 执行计算(并行,外存)
result = dask_array.mean(axis=0).compute()  # 并行计算

# 将Dask数组写入Zarr
large_array = da.random.random((100000, 100000), chunks=(1000, 1000))
da.to_zarr(large_array, 'output.zarr')

好处

  • 处理大于内存的数据集
  • 跨分块自动并行计算
  • 与分块存储的高效I/O

Xarray集成

Xarray 提供带Zarr后端的标签化多维数组:

import xarray as xr
import zarr

# 将Zarr存储作为Xarray Dataset打开(惰性加载)
ds = xr.open_zarr('data.zarr')

# Dataset包括坐标和元数据
print(ds)

# 访问变量
temperature = ds['temperature']

# 执行标签化操作
subset = ds.sel(time='2024-01', lat=slice(30, 60))

# 将Xarray Dataset写入Zarr
ds.to_zarr('output.zarr')

# 从头创建带坐标
import pandas as pd
ds = xr.Dataset(
    {
        'temperature': (['time', 'lat', 'lon'], data),
        'precipitation': (['time', 'lat', 'lon'], data2)
    },
    coords={
        'time': pd.date_range('2024-01-01', periods=365),
        'lat': np.arange(-90, 91, 1),
        'lon': np.arange(-180, 180, 1)
    }
)
ds.to_zarr('climate_data.zarr')

好处

  • 命名维度和坐标
  • 基于标签的索引和选择
  • 与pandas集成处理时间序列
  • NetCDF-like接口,气候/地理空间科学家熟悉

并行计算和同步

线程安全操作

from zarr import ThreadSynchronizer
import zarr

# 用于多线程写入
synchronizer = ThreadSynchronizer()
z = zarr.open_array('data.zarr', mode='r+', shape=(10000, 10000),
                    chunks=(1000, 1000), synchronizer=synchronizer)

# 安全用于多线程并发写入(当写入不跨越分块边界时)

进程安全操作

from zarr import ProcessSynchronizer
import zarr

# 用于多进程写入
synchronizer = ProcessSynchronizer('sync_data.sync')
z = zarr.open_array('data.zarr', mode='r+', shape=(10000, 10000),
                    chunks=(1000, 1000), synchronizer=synchronizer)

# 安全用于多进程并发写入

注意

  • 并发读取无需同步
  • 同步仅用于可能跨越分块边界的写入
  • 每个进程/线程写入独立分块时无需同步

整合元数据

对于具有许多数组的层次存储,将元数据整合到单个文件以减少I/O操作:

import zarr

# 创建数组/组后
root = zarr.group('data.zarr')
# ... 创建多个数组/组 ...

# 整合元数据
zarr.consolidate_metadata('data.zarr')

# 使用整合元数据打开(更快,尤其在云存储上)
root = zarr.open_consolidated('data.zarr')

好处

  • 将元数据读取操作从N(每数组一次)减少到1
  • 对云存储至关重要(减少延迟)
  • 加速tree()操作和组遍历

注意

  • 如果数组更新未重新整合,元数据可能过时
  • 不适合频繁更新的数据集
  • 多写入器场景可能有读取不一致

性能优化

最优性能检查清单

  1. 分块大小:目标每块1-10 MB

    # 对于float32:1MB = 262,144个元素
    chunks = (512, 512)  # 512×512×4字节 = 约1MB
    
  2. 分块形状:与访问模式对齐

    # 行访问 → 分块跨列:(小, 大)
    # 列访问 → 分块跨行:(大, 小)
    # 随机访问 → 平衡:(中, 中)
    
  3. 压缩:基于工作负载选择

    # 交互/快速:BloscCodec(cname='lz4')
    # 平衡:BloscCodec(cname='zstd', clevel=5)
    # 最大压缩:GzipCodec(level=9)
    
  4. 存储后端:匹配环境

    # 本地:LocalStore(默认)
    # 云:S3Map/GCSMap带整合元数据
    # 临时:MemoryStore
    
  5. 分片:用于大规模数据集

    # 当有数百万个小分块时
    shards=(10*chunk_size, 10*chunk_size)
    
  6. 并行I/O:对大型操作使用Dask

    import dask.array as da
    dask_array = da.from_zarr('data.zarr')
    result = dask_array.compute(scheduler='threads', num_workers=8)
    

性能分析和调试

# 打印详细数组信息
print(z.info)

# 输出包括:
# - 类型、形状、分块、数据类型
# - 压缩编解码器和级别
# - 存储大小(压缩与未压缩)
# - 存储位置

# 检查存储大小
print(f"压缩大小: {z.nbytes_stored / 1e6:.2f} MB")
print(f"未压缩大小: {z.nbytes / 1e6:.2f} MB")
print(f"压缩比: {z.nbytes / z.nbytes_stored:.2f}x")

常见模式和最佳实践

模式:时间序列数据

# 存储时间序列,时间作为第一维度
# 这允许高效追加新时间步
z = zarr.open('timeseries.zarr', mode='a',
              shape=(0, 720, 1440),  # 以0时间步开始
              chunks=(1, 720, 1440),  # 每块一个时间步
              dtype='f4')

# 追加新时间步
new_data = np.random.random((1, 720, 1440))
z.append(new_data, axis=0)

模式:大型矩阵操作

import dask.array as da

# 在Zarr中创建大型矩阵
z = zarr.open('matrix.zarr', mode='w',
              shape=(100000, 100000),
              chunks=(1000, 1000),
              dtype='f8')

# 使用Dask并行计算
dask_z = da.from_zarr('matrix.zarr')
result = (dask_z @ dask_z.T).compute()  # 并行矩阵乘法

模式:云原生工作流

import s3fs
import zarr

# 写入S3
s3 = s3fs.S3FileSystem()
store = s3fs.S3Map(root='s3://my-bucket/data.zarr', s3=s3)

# 创建适合云的分块数组
z = zarr.open_array(store=store, mode='w',
                    shape=(10000, 10000),
                    chunks=(500, 500),  # 约1MB分块
                    dtype='f4')
z[:] = data

# 整合元数据以加速读取
zarr.consolidate_metadata(store)

# 从S3读取(随时随地)
store_read = s3fs.S3Map(root='s3://my-bucket/data.zarr', s3=s3)
z_read = zarr.open_consolidated(store_read)
subset = z_read[0:100, 0:100]

模式:格式转换

# HDF5 到 Zarr
import h5py
import zarr

with h5py.File('data.h5', 'r') as h5:
    dataset = h5['dataset_name']
    z = zarr.array(dataset[:],
                   chunks=(1000, 1000),
                   store='data.zarr')

# NumPy 到 Zarr
import numpy as np
data = np.load('data.npy')
z = zarr.array(data, chunks='auto', store='data.zarr')

# Zarr 到 NetCDF(通过Xarray)
import xarray as xr
ds = xr.open_zarr('data.zarr')
ds.to_netcdf('data.nc')

常见问题与解决方案

问题:性能缓慢

诊断:检查分块大小和对齐

print(z.chunks)  # 分块大小是否合适?
print(z.info)    # 检查压缩比

解决方案

  • 增加分块大小到1-10 MB
  • 对齐分块与访问模式
  • 尝试不同压缩编解码器
  • 使用Dask并行操作

问题:高内存使用

原因:加载整个数组或大分块到内存

解决方案

# 不要加载整个数组
# 错误:data = z[:]
# 正确:分块处理
for i in range(0, z.shape[0], 1000):
    chunk = z[i:i+1000, :]
    process(chunk)

# 或使用Dask自动分块
import dask.array as da
dask_z = da.from_zarr('data.zarr')
result = dask_z.mean().compute()  # 分块处理

问题:云存储延迟

解决方案

# 1. 整合元数据
zarr.consolidate_metadata(store)
z = zarr.open_consolidated(store)

# 2. 使用合适分块大小(云上5-100 MB)
chunks = (2000, 2000)  # 云上较大分块

# 3. 启用分片
shards = (10000, 10000)  # 分组多分块

问题:并发写入冲突

解决方案:使用同步器或确保非重叠写入

from zarr import ProcessSynchronizer

sync = ProcessSynchronizer('sync.sync')
z = zarr.open_array('data.zarr', mode='r+', synchronizer=sync)

# 或设计工作流使每个进程写入独立分块

额外资源

有关详细API文档、高级用法和最新更新:

相关库