名称: 网络爬虫 描述: 具有反爬虫绕过、内容提取、未记录API和毒丸检测的网络爬虫。适用于从网站提取内容、处理付费墙、实施爬取级联或处理社交媒体。涵盖请求、trafilatura、Playwright隐身模式、yt-dlp和instaloader模式。
网络爬虫方法论
可靠、伦理的网络爬虫模式,包括后备策略和反爬虫处理。
爬取级联架构
实施多个提取策略,具有自动后备:
from abc import ABC, abstractmethod
from typing import Optional
import requests
from bs4 import BeautifulSoup
import trafilatura
# 用于 .py 文件
from playwright.sync_api import sync_playwright
from playwright_stealth import stealth_sync
# 用于 .ipynb 文件
import asyncio
from playwright.async_api import async_playwright
class ScrapingResult:
    """Container for the outcome of a successful scrape.

    Attributes:
        content: Extracted main text of the page.
        title: Page title (may be empty if none was found).
        method: Name of the scraper that produced this result.
    """

    def __init__(self, content: str, title: str, method: str):
        self.content = content
        self.title = title
        self.method = method  # Track which extraction method succeeded.

    def __repr__(self) -> str:
        # Keep `content` out of the repr: it can be very large.
        return f"{type(self).__name__}(title={self.title!r}, method={self.method!r})"
class Scraper(ABC):
    """Common interface for every scraping strategy in the cascade."""

    @abstractmethod
    def fetch(self, url: str) -> Optional[ScrapingResult]:
        """Return a ScrapingResult for *url*, or None if extraction failed."""
class TrafilaturaScraper(Scraper):
    """Fast, lightweight extraction for standard article pages.

    Uses trafilatura for the main text and BeautifulSoup for the title.
    Returns None whenever the page cannot be fetched or yields fewer than
    100 characters of content, so the cascade can fall back.
    """

    def fetch(self, url: str) -> Optional[ScrapingResult]:
        try:
            downloaded = trafilatura.fetch_url(url)
            if not downloaded:
                return None
            content = trafilatura.extract(
                downloaded,
                include_comments=False,
                include_tables=True,
                favor_recall=True,
            )
            # Too little text usually means a paywall or anti-bot page.
            if not content or len(content) < 100:
                return None
            # trafilatura does not return the title, so extract it separately.
            soup = BeautifulSoup(downloaded, 'html.parser')
            title = soup.find('title')
            title_text = title.get_text() if title else ''
            return ScrapingResult(content, title_text, 'trafilatura')
        except Exception:
            # Best-effort scraper: any failure just triggers the next fallback.
            return None


# Backward-compatible alias: the original class name accidentally contained a
# Cyrillic 'С' (U+0421) instead of a Latin 'C'. Keep the old spelling working
# for existing callers while exposing a correctly-named class.
TrafilaturaСscraper = TrafilaturaScraper
class RequestsScraper(Scraper):
    """Plain HTTP requests with a rotating User-Agent header.

    Strips script/style and navigation boilerplate, then extracts text from
    the main content region. Returns None on any error or when fewer than
    100 characters of text are found, letting the cascade fall back.
    """

    USER_AGENTS = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
        'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
    ]

    def fetch(self, url: str) -> Optional[ScrapingResult]:
        import random
        headers = {
            'User-Agent': random.choice(self.USER_AGENTS),
            'Accept': 'text/html,application/xhtml+xml',
            'Accept-Language': 'en-US,en;q=0.9',
        }
        try:
            response = requests.get(url, headers=headers, timeout=30)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')
            # Remove script/style elements and navigation boilerplate.
            for element in soup(['script', 'style', 'nav', 'footer', 'aside']):
                element.decompose()
            # Look for the main content region, falling back to <body>.
            main = soup.find('main') or soup.find('article') or soup.find('body')
            # BUG FIX: the separator was a raw newline inside a single-quoted
            # string literal, which is a SyntaxError; use the escape '\n'.
            content = main.get_text(separator='\n', strip=True) if main else ''
            title = soup.find('title')
            title_text = title.get_text() if title else ''
            if len(content) < 100:
                return None
            return ScrapingResult(content, title_text, 'requests')
        except Exception:
            # Best-effort scraper: any failure just triggers the next fallback.
            return None
class PlaywrightScraper(Scraper):
    """Full browser rendering for JavaScript-heavy sites, with stealth
    patches applied to evade anti-bot detection."""

    def fetch(self, url: str) -> Optional[ScrapingResult]:
        try:
            with sync_playwright() as pw:
                browser = pw.chromium.launch(headless=True)
                context = browser.new_context(
                    viewport={'width': 1920, 'height': 1080},
                    user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                )
                page = context.new_page()
                # Patch the page before navigating so detection scripts
                # never see the default automation fingerprint.
                stealth_sync(page)
                page.goto(url, wait_until='networkidle', timeout=60000)
                page.wait_for_timeout(2000)  # Give late-loading JS a moment.
                # Prefer a semantic content container; fall back to <body>.
                body_text = page.evaluate('''() => {
                    const article = document.querySelector('article, main, .content, #content');
                    return article ? article.innerText : document.body.innerText;
                }''')
                page_title = page.title()
                browser.close()
            if len(body_text) < 100:
                return None
            return ScrapingResult(body_text, page_title, 'playwright')
        except Exception:
            return None
class PlaywrightScraperAsync:
    """Async Playwright scraper for Jupyter notebooks (.ipynb files).

    Jupyter notebooks run their own event loop, so sync Playwright cannot
    be used there. Use this async version with `await` in a notebook cell.
    """

    async def fetch(self, url: str) -> Optional[ScrapingResult]:
        try:
            async with async_playwright() as pw:
                browser = await pw.chromium.launch(headless=True)
                context = await browser.new_context(
                    viewport={'width': 1920, 'height': 1080},
                    user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                )
                page = await context.new_page()
                # NOTE: for stealth, use the async variant of playwright-stealth:
                # from playwright_stealth import stealth_async
                # await stealth_async(page)
                await page.goto(url, wait_until='networkidle', timeout=60000)
                await page.wait_for_timeout(2000)  # Give late-loading JS a moment.
                # Prefer a semantic content container; fall back to <body>.
                body_text = await page.evaluate('''() => {
                    const article = document.querySelector('article, main, .content, #content');
                    return article ? article.innerText : document.body.innerText;
                }''')
                page_title = await page.title()
                await browser.close()
            if len(body_text) < 100:
                return None
            return ScrapingResult(body_text, page_title, 'playwright_async')
        except Exception:
            return None
# 在Jupyter笔记本单元格中的用法:
# scraper = PlaywrightScraperAsync()
# result = await scraper.fetch('https://example.com')
class ScrapingCascade:
    """Try multiple scrapers in order until one succeeds.

    Generalized so callers may inject their own ordered strategy list;
    the default remains the original fast-to-slow cascade.
    """

    def __init__(self, scrapers: Optional[list] = None):
        # Cheapest strategies first; Playwright (a full browser) is last.
        self.scrapers = scrapers if scrapers is not None else [
            TrafilaturaСscraper(),
            RequestsScraper(),
            PlaywrightScraper(),
        ]

    def fetch(self, url: str) -> 'Optional[ScrapingResult]':
        """Return the first successful ScrapingResult, or None if all fail."""
        for scraper in self.scrapers:
            result = scraper.fetch(url)
            if result:
                return result
        return None
未记录API
查找未记录API
使用浏览器开发者工具发现API:
- 打开开发者工具(右键单击 → 检查,或F12)
- 转到网络选项卡以监视所有请求
- 通过Fetch/XHR过滤以仅显示API调用
- 触发您想要捕获的操作(搜索、滚动、点击)
- 分析响应 — 通常是带键值对的JSON
- 复制为cURL(右键单击请求)
- 使用curlconverter.com转换为代码
简化API请求
当您从开发者工具复制cURL时,它包含许多参数。通过以下方式简化:
- 移除不必要的cookie — 首先测试没有它们
- 保留认证令牌如果需要
- 识别您可以修改的输入参数(如搜索词的prefix)
- 测试参数值 — 一些会过期,所以定期验证
示例:反向工程自动完成API
import requests
import time
def search_suggestions(keyword: str) -> dict:
    """Get autocomplete search suggestions from an undocumented API.

    Simplified from a browser devtools capture of the Amazon completion
    endpoint.

    Args:
        keyword: Seed text to complete (sent as the 'prefix' parameter).

    Returns:
        The parsed JSON response; suggestions live under the
        'suggestions' key.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:100.0) Gecko/20100101 Firefox/100.0',
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'Accept-Language': 'en-US,en;q=0.5',
    }
    params = {
        'prefix': keyword,
        'suggestion-type': ['WIDGET', 'KEYWORD'],
        'alias': 'aps',
        'plain-mid': '1',
    }
    response = requests.get(
        'https://completion.amazon.com/api/2017/suggestions',
        params=params,
        headers=headers,
        # BUG FIX: without a timeout, a stalled server hangs this call forever.
        timeout=30,
    )
    return response.json()
# Collect suggestions for several seed keywords.
keywords = ['a', 'b', 'cookie', 'sock']
data = []
for keyword in keywords:
    response_json = search_suggestions(keyword)
    # BUG FIX: tag each individual suggestion with its seed keyword.
    # Previously the tag was set on the response dict and then discarded,
    # because only the bare 'suggestions' list was extend()-ed into data.
    for suggestion in response_json.get('suggestions', []):
        suggestion['search_word'] = keyword
        data.append(suggestion)
    time.sleep(1)  # Rate-limit ourselves between requests.
来源:Leon Yin, “Finding Undocumented APIs,” Inspect Element, 2023
毒丸检测
检测付费墙、反爬虫页面和其他失败:
from dataclasses import dataclass
from enum import Enum
import re
class PoisonPillType(Enum):
    """Categories of 'poison pill' pages — responses that look like content
    but are actually blocks, walls, or errors and must not be stored."""
    PAYWALL = 'paywall'                # Subscription wall truncating the article.
    CAPTCHA = 'captcha'                # Human-verification challenge page.
    RATE_LIMIT = 'rate_limit'          # Server asked us to slow down (e.g. HTTP 429).
    CLOUDFLARE = 'cloudflare'          # Cloudflare/DDoS interstitial.
    LOGIN_REQUIRED = 'login_required'  # Content gated behind an account.
    NOT_FOUND = 'not_found'            # Page does not exist (HTTP 404).
    NONE = 'none'                      # Page looks clean.
@dataclass
class PoisonPillResult:
    """Outcome of a poison-pill scan over a fetched page."""
    detected: bool          # True when any poison-pill signal was found.
    type: PoisonPillType    # Category of the detected block (NONE when clean).
    confidence: float       # Heuristic confidence, 0.0 (none) to 1.0 (certain).
    details: str            # Human-readable evidence, e.g. the matched pattern.
class PoisonPillDetector:
    """Detects paywalls, CAPTCHAs, and other anti-scraping 'poison pill' pages.

    Detection is layered, most reliable first: HTTP status codes, then
    known paywalled domains with suspiciously short content, then phrase
    patterns in the page text.
    """

    # Case-insensitive phrases matched against the lowercased page text.
    PATTERNS = {
        PoisonPillType.PAYWALL: [
            r'subscribe to continue',
            r'subscription required',
            r'become a member',
            r'sign up to read',
            r'you\'ve reached your limit',
            r'article limit reached',
        ],
        PoisonPillType.CAPTCHA: [
            r'verify you are human',
            r'captcha',
            r'robot verification',
            r'prove you\'re not a robot',
        ],
        PoisonPillType.RATE_LIMIT: [
            r'too many requests',
            r'rate limit exceeded',
            r'slow down',
            r'429',
        ],
        PoisonPillType.CLOUDFLARE: [
            r'checking your browser',
            r'cloudflare',
            r'ddos protection',
            r'please wait while we verify',
        ],
        PoisonPillType.LOGIN_REQUIRED: [
            r'sign in to continue',
            r'log in required',
            r'create an account',
        ],
    }

    # Domains known to paywall most of their articles.
    PAYWALL_DOMAINS = {
        'nytimes.com': PoisonPillType.PAYWALL,
        'wsj.com': PoisonPillType.PAYWALL,
        'washingtonpost.com': PoisonPillType.PAYWALL,
        'ft.com': PoisonPillType.PAYWALL,
        'bloomberg.com': PoisonPillType.PAYWALL,
    }

    def detect(self, url: str, content: str, status_code: int = 200) -> PoisonPillResult:
        """Classify a fetched page.

        Args:
            url: The URL that was fetched (used for domain heuristics).
            content: The extracted page text.
            status_code: HTTP status of the response (defaults to 200).

        Returns:
            A PoisonPillResult; ``detected`` is False when the page looks clean.
        """
        # Status codes are the most reliable signal.
        if status_code == 429:
            return PoisonPillResult(True, PoisonPillType.RATE_LIMIT, 1.0, 'HTTP 429')
        if status_code == 403:
            # 403 is often Cloudflare/anti-bot, but not always: confidence < 1.
            return PoisonPillResult(True, PoisonPillType.CLOUDFLARE, 0.8, 'HTTP 403')
        if status_code == 404:
            return PoisonPillResult(True, PoisonPillType.NOT_FOUND, 1.0, 'HTTP 404')
        # Check known paywalled domains.
        from urllib.parse import urlparse
        # BUG FIX: strip only a *leading* 'www.' — replace() removed the
        # substring anywhere in the host name.
        domain = urlparse(url).netloc.removeprefix('www.')
        for paywall_domain, pill_type in self.PAYWALL_DOMAINS.items():
            # BUG FIX: match the exact host or a subdomain of it; the previous
            # bare substring test also matched unrelated hosts ('notwsj.com').
            if domain == paywall_domain or domain.endswith('.' + paywall_domain):
                # Unusually short content from a paywalled site means the
                # article was truncated by the wall.
                if len(content) < 500:
                    return PoisonPillResult(True, pill_type, 0.9, f'来自{domain}的短内容')
        # Phrase-pattern matching on the lowercased text.
        content_lower = content.lower()
        for pill_type, patterns in self.PATTERNS.items():
            for pattern in patterns:
                if re.search(pattern, content_lower):
                    return PoisonPillResult(True, pill_type, 0.7, f'模式匹配: {pattern}')
        return PoisonPillResult(False, PoisonPillType.NONE, 0.0, '')
社交媒体爬取
使用yt-dlp的YouTube
import yt_dlp
from pathlib import Path
def download_video_metadata(url: str) -> dict:
    """Extract video metadata without downloading the media itself."""
    options = {
        'skip_download': True,  # Metadata only — never fetch the stream.
        'quiet': True,
        'no_warnings': True,
    }
    with yt_dlp.YoutubeDL(options) as ydl:
        info = ydl.extract_info(url, download=False)
    # Project only the fields we care about, in a stable order.
    wanted = ('title', 'description', 'duration', 'upload_date',
              'view_count', 'channel', 'thumbnail')
    return {field: info.get(field) for field in wanted}
def download_video(url: str, output_dir: Path, audio_only: bool = False) -> Path:
    """Download a video (or only its audio track as mp3) and return the local path."""
    options = {
        'outtmpl': str(output_dir / '%(title)s.%(ext)s'),
        'quiet': True,
    }
    if audio_only:
        options['format'] = 'bestaudio/best'
        options['postprocessors'] = [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
        }]
    with yt_dlp.YoutubeDL(options) as ydl:
        info = ydl.extract_info(url, download=True)
        local_name = ydl.prepare_filename(info)
    if audio_only:
        # The audio post-processor swaps the original extension for .mp3.
        local_name = local_name.rsplit('.', 1)[0] + '.mp3'
    return Path(local_name)
def get_transcript(url: str) -> list[dict]:
    """Extract auto-generated or manual subtitles for a video.

    NOTE(review): this is a skeleton — a suitable subtitle track is located
    but never downloaded or parsed, so the function currently always
    returns an empty list.
    """
    ydl_opts = {
        'skip_download': True,        # Metadata only — never fetch the stream.
        'writesubtitles': True,       # Manual (author-provided) subtitles.
        'writeautomaticsub': True,    # Auto-generated captions as fallback.
        'subtitleslangs': ['en'],
        'quiet': True,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=False)
        # Check available subtitle tracks.
        subtitles = info.get('subtitles', {})
        auto_captions = info.get('automatic_captions', {})
        # Prefer manual subtitles over auto-generated ones.
        subs = subtitles.get('en') or auto_captions.get('en')
        if not subs:
            return []
        # Pick a vtt or json3 format variant.
        for sub in subs:
            if sub['ext'] in ['vtt', 'json3']:
                # Download and parse the subtitle file here.
                # ... implementation depends on the format
                pass
        return []
使用instaloader的Instagram
import instaloader
from pathlib import Path
class InstagramScraper:
    """Instagram scraper built on instaloader.

    Optionally loads a previously saved session file so that
    login-restricted profiles can be accessed.
    """

    def __init__(self, username: Optional[str] = None, session_file: Optional[str] = None):
        self.loader = instaloader.Instaloader(
            download_videos=True,
            download_video_thumbnails=False,
            download_geotags=False,
            download_comments=False,
            save_metadata=True,
            compress_json=False,
        )
        # Reuse a saved login session if one exists on disk.
        if session_file and Path(session_file).exists():
            self.loader.load_session_from_file(username, session_file)

    def get_profile_posts(self, username: str, limit: int = 50) -> list[dict]:
        """Fetch metadata for the most recent posts of a profile.

        Args:
            username: Instagram handle (without the leading '@').
            limit: Maximum number of posts to return.

        Returns:
            Dicts with shortcode, URL, caption, UTC timestamp, like and
            comment counts, and video info where applicable.
        """
        profile = instaloader.Profile.from_username(self.loader.context, username)
        posts = []
        for i, post in enumerate(profile.get_posts()):
            if i >= limit:
                break
            posts.append({
                'shortcode': post.shortcode,
                'url': f'https://instagram.com/p/{post.shortcode}/',
                'caption': post.caption,
                'timestamp': post.date_utc.isoformat(),
                'likes': post.likes,
                'comments': post.comments,
                'is_video': post.is_video,
                'video_url': post.video_url if post.is_video else None,
            })
        return posts

    def download_post(self, shortcode: str, output_dir: Path):
        """Download the media of a single post into *output_dir*."""
        post = instaloader.Post.from_shortcode(self.loader.context, shortcode)
        self.loader.download_post(post, target=str(output_dir))
使用yt-dlp的TikTok
def scrape_tiktok_profile(username: str, output_dir: Path, limit: int = 50) -> list[dict]:
    """List recent videos of a TikTok profile without downloading them.

    Args:
        username: TikTok handle without the leading '@'.
        output_dir: Unused here; kept for interface compatibility with the
            download helpers.
        limit: Maximum number of videos to list.

    Returns:
        Dicts with id, title, url, timestamp and view_count per video.
    """
    profile_url = f'https://tiktok.com/@{username}'
    ydl_opts = {
        'quiet': True,
        'extract_flat': True,  # Metadata only — do not download the media.
        'playlistend': limit,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(profile_url, download=False)
    # Idiomatic comprehension instead of the manual append loop.
    return [
        {
            'id': entry.get('id'),
            'title': entry.get('title'),
            'url': entry.get('url'),
            'timestamp': entry.get('timestamp'),
            'view_count': entry.get('view_count'),
        }
        for entry in info.get('entries', [])
    ]
def download_tiktok_video(url: str, output_dir: Path) -> Path:
    """Download a single TikTok video and return its local file path."""
    options = {
        'outtmpl': str(output_dir / '%(id)s.%(ext)s'),
        'quiet': True,
    }
    with yt_dlp.YoutubeDL(options) as ydl:
        video_info = ydl.extract_info(url, download=True)
        local_path = ydl.prepare_filename(video_info)
    return Path(local_path)
请求模式
旋转用户代理和标头
import random
from fake_useragent import UserAgent
class RequestManager:
    """Session-based fetching with randomized User-Agent headers and retries."""

    def __init__(self):
        self.ua = UserAgent()
        self.session = requests.Session()

    def get_headers(self) -> dict:
        """Build browser-like request headers with a freshly randomized User-Agent."""
        return {
            'User-Agent': self.ua.random,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }

    def fetch(self, url: str, retry_count: int = 3) -> requests.Response:
        """GET *url* with retries; re-raises the final error if all attempts fail."""
        final_attempt = retry_count - 1
        for attempt in range(retry_count):
            try:
                response = self.session.get(
                    url,
                    headers=self.get_headers(),
                    timeout=30,
                )
                response.raise_for_status()
                return response
            except requests.RequestException:
                if attempt == final_attempt:
                    raise
                # Exponential backoff: 1s, 2s, 4s, ...
                time.sleep(2 ** attempt)
有延迟的尊重性爬取
import time
import random
from urllib.parse import urlparse
class PoliteRequester:
    """Per-domain rate limiting with a randomized delay between requests."""

    def __init__(self, min_delay: float = 1.0, max_delay: float = 3.0):
        self.min_delay = min_delay
        self.max_delay = max_delay
        self.last_request_per_domain = {}

    def wait_for_domain(self, url: str):
        """Sleep if needed so requests to this URL's domain stay politely spaced."""
        domain = urlparse(url).netloc
        previous = self.last_request_per_domain.get(domain, 0)
        # Randomize the target gap so the request cadence is less robotic.
        target_gap = random.uniform(self.min_delay, self.max_delay)
        remaining = target_gap - (time.time() - previous)
        if remaining > 0:
            time.sleep(remaining)
        self.last_request_per_domain[domain] = time.time()
伦理考虑
- 在爬取前总是检查robots.txt
- 尊重速率限制并在请求间添加延迟
- 未经同意不爬取个人数据
- 缓存响应以避免冗余请求
- 适当时用描述性User-Agent标识自己
- 如果收到明确的阻止信号,停止