Name: Page Monitoring  Description: Web page monitoring, change detection, and availability tracking. Use for tracking content changes, detecting page outages, monitoring updates, saving content before it is deleted, or generating feeds for pages without RSS. Covers Visualping, ChangeTower, Distill.io, and self-hosted monitoring solutions.
Page Monitoring Methods
Patterns for tracking web page changes, detecting content removal, and saving important pages before they disappear.
Monitoring Service Comparison
| Service | Free Tier | Best For | Retention | Alert Speed |
|---|---|---|---|---|
| Visualping | 5 pages | Visual changes | Standard | Minutes |
| ChangeTower | Yes | Compliance, archiving | 12 years | Minutes |
| Distill.io | 25 pages | Element-level tracking | 12 months | Seconds |
| Wachete | Limited | Login-protected pages | 12 months | Minutes |
| UptimeRobot | 50 monitors | Availability only | 2 months | Minutes |
Quick Start: Monitoring a Page
Distill.io Element Monitoring
// Distill.io accepts CSS/XPath selectors for precise element-level monitoring
// Example selectors for common use cases:
// Monitor a news article headline
const newsSelector = '.article-headline, h1.title, .story-title';
// Monitor price changes
const priceSelector = '.price, .product-price, [data-price]';
// Monitor stock/availability
const availabilitySelector = '.in-stock, .availability, .stock-status';
// Monitor a specific paragraph or section
const sectionSelector = '#main-content p:first-child';
// Monitor table data
const tableSelector = 'table.data-table tbody tr';
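Before committing a selector to Distill.io, it can help to confirm locally that it matches what you expect. A minimal sketch using requests and BeautifulSoup; the URL and selector are placeholders:
import requests
from bs4 import BeautifulSoup

def test_selector(url: str, selector: str) -> None:
    """Fetch a page and preview what a CSS selector matches."""
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    soup = BeautifulSoup(response.text, 'html.parser')
    matches = soup.select(selector)
    print(f"{selector!r} matched {len(matches)} element(s)")
    for element in matches[:3]:  # preview the first few matches
        print(' ', element.get_text(strip=True)[:80])

# Hypothetical page and selector
test_selector('https://example.com/news', '.article-headline')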
Python Monitoring Script
import requests
import hashlib
import json
from datetime import datetime
from pathlib import Path
from typing import Optional
from bs4 import BeautifulSoup
class PageMonitor:
"""简单的页面变化监控器,带有本地存储。"""
def __init__(self, storage_dir: Path):
self.storage_dir = storage_dir
self.storage_dir.mkdir(parents=True, exist_ok=True)
self.state_file = storage_dir / 'monitor_state.json'
self.state = self._load_state()
def _load_state(self) -> dict:
if self.state_file.exists():
return json.loads(self.state_file.read_text())
return {'pages': {}}
def _save_state(self):
self.state_file.write_text(json.dumps(self.state, indent=2))
    def _get_page_hash(self, url: str, selector: Optional[str] = None) -> tuple[str, str]:
        """Return the content hash and content for the page, or for a selected element."""
response = requests.get(url, timeout=30, headers={
'User-Agent': 'Mozilla/5.0 (PageMonitor/1.0)'
})
response.raise_for_status()
if selector:
soup = BeautifulSoup(response.text, 'html.parser')
element = soup.select_one(selector)
content = element.get_text(strip=True) if element else ''
else:
content = response.text
content_hash = hashlib.sha256(content.encode()).hexdigest()
return content_hash, content
    def add_page(self, url: str, name: str, selector: Optional[str] = None):
        """Add a page to the monitoring list."""
content_hash, content = self._get_page_hash(url, selector)
self.state['pages'][url] = {
'name': name,
'selector': selector,
'last_hash': content_hash,
'last_check': datetime.now().isoformat(),
            'last_content': content[:1000],  # store a preview
'change_count': 0
}
self._save_state()
print(f"已添加: {name} ({url})")
def check_page(self, url: str) -> Optional[dict]:
"""检查单个页面的变化。"""
if url not in self.state['pages']:
return None
page = self.state['pages'][url]
selector = page.get('selector')
try:
new_hash, new_content = self._get_page_hash(url, selector)
except Exception as e:
return {
'url': url,
'name': page['name'],
'status': 'error',
'error': str(e)
}
changed = new_hash != page['last_hash']
result = {
'url': url,
'name': page['name'],
'status': 'changed' if changed else 'unchanged',
'previous_content': page['last_content'],
'new_content': new_content[:1000] if changed else None
}
if changed:
page['last_hash'] = new_hash
page['last_content'] = new_content[:1000]
page['change_count'] += 1
            # Archive the changed content
archive_file = self.storage_dir / f"{hashlib.md5(url.encode()).hexdigest()}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
archive_file.write_text(new_content)
page['last_check'] = datetime.now().isoformat()
self._save_state()
return result
def check_all(self) -> list[dict]:
"""检查所有监控页面。"""
results = []
for url in self.state['pages']:
result = self.check_page(url)
if result:
results.append(result)
return results
# Usage
monitor = PageMonitor(Path('./page_monitor_data'))
# Add a page to monitor
monitor.add_page(
    'https://example.com/important-page',
    'Important Page',
    selector='.main-content'  # optional: monitor a specific element
)
# Check for changes
results = monitor.check_all()
for result in results:
    if result['status'] == 'changed':
        print(f"Changed: {result['name']}")
        print(f"  Before: {result['previous_content'][:100]}...")
        print(f"  After: {result['new_content'][:100]}...")
Availability Monitoring
UptimeRobot API Integration
import requests
from typing import List

class UptimeRobotClient:
    """UptimeRobot API client for monitoring page availability."""
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://api.uptimerobot.com/v2"
def _request(self, endpoint: str, params: dict = None) -> dict:
data = {'api_key': self.api_key}
if params:
data.update(params)
response = requests.post(f"{self.base_url}/{endpoint}", data=data)
return response.json()
def get_monitors(self) -> List[dict]:
"""获取所有监视器。"""
result = self._request('getMonitors')
return result.get('monitors', [])
def create_monitor(self, friendly_name: str, url: str,
monitor_type: int = 1) -> dict:
"""创建新监视器。
类型: 1=HTTP(s), 2=关键词, 3=Ping, 4=端口
"""
return self._request('newMonitor', {
'friendly_name': friendly_name,
'url': url,
'type': monitor_type
})
def get_monitor_uptime(self, monitor_id: int,
custom_uptime_ratios: str = "7-30-90") -> dict:
"""获取监视器的可用性统计。"""
return self._request('getMonitors', {
'monitors': monitor_id,
'custom_uptime_ratios': custom_uptime_ratios
})
def pause_monitor(self, monitor_id: int) -> dict:
"""暂停监视器。"""
return self._request('editMonitor', {
'id': monitor_id,
'status': 0
})
def resume_monitor(self, monitor_id: int) -> dict:
"""恢复监视器。"""
return self._request('editMonitor', {
'id': monitor_id,
'status': 1
})
# Usage
client = UptimeRobotClient('YOUR_API_KEY')
# Create monitors for important pages
client.create_monitor('News homepage', 'https://example-news.com')
client.create_monitor('API status', 'https://api.example.com/health')
# Check all monitors (status 2 means "up" in the UptimeRobot API)
for monitor in client.get_monitors():
    status = 'up' if monitor['status'] == 2 else 'down'
    print(f"{monitor['friendly_name']}: {status}")
RSS Feed Generation
Generating RSS for Pages Without a Feed
import requests
import hashlib
from datetime import datetime, timezone
from typing import Optional
from urllib.parse import urljoin
from bs4 import BeautifulSoup
from feedgen.feed import FeedGenerator

class RSSGenerator:
    """Generate an RSS feed from a web page."""
def __init__(self, feed_id: str, title: str, link: str):
self.fg = FeedGenerator()
self.fg.id(feed_id)
self.fg.title(title)
self.fg.link(href=link)
        self.fg.description(f'Auto-generated feed for {title}')
def add_from_page(self, url: str, item_selector: str,
title_selector: str, link_selector: str,
                      description_selector: Optional[str] = None):
"""解析页面并添加项目到源。
参数:
url: 要解析的页面URL
item_selector: 每个项目容器的CSS选择器
title_selector: 标题的CSS选择器(相对于项目)
link_selector: 链接的CSS选择器(相对于项目)
description_selector: 可选的描述CSS选择器
"""
response = requests.get(url, timeout=30)
soup = BeautifulSoup(response.text, 'html.parser')
items = soup.select(item_selector)
        for item in items[:20]:  # cap the feed at 20 items
title_elem = item.select_one(title_selector)
link_elem = item.select_one(link_selector)
if not title_elem or not link_elem:
continue
title = title_elem.get_text(strip=True)
link = link_elem.get('href', '')
            # Convert relative URLs to absolute ones
            if link.startswith('/'):
                link = urljoin(url, link)
fe = self.fg.add_entry()
fe.id(hashlib.md5(link.encode()).hexdigest())
fe.title(title)
fe.link(href=link)
if description_selector:
desc_elem = item.select_one(description_selector)
if desc_elem:
fe.description(desc_elem.get_text(strip=True))
            fe.published(datetime.now(timezone.utc))  # feedgen requires a timezone-aware datetime
def generate_rss(self) -> str:
"""生成RSS XML字符串。"""
return self.fg.rss_str(pretty=True).decode()
def save_rss(self, filepath: str):
"""保存RSS源到文件。"""
self.fg.rss_file(filepath)
# Example: generate a feed for a news site without RSS
rss = RSSGenerator(
'https://example.com/news',
    'Example News Feed',
'https://example.com/news'
)
rss.add_from_page(
'https://example.com/news',
item_selector='.news-item',
title_selector='h2 a',
link_selector='h2 a',
description_selector='.summary'
)
# Save the feed
rss.save_rss('example_feed.xml')
Using RSS-Bridge (Self-Hosted)
# RSS-Bridge generates feeds for sites that don't provide one
# Supports Twitter, Instagram, YouTube, and many other services
# Docker installation
docker pull rssbridge/rss-bridge
docker run -d -p 3000:80 rssbridge/rss-bridge
# Open http://localhost:3000
# Pick a bridge, enter its parameters, and copy the resulting RSS feed URL
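Once a bridge is configured, the generated feed is just a URL you can poll like any other feed. A sketch using the feedparser library; the feed URL below is a placeholder you would copy from the RSS-Bridge UI, with the bridge parameters left elided:
import feedparser

# Hypothetical feed URL copied from the RSS-Bridge web UI
FEED_URL = 'http://localhost:3000/?action=display&bridge=...&format=Atom'

feed = feedparser.parse(FEED_URL)
for entry in feed.entries[:10]:
    print(entry.title, entry.link)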
Social Media Monitoring
Archiving Twitter/X with Twarc
# Twarc requires Twitter API credentials
# Install:
# pip install twarc
# Configure:
# twarc2 configure
import subprocess
import json
from pathlib import Path
class TwitterArchiver:
"""归档Twitter搜索和时间线。"""
def __init__(self, output_dir: Path):
self.output_dir = output_dir
self.output_dir.mkdir(parents=True, exist_ok=True)
def search(self, query: str, max_results: int = 100) -> Path:
"""搜索推文并保存到文件。"""
output_file = self.output_dir / f"search_{query.replace(' ', '_')}.jsonl"
subprocess.run([
'twarc2', 'search',
'--max-results', str(max_results),
query,
str(output_file)
], check=True)
return output_file
def get_timeline(self, username: str, max_results: int = 100) -> Path:
"""获取用户时间线。"""
output_file = self.output_dir / f"timeline_{username}.jsonl"
subprocess.run([
'twarc2', 'timeline',
'--max-results', str(max_results),
username,
str(output_file)
], check=True)
return output_file
def parse_archive(self, filepath: Path) -> list[dict]:
"""解析归档的推文。"""
tweets = []
with open(filepath) as f:
for line in f:
data = json.loads(line)
if 'data' in data:
tweets.extend(data['data'])
return tweets
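A usage sketch, assuming twarc2 is installed and configured as above; the query is illustrative:
archiver = TwitterArchiver(Path('./twitter_archive'))
archive_file = archiver.search('example topic', max_results=100)
tweets = archiver.parse_archive(archive_file)
print(f"Archived {len(tweets)} tweets to {archive_file}")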
Webhook Notifications
Sending Alerts on Changes
import requests
from datetime import datetime
from typing import Optional

class AlertManager:
    """Send alerts when a monitored page changes."""
    def __init__(self, slack_webhook: Optional[str] = None,
                 discord_webhook: Optional[str] = None,
                 email_config: Optional[dict] = None):
self.slack_webhook = slack_webhook
self.discord_webhook = discord_webhook
self.email_config = email_config
    def send_slack(self, message: str, channel: Optional[str] = None):
        """Send a Slack notification."""
if not self.slack_webhook:
return
payload = {'text': message}
if channel:
payload['channel'] = channel
requests.post(self.slack_webhook, json=payload)
def send_discord(self, message: str):
"""发送Discord通知。"""
if not self.discord_webhook:
return
requests.post(self.discord_webhook, json={'content': message})
def send_email(self, subject: str, body: str, to: str):
"""发送电子邮件通知。"""
if not self.email_config:
return
import smtplib
from email.mime.text import MIMEText
msg = MIMEText(body)
msg['Subject'] = subject
msg['From'] = self.email_config['from']
msg['To'] = to
with smtplib.SMTP(self.email_config['smtp_host'],
self.email_config['smtp_port']) as server:
server.starttls()
server.login(self.email_config['username'],
self.email_config['password'])
server.send_message(msg)
def alert_change(self, page_name: str, url: str,
old_content: str, new_content: str):
"""发送变化告警到所有配置的渠道。"""
message = f"""
页面已更改: {page_name}
URL: {url}
时间: {datetime.now().isoformat()}
之前内容(预览):
{old_content[:200]}...
新内容(预览):
{new_content[:200]}...
"""
if self.slack_webhook:
self.send_slack(message)
if self.discord_webhook:
self.send_discord(message)
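A usage sketch; the webhook URLs and SMTP settings are placeholders:
alerts = AlertManager(
    slack_webhook='https://hooks.slack.com/services/...',
    email_config={
        'from': 'monitor@example.com',
        'smtp_host': 'smtp.example.com',
        'smtp_port': 587,
        'username': 'monitor@example.com',
        'password': '...',
    },
)
alerts.alert_change('Pricing page', 'https://example.com/pricing',
                    old_content='$99/mo', new_content='$129/mo')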
Scheduled Monitoring with cron
Cron Setup for Continuous Monitoring
# Edit the crontab
crontab -e
# Check pages every 15 minutes
*/15 * * * * /usr/bin/python3 /path/to/monitor_script.py >> /var/log/monitor.log 2>&1
# Check critical pages every 5 minutes
*/5 * * * * /usr/bin/python3 /path/to/critical_monitor.py >> /var/log/critical.log 2>&1
# Daily summary report at 8:00 AM
0 8 * * * /usr/bin/python3 /path/to/daily_report.py
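If a check run ever takes longer than its cron interval, two runs can overlap and race on the state file. One guard, sketched for Unix systems with the standard-library fcntl module, is a non-blocking file lock at the top of the script; the lock path is arbitrary:
import fcntl
import sys

# Acquire an exclusive, non-blocking lock; exit quietly if another run holds it
lock_file = open('/tmp/page_monitor.lock', 'w')
try:
    fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
except BlockingIOError:
    print("Previous run still in progress; exiting.")
    sys.exit(0)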
Monitoring Script Template
#!/usr/bin/env python3
"""用于cron执行的页面监控脚本。"""
import sys
from pathlib import Path
from datetime import datetime
# Make the project importable
sys.path.insert(0, str(Path(__file__).parent))
from monitor import PageMonitor
from alerts import AlertManager
def main():
    # Initialize the monitor and alert channels
monitor = PageMonitor(Path('./data'))
alerts = AlertManager(
slack_webhook='https://hooks.slack.com/services/...',
discord_webhook='https://discord.com/api/webhooks/...'
)
    # Check all pages
    results = monitor.check_all()
    # Process results
    changes = [r for r in results if r['status'] == 'changed']
    errors = [r for r in results if r['status'] == 'error']
    # Alert on each change
    for change in changes:
alerts.alert_change(
change['name'],
change['url'],
change['previous_content'],
change['new_content']
)
print(f"[{datetime.now()}] 变化: {change['name']}")
# 告警错误
for error in errors:
alerts.send_slack(f"监控错误 {error['name']}: {error['error']}")
print(f"[{datetime.now()}] 错误: {error['name']} - {error['error']}")
# 摘要
print(f"[{datetime.now()}] 检查了 {len(results)} 个页面,"
f"{len(changes)} 个变化,{len(errors)} 个错误")
if __name__ == '__main__':
main()
Archive on Change
Automatically Archive When a Change Is Detected
from pathlib import Path
from typing import Optional
from monitor import PageMonitor  # the PageMonitor class defined earlier
from multiarchiver import MultiArchiver

class ArchivingMonitor(PageMonitor):
    """Page monitor that archives content when a change is detected."""
def __init__(self, storage_dir: Path):
super().__init__(storage_dir)
self.archiver = MultiArchiver()
    def check_page(self, url: str) -> Optional[dict]:
        """Check the page and archive its content if it changed."""
result = super().check_page(url)
if result and result['status'] == 'changed':
            # Archive to multiple services
archive_results = self.archiver.archive_url(url)
successful_archives = [
r.archived_url for r in archive_results
if r.success
]
result['archives'] = successful_archives
            # Log the archive URLs
            print(f"Archived {url} to:")
for archive_url in successful_archives:
print(f" - {archive_url}")
return result
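Usage mirrors PageMonitor; check_all() picks up the overridden check_page automatically. The URL is a placeholder:
monitor = ArchivingMonitor(Path('./archive_monitor_data'))
monitor.add_page('https://example.com/terms', 'Terms of Service')
for result in monitor.check_all():
    if result.get('archives'):
        print(result['name'], '->', result['archives'])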
Monitoring Strategies by Use Case
News Monitoring
## News / Current Events Monitoring
### Pages to monitor:
- Breaking news sections
- Press release pages
- Government announcement pages
- Corporate newsrooms
### Monitoring frequency:
- Breaking news: every 5 minutes
- Press releases: every 15-30 minutes
- General news: hourly
### Archiving strategy:
- Archive immediately on detection
- Use both the Wayback Machine and Archive.today
- Keep timestamped local copies
Research Monitoring
## Academic / Research Monitoring
### Pages to monitor:
- Preprint servers (arXiv, SSRN)
- Journal tables of contents
- Conference proceedings
- Researcher profiles
### Monitoring frequency:
- Daily for active topics
- Weekly for general monitoring
### Recommended tools:
- Google Scholar alerts (free, built-in)
- Semantic Scholar alerts
- RSS feeds where available
- Custom monitors for specific pages
Competitive Intelligence
## Competitor Monitoring
### Pages to monitor:
- Pricing pages
- Product pages
- Job postings
- Press releases
- Executive bios
### Monitoring frequency (see the schedule sketch below):
- Pricing: daily
- Products: daily
- Job postings: weekly
- Press releases: daily
### Legal considerations:
- Do not violate terms of service
- Do not circumvent access controls
- Public pages only
- No high-frequency scraping
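The per-category frequencies above map naturally onto a small schedule config that a single cron-driven script can honor. A minimal sketch, assuming the PageMonitor class from earlier and its state layout; the URLs and intervals are illustrative:
from datetime import datetime, timedelta

# Illustrative schedule: one check interval per monitored page
SCHEDULE = [
    {'url': 'https://example.com/pricing', 'name': 'Pricing', 'interval': timedelta(days=1)},
    {'url': 'https://example.com/jobs', 'name': 'Job postings', 'interval': timedelta(weeks=1)},
    {'url': 'https://example.com/press', 'name': 'Press releases', 'interval': timedelta(days=1)},
]

def due_pages(monitor, schedule):
    """Yield schedule entries whose last check is older than their interval."""
    for entry in schedule:
        page = monitor.state['pages'].get(entry['url'])
        if page is None:
            yield entry  # never checked yet
            continue
        last_check = datetime.fromisoformat(page['last_check'])
        if datetime.now() - last_check >= entry['interval']:
            yield entry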
Best Practices
Monitoring Checklist
## Before monitoring a page:
- [ ] Is the page publicly accessible?
- [ ] Are you respecting robots.txt? (see the sketch after this checklist)
- [ ] Is the monitoring frequency reasonable?
- [ ] Is there a legitimate purpose?
- [ ] Is the data stored securely?
- [ ] Are alerts configured?
- [ ] Is archiving set up for important pages?
## Maintenance:
- [ ] Review monitors monthly
- [ ] Remove stale monitors
- [ ] Update selectors when pages change
- [ ] Check alert delivery
- [ ] Verify that archiving still works
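The robots.txt item above can be automated with the standard-library urllib.robotparser before a URL is ever added to the monitor; a minimal sketch, with the fallback on an unreachable robots.txt being a policy choice:
from urllib import robotparser
from urllib.parse import urlparse

def allowed_by_robots(url: str, user_agent: str = 'PageMonitor/1.0') -> bool:
    """Return True if robots.txt permits user_agent to fetch url."""
    parts = urlparse(url)
    parser = robotparser.RobotFileParser()
    parser.set_url(f"{parts.scheme}://{parts.netloc}/robots.txt")
    try:
        parser.read()
    except OSError:
        return True  # robots.txt unreachable: assume allowed (a policy choice)
    return parser.can_fetch(user_agent, url)

# Usage
if allowed_by_robots('https://example.com/important-page'):
    print('OK to monitor')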
Rate Limiting
import time
import requests
from functools import wraps
def rate_limit(min_interval: float = 1.0):
"""函数调用速率限制装饰器。"""
last_call = [0.0]
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
elapsed = time.time() - last_call[0]
if elapsed < min_interval:
time.sleep(min_interval - elapsed)
last_call[0] = time.time()
return func(*args, **kwargs)
return wrapper
return decorator
# Usage
@rate_limit(min_interval=2.0)  # at most one call every 2 seconds
def check_page(url: str):
return requests.get(url)
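Rate limiting controls your request cadence; for transient failures (timeouts, 5xx responses) a companion retry-with-exponential-backoff decorator is a common pairing. A sketch, not tied to any particular library:
import time
from functools import wraps

def retry_with_backoff(max_attempts: int = 3, base_delay: float = 1.0):
    """Retry a failing call, doubling the delay after each attempt."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            delay = base_delay
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    if attempt == max_attempts:
                        raise  # out of attempts: propagate the last error
                    print(f"Attempt {attempt} failed ({exc}); retrying in {delay:.0f}s")
                    time.sleep(delay)
                    delay *= 2
        return wrapper
    return decorator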