Name: python Description: Python language expertise for writing idiomatic, production-quality Python code. Covers web frameworks (FastAPI, Django, Flask), data processing (pandas, numpy, dask), machine-learning patterns (sklearn, pytorch), async programming, type hints, pytest testing, packaging (pip, uv, poetry), linting (ruff, mypy, black), and PEP 8 standards. Use for any Python development, including data engineering and ML workflows. Trigger words: python, py, pip, uv, poetry, virtualenv, pytest, pydantic, fastapi, django, flask, pandas, numpy, dataclass, type hints, asyncio, mypy, ruff, black, sklearn, pytorch, tensorflow, jupyter, pipenv, conda.
Python Language Expertise
Overview
This skill provides comprehensive guidance for writing idiomatic, maintainable, production-quality Python code across all domains: web applications, data processing, machine learning, and general-purpose scripting. It covers modern Python practices, including type hints, async programming, testing patterns, proper packaging, data engineering workflows, and ML model development.
Core Concepts
Type Hints (the typing module)
from typing import Callable, Generic, TypeVar
from collections.abc import Sequence

T = TypeVar('T')
K = TypeVar('K')
V = TypeVar('V')

def process_items(items: Sequence[T], transform: Callable[[T], T]) -> list[T]:
    return [transform(item) for item in items]

class Repository(Generic[T]):
    def __init__(self) -> None:
        self._items: dict[str, T] = {}

    def get(self, key: str) -> T | None:
        return self._items.get(key)

    def set(self, key: str, value: T) -> None:
        self._items[key] = value
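A brief usage sketch of the generic Repository above; specializing it to int values is purely illustrative:

# Illustrative usage: with the type parameter fixed to int, a type checker
# verifies that only int values go in and that get() returns int | None.
repo: Repository[int] = Repository()
repo.set("answer", 42)
assert repo.get("answer") == 42
assert repo.get("missing") is None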
Async/Await Patterns
import asyncio
import json

import aiohttp
from typing import AsyncIterator

async def fetch_data(url: str) -> dict:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()

async def process_batch(urls: list[str]) -> list[dict]:
    tasks = [fetch_data(url) for url in urls]
    return await asyncio.gather(*tasks, return_exceptions=True)

async def stream_items(source: AsyncIterator[bytes]) -> AsyncIterator[dict]:
    async for chunk in source:
        yield json.loads(chunk)
Context Managers
from contextlib import contextmanager, asynccontextmanager
from typing import Iterator, AsyncIterator

@contextmanager
def managed_resource(name: str) -> Iterator[Resource]:
    resource = Resource(name)
    try:
        resource.acquire()
        yield resource
    finally:
        resource.release()

@asynccontextmanager
async def async_transaction(db: Database) -> AsyncIterator[Transaction]:
    tx = await db.begin()
    try:
        yield tx
        await tx.commit()
    except Exception:
        await tx.rollback()
        raise
Decorators
from functools import wraps
from typing import Callable, ParamSpec, TypeVar

P = ParamSpec('P')
R = TypeVar('R')

def retry(max_attempts: int = 3) -> Callable[[Callable[P, R]], Callable[P, R]]:
    def decorator(func: Callable[P, R]) -> Callable[P, R]:
        @wraps(func)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            last_exception: Exception | None = None
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
            assert last_exception is not None  # max_attempts is expected to be >= 1
            raise last_exception
        return wrapper
    return decorator
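A minimal usage sketch of the retry decorator; flaky_fetch and its failure pattern are hypothetical, and the ParamSpec typing keeps the decorated signature visible to type checkers:

attempts = {"count": 0}

@retry(max_attempts=3)
def flaky_fetch(url: str) -> str:
    # Hypothetical function that fails twice before succeeding
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RuntimeError("transient failure")
    return f"payload from {url}"

print(flaky_fetch("https://example.com/data"))  # succeeds on the third attempt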
Generators
from typing import Generator, Iterator, TypeVar
from collections.abc import Sequence

T = TypeVar('T')

def paginate(items: Sequence[T], page_size: int) -> Generator[list[T], None, None]:
    for i in range(0, len(items), page_size):
        yield list(items[i:i + page_size])

def read_chunks(file_path: str, chunk_size: int = 8192) -> Iterator[bytes]:
    with open(file_path, 'rb') as f:
        while chunk := f.read(chunk_size):
            yield chunk
Best Practices
PEP 8 Compliance
- Use 4 spaces per indentation level (never tabs)
- Maximum line length of 88 characters (black's default) or 79 (strict PEP 8)
- snake_case for functions and variables, PascalCase for classes
- Two blank lines before top-level definitions, one blank line between methods
- Imports at the top: standard library, then third-party, then local, separated by blank lines (see the layout sketch below)
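A short sketch of a module laid out along these lines; the module, the myapp.config.Settings import, and all names are illustrative, not from a real project:

"""Report fetching helpers (illustrative module)."""

# Standard library imports first
import json
from pathlib import Path

# Third-party imports second
import httpx

# Local imports last (hypothetical package)
from myapp.config import Settings


DEFAULT_TIMEOUT = 10.0  # module-level constants in UPPER_SNAKE_CASE


class ReportClient:  # classes in PascalCase
    def __init__(self, settings: Settings) -> None:
        self._settings = settings

    def fetch_report(self, report_id: str) -> dict:  # methods in snake_case
        response = httpx.get(
            f"{self._settings.base_url}/reports/{report_id}",
            timeout=DEFAULT_TIMEOUT,
        )
        response.raise_for_status()
        return response.json()


def save_report(report: dict, path: Path) -> None:
    path.write_text(json.dumps(report, indent=2))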
Modern Python Features (3.10+)
# Structural pattern matching
match command:
    case {"action": "create", "name": str(name)}:
        create_resource(name)
    case {"action": "delete", "id": int(id_)}:
        delete_resource(id_)
    case _:
        raise ValueError("Unknown command")

# Union types written with |
def process(value: int | str | None) -> str:
    ...

# Self type for fluent interfaces (3.11+)
from typing import Self

class Builder:
    def with_name(self, name: str) -> Self:
        self._name = name
        return self
Packaging with pyproject.toml
[project]
name = "mypackage"
version = "0.1.0"
description = "An example package"
requires-python = ">=3.11"
dependencies = [
    "httpx>=0.25.0",
    "pydantic>=2.0.0",
]

[project.optional-dependencies]
dev = [
    "pytest>=7.0.0",
    "pytest-asyncio>=0.21.0",
    "mypy>=1.0.0",
    "ruff>=0.1.0",
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.ruff]
line-length = 88
target-version = "py311"

[tool.mypy]
strict = true
python_version = "3.11"
Web Framework Patterns
FastAPI Application
from collections.abc import AsyncIterator
from typing import Annotated

from fastapi import FastAPI, Depends, HTTPException, status
from pydantic import BaseModel, Field
from sqlalchemy.ext.asyncio import AsyncSession

app = FastAPI(title="API Service", version="1.0.0")

class UserCreate(BaseModel):
    email: str = Field(..., pattern=r'^[\w\.-]+@[\w\.-]+\.\w+$')
    name: str = Field(..., min_length=1, max_length=100)

class User(UserCreate):
    id: int

async def get_db() -> AsyncIterator[AsyncSession]:
    # `engine` is an async engine created elsewhere in the application
    async with AsyncSession(engine) as session:
        yield session

@app.post("/users/", response_model=User, status_code=status.HTTP_201_CREATED)
async def create_user(
    user: UserCreate,
    db: Annotated[AsyncSession, Depends(get_db)]
) -> User:
    # UserService is an application-layer service defined elsewhere
    db_user = await UserService(db).create(user)
    return User(id=db_user.id, email=db_user.email, name=db_user.name)

@app.get("/users/{user_id}", response_model=User)
async def get_user(
    user_id: int,
    db: Annotated[AsyncSession, Depends(get_db)]
) -> User:
    user = await UserService(db).get(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user
Django Patterns
from django.db import models, transaction
from django.core.validators import EmailValidator
from typing import Self

class TimeStampedModel(models.Model):
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        abstract = True

class User(TimeStampedModel):
    email = models.EmailField(unique=True, validators=[EmailValidator()])
    name = models.CharField(max_length=100)
    is_active = models.BooleanField(default=True)

    class Meta:
        db_table = 'users'
        indexes = [
            models.Index(fields=['email']),
            models.Index(fields=['created_at']),
        ]

    @classmethod
    def create_with_profile(cls, email: str, name: str) -> Self:
        # Profile is a related model defined elsewhere in the app
        with transaction.atomic():
            user = cls.objects.create(email=email, name=name)
            Profile.objects.create(user=user)
            return user
Data Engineering Patterns
Pandas Data Processing
import pandas as pd
import numpy as np
from typing import Callable

def load_and_clean(file_path: str) -> pd.DataFrame:
    df = pd.read_csv(file_path, parse_dates=['timestamp'])
    # Handle missing values
    df['amount'] = df['amount'].fillna(0)
    df['category'] = df['category'].fillna('unknown')
    # Type conversions
    df['user_id'] = df['user_id'].astype('Int64')
    df['amount'] = df['amount'].astype('float64')
    # Remove duplicates
    df = df.drop_duplicates(subset=['user_id', 'timestamp'])
    return df

def aggregate_by_window(
    df: pd.DataFrame,
    window: str = '1D',
    agg_funcs: dict[str, str | list[str]] | None = None
) -> pd.DataFrame:
    if agg_funcs is None:
        agg_funcs = {'amount': ['sum', 'mean', 'count']}
    return (df
            .set_index('timestamp')
            .groupby('category')
            .resample(window)
            .agg(agg_funcs)
            .reset_index())

def apply_transformation(
    df: pd.DataFrame,
    transform: Callable[[pd.Series], pd.Series],
    columns: list[str]
) -> pd.DataFrame:
    df_copy = df.copy()
    for col in columns:
        df_copy[col] = transform(df_copy[col])
    return df_copy

# Vectorized operations for performance
def calculate_features(df: pd.DataFrame) -> pd.DataFrame:
    df['amount_log'] = np.log1p(df['amount'])
    df['amount_zscore'] = (df['amount'] - df['amount'].mean()) / df['amount'].std()
    df['is_weekend'] = df['timestamp'].dt.dayofweek.isin([5, 6])
    return df
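A small sketch exercising calculate_features and aggregate_by_window on an in-memory frame; the column names match the assumptions above, and the values are illustrative:

df = pd.DataFrame({
    "timestamp": pd.date_range("2024-01-01", periods=6, freq="12h"),
    "user_id": [1, 1, 2, 2, 3, 3],
    "category": ["a", "a", "b", "b", "a", "b"],
    "amount": [10.0, 5.0, 3.0, 0.0, 7.5, 2.5],
})

df = calculate_features(df)                     # adds amount_log, amount_zscore, is_weekend
summary = aggregate_by_window(df, window="1D")  # daily aggregates per category
print(summary.head())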
Dask for Large Datasets
import dask.dataframe as dd
import pandas as pd
from dask.diagnostics import ProgressBar
from typing import Callable

def process_large_dataset(input_path: str, output_path: str) -> None:
    # Read partitioned data
    ddf = dd.read_parquet(input_path, engine='pyarrow')
    # Lazy transformations
    ddf = ddf[ddf['amount'] > 0]
    ddf['amount_usd'] = ddf['amount'] * ddf['exchange_rate']
    # Aggregation
    result = ddf.groupby('category').agg({
        'amount_usd': ['sum', 'mean', 'count'],
        'user_id': 'nunique'
    })
    # Compute and save
    with ProgressBar():
        result.compute().to_parquet(output_path)

def parallel_apply(
    ddf: dd.DataFrame,
    func: Callable[[pd.DataFrame], pd.DataFrame],
    meta: dict[str, type]
) -> dd.DataFrame:
    return ddf.map_partitions(func, meta=meta)
NumPy Numerical Computing
import numpy as np
from numpy.typing import NDArray

def moving_average(
    data: NDArray[np.float64],
    window_size: int
) -> NDArray[np.float64]:
    return np.convolve(data, np.ones(window_size), 'valid') / window_size

def normalize_features(
    X: NDArray[np.float64],
    axis: int = 0
) -> tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]:
    mean = np.mean(X, axis=axis, keepdims=True)
    std = np.std(X, axis=axis, keepdims=True)
    X_normalized = (X - mean) / (std + 1e-8)
    return X_normalized, mean, std

def batch_process(
    data: NDArray[np.float64],
    batch_size: int
) -> list[NDArray[np.float64]]:
    n_samples = data.shape[0]
    return [data[i:i+batch_size] for i in range(0, n_samples, batch_size)]
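A quick sanity check of the helpers above on random data; the shapes and parameters are illustrative:

rng = np.random.default_rng(0)
X = rng.normal(loc=5.0, scale=2.0, size=(100, 3))

X_norm, mean, std = normalize_features(X)          # column-wise standardization
smoothed = moving_average(X[:, 0], window_size=5)  # length 100 - 5 + 1 = 96
batches = batch_process(X, batch_size=32)          # 4 batches: 32, 32, 32, 4

assert X_norm.shape == X.shape
assert smoothed.shape == (96,)
assert [len(b) for b in batches] == [32, 32, 32, 4]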
Machine Learning Patterns
Scikit-learn Pipelines
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score, GridSearchCV
from sklearn.base import BaseEstimator, TransformerMixin
from numpy.typing import NDArray
from typing import Self
import numpy as np

class CustomFeatureTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, log_transform: bool = True):
        self.log_transform = log_transform

    def fit(self, X: NDArray, y: NDArray | None = None) -> Self:
        return self

    def transform(self, X: NDArray) -> NDArray:
        X_copy = X.copy()
        if self.log_transform:
            X_copy = np.log1p(np.abs(X_copy))
        return X_copy

def build_pipeline() -> Pipeline:
    return Pipeline([
        ('features', CustomFeatureTransformer(log_transform=True)),
        ('scaler', StandardScaler()),
        ('classifier', RandomForestClassifier(random_state=42))
    ])

def train_with_cv(
    X: NDArray,
    y: NDArray,
    pipeline: Pipeline,
    cv: int = 5
) -> dict[str, float]:
    scores = cross_val_score(pipeline, X, y, cv=cv, scoring='f1_macro')
    return {
        'mean_score': scores.mean(),
        'std_score': scores.std(),
        'scores': scores.tolist()
    }

def hyperparameter_search(
    X: NDArray,
    y: NDArray,
    pipeline: Pipeline
) -> tuple[Pipeline, dict]:
    param_grid = {
        'classifier__n_estimators': [100, 200, 300],
        'classifier__max_depth': [10, 20, None],
        'features__log_transform': [True, False]
    }
    search = GridSearchCV(
        pipeline,
        param_grid,
        cv=5,
        scoring='f1_macro',
        n_jobs=-1,
        verbose=1
    )
    search.fit(X, y)
    return search.best_estimator_, search.best_params_
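An end-to-end sketch on synthetic data showing how the pieces above compose; the dataset size and shape are illustrative, and the grid search can be slow:

from sklearn.datasets import make_classification

X, y = make_classification(n_samples=500, n_features=20, random_state=42)

pipeline = build_pipeline()
cv_results = train_with_cv(X, y, pipeline)
print(f"CV f1_macro: {cv_results['mean_score']:.3f} +/- {cv_results['std_score']:.3f}")

best_model, best_params = hyperparameter_search(X, y, pipeline)
print(best_params)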
PyTorch Model Training
import numpy as np
import torch
import torch.nn as nn
from numpy.typing import NDArray
from torch.utils.data import Dataset, DataLoader
from typing import Callable

class CustomDataset(Dataset[tuple[torch.Tensor, torch.Tensor]]):
    def __init__(self, X: NDArray, y: NDArray, transform: Callable | None = None):
        self.X = torch.from_numpy(X).float()
        self.y = torch.from_numpy(y).long()
        self.transform = transform

    def __len__(self) -> int:
        return len(self.X)

    def __getitem__(self, idx: int) -> tuple[torch.Tensor, torch.Tensor]:
        x, y = self.X[idx], self.y[idx]
        if self.transform:
            x = self.transform(x)
        return x, y

class MLP(nn.Module):
    def __init__(self, input_dim: int, hidden_dims: list[int], output_dim: int):
        super().__init__()
        layers = []
        prev_dim = input_dim
        for hidden_dim in hidden_dims:
            layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.ReLU(),
                nn.BatchNorm1d(hidden_dim),
                nn.Dropout(0.3)
            ])
            prev_dim = hidden_dim
        layers.append(nn.Linear(prev_dim, output_dim))
        self.network = nn.Sequential(*layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.network(x)

def train_epoch(
    model: nn.Module,
    dataloader: DataLoader,
    criterion: nn.Module,
    optimizer: torch.optim.Optimizer,
    device: torch.device
) -> float:
    model.train()
    total_loss = 0.0
    for X_batch, y_batch in dataloader:
        X_batch = X_batch.to(device)
        y_batch = y_batch.to(device)
        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()
        total_loss += loss.item() * X_batch.size(0)
    return total_loss / len(dataloader.dataset)

@torch.no_grad()
def evaluate(
    model: nn.Module,
    dataloader: DataLoader,
    device: torch.device
) -> tuple[float, NDArray]:
    model.eval()
    all_preds = []
    all_labels = []
    for X_batch, y_batch in dataloader:
        X_batch = X_batch.to(device)
        outputs = model(X_batch)
        preds = outputs.argmax(dim=1).cpu().numpy()
        all_preds.extend(preds)
        all_labels.extend(y_batch.numpy())
    accuracy = np.mean(np.array(all_preds) == np.array(all_labels))
    return accuracy, np.array(all_preds)
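A compact training-loop sketch tying CustomDataset, MLP, train_epoch, and evaluate together; the synthetic data, dimensions, and hyperparameters are illustrative assumptions:

rng = np.random.default_rng(42)
X = rng.normal(size=(1_000, 32)).astype(np.float32)
y = rng.integers(0, 3, size=1_000)

loader = DataLoader(CustomDataset(X, y), batch_size=64, shuffle=True)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = MLP(input_dim=32, hidden_dims=[64, 32], output_dim=3).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

for epoch in range(5):
    loss = train_epoch(model, loader, criterion, optimizer, device)
    accuracy, _ = evaluate(model, loader, device)
    print(f"epoch={epoch} loss={loss:.4f} accuracy={accuracy:.3f}")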
Common Patterns
Dataclasses and Pydantic Models
from dataclasses import dataclass, field
from pydantic import BaseModel, Field, field_validator

@dataclass
class Config:
    host: str
    port: int = 8080
    tags: list[str] = field(default_factory=list)

class UserCreate(BaseModel):
    email: str = Field(..., min_length=5)
    name: str = Field(..., max_length=100)

    @field_validator('email')
    @classmethod
    def validate_email(cls, v: str) -> str:
        if '@' not in v:
            raise ValueError('Invalid email address')
        return v.lower()
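A brief usage sketch of the model above: the validator lowercases valid addresses, and pydantic wraps the ValueError in a ValidationError for invalid ones (the sample values are illustrative):

from pydantic import ValidationError

user = UserCreate(email="Alice@Example.com", name="Alice")
assert user.email == "alice@example.com"

try:
    UserCreate(email="not-an-email", name="Bob")
except ValidationError as exc:
    print(exc.errors()[0]["msg"])  # prints the validator's error message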
Testing with pytest
import pytest
from collections.abc import AsyncIterator
from unittest.mock import AsyncMock, patch

from fastapi.testclient import TestClient
from sqlalchemy.ext.asyncio import AsyncSession

# app, engine, Base, User, UserService and process_with_api come from the
# application under test

@pytest.fixture
def client() -> TestClient:
    return TestClient(app)

@pytest.fixture
async def db_session() -> AsyncIterator[AsyncSession]:
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    async with AsyncSession(engine) as session:
        yield session

class TestUserService:
    @pytest.mark.asyncio
    async def test_create_user(self, db_session: AsyncSession) -> None:
        service = UserService(db_session)
        user = await service.create(name="Test", email="test@example.com")
        assert user.id is not None

    @pytest.mark.parametrize("email,valid", [
        ("user@example.com", True),
        ("invalid", False),
        ("", False),
    ])
    def test_email_validation(self, email: str, valid: bool) -> None:
        if valid:
            User(email=email, name="Test")
        else:
            with pytest.raises(ValueError):
                User(email=email, name="Test")

    @pytest.mark.asyncio
    @patch("mymodule.external_api")
    async def test_with_mock(self, mock_api: AsyncMock) -> None:
        mock_api.fetch.return_value = {"status": "ok"}
        result = await process_with_api()
        mock_api.fetch.assert_called_once()
Anti-Patterns
Avoid These Practices
# Bad: mutable default argument
def append_to(item, target=[]):  # bug: the list is shared across calls
    target.append(item)
    return target

# Good: use None and create a new list
def append_to(item, target=None):
    if target is None:
        target = []
    target.append(item)
    return target

# Bad: bare except clause
try:
    risky_operation()
except:  # catches SystemExit, KeyboardInterrupt, etc.
    pass

# Good: catch specific exceptions
try:
    risky_operation()
except (ValueError, RuntimeError) as e:
    logger.error(f"Operation failed: {e}")

# Bad: complex string formatting with +
message = "User " + name + " has " + str(count) + " items"

# Good: f-strings
message = f"User {name} has {count} items"

# Bad: type checking with type()
if type(obj) == list:
    ...

# Good: use isinstance for type checks
if isinstance(obj, list):
    ...

# Bad: handling resources without a context manager
f = open("file.txt")
data = f.read()
f.close()

# Good: always use a context manager
with open("file.txt") as f:
    data = f.read()

# Bad: global mutable state
_cache = {}
def get_cached(key):
    return _cache.get(key)

# Good: encapsulate state in a class or use dependency injection
class Cache:
    def __init__(self):
        self._store: dict[str, Any] = {}

    def get(self, key: str) -> Any | None:
        return self._store.get(key)