Project 6: Web Scraping in Practice¶
Difficulty: ⭐⭐⭐ Intermediate · Time: 10-15 hours · Topics: HTTP requests, HTML parsing, data storage, anti-scraping
📖 Project Overview¶
Background¶
Web scraping is a key technique for collecting web data automatically, widely used for data acquisition, competitor analysis, sentiment monitoring, and similar scenarios. This project walks you through building a complete web scraping system from scratch.
Project Goals¶
Build a complete web scraping system that can:
- Send HTTP requests to fetch pages
- Parse HTML and extract data
- Handle JavaScript-rendered pages
- Crawl with multiple threads or async IO
- Cope with anti-scraping mechanisms
- Store the scraped data
Tech Stack¶
- HTTP libraries: requests, aiohttp
- Parsing libraries: BeautifulSoup4, lxml
- Rendering engines: Selenium, Playwright
- Concurrency: asyncio, threading
- Storage: CSV, JSON, MongoDB
- Proxies: proxy rotation via requests' proxies parameter
🏗️ Project Structure¶
Text Only
web-scraper/
├── scrapers/                # scraper modules
│   ├── __init__.py
│   ├── base_scraper.py      # base scraper class
│   ├── static_scraper.py    # static page scraper
│   ├── dynamic_scraper.py   # dynamic page scraper
│   └── multi_thread.py      # multi-threaded scraper
├── parsers/                 # parsers
│   ├── __init__.py
│   ├── html_parser.py       # HTML parsing
│   ├── json_parser.py       # JSON parsing
│   └── text_parser.py       # text parsing
├── storage/                 # storage modules
│   ├── __init__.py
│   ├── csv_storage.py       # CSV storage
│   ├── json_storage.py      # JSON storage
│   └── mongo_storage.py     # MongoDB storage
├── utils/                   # utility helpers
│   ├── __init__.py
│   ├── proxy.py             # proxy management
│   ├── user_agent.py        # User-Agent management
│   ├── rate_limiter.py      # rate limiter
│   └── retry.py             # retry logic
├── config.py                # configuration
├── main.py                  # entry point
└── requirements.txt         # dependencies
🎯 Core Features¶
1. HTTP Requests¶
- GET requests: fetch page content
- POST requests: submit form data
- Request headers: customize headers
- Cookie handling: manage cookies
- Session handling: keep a session alive across requests (a short requests sketch follows)
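A minimal sketch of these request patterns with the requests library (httpbin.org is used purely as a placeholder target):
Python
import requests

# GET request with query parameters and a custom header
response = requests.get(
    "https://httpbin.org/get",
    params={"q": "python"},
    headers={"User-Agent": "my-scraper/0.1"},
    timeout=10,
)
print(response.status_code, response.json()["args"])

# POST request submitting form data
response = requests.post(
    "https://httpbin.org/post",
    data={"username": "demo", "password": "secret"},
    timeout=10,
)
print(response.json()["form"])

# A Session keeps cookies (and pooled connections) across requests
with requests.Session() as session:
    session.get("https://httpbin.org/cookies/set/session_id/abc123", timeout=10)
    response = session.get("https://httpbin.org/cookies", timeout=10)
    print(response.json())  # the cookie set above is sent back automatically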
2. HTML Parsing¶
- Tag selection: CSS selectors, XPath
- Attribute extraction: pull attributes off tags
- Text extraction: pull out plain text
- Link extraction: collect all links
- Image extraction: collect all image URLs (see the parsing sketch below)
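A short sketch of both selection styles on the same snippet: CSS selectors via BeautifulSoup and XPath via lxml:
Python
from bs4 import BeautifulSoup
from lxml import html as lxml_html

page = """
<html><body>
  <h1 class="title">Hello</h1>
  <a href="/about">About</a>
  <img src="/logo.png" alt="logo">
</body></html>
"""

# BeautifulSoup with CSS selectors
soup = BeautifulSoup(page, "html.parser")
print(soup.select_one("h1.title").get_text(strip=True))       # text extraction
print([a["href"] for a in soup.select("a[href]")])             # attribute extraction
print([img["src"] for img in soup.find_all("img", src=True)])  # image URLs

# lxml with XPath
tree = lxml_html.fromstring(page)
print(tree.xpath("//h1[@class='title']/text()"))
print(tree.xpath("//a/@href"))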
3. Dynamic Pages¶
- Selenium: browser automation
- Playwright: modern browser automation
- Waiting: explicit and implicit waits
- Interaction: click, type, scroll
4. Concurrent Scraping¶
- Multi-threading: the threading module
- Async IO: asyncio coroutines
- Thread pool: ThreadPoolExecutor
- Coroutine fan-out: asyncio.gather (an aiohttp sketch follows)
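aiohttp appears in the tech stack but not in the implementation below; a minimal sketch of concurrent fetching with asyncio.gather, using a semaphore to cap concurrency (URLs are placeholders):
Python
import asyncio
import aiohttp

async def fetch(session: aiohttp.ClientSession, sem: asyncio.Semaphore, url: str) -> str:
    # The semaphore caps how many requests are in flight at once
    async with sem:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
            resp.raise_for_status()
            return await resp.text()

async def crawl(urls: list[str], max_concurrent: int = 10) -> list:
    sem = asyncio.Semaphore(max_concurrent)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, sem, url) for url in urls]
        # return_exceptions=True keeps one failed URL from cancelling the rest
        return await asyncio.gather(*tasks, return_exceptions=True)

if __name__ == "__main__":
    pages = asyncio.run(crawl(["https://httpbin.org/html", "https://httpbin.org/robots.txt"]))
    print([len(p) if isinstance(p, str) else p for p in pages])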
5. Anti-Scraping Measures¶
- User-Agent: randomized User-Agent strings
- Proxy IPs: a pool of proxy IPs
- Rate limiting: throttle requests
- Cookie pool: manage a pool of cookies
- Captchas: captcha recognition (a small politeness sketch follows)
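A tiny politeness sketch combining two of these ideas, a randomized User-Agent and a random delay between requests (the User-Agent strings here are illustrative):
Python
import random
import time
import requests

USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Safari/605.1.15",
]

def polite_get(url: str) -> requests.Response:
    # Rotate the User-Agent and pause a random interval before each request
    headers = {"User-Agent": random.choice(USER_AGENTS)}
    time.sleep(random.uniform(1.0, 3.0))
    return requests.get(url, headers=headers, timeout=10)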
6. Data Storage¶
- CSV: save results to CSV files
- JSON: save results to JSON files
- Database: persist to MongoDB
- Incremental updates: update only changed data (storage sketches follow)
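The CSVStorage and JSONStorage classes that main.py uses later are not reproduced in this guide; minimal sketches, assuming each exposes a save(records) method (MongoStorage would follow the same pattern with pymongo):
Python
import csv
import json
import os

class CSVStorage:
    """Write a list of dicts to a CSV file (sketch)."""

    def __init__(self, output_dir: str, filename: str = "data.csv"):
        os.makedirs(output_dir, exist_ok=True)
        self.path = os.path.join(output_dir, filename)

    def save(self, records: list[dict]) -> None:
        if not records:
            return
        # Use the union of keys across all records as the header row
        fieldnames = sorted({key for record in records for key in record})
        with open(self.path, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(records)

class JSONStorage:
    """Write a list of dicts to a JSON file (sketch)."""

    def __init__(self, output_dir: str, filename: str = "data.json"):
        os.makedirs(output_dir, exist_ok=True)
        self.path = os.path.join(output_dir, filename)

    def save(self, records: list[dict]) -> None:
        with open(self.path, "w", encoding="utf-8") as f:
            json.dump(records, f, ensure_ascii=False, indent=2)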
💻 Code Implementation¶
1. Configuration (config.py)¶
Python
"""
Web爬虫配置文件
"""
from dataclasses import dataclass
import random
@dataclass # @dataclass自动生成__init__、__repr__等常用方法
class Config:
"""配置类"""
# 请求配置
timeout: int = 10
max_retries: int = 3
retry_delay: float = 1.0
# 并发配置
max_workers: int = 5
async_max_concurrent: int = 10
# 限速配置
requests_per_second: float = 1.0
burst_size: int = 5
# 代理配置
use_proxy: bool = False
proxy_list: list[str] | None = None
# User-Agent配置
use_random_user_agent: bool = True
user_agent_list: list[str] | None = None
# 存储配置
output_dir: str = "./output"
storage_type: str = "csv" # csv, json, mongo
# MongoDB配置
mongo_uri: str = "mongodb://localhost:27017/"
mongo_db: str = "scraper"
mongo_collection: str = "data"
# Selenium配置
use_selenium: bool = False
selenium_driver: str = "chrome" # chrome, firefox
headless: bool = True
# 日志配置
log_level: str = "INFO"
log_file: str = "./logs/scraper.log"
# 默认User-Agent列表
DEFAULT_USER_AGENTS = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15',
]
config = Config()
if config.user_agent_list is None:
config.user_agent_list = DEFAULT_USER_AGENTS
2. Base Scraper Class (scrapers/base_scraper.py)¶
Python
"""
基础爬虫类
"""
import requests
import time
import logging
from typing import Any
from abc import ABC, abstractmethod # ABC和abstractmethod定义抽象基类,强制子类实现接口
from config import config
from utils.proxy import ProxyManager
from utils.user_agent import UserManager
from utils.rate_limiter import RateLimiter
from utils.retry import retry
class BaseScraper(ABC):
"""基础爬虫抽象类"""
def __init__(self, config):
"""
初始化爬虫
Args:
config: 配置对象
"""
self.config = config
self.session = requests.Session()
# 初始化工具
self.proxy_manager = ProxyManager(config) if config.use_proxy else None
self.user_agent_manager = UserManager(config) if config.use_random_user_agent else None
self.rate_limiter = RateLimiter(
config.requests_per_second,
config.burst_size,
)
# 配置日志
self.logger = logging.getLogger(__name__)
self.logger.setLevel(config.log_level)
# 请求统计
self.request_count = 0
self.success_count = 0
self.fail_count = 0
    @retry(max_retries=3, delay=1.0, exceptions=(requests.RequestException,))
    def _request_with_retry(self, method: str, url: str, **kwargs) -> requests.Response:
        """
        Send a single HTTP request. Exceptions propagate so the @retry decorator
        can re-invoke the call; catching them here would disable retries.
        """
        response = self.session.request(
            method=method,
            url=url,
            timeout=self.config.timeout,
            **kwargs,
        )
        # Raise for 4xx/5xx so failed responses are retried as well
        response.raise_for_status()
        return response

    def fetch(
        self,
        url: str,
        method: str = "GET",
        params: dict | None = None,
        data: dict | None = None,
        headers: dict | None = None,
        **kwargs,  # **kwargs forwards any extra keyword arguments to requests
    ) -> requests.Response | None:
        """
        Send an HTTP request

        Args:
            url: target URL
            method: HTTP method
            params: query parameters
            data: request body data
            headers: request headers
            **kwargs: extra arguments passed to requests

        Returns:
            Response object, or None on failure
        """
        # Rate limiting
        self.rate_limiter.acquire()

        # Prepare headers and proxies
        request_headers = self._prepare_headers(headers)
        proxies = self._prepare_proxies()

        # Count one logical request per fetch() call (retries happen inside the helper)
        self.request_count += 1

        try:
            response = self._request_with_retry(
                method,
                url,
                params=params,
                data=data,
                headers=request_headers,
                proxies=proxies,
                **kwargs,
            )
            self.success_count += 1
            self.logger.info(f"Request succeeded: {url}")
            return response
        except requests.RequestException as e:
            self.fail_count += 1
            self.logger.error(f"Request failed: {url}, error: {e}")
            return None
    def _prepare_headers(self, headers: dict | None = None) -> dict:
        """
        Build the request headers

        Args:
            headers: custom headers

        Returns:
            Complete header dict
        """
        request_headers = {}

        # Add a User-Agent
        if self.user_agent_manager:
            request_headers['User-Agent'] = self.user_agent_manager.get_random_user_agent()

        # Merge custom headers
        if headers:
            request_headers.update(headers)

        return request_headers

    def _prepare_proxies(self) -> dict | None:
        """
        Build the proxy mapping

        Returns:
            Proxy dict, or None if proxies are disabled
        """
        if self.proxy_manager:
            proxy = self.proxy_manager.get_random_proxy()
            if proxy:
                return {
                    'http': proxy,
                    'https': proxy,
                }
        return None

    @abstractmethod
    def parse(self, response: requests.Response) -> Any:
        """
        Parse a response

        Args:
            response: response object

        Returns:
            Parsed data
        """
        pass

    def scrape(self, url: str) -> Any | None:
        """
        Scrape a single URL

        Args:
            url: target URL

        Returns:
            Parsed data, or None on failure
        """
        response = self.fetch(url)
        if response:
            return self.parse(response)
        return None

    def get_stats(self) -> dict[str, float]:
        """
        Return request statistics

        Returns:
            Statistics dict
        """
        return {
            'request_count': self.request_count,
            'success_count': self.success_count,
            'fail_count': self.fail_count,
            'success_rate': self.success_count / self.request_count if self.request_count > 0 else 0,
        }
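BaseScraper imports ProxyManager, UserManager and RateLimiter from utils/, which are not shown in this guide; minimal sketches, assuming only the interfaces used above (get_random_proxy(), get_random_user_agent(), and a token-bucket acquire()):
Python
import random
import threading
import time

class ProxyManager:
    """Return a random proxy from the configured list (sketch)."""

    def __init__(self, config):
        self.proxy_list = config.proxy_list or []

    def get_random_proxy(self) -> str | None:
        return random.choice(self.proxy_list) if self.proxy_list else None

class UserManager:
    """Return a random User-Agent string (sketch)."""

    def __init__(self, config):
        self.user_agents = config.user_agent_list or []

    def get_random_user_agent(self) -> str:
        return random.choice(self.user_agents) if self.user_agents else "Mozilla/5.0"

class RateLimiter:
    """Thread-safe token bucket: refills at requests_per_second, holds at most burst_size tokens (sketch)."""

    def __init__(self, requests_per_second: float, burst_size: int):
        self.rate = requests_per_second
        self.capacity = burst_size
        self.tokens = float(burst_size)
        self.last_refill = time.monotonic()
        self.lock = threading.Lock()

    def acquire(self) -> None:
        while True:
            with self.lock:
                now = time.monotonic()
                # Refill tokens based on elapsed time, capped at the bucket capacity
                self.tokens = min(self.capacity, self.tokens + (now - self.last_refill) * self.rate)
                self.last_refill = now
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
                wait = (1 - self.tokens) / self.rate
            time.sleep(wait)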
3. Static Page Scraper (scrapers/static_scraper.py)¶
Python
"""
静态页面爬虫
"""
import requests
from bs4 import BeautifulSoup
from typing import Any
from scrapers.base_scraper import BaseScraper
class StaticScraper(BaseScraper):
"""静态页面爬虫"""
def __init__(self, config):
"""初始化爬虫"""
super().__init__(config)
def parse(self, response: requests.Response) -> dict[str, Any]:
"""
解析HTML响应
Args:
response: 响应对象
Returns:
解析后的数据
"""
# 解析HTML
soup = BeautifulSoup(response.text, 'html.parser')
# 提取数据
data = {
'url': response.url,
'status_code': response.status_code,
'title': self._extract_title(soup),
'content': self._extract_content(soup),
'links': self._extract_links(soup, response.url),
'images': self._extract_images(soup, response.url),
'meta': self._extract_meta(soup),
}
return data
def _extract_title(self, soup: BeautifulSoup) -> str:
"""提取标题"""
title_tag = soup.find('title')
return title_tag.text.strip() if title_tag else ""
def _extract_content(self, soup: BeautifulSoup) -> str:
"""提取正文内容"""
# 移除script和style标签
for tag in soup(['script', 'style']):
tag.decompose()
# 提取正文
body = soup.find('body')
return body.get_text(strip=True) if body else ""
def _extract_links(
self,
soup: BeautifulSoup,
base_url: str,
) -> list[str]:
"""提取所有链接"""
links = []
for a_tag in soup.find_all('a', href=True):
href = a_tag['href']
# 处理相对URL
if href.startswith('/'):
href = base_url + href
elif not href.startswith('http'):
continue
links.append(href)
return links
def _extract_images(
self,
soup: BeautifulSoup,
base_url: str,
) -> list[str]:
"""提取所有图片"""
images = []
for img_tag in soup.find_all('img', src=True):
src = img_tag['src']
# 处理相对URL
if src.startswith('/'):
src = base_url + src
elif not src.startswith('http'):
continue
images.append(src)
return images
def _extract_meta(self, soup: BeautifulSoup) -> dict[str, str]:
"""提取meta标签"""
meta = {}
for meta_tag in soup.find_all('meta'):
name = meta_tag.get('name') or meta_tag.get('property')
content = meta_tag.get('content')
if name and content:
meta[name] = content
return meta
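A quick usage sketch (httpbin.org as a placeholder target):
Python
from config import config
from scrapers.static_scraper import StaticScraper

scraper = StaticScraper(config)
data = scraper.scrape("https://httpbin.org/html")
if data:
    print(data["title"])
    print(f"{len(data['links'])} links, {len(data['images'])} images")
print(scraper.get_stats())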
4. Dynamic Page Scraper (scrapers/dynamic_scraper.py)¶
Python
"""
动态页面爬虫
"""
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.firefox.options import Options as FirefoxOptions
from typing import Any
import time
from config import config
class DynamicScraper:
"""动态页面爬虫"""
def __init__(self, config):
"""
初始化爬虫
Args:
config: 配置对象
"""
self.config = config
self.driver = None
self._init_driver()
def _init_driver(self):
"""初始化浏览器驱动"""
if self.config.selenium_driver == 'chrome':
options = Options()
if self.config.headless:
options.add_argument('--headless')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
self.driver = webdriver.Chrome(options=options)
elif self.config.selenium_driver == 'firefox':
options = FirefoxOptions()
if self.config.headless:
options.add_argument('--headless')
self.driver = webdriver.Firefox(options=options)
else:
raise ValueError(f"不支持的浏览器: {self.config.selenium_driver}")
def fetch(self, url: str, wait_time: int = 3) -> str:
"""
获取动态页面内容
Args:
url: URL地址
wait_time: 等待时间(秒)
Returns:
页面HTML
"""
try:
# 访问页面
self.driver.get(url)
# 等待页面加载
time.sleep(wait_time)
# 等待特定元素加载(可选)
# WebDriverWait(self.driver, 10).until(
# EC.presence_of_element_located((By.TAG_NAME, "body"))
# )
# 获取页面源码
html = self.driver.page_source
return html
except Exception as e:
print(f"获取页面失败: {e}")
return ""
def scroll_to_bottom(self):
"""滚动到页面底部"""
last_height = self.driver.execute_script("return document.body.scrollHeight")
while True:
# 滚动到底部
self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
# 等待加载
time.sleep(2)
# 检查是否到达底部
new_height = self.driver.execute_script("return document.body.scrollHeight")
if new_height == last_height:
break
last_height = new_height
def click_element(self, selector: str, by: By = By.CSS_SELECTOR):
"""
点击元素
Args:
selector: 选择器
by: 定位方式
"""
try:
element = WebDriverWait(self.driver, 10).until(
EC.element_to_be_clickable((by, selector))
)
element.click()
time.sleep(1) # 等待响应
except Exception as e:
print(f"点击元素失败: {e}")
def input_text(
self,
selector: str,
text: str,
by: By = By.CSS_SELECTOR,
):
"""
输入文本
Args:
selector: 选择器
text: 输入文本
by: 定位方式
"""
try:
element = WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((by, selector))
)
element.clear()
element.send_keys(text)
except Exception as e:
print(f"输入文本失败: {e}")
def close(self):
"""关闭浏览器"""
if self.driver:
self.driver.quit()
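Playwright is listed in the tech stack but only Selenium is implemented above; a minimal sketch of the same fetch-and-scroll idea with Playwright's sync API (requires pip install playwright followed by playwright install):
Python
from playwright.sync_api import sync_playwright

def fetch_dynamic(url: str, headless: bool = True) -> str:
    """Render a JavaScript-heavy page and return its HTML (sketch)."""
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=headless)
        page = browser.new_page()
        page.goto(url, wait_until="networkidle")  # wait until network activity settles
        page.mouse.wheel(0, 10000)                # scroll down to trigger lazy loading
        page.wait_for_timeout(1000)               # give lazy content a moment to appear
        html = page.content()
        browser.close()
        return html

if __name__ == "__main__":
    print(len(fetch_dynamic("https://example.com")))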
5. Multi-Threaded Scraper (scrapers/multi_thread.py)¶
Python
"""
多线程爬虫
"""
import threading
from queue import Queue
from typing import Any
from collections.abc import Callable
import concurrent.futures
from scrapers.base_scraper import BaseScraper
class MultiThreadScraper:
"""多线程爬虫"""
def __init__(
self,
scraper_class: type,
config,
max_workers: int = 5,
):
"""
初始化多线程爬虫
Args:
scraper_class: 爬虫类
config: 配置对象
max_workers: 最大工作线程数
"""
self.scraper_class = scraper_class
self.config = config
self.max_workers = max_workers
# 结果队列
self.result_queue = Queue()
# 锁
self.lock = threading.Lock()
def scrape_urls(self, urls: list[str]) -> list[Any]:
"""
爬取多个URL
Args:
urls: URL列表
Returns:
结果列表
"""
results = []
# 使用线程池
with concurrent.futures.ThreadPoolExecutor( # ThreadPoolExecutor线程池,管理和复用线程资源
max_workers=self.max_workers
) as executor:
# 提交任务
future_to_url = {
executor.submit(self._scrape_single, url): url
for url in urls
}
# 获取结果
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
result = future.result()
if result:
results.append(result)
except Exception as e:
print(f"爬取失败 {url}: {e}")
return results
def _scrape_single(self, url: str) -> Any:
"""
爬取单个URL
Args:
url: URL地址
Returns:
爬取结果
"""
# 创建爬虫实例
scraper = self.scraper_class(self.config)
# 爬取
return scraper.scrape(url)
6. Utility Functions (utils/retry.py)¶
Python
"""
重试装饰器
"""
import time
import functools
from collections.abc import Callable
def retry(
max_retries: int = 3,
delay: float = 1.0,
exceptions: tuple = (Exception,),
):
"""
重试装饰器
Args:
max_retries: 最大重试次数
delay: 重试延迟
exceptions: 捕获的异常类型
Returns:
装饰器函数
"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func) # @wraps保留被装饰函数的元信息(名称、文档字符串等)
def wrapper(*args, **kwargs) -> Any | None:
last_exception = None
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except exceptions as e:
last_exception = e
if attempt < max_retries - 1:
print(f"重试 {attempt + 1}/{max_retries}: {e}")
time.sleep(delay)
# 所有重试都失败
raise last_exception
return wrapper
return decorator
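A quick usage sketch of the decorator on a deliberately flaky function:
Python
import random

from utils.retry import retry

@retry(max_retries=3, delay=0.5, exceptions=(ValueError,))
def flaky() -> str:
    # Fails roughly two times out of three; @retry re-invokes it up to 3 times
    if random.random() < 0.66:
        raise ValueError("transient failure")
    return "ok"

print(flaky())  # prints "ok", or raises ValueError after 3 failed attempts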
7. Main Program (main.py)¶
Python
"""
主程序
"""
import os
from config import config
from scrapers.static_scraper import StaticScraper
from scrapers.multi_thread import MultiThreadScraper
from storage.csv_storage import CSVStorage
from storage.json_storage import JSONStorage
from storage.mongo_storage import MongoStorage
def main():
"""主函数"""
# 目标URL列表
urls = [
'https://example.com/page1',
'https://example.com/page2',
'https://example.com/page3',
]
# 创建存储器
if config.storage_type == 'csv':
storage = CSVStorage(config.output_dir)
elif config.storage_type == 'json':
storage = JSONStorage(config.output_dir)
elif config.storage_type == 'mongo':
storage = MongoStorage(
config.mongo_uri,
config.mongo_db,
config.mongo_collection,
)
else:
raise ValueError(f"不支持的存储类型: {config.storage_type}")
# 创建多线程爬虫
scraper = MultiThreadScraper(
StaticScraper,
config,
max_workers=config.max_workers,
)
# 爬取URL
print(f"开始爬取 {len(urls)} 个URL...")
results = scraper.scrape_urls(urls)
# 存储结果
print(f"成功爬取 {len(results)} 个页面")
storage.save(results)
    # Print a summary. Each worker thread builds its own scraper instance,
    # so aggregate the numbers from the results rather than per-instance stats.
    total, succeeded = len(urls), len(results)
    print("\nSummary:")
    print(f"  Total URLs: {total}")
    print(f"  Succeeded: {succeeded}")
    print(f"  Failed: {total - succeeded}")
    if total:
        print(f"  Success rate: {succeeded / total:.2%}")


if __name__ == "__main__":
    main()
🧪 Testing¶
1. Unit Tests¶
Python
"""
单元测试示例
"""
import pytest
from scrapers.static_scraper import StaticScraper
from config import config
def test_static_scraper():
"""测试静态爬虫"""
scraper = StaticScraper(config)
# 测试爬取
url = 'https://httpbin.org/html'
data = scraper.scrape(url)
# 验证结果
assert data is not None
assert 'url' in data
assert 'title' in data
assert 'content' in data
print(f"✓ 静态爬虫测试通过")
print(f" 标题: {data['title']}")
print(f" 链接数: {len(data['links'])}")
2. Integration Tests¶
Python
"""
集成测试示例
"""
def test_full_pipeline():
"""测试完整流程"""
from scrapers.multi_thread import MultiThreadScraper
# 创建爬虫
scraper = MultiThreadScraper(
StaticScraper,
config,
max_workers=2,
)
# 测试URL
urls = [
'https://httpbin.org/html',
'https://httpbin.org/robots.txt',
]
# 爬取
results = scraper.scrape_urls(urls)
# 验证结果
assert len(results) > 0
assert all('url' in result for result in results) # all()全部为True才返回True
print(f"✓ 完整流程测试通过")
print(f" 成功爬取: {len(results)} 个页面")
📊 Extension Ideas¶
1. Feature Extensions¶
- Captcha recognition: solve captchas with OCR
- Distributed scraping: spread crawling across machines
- Incremental crawling: fetch only new or updated content
- Data cleaning: clean and de-duplicate scraped data (see the sketch below)
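One simple way to approach incremental crawling and de-duplication is to persist the set of URLs already crawled and skip them on the next run; a sketch, with seen_urls.json as a hypothetical location:
Python
import json
import os

SEEN_FILE = "./output/seen_urls.json"  # hypothetical location for the seen-URL set

def load_seen() -> set[str]:
    if os.path.exists(SEEN_FILE):
        with open(SEEN_FILE, encoding="utf-8") as f:
            return set(json.load(f))
    return set()

def save_seen(seen: set[str]) -> None:
    os.makedirs(os.path.dirname(SEEN_FILE), exist_ok=True)
    with open(SEEN_FILE, "w", encoding="utf-8") as f:
        json.dump(sorted(seen), f, ensure_ascii=False, indent=2)

def filter_new(urls: list[str]) -> list[str]:
    # Only URLs not crawled in previous runs are returned
    seen = load_seen()
    return [u for u in urls if u not in seen]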
2. Performance Optimization¶
- Connection pooling: reuse HTTP connections
- Async IO: asyncio-based crawling
- Caching: cache pages that were already fetched
- Compression: enable gzip transfer encoding (see the sketch below)
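A small sketch of the connection-pool and compression points: requests reuses connections through a Session, pool sizes can be tuned with HTTPAdapter, and gzip is requested via the Accept-Encoding header (requests decompresses the body transparently):
Python
import requests
from requests.adapters import HTTPAdapter

session = requests.Session()
# Larger pools help when many threads share one session
adapter = HTTPAdapter(pool_connections=20, pool_maxsize=20, max_retries=2)
session.mount("https://", adapter)
session.mount("http://", adapter)
session.headers.update({"Accept-Encoding": "gzip, deflate"})

response = session.get("https://httpbin.org/gzip", timeout=10)
print(response.headers.get("Content-Encoding"))  # "gzip"
print(response.json()["gzipped"])                # body is already decompressed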
3. Stronger Anti-Scraping Handling¶
- IP rotation: rotate IPs dynamically
- Behavior simulation: mimic human browsing behavior
- Cookie pool: manage a pool of cookies
- Captchas: third-party captcha-solving services
📚 What You Will Learn¶
After completing this project, you will have mastered:
- ✅ HTTP request and response handling
- ✅ HTML parsing and data extraction
- ✅ Scraping dynamic pages
- ✅ Multi-threaded concurrent scraping
- ✅ Anti-scraping techniques
- ✅ Data storage and management
- ✅ Building a complete web scraping system
🔗 References¶
Estimated completion time: 10-15 hours · Difficulty: ⭐⭐⭐ Intermediate · Recommendation: ⭐⭐⭐⭐⭐