Web Scraping Skill: web-scraping

The web scraping skill covers extracting data from websites, including bypassing anti-bot mechanisms, content extraction, working with undocumented APIs, and poison-pill detection. It applies to data collection, content analysis, social media processing, and SEO work. Keywords: web scraping, data extraction, anti-bot, API scraping, poison-pill detection, web crawling, data engineering.

Name: Web Scraping Description: Web scraping with anti-bot bypass, content extraction, undocumented APIs, and poison-pill detection. Use when extracting content from websites, handling paywalls, implementing scraping cascades, or working with social media. Covers requests, trafilatura, Playwright stealth, yt-dlp, and instaloader patterns.

Web Scraping Methodology

Reliable, ethical web scraping patterns, including fallback strategies and anti-bot handling.

Scraping Cascade Architecture

Implement multiple extraction strategies with automatic fallback:

from abc import ABC, abstractmethod
from typing import Optional
import requests
from bs4 import BeautifulSoup
import trafilatura

# For .py files
from playwright.sync_api import sync_playwright
from playwright_stealth import stealth_sync

# For .ipynb files
import asyncio
from playwright.async_api import async_playwright

class ScrapingResult:
    def __init__(self, content: str, title: str, method: str):
        self.content = content
        self.title = title
        self.method = method  # track which method succeeded

class Scraper(ABC):
    @abstractmethod
    def fetch(self, url: str) -> Optional[ScrapingResult]: ...

class TrafilaturaScraper(Scraper):
    """Fast, lightweight extraction for standard articles."""

    def fetch(self, url: str) -> Optional[ScrapingResult]:
        try:
            downloaded = trafilatura.fetch_url(url)
            if not downloaded:
                return None

            content = trafilatura.extract(
                downloaded,
                include_comments=False,
                include_tables=True,
                favor_recall=True
            )

            if not content or len(content) < 100:
                return None

            # Extract the title separately
            soup = BeautifulSoup(downloaded, 'html.parser')
            title = soup.find('title')
            title_text = title.get_text() if title else ''

            return ScrapingResult(content, title_text, 'trafilatura')
        except Exception:
            return None

class RequestsScraper(Scraper):
    """HTTP请求,带有旋转用户代理。"""

    USER_AGENTS = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
        'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
    ]

    def fetch(self, url: str) -> Optional[ScrapingResult]:
        import random

        headers = {
            'User-Agent': random.choice(self.USER_AGENTS),
            'Accept': 'text/html,application/xhtml+xml',
            'Accept-Language': 'en-US,en;q=0.9',
        }

        try:
            response = requests.get(url, headers=headers, timeout=30)
            response.raise_for_status()

            soup = BeautifulSoup(response.text, 'html.parser')

            # Remove script/style and boilerplate elements
            for element in soup(['script', 'style', 'nav', 'footer', 'aside']):
                element.decompose()

            # Find the main content
            main = soup.find('main') or soup.find('article') or soup.find('body')
            content = main.get_text(separator='\n', strip=True) if main else ''

            title = soup.find('title')
            title_text = title.get_text() if title else ''

            if len(content) < 100:
                return None

            return ScrapingResult(content, title_text, 'requests')
        except Exception:
            return None

class PlaywrightScraper(Scraper):
    """重JavaScript渲染,带有隐身模式以绕过反爬虫。"""

    def fetch(self, url: str) -> Optional[ScrapingResult]:
        try:
            with sync_playwright() as p:
                browser = p.chromium.launch(headless=True)
                context = browser.new_context(
                    viewport={'width': 1920, 'height': 1080},
                    user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
                )
                page = context.new_page()

                # Apply stealth to avoid detection
                stealth_sync(page)

                page.goto(url, wait_until='networkidle', timeout=60000)

                # Wait for content to load
                page.wait_for_timeout(2000)

                # Extract the content
                content = page.evaluate('''() => {
                    const article = document.querySelector('article, main, .content, #content');
                    return article ? article.innerText : document.body.innerText;
                }''')

                title = page.title()

                browser.close()

                if len(content) < 100:
                    return None

                return ScrapingResult(content, title, 'playwright')
        except Exception:
            return None

class PlaywrightScraperAsync:
    """异步Playwright爬虫,用于Jupyter笔记本(.ipynb文件)。
    
    Jupyter笔记本运行自己的事件循环,因此同步Playwright无法工作。
    在笔记本单元格中使用此异步版本与`await`。
    """

    async def fetch(self, url: str) -> Optional[ScrapingResult]:
        try:
            async with async_playwright() as p:
                browser = await p.chromium.launch(headless=True)
                context = await browser.new_context(
                    viewport={'width': 1920, 'height': 1080},
                    user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
                )
                page = await context.new_page()

                # Note: async variant of playwright-stealth
                # from playwright_stealth import stealth_async
                # await stealth_async(page)

                await page.goto(url, wait_until='networkidle', timeout=60000)

                # Wait for content to load
                await page.wait_for_timeout(2000)

                # Extract the content
                content = await page.evaluate('''() => {
                    const article = document.querySelector('article, main, .content, #content');
                    return article ? article.innerText : document.body.innerText;
                }''')

                title = await page.title()

                await browser.close()

                if len(content) < 100:
                    return None

                return ScrapingResult(content, title, 'playwright_async')
        except Exception:
            return None

# Usage in a Jupyter notebook cell:
# scraper = PlaywrightScraperAsync()
# result = await scraper.fetch('https://example.com')

class ScrapingCascade:
    """按顺序尝试多个爬虫,直到一个成功。"""

    def __init__(self):
        self.scrapers = [
            TrafilaturaScraper(),
            RequestsScraper(),
            PlaywrightScraper(),
        ]

    def fetch(self, url: str) -> Optional[ScrapingResult]:
        for scraper in self.scrapers:
            result = scraper.fetch(url)
            if result:
                return result
        return None
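
A minimal usage sketch (the URL is a placeholder): the cascade returns the first successful result, and the method field records which scraper produced it.

cascade = ScrapingCascade()
result = cascade.fetch('https://example.com/some-article')

if result:
    print(f'Extracted via {result.method}: {result.title}')
else:
    print('All scrapers failed')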

Undocumented APIs

Finding Undocumented APIs

Use the browser developer tools to discover APIs:

  1. Open the developer tools (right-click → Inspect, or F12)
  2. Go to the Network tab to monitor all requests
  3. Filter by Fetch/XHR to show only API calls
  4. Trigger the action you want to capture (search, scroll, click)
  5. Analyze the responses (usually JSON with key-value pairs)
  6. Copy as cURL (right-click the request)
  7. Convert to code with curlconverter.com

Simplifying API Requests

When you copy a cURL command from the developer tools, it includes many parameters. Simplify it as follows (the example below walks through this process):

  1. Remove unnecessary cookies; test without them first
  2. Keep authentication tokens if they are required
  3. Identify the input parameters you can vary (such as prefix for the search term)
  4. Test parameter values; some expire, so re-verify them periodically

Example: Reverse Engineering an Autocomplete API

import requests
import time

def search_suggestions(keyword: str) -> dict:
    """
    从未记录API获取自动完成的搜索建议。
    从浏览器开发者工具捕获中简化。
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:100.0) Gecko/20100101 Firefox/100.0',
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'Accept-Language': 'en-US,en;q=0.5',
    }

    params = {
        'prefix': keyword,
        'suggestion-type': ['WIDGET', 'KEYWORD'],
        'alias': 'aps',
        'plain-mid': '1',
    }

    response = requests.get(
        'https://completion.amazon.com/api/2017/suggestions',
        params=params,
        headers=headers
    )
    return response.json()

# Collect suggestions for several keywords
keywords = ['a', 'b', 'cookie', 'sock']
data = []

for keyword in keywords:
    suggestions = search_suggestions(keyword)
    for item in suggestions.get('suggestions', []):
        item['search_word'] = keyword  # track the seed keyword
        data.append(item)
    time.sleep(1)  # self-imposed rate limiting

Source: Leon Yin, “Finding Undocumented APIs,” Inspect Element, 2023

Poison-Pill Detection

Detect paywalls, anti-bot pages, and other failure modes:

from dataclasses import dataclass
from enum import Enum
import re

class PoisonPillType(Enum):
    PAYWALL = 'paywall'
    CAPTCHA = 'captcha'
    RATE_LIMIT = 'rate_limit'
    CLOUDFLARE = 'cloudflare'
    LOGIN_REQUIRED = 'login_required'
    NOT_FOUND = 'not_found'
    NONE = 'none'

@dataclass
class PoisonPillResult:
    detected: bool
    type: PoisonPillType
    confidence: float
    details: str

class PoisonPillDetector:
    PATTERNS = {
        PoisonPillType.PAYWALL: [
            r'subscribe to continue',
            r'subscription required',
            r'become a member',
            r'sign up to read',
            r'you\'ve reached your limit',
            r'article limit reached',
        ],
        PoisonPillType.CAPTCHA: [
            r'verify you are human',
            r'captcha',
            r'robot verification',
            r'prove you\'re not a robot',
        ],
        PoisonPillType.RATE_LIMIT: [
            r'too many requests',
            r'rate limit exceeded',
            r'slow down',
            r'429',
        ],
        PoisonPillType.CLOUDFLARE: [
            r'checking your browser',
            r'cloudflare',
            r'ddos protection',
            r'please wait while we verify',
        ],
        PoisonPillType.LOGIN_REQUIRED: [
            r'sign in to continue',
            r'log in required',
            r'create an account',
        ],
    }

    PAYWALL_DOMAINS = {
        'nytimes.com': PoisonPillType.PAYWALL,
        'wsj.com': PoisonPillType.PAYWALL,
        'washingtonpost.com': PoisonPillType.PAYWALL,
        'ft.com': PoisonPillType.PAYWALL,
        'bloomberg.com': PoisonPillType.PAYWALL,
    }

    def detect(self, url: str, content: str, status_code: int = 200) -> PoisonPillResult:
        # Check the status code
        if status_code == 429:
            return PoisonPillResult(True, PoisonPillType.RATE_LIMIT, 1.0, 'HTTP 429')
        if status_code == 403:
            return PoisonPillResult(True, PoisonPillType.CLOUDFLARE, 0.8, 'HTTP 403')
        if status_code == 404:
            return PoisonPillResult(True, PoisonPillType.NOT_FOUND, 1.0, 'HTTP 404')

        # Check known paywall domains
        from urllib.parse import urlparse
        domain = urlparse(url).netloc.replace('www.', '')
        for paywall_domain, pill_type in self.PAYWALL_DOMAINS.items():
            if paywall_domain in domain:
                # Check whether the content is unusually short (paywall truncation)
                if len(content) < 500:
                    return PoisonPillResult(True, pill_type, 0.9, f'Short content from {domain}')

        # Pattern matching
        content_lower = content.lower()
        for pill_type, patterns in self.PATTERNS.items():
            for pattern in patterns:
                if re.search(pattern, content_lower):
                    return PoisonPillResult(True, pill_type, 0.7, f'Pattern match: {pattern}')

        return PoisonPillResult(False, PoisonPillType.NONE, 0.0, '')
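
A usage sketch wiring the detector behind the cascade (the URL and glue code are illustrative):

detector = PoisonPillDetector()
result = ScrapingCascade().fetch('https://example.com/article')

if result:
    pill = detector.detect('https://example.com/article', result.content)
    if pill.detected:
        print(f'Poison pill: {pill.type.value} (confidence {pill.confidence:.0%}) - {pill.details}')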

Social Media Scraping

YouTube with yt-dlp

import yt_dlp
from pathlib import Path

def download_video_metadata(url: str) -> dict:
    """提取元数据而不下载视频。"""
    ydl_opts = {
        'skip_download': True,
        'quiet': True,
        'no_warnings': True,
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=False)
        return {
            'title': info.get('title'),
            'description': info.get('description'),
            'duration': info.get('duration'),
            'upload_date': info.get('upload_date'),
            'view_count': info.get('view_count'),
            'channel': info.get('channel'),
            'thumbnail': info.get('thumbnail'),
        }

def download_video(url: str, output_dir: Path, audio_only: bool = False) -> Path:
    """下载视频或音频。"""
    output_template = str(output_dir / '%(title)s.%(ext)s')

    ydl_opts = {
        'outtmpl': output_template,
        'quiet': True,
    }

    if audio_only:
        ydl_opts['format'] = 'bestaudio/best'
        ydl_opts['postprocessors'] = [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
        }]

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        filename = ydl.prepare_filename(info)
        if audio_only:
            filename = filename.rsplit('.', 1)[0] + '.mp3'
        return Path(filename)

def get_transcript(url: str) -> list[dict]:
    """提取自动生成或手动字幕。"""
    ydl_opts = {
        'skip_download': True,
        'writesubtitles': True,
        'writeautomaticsub': True,
        'subtitleslangs': ['en'],
        'quiet': True,
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=False)

        # Check for subtitles
        subtitles = info.get('subtitles', {})
        auto_captions = info.get('automatic_captions', {})

        # Prefer manual subtitles over auto-generated ones
        subs = subtitles.get('en') or auto_captions.get('en')
        if not subs:
            return []

        # Look for vtt or json3 formats
        for sub in subs:
            if sub['ext'] in ['vtt', 'json3']:
                # Download and parse the subtitle file
                # ... implementation depends on the format (see the sketch below)
                pass

        return []
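
The download-and-parse step above is left open. A minimal sketch for the VTT case, assuming each subtitle entry exposes a url field (yt-dlp populates one in the info dict); the regex assumes the common HH:MM:SS.mmm timestamp layout, and inline styling tags are not stripped:

import re
import requests

def parse_vtt(sub_url: str) -> list[dict]:
    """Download a WebVTT subtitle file and return cues as dicts."""
    vtt = requests.get(sub_url, timeout=30).text
    timestamp = re.compile(r'(\d{2}:\d{2}:\d{2}\.\d{3}) --> (\d{2}:\d{2}:\d{2}\.\d{3})')

    cues = []
    current = None
    for line in vtt.splitlines():
        match = timestamp.match(line.strip())
        if match:
            # Start a new cue at each timestamp line
            current = {'start': match.group(1), 'end': match.group(2), 'text': ''}
            cues.append(current)
        elif current is not None and line.strip():
            # Accumulate text lines belonging to the current cue
            current['text'] = (current['text'] + ' ' + line.strip()).strip()
    return cues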

Instagram with instaloader

import instaloader
from pathlib import Path
from typing import Optional

class InstagramScraper:
    def __init__(self, username: Optional[str] = None, session_file: Optional[str] = None):
        self.loader = instaloader.Instaloader(
            download_videos=True,
            download_video_thumbnails=False,
            download_geotags=False,
            download_comments=False,
            save_metadata=True,
            compress_json=False,
        )

        if session_file and Path(session_file).exists():
            self.loader.load_session_from_file(username, session_file)

    def get_profile_posts(self, username: str, limit: int = 50) -> list[dict]:
        """从个人资料获取最近的帖子。"""
        profile = instaloader.Profile.from_username(self.loader.context, username)
        posts = []

        for i, post in enumerate(profile.get_posts()):
            if i >= limit:
                break

            posts.append({
                'shortcode': post.shortcode,
                'url': f'https://instagram.com/p/{post.shortcode}/',
                'caption': post.caption,
                'timestamp': post.date_utc.isoformat(),
                'likes': post.likes,
                'comments': post.comments,
                'is_video': post.is_video,
                'video_url': post.video_url if post.is_video else None,
            })

        return posts

    def download_post(self, shortcode: str, output_dir: Path):
        """下载单个帖子的媒体。"""
        post = instaloader.Post.from_shortcode(self.loader.context, shortcode)
        self.loader.download_post(post, target=str(output_dir))
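
A short usage sketch (the profile name is a placeholder; many profiles now require a logged-in session, which is what the session_file parameter provides):

scraper = InstagramScraper()
posts = scraper.get_profile_posts('some_profile', limit=10)
for post in posts:
    print(post['timestamp'], post['url'], post['likes'])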

TikTok with yt-dlp

import yt_dlp
from pathlib import Path

def scrape_tiktok_profile(username: str, output_dir: Path, limit: int = 50) -> list[dict]:
    """Scrape videos from a TikTok profile."""
    profile_url = f'https://tiktok.com/@{username}'

    ydl_opts = {
        'quiet': True,
        'extract_flat': True,  # metadata only, do not download
        'playlistend': limit,
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(profile_url, download=False)
        videos = []

        for entry in info.get('entries', []):
            videos.append({
                'id': entry.get('id'),
                'title': entry.get('title'),
                'url': entry.get('url'),
                'timestamp': entry.get('timestamp'),
                'view_count': entry.get('view_count'),
            })

        return videos

def download_tiktok_video(url: str, output_dir: Path) -> Path:
    """下载单个TikTok视频。"""
    ydl_opts = {
        'outtmpl': str(output_dir / '%(id)s.%(ext)s'),
        'quiet': True,
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        return Path(ydl.prepare_filename(info))
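
Usage sketch (the username is a placeholder; TikTok support in yt-dlp changes frequently, so treat this as best-effort):

videos = scrape_tiktok_profile('some_user', Path('downloads'), limit=10)
if videos:
    saved = download_tiktok_video(videos[0]['url'], Path('downloads'))
    print(f'Saved first video to {saved}')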

Request Patterns

Rotating User Agents and Headers

import time
import random

import requests
from fake_useragent import UserAgent

class RequestManager:
    def __init__(self):
        self.ua = UserAgent()
        self.session = requests.Session()

    def get_headers(self) -> dict:
        return {
            'User-Agent': self.ua.random,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }

    def fetch(self, url: str, retry_count: int = 3) -> requests.Response:
        for attempt in range(retry_count):
            try:
                response = self.session.get(
                    url,
                    headers=self.get_headers(),
                    timeout=30
                )
                response.raise_for_status()
                return response
            except requests.RequestException as e:
                if attempt == retry_count - 1:
                    raise
                time.sleep(2 ** attempt)  # exponential backoff

Polite Scraping with Delays

import time
import random
from urllib.parse import urlparse

class PoliteRequester:
    def __init__(self, min_delay: float = 1.0, max_delay: float = 3.0):
        self.min_delay = min_delay
        self.max_delay = max_delay
        self.last_request_per_domain = {}

    def wait_for_domain(self, url: str):
        domain = urlparse(url).netloc
        last_request = self.last_request_per_domain.get(domain, 0)

        elapsed = time.time() - last_request
        delay = random.uniform(self.min_delay, self.max_delay)

        if elapsed < delay:
            time.sleep(delay - elapsed)

        self.last_request_per_domain[domain] = time.time()
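
A sketch combining the two helpers, with per-domain pacing in front of the retrying fetch (the wiring is illustrative):

manager = RequestManager()
polite = PoliteRequester(min_delay=1.0, max_delay=3.0)

for url in ['https://example.com/a', 'https://example.com/b']:
    polite.wait_for_domain(url)    # enforce the per-domain delay
    response = manager.fetch(url)  # retried request with rotated UA
    print(url, response.status_code)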

Ethical Considerations

  • Always check robots.txt before scraping (see the sketch after this list)
  • Respect rate limits and add delays between requests
  • Do not scrape personal data without consent
  • Cache responses to avoid redundant requests
  • Identify yourself with a descriptive User-Agent where appropriate
  • Stop if you receive clear blocking signals
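
A minimal robots.txt check using the standard library, as referenced in the first bullet (the user agent string is whatever you identify yourself as):

from urllib.parse import urlparse
from urllib.robotparser import RobotFileParser

def is_allowed(url: str, user_agent: str = 'my-research-bot') -> bool:
    """Return True if robots.txt permits fetching this URL."""
    parts = urlparse(url)
    parser = RobotFileParser()
    parser.set_url(f'{parts.scheme}://{parts.netloc}/robots.txt')
    try:
        parser.read()
    except OSError:
        return True  # no readable robots.txt; proceed with caution
    return parser.can_fetch(user_agent, url)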