asyncio和 aiohttp

文章目录

      • asyncio和 aiohttp
      • 3.8版本+ 特性
      • aiohttp
      • 案例
      • 优化方案

asyncio和 aiohttp

asyncio即Asynchronous I/O是python一个用来处理并发(concurrent)事件的包,是很多python异步架构的基础,多用于处理高并发网络请求方面的问题。

为了简化并更好地标识异步IO,从Python 3.5开始引入了新的语法async和await,可以让coroutine的代码更简洁易读。

asyncio 被用作多个提供高性能 Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。

asyncio 往往是构建 IO 密集型和高层级 结构化 网络代码的最佳选择。

import asyncio


async def task(i):
    print(f"task {i} start")
    await asyncio.sleep(1)
    print(f"task {i} end")


# 创建事件循环对象
loop = asyncio.get_event_loop()
# 直接将协程对象加入时间循环中
tasks = [task(1), task(2)]
# asyncio.wait:将协程任务进行收集,功能类似后面的asyncio.gather
# run_until_complete阻塞调用,直到协程全部运行结束才返回
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

task: 任务,对协程对象的进一步封装,包含任务的各个状态;asyncio.Task是Future的一个子类,用于实现协作式多任务的库,且Task对象不能用户手动实例化,通过下面2个函数loop.create_task() 或 asyncio.ensure_future()创建。

import asyncio, time


async def work(i, n):  # 使用async关键字定义异步函数
    print('任务{}等待: {}秒'.format(i, n))
    await asyncio.sleep(n)  # 休眠一段时间
    print('任务{}在{}秒后返回结束运行'.format(i, n))
    return i + n


start_time = time.time()  # 开始时间

tasks = [asyncio.ensure_future(work(1, 1)),
         asyncio.ensure_future(work(2, 2)),
         asyncio.ensure_future(work(3, 3))]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

print('运行时间: ', time.time() - start_time)
for task in tasks:
    print('任务执行结果: ', task.result())

3.8版本+ 特性

async.run() 运行协程
async.create_task()创建task
async.gather()获取返回值

import asyncio, time

async def work(i, n):  # 使用async关键字定义异步函数
    print('任务{}等待: {}秒'.format(i, n))
    await asyncio.sleep(n)  # 休眠一段时间
    print('任务{}在{}秒后返回结束运行'.format(i, n))
    return i + n


tasks = []
async def main():
    global tasks
    tasks = [asyncio.create_task(work(1, 1)),
             asyncio.create_task(work(2, 2)),
             asyncio.create_task(work(3, 3))]

    await asyncio.wait(tasks) # 阻塞


start_time = time.time()  # 开始时间
asyncio.run(main())
print('运行时间: ', time.time() - start_time)
for task in tasks:
    print('任务执行结果: ', task.result())

asyncio.create_task() 函数在 Python 3.7 中被加入。

asyncio.gather方法

# 用gather()收集返回值

import asyncio, time


async def work(i, n):  # 使用async关键字定义异步函数
    print('任务{}等待: {}秒'.format(i, n))
    await asyncio.sleep(n)  # 休眠一段时间
    print('任务{}在{}秒后返回结束运行'.format(i, n))
    return i + n


async def main():
    tasks = [asyncio.create_task(work(1, 1)),
             asyncio.create_task(work(2, 2)),
             asyncio.create_task(work(3, 3))]

    # 将task作为参数传入gather,等异步任务都结束后返回结果列表
    response = await asyncio.gather(tasks[0], tasks[1], tasks[2])
    print("异步任务结果:", response)


start_time = time.time()  # 开始时间

asyncio.run(main())

print('运行时间: ', time.time() - start_time)

aiohttp

爬虫最重要的模块requests,但它是阻塞式的发起请求,每次请求发起后需阻塞等待其返回响应,不能做其他的事情。本文要介绍的aiohttp可以理解成是和requests对应Python异步网络请求库,它是基于 asyncio 的异步模块,可用于实现异步爬虫,有点就是更快于 requests 的同步爬虫。安装方式,pip install aiohttp。

aiohttp是一个为Python提供异步HTTP 客户端/服务端编程,基于asyncio的异步库。asyncio可以实现单线程并发IO操作,其实现了TCP、UDP、SSL等协议,aiohttp就是基于asyncio实现的http框架, 使用方式如下。

import aiohttp
import asyncio

async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get("http://httpbin.org/headers") as response:
            print(await response.text())

asyncio.run(main())

案例

import asyncio
import os
import aiohttp
import time
from utils.aiorequests import aiorequest
from lxml import etree


headers = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36"
}

url = "https://www.pkdoutu.com/photo/list/"



base_url = "https://www.xr02.vip/"




async def get_home_url():
    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=headers, ssl=False) as resp:
            res = await resp.content.read()
            selector = etree.HTML(res)
            urls = selector.xpath('//ul/li[@class="i_list list_n2"]/a/@href')
            return map(lambda i: base_url+i, urls)


async def get_page_url(urls):
    async with aiohttp.ClientSession() as session:
        async with session.get(urls, headers=headers, ssl=False) as resp:
            res = await resp.content.read()
            selector = etree.HTML(res)
            page_urls = selector.xpath('//div[@class="page"]/a/@href')
            return map(lambda i: base_url+i, set(page_urls))


async def get_img_url(urls):
    async with aiohttp.ClientSession() as session:
        async with session.get(urls, headers=headers, ssl=False) as resp:
            res = await resp.content.read()
            selector = etree.HTML(res)
            name = selector.xpath("//h1/text()")[0].replace("[XiuRen秀人网]",'')
            img_urls = selector.xpath('//p/img/@src')
            return name, map(lambda i: base_url+i, img_urls)

async def download_img(urls, base_name):
    name = os.path.basename(urls)
    name = base_name + '_' + name
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(urls, headers=headers, ssl=False) as resp:
                res = await resp.content.read()
                with open(f"./imgs/{name}","wb") as f:
                    f.write(res)
                print(f"url: {urls} 下载成功,存储文件为{name}")
    except:
        print(f"url: {urls} 下载失败")
    return "success"

async def main():
    tasks_1 = [asyncio.create_task(get_page_url(i)) for i in await get_home_url()]
    result_1 = await asyncio.gather(*tasks_1)
    result_list = []
    for i in result_1: result_list.extend(list(i))
    tasks_2 = [asyncio.create_task(get_img_url(i)) for i in result_list]
    result_2 = await asyncio.gather(*tasks_2)
    tasks_3 = []
    for name, img_url in result_2:
        tasks_3.extend(asyncio.create_task(download_img(url, name)) for url in img_url)

    await asyncio.gather(*tasks_3)


if __name__ == '__main__':
    if not os.path.isdir("./imgs"):
        os.mkdir("./imgs")
    start = time.time()
    asyncio.run(main())
    print(time.time()-start)

通过这个案例,可以看到一个问题,那就是 aiohttp的使用,每次都需要写一堆重复代码,并且整个代码结构看起来复杂,作为一个高级开发,必须要会做的就是减少代码重复编写,要将其模块化,封装起来

优化方案

aiorequest.py

class AioRequest:

    async def request(self, method: str, url: str, data: Union[Dict, bytes, None] = None, **kwargs: Any) -> Any:
        async with aiohttp.ClientSession() as session:
            async with session.request(method, url, ssl=False, data=data, **kwargs) as response:
                if response.status != 200:
                    raise Exception(f"{method.upper()} request failed with status {response.status}")
                # return await handler(await response.content.read()
                # return 这里必须带上await,但不支持 await ClientResponse 对象直接返回 必须要处理响应数据
                # 根据内容类型处理响应体
                content_type = response.headers.get('Content-Type')
                if content_type and ('image' in content_type or 'video' in content_type):
                    return await response.read()  # 返回图片或视频的二进制数据
                elif 'application/json' in content_type:
                    return await response.json()  # 假设响应是JSON格式
                else:
                    return await response.text()  # 读取文本内容

    async def get(self, url: str, **kwargs: Any):
        return await self.request("GET", url, **kwargs)

    async def post(self, url: str, data: Union[Dict, bytes], **kwargs: Any):
        return await self.request("POST", url, data=data, **kwargs)


    # 处理大文件
    async def save_binary_content(self, url: str, file_path: str, headers: Dict[str, str] = None, **kwargs: Any):
        async with aiohttp.ClientSession(headers=headers) as session:
            async with session.get(url, ssl=False, **kwargs) as response:
                if response.status != 200:
                    raise Exception(f"GET request failed with status {response.status}")
                with open(file_path, 'wb') as f:
                    while True:
                        chunk = await response.content.read(1024)  # 每次读取1024字节
                        if not chunk:
                            break
                        f.write(chunk)
                        

aiorequest = AioRequest()  # 减少对象的重复创建消耗内存

使用aiorequest 后,代码就简洁明了多了,

import asyncio
import os
import time
from utils.aiorequests import aiorequest
from lxml import etree


headers = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36"
}



base_url = "https://www.xr02.vip/"



img_urls_dict = dict()

async def get_home_url():
    res  = await aiorequest.get(base_url)
    selector = etree.HTML(res)
    urls = selector.xpath('//ul/li[@class="i_list list_n2"]/a/@href')
    return map(lambda i: base_url+i, urls)


async def get_page_url(urls):
    res = await aiorequest.get(urls)
    selector = etree.HTML(res)
    await get_img_url(res)
    page_urls = selector.xpath('//div[@class="page"]/a/@href')
    page_urls = list(map(lambda i: base_url + i, set(page_urls)))
    page_urls.remove(urls)
    return page_urls

async def get_img_url(res):
    selector = etree.HTML(res)
    name = selector.xpath("//h1/text()")[0].replace("[XiuRen秀人网]",'')
    img_list = selector.xpath('//p/img/@src')
    img_list = map(lambda i: base_url+i, img_list)
    if name not in img_urls_dict:
        img_urls_dict.setdefault(name, list(img_list))
    else:
        img_urls_dict[name].extend(list(img_list))

async def get_imgs_url(urls):
    res = await aiorequest.get(urls)
    await get_img_url(res)


async def download_img(urls, base_name):
    name = os.path.basename(urls)
    name = base_name + '_' + name
    try:
        res = await aiorequest.get(urls)
        with open(f"./imgs_2/{name}","wb") as f:
            f.write(res)
        print(f"url: {urls} 下载成功,存储文件为{name}")
    except:
        print(f"url: {urls} 下载失败")
    return "success"

async def main():
    tasks_1 = [asyncio.create_task(get_page_url(i)) for i in await get_home_url()]
    result_1 = await asyncio.gather(*tasks_1)
    result_list = []
    for i in result_1: result_list.extend(i)
    tasks_2 = [asyncio.create_task(get_imgs_url(i)) for i in result_list]
    await asyncio.wait(tasks_2)
    tasks_3 = []
    for name, img_url in img_urls_dict.items():
        tasks_3.extend(asyncio.create_task(download_img(url, name)) for url in img_url)
    await asyncio.wait(tasks_3)



if __name__ == '__main__':
    if not os.path.isdir("./imgs_2"):
        os.mkdir("./imgs_2")
    start = time.time()
    asyncio.run(main())
    print(time.time()-start)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/498277.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【数据结构】带头双向链表的实现

👑个人主页:啊Q闻 🎇收录专栏:《数据结构》 🎉道阻且长,行则将至 前言 带头双向链表是链表的一种,相较于单链表的实现,其更为简单 一.初识带头双向循环链表 带头…

【漏洞分析】浅析android手游lua脚本的加密与解密(二)

反编译本人用到的是luajit-decomp,这里需要注意,luajit-decomp默认的lua版本为5.1,luajit版本为2.0.2,我们需要下载对应lua和luajit的版本,编译后替换luajit-decomp下的lua51.dll、luajit.exe、jit文件夹。反编译时需要注意的文件和文件夹: 这里需要下载版本为2.1.0-bet…

用 AI 编程-释放ChatGPT的力量

最近读了本书,是 Sean A Williams 写的,感觉上还是相当不错的。一本薄薄的英文书,还真是写的相当好。如果你想看,还找不到,可以考虑私信我吧。 ChatGPT for Coders Unlock the Power of AI with ChatGPT: A Comprehens…

SAP-CO主数据之统计指标创建-<KK01>

公告:周一至周五每日一更,周六日存稿,请您点“关注”和“在看”,后续推送的时候不至于看不到每日更新内容,感谢。 目录 一、背景: 成本中心主数据创建:传送门 成本要素主数据创建&#xff1…

OpenHarmony实战开发-滑动容器组件Swiper的使用

介绍 本篇Codelab主要介绍了滑动容器组件Swiper的几种常见的应用场景,包括顶部导航、轮播图以及视频滑动播放。 相关概念 Swiper:滑动容器,提供子组件切换滑动的能力。Stack:堆叠容器,子组件按照顺序依次入栈&#x…

康耐视visionpro-CogFindLineTool工具详细说明

CogFindeLineTool功能说明: 检测图像的直线边缘,实现边缘的定位、测量。 CogFindeLineTool操作说明: ①.打开工具栏,双击或点击鼠标拖拽添加CogFindLineTool工具 ②.添加输入图像,点击鼠标右键“链接到”选择输入图像或以连线拖拽的方式选择相应输入图像 ③. 所选空间名…

振弦采集仪在预防地质灾害监测中的作用与应用前景

振弦采集仪在预防地质灾害监测中的作用与应用前景 振弦采集仪(String Vibrating Sensor,简称SVM)是一种用于地质灾害监测的重要仪器,它通过测量地面振动信号来预测和预警地质灾害的发生。SVM的作用在于提供实时、准确的地质灾害监…

威联通安装Kafka

最近在学习 Kafka 的知识,遇到一些问题网上搜到的信息不全。想要在本地安装一个 Kafka 进行验证,想到了之前买的 Nas 就开始折腾。 用 Docker 的方式安装 Kafka 现在的 Nas 很多都支持 Docker,我买的也支持。威联通的 Docker 叫 Container S…

AugmentedReality之路-通过蓝图启动AR相机(2)

本文实现打开AR相机和关闭AR相机功能,在主界面点击Start AR按钮后打开AR相机,在主界面点击Stop AR按钮后关闭AR相机 1、启动AR相关插件 通过Edit->Plugins启用AugmentedReality下面的所有插件 2、自定义Pawn 在Content->ARBase目录右键&…

如何降低 BlueNRG-LPS 的开机峰值电流

1. 前言 BlueNRG 系列存在开机瞬间会出现很大的峰值电流的现象,预计有 20ma 左右。针对此现象,经常有客户询问该峰值电流会不会导致设备工作异常?会不会导致电池使用寿命缩短(考虑到一般纽扣电池能承受的峰值电流大概在 15ma 左右…

B64843-4M 1553B总线 控制时序、寄存器介绍。

B64843-4M系统架构 注: 1 、 CPU ADDRESS LATCH 信号由带地址 / 数据复用总线的处理器提供,对不带地址 / 数据复用总线的处理器,CPU ADDRESS LATCH 信号与 3.3V 信号连接。 2、如果 POLARITY_SEL="1" , RD/信号为高时读使能,为低时写使POLARITY_S…

Codeforces Round 937 (Div. 4)

Codeforces Round 937 (Div. 4) Codeforces Round 937 (Div. 4) A. Stair, Peak, or Neither? 题意&#xff1a;略 思路&#xff1a;照着题模拟&#xff1b; AC code&#xff1a; void solve() {int a, b, c; cin >> a >> b >> c;if (a < b) {if (…

【一种基于改进A*算法和CSA-APF算法的混合路径规划方法】—— 论文阅读

论文题目&#xff1a;A Hybrid Path Planning Method Based on Improved A∗ and CSA-APF Algorithms 1 摘要 大问题&#xff1a;复杂动态环境下全局路径规划难以避开动态障碍物&#xff0c;且局部路径容易陷入局部最优的问题 问题1&#xff1a;针对A*算法产生冗余路径节点和…

基于Python的电商特产数据可视化分析与推荐系统

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长 QQ 名片 :) 1. 项目简介 利用网络爬虫技术从某东采集某城市的特产价格、销量、评论等数据&#xff0c;经过数据清洗后存入数据库&#xff0c;并实现特产销售、市场占有率、价格区间等多维度的可视化统计分析&#xff0c;并…

2024蓝桥杯每日一题(背包2)

备战2024年蓝桥杯 -- 每日一题 Python大学A组 试题一&#xff1a;包子凑数 试题二&#xff1a;砝码称重 试题三&#xff1a;倍数问题 试题一&#xff1a;包子称重 【题目描述】 小明几乎每天早晨都会在一家包子铺吃早餐。他发现这家包子铺有 N 种蒸笼&#xf…

row_number 函数和关联更新

生成测试数据&#xff0c;房间号数据如下&#xff1a; CREATE TABLE hotel (floor_nbr,room_nbr) ASSELECT 1,100 FROM DUAL UNION ALLSELECT 1,100 FROM DUAL UNION ALLSELECT 2,100 FROM DUAL UNION ALLSELECT 2,100 FROM DUAL UNION ALLSELECT 3,100 FROM DUAL; 里面的房间号…

Ubuntu18.04安装wireshark

安装wireshark 环境Ubuntu18.04 1.使用root用户进行安装 2.将 wireshark-dev/stable PPA 添加到系统的软件源列表中。系统就可以从该PPA获取Wireshark软件包及其更新了。 apt-add-repository ppa:wireshark-dev/stable3.确保你系统上的软件包信息是最新的&#xff0c;这样在…

若依 3.8.7版本springboot前后端分离 整合mabatis plus

1.去掉mybatis 这一步我没有操作&#xff0c;看别人的博客有说不去掉可能冲突&#xff0c;也可能不冲突&#xff0c;我试下来就没去掉如需要去除&#xff0c;到总的pom.xml中properties标签下的<mybatis-spring-boot.version>x.x.x</mybatis-spring-boot.version>…

灰色预测模型以及matlab软件使用

1&#xff0c;灰色系统简介 著名学者邓聚龙教授于20世纪70年代末、80年代初提出&#xff1a; “ The诞生标志:邓教授第一篇灰色系统论文Control Problems of Grey Systems”&#xff0c;发表于北荷兰出版公司期刊 System & Control Letter,1982, No.5. 1.1 灰色系统&…

|行业洞察·香氛|《小红书2023香水香氛营销宝典-71页》

报告内容的详细解读&#xff1a; 行业格局 预计到2025年&#xff0c;香水市场规模将超过300亿&#xff0c;小红书成为香水种草的重要平台。从2018年到2025年&#xff0c;市场规模持续增长&#xff0c;年增速保持在20%左右。香水市场的热度在节日节点尤为明显&#xff0c;如情…
最新文章