8.高性能爬虫
第一章 使用多线程
第一节 什么是任务计数
- 概念:队列中未完成任务的个数,用于跟踪任务的处理进度。
- & 额外说明: 每次q.put(x), 就相当于增加一个未完成的任务, q.get()并不能将任务的个数减少, 只能将值弹出去
- 获取方法:通过 q.unfinished_tasks 属性获取。
第二节 queue在多线程中的常用方法
q.get()
- 功能:从队列中取出一个元素。
- 对任务计数的影响:只是将值弹出队列,并不会减少任务计数。
q.put(item)
- 功能:将一个元素放入队列中。
- 对任务计数的影响:增加任务计数,表示有一个新任务加入队列。
q.join()
- 功能:阻塞调用它的线程,直到队列中的所有任务都被标记为完成。
- 条件:当 task_done() 被调用的次数等于 put() 的次数时,队列中的任务才算完成,此时 join() 方法才会继续执行后续代码。
q.task_done()
- 功能:标记一个任务完成。
- 对任务计数的影响:减少任务计数。
- 注意事项:如果任务计数已经为0,再调用 task_done() 会引发错误,因为这意味着没有任务可标记完成。

第三节 作业
使用多线程爬取aqy影视信息(同上)
1.3.1 多线程如何调用
爬虫四步: ①确定URL地址 ②模拟发送请求 ③清洗数据 ④存储数据
①确定URL地址: 有n页, 就会有n个地址, 所以会有确定n个URL
②模拟发送请求: 可以将①中的n个URL保存到一个Queue中==>在一个个弹出来,每弹出一个就处理一个
③清洗数据: 思路和②类似, ②结束有会有n个res, 将这n个res保存到一个Queue中,清洗完成后, 存储到一个清洗队列中
④存储数据: 再从清洗队列中一个个弹出, 做数据的存储
1.3.2 多线程体现在哪?
然后在main中(外部调用的那个方法), 我们创建
a) 1个线程用于处理 确定URL地址 的问题
b) n个线程用于处理 每个URL地址模拟发送请求 的问题 [TODO 这里感觉有bug,因为上面模拟发送请求时while true处理的,
也就是说一个方法会处理完n个请求,为什么这里还要开n个线程给他;
如果是方法只处理一个请求, 开三个线程自然而然同时模拟发送请求
先按照”老师”的写吧, 然后在写一份自己的, 然后对比下时间问问AI]
c) 1个线程用于做数据的清洗与保存
1.3.3 F&Q
为什么要用True而不是not queue.empty()作为while的条件?
empty()不是线程安全的, 队列可能在检查和获取之间变化, 其他线程可能在这期间添加数据
为什么打印完”持续时间balbal”后, 还会有”插入成功”的打印?
每个方法中的task_done()一定要在最后执行, 因为只要task_done()让”任务计数”为0了, join就会立即生效
从而导致launch()方法结束, 从而执行main中的”持续时间”的打印, 所有必须等所有处理完成后再task_done()
join的作用, 就是卡主launch()方法, 只要有这三个队列, 有一个”任务计数”没有到0, launch()方法就不可能结束one_url = self.url_queue.get()不加锁会导致错误读取吗?
不会, queue的get()的是线程线程安全的(线程安全指的时在执行这个方法时候会锁上当前进程, 从而避免被其他线程操作,就是等同于帮你加锁了)
为什么给save()方法多开几个线程, 不能像get_response_queue()方法开几个线程一样增加运行效率
get_response_queue()方法主要是进行网络请求,这通常是I/O密集型操作。在这种情况下,增加线程数量可以提高并发性,因为当一个线程在等待网络响应时,其他线程可以继续执行请求。
而save()方法涉及到数据库操作(如插入数据),这也是I/O密集型操作,但通常数据库操作的性能受限于数据库的并发处理能力。
如果数据库连接池的大小有限,或者数据库本身的写入性能受到限制,增加线程数量可能不会带来显著的性能提升,反而可能导致线程竞争和上下文切换的开销。
1.3.4 “生产者和消费者”的设计模式
- 说明: 虽然它本身不是23种设计模式之一,但它是一个广为人知的并发编程模式。在这个模式中,生产者(Producer)和消费者(Consumer)共享一个固定大小的缓冲区。生产者负责生成数据并将其放入缓冲区,消费者则从缓冲区取出数据进行消费。
- 回顾: PV操作还记得吧; 一个人只管生产, 生产后放到”缓冲区”(市场); 另一个人只管消费,从”缓冲区”获取消费的资源;
- 这里的缓存区就是那几个队列,

# =================================
# @Time : 2025年01月11日
# @Author : 明廷盛
# @File : 2.多线程爬虫.py
# @Software: PyCharm
# @ProjectBackground: 使用多线程进行爬虫, 对比单线程, 看时间上的差距
# =================================
import requests
import pymongo
import time
import threading
from queue import Queue
class AQY:
# 爬虫STEP1:确定目标网址
aim_url = "https://pcw-api.iqiyi.com/search/recommend/list?channel_id=1&data_type=1&mode=24&" \
"page_id={}&ret_num=48&session=6edd98b29ba0a0950a4d3556849e8506"
headers = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36'
}
TOTAL_PAGES = 10
def __init__(self):
# 连接mongodb
self.client = pymongo.MongoClient()
self.collection = self.client["aqy"]["item"] # 类别aqy数据库, item这张表
# 多线程
self.url_queue = Queue() # ①URL队列(元素:由每页page组成每个url)
self.response_queue = Queue() # ②请求队列(元素:每个url请求后的结果)
self.clear_queue = Queue() # ③清洗队列(元素:每个url请求后的结果)
# ①创建n个URL队列
def get_url_queue(self):
print("get_url_queue")
# 创建n页需要爬取的URL,然后放到url_queue中
for page in range(1, self.TOTAL_PAGES):
self.url_queue.put(self.aim_url.format(page))
# ②模拟发送n个请求,并创建response队列
def get_response_queue(self):
print("get_response_queue")
while True:
one_url = self.url_queue.get()
res = requests.get(one_url).json()['data']['list']
self.response_queue.put(res) # 将模拟请求的结果,存到 返回队列中,留个清洗方法去一个个清洗
self.url_queue.task_done() # 弹出一个元素,减少一个任务计数
# ③清洗并存储数据
def get_clear_queue(self):
print("get_clear_queue")
while True:
one_response = self.response_queue.get()
# 以下就是数据清洗的内容
for movie in one_response:
data = {}
data['name'] = movie['name']
data['playUrl'] = movie['playUrl']
data['description'] = movie['description']
self.clear_queue.put(data) # 将清洗好的data都放到清洗队列中, 下一步在存储方法()中进行存储
self.response_queue.task_done() # 弹一个, 减一个
def save_method(self):
print("save_method")
while True:
data = self.clear_queue.get() # 获取一个需要存储的数据
print(data)
self.collection.insert_one(data) # 爬虫STEP4:存储数据
self.clear_queue.task_done() # 弹一个,删一个
def launch(self):
# 创建需要执行的线程列表
thread_list = []
# ①创建1个执行get_url()方法的线程
thread_list.append(threading.Thread(target=self.get_url_queue, name="①执行获取所有url方法的线程"))
# ②创建3个用于执行get_response()方法线程[注意:创建几个,与爬取页数无直接关系,比如爬取5页; A:1,2 B:3,4 C:5 页]当然, 开的越多基本越快
for i in range(10):
thread_list.append(threading.Thread(target=self.get_response_queue, name=f"②第{i}个用于发送请求的线程"))
# ③创建1个执行clear()方法的线程
thread_list.append(threading.Thread(target=self.get_clear_queue, name="③用于执行clear方法的线程"))
# ④创建1个执行save()方法的线程;
for i in range(3):
thread_list.append(threading.Thread(target=self.save_method, name=f"④第{i}个执行存储方法的线程"))
# 将所有线程置为"线程守护"+执行每个线程
for thread in thread_list:
thread.setDaemon(True)
thread.start()
# join()每一个线程, 使得只有每个线程都完成,才会终止主线程
for queue in [self.url_queue, self.response_queue, self.clear_queue]:
queue.join()
if __name__ == '__main__':
start = time.time()
aqy = AQY()
aqy.launch()
print("持续时间:{}".format(time.time() - start))
第二章 使用线程池
第一节 回顾多线程
from concurrent.futures import ThreadPoolExecutor
def get_url(url):
print(url)
return "返回值:" + url
if __name__ == '__main__':
base_url = 'https://jobs.51job.com/pachongkaifa/p{}/'
# 这里的max_worker是指定"线程池"中最多有5个线程(为你所有的submit工作)
with ThreadPoolExecutor(max_workers=5) as pool:
for page in range(1, 15):
future = pool.submit(get_url, url=base_url.format(page))
print(future.result()) # 获取"提交"方法返回值
第二节 作业
需求: 使用线程池爬取 腾讯招聘的信息
注意最后多线程launch()的方法:
# =================================
# @Time : 2025年01月12日
# @Author : 明廷盛
# @File : 4.作业(线程池).py
# @Software: PyCharm
# @ProjectBackground: 需求: 使用线程池爬取 [阿里招聘](https://talent-holding.alibaba.com/off-campus/position-list?lang=zh)的信息
# =================================
import requests
import pymysql
import time
from concurrent.futures import ThreadPoolExecutor
class AlibabaWork:
aim_url = "https://careers.tencent.com/tencentcareer/api/post/Query?timestamp=1736664299813&countryId=&cityId=&bgIds=&productId=&categoryId=&parentCategoryId=&attrId=&keyword=python&pageIndex={}&pageSize=10&language=zh-cn&area=cn"
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36'
}
def __init__(self):
# 连接mysql数据库
self.db = pymysql.connect(host="localhost", user="root", password="root", database="py_spider")
self.cursor = self.db.cursor()
def get_information(cls, page):
res = requests.get(cls.aim_url.format(page), headers=cls.headers).json()
return res['Data']["Posts"]
def parse_data(self, works):
for work in works:
recruit_post_name = work['RecruitPostName']
responsibility = work['Responsibility']
post_url = work['PostURL']
self.save_data(recruit_post_name, responsibility, post_url)
def save_data(self, *params):
"""传参按照如下顺序 RecruitPostName, Responsibility, PostURL"""
sql = "insert into tx_work(recruit_post_name,responsibility,post_url) values(%s, %s, %s)"
try:
self.cursor.execute(sql, params)
self.db.commit()
print("插入成功!")
except Exception as e:
self.db.rollback()
print("==>插入失败", e)
def create_table(self):
# 创建表
sql_create_table = """
create table if not exists tx_work(
id int auto_increment primary key not null,
recruit_post_name varchar(500),
responsibility text,
post_url varchar(500)
) charset=utf8 engine=innodb;
"""
try:
self.cursor.execute(sql_create_table)
print("表创建成功")
except Exception as e:
print("==>表创建失败", e)
def launch(self):
# 创建表
self.create_table()
# 使用线程池多线程爬取数据
with ThreadPoolExecutor(max_workers=5) as thread:
futures = [thread.submit(self.get_information, page=page) for page in range(1, 30)]
for future in futures:
data = future.result() # 在所有任务提交后统一处理结果
self.parse_data(data)
self.db.close()
def single(self):
# 创建表
self.create_table()
for page in range(1, 30):
print(f"当前爬取第{page}页的数据")
res = self.get_information(page)
self.parse_data(res)
if __name__ == '__main__':
start = time.time()
alibaba_work = AlibabaWork()
alibaba_work.single()
# alibaba_work.launch()
print("持续时间: ", time.time() - start)
# 5线程: 持续时间: 4.6s
# 单线程: 持续时间: 7.9s
第三章 使用多进程
第一节 对比进程和线程
线程中资源是共享的, 而进程资源不共享
- $ 原因: 进程是操作系统分配资源的基本单位,每个进程都有自己的地址空间、内存、文件描述符等资源。线程是进程中的一个执行单元,多个线程可以共享同一个进程的资源,包括内存、文件描述符等。


第二节 作业
需求: 爬取 爱奇艺影视信息
3.2.1 大致思路
和线程的代码可以说是完全一致, 思路也和那个一模一样, 但因为进程(一个进程可以有很多线程)的特性, 所以会有以下问题
3.2.2 出现的问题
- 线程无法序列化(pickle): 序列化指的是将源代码==>字节文件的过程;
- 什么时候回导致无法序列化? 有锁的时候, 如果这里就是存在线程锁,
- ! 为什么进程需要序列号, 线程不用? 进程是最小基本单位, 每个进程都有自己的空间, 而线程只是存于内存中的, 新增线程是无需问系统要内存的(问当前进程即可), 但开辟进程需要, 代码遇到Process时会将target指定的方法拷贝一份到新的进程中(因为你要在新的进程中执行这个方法), 而如何拷贝, ==>转为二进制, 也就是序列化;
- 如何解决这一问题? ①将有”线程锁”的这个对象, 作为主线程的东西, 主线程无需新开辟, 所以也就无需序列号; ②什么是主线程的东西? 就是类对象


- 直接就结束了, 并没有爬取数据
- ①这个是因为 进程的初始化 比 线程慢(毕竟要去开空间啥的)
- ②所以线程还没初始化完成, 下面的join()如果任务计数为0, 就直接退出了, 而线程都没开始工作, 那任务计数肯定为0, 所以就直接退出了
- 解决方法? ①方法一在判断阻塞前先睡一觉,等进程开始了再判断(不知道睡多久) ②方法二, 先运行第一个生产者,让缓冲区有东西(有点消耗速度)

3.2.3 完整代码
# =================================
# @Time : 2025年01月13日
# @Author : 明廷盛
# @File : 5.多进程.py
# @Software: PyCharm
# @ProjectBackground: 一个进程中会有多个线程
# 需求: 爬取爱奇艺影视信息
# =================================
import requests
import pymongo
import time
from multiprocessing import JoinableQueue as Queue, Process
class AQYMovie:
aim_url = "https://pcw-api.iqiyi.com/search/recommend/list?channel_id=1&data_type=1&mode=24&" \
"page_id={}&ret_num=48&session=6edd98b29ba0a0950a4d3556849e8506"
headers = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36'
}
TOTAL_PAGE = 9
# 数据库(避免"线程锁"导致的进程序列化问题)
client = pymongo.MongoClient()
collection = client["py_spider"]["aqy_movie"]
def __init__(self):
# 队列(缓冲区)
self.url_queue = Queue()
self.response_queue = Queue()
self.parse_queue = Queue()
def get_url(self):
# 生产者将url放到缓冲区(url_queue)
for page in range(1, self.TOTAL_PAGE):
url = self.aim_url.format(page)
self.url_queue.put(url)
def get_response(self):
# 消费url_queue+生产response_queue
while True:
url = self.url_queue.get() # <<消费url的缓存区
res = requests.get(url).json()['data']['list']
self.response_queue.put(res) # >>放入response缓存区
self.url_queue.task_done()
def get_parse(self):
# 消费response_queue +生产parse_queue
while True:
response = self.response_queue.get() # <<消费response缓冲区
for res in response:
data = {}
data['name'] = res['name']
data['playUrl'] = res['playUrl']
data['description'] = res['description']
self.parse_queue.put(data) # >>存入parse缓冲区
self.response_queue.task_done()
def save_data(self):
while True:
data = self.parse_queue.get() # <<消费parse缓冲区
print(data)
self.collection.insert_one(data)
self.parse_queue.task_done()
def run_get_url(self):
self.get_url()
def launch(self):
process_list = []
# 执行get_url()方法的进程 1个
# process_list.append(Process(target=self.run_get_url, name="执行get_url的进程"))
# 方法二:单个缓冲区(queue)只要一个有东西就能阻塞,那我直接get_url不用进程(因为最简单),
self.get_url()
# 执行get_response()方法的进程 3个
for i in range(3):
process_list.append(Process(target=self.get_response, name="执行get_response的进程{}".format(i)))
# 执行get_parse()方法的进程 1个
process_list.append(Process(target=self.get_parse, name="执行get_parse的进程"))
# 执行save_data()方法的进程 1个
process_list.append(Process(target=self.save_data, name="执行save_data的进程"))
# 设置守护进程, 并开始进程
for process in process_list:
process.daemon = True # 进程设置守护进程的方法, 等同于线程的setDaemon()
print(process.name)
process.start()
# 方法一: 简单粗暴,倒头就睡,等我"进程"开始运行在join判断(但问题是你怎么知道进程初始化要多久,不推荐)
# time.sleep(5)
# 阻塞,直到全部完成
for queue in [self.url_queue, self.response_queue, self.parse_queue]:
queue.join()
if __name__ == '__main__':
start = time.time()
aqy_movie = AQYMovie()
aqy_movie.launch()
print("done! ", time.time() - start)
# 多进程: 1.424278736114502 (3个用于response,其他都一个)
# 多线程: 1.0442798137664795 (3个用于response,其他都一个)
# 单线程: 2.3590798377990723
第四章 使用协程
第一节 基础知识
[[2.需要用到的python基础知识#十. 协程 asyncio]]
第二节 作业
需求: 爬取“王者荣耀”里所有英雄的皮肤
- $ await 后面只能加①Future对象 ②Task对象 ③coroutine 协程对象
- $ async 里
- a) 单个等待请求等待的就用
await 单个请求(注意是紧贴着await关键字是==紧贴==) - b) 多个请求 ①: 创建任务list ②执行方法并获取协程对象(a)) ③将协程对象转为task对象
task对象 = asyncio.create_task(线程对象)④将task对象添加到任务list ⑤await asyncio.wait(task_list) - c) 注意如果方法标注了async, 那里面只能是调用await, 不能调用单独的方法, 必须是支持协程的==>推出两个结论 ①只能用aiohttp
不能用request==②里面只要调用方法, 就98% 是a)情况就是使用await关键字==
- a) 单个等待请求等待的就用
- $ 如何启动:
asyncio.run(skin.get_hero_information())==wait方法启动”任务列表” ==; ==run方法启动 async方法== - ! 注意:执行多个请求(task_list)哪里不需要”await”
# =================================
# @Time : 2025年01月14日
# @Author : 明廷盛
# @File : 6.作业(协程).py
# @Software: PyCharm
# @ProjectBackground: 需求: 爬取["王者荣耀"](https://pvp.qq.com/web201605/herolist.shtml)里所有英雄的皮肤
"""
皮肤链接=英雄ID+皮肤ID https://game.gtimg.cn/images/yxzj/img201606/skin/hero-info/558/558-bigskin-1.jpg
所有英雄链接的json文件
"""
# =================================
import asyncio
import requests
import os
import time
import aiohttp
# 协程实现
class WzGameSkin:
skin_url = "https://game.gtimg.cn/images/yxzj/img201606/skin/hero-info/{}/{}-bigskin-{}.jpg"
hero_url = "https://pvp.qq.com/web201605/js/herolist.json"
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36'
}
total_image = 0 # 王者荣耀总共的皮肤数量
async def get_hero_information(self):
async with aiohttp.ClientSession() as client:
res = await client.get(self.hero_url, headers=self.headers)
json_data = await res.json(content_type=None) # 包含了"英雄名称"(cname)和"英雄id"(ename)
# 数据清洗+创建"任务循环"列表
task_list = list() # 1.创建任务list
for one_hero in json_data:
hero_name = one_hero['cname'] # 英雄的名称
hero_id = one_hero['ename'] # 英雄的ID
# {皮肤ID, 皮肤名称}
skins = {skin_id + 1: skin_name for skin_id, skin_name in enumerate(one_hero['skin_name'].split("|"))}
coroutine_obj = self.get_hero_skin(client, hero_name, hero_id, skins) # 2.获取执行方法的协程对象
task = asyncio.create_task(coroutine_obj) # 3.协程对象转为task对象; 这个方法支持将: 协程对象==>task对象
task_list.append(task) # 4.添加task到任务列表
await asyncio.wait(task_list) # 5.开始任务循环
async def get_hero_skin(self, client, hero_name, hero_id, skins):
"""下载该英雄的所有皮肤"""
# 创建文件夹
if not os.path.exists("./skin_image"):
os.makedirs("./skin_image")
# 爬取该英雄的所有皮肤
for skin_id in skins.keys():
with open(f"./skin_image/{hero_name}-{skins[skin_id]}.png", 'wb') as f:
res = await client.get(self.skin_url.format(hero_id, hero_id, skin_id), headers=self.headers) # 页面响应
image = await res.read() # `aiohttp`包获取二进制图片
print(f"{res.status}: {hero_name}-{skins[skin_id]}.png 下载中....")
f.write(image)
self.total_image += 1
if __name__ == '__main__':
start = time.time()
skin = WzGameSkin()
asyncio.run(skin.get_hero_information())
print(f"执行时间{time.time() - start},皮肤数量:{skin.total_image}")
# 单线程: 执行时间124.47043371200562,皮肤数量:761
# 协程: 执行时间 31.235466718673706,皮肤数量:761
第三节 对比requests和aiohttp
import requests
import aiohttp
import asyncio
aim_url = "https://www.baidu.com"
def request_method():
res = requests.get(aim_url)
print(res, type(res)) # <class 'requests.models.Response'>
async def aiohttp_method():
async with aiohttp.ClientSession() as client:
res = await client.get(aim_url)
print(res, type(res)) # <class 'aiohttp.client_reqrep.ClientResponse'>
if __name__ == '__main__':
print("----------requests----------")
request_method()
print("----------aiohttp----------")
asyncio.run(aiohttp_method())
| 序号 | requests模块常用方法 | 作用 | aiohttp模块常用方法 只有带( )前面才需要加 await |
作用 |
|---|---|---|---|---|
| ① | result.text |
返回以文本格式(HTML代码) | response.text() |
异步返回以文本格式的响应内容 |
| ② | result.content |
返回以二进制格式 | response.read() |
异步返回以二进制格式的响应内容 |
| ③ | result.json() |
返回以json格式 | response.json() |
异步返回以json格式的响应内容 |
| ④ | result.status_code |
返回此次请求服务器返回的状态码 | response.status |
返回此次请求服务器返回的状态码 |
| ⑤ | result.headers |
获取响应头 | response.headers |
获取响应头 |
| ⑥ | result.request.headers |
获取该次请求的请求头 | res.request_info |
获取该次请求的请求头 |
| ⑦ | result.cookies |
获取cookies | response.cookies |
获取cookies |
| ⑧ | result.content.decode("指定编码") |
先返回二进制, 然后自定义解码获取html源码 | response.read("自定义解码") |
先返回二进制, 然后自定义解码获取html源码 |
4.3.1 携带请求
requests包- $ 语法:
proxies=dict类型
- $ 语法:
import requests
proxies = {
"http": "http://user:password@proxyserver:port",
"https": "http://user:password@proxyserver:port",
}
response = requests.get("http://example.com", proxies=proxies)
print(response.text)
aiohttp包- $ 语法:
proxy=字符串 - ! 注意: 一个请求只能指定一个代理!!!
- $ 语法:
import aiohttp
import asyncio
async def fetch(url):
async with aiohttp.ClientSession() as session:
async with session.get(url, proxy="http://user:password@proxyserver:port") as response:
return await response.text()
async def main():
html = await fetch("http://example.com")
print(html)
asyncio.run(main())
第四节 关于协程对象的异常处理gather()
- & 说明: 这个是针对①多个协程对象 ②协程对象可能报错; 如果不报错, 就直接
asyncio.wait()即可 - $ 语法: 前面 转协程对象为task对象啥的都一样, 只是最后的wait改为gather
response = asyncio.gather(*tasks, return_exceptions=True)- 返回值: ①done, pending = wait(); 两个都是
<Task>对象 ==②gather的返回值是没有异常, 返回这个协程对象的返回值, 否者返回异常对象配置了return_exceptions=True== - gather()的配置: 配置return_exceptions=True: 会将这些异常作为结果返回,而不是直接抛出, 默认为False直接抛出


- 返回值: ①done, pending = wait(); 两个都是
第五章 协程和retry
- 普通的retry:
@retrying.retry(stop_max_attempt_number=10) - 协程中支持的retry
- $ 语法:
- 包:
from tenacity import retry, stop_after_attempt, retry_if_exception_type - 语法:
@retry(stop=stop_after_attempt(10)) - 位置: 放在async标注的方法上面
- 包:
- $ 语法:
import asyncio
import aiohttp
import retrying
from tenacity import retry, stop_after_attempt, retry_if_exception_type
count = 0
async def fetch(client, url):
global count
count += 1
return await client.get(url, proxy="http://2.2.2.2:8890", timeout =1)
async def launch():
async with aiohttp.ClientSession() as client:
try:
coroutine_obj = await fetch(client, "https://www.baidu.com")
except:
print(count)
print(type(coroutine_obj))
if __name__ == '__main__':
asyncio.run(launch())
print(count)
第五章 aiomysql
第一节 单个连接对象和游标
和pymysql一模一样, 只是改为了协程 [[7.数据存储#第四章 Python和Mysql]]
STEP1: 建立连接
STEP2: 获取游标对象
STEP3:使用游标对象
STEP4:关闭连接:
- ! 这里注意, db的关闭不能用协程, 因为有可能其他方法正在调用, 如果也使用await中, 就会导致其他协程任务还在执行就, 意外断了.
- ! 总结下协程: 在一个方法中是一定顺序执行的, 但是涉及多个方法, 就不一定是顺序的了(说的是加入”任务列表”的协程)
# =================================
# @Time : 2025年01月14日
# @Author : 明廷盛
# @File : 7.异步数据存储.py
# @Software: PyCharm
# @ProjectBackground: aiomysql的使用
# =================================
import asyncio
import aiomysql
async def connect_mysql():
# 1.获取连接对象
db = await aiomysql.connect(user="root", password="root", db="py_spider")
# 2.获取游标
cursor = await db.cursor()
# 3.执行sql语句,并获取结果
await cursor.execute("select * from students")
res = await cursor.fetchall() # 获取结果
print(res)
# 4.关闭游标,并断开连接
await cursor.close()
db.close()
if __name__ == '__main__':
asyncio.run(connect_mysql())
第二节 多个连接对象和游标
- $ 大致说明: 只有获取connnect和cursor的方式和普通的不同了
- ①获取mysql连接池
aiomysql.create_pool() - ②获取多个connect
pool.acquire() - ③获取多个cursor
connects.cursor()
- ①获取mysql连接池
# 用connect池,实线多个connect和cursor对象
async def many_mysql():
# STEP1: 获取mysql连接池对象
pool = await aiomysql.create_pool(user="root", password="root", db="py_spider")
# STEP2: 通过"上下文管理器",创建多个connect和cursor
async with pool.acquire() as connects:
async with connects.cursor() as cursors:
# 同上3.执行sql语句,并获取结果
await cursors.execute("select * from tx_work")
res = await cursors.fetchall() # 获取结果
print(res)
# STEP3: 无需关闭,因为"上下文管理器"会帮你关闭
第三节 作业
需求: 爬取 汽车之家 的车辆数据
- Title: 8.高性能爬虫
- Author: 明廷盛
- Created at : 2026-02-02 05:32:25
- Updated at : 2025-02-15 13:41:00
- Link: https://blog.20040424.xyz/2026/02/02/🐍爬虫工程师/第一部分 爬虫基础/8.高性能爬虫/
- License: All Rights Reserved © 明廷盛








