Python语言专长 Skill: python

此技能专注于Python编程语言的高级应用,用于开发高质量软件、处理数据、构建机器学习模型等。它覆盖web框架、数据处理、异步编程、类型提示、测试和打包,关键词:Python, 编程, 软件开发, 数据科学, 机器学习, 后端开发, FastAPI, Django, 测试, 代码质量, SEO优化。

后端开发 0 次安装 2 次浏览 更新于 3/24/2026

名称: python 描述: Python语言专长,用于编写地道的、生产质量的Python代码。涵盖web框架(FastAPI, Django, Flask)、数据处理(pandas, numpy, dask)、机器学习模式(sklearn, pytorch)、异步编程、类型提示、pytest测试、打包(pip, uv, poetry)、代码检查(ruff, mypy, black)和PEP 8标准。适用于任何Python开发,包括数据工程和机器学习工作流。触发词:python, py, pip, uv, poetry, virtualenv, pytest, pydantic, fastapi, django, flask, pandas, numpy, dataclass, type hints, asyncio, mypy, ruff, black, sklearn, pytorch, tensorflow, jupyter, pipenv, conda。

Python语言专长

概述

此技能提供全面的指导,用于编写地道的、可维护的、生产质量的Python代码,覆盖所有领域:web应用、数据处理、机器学习和通用脚本编写。它涵盖现代Python实践,包括类型提示、异步编程、测试模式、正确打包、数据工程工作流和ML模型开发。

关键概念

类型提示(typing模块)

from typing import Optional, Union, List, Dict, Callable, TypeVar, Generic
from collections.abc import Sequence, Mapping, Iterator

T = TypeVar('T')
K = TypeVar('K')
V = TypeVar('V')

def process_items(items: Sequence[T], transform: Callable[[T], T]) -> list[T]:
    """Apply *transform* to every element of *items*, returning the results as a new list."""
    return list(map(transform, items))

class Repository(Generic[T]):
    """Minimal in-memory key/value store mapping string keys to items of type T."""

    def __init__(self) -> None:
        # Backing store; keys are plain strings, values are the generic item type.
        self._items: dict[str, T] = {}

    def get(self, key: str) -> T | None:
        """Return the item stored under *key*, or None when absent."""
        try:
            return self._items[key]
        except KeyError:
            return None

    def set(self, key: str, value: T) -> None:
        """Store *value* under *key*, replacing any previous entry."""
        self._items[key] = value

异步/等待模式

import asyncio
from typing import AsyncIterator

async def fetch_data(url: str) -> dict:
    # GET *url* and decode the response body as JSON.
    # NOTE(review): `aiohttp` is not imported anywhere in this file — confirm the
    # module-level import exists; also assumes the endpoint returns a JSON object.
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()

async def process_batch(urls: list[str]) -> list[dict | BaseException]:
    """Fetch all *urls* concurrently and return results in input order.

    With ``return_exceptions=True``, ``asyncio.gather`` never raises for a
    failed task: the exception object is placed in that task's result slot
    instead. The result list therefore mixes parsed JSON dicts and
    exception instances, and callers must check each element.
    (The original annotation claimed ``list[dict]``, hiding the error case.)
    """
    tasks = [fetch_data(url) for url in urls]
    return await asyncio.gather(*tasks, return_exceptions=True)

async def stream_items(source: AsyncIterator[bytes]) -> AsyncIterator[dict]:
    """Decode each chunk from *source* as one standalone JSON document and yield it."""
    async for raw in source:
        decoded = json.loads(raw)
        yield decoded

上下文管理器

from contextlib import contextmanager, asynccontextmanager
from typing import Iterator, AsyncIterator

@contextmanager
def managed_resource(name: str) -> Iterator[Resource]:
    # Context manager: yields an acquired Resource and guarantees release on exit.
    resource = Resource(name)
    try:
        # acquire() is inside the try, so release() runs even if acquisition
        # itself fails. NOTE(review): confirm Resource.release() is safe to
        # call on a never-acquired resource.
        resource.acquire()
        yield resource
    finally:
        resource.release()

@asynccontextmanager
async def async_transaction(db: Database) -> AsyncIterator[Transaction]:
    # Async transaction scope: commit when the body succeeds, roll back and
    # re-raise on any exception (including a failure inside commit itself,
    # since commit is inside the try block).
    tx = await db.begin()
    try:
        yield tx
        await tx.commit()
    except Exception:
        await tx.rollback()
        raise

装饰器

from functools import wraps
from typing import Callable, ParamSpec, TypeVar

P = ParamSpec('P')
R = TypeVar('R')

def retry(max_attempts: int = 3) -> Callable[[Callable[P, R]], Callable[P, R]]:
    def decorator(func: Callable[P, R]) -> Callable[P, R]:
        @wraps(func)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            last_exception: Exception | None = None
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
            raise last_exception
        return wrapper
    return decorator

生成器

from typing import Generator, Iterator

def paginate(items: Sequence[T], page_size: int) -> Generator[list[T], None, None]:
    """Yield successive pages of at most *page_size* items from *items*."""
    for start in range(0, len(items), page_size):
        page = items[start:start + page_size]
        yield list(page)

def read_chunks(file_path: str, chunk_size: int = 8192) -> Iterator[bytes]:
    """Lazily yield blocks of at most *chunk_size* bytes from the binary file at *file_path*."""
    with open(file_path, 'rb') as fh:
        while True:
            block = fh.read(chunk_size)
            if not block:  # empty read means EOF
                return
            yield block

最佳实践

PEP 8 合规性

  • 使用4个空格缩进(永不使用制表符)
  • 最大行长为88个字符(black默认)或79(严格PEP 8)
  • 函数和变量使用蛇形命名法,类使用帕斯卡命名法
  • 顶级定义前两行空白,方法之间一行空白
  • 导入在顶部:标准库、第三方、本地(用空白行分隔)

现代Python特性(3.10+)

# Structural pattern matching (Python 3.10+): dispatch on the shape of a
# ``command`` mapping. ``str(name)`` / ``int(id_)`` are class patterns that
# both type-check and capture the matched value.
match command:
    case {"action": "create", "name": str(name)}:
        create_resource(name)
    case {"action": "delete", "id": int(id_)}:
        delete_resource(id_)
    case _:
        # Unrecognized command shape.
        raise ValueError("未知命令")

# Union types with | (PEP 604, Python 3.10+): accepts an int, a str, or None.
def process(value: int | str | None) -> str:
    """Stub illustrating ``X | Y`` union syntax; a real implementation must return a str."""
    ...

# 流畅接口的Self类型
from typing import Self

class Builder:
    def with_name(self, name: str) -> Self:
        self._name = name
        return self

使用pyproject.toml打包

# Example pyproject.toml: PEP 621 project metadata, hatchling build backend,
# plus ruff and mypy tool configuration in one file.

[project]
name = "mypackage"
version = "0.1.0"
description = "一个示例包"
requires-python = ">=3.11"
dependencies = [
    "httpx>=0.25.0",
    "pydantic>=2.0.0",
]

# Extras installed with `pip install mypackage[dev]`
[project.optional-dependencies]
dev = [
    "pytest>=7.0.0",
    "pytest-asyncio>=0.21.0",
    "mypy>=1.0.0",
    "ruff>=0.1.0",
]

# Build backend selection (PEP 517/518)
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.ruff]
line-length = 88  # black-compatible line length
target-version = "py311"

[tool.mypy]
strict = true
python_version = "3.11"

Web框架模式

FastAPI应用

from fastapi import FastAPI, Depends, HTTPException, status
from pydantic import BaseModel, Field
from typing import Annotated

# Module-level application object; title/version appear in the generated OpenAPI docs.
app = FastAPI(title="API服务", version="1.0.0")

class UserCreate(BaseModel):
    """Request body for user creation; fields are validated by pydantic."""

    email: str = Field(..., pattern=r'^[\w\.-]+@[\w\.-]+\.\w+$')  # simple email-shape check
    name: str = Field(..., min_length=1, max_length=100)  # non-empty, bounded length

class User(UserCreate):
    """Response model: the creation fields plus the database-assigned id."""

    id: int

async def get_db() -> AsyncIterator[AsyncSession]:
    # FastAPI dependency: yield one AsyncSession per request, closed on exit.
    # NOTE(review): `AsyncSession` and `engine` are not imported in this file — confirm.
    async with AsyncSession(engine) as session:
        yield session

@app.post("/users/", response_model=User, status_code=status.HTTP_201_CREATED)
async def create_user(
    user: UserCreate,
    db: Annotated[AsyncSession, Depends(get_db)]
) -> User:
    """Create a user via the service layer and return it with a 201 status."""
    db_user = await UserService(db).create(user)
    return User(id=db_user.id, email=db_user.email, name=db_user.name)

@app.get("/users/{user_id}", response_model=User)
async def get_user(
    user_id: int,
    db: Annotated[AsyncSession, Depends(get_db)]
) -> User:
    """Fetch a single user by id; responds 404 when it does not exist."""
    user = await UserService(db).get(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="用户未找到")
    return user

Django模式

from django.db import models, transaction
from django.core.validators import EmailValidator
from typing import Self

class TimeStampedModel(models.Model):
    """Abstract base that adds created/updated timestamps to every subclass."""

    created_at = models.DateTimeField(auto_now_add=True)  # set once on INSERT
    updated_at = models.DateTimeField(auto_now=True)  # refreshed on every save()

    class Meta:
        abstract = True  # no table is created for this base class

class User(TimeStampedModel):
    """Application user with a unique email; rows live in the 'users' table."""

    email = models.EmailField(unique=True, validators=[EmailValidator()])
    name = models.CharField(max_length=100)
    is_active = models.BooleanField(default=True)  # soft-disable flag

    class Meta:
        db_table = 'users'
        # Explicit indexes for the common lookup/sort columns.
        indexes = [
            models.Index(fields=['email']),
            models.Index(fields=['created_at']),
        ]

    @classmethod
    def create_with_profile(cls, email: str, name: str) -> Self:
        """Atomically create a User and its Profile: both rows commit or neither does."""
        with transaction.atomic():
            user = cls.objects.create(email=email, name=name)
            Profile.objects.create(user=user)
            return user

数据工程模式

Pandas数据处理

import pandas as pd
import numpy as np
from typing import Callable

def load_and_clean(file_path: str) -> pd.DataFrame:
    """Load the CSV at *file_path* and return a cleaned frame.

    Cleaning steps: fill missing amounts with 0 and categories with '未知',
    coerce id/amount dtypes, then drop duplicate (user_id, timestamp) rows.
    """
    frame = pd.read_csv(file_path, parse_dates=['timestamp'])

    # Missing values: amounts default to zero, categories to the catch-all label.
    frame['amount'] = frame['amount'].fillna(0)
    frame['category'] = frame['category'].fillna('未知')

    # Capital-I 'Int64' is pandas' nullable integer dtype (tolerates missing ids).
    frame['user_id'] = frame['user_id'].astype('Int64')
    frame['amount'] = frame['amount'].astype('float64')

    # One row per (user_id, timestamp); the first occurrence wins.
    return frame.drop_duplicates(subset=['user_id', 'timestamp'])

def aggregate_by_window(
    df: pd.DataFrame,
    window: str = '1D',
    agg_funcs: dict[str, str | list[str]] = None
) -> pd.DataFrame:
    if agg_funcs is None:
        agg_funcs = {'amount': ['sum', 'mean', 'count']}

    return (df
        .set_index('timestamp')
        .groupby('category')
        .resample(window)
        .agg(agg_funcs)
        .reset_index())

def apply_transformation(
    df: pd.DataFrame,
    transform: Callable[[pd.Series], pd.Series],
    columns: list[str]
) -> pd.DataFrame:
    """Return a copy of *df* with *transform* applied to each column named in *columns*.

    The input frame is never mutated.
    """
    result = df.copy()
    for column_name in columns:
        result[column_name] = transform(result[column_name])
    return result

# Vectorized feature derivation (no Python-level loops).
def calculate_features(df: pd.DataFrame) -> pd.DataFrame:
    """Add derived feature columns to *df* IN PLACE and return the same frame.

    Note: this mutates the caller's frame; pass ``df.copy()`` to avoid that.
    """
    amounts = df['amount']
    df['amount_log'] = np.log1p(amounts)
    df['amount_zscore'] = (amounts - amounts.mean()) / amounts.std()
    weekday = df['timestamp'].dt.dayofweek
    df['is_weekend'] = weekday.isin([5, 6])
    return df

用于大型数据集的Dask

import dask.dataframe as dd
from dask.diagnostics import ProgressBar

def process_large_dataset(input_path: str, output_path: str) -> None:
    """Filter, derive, and aggregate a parquet dataset too large for memory.

    All dask operations below are lazy; nothing executes until ``.compute()``.
    """
    # Read partitioned parquet data
    ddf = dd.read_parquet(input_path, engine='pyarrow')

    # Lazy transformations: keep positive amounts, derive a USD column.
    ddf = ddf[ddf['amount'] > 0]
    ddf['amount_usd'] = ddf['amount'] * ddf['exchange_rate']

    # Per-category aggregation: sum/mean/count of USD plus distinct users.
    result = ddf.groupby('category').agg({
        'amount_usd': ['sum', 'mean', 'count'],
        'user_id': 'nunique'
    })

    # Execute the task graph and persist; compute() yields a pandas DataFrame.
    with ProgressBar():
        result.compute().to_parquet(output_path)

def parallel_apply(
    ddf: dd.DataFrame,
    func: Callable[[pd.DataFrame], pd.DataFrame],
    meta: dict[str, type]
) -> dd.DataFrame:
    """Apply *func* independently to each partition; *meta* declares the output schema
    (column -> dtype) so dask can build the graph without executing *func*."""
    return ddf.map_partitions(func, meta=meta)

NumPy数值计算

import numpy as np
from numpy.typing import NDArray

def moving_average(
    data: NDArray[np.float64],
    window_size: int
) -> NDArray[np.float64]:
    """Unweighted moving average over *data* in 'valid' mode (output length n - w + 1)."""
    window = np.ones(window_size)
    summed = np.convolve(data, window, 'valid')
    return summed / window_size

def normalize_features(
    X: NDArray[np.float64],
    axis: int = 0
) -> tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]:
    """Z-score *X* along *axis*; returns (normalized, mean, std).

    A small epsilon in the denominator guards against division by zero
    for constant features. Mean/std keep their dims for broadcasting.
    """
    mu = X.mean(axis=axis, keepdims=True)
    sigma = X.std(axis=axis, keepdims=True)
    normalized = (X - mu) / (sigma + 1e-8)
    return normalized, mu, sigma

def batch_process(
    data: NDArray[np.float64],
    batch_size: int
) -> list[NDArray[np.float64]]:
    """Split *data* along axis 0 into consecutive views of at most *batch_size* rows."""
    total = data.shape[0]
    batches = []
    for start in range(0, total, batch_size):
        batches.append(data[start:start + batch_size])
    return batches

机器学习模式

Scikit-learn管道

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score, GridSearchCV
from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np

class CustomFeatureTransformer(BaseEstimator, TransformerMixin):
    """Optional log1p(|x|) feature transform, sklearn-compatible (fit/transform)."""

    def __init__(self, log_transform: bool = True):
        # Stored under the constructor-parameter name so get_params()/set_params() work.
        self.log_transform = log_transform

    def fit(self, X: NDArray, y: NDArray | None = None) -> Self:
        """Stateless transformer: nothing to learn, return self."""
        return self

    def transform(self, X: NDArray) -> NDArray:
        """Return a copy of X, log1p(|X|)-transformed when enabled."""
        transformed = X.copy()
        if self.log_transform:
            transformed = np.log1p(np.abs(transformed))
        return transformed

def build_pipeline() -> Pipeline:
    """Assemble the feature-transform -> scale -> classify pipeline.

    The fixed random_state makes forest fits reproducible.
    """
    return Pipeline([
        ('features', CustomFeatureTransformer(log_transform=True)),
        ('scaler', StandardScaler()),
        ('classifier', RandomForestClassifier(random_state=42))
    ])

def train_with_cv(
    X: NDArray,
    y: NDArray,
    pipeline: Pipeline,
    cv: int = 5
) -> dict[str, float]:
    """Cross-validate *pipeline* with macro-F1; returns mean/std/per-fold scores."""
    fold_scores = cross_val_score(pipeline, X, y, cv=cv, scoring='f1_macro')
    summary = {
        'mean_score': fold_scores.mean(),
        'std_score': fold_scores.std(),
        'scores': fold_scores.tolist(),
    }
    return summary

def hyperparameter_search(
    X: NDArray,
    y: NDArray,
    pipeline: Pipeline
) -> tuple[Pipeline, dict]:
    """Grid-search the pipeline; returns (best refitted pipeline, best params).

    Step parameters use sklearn's ``<step>__<param>`` naming convention.
    """
    param_grid = {
        'classifier__n_estimators': [100, 200, 300],
        'classifier__max_depth': [10, 20, None],
        'features__log_transform': [True, False]
    }

    # 5-fold CV over the full grid on all cores; refit=True (default)
    # retrains the best configuration on the whole dataset.
    search = GridSearchCV(
        pipeline,
        param_grid,
        cv=5,
        scoring='f1_macro',
        n_jobs=-1,
        verbose=1
    )

    search.fit(X, y)
    return search.best_estimator_, search.best_params_

PyTorch模型训练

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from typing import Callable

class CustomDataset(Dataset[tuple[torch.Tensor, torch.Tensor]]):
    def __init__(self, X: NDArray, y: NDArray, transform: Callable | None = None):
        self.X = torch.from_numpy(X).float()
        self.y = torch.from_numpy(y).long()
        self.transform = transform

    def __len__(self) -> int:
        return len(self.X)

    def __getitem__(self, idx: int) -> tuple[torch.Tensor, torch.Tensor]:
        x, y = self.X[idx], self.y[idx]
        if self.transform:
            x = self.transform(x)
        return x, y

class MLP(nn.Module):
    """Fully-connected classifier: [Linear -> ReLU -> BatchNorm1d -> Dropout(0.3)]
    per hidden layer, followed by a final Linear projection to the logits."""

    def __init__(self, input_dim: int, hidden_dims: list[int], output_dim: int):
        super().__init__()
        blocks: list[nn.Module] = []
        width = input_dim

        for next_width in hidden_dims:
            blocks.append(nn.Linear(width, next_width))
            blocks.append(nn.ReLU())
            blocks.append(nn.BatchNorm1d(next_width))
            blocks.append(nn.Dropout(0.3))
            width = next_width

        blocks.append(nn.Linear(width, output_dim))
        self.network = nn.Sequential(*blocks)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Map a (batch, input_dim) tensor to (batch, output_dim) logits."""
        return self.network(x)

def train_epoch(
    model: nn.Module,
    dataloader: DataLoader,
    criterion: nn.Module,
    optimizer: torch.optim.Optimizer,
    device: torch.device
) -> float:
    """Run one optimization pass over *dataloader* and return the mean per-sample loss."""
    model.train()
    running = 0.0

    for features, targets in dataloader:
        features = features.to(device)
        targets = targets.to(device)

        optimizer.zero_grad()
        batch_loss = criterion(model(features), targets)
        batch_loss.backward()
        optimizer.step()

        # Weight by batch size so the final figure is a true per-sample mean
        # even when the last batch is short.
        running += batch_loss.item() * features.size(0)

    return running / len(dataloader.dataset)

@torch.no_grad()
def evaluate(
    model: nn.Module,
    dataloader: DataLoader,
    device: torch.device
) -> tuple[float, NDArray]:
    """Compute classification accuracy over *dataloader*.

    Returns (accuracy, predicted class indices as a numpy array).
    Gradients are disabled for the whole pass via @torch.no_grad().
    """
    model.eval()
    collected_preds: list[int] = []
    collected_labels: list[int] = []

    for features, targets in dataloader:
        logits = model(features.to(device))
        # argmax over the class dimension; move to CPU before numpy conversion.
        collected_preds.extend(logits.argmax(dim=1).cpu().numpy())
        collected_labels.extend(targets.numpy())

    accuracy = np.mean(np.array(collected_preds) == np.array(collected_labels))
    return accuracy, np.array(collected_preds)

常见模式

数据类和Pydantic模型

from dataclasses import dataclass, field
from pydantic import BaseModel, Field, field_validator

@dataclass
class Config:
    """Simple service configuration record."""

    host: str  # required: bind address or hostname
    port: int = 8080  # default listening port
    tags: list[str] = field(default_factory=list)  # fresh list per instance, never a shared mutable default

class UserCreate(BaseModel):
    """Pydantic v2 input model combining declarative constraints with a custom validator."""

    email: str = Field(..., min_length=5)  # length gate; format enforced by the validator below
    name: str = Field(..., max_length=100)

    @field_validator('email')
    @classmethod
    def validate_email(cls, v: str) -> str:
        """Require an '@' and normalize the address to lowercase."""
        if '@' not in v:
            raise ValueError('无效邮箱')
        return v.lower()

使用pytest测试

import pytest
from unittest.mock import AsyncMock, patch

@pytest.fixture
def client() -> TestClient:
    # Fresh FastAPI test client per test.
    # NOTE(review): TestClient/app are not imported in this file — confirm.
    return TestClient(app)

@pytest.fixture
async def db_session() -> AsyncIterator[AsyncSession]:
    # Create the schema, then hand the test an AsyncSession bound to the
    # shared engine. NOTE(review): tables are never dropped between tests —
    # confirm isolation is handled elsewhere (e.g. a fresh in-memory engine).
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    async with AsyncSession(engine) as session:
        yield session

class TestUserService:
    """Service-level and validation tests for the User domain."""

    @pytest.mark.asyncio
    async def test_create_user(self, db_session: AsyncSession) -> None:
        """A created user gets a database-assigned id."""
        service = UserService(db_session)
        user = await service.create(name="测试", email="test@example.com")
        assert user.id is not None

    @pytest.mark.parametrize("email,valid", [
        ("user@example.com", True),
        ("invalid", False),
        ("", False),
    ])
    def test_email_validation(self, email: str, valid: bool) -> None:
        """Valid emails construct; invalid ones raise ValueError."""
        if valid:
            User(email=email, name="测试")
        else:
            with pytest.raises(ValueError):
                User(email=email, name="测试")

    # FIX: this async test was missing @pytest.mark.asyncio (the sibling test
    # has it, implying strict mode), so pytest never awaited the coroutine and
    # the test silently did nothing.
    @pytest.mark.asyncio
    @patch("mymodule.external_api")
    async def test_with_mock(self, mock_api: AsyncMock) -> None:
        """The external API is called exactly once and its response consumed."""
        mock_api.fetch.return_value = {"status": "ok"}
        result = await process_with_api()
        mock_api.fetch.assert_called_once()

反模式

避免这些做法

# Bad: mutable default argument
def append_to(item, target=[]):  # wrong: the same list object is shared across calls
    target.append(item)
    return target

# Good: default to None and create a fresh list per call
def append_to(item, target=None):
    if target is None:
        target = []
    target.append(item)
    return target

# Bad: bare except clause
try:
    risky_operation()
except:  # also swallows SystemExit, KeyboardInterrupt, etc.
    pass

# Good: catch the specific exceptions you can handle
try:
    risky_operation()
except (ValueError, RuntimeError) as e:
    logger.error(f"操作失败: {e}")

# Bad: building strings with + concatenation
message = "User " + name + " has " + str(count) + " items"

# Good: f-strings
message = f"用户 {name} 有 {count} 个物品"

# Bad: type() comparison for type checks (misses subclasses)
if type(obj) == list:
    ...

# Good: isinstance for type checks
if isinstance(obj, list):
    ...

# Bad: managing resources without a context manager (leaks on error)
f = open("file.txt")
data = f.read()
f.close()

# Good: always use a context manager
with open("file.txt") as f:
    data = f.read()

# Bad: global mutable state
_cache = {}

def get_cached(key):
    # Reads a module-level dict: hard to test, reset, or make thread-safe.
    return _cache.get(key)

# 好:在类中封装状态或使用依赖注入
class Cache:
    def __init__(self):
        self._store: dict[str, Any] = {}

    def get(self, key: str) -> Any | None:
        return self._store.get(key)