11.Scrapy框架

11.Scrapy框架

明廷盛 嘻嘻😁

第一章 关于python虚拟环境

[[2.需要用到的python基础知识#十一. Python环境]]

第二章 Scrapy基本知识

Scrapy官方文档: https://docs.scrapy.org/en/latest/topics/commands.html (就看英文的, 中文看了能笑死人🤣)
Request和Response对象: https://docs.scrapy.org/en/latest/topics/request-response.html

第一节 理论知识

2.1.1 为什么学Scrapy框架

  1. 代码量比reqeusts少70%
  2. 支持异步(Twisted异步网络框架)

2.1.2 异步和非阻塞

  • & 说明: 阻塞是指需要等待请求结果而不能继续执行下面的语句(比如多线程中, 线程A遇到了网络请求需要等待, 虽然可以切到线程B继续执行, 但对于A来说, 还是阻塞的, 所以多线程是阻塞的; 而异步编程async, 是无需等待的, 直接可以运行下面的语句,等浏览器响应后, 会调用回调函数告知从而获取结果)
    Pasted image 20250123113552|850

2.1.3 适合同步的爬虫工作流程

Pasted image 20250123113638|800

2.1.4 适合异步的爬虫工作流程

Pasted image 20250123113701|775

2.1.5 Scrapy框架的工作流程♒

微观图

Pasted image 20250127104230|925Pasted image 20250125201528|446

单个正常请求(200)
  • Engine向Scheduler要url*(实际上是Request对象)*
  • scheduler从请求队列中拿url, 并返回给Engine
  • Engine经过下载中间件 到Downloader
  • Downloader处理请求【如果请求这个正常200】
  • 经过下载中间件返回给Engine
  • 后续: Engine经过爬虫中间件给Spider, spider做后续处理
    recording|950
宏观图

Pasted image 20250123114448|525

第二节 Hello Scrapy

Pasted image 20250123094327

STEP1:下载scrapy

  • & 说明: 直接pip install scrapy

STEP2: 搭建scrapy框架

  • $ 语法: scrapy startproject 项目名称
  • ! 注意: 该命令会①生成一个first_scrapy文件夹 ②在这个文件夹中还会生成first_scrapy的包 ③这个包下才是真正的目录结构 [[2.需要用到的python基础知识#十二. 文件夹和包的区别]]
1
scrapy startporjcet first_scrapy

STEP3: 创建爬虫文件

  • $ 语法: cd到first_scrapy目录下运行scrapy genspider 爬虫文件名 允许爬虫的域名
1
scrapy genspider douban movie.douban.com/top250

STEP4: 运行

  • $ 语法: cd到first_scrapy目录下运行scrapy crawl douban
1
scrapy crawl douban
  • $ 语法: 使用scrapy.cmdline 在生成的爬虫文件中直接运行(不通过cmd)
1
2
3
if __name__ == '__main__':
# 使用cmdline代替在命令行中运行, execute方法接受一个list
cmdline.execute("scrapy crawl douban".split())

STEP5: [可选]日志处理

  • & 说明: 可以看到, scrapy默认是打印了非常多日志信息的, 不想看的方法有以下两种(当然出错了还得要看的, 非常重要)
  1. 关闭日志[不建议]
    • $ 语法: scrapy crawl douban --nolog
      Pasted image 20250123134445|625
  2. 控制台步显示日志, 将日志保存在一个文件中
    • $ 语法: 进到setting中, 写这两行
    • ! 日志等级: DEBUG<INFO<WARNING<ERROR<CRITICAL 这里设置为WARNING, 比WARNING低的日志信息都不会保存.
title
1
2
3
# 日志保存并设置日志保存等级
LOG_LEVEL = 'WARNING'
LOG_FILE = './log.txt'

第三节 文件目录解析

  • & 说明: 一图流, 只是简单了解下, (后续肯定会更新这张图的)
    Pasted image 20250123095127|1225

第四节 response对比requests常用方法

序号requests模块常用方法作用scrapy模块常用方法返回值作用
result.text返回以文本格式(HTML代码)response.text
result.content返回以二进制格式response.body
result.json()返回以json格式response.json()
result.status_code返回此次请求服务器返回的状态码response.status
result.headers获取响应头response.headers
result.request.headers获取该次请求的请求头request.headers
result.cookies获取cookiesrequest.headerscookie在reqeustのheaders中看
result.content.decode("指定编码")先返回二进制, 然后自定义解码获取html源码response.body.decode("utf-8")
response.xpath("\\ol...")<class ‘scrapy.selector.unified.SelectorList’>配合extractextract_first使用,[[11.Scrapy框架#第五节 作业(爬取豆瓣电影信息)]]

需求: 爬取豆瓣电影信息

  • & 说明: 上面那个Hello Scrapy [[11.Scrapy框架#第二节 Hello Scrapy]]是将项目搭建出来, 同时douban.py, 是”处理引擎发来的response, 提取数据, 提取url,再交回给引擎”的作用, 我们写数据清洗就在这里写
  • $ 语法: Scrapy底层是集成了lxml的, 所以支持xpath; 具体怎么用? 看代码!
    Pasted image 20250123133334|725

2.5.1 关闭”君子协议”

  • $ 语法: 进到settings.py
    Pasted image 20250123134904|450

2.5.2 设置请求头

  • $ 语法: 进到settings.py中,
    Pasted image 20250123134958|1275

2.5.3 设置Cookies

  • ! 检查点: ①Cookies_Enable=False ②请求头中添加Cookies
  • $ 语法: 进到settings.py中, 设置两处(①是设置是否禁用Cookies, 选择False, 如果注释掉是不开其Cookie ②是在请求头中带上Cookie头)
    Pasted image 20250123135214|1300

2.5.4 全部代码

ins
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import scrapy
from scrapy.http import HtmlResponse
from scrapy import cmdline


class DoubanSpider(scrapy.Spider):
name = "douban"
allowed_domains = ["movie.douban.com"]
start_urls = ["https://movie.douban.com/top250"]

def parse(self, response: HtmlResponse, **kwargs):
# 数据清洗(就简单提取个, 电影title即可)
group = response.xpath('//ol[@class="grid_view"]/li')
print(type(group), group) # <class 'scrapy.selector.unified.SelectorList'>
for movie in group: #分组
title = movie.xpath('.//span[@class="title"]/text()')
print(type(title), title)
# <class 'scrapy.selector.unified.SelectorList'>
# [<Selector query='.//span[@class="title"]/text()' data='肖申克的救赎'>,
# <Selector query='.//span[@class="title"]/text()' data='\xa0/\xa0The Shawshank Redemption'>]

t1=title.extract()
print(type(t1), t1)
# <class 'list'>
# ['肖申克的救赎',
# '\xa0/\xa0The Shawshank Redemption']

t2 =title.extract_first()
print(type(t2), t2)
# <class 'str'>
# 肖申克的救赎


if __name__ == '__main__':
cmdline.execute("scrapy crawl douban".split()) # 使用cmdline代替在命令行中运行, execute方法接受一个list

第六节 实战2(pipelines items 翻页)

2.6.1 联动pipelines实现”数据存储”

  • ! 检测点: ①setting是否配置 ②spiders是否传回的是字典 ③多个pipelines是否return item ④强转dict()
  • & 说明: 看图,那两个一次的, 正好可以进行 数据库的连接和关闭
  • ! 注意: open_spider(), process_item(), close_spider()只有在piplines.py中才可以重写, 但可以有多个类, 不同的类可以不同的重写
    Pasted image 20250123224532
    Pasted image 20250123173139|500
  1. 存储到mysql数据库
ins
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from itemadapter import ItemAdapter
import pymysql


class FirstScrapyPipeline:

def open_spider(self, spider):
# 初始化连接
self.mysql_db = pymysql.connect(host="localhost", user="root", password="root", db="py_spider")
self.mysql_cursor = self.mysql_db.cursor()

# 建表
sql = """
create table if not exists db_movie(
id int not null auto_increment primary key,
title varchar(50),
rate double,
quote varchar(255)
) engine = innodb;
"""
if self.mysql_cursor.execute("SHOW TABLES LIKE 'db_movie';"):
print("表已存在")
else:
try:
self.mysql_cursor.execute(sql)
print("建表成功")
except Exception as e:
print("建表失败", e)

def process_item(self, item, spider):
# 插入数据
sql = "insert into db_movie(title,rate,quote) values(%s,%s,%s)"
try:
self.mysql_cursor.execute(sql, (item['title'], item['rate'], item['quote']))
self.mysql_db.commit()
print(f"数据{item},插入成功!")
except Exception as e:
print(f"数据{item},插入失败!, {e}")

def close_spider(self, spider):
# 关闭连接
self.mysql_cursor.close()
self.mysql_db.close()

  1. 存储到mongodb数据库
  • ! 注意: 这里需要将item转为字典, 否则报没id的异常
ins
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class MongoSpiderPipeline:
def open_spider(self, spider):
self.client = pymongo.MongoClient()
self.collection = self.client['py_spider']['db_movie']

def process_item(self, item, spider):
try:
self.collection.insert_one(dict(item)) # 这里要转dict, mongodb不认map
print("插入成功", item)
except Exception as e:
print("==>插入失败", e)

def close_spider(self, spider):
self.client.close()


settings.py

  • ! 这里需要”激活”pipeline, 才能用, 这里的300, 301代表权重,数字越小, 越先得到item值, 第一个管道得到的值是从spiders来的, 如果上一个管道没有return item, 后面的管道就拿不到这个值 (后面的管道只能拿到上一个管道return的值) 【类似与责任链的设计模式】
    Pasted image 20250124170237

2.6.2 联动items实现”数据定义”

  • ! 检查点: ①items名称是否对的上 ②spiders中是否实例化的是items定义的类而不是普通的Dict( ) ③导包 set source packege
  • & 说明: items的作用就是, 定义你要存储的格式(不定义类型)
  • ! 注意: scrapy.Item其本质上是一个字典(最高父类是字典)

  1. STEP1: 在items中”定义数据”
ins
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Define here the models for your scraped items
#
# See documentation in:
# https://docs.scrapy.org/en/latest/topics/items.html

import scrapy

class FirstScrapyItem(scrapy.Item):
# define the fields for your item here like:
# name = scrapy.Field()
title = scrapy.Field()
rate = scrapy.Field()
quote = scrapy.Field()


  1. STEP2: 在parse中导入, 并实例化item作为字典返回, 不在返回Dict()
ins
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import scrapy
from scrapy import cmdline
from scrapy.http import HtmlResponse
from first_scrapy.items import FirstScrapyItem


class DoubanSpider(scrapy.Spider):
name = "douban"
allowed_domains = ["movie.douban.com"]
start_urls = ["https://movie.douban.com/top250"]

def parse(self, response: HtmlResponse, **kwargs):
group = response.xpath('//ol[@class="grid_view"]/li')
for movie in group:
item = FirstScrapyItem()
item['title1'] = movie.xpath('.//span[@class="title"][1]/text()').extract_first()
item['rate'] = movie.xpath('.//span[@class="rating_num"]/text()').extract_first()
item['quote'] = movie.xpath('.//span[@class="inq"]/text()').extract_first()
yield item

if __name__ == '__main__':
cmdline.execute("scrapy crawl douban".split())


  1. STEP3: 将框架的文件夹设为”根文件夹”, 否则无法导入包【左图】
    Pasted image 20250123175500|375 Pasted image 20250123175013|975
  • ! 注意: 如果数据定义对不上, 会发生什么? 【右图】

2.6.3 实现翻页

  • ! 检测点: ①重写start_requests( )方法, 请求我指定的url, 而不是默认的start_urls
  1. 思路一: 获取页面的”下一页”标签, 如果有href就表示还有下一页,否则就是翻页结束
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import scrapy
from scrapy import cmdline
from scrapy.http import HtmlResponse
from first_scrapy.items import FirstScrapyItem


class DoubanSpider(scrapy.Spider):
name = "douban"
allowed_domains = ["movie.douban.com"]
start_urls = ["https://movie.douban.com/top250"]

def parse(self, response: HtmlResponse, **kwargs):
group = response.xpath('//ol[@class="grid_view"]/li')
for movie in group:
item = FirstScrapyItem()
item['title'] = movie.xpath('.//span[@class="title"][1]/text()').extract_first()
item['rate'] = movie.xpath('.//span[@class="rating_num"]/text()').extract_first()
item['quote'] = movie.xpath('.//span[@class="inq"]/text()').extract_first()
yield item
# 框架的流程,response到spiders后,如果有url,调用回调发送让Scrapy继续去请求,也可以封装为字典给pipelines
# 翻页实现思路一: 获取页面的"下一页"标签, 如果有href就表示还有下一页,否则就是翻页结束
next_url = response.xpath('//span[@class="next"]/a/@href').extract_first()
if next_url:
print(self.start_urls) # self.start_urls是不会变的
next_url = response.urljoin(next_url) # urljoin = response.url + next_url
# next_url = "https://movie.douban.com/top250" + next_url #等效于上一行
yield scrapy.Request(url=next_url, callback=self.parse)


if __name__ == '__main__':
cmdline.execute("scrapy crawl douban".split())

  1. 【推荐】思路二: 重写start_reqeusts()方法
    • & 说明: 当执行scrapy crawl douban后, 会发生如下的事情【左图】请注意⚠️看start_requests()方法的默认编写【右图】, 所以我们如果把默认遍历start_url, 重写为遍历每一页, 不就可以实现每页数据的读取了.
    • ! 注意: start_requests()确实是整个周期只执行一次,但你要执行完! 程序运行到了parse, 是callback让他回调到那的, 不是说执行了parse, 就代表start_requests执行完了
      Pasted image 20250123212522|650Pasted image 20250123212721|700
ins
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import scrapy
from scrapy import cmdline
from scrapy.http import HtmlResponse
from first_scrapy.items import FirstScrapyItem


class DoubanSpider(scrapy.Spider):
name = "douban"
allowed_domains = ["movie.douban.com"]
start_urls = ["https://movie.douban.com/top250"]

def parse(self, response: HtmlResponse, **kwargs):
print("parse")
group = response.xpath('//ol[@class="grid_view"]/li')
for movie in group:
item = FirstScrapyItem()
item['title'] = movie.xpath('.//span[@class="title"][1]/text()').extract_first()
item['rate'] = movie.xpath('.//span[@class="rating_num"]/text()').extract_first()
item['quote'] = movie.xpath('.//span[@class="inq"]/text()').extract_first()
yield item

def start_requests(self):
print("start_requests")
for page in range(0, 10):
next_url = f"https://movie.douban.com/top250?start={page * 25}&filter="
yield scrapy.Request(next_url, callback=self.parse)


if __name__ == '__main__':
cmdline.execute("scrapy crawl douban".split())

第七节 实战3(meta, URL去重, 发送POST请求)

2.7.1 meta

  • ! 检查点: ①返回Request时携带meta字典【meta是个字典!!!】 ③response.meta获取这个字典

    需求:我现在需要爬每个电影的详细页的数据

  • & parse是处理全部页的请求的, 我现在需要处理电影详细页的请求, 我需要将item从parse()方法–>parse_detail()方法,在detail处理好后再yield给pipeline,此时就要用到meta
  • $ 语法:①放入: 通过Request(meta=字典) ②取出: 通过response.meta[键]获取
  • ! 注意: Spiders中的类可以自己写方法, 但一定要有self, response这两个形参
ins
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import scrapy
from scrapy import cmdline
from scrapy.http import HtmlResponse
from first_scrapy.items import FirstScrapyItem


class DoubanSpider(scrapy.Spider):
name = "douban"
allowed_domains = ["movie.douban.com"]
start_urls = ["https://movie.douban.com/top250"]

def parse(self, response: HtmlResponse, **kwargs):
print("parse")
group = response.xpath('//ol[@class="grid_view"]/li')
for movie in group:
item = FirstScrapyItem()
item['title'] = movie.xpath('.//span[@class="title"][1]/text()').extract_first()
item['rate'] = movie.xpath('.//span[@class="rating_num"]/text()').extract_first()
item['quote'] = movie.xpath('.//span[@class="inq"]/text()').extract_first()
item['url'] = movie.xpath('.//div[@class="hd"]/a/@href').extract_first() # 详细页的url链接
yield scrapy.Request(item['url'], callback=self.parse_detail, meta={"item": item})

# 处理详细数据
def parse_detail(self, response: HtmlResponse):
item = response.meta['item'] # 相当于接力item数据
item['short'] = response.xpath('//span[@property="v:summary"]/text()').extract_first().strip() or ""
yield item # 这里再传给数据库

# 翻页
def start_requests(self):
print("start_requests")
for page in range(0, 10):
next_url = f"https://movie.douban.com/top250?start={page * 25}&filter="
yield scrapy.Request(next_url, callback=self.parse)


if __name__ == '__main__':
cmdline.execute("scrapy crawl douban".split())

2.7.2 URL去重

  • & 如果我们希望, 我们发送的url中存在相同url时, 只发送一个, 我们需要设置dont_filter变量
    • $ case1: 当dont_filter设置为True时,Scrapy将不会对该请求进行去重检查,即使该请求的URL已经存在于调度器中。
    • $ case2: 当dont_filter设置为False(默认值),Scrapy会检查请求的URL是否已经被调度过,如果是,则不会再次调度该请求。
  • ! dont_filter是Request对象的属性, 下面yield的写法, 相当于request.dont_filter=True
title
1
2
3
4
5
def start_requests(self):
url = "http://httpbin.org/ip"
for page in range(1, 50):
yield scrapy.Request(url, callback=self.parse, dont_filter=True)
# 挺好记的, dont_filter==>不要过滤,True就是认同,不需要过滤

2.7.3 发送POST请求 🕳️

  • & 有两种发送POST请求的方法, 但是有区别
  • $ 方式一: Request(url, body=json.dumps(请求体字典), method="POST",callback: 这种方法请求时会用json(字符串)发过去
    • ! 注意⚠️: 如果想要&拼接的方式, 可以使用 urllib的方法urlencode(字典), 让body=urlencode(data)就可以实现FormRequest的&方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import json

def start_requests(self):
data = {
"column": "szse_main_latest",
"pageNum": 1,
"pageSize": '30',
"sortName": "",
"sortType": "",
"clusterFlag": "true"
}
yield scrapy.Request(url=aim_url, body=json.dumps(data), method='POST', callback=self.parse)
# request.body:
# b'{"column": "szse_main_latest", "pageNum": "1", "pageSize": "30", "sortName": "", "sortType": "", "clusterFlag": "true"}'


  • $ 方式二:FormRequest(url, formdata=字典, callback): 这种方法请求会用&拼接过去
1
2
3
4
5
6
7
8
9
10
11
12
def start_requests(self):
data = {
"column": "szse_main_latest",
"pageNum": str(page),
"pageSize": '30',
"sortName": "",
"sortType": "",
"clusterFlag": "true"
}
yield scrapy.FormRequest(url=aim_url, formdata=data, callback=self.parse)
# request.body:
# b'column=szse_main_latest&pageNum=1&pageSize=30&sortName=&sortType=&clusterFlag=true'
  • ! 这里可以查看”请求标头”, 猜测服务器能解析何种请求方式(①json字符串②&符号拼接), 其实不用刻意去背, 其中一种发不了的时候, 用另一种就行
类型Content-Type简要解释使用场景默认该用
表单数据application/x-www-form-urlencoded适用于简单的键值对数据,通常用于HTML表单提交。提交表单数据,如登录表单、搜索表单等FormRequest
JSON数据application/json适用于结构化数据,广泛用于API开发。提交JSON格式的数据,如API请求Request
文件数据multipart/form-data适用于上传文件,可以同时发送文件和表单数据。上传文件,如图片、文档等Request
XML数据application/xmltext/xml适用于XML结构化数据,常用于SOAP等协议。提交XML格式的数据,如SOAP请求Request
自定义数据格式text/plain 或其他自定义类型适用于自定义格式的数据,只要服务器端能够解析即可。提交自定义格式的数据Request
多种数据类型混合multipart/form-data可以在一个请求中混合使用多种数据类型,如表单数据和JSON数据。混合发送多种类型的数据Request
![Pasted image 20250127135152700](https://20040424.xyz/PicList/20250209160026249.png)

第八节 中间件 DownloaderMiddlewares

2.8.1 DownloaderMiddlewares

  • & 说明: ①可以有很多个”下载中间件” ②类似Spring的”拦截器”,来回都会拦截
  1. from_crawler()
    • $ 作用: 这个方法初始化该”下载中间件”类, 必须返回 这个中间件的实例对象
    • $ 语法点2: crawler.signals.connect(触发指定方法, 指定情况) 像默认写法这里就是, 当spiders对象实例化时(前), 触发一个叫做spider_opened的一个自定义方法
    • $ 返回值: 一定是当前类的实例化对象
信号名称说明 【数据来源Cursor】
signals.spider_opened当爬虫打开时触发。
signals.spider_closed当爬虫关闭时触发。
signals.item_scraped当一个项目被爬虫成功抓取并返回时触发。
signals.request_scheduled当请求被调度器安排时触发。
signals.response_received当响应被下载器接收到时触发。
signals.spider_error当爬虫处理请求时发生错误时触发。
signals.item_dropped当项目被丢弃时触发。
signals.engine_started当Scrapy引擎启动时触发。
signals.engine_stopped当Scrapy引擎停止时触发。
signals.start_requests当爬虫开始请求时触发。
signals.stats_collected当统计信息被收集时触发。
title
1
2
3
4
5
6
@classmethod
def from_crawler(cls, crawler):
# This method is used by Scrapy to create your spiders.
s = cls()
crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
return s

  1. process_request()
    • $ 作用: 这个方法用于截取 Engine到Dowloader之间的url请求
    • $ 返回值:
    • ①【default】return None: 默认就是返回None, 表示继续发送原先的request请求
    • return Request对象: (记形参中的reqeust为”原request”; 返回的request为”新request”)原request不发了=>将”新request加入到请求队列中==>终止原request的生命周期(不再发往下一个request)
    • return Response对象: 终止request的生命周期(不再发往下一个中间件,更不会发给”下载器”), 示例: Engine->A->B->C->Downloader; ABC分别是不同的下载中间件那如果A的process_request返回Response, 这个Response是直接略过BC的process_request, 直接跳到CBA这个次序的process_response 进行处理
title
1
2
3
4
5
6
7
8
9
10
11
def process_request(self, request, spider):
# Called for each request that goes through the downloader
# middleware.

# Must either:
# - return None: continue processing this request
# - or return a Response object
# - or return a Request object
# - or raise IgnoreRequest: process_exception() methods of
# installed downloader middleware will be called
return None

  1. process_response()
    • $ 作用: 截取 Dowloader准备返回给Engine的response信息 (注意如果有多个DowloaderMiddleware顺序与process_reqeust正好相反)
    • $ 返回值:
    • ①【default】return Response: 默认就是返回Response,适用于: 可以”篡改”响应内容, 或缓存机制(比如如果request的url相同, 我直接返回缓存中的值类似与”篡改”)
    • return Reqeust: 适用于: 重定向 或 条件处理(如果返回的response不是200,就换个代理ip重新请求)等
      • 如果返回Reqeust对象, 那当前的response对象还会被Spider的parse方法接受到吗?
      • 如果是Engine->A->B->C->Dowloader的次序, C的process_response()直接返回了一个request, BC还能接受到吗?
title
1
2
3
4
5
6
7
8
def process_response(self, request, response, spider):
# Called with the response returned from the downloader.

# Must either;
# - return a Response object
# - return a Request object
# - or raise IgnoreRequest
return response

  1. process_exception()

    用到再说, 😪😫🥱😴

title
1
2
3
4
5
6
7
8
9
def process_exception(self, request, exception, spider):
# Called when a download handler or a process_request()
# (from other downloader middleware) raises an exception.

# Must either:
# - return None: continue processing this exception
# - return a Response object: stops process_exception() chain
# - return a Request object: stops process_exception() chain
pass

2.8.2 设置请求头 & Cookies

  1. 设置请求头
    • & 大体思路: DowloaderMiddleware中的process_request()方法既然是拦截请求的, 那我直接在这里把user-agent带上就实现了
    • $ 语法; request.headers['user-agent'] = 字符串
    • ! 注意: 这里直接等于字符串, 不是字典,
1
2
3
4
5
def process_request(self, request, spider):
# 设置请求头 (直接设置字符串!!!)
user_agent = "Mozilla/5.0 (Linux; Android 10; Pixel 3 XL Build/QP1A.190711.020) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.106 Mobile Safari/537.36"
request.headers['user-agent'] = user_agent
return None

  1. 设置Cookies
    • $ 语法: ①先去settings.pyCOOKIES_ENABLED = False取消注释 ②request.headers['Cookie'] = 字符串
    • ! 注意: 一定要去settings.py解开, 要不Scrapy访问默认是不会携带Cookies的
1
2
3
4
5
def process_request(self, request, spider):
# 直接设置Cookie字符串
cookies_string = "_ga=GA1.2.1091215768.1720851732; sensorsdata2015jssdkcross=%7B%22distinct_id%22%3A%22190aac1939a5f9-04476d5a63df218-26001f51-2073600-190aac1939b1b56%22%2C%22first_id%22%3A%22%22%2C%22props%22%3A%7B%22%24latest_traffic_source_type%22%3A%22%E8%87%AA%E7%84%B6%E6%90%9C%E7%B4%A2%E6%B5%81%E9%87%8F%22%2C%22%24search_keyword_id%22%3A%22a6e3168500f1b99800000006678c9fcd%22%2C%22%24search_keyword_id_type%22%3A%22baidu_seo_keyword_id%22%2C%22%24search_keyword_id_hash%22%3A5603252640931268%7D%2C%22identities%22%3A%22eyIkaWRlbnRpdHlfY29va2llX2lkIjoiMTkwYWFjMTkzOWE1ZjktMDQ0NzZkNWE2M2RmMjE4LTI2MDAxZjUxLTIwNzM2MDAtMTkwYWFjMTkzOWIxYjU2In0%3D%22%2C%22history_login_id%22%3A%7B%22name%22%3A%22%22%2C%22value%22%3A%22%22%7D%2C%22%24device_id%22%3A%22190aac1939a5f9-04476d5a63df218-26001f51-2073600-190aac1939b1b56%22%7D; _gcl_au=1.1.8636828.1737269208"
request.headers['Cookie'] = cookies_string # 直接设置Cookie头
return None

2.8.3 设置代理IP 🔒

  • & 帮助文档: ①meta ②meta#proxy
  • $ 语法: ①proxy的设置: Request对象中有个meta属性, 其下有个proxy属性, request.meta['proxy']="http://2.2.3.3:4324"download-timeout属性
  • ! 卡点: Scrapy的异步是, 将所有的请求(近乎)同一时刻发出, 所以需要一个”版本锁”/“乐观锁”的思想(Redis的锁)
    Pasted image 20250126171742|600
  1. 错误版本!!!!
title
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# Version_1: 因为异步延迟问题, 导致的ip复用失败
class IpDownloaderMiddleware:
def __init__(self):
# self.current_proxy_ip = self.get_proxy_ip() # 初始化当前代理IP
self.current_proxy_ip = "http://3.3.3.3:3333" # 模拟ip失效

@classmethod
def from_crawler(cls, crawler):
s = cls()
return s

def process_request(self, request: Request, spider):
request.meta['proxy'] = self.current_proxy_ip # 使用当前代理IP
request.meta['download_timeout'] = 1 # 1s内没有返回就超时
return None

def process_response(self, request: Request, response: Response, spider):
if response.status != 200:
print(f"请求失败: {request.url}, 状态码: {response.status}")
self.current_proxy_ip = self.get_proxy_ip() # 获取新的代理IP
request.meta['proxy'] = self.current_proxy_ip # 更新请求的代理IP
return request # 返回请求以重试
return response

def process_exception(self, request, exception, spider):
print(f"请求 {request.url} 发生异常: {exception}")
self.current_proxy_ip = self.get_proxy_ip() # 获取新的代理IP
request.meta['proxy'] = self.current_proxy_ip # 更新请求的代理IP
return request # 返回请求以重试

def get_proxy_ip(self):
api_url = "https://dps.kdlapi.com/api/getdps/?secret_id=o5b3w54kddfiskjsu5ta&signature=tr45ga5grnvp1943h0paert5qwquy7cb&num=1&pt=1&sep=1"
proxy_ip = requests.get(api_url).text
username = "d4472377283"
password = "rudm2ozb"
return f"http://{username}:{password}@{proxy_ip}/"

  1. 可重用ip版本
title
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# Version_2: 采用类似"版本锁"的思想, 构建复用单ip
# 大体思路: 一个ip就是一个版本, ①版本相同+报错=ip失效(需要获取新ip) ②版本不同+报错=ip未及时更新(更新版本即可,无需获取新ip)
class IpDownloaderMiddleware_V2:
def __init__(self):
# self.current_proxy_ip = self.get_proxy_ip() # 初始化当前代理IP
self.current_proxy_ip = "http://3.3.3.3:8923" # 模拟ip失效

@classmethod
def from_crawler(cls, crawler):
s = cls()
return s

def process_request(self, request: Request, spider):
request.meta['proxy'] = self.current_proxy_ip # 使用当前代理IP
request.meta['download_timeout'] = 1 # 1s内没有返回就超时
request.meta['max_retry_times'] = 3 # 最多尝试0次
# 打印当前是第几次try
if "retry_times" not in request.meta:
request.meta['retry_times'] = 1
print(f"url:{request.url}, {request.meta['retry_times']}/{request.meta['max_retry_times']}")
return None

def process_response(self, request: Request, response: Response, spider):
if response.status != 200:
self.check_version(request)
return request
return response

def process_exception(self, request, exception, spider):
if isinstance(exception, twisted.internet.error.TimeoutError):
self.check_version(request)
return request

def check_version(self, request):
# 检测版本
# case1:版本相同,意味着ip失效, 需要新的ip
if request.meta['proxy'] == self.current_proxy_ip:
self.current_proxy_ip = self.get_proxy_ip() # 更新ip
request.meta['proxy'] = self.current_proxy_ip
print("获取新ip成功!!!")
# case2: 版本不同,意味着未及时更新ip
else:
print("ip未及时更新,已正确赋值新ip")
request.meta['proxy'] = self.current_proxy_ip

def get_proxy_ip(self):
api_url = "https://dps.kdlapi.com/api/getdps/?secret_id=o5b3w54kddfiskjsu5ta&signature=tr45ga5grnvp1943h0paert5qwquy7cb&num=1&pt=1&sep=1"
proxy_ip = requests.get(api_url).text
username = "d4472377283"
password = "rudm2ozb"
return f"http://{username}:{password}@{proxy_ip}/"

  1. 可复用ip代理池
del
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# Version_3: 同Version_2的思想, 构建ip池
class IpDownloaderMiddleware_V3:
def __init__(self):
self.pool_size = 5
self.proxy_ip_pool = [f"http://3.3.3.3:333{i}" for i in range(self.pool_size)] # 模拟失效ip
# self.proxy_ip_pool = [self.get_proxy_ip() for _ in range(self.pool_size)] # 初始化当前代理IP

@classmethod
def from_crawler(cls, crawler):
s = cls()
return s

def process_request(self, request: Request, spider):
request.meta['proxy'] = random.choice(self.proxy_ip_pool) # 使用当前代理IP
request.meta['download_timeout'] = 1 # 1s内没有返回就超时
request.meta['max_retry_times'] = 3 # 最多尝试0次
# 打印当前是第几次try
if "retry_times" not in request.meta:
request.meta['retry_times'] = 1
print(f"url:{request.url}, {request.meta['retry_times']}/{request.meta['max_retry_times']}")
return None

def process_response(self, request: Request, response: Response, spider):
if response.status != 200:
self.check_version(request)
return request
return response

def process_exception(self, request, exception, spider):
if isinstance(exception, twisted.internet.error.TimeoutError):
self.check_version(request)
return request

def check_version(self, request):
# 检测版本
# case1:版本相同[在ip池里有],意味着ip失效, 需要新的ip
if request.meta['proxy'] in self.proxy_ip_pool:
# 更新ip
self.proxy_ip_pool.remove(request.meta['proxy']) # 删除错误版本
self.proxy_ip_pool.append(self.get_proxy_ip()) # 相当于替换

request.meta['proxy'] = random.choice(self.proxy_ip_pool)
print("获取新ip成功!!!")
# case2: 版本不同,意味着未及时更新ip
else:
request.meta['proxy'] = random.choice(self.proxy_ip_pool)
print("ip未及时更新,已正确赋值新ip")
print(f"当前代理池:{self.proxy_ip_pool}")

def get_proxy_ip(self):
api_url = "https://dps.kdlapi.com/api/getdps/?secret_id=o5b3w54kddfiskjsu5ta&signature=tr45ga5grnvp1943h0paert5qwquy7cb&num=1&pt=1&sep=1"
proxy_ip = requests.get(api_url).text
username = "d4472377283"
password = "rudm2ozb"
return f"http://{username}:{password}@{proxy_ip}/"
  1. 连接Redis的ip代理池

2.8.4 联动Selenium

  • & 大体思路: 在DowloaderMiddlewares的process_request()那 ①截获url, ②用Selenium发送, ③并直接返回Response结果 [[11.Scrapy框架#2.1.5 Scrapy框架的工作流程]]
  • ! 注意: ①Scrapy中的浏览器驱动必须使用 绝对路径 ②这里的浏览器不会自己关, 必须要quit()
del
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class SeleniumTxWorkDownloaderMiddleware:
# Selenium_STEP1: 创建Browser对象
def __init__(self):
self.service = Service(executable_path="D:\\用户\\文档\\CodeBeach\\Python\\爬虫逆向2024年11月25日_后端\\2.爬虫部分\\2.6自动化测试工具Selenium\\browser_driver\\chromedriver.exe") # 不能用相对路径!!!TODO
# 参数配置
options = webdriver.ChromeOptions()
# options.add_argument('-headless') # 无头模式
prefs = {"profile.managed_default_content_settings.images": 2}
options.add_experimental_option("prefs", prefs) # 不加载图片

self.browser = webdriver.Chrome(service=self.service, options=options)

@classmethod
def from_crawler(cls, crawler):
s = cls()
# crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
crawler.signals.connect(s.spider_closed, signal=signals.spider_closed)
return s

def process_request(self, request, spider):
print("==>", request.url)
# Selenium_STEP2:使用浏览器对象打开指定的url
self.browser.get(request.url) # 看流程,在执行DownloaderMiddleware的process_request和response之前是先执行的spiders的start_request,所以到这里时request是有url的

# 等待浏览器执行完毕 # 延迟等待
wait = WebDriverWait(self.browser, 10) # 创建一个WebDriverWait对象,最多等待10秒
# presence_of_element_located:判断指定标签是否载入
wait.until(EC.presence_of_element_located(
(By.CLASS_NAME, 'recruit-list') # 等待类名为'recruit-list'的元素出现
))

# Selenium_STEP3:获取页面信息
# 获取页面信息, 并直接返回Response
body = self.browser.page_source # 页面源码
return scrapy.http.HtmlResponse(url=request.url, body=body, request=request, encoding="utf-8")

def process_response(self, request, response, spider):
return response

def process_exception(self, request, exception, spider):
pass

# 关闭时触发
def spider_closed(self, spider):
# Selenium_STEP4:关闭浏览器 (结合scrapy就不会自己关闭了,必须手动quit,浏览器才能退出)
self.browser.quit()

2.8.5 RetryMiddleware重试中间件

  • & 这个是Scrapy默认配置的中间件, 就这个容易晕☠️; 下图是官方中间件的”名称”和”权值”【图一】
    Pasted image 20250204223851|725Pasted image 20250204224430|650
  • $ 这个中间件的流程如下:
  1. 检查全局重试开关
    → 若 settings.RETRY_ENABLED = False request.meta['retry_enabled'] = False,直接跳过重试。
    → 若 request.meta['dont_retry'] = True,终止重试流程。

  1. 触发重试条件判断
    异常触发:请求抛出异常,且异常类型在 settings.RETRY_EXCEPTIONS 中。【图二】
    状态码触发:响应状态码在 settings.RETRY_HTTP_CODES 中。【图二】

  1. 计算最大重试次数
    → 优先级:request.meta['max_retry_times'] > settings.RETRY_TIMES(默认 2)。

  1. 判断当前重试次数
    request.meta['retry_times'] 记录已重试次数(初始为 0)。
    → 若 retry_times >= max_retry_times,停止重试。

  1. 执行重试
    → 克隆原始请求,retry_times +1,重新加入调度队列。
    → 设置request.dont_filter=True ==防止被过滤==

  • $ 关键属性/配置作用
    • request.meat['dont_retry']:强制跳过某请求的重试。
    • request.meat['max_retry_times']:单请求最大重试次数的局部覆盖。
    • request.meat['retry_times']:统计当前重试次数。
    • RETRY_ENABLED:settings.py中配置, 全局重试开关。
    • RETRY_HTTP_CODES:settings.py中配置, 需重试的 HTTP 状态码列表。
    • RETRY_EXCEPTIONS:settings.py中配置, 需重试的异常类型列表。

第九节 Scrapy爬虫监控

2.9.1 使用监控平台ScarpyOps

  • STEP1: 下载包
1
pip install scrapeops-scrapy


  • STEP3: settings.py
title
1
2
3
EXTENSIONS = {
'scrapeops_scrapy.extension.ScrapeOpsMonitor': 500, # 这个是ScrapyOps平台的Extension
}
title
1
2
3
4
DOWNLOADER_MIDDLEWARES = { 
'scrapeops_scrapy.middleware.retry.RetryMiddleware': 550,
'scrapy.downloadermiddlewares.retry.RetryMiddleware': None,
}
title
1
SCRAPEOPS_API_KEY = 'YOUR_API_KEY'

2.9.2 发送邮件

  1. STEP1:settings.py
title
1
2
3
EXTENSIONS = {
'scrapy.extensions.statsmailer.StatsMailer': 500,
}
title
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 邮箱
sender_email = "[email protected]" # 发件人邮箱
SMTP_KEY = 'tqxovxdxfrstbabd'
receiver_email_list = ['[email protected]', '[email protected]'] # 收件人邮箱(列表)

STATSMAILER_RCPTS = receiver_email_list # 收件人
MAIL_FROM = sender_email # 发件人邮箱

# SMTP服务器配置
MAIL_HOST = 'smtp.qq.com' # SMTP服务器地址(这里是qq的)
MAIL_PORT = 465 # SMTP端口(默认)
MAIL_USER = sender_email # 发件人邮箱
MAIL_PASS = SMTP_KEY # SMTP服务的密码
MAIL_SSL = True # 是否使用SSL

  1. STEP2: spider编写
ins
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class TxWorkSpider(scrapy.Spider):
name = "tx_work"
allowed_domains = ["careers.tencent.com"]
start_urls = ["https://careers.tencent.com/search.html"]

def parse(self, response: HtmlResponse, **kwargs):
print(f"parse==> 请求url:{response.request.url}, 使用代理:{response.request.meta['proxy']}")
yield response.json()['origin']

def start_requests(self):
# 发送邮箱STEP1:实例化邮箱
self.emailer = MailSender.from_settings(self.settings)

url = "http://httpbin.org/ip"
for page in range(1, 10):
yield scrapy.Request(url, callback=self.parse, dont_filter=True)

def close(self, spider, reason):
print("spider close")
# 发送邮件STEP2: 发送
subject = "scrapy test" # 邮件标题
body = f"爬虫结束 {time.time()}" # 邮件内容
return self.emailer.send(to=self.settings.STATSMAILER_RCPTS, subject=subject, body=body)

第十节 总结常用设置

2.10.1 Request对象

属性作用默认值注意
request.header['user-agent']设置本次请求的UA头
request.header['cookie']设置本次请求的Cookie内容;settings.py中的COOKIES_ENABLED = False必须设置
request.meta['proxy']设置请求的代理IP
request.meta['download_timeout']最长等待该请求响应的时间180s
request.meta[retry_times']是第几次重试0首次发送不算重试, 首页也就没有这个属性
request.meta['max_retry_times']最多重试几次2设置为3, 该请求会发4次, 因为首次不算重试
request.dont_filter是否将该URL进行过滤False默认是会过滤的

2.10.2 settings.py

  • & 说明: 这里只列举, 不是每次都用到, 但可能会用到的配置属性
属性作用归属默认值注意
DOWNLOAD_DELAY每个请求之间的延迟时间(秒)Request0
RETRY_DELAY重试请求之间的延迟时间(秒)Request3
CONCURRENT_REQUESTSScrapy同时可以处理的请求数目全局100
CONCURRENT_REQUESTS_PER_IP每个IP同时发起的请求数量全局8当请求的IP相同数量到达CONCURRENT_REQUESTS_PER_IP 后, 后续
使用该iP的请求, 会等其前面(使用该IP的)请求完成处理后, 再请求
  • 关于CONCURRENT_REQUESTS_PER_IP的Q&A:
1
2
3
4
5
6
7
8
问题: 10个请求, 使用同一个IP(CONCURRENT_REQUESTS_PER_IP=8), scrapy会如何调度
一个目标服务器(例如爬取 example.com,无论使用什么代理,目标IP都是固定的):
并发限制生效:CONCURRENT_REQUESTS_PER_IP=8 会限制对目标IP的并发请求数为 8。
请求处理顺序:
前8个请求会立即被发送。
剩余2个请求会进入队列等待。
当有请求完成时,队列中的请求会按顺序补位,直到所有请求完成。
代理IP的作用:虽然每个请求使用不同的代理IP,但目标IP相同,因此并发限制仍针对目标IP生效。

第三章 分布式爬虫scrapy_redis

第一节 初识scrapy_redis

3.1.1 分布式爬虫的架构(基本思想)

scrapy_redis地址: https://github.com/rmax/scrapy-redis

  • & 说明: 因为是分布式, 所以是多个Scheduler同时进行调度①【图一】可以选择用Queue,但因为Queue没办法实现共享 ② 【图二】所以用Redis实现数据的共享; 而这个Redis也不用我们自己实现, 有scrapy_redis帮我们写好了, 我们直接用就好
  • $ 总结: 使用scrapy_redis后在Redis中可能会存在以下键
    1. REDIS_KEY:request: 所有的yield scrapy.Request(...)会上传到Redis的这个键中, 所有的请求会从这个键中拿
    2. REDIS_KEY:dupefilter: 开启过滤后该集合会将所有的URL进行SA1 Hash编码, 用于判重
    3. REDIS_KEY:items: 如果配置scrapy_reids的管道就会存储到redis中'scrapy_redis.pipelines.RedisPipeline': 400
      Pasted image 20250128195304|650 Pasted image 20250128195318|675

第二节 Hello Scrapy_Redis

STEP1: settings.py配置

  • $ 语法: 包括①换调度器 ②换过滤器 ③开启断点续爬 ④配置Redis连接URL
1
2
3
4
SCHEDULER = 'scrapy_redis.scheduler.Scheduler' # 换调度器
DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter' # 换过滤器
SCHEDULER_PERSIST = True # 开启断点续爬
REDIS_URL = "redis://127.0.0.1:6379/0" # # Redis服务器地址

STEP2: spider配置

  • & 说明: 实际上只需要STEP1就可以联动Redis了, 但如果要涉及多台机器, 就必须要配置STEP2, 配置后会有以下变化
    • ①spider监听REDIS_KEY:request键是否有值 , 如果有会从该键中拿, ==否则不会关闭, 而是监听这个端口==(如果只配STEP1, 键中没值就退出程序)
1
2
3
4
class Top250RedisSpider(scrapy_redis.spiders.RedisSpider): # 换基类
name = "top250_redis"
REDIS_KEY = name # 设置REDIS_KEY
...

第三节 深度理解两个Redis中的键

3.3.1 REDIS_KEY:request

  • & 作用: 断点续爬就是靠这个, 相当于原生断点续爬的crawls目录下的文件
  • $ 如下两个属性可以看看, 注意只是知道即可, 不要乱用, 贼容易晕😵‍💫
    Pasted image 20250204223503|775Pasted image 20250204223508|775

3.3.2 REDIS_KEY:dupefilter

  • & 说明: 如果URL的request.dont_filter属性为True就不会进到这里进行去重, 否则会去重, 和原生的一样
  • Title: 11.Scrapy框架
  • Author: 明廷盛
  • Created at : 2025-02-15 14:34:49
  • Updated at : 2025-02-15 13:41:00
  • Link: https://blog.20040424.xyz/2025/02/15/🐍爬虫工程师/第一部分 爬虫基础/11.Scrapy框架/
  • License: All Rights Reserved © 明廷盛