8.高性能爬虫

8.高性能爬虫

明廷盛 嘻嘻😁

第一章 使用多线程

第一节 什么是任务计数

  • 概念:队列中未完成任务的个数,用于跟踪任务的处理进度。
  • & 额外说明: 每次q.put(x), 就相当于增加一个未完成的任务, q.get()并不能将任务的个数减少, 只能将值弹出去
  • 获取方法:通过 q.unfinished_tasks 属性获取。

第二节 queue在多线程中的常用方法

q.get()

  • 功能:从队列中取出一个元素。
  • 对任务计数的影响:只是将值弹出队列,并不会减少任务计数。

q.put(item)

  • 功能:将一个元素放入队列中。
  • 对任务计数的影响:增加任务计数,表示有一个新任务加入队列。

q.join()

  • 功能:阻塞调用它的线程,直到队列中的所有任务都被标记为完成。
  • 条件:当 task_done() 被调用的次数等于 put() 的次数时,队列中的任务才算完成,此时 join() 方法才会继续执行后续代码。

q.task_done()

  • 功能:标记一个任务完成。
  • 对任务计数的影响:减少任务计数。
  • 注意事项:如果任务计数已经为0,再调用 task_done() 会引发错误,因为这意味着没有任务可标记完成。
    Pasted image 20250111111741|1375

第三节 作业

使用多线程爬取aqy影视信息(同上)

1.3.1 多线程如何调用

爬虫四步: ①确定URL地址 ②模拟发送请求 ③清洗数据 ④存储数据
①确定URL地址: 有n页, 就会有n个地址, 所以会有确定n个URL
②模拟发送请求: 可以将①中的n个URL保存到一个Queue中==>在一个个弹出来,每弹出一个就处理一个
③清洗数据: 思路和②类似, ②结束有会有n个res, 将这n个res保存到一个Queue中,清洗完成后, 存储到一个清洗队列中
④存储数据: 再从清洗队列中一个个弹出, 做数据的存储

1.3.2 多线程体现在哪?

然后在main中(外部调用的那个方法), 我们创建
a) 1个线程用于处理 确定URL地址 的问题
b) n个线程用于处理 每个URL地址模拟发送请求 的问题 [TODO 这里感觉有bug,因为上面模拟发送请求时while true处理的,
也就是说一个方法会处理完n个请求,为什么这里还要开n个线程给他;
如果是方法只处理一个请求, 开三个线程自然而然同时模拟发送请求
先按照”老师”的写吧, 然后在写一份自己的, 然后对比下时间问问AI]
c) 1个线程用于做数据的清洗与保存

1.3.3 F&Q

  1. 为什么要用True而不是not queue.empty()作为while的条件?

    empty()不是线程安全的, 队列可能在检查和获取之间变化, 其他线程可能在这期间添加数据

  2. 为什么打印完”持续时间balbal”后, 还会有”插入成功”的打印?

    每个方法中的task_done()一定要在最后执行, 因为只要task_done()让”任务计数”为0了, join就会立即生效
    从而导致launch()方法结束, 从而执行main中的”持续时间”的打印, 所有必须等所有处理完成后再task_done()
    join的作用, 就是卡主launch()方法, 只要有这三个队列, 有一个”任务计数”没有到0, launch()方法就不可能结束

  3. one_url = self.url_queue.get()不加锁会导致错误读取吗?

    不会, queue的get()的是线程线程安全的(线程安全指的时在执行这个方法时候会锁上当前进程, 从而避免被其他线程操作,就是等同于帮你加锁了)

  4. 为什么给save()方法多开几个线程, 不能像get_response_queue()方法开几个线程一样增加运行效率

    get_response_queue()方法主要是进行网络请求,这通常是I/O密集型操作。在这种情况下,增加线程数量可以提高并发性,因为当一个线程在等待网络响应时,其他线程可以继续执行请求。
    而save()方法涉及到数据库操作(如插入数据),这也是I/O密集型操作,但通常数据库操作的性能受限于数据库的并发处理能力。
    如果数据库连接池的大小有限,或者数据库本身的写入性能受到限制,增加线程数量可能不会带来显著的性能提升,反而可能导致线程竞争和上下文切换的开销。

1.3.4 “生产者和消费者”的设计模式

  • 说明: 虽然它本身不是23种设计模式之一,但它是一个广为人知的并发编程模式。在这个模式中,生产者(Producer)和消费者(Consumer)共享一个固定大小的缓冲区。生产者负责生成数据并将其放入缓冲区,消费者则从缓冲区取出数据进行消费。
  • 回顾: PV操作还记得吧; 一个人只管生产, 生产后放到”缓冲区”(市场); 另一个人只管消费,从”缓冲区”获取消费的资源;
  • 这里的缓存区就是那几个队列,

Pasted image 20250112093848|1250


ins
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# =================================
# @Time : 2025年01月11日
# @Author : 明廷盛
# @File : 2.多线程爬虫.py
# @Software: PyCharm
# @ProjectBackground: 使用多线程进行爬虫, 对比单线程, 看时间上的差距
# =================================


import requests
import pymongo
import time
import threading
from queue import Queue


class AQY:
# 爬虫STEP1:确定目标网址
aim_url = "https://pcw-api.iqiyi.com/search/recommend/list?channel_id=1&data_type=1&mode=24&" \
"page_id={}&ret_num=48&session=6edd98b29ba0a0950a4d3556849e8506"
headers = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36'
}

TOTAL_PAGES = 10

def __init__(self):
# 连接mongodb
self.client = pymongo.MongoClient()
self.collection = self.client["aqy"]["item"] # 类别aqy数据库, item这张表
# 多线程
self.url_queue = Queue() # ①URL队列(元素:由每页page组成每个url)
self.response_queue = Queue() # ②请求队列(元素:每个url请求后的结果)
self.clear_queue = Queue() # ③清洗队列(元素:每个url请求后的结果)

# ①创建n个URL队列
def get_url_queue(self):
print("get_url_queue")
# 创建n页需要爬取的URL,然后放到url_queue中
for page in range(1, self.TOTAL_PAGES):
self.url_queue.put(self.aim_url.format(page))

# ②模拟发送n个请求,并创建response队列
def get_response_queue(self):
print("get_response_queue")
while True:
one_url = self.url_queue.get()
res = requests.get(one_url).json()['data']['list']
self.response_queue.put(res) # 将模拟请求的结果,存到 返回队列中,留个清洗方法去一个个清洗

self.url_queue.task_done() # 弹出一个元素,减少一个任务计数

# ③清洗并存储数据
def get_clear_queue(self):
print("get_clear_queue")
while True:
one_response = self.response_queue.get()

# 以下就是数据清洗的内容
for movie in one_response:
data = {}
data['name'] = movie['name']
data['playUrl'] = movie['playUrl']
data['description'] = movie['description']
self.clear_queue.put(data) # 将清洗好的data都放到清洗队列中, 下一步在存储方法()中进行存储
self.response_queue.task_done() # 弹一个, 减一个

def save_method(self):
print("save_method")
while True:
data = self.clear_queue.get() # 获取一个需要存储的数据
print(data)
self.collection.insert_one(data) # 爬虫STEP4:存储数据
self.clear_queue.task_done() # 弹一个,删一个

def launch(self):
# 创建需要执行的线程列表
thread_list = []
# ①创建1个执行get_url()方法的线程
thread_list.append(threading.Thread(target=self.get_url_queue, name="①执行获取所有url方法的线程"))
# ②创建3个用于执行get_response()方法线程[注意:创建几个,与爬取页数无直接关系,比如爬取5页; A:1,2 B:3,4 C:5 页]当然, 开的越多基本越快
for i in range(10):
thread_list.append(threading.Thread(target=self.get_response_queue, name=f"②第{i}个用于发送请求的线程"))
# ③创建1个执行clear()方法的线程
thread_list.append(threading.Thread(target=self.get_clear_queue, name="③用于执行clear方法的线程"))

# ④创建1个执行save()方法的线程;
for i in range(3):
thread_list.append(threading.Thread(target=self.save_method, name=f"④第{i}个执行存储方法的线程"))

# 将所有线程置为"线程守护"+执行每个线程
for thread in thread_list:
thread.setDaemon(True)
thread.start()

# join()每一个线程, 使得只有每个线程都完成,才会终止主线程
for queue in [self.url_queue, self.response_queue, self.clear_queue]:
queue.join()


if __name__ == '__main__':
start = time.time()
aqy = AQY()
aqy.launch()
print("持续时间:{}".format(time.time() - start))

第二章 使用线程池

第一节 回顾多线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from concurrent.futures import ThreadPoolExecutor


def get_url(url):
print(url)
return "返回值:" + url


if __name__ == '__main__':
base_url = 'https://jobs.51job.com/pachongkaifa/p{}/'
# 这里的max_worker是指定"线程池"中最多有5个线程(为你所有的submit工作)
with ThreadPoolExecutor(max_workers=5) as pool:
for page in range(1, 15):
future = pool.submit(get_url, url=base_url.format(page))
print(future.result()) # 获取"提交"方法返回值

第二节 作业

需求: 使用线程池爬取 腾讯招聘的信息
注意最后多线程launch()的方法:
Pasted image 20250112153036|1425

ins
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# =================================
# @Time : 2025年01月12日
# @Author : 明廷盛
# @File : 4.作业(线程池).py
# @Software: PyCharm
# @ProjectBackground: 需求: 使用线程池爬取 [阿里招聘](https://talent-holding.alibaba.com/off-campus/position-list?lang=zh)的信息
# =================================

import requests
import pymysql
import time
from concurrent.futures import ThreadPoolExecutor


class AlibabaWork:
aim_url = "https://careers.tencent.com/tencentcareer/api/post/Query?timestamp=1736664299813&countryId=&cityId=&bgIds=&productId=&categoryId=&parentCategoryId=&attrId=&keyword=python&pageIndex={}&pageSize=10&language=zh-cn&area=cn"
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36'
}

def __init__(self):
# 连接mysql数据库
self.db = pymysql.connect(host="localhost", user="root", password="root", database="py_spider")
self.cursor = self.db.cursor()

@classmethod
def get_information(cls, page):
res = requests.get(cls.aim_url.format(page), headers=cls.headers).json()
return res['Data']["Posts"]

def parse_data(self, works):
for work in works:
recruit_post_name = work['RecruitPostName']
responsibility = work['Responsibility']
post_url = work['PostURL']
self.save_data(recruit_post_name, responsibility, post_url)

def save_data(self, *params):
"""传参按照如下顺序 RecruitPostName, Responsibility, PostURL"""
sql = "insert into tx_work(recruit_post_name,responsibility,post_url) values(%s, %s, %s)"
try:
self.cursor.execute(sql, params)
self.db.commit()
print("插入成功!")
except Exception as e:
self.db.rollback()
print("==>插入失败", e)

def create_table(self):
# 创建表
sql_create_table = """
create table if not exists tx_work(
id int auto_increment primary key not null,
recruit_post_name varchar(500),
responsibility text,
post_url varchar(500)
) charset=utf8 engine=innodb;
"""
try:
self.cursor.execute(sql_create_table)
print("表创建成功")
except Exception as e:
print("==>表创建失败", e)

def launch(self):
# 创建表
self.create_table()
# 使用线程池多线程爬取数据
with ThreadPoolExecutor(max_workers=5) as thread:
futures = [thread.submit(self.get_information, page=page) for page in range(1, 30)]
for future in futures:
data = future.result() # 在所有任务提交后统一处理结果
self.parse_data(data)
self.db.close()

def single(self):
# 创建表
self.create_table()
for page in range(1, 30):
print(f"当前爬取第{page}页的数据")
res = self.get_information(page)
self.parse_data(res)


if __name__ == '__main__':
start = time.time()
alibaba_work = AlibabaWork()
alibaba_work.single()
# alibaba_work.launch()
print("持续时间: ", time.time() - start)
# 5线程: 持续时间: 4.6s
# 单线程: 持续时间: 7.9s

第三章 使用多进程

第一节 对比进程和线程

线程中资源是共享的, 而进程资源不共享

  • $ 原因: 进程是操作系统分配资源的基本单位,每个进程都有自己的地址空间、内存、文件描述符等资源。线程是进程中的一个执行单元,多个线程可以共享同一个进程的资源,包括内存、文件描述符等。
    Pasted image 20250112163522Pasted image 20250112163529|330

第二节 作业

需求: 爬取 爱奇艺影视信息

3.2.1 大致思路

和线程的代码可以说是完全一致, 思路也和那个一模一样, 但因为进程(一个进程可以有很多线程)的特性, 所以会有以下问题

3.2.2 出现的问题

  1. 线程无法序列化(pickle): 序列化指的是将源代码==>字节文件的过程;
    • 什么时候回导致无法序列化? 有锁的时候, 如果这里就是存在线程锁,
    • ! 为什么进程需要序列号, 线程不用? 进程是最小基本单位, 每个进程都有自己的空间, 而线程只是存于内存中的, 新增线程是无需问系统要内存的(问当前进程即可), 但开辟进程需要, 代码遇到Process时会将target指定的方法拷贝一份到新的进程中(因为你要在新的进程中执行这个方法), 而如何拷贝, ==>转为二进制, 也就是序列化;
    • 如何解决这一问题? ①将有”线程锁”的这个对象, 作为主线程的东西, 主线程无需新开辟, 所以也就无需序列号; ②什么是主线程的东西? 就是类对象
      Pasted image 20250113134255|675Pasted image 20250113135344|501
  2. 直接就结束了, 并没有爬取数据
    • ①这个是因为 进程的初始化 比 线程慢(毕竟要去开空间啥的)
    • ②所以线程还没初始化完成, 下面的join()如果任务计数为0, 就直接退出了, 而线程都没开始工作, 那任务计数肯定为0, 所以就直接退出了
    • 解决方法? ①方法一在判断阻塞前先睡一觉,等进程开始了再判断(不知道睡多久) ②方法二, 先运行第一个生产者,让缓冲区有东西(有点消耗速度)
      Pasted image 20250113135441|750

3.2.3 完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# =================================
# @Time : 2025年01月13日
# @Author : 明廷盛
# @File : 5.多进程.py
# @Software: PyCharm
# @ProjectBackground: 一个进程中会有多个线程
# 需求: 爬取爱奇艺影视信息
# =================================

import requests
import pymongo
import time
from multiprocessing import JoinableQueue as Queue, Process


class AQYMovie:
aim_url = "https://pcw-api.iqiyi.com/search/recommend/list?channel_id=1&data_type=1&mode=24&" \
"page_id={}&ret_num=48&session=6edd98b29ba0a0950a4d3556849e8506"
headers = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36'
}
TOTAL_PAGE = 9
# 数据库(避免"线程锁"导致的进程序列化问题)
client = pymongo.MongoClient()
collection = client["py_spider"]["aqy_movie"]

def __init__(self):
# 队列(缓冲区)
self.url_queue = Queue()
self.response_queue = Queue()
self.parse_queue = Queue()

def get_url(self):
# 生产者将url放到缓冲区(url_queue)
for page in range(1, self.TOTAL_PAGE):
url = self.aim_url.format(page)
self.url_queue.put(url)

def get_response(self):
# 消费url_queue+生产response_queue
while True:
url = self.url_queue.get() # <<消费url的缓存区
res = requests.get(url).json()['data']['list']
self.response_queue.put(res) # >>放入response缓存区
self.url_queue.task_done()

def get_parse(self):
# 消费response_queue +生产parse_queue
while True:
response = self.response_queue.get() # <<消费response缓冲区
for res in response:
data = {}
data['name'] = res['name']
data['playUrl'] = res['playUrl']
data['description'] = res['description']
self.parse_queue.put(data) # >>存入parse缓冲区
self.response_queue.task_done()

def save_data(self):
while True:
data = self.parse_queue.get() # <<消费parse缓冲区
print(data)
self.collection.insert_one(data)
self.parse_queue.task_done()

def run_get_url(self):
self.get_url()

def launch(self):
process_list = []
# 执行get_url()方法的进程 1个
# process_list.append(Process(target=self.run_get_url, name="执行get_url的进程"))

# 方法二:单个缓冲区(queue)只要一个有东西就能阻塞,那我直接get_url不用进程(因为最简单),
self.get_url()

# 执行get_response()方法的进程 3个
for i in range(3):
process_list.append(Process(target=self.get_response, name="执行get_response的进程{}".format(i)))

# 执行get_parse()方法的进程 1个
process_list.append(Process(target=self.get_parse, name="执行get_parse的进程"))

# 执行save_data()方法的进程 1个
process_list.append(Process(target=self.save_data, name="执行save_data的进程"))

# 设置守护进程, 并开始进程
for process in process_list:
process.daemon = True # 进程设置守护进程的方法, 等同于线程的setDaemon()
print(process.name)
process.start()

# 方法一: 简单粗暴,倒头就睡,等我"进程"开始运行在join判断(但问题是你怎么知道进程初始化要多久,不推荐)
# time.sleep(5)

# 阻塞,直到全部完成
for queue in [self.url_queue, self.response_queue, self.parse_queue]:
queue.join()


if __name__ == '__main__':
start = time.time()
aqy_movie = AQYMovie()
aqy_movie.launch()
print("done! ", time.time() - start)
# 多进程: 1.424278736114502 (3个用于response,其他都一个)
# 多线程: 1.0442798137664795 (3个用于response,其他都一个)
# 单线程: 2.3590798377990723

第四章 使用协程

第一节 基础知识

[[2.需要用到的python基础知识#十. 协程 asyncio]]

第二节 作业

需求: 爬取“王者荣耀”里所有英雄的皮肤

  • $ await 后面只能加①Future对象 ②Task对象 ③coroutine 协程对象
  • $ async 里
    • a) 单个等待请求等待的就用await 单个请求(注意是紧贴着await关键字是==紧贴==)
    • b) 多个请求 ①: 创建任务list ②执行方法并获取协程对象(a)) ③将协程对象转为task对象task对象 = asyncio.create_task(线程对象) ④将task对象添加到任务list ⑤await asyncio.wait(task_list)
    • c) 注意如果方法标注了async, 那里面只能是调用await, 不能调用单独的方法, 必须是支持协程的==>推出两个结论 ①只能用aiohttp不能用request ==②里面只要调用方法, 就98% 是a)情况就是使用await关键字==
  • $ 如何启动: asyncio.run(skin.get_hero_information()) ==wait方法启动”任务列表” ==; ==run方法启动 async方法==
  • ! 注意:执行多个请求(task_list)哪里不需要”await”
ins
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# =================================
# @Time : 2025年01月14日
# @Author : 明廷盛
# @File : 6.作业(协程).py
# @Software: PyCharm
# @ProjectBackground: 需求: 爬取["王者荣耀"](https://pvp.qq.com/web201605/herolist.shtml)里所有英雄的皮肤
"""
皮肤链接=英雄ID+皮肤ID https://game.gtimg.cn/images/yxzj/img201606/skin/hero-info/558/558-bigskin-1.jpg
所有英雄链接的json文件

"""
# =================================
import asyncio
import requests
import os
import time

import aiohttp


# 协程实现
class WzGameSkin:
skin_url = "https://game.gtimg.cn/images/yxzj/img201606/skin/hero-info/{}/{}-bigskin-{}.jpg"
hero_url = "https://pvp.qq.com/web201605/js/herolist.json"
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36'
}
total_image = 0 # 王者荣耀总共的皮肤数量

async def get_hero_information(self):
async with aiohttp.ClientSession() as client:
res = await client.get(self.hero_url, headers=self.headers)
json_data = await res.json(content_type=None) # 包含了"英雄名称"(cname)和"英雄id"(ename)

# 数据清洗+创建"任务循环"列表
task_list = list() # 1.创建任务list
for one_hero in json_data:
hero_name = one_hero['cname'] # 英雄的名称
hero_id = one_hero['ename'] # 英雄的ID
# {皮肤ID, 皮肤名称}
skins = {skin_id + 1: skin_name for skin_id, skin_name in enumerate(one_hero['skin_name'].split("|"))}
coroutine_obj = self.get_hero_skin(client, hero_name, hero_id, skins) # 2.获取执行方法的协程对象
task = asyncio.create_task(coroutine_obj) # 3.协程对象转为task对象; 这个方法支持将: 协程对象==>task对象
task_list.append(task) # 4.添加task到任务列表
await asyncio.wait(task_list) # 5.开始任务循环

async def get_hero_skin(self, client, hero_name, hero_id, skins):
"""下载该英雄的所有皮肤"""
# 创建文件夹
if not os.path.exists("./skin_image"):
os.makedirs("./skin_image")
# 爬取该英雄的所有皮肤
for skin_id in skins.keys():
with open(f"./skin_image/{hero_name}-{skins[skin_id]}.png", 'wb') as f:
res = await client.get(self.skin_url.format(hero_id, hero_id, skin_id), headers=self.headers) # 页面响应
image = await res.read() # `aiohttp`包获取二进制图片
print(f"{res.status}: {hero_name}-{skins[skin_id]}.png 下载中....")
f.write(image)
self.total_image += 1

if __name__ == '__main__':
start = time.time()
skin = WzGameSkin()
asyncio.run(skin.get_hero_information())
print(f"执行时间{time.time() - start},皮肤数量:{skin.total_image}")
# 单线程: 执行时间124.47043371200562,皮肤数量:761
# 协程: 执行时间 31.235466718673706,皮肤数量:761

第三节 对比requestsaiohttp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import requests
import aiohttp
import asyncio

aim_url = "https://www.baidu.com"


def request_method():
res = requests.get(aim_url)
print(res, type(res)) # <class 'requests.models.Response'>


async def aiohttp_method():
async with aiohttp.ClientSession() as client:
res = await client.get(aim_url)
print(res, type(res)) # <class 'aiohttp.client_reqrep.ClientResponse'>


if __name__ == '__main__':
print("----------requests----------")
request_method()
print("----------aiohttp----------")
asyncio.run(aiohttp_method())
序号requests模块常用方法作用aiohttp模块常用方法 只有带( )前面才需要加 await作用
result.text返回以文本格式(HTML代码)response.text()异步返回以文本格式的响应内容
result.content返回以二进制格式response.read()异步返回以二进制格式的响应内容
result.json()返回以json格式response.json()异步返回以json格式的响应内容
result.status_code返回此次请求服务器返回的状态码response.status返回此次请求服务器返回的状态码
result.headers获取响应头response.headers获取响应头
result.request.headers获取该次请求的请求头res.request_info获取该次请求的请求头
result.cookies获取cookiesresponse.cookies获取cookies
result.content.decode("指定编码")先返回二进制, 然后自定义解码获取html源码response.read("自定义解码")先返回二进制, 然后自定义解码获取html源码

4.3.1 携带请求

  • requests包
    • $ 语法: proxies=dict类型
1
2
3
4
5
6
7
8
9
import requests

proxies = {
"http": "http://user:password@proxyserver:port",
"https": "http://user:password@proxyserver:port",
}

response = requests.get("http://example.com", proxies=proxies)
print(response.text)
  • aiohttp包
    • $ 语法: proxy=字符串
    • ! 注意: 一个请求只能指定一个代理!!!
1
2
3
4
5
6
7
8
9
10
11
12
13
import aiohttp
import asyncio

async def fetch(url):
async with aiohttp.ClientSession() as session:
async with session.get(url, proxy="http://user:password@proxyserver:port") as response:
return await response.text()

async def main():
html = await fetch("http://example.com")
print(html)

asyncio.run(main())

第四节 关于协程对象的异常处理gather()

  • & 说明: 这个是针对①多个协程对象 ②协程对象可能报错; 如果不报错, 就直接asyncio.wait()即可
  • $ 语法: 前面 转协程对象为task对象啥的都一样, 只是最后的wait改为gather response = asyncio.gather(*tasks, return_exceptions=True)
    • 返回值: ①done, pending = wait(); 两个都是<Task>对象 ==②gather的返回值是没有异常, 返回这个协程对象的返回值, 否者返回异常对象 配置了return_exceptions=True==
    • gather()的配置: 配置return_exceptions=True: 会将这些异常作为结果返回,而不是直接抛出, 默认为False直接抛出
      Pasted image 20250117210008|1200Pasted image 20250117210021|369

第五章 协程和retry

  • 普通的retry: @retrying.retry(stop_max_attempt_number=10)
  • 协程中支持的retry
    • $ 语法:
      • 包:from tenacity import retry, stop_after_attempt, retry_if_exception_type
      • 语法: @retry(stop=stop_after_attempt(10))
      • 位置: 放在async标注的方法上面
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import asyncio
import aiohttp
import retrying
from tenacity import retry, stop_after_attempt, retry_if_exception_type

count = 0

@retry(stop=stop_after_attempt(10))
async def fetch(client, url):
global count
count += 1
return await client.get(url, proxy="http://2.2.2.2:8890", timeout =1)



async def launch():
async with aiohttp.ClientSession() as client:
try:
coroutine_obj = await fetch(client, "https://www.baidu.com")
except:
print(count)
print(type(coroutine_obj))


if __name__ == '__main__':
asyncio.run(launch())
print(count)

第五章 aiomysql

第一节 单个连接对象和游标

和pymysql一模一样, 只是改为了协程 [[7.数据存储#第四章 Python和Mysql]]

STEP1: 建立连接

STEP2: 获取游标对象

STEP3:使用游标对象

STEP4:关闭连接:

  • ! 这里注意, db的关闭不能用协程, 因为有可能其他方法正在调用, 如果也使用await中, 就会导致其他协程任务还在执行就, 意外断了.
  • ! 总结下协程: 在一个方法中是一定顺序执行的, 但是涉及多个方法, 就不一定是顺序的了(说的是加入”任务列表”的协程)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# =================================
# @Time : 2025年01月14日
# @Author : 明廷盛
# @File : 7.异步数据存储.py
# @Software: PyCharm
# @ProjectBackground: aiomysql的使用
# =================================
import asyncio
import aiomysql

async def connect_mysql():
# 1.获取连接对象
db = await aiomysql.connect(user="root", password="root", db="py_spider")
# 2.获取游标
cursor = await db.cursor()

# 3.执行sql语句,并获取结果
await cursor.execute("select * from students")
res = await cursor.fetchall() # 获取结果
print(res)

# 4.关闭游标,并断开连接
await cursor.close()
db.close()


if __name__ == '__main__':
asyncio.run(connect_mysql())

第二节 多个连接对象和游标

  • $ 大致说明: 只有获取connnect和cursor的方式和普通的不同了
    • ①获取mysql连接池 aiomysql.create_pool()
    • ②获取多个connect pool.acquire()
    • ③获取多个cursor connects.cursor()
1
2
3
4
5
6
7
8
9
10
11
12
13
# 用connect池,实线多个connect和cursor对象
async def many_mysql():
# STEP1: 获取mysql连接池对象
pool = await aiomysql.create_pool(user="root", password="root", db="py_spider")

# STEP2: 通过"上下文管理器",创建多个connect和cursor
async with pool.acquire() as connects:
async with connects.cursor() as cursors:
# 同上3.执行sql语句,并获取结果
await cursors.execute("select * from tx_work")
res = await cursors.fetchall() # 获取结果
print(res)
# STEP3: 无需关闭,因为"上下文管理器"会帮你关闭

第三节 作业

需求: 爬取 汽车之家 的车辆数据

  • Title: 8.高性能爬虫
  • Author: 明廷盛
  • Created at : 2025-02-15 14:34:49
  • Updated at : 2025-02-15 13:41:00
  • Link: https://blog.20040424.xyz/2025/02/15/🐍爬虫工程师/第一部分 爬虫基础/8.高性能爬虫/
  • License: All Rights Reserved © 明廷盛