
Project 6: Web Scraper in Practice

Difficulty: ⭐⭐⭐ Intermediate · Time: 10-15 hours · Topics: HTTP requests, HTML parsing, data storage, anti-scraping


📖 Project Overview

Background

Web scrapers automate the collection of data from web pages and are widely used for data gathering, competitor analysis, public-opinion monitoring, and similar scenarios. This project walks you through building a complete web scraping system from scratch.

Goals

Build a complete web scraping system that can:

  • Send HTTP requests to fetch pages
  • Parse HTML and extract data
  • Handle JavaScript-rendered pages
  • Crawl with multiple threads or asynchronously
  • Cope with anti-scraping mechanisms
  • Store the scraped data

Tech Stack

  • HTTP libraries: requests, aiohttp
  • Parsing libraries: BeautifulSoup4, lxml
  • Rendering engines: Selenium, Playwright
  • Concurrency: asyncio, threading
  • Storage: CSV, JSON, MongoDB
  • Proxies: the built-in proxies support in requests

🏗️ Project Structure

Text Only
web-scraper/
├── scrapers/                # Scraper modules
│   ├── __init__.py
│   ├── base_scraper.py      # Base scraper class
│   ├── static_scraper.py    # Static-page scraper
│   ├── dynamic_scraper.py   # Dynamic-page scraper
│   └── multi_thread.py      # Multi-threaded scraper
├── parsers/                 # Parsers
│   ├── __init__.py
│   ├── html_parser.py       # HTML parsing
│   ├── json_parser.py       # JSON parsing
│   └── text_parser.py       # Text parsing
├── storage/                 # Storage modules
│   ├── __init__.py
│   ├── csv_storage.py       # CSV storage
│   ├── json_storage.py      # JSON storage
│   └── mongo_storage.py     # MongoDB storage
├── utils/                   # Utilities
│   ├── __init__.py
│   ├── proxy.py             # Proxy management
│   ├── user_agent.py        # User-Agent management
│   ├── rate_limiter.py      # Rate limiter
│   └── retry.py             # Retry mechanism
├── config.py                # Configuration
├── main.py                  # Entry point
└── requirements.txt         # Dependencies
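
The tree lists a requirements.txt whose contents are not shown in this document. A plausible version based on the tech stack above (package names only; pins and exact versions are an assumption, not part of the original):

Text Only
requests
aiohttp
beautifulsoup4
lxml
selenium
playwright
pymongo
pytest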

🎯 Core Features

1. HTTP Requests

  • GET requests: fetch page content
  • POST requests: submit form data
  • Request headers: customize headers
  • Cookie management: handle cookies
  • Session management: keep a session alive (see the example below)
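
A minimal illustration of the Session and cookie handling that the scraper below builds on; httpbin.org is used here purely as a neutral test endpoint (an assumption, not part of the project):

Python
import requests

session = requests.Session()

# The server sets a cookie; the Session stores it automatically
session.get("https://httpbin.org/cookies/set?token=abc123", timeout=10)
print(session.cookies.get("token"))  # -> abc123

# Later requests reuse the same cookies and connection pool
response = session.post(
    "https://httpbin.org/post",
    data={"q": "python"},
    headers={"User-Agent": "my-scraper/0.1"},
    timeout=10,
)
print(response.status_code)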

2. HTML Parsing

  • Element selection: CSS selectors, XPath (see the sketch below)
  • Attribute extraction: pull attributes from tags
  • Text extraction: pull plain text
  • Link extraction: collect all links
  • Image extraction: collect all images
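
The static scraper shown later uses BeautifulSoup's find/find_all; as a complement, here is a minimal sketch of CSS-selector and XPath extraction with BeautifulSoup and lxml (the sample HTML is made up for illustration):

Python
from bs4 import BeautifulSoup
from lxml import html as lxml_html

sample = '<div class="item"><a href="/post/1">First post</a></div>'

# CSS selectors via BeautifulSoup
soup = BeautifulSoup(sample, "lxml")
for a in soup.select("div.item a[href]"):
    print(a["href"], a.get_text(strip=True))

# XPath via lxml
tree = lxml_html.fromstring(sample)
print(tree.xpath('//div[@class="item"]/a/@href'))   # ['/post/1']
print(tree.xpath('//div[@class="item"]/a/text()'))  # ['First post']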

3. Dynamic Pages

  • Selenium: browser automation
  • Playwright: modern browser automation (see the sketch below)
  • Wait strategies: explicit and implicit waits
  • Interaction: click, type, scroll
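
The implementation section only shows Selenium; for comparison, a minimal Playwright sketch (sync API) that fetches a fully rendered page could look like this (the URL is a placeholder):

Python
from playwright.sync_api import sync_playwright

def fetch_rendered(url: str) -> str:
    """Return the rendered HTML of a JavaScript-driven page."""
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()
        page.goto(url, wait_until="networkidle")  # wait until network activity settles
        html = page.content()
        browser.close()
        return html

print(len(fetch_rendered("https://example.com")))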

4. Concurrent Crawling

  • Multi-threading: the threading module
  • Async IO: asyncio coroutines (see the aiohttp sketch below)
  • Thread pools: ThreadPoolExecutor
  • Coroutine batching: asyncio.gather
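
aiohttp and asyncio appear in the tech stack but not in the implementation below; a minimal sketch of bounded-concurrency async fetching (the semaphore limit mirrors config.async_max_concurrent; everything else here is an illustrative assumption):

Python
import asyncio
import aiohttp

async def fetch(session: aiohttp.ClientSession, sem: asyncio.Semaphore, url: str) -> str:
    async with sem:  # cap the number of in-flight requests
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
            resp.raise_for_status()
            return await resp.text()

async def crawl(urls: list[str], max_concurrent: int = 10) -> list:
    """Fetch all URLs with at most max_concurrent requests in flight."""
    sem = asyncio.Semaphore(max_concurrent)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, sem, url) for url in urls]
        # Exceptions are returned in place of results instead of cancelling the batch
        return await asyncio.gather(*tasks, return_exceptions=True)

# asyncio.run(crawl(["https://example.com"]))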

5. Anti-Scraping Countermeasures

  • User-Agent: randomized User-Agent strings
  • Proxy IPs: a rotating proxy pool
  • Rate limiting: a request rate limiter
  • Cookie pool: managed pools of cookies
  • CAPTCHAs: CAPTCHA recognition

6. Data Storage

  • CSV: save to CSV files
  • JSON: save to JSON files
  • Database: store in MongoDB
  • Incremental updates: only store what changed

💻 Code Implementation

1. Configuration (config.py)

Python
"""
Web爬虫配置文件
"""
from dataclasses import dataclass
import random

@dataclass  # @dataclass自动生成__init__、__repr__等常用方法
class Config:
    """配置类"""

    # 请求配置
    timeout: int = 10
    max_retries: int = 3
    retry_delay: float = 1.0

    # 并发配置
    max_workers: int = 5
    async_max_concurrent: int = 10

    # 限速配置
    requests_per_second: float = 1.0
    burst_size: int = 5

    # 代理配置
    use_proxy: bool = False
    proxy_list: list[str] | None = None

    # User-Agent配置
    use_random_user_agent: bool = True
    user_agent_list: list[str] | None = None

    # 存储配置
    output_dir: str = "./output"
    storage_type: str = "csv"  # csv, json, mongo

    # MongoDB配置
    mongo_uri: str = "mongodb://localhost:27017/"
    mongo_db: str = "scraper"
    mongo_collection: str = "data"

    # Selenium配置
    use_selenium: bool = False
    selenium_driver: str = "chrome"  # chrome, firefox
    headless: bool = True

    # 日志配置
    log_level: str = "INFO"
    log_file: str = "./logs/scraper.log"

# 默认User-Agent列表
DEFAULT_USER_AGENTS = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15',
]

config = Config()
if config.user_agent_list is None:
    config.user_agent_list = DEFAULT_USER_AGENTS

2. Base Scraper Class (scrapers/base_scraper.py)

Python
"""
基础爬虫类
"""
import requests
import time
import logging
from typing import Any
from abc import ABC, abstractmethod  # ABC和abstractmethod定义抽象基类,强制子类实现接口

from config import config
from utils.proxy import ProxyManager
from utils.user_agent import UserManager
from utils.rate_limiter import RateLimiter
from utils.retry import retry

class BaseScraper(ABC):
    """基础爬虫抽象类"""

    def __init__(self, config):
        """
        初始化爬虫

        Args:
            config: 配置对象
        """
        self.config = config
        self.session = requests.Session()

        # 初始化工具
        self.proxy_manager = ProxyManager(config) if config.use_proxy else None
        self.user_agent_manager = UserManager(config) if config.use_random_user_agent else None
        self.rate_limiter = RateLimiter(
            config.requests_per_second,
            config.burst_size,
        )

        # 配置日志
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(config.log_level)

        # 请求统计
        self.request_count = 0
        self.success_count = 0
        self.fail_count = 0

    @retry(max_retries=3, delay=1.0)
    def fetch(
        self,
        url: str,
        method: str = "GET",
        params: dict | None = None,
        data: dict | None = None,
        headers: dict | None = None,
        **kwargs,  # **kwargs接收任意数量的关键字参数
    ) -> requests.Response | None:
        """
        发送HTTP请求

        Args:
            url: URL地址
            method: 请求方法
            params: 查询参数
            data: 请求数据
            headers: 请求头
            **kwargs: 其他参数

        Returns:
            响应对象
        """
        # 限速
        self.rate_limiter.acquire()

        # 准备请求头
        request_headers = self._prepare_headers(headers)

        # 准备代理
        proxies = self._prepare_proxies()

        try:
            # 发送请求
            response = self.session.request(
                method=method,
                url=url,
                params=params,
                data=data,
                headers=request_headers,
                proxies=proxies,
                timeout=self.config.timeout,
                **kwargs,
            )

            # 检查响应状态
            response.raise_for_status()

            # 更新统计
            self.request_count += 1
            self.success_count += 1

            self.logger.info(f"成功请求: {url}")

            return response

        except requests.RequestException as e:
            # 更新统计
            self.request_count += 1
            self.fail_count += 1

            self.logger.error(f"请求失败: {url}, 错误: {e}")

            return None

    def _prepare_headers(self, headers: dict | None = None) -> dict:
        """
        准备请求头

        Args:
            headers: 自定义请求头

        Returns:
            完整请求头
        """
        request_headers = {}

        # 添加User-Agent
        if self.user_agent_manager:
            request_headers['User-Agent'] = self.user_agent_manager.get_random_user_agent()

        # 添加自定义请求头
        if headers:
            request_headers.update(headers)

        return request_headers

    def _prepare_proxies(self) -> dict | None:
        """
        准备代理

        Returns:
            代理字典
        """
        if self.proxy_manager:
            proxy = self.proxy_manager.get_random_proxy()
            if proxy:
                return {
                    'http': proxy,
                    'https': proxy,
                }
        return None

    @abstractmethod
    def parse(self, response: requests.Response) -> Any:
        """
        解析响应

        Args:
            response: 响应对象

        Returns:
            解析后的数据
        """
        pass

    def scrape(self, url: str) -> Any | None:
        """
        爬取单个URL

        Args:
            url: URL地址

        Returns:
            解析后的数据
        """
        response = self.fetch(url)
        if response:
            return self.parse(response)
        return None

    def get_stats(self) -> dict[str, int]:
        """
        获取统计信息

        Returns:
            统计字典
        """
        return {
            'request_count': self.request_count,
            'success_count': self.success_count,
            'fail_count': self.fail_count,
            'success_rate': self.success_count / self.request_count if self.request_count > 0 else 0,
        }

3. Static-Page Scraper (scrapers/static_scraper.py)

Python
"""
静态页面爬虫
"""
import requests
from bs4 import BeautifulSoup
from typing import Any

from scrapers.base_scraper import BaseScraper

class StaticScraper(BaseScraper):
    """静态页面爬虫"""

    def __init__(self, config):
        """初始化爬虫"""
        super().__init__(config)

    def parse(self, response: requests.Response) -> dict[str, Any]:
        """
        解析HTML响应

        Args:
            response: 响应对象

        Returns:
            解析后的数据
        """
        # 解析HTML
        soup = BeautifulSoup(response.text, 'html.parser')

        # 提取数据
        data = {
            'url': response.url,
            'status_code': response.status_code,
            'title': self._extract_title(soup),
            'content': self._extract_content(soup),
            'links': self._extract_links(soup, response.url),
            'images': self._extract_images(soup, response.url),
            'meta': self._extract_meta(soup),
        }

        return data

    def _extract_title(self, soup: BeautifulSoup) -> str:
        """提取标题"""
        title_tag = soup.find('title')
        return title_tag.text.strip() if title_tag else ""

    def _extract_content(self, soup: BeautifulSoup) -> str:
        """提取正文内容"""
        # 移除script和style标签
        for tag in soup(['script', 'style']):
            tag.decompose()

        # 提取正文
        body = soup.find('body')
        return body.get_text(strip=True) if body else ""

    def _extract_links(
        self,
        soup: BeautifulSoup,
        base_url: str,
    ) -> list[str]:
        """提取所有链接"""
        links = []

        for a_tag in soup.find_all('a', href=True):
            href = a_tag['href']

            # 处理相对URL
            if href.startswith('/'):
                href = base_url + href
            elif not href.startswith('http'):
                continue

            links.append(href)

        return links

    def _extract_images(
        self,
        soup: BeautifulSoup,
        base_url: str,
    ) -> list[str]:
        """提取所有图片"""
        images = []

        for img_tag in soup.find_all('img', src=True):
            src = img_tag['src']

            # 处理相对URL
            if src.startswith('/'):
                src = base_url + src
            elif not src.startswith('http'):
                continue

            images.append(src)

        return images

    def _extract_meta(self, soup: BeautifulSoup) -> dict[str, str]:
        """提取meta标签"""
        meta = {}

        for meta_tag in soup.find_all('meta'):
            name = meta_tag.get('name') or meta_tag.get('property')
            content = meta_tag.get('content')

            if name and content:
                meta[name] = content

        return meta

4. Dynamic-Page Scraper (scrapers/dynamic_scraper.py)

Python
"""
动态页面爬虫
"""
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.firefox.options import Options as FirefoxOptions
from typing import Any
import time

from config import config

class DynamicScraper:
    """动态页面爬虫"""

    def __init__(self, config):
        """
        初始化爬虫

        Args:
            config: 配置对象
        """
        self.config = config
        self.driver = None
        self._init_driver()

    def _init_driver(self):
        """初始化浏览器驱动"""
        if self.config.selenium_driver == 'chrome':
            options = Options()
            if self.config.headless:
                options.add_argument('--headless')
            options.add_argument('--no-sandbox')
            options.add_argument('--disable-dev-shm-usage')
            self.driver = webdriver.Chrome(options=options)

        elif self.config.selenium_driver == 'firefox':
            options = FirefoxOptions()
            if self.config.headless:
                options.add_argument('--headless')
            self.driver = webdriver.Firefox(options=options)

        else:
            raise ValueError(f"不支持的浏览器: {self.config.selenium_driver}")

    def fetch(self, url: str, wait_time: int = 3) -> str:
        """
        获取动态页面内容

        Args:
            url: URL地址
            wait_time: 等待时间(秒)

        Returns:
            页面HTML
        """
        try:
            # 访问页面
            self.driver.get(url)

            # 等待页面加载
            time.sleep(wait_time)

            # 等待特定元素加载(可选)
            # WebDriverWait(self.driver, 10).until(
            #     EC.presence_of_element_located((By.TAG_NAME, "body"))
            # )

            # 获取页面源码
            html = self.driver.page_source

            return html

        except Exception as e:
            print(f"获取页面失败: {e}")
            return ""

    def scroll_to_bottom(self):
        """滚动到页面底部"""
        last_height = self.driver.execute_script("return document.body.scrollHeight")

        while True:
            # 滚动到底部
            self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")

            # 等待加载
            time.sleep(2)

            # 检查是否到达底部
            new_height = self.driver.execute_script("return document.body.scrollHeight")
            if new_height == last_height:
                break

            last_height = new_height

    def click_element(self, selector: str, by: By = By.CSS_SELECTOR):
        """
        点击元素

        Args:
            selector: 选择器
            by: 定位方式
        """
        try:
            element = WebDriverWait(self.driver, 10).until(
                EC.element_to_be_clickable((by, selector))
            )
            element.click()
            time.sleep(1)  # 等待响应
        except Exception as e:
            print(f"点击元素失败: {e}")

    def input_text(
        self,
        selector: str,
        text: str,
        by: By = By.CSS_SELECTOR,
    ):
        """
        输入文本

        Args:
            selector: 选择器
            text: 输入文本
            by: 定位方式
        """
        try:
            element = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((by, selector))
            )
            element.clear()
            element.send_keys(text)
        except Exception as e:
            print(f"输入文本失败: {e}")

    def close(self):
        """关闭浏览器"""
        if self.driver:
            self.driver.quit()

5. Multi-Threaded Scraper (scrapers/multi_thread.py)

Python
"""
多线程爬虫
"""
import threading
from queue import Queue
from typing import Any
from collections.abc import Callable
import concurrent.futures

from scrapers.base_scraper import BaseScraper

class MultiThreadScraper:
    """多线程爬虫"""

    def __init__(
        self,
        scraper_class: type,
        config,
        max_workers: int = 5,
    ):
        """
        初始化多线程爬虫

        Args:
            scraper_class: 爬虫类
            config: 配置对象
            max_workers: 最大工作线程数
        """
        self.scraper_class = scraper_class
        self.config = config
        self.max_workers = max_workers

        # 结果队列
        self.result_queue = Queue()

        # 锁
        self.lock = threading.Lock()

    def scrape_urls(self, urls: list[str]) -> list[Any]:
        """
        爬取多个URL

        Args:
            urls: URL列表

        Returns:
            结果列表
        """
        results = []

        # 使用线程池
        with concurrent.futures.ThreadPoolExecutor(  # ThreadPoolExecutor线程池,管理和复用线程资源
            max_workers=self.max_workers
        ) as executor:
            # 提交任务
            future_to_url = {
                executor.submit(self._scrape_single, url): url
                for url in urls
            }

            # 获取结果
            for future in concurrent.futures.as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    result = future.result()
                    if result:
                        results.append(result)
                except Exception as e:
                    print(f"爬取失败 {url}: {e}")

        return results

    def _scrape_single(self, url: str) -> Any:
        """
        爬取单个URL

        Args:
            url: URL地址

        Returns:
            爬取结果
        """
        # 创建爬虫实例
        scraper = self.scraper_class(self.config)

        # 爬取
        return scraper.scrape(url)

6. Utilities (utils/retry.py)

Python
"""
重试装饰器
"""
import time
import functools
from collections.abc import Callable

def retry(
    max_retries: int = 3,
    delay: float = 1.0,
    exceptions: tuple = (Exception,),
):
    """
    重试装饰器

    Args:
        max_retries: 最大重试次数
        delay: 重试延迟
        exceptions: 捕获的异常类型

    Returns:
        装饰器函数
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)  # @wraps保留被装饰函数的元信息(名称、文档字符串等)
        def wrapper(*args, **kwargs) -> Any | None:
            last_exception = None

            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e
                    if attempt < max_retries - 1:
                        print(f"重试 {attempt + 1}/{max_retries}: {e}")
                        time.sleep(delay)

            # 所有重试都失败
            raise last_exception

        return wrapper
    return decorator
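
base_scraper.py also imports ProxyManager, UserManager, and RateLimiter from utils/, but those files are not listed in this document. Below is a minimal sketch of what they might look like, matching only the calls used above (get_random_proxy, get_random_user_agent, acquire); the token-bucket details are assumptions rather than the original implementation.

Python
"""
Illustrative sketches of utils/proxy.py, utils/user_agent.py, and utils/rate_limiter.py.
"""
import random
import time
import threading

class ProxyManager:
    """Pick a random proxy from the configured list (utils/proxy.py)."""

    def __init__(self, config):
        self.proxy_list = config.proxy_list or []

    def get_random_proxy(self) -> str | None:
        return random.choice(self.proxy_list) if self.proxy_list else None

class UserManager:
    """Pick a random User-Agent from the configured list (utils/user_agent.py)."""

    def __init__(self, config):
        self.user_agents = config.user_agent_list or []

    def get_random_user_agent(self) -> str:
        return random.choice(self.user_agents) if self.user_agents else "Mozilla/5.0"

class RateLimiter:
    """Simple token-bucket rate limiter (utils/rate_limiter.py)."""

    def __init__(self, requests_per_second: float, burst_size: int):
        self.rate = requests_per_second
        self.capacity = burst_size
        self.tokens = float(burst_size)
        self.last_refill = time.monotonic()
        self.lock = threading.Lock()  # acquire() may be called from several threads

    def acquire(self):
        """Block until a token is available, then consume it."""
        while True:
            with self.lock:
                now = time.monotonic()
                # Refill tokens based on elapsed time, capped at the bucket size
                self.tokens = min(self.capacity, self.tokens + (now - self.last_refill) * self.rate)
                self.last_refill = now
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
            time.sleep(1.0 / self.rate)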

7. Entry Point (main.py)

Python
"""
主程序
"""
import os

from config import config
from scrapers.static_scraper import StaticScraper
from scrapers.multi_thread import MultiThreadScraper
from storage.csv_storage import CSVStorage
from storage.json_storage import JSONStorage
from storage.mongo_storage import MongoStorage

def main():
    """主函数"""
    # 目标URL列表
    urls = [
        'https://example.com/page1',
        'https://example.com/page2',
        'https://example.com/page3',
    ]

    # 创建存储器
    if config.storage_type == 'csv':
        storage = CSVStorage(config.output_dir)
    elif config.storage_type == 'json':
        storage = JSONStorage(config.output_dir)
    elif config.storage_type == 'mongo':
        storage = MongoStorage(
            config.mongo_uri,
            config.mongo_db,
            config.mongo_collection,
        )
    else:
        raise ValueError(f"不支持的存储类型: {config.storage_type}")

    # 创建多线程爬虫
    scraper = MultiThreadScraper(
        StaticScraper,
        config,
        max_workers=config.max_workers,
    )

    # 爬取URL
    print(f"开始爬取 {len(urls)} 个URL...")
    results = scraper.scrape_urls(urls)

    # 存储结果
    print(f"成功爬取 {len(results)} 个页面")
    storage.save(results)

    # 打印统计
    stats = scraper.scraper_class(config).get_stats()
    print(f"\n统计信息:")
    print(f"  请求总数: {stats['request_count']}")
    print(f"  成功数: {stats['success_count']}")
    print(f"  失败数: {stats['fail_count']}")
    print(f"  成功率: {stats['success_rate']:.2%}")

if __name__ == "__main__":
    main()
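
main.py imports CSVStorage, JSONStorage, and MongoStorage, but the storage/ modules are not listed in this document. A minimal sketch of the save interface they would need, assuming each result is a dict like the one produced by StaticScraper.parse (the implementations below are illustrative, not the originals):

Python
"""
Illustrative sketches of storage/csv_storage.py, storage/json_storage.py, and storage/mongo_storage.py.
"""
import csv
import json
import os
from typing import Any

class CSVStorage:
    """Write results to a CSV file (storage/csv_storage.py)."""

    def __init__(self, output_dir: str, filename: str = "data.csv"):
        os.makedirs(output_dir, exist_ok=True)
        self.path = os.path.join(output_dir, filename)

    def save(self, records: list[dict[str, Any]]):
        if not records:
            return
        with open(self.path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=list(records[0].keys()))
            writer.writeheader()
            writer.writerows(records)

class JSONStorage:
    """Write results to a JSON file (storage/json_storage.py)."""

    def __init__(self, output_dir: str, filename: str = "data.json"):
        os.makedirs(output_dir, exist_ok=True)
        self.path = os.path.join(output_dir, filename)

    def save(self, records: list[dict[str, Any]]):
        with open(self.path, 'w', encoding='utf-8') as f:
            json.dump(records, f, ensure_ascii=False, indent=2)

class MongoStorage:
    """Insert results into a MongoDB collection (storage/mongo_storage.py)."""

    def __init__(self, uri: str, db_name: str, collection_name: str):
        from pymongo import MongoClient  # requires pymongo
        self.collection = MongoClient(uri)[db_name][collection_name]

    def save(self, records: list[dict[str, Any]]):
        if records:
            self.collection.insert_many(records)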

🧪 Testing

1. Unit Test

Python
"""
单元测试示例
"""
import pytest
from scrapers.static_scraper import StaticScraper
from config import config

def test_static_scraper():
    """测试静态爬虫"""
    scraper = StaticScraper(config)

    # 测试爬取
    url = 'https://httpbin.org/html'
    data = scraper.scrape(url)

    # 验证结果
    assert data is not None
    assert 'url' in data
    assert 'title' in data
    assert 'content' in data

    print(f"✓ 静态爬虫测试通过")
    print(f"  标题: {data['title']}")
    print(f"  链接数: {len(data['links'])}")

2. Integration Test

Python
"""
集成测试示例
"""
def test_full_pipeline():
    """测试完整流程"""
    from scrapers.multi_thread import MultiThreadScraper

    # 创建爬虫
    scraper = MultiThreadScraper(
        StaticScraper,
        config,
        max_workers=2,
    )

    # 测试URL
    urls = [
        'https://httpbin.org/html',
        'https://httpbin.org/robots.txt',
    ]

    # 爬取
    results = scraper.scrape_urls(urls)

    # 验证结果
    assert len(results) > 0
    assert all('url' in result for result in results)  # all()全部为True才返回True

    print(f"✓ 完整流程测试通过")
    print(f"  成功爬取: {len(results)} 个页面")

📊 Extension Ideas

1. Feature Extensions

  • CAPTCHA recognition: OCR-based solving
  • Distributed crawling: spread work across machines
  • Incremental crawling: fetch only updated content
  • Data cleaning: deduplicate and normalize the data

2. Performance Optimizations

  • Connection pooling: reuse HTTP connections
  • Async IO: asyncio-based crawling
  • Caching: cache pages that were already fetched
  • Compression: enable gzip transfer encoding

3. Stronger Anti-Scraping Handling

  • IP rotation: rotate IPs dynamically
  • Behavior simulation: mimic human interaction
  • Cookie pool: manage pools of cookies
  • CAPTCHAs: third-party solving services

📚 What You Will Learn

After completing this project you will have practiced:

  • ✅ HTTP request and response handling
  • ✅ HTML parsing and data extraction
  • ✅ Scraping dynamic pages
  • ✅ Multi-threaded concurrent crawling
  • ✅ Anti-scraping techniques
  • ✅ Data storage and management
  • ✅ Building a complete web scraping system


Estimated completion time: 10-15 hours · Difficulty: ⭐⭐⭐ Intermediate · Recommendation: ⭐⭐⭐⭐⭐