还是希望能比较系统地回顾一下爬虫的一些知识,不过有一些我也记不清了。

可能对小白不是很友好。

如果看视频可以参考这个

Python爬虫从入门到小黑屋

概论

爬虫的矛与盾

robots.txt协议: 君子协议。规定了网站中哪些数据可以被爬取哪些数据不可以被爬取。

Request与第一个爬虫

1
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple requests
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import requests

if __name__ == '__main__':
url = "https://fanyi.baidu.com/sug"

s = input("请输入你要翻译的英文单词")
dat = {
"kw": s
}

# 发送post请求, 发送的数据必须放在字典中, 通过data参数进行传递
resp = requests.post(url, data=dat)
print(resp.json()) # 将服务器返回的内容直接处理成json() => dict
resp.close()

数据解析

有时我们不需要爬取整个界面的内容,我们只希望提取出一小部分从而提高效率,有不同的解析方式可以使用,混合使用也无妨。

正则表达式

Python中使用re模块进行正则匹配,re中封装了一些函数,不过还是喜欢直接写正则表达式。

1
2
3
使用函数:
findall: 匹配字符串中所有的符合正则的内容,返回list
finditer: 匹配字符串中所有的内容[返回的是迭代器], 从迭代器中拿到内容需要.group()

有些正则含有特殊符号比如\n,\d等等,PyCharm可能有Warning,前面加个r就好了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import re

# 预加载正则表达式
obj = re.compile(r"\d+")

ret = obj.finditer("我的电话号是:10086, 我女朋友的电话是:10010")
for it in ret:
print(it.group())

ret = obj.findall("呵呵哒, 我就不信你不换我1000000000")
print(ret)

s = """
<div class='jay'><span id='1'>郭麒麟</span></div>
<div class='jj'><span id='2'>宋铁</span></div>
<div class='jolin'><span id='3'>大聪明</span></div>
<div class='sylar'><span id='4'>范思哲</span></div>
<div class='tory'><span id='5'>胡说八道</span></div>
"""

# (?P<分组名字>正则) 可以单独从正则匹配的内容中进一步提取内容
obj = re.compile(r"<div class='.*?'><span id='(?P<id>\d+)'>(?P<wahaha>.*?)</span></div>", re.S) # re.S: 让.能匹配换行符

result = obj.finditer(s)
for it in result:
print(it.group("wahaha"))
print(it.group("id"))

正则样例:豆瓣top250

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 拿到页面源代码.   requests
# 通过re来提取想要的有效信息 re
import requests
import re
import csv
if __name__ == '__main__':

url = "https://movie.douban.com/top250"
headers = {
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.192 Safari/537.36"
}
resp = requests.get(url, headers=headers)
page_content = resp.text

# 解析数据
obj = re.compile('<li>.*?<div class="item">.*?<span class="title">(?P<name>.*?)'
'</span>.*?<p class="">.*?<br>(?P<year>.*?)&nbsp.*?<span '
'class="rating_num" property="v:average">(?P<score>.*?)</span>.*?'
'<span>(?P<num>.*?)人评价</span>', re.S)
# 开始匹配
result = obj.finditer(page_content)
f = open("data.csv", mode="w")
csvwriter = csv.writer(f)
for it in result:
dic = it.groupdict()
dic['year'] = dic['year'].strip()
csvwriter.writerow(dic.values())

f.close()
resp.close()
print("over!")

其中.*?过滤标签后可能的换行或者空格,或者一些不必要的标签内容。

bs4

bs4的原理就是解析检索html标签,对于文件路径直接写在网站源码里的情况十分方便。

通常使用的方法是find()find_all()以及get(),故名思意,前两个是标签查找,会查找到相应的标签,并且可以通过标签内属性值进行过滤,后一个则是获取标签中属性的内容。具体用法还是看样例。

样例

这个样例是爬图片,一个问题是图片链接在二级子页面中,所以先要获取子页面源码

先找到这个div下的所有表项。

然后在其中找到子页面的路径

到子页面中找到图片链接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 1.拿到主页面的源代码. 然后提取到子页面的链接地址, href
# 2.通过href拿到子页面的内容. 从子页面中找到图片的下载地址 img -> src
# 3.下载图片
import requests
from bs4 import BeautifulSoup
import time

url = "https://www.umei.cc/bizhitupian/weimeibizhi/"
resp = requests.get(url)
resp.encoding = 'utf-8' # 处理乱码

# print(resp.text)
# 把源代码交给bs
main_page = BeautifulSoup(resp.text, features="html.parser")
resp.close()
alist = main_page.find('div', class_="swiper-wrapper after").find_all("a")
print(alist)
for a in alist:
href = a.get('href') # 直接通过get就可以拿到属性的值
print(href)
# 拿到子页面的源代码
child_page_resp = requests.get('https://www.umei.cc' + href)
child_page_resp.encoding = 'utf-8'
child_page_text = child_page_resp.text
# 从子页面中拿到图片的下载路径
child_page = BeautifulSoup(child_page_text, "html.parser")
child_page_resp.close()
src = child_page.find('section', class_="img-content").find('img').get('src')
# 下载图片
img_resp = requests.get(src)
# img_resp.content # 这里拿到的是字节
img_name = src.split("/")[-1] # 拿到url中的最后一个/以后的内容
with open("img/" + img_name, mode="wb") as f:
f.write(img_resp.content) # 图片内容写入文件

print("over!!!", img_name)

print("all over!!!")

xpath

常用来直接解析网页源码,对于html标签,我们可以将其转化成树的形式,就可以通过树的父子关系进行提取。

div[2]代表第ndiv标签,div表示该层标签只有唯一一个,最后使用text()提取标签内数据。

获取xpath的方法:浏览器控制台 -> Elements -> 找到标签所在块 -> 右键选Copy -> 选 Copy full Xpath

样例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import requests
from lxml import etree

url = "https://www.runoob.com/python/att-string-strip.html"
resp = requests.get(url,proxies={"https":"127.0.0.1:7890"})
# print(resp.text)

# 解析
html = etree.HTML(resp.text)

divs = html.xpath("/html/body/div[4]/div/div[2]/div/div[3]/div/h1/text()")
# 甚至可以这样
h1 = html.xpath("/html/body/div[4]/div/div[2]/div/div[3]/div/h1")
res = h1.xpath("./text()")
print(divs)

模拟登录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 登录 -> 得到cookie
# 带着cookie 去请求到书架url -> 书架上的内容

# 必须得把上面的两个操作连起来
# 我们可以使用session进行请求 -> session你可以认为是一连串的请求. 在这个过程中的cookie不会丢失
import requests

# 会话
session = requests.session()
data = {
"loginName": "你的用户名",
"password": "密码"
}

# 1. 登录
url = "https://passport.17k.com/ck/user/login"
resp = session.post(url, data=data)
print(resp.text)
print(resp.cookies)

# 2. 拿书架上的数据
# 刚才的那个session中是有cookie的
resp = session.get('https://user.17k.com/ck/author/shelf?page=1&appKey=2406394919')
print(resp.json())
resp.close()

防盗链

有些资源(视频)使用防盗链标记打开视频的界面,在请求时没有放到链就会被认为是非法请求。

同时有些视频的video链接并不显式地写在页面源代码里,此处以梨视频爬取为例。

首先检查源代码我们只能找到视频封面,但在播放时审查界面元素我们是可以找到视频播放地址的,说明地址是动态生成的。

比如我所看的这个视频链接是:

https://video.pearvideo.com/mp4/adshort/20220607/cont-1764629-15892192_adpkg-ad_hd.mp4

刷新界面,使用控制台进行跟踪,选中Fetch/XHR我们会找到一个请求,进行预览就会发现一个貌似是视频链接的链接:

https://video.pearvideo.com/mp4/adshort/20220607/1657102159673-15892192_adpkg-ad_hd.mp4

但一比对就会发现好像不太对,我们找到的这个还不太一样,观察这个请求就会发现其实应该把下面链接中不一样的那部分换上cont-加上请求中的systemTime参数值,然后我们就可以下载视频。

但如果请求头中没有加Referer参数,你会发现返回结果是 “视频已下架”。加上即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 1. 拿到contId
# 2. 拿到videoStatus返回的json. -> srcURL
# 3. srcURL里面的内容进行修整
# 4. 下载视频
import requests

# 拉取视频的网址
url = "https://www.pearvideo.com/video_1721605"
contId = url.split("_")[1]

videoStatusUrl = f"https://www.pearvideo.com/videoStatus.jsp?contId={contId}"

headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.192 Safari/537.36",
# 防盗链: 溯源, 当前本次请求的上一级是谁
"Referer": url
}

resp = requests.get(videoStatusUrl, headers=headers)
dic = resp.json()

srcUrl = dic['videoInfo']['videos']['srcUrl']
systemTime = dic['systemTime']
srcUrl = srcUrl.replace(systemTime, f"cont-{contId}")

# 下载视频
with open("a.mp4", mode="wb") as f:
f.write(requests.get(srcUrl).content)

加密数据的解析-以抓取网易云热评为例

这里是我自己实践的过程,具体详细步骤可以参看B站视频

同样我们可以找到请求所有评论的url(当然这就意味着不是写在源代码里的)

这样我们可以确定请求的链接

题外话:请求头中也包括了防盗链如果请求失败(没有返回)可以考虑防盗链,或者考虑加上里面的user-agent字段。

我们来查看请求的参数,但发现是加密过的,而且如果不加,请求是没有返回值的,我们的难点在于确定加密过程与原来的参数。我们查看请求的调用栈

调用栈自下向上是调用的从先到后的顺序,我们查看最后调用的代码,也就是第一行。

注意进入后使用左下角的一对大括号对代码进行格式化。

js代码中打断点,刷新界面,点击resume直到到达我们的目标url,我们会发现断点处请求的数据是加密过的(废话

我们沿着调用栈 (Call Stack) 一层一层向下找。我们会找到一层调用栈,数据是明文的,那么加密的过程就在下一层调用栈中。

我们最终会找到加密代码与赋值的代码

直接ctrl+f搜索就能找到加密用的函数,我们就可以模拟这个加密过程进行加密

注意:常用的序列化与反序列化

json.dumps()----将Python的字典数据转换成json字符,数据的最外面都添加一层""变为字符串,这也是数据的序列化步骤

json.loads()----将json字符串数据转换为字典或列表(去掉外面一层""),这个也是反序列化的步骤。

多线程

前言

作者:DarrenChan陈驰
链接:https://www.zhihu.com/question/23474039/answer/269526476

在介绍Python中的线程之前,先明确一个问题,Python中的多线程是假的多线程! 为什么这么说,我们先明确一个概念,全局解释器锁(GIL)。

Python代码的执行由Python虚拟机(解释器)来控制。Python在设计之初就考虑要在主循环中,同时只有一个线程在执行,就像单CPU的系统中运行多个进程那样,内存中可以存放多个程序,但任意时刻,只有一个程序在CPU中运行。同样地,虽然Python解释器可以运行多个线程,只有一个线程在解释器中运行。

对Python虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同时只有一个线程在运行。在多线程环境中,Python虚拟机按照以下方式执行。

1.设置GIL。

2.切换到一个线程去执行。

3.运行。

4.把线程设置为睡眠状态。

5.解锁GIL。

6.再次重复以上步骤。

对所有面向I/O的(会调用内建的操作系统C代码的)程序来说,GIL会在这个I/O调用之前被释放,以允许其他线程在这个线程等待I/O的时候运行。如果某线程并未使用很多I/O操作,它会在自己的时间片内一直占用处理器和GIL。也就是说,I/O密集型的Python程序比计算密集型的Python程序更能充分利用多线程的好处。

我们都知道,比方我有一个4核的CPU,那么这样一来,在单位时间内每个核只能跑一个线程,然后时间片轮转切换。但是Python不一样,它不管你有几个核,单位时间多个核只能跑一个线程,然后时间片轮转。看起来很不可思议?但是这就是GIL搞的鬼。任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。通常我们用的解释器是官方实现的CPython,要真正利用多核,除非重写一个不带GIL的解释器。

我们不妨做个试验:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#coding=utf-8
from multiprocessing import Pool
from threading import Thread

from multiprocessing import Process


def loop():
while True:
pass

if __name__ == '__main__':

for i in range(3):
t = Thread(target=loop)
t.start()

while True:
pass

我的电脑是4核,所以我开了4个线程,看一下CPU资源占有率:

我们发现CPU利用率并没有占满,大致相当于单核水平。

而如果我们变成进程呢?

我们改一下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#coding=utf-8
from multiprocessing import Pool
from threading import Thread

from multiprocessing import Process


def loop():
while True:
pass

if __name__ == '__main__':

for i in range(3):
t = Process(target=loop)
t.start()

while True:
pass

结果直接飙到了100%,说明进程是可以利用多核的!

为了验证这是Python中的GIL搞得鬼,我试着用Java写相同的代码,开启线程,我们观察一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.darrenchan.thread;

public class TestThread {
public static void main(String[] args) {
for (int i = 0; i < 3; i++) {
new Thread(new Runnable() {

@Override
public void run() {
while (true) {}
}
}).start();
}
while(true){}
}
}

由此可见,Java中的多线程是可以利用多核的,这是真正的多线程!而Python中的多线程只能利用单核,这是假的多线程!

难道就如此?我们没有办法在Python中利用多核?当然可以!刚才的多进程算是一种解决方案,还有一种就是调用C语言的链接库。对所有面向I/O的(会调用内建的操作系统C代码的)程序来说,GIL会在这个I/O调用之前被释放,以允许其他线程在这个线程等待I/O的时候运行。我们可以把一些 计算密集型任务用C语言编写,然后把.so链接库内容加载到Python中,因为执行C代码,GIL锁会释放,这样一来,就可以做到每个核都跑一个线程的目的!

可能有的小伙伴不太理解什么是计算密集型任务,什么是I/O密集型任务?

计算密集型任务的特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。

计算密集型任务由于主要消耗CPU资源,因此,代码运行效率至关重要。Python这样的脚本语言运行效率很低,完全不适合计算密集型任务。对于计算密集型任务,最好用C语言编写。

第二种任务的类型是IO密集型,涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。

IO密集型任务执行期间,99%的时间都花在IO上,花在CPU上的时间很少,因此,用运行速度极快的C语言替换用Python这样运行速度极低的脚本语言,完全无法提升运行效率。对于IO密集型任务,最合适的语言就是开发效率最高(代码量最少)的语言,脚本语言是首选,C语言最差。

综上,Python多线程相当于单核多线程,多线程有两个好处:CPU并行,IO并行,单核多线程相当于自断一臂。所以,在Python中,可以使用多线程,但不要指望能有效利用多核。如果一定要通过多线程利用多核,那只能通过C扩展来实现,不过这样就失去了Python简单易用的特点。不过,也不用过于担心,Python虽然不能利用多线程实现多核任务,但可以通过多进程实现多核任务。多个Python进程有各自独立的GIL锁,互不影响。

多线程的两种形式

1
2
3
4
5
6
7
8
9
10
11
def func(name):  
for i in range(1000):
print(name, i)


if __name__ == '__main__':
t1 = Thread(target=func, args=("t1",)) # 传递参数必须是元组
t1.start()

t2 = Thread(target=func, args=("t2",))
t2.start()

上面其实给出了多线程常用的一种形式,也可以通过面向对象的方式:

1
2
3
4
5
6
7
8
9
10
11
from threading import Thread


class MyThread(Thread):
def run(self):
print("子线程")


if __name__ == '__main__':
t = MyThread()
t.start()

多进程

1
2
3
4
5
6
7
8
9
10
11
12
13
from multiprocessing import Process


def func():
for i in range(1000):
print("子进程", i)


if __name__ == '__main__':
p = Process(target=func)
p.start()
for i in range(1000):
print("主进程", i)
1
2
3
4
5
6
7
8
9
10
11
12
from multiprocessing import Process


class MyProcess(Process):
def run(self):
for i in range(1000):
print("子进程", i)


if __name__ == '__main__':
t = MyProcess()
t.start()

线程池与进程池

通过这种方式减少垃圾对象回收

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def fn(name):
for i in range(1000):
print(name, i)


if __name__ == '__main__':
# 创建线程池
with ThreadPoolExecutor(50) as t:
for i in range(100):
t.submit(fn, name=f"线程{i}")
# 等待线程池中的任务全部执行完毕. 才继续执行后面代码(守护)
print("123")

协程

转自 https://zhuanlan.zhihu.com/p/68043798,有改动

由于GIL的存在,导致Python多线程性能甚至比单线程更糟。

GIL: 全局解释器锁(英语:Global Interpreter Lock,缩写GIL),是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行。即便在多核心处理器上,使用 GIL 的解释器也只允许同一时间执行一个线程。

于是出现了协程(Coroutine)这么个东西。

协程: 协程,又称微线程,纤程,英文名Coroutine。协程的作用,是在执行函数A时,可以随时中断,去执行函数B,然后中断继续执行函数A(可以自由切换)。但这一过程并不是函数调用(没有调用语句),这一整个过程看似像多线程,然而协程只有一个线程执行.

协程由于由程序主动控制切换,没有线程切换的开销,所以执行效率极高。对于IO密集型任务非常适用,如果是cpu密集型,推荐多进程+协程的方式。

进程/线程:操作系统提供的一种并发处理任务的能力。

协程:程序员通过高超的代码能力,在代码执行流程中人为的实现多任务并发,是单个线程内的任务调度技巧。

多进程和多线程体现的是操作系统的能力,而协程体现的是程序员的流程控制能力。

Python3.4之前,官方没有对协程的支持,存在一些三方库的实现,比如geventTornado。3.4之后就内置了asyncio标准库,官方真正实现了协程这一特性。

Python对协程的支持,是通过Generator实现的,协程是遵循某些规则的生成器。因此,我们在了解协程之前,我们先要学习生成器。

生成器(Generator)

我们这里主要讨论yieldyield from这两个表达式,这两个表达式和协程的实现息息相关。

  • Python2.5中引入yield表达式,参见PEP342
  • Python3.3中增加yield from语法,参见PEP380

方法中包含yield表达式后,Python会将其视作generator对象,不再是普通的方法。

yield表达式的使用

yield的语法规则是:在yield这里暂停函数的执行,并返回yield后面表达式的值(默认为None),直到被next()方法再次调用时,从上次暂停的yield代码处继续往下执行。当没有可以继续next()的时候,抛出异常,该异常可被for循环处理。

该表达式的具体使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def test():
print("generator start")
n = 1
while True:
yield_expression_value = yield n
print(f"yield_expression_value = {yield_expression_value}")
n += 1


# ①创建generator对象
generator = test()
print(type(generator))

print("\n---------------\n")

# ②启动generator
next_result = generator.__next__()
print(f"next_result = {next_result}")

print("\n---------------\n")

# ③发送值给yield表达式
send_result = generator.send(666)
print(f"send_result = {send_result}")

执行结果:

1
2
3
4
5
6
7
8
9
10
11
<class 'generator'>

---------------

generator start
next_result = 1

---------------

yield_expression_value = 666
send_result = 2

方法说明:

  • __next__()方法: 作用是启动或者恢复generator的执行,相当于send(None)
  • send(value)方法:作用是发送值给yield表达式。启动generator则是调用send(None)

执行结果的说明:

  • ①创建generator对象:包含yield表达式的函数将不再是一个函数,调用之后将会返回generator对象
  • ②启动generator:使用生成器之前需要先调用__next__或者send(None),否则将报错。启动generator后,代码将执行到yield出现的位置,也就是执行到yield n,然后将n传递到generator.__next__()这行的返回值。(注意,生成器执行到yield n后将暂停在这里,直到下一次生成器被启动)
  • ③发送值给yield表达式:调用send方法可以发送值给yield表达式,同时恢复生成器的执行。生成器从上次中断的位置继续向下执行,然后遇到下一个yield,生成器再次暂停,切换到主函数打印出send_result

理解这个demo的关键是:生成器启动或恢复执行一次,将会在yield处暂停,返回yield后面表达式的值。上面的第②步仅仅执行到了yield n,并没有执行到赋值语句,到了第③步,生成器恢复执行才给yield_expression_value赋值。

生产者和消费者模型

上面的例子中,代码中断–>切换执行,体现出了协程的部分特点。

我们再举一个生产者、消费者的例子,这个例子来自廖雪峰的Python教程

传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。
现在改用协程,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def consumer():
print("[CONSUMER] start")
r = 'start'
while True:
n = yield r
if not n:
print("n is empty")
continue
print("[CONSUMER] Consumer is consuming %s" % n)
r = "200 ok"


def producer(c):
# 启动generator
start_value = c.send(None)
print(start_value)
n = 0
while n < 3:
n += 1
print("[PRODUCER] Producer is producing %d" % n)
r = c.send(n)
print('[PRODUCER] Consumer return: %s' % r)
# 关闭generator
c.close()


# 创建生成器
c = consumer()
# 传入generator
producer(c)

执行结果:

1
2
3
4
5
6
7
8
9
10
11
[CONSUMER] start
start
[PRODUCER] producer is producing 1
[CONSUMER] consumer is consuming 1
[PRODUCER] Consumer return: 200 ok
[PRODUCER] producer is producing 2
[CONSUMER] consumer is consuming 2
[PRODUCER] Consumer return: 200 ok
[PRODUCER] producer is producing 3
[CONSUMER] consumer is consuming 3
[PRODUCER] Consumer return: 200 ok

注意到consumer函数是一个generator,把一个consumer传入produce后:

  1. 首先调用c.send(None)启动生成器;
  2. 然后,一旦生产了东西,通过c.send(n)切换到consumer执行;
  3. consumer通过yield拿到消息,处理,又通过yield把结果传回;
  4. produce拿到consumer处理的结果,继续生产下一条消息;
  5. produce决定不生产了,通过c.close()关闭consumer,整个过程结束。

整个流程无锁,由一个线程执行,produceconsumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。

yield from表达式

Python3.3版本新增yield from语法,新语法用于将一个生成器部分操作委托给另一个生成器。此外,允许子生成器(即yield from后的“参数”)返回一个值,该值可供委派生成器(即包含yield from的生成器)使用。并且在委派生成器中,可对子生成器进行优化。

我们先来看最简单的应用,例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 子生成器
def test(n):
i = 0
while i < n:
yield i
i += 1

# 委派生成器
def test_yield_from(n):
print("test_yield_from start")
yield from test(n)
print("test_yield_from end")


for i in test_yield_from(3):
print(i)

输出:

1
2
3
4
5
test_yield_from start
0
1
2
test_yield_from end

这里我们仅仅给这个生成器添加了一些打印,如果是正式的代码中,你可以添加正常的执行逻辑。

如果上面的test_yield_from函数中有两个yield from语句,将串行执行。比如将上面的test_yield_from函数改写成这样:

1
2
3
4
5
6
def test_yield_from(n):
print("test_yield_from start")
yield from test(n)
print("test_yield_from doing")
yield from test(n)
print("test_yield_from end")

将输出:

1
2
3
4
5
6
7
8
9
test_yield_from start
0
1
2
test_yield_from doing
0
1
2
test_yield_from end

在这里,yield from起到的作用相当于下面写法的简写形式

1
2
for item in test(n):
yield item

看起来这个yield from也没做什么大不了的事,其实它还帮我们处理了异常之类的。具体可以看stackoverflow上的这个问题:In practice, what are the main uses for the new “yield from” syntax in Python 3.3?

协程(Coroutine)

  • Python3.4开始,新增了asyncio相关的API,语法使用[@asyncio.coroutine](mailto:@asyncio.coroutine)和yield from实现协程
  • Python3.5中引入async/await语法,参见PEP492

我们先来看Python3.4的实现。

[@asyncio.coroutine](mailto:@asyncio.coroutine)

Python3.4中,使用[@asyncio.coroutine](mailto:@asyncio.coroutine)装饰的函数称为协程。不过没有从语法层面进行严格约束。

理解Python装饰器(Decorator)

Python装饰器看起来类似Java中的注解,然鹅和注解并不相同,不过同样能够实现面向切面编程。

想要理解Python中的装饰器,不得不先理解闭包(closure)这一概念。

闭包

看看维基百科中的解释:

在计算机科学中,闭包(英语:Closure),又称词法闭包(Lexical Closure)或函数闭包(function closures),是引用了自由变量的函数。这个被引用的自由变量将和这个函数一同存在,即使已经离开了创造它的环境也不例外。

官方的解释总是不说人话,but–talk is cheap,show me the code:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# print_msg是外围函数
def print_msg():
msg = "I'm closure"

# printer是嵌套函数
def printer():
print(msg)

return printer


# 这里获得的就是一个闭包
closure = print_msg()
# 输出 I'm closure
closure()

# 上面两行代码相当于print_msg()()

msg是一个局部变量,在print_msg函数执行之后应该就不会存在了。但是嵌套函数引用了这个变量,将这个局部变量封闭在了嵌套函数中,这样就形成了一个闭包。

结合这个例子再看维基百科的解释,就清晰明了多了。闭包就是引用了自有变量的函数,这个函数保存了执行的上下文,可以脱离原本的作用域独立存在

装饰器

一个普通的装饰器一般是这样:

1
2
3
4
5
6
7
8
9
10
11
import functools


def log(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
print('call %s():' % func.__name__)
print('args = {}'.format(*args))
return func(*args, **kwargs)

return wrapper

这样就定义了一个打印出方法名及其参数的装饰器。

调用:

1
2
3
4
5
@log
def test(p):
print(test.__name__ + " param: " + p)

test("I'm a param")

输出:

1
2
3
call test():
args = I'm a param
test param: I'm a param

装饰器在使用时,用了@语法,让人有些困扰。其实,装饰器只是个方法,与下面的调用方式没有区别:

1
2
3
4
5
def test(p):
print(test.__name__ + " param: " + p)

wrapper = log(test)
wrapper("I'm a param")

@语法只是将函数传入装饰器函数,并无神奇之处。

值得注意的是@functools.wraps(func),这是python提供的装饰器。它能把原函数的元信息拷贝到装饰器里面的 func 函数中。函数的元信息包括docstring、name、参数列表等等。可以尝试去除@functools.wraps(func),你会发现test.__name__的输出变成了wrapper。

带参数的装饰器

装饰器允许传入参数,一个携带了参数的装饰器将有三层函数,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import functools

def log_with_param(text):
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
print('call %s():' % func.__name__)
print('args = {}'.format(*args))
print('log_param = {}'.format(text))
return func(*args, **kwargs)

return wrapper

return decorator

@log_with_param("param")
def test_with_param(p):
print(test_with_param.__name__)

看到这个代码是不是又有些疑问,内层的decorator函数的参数func是怎么传进去的?和上面一般的装饰器不大一样啊。

其实道理是一样的,将其@语法去除,恢复函数调用的形式一看就明白了:

1
2
3
4
5
6
# 传入装饰器的参数,并接收返回的decorator函数
decorator = log_with_param("param")
# 传入test_with_param函数
wrapper = decorator(test_with_param)
# 调用装饰器函数
wrapper("I'm a param")

输出结果与正常使用装饰器相同:

1
2
3
4
call test_with_param():
args = I'm a param
log_param = param
test_with_param

至此,装饰器这个有点费解的特性也没什么神秘了。

装饰器这一语法体现了Python中函数是第一公民,函数是对象、是变量,可以作为参数、可以是返回值,非常的灵活与强大。

作者:聪明叉
链接:https://www.jianshu.com/p/ee82b941772a
来源:简书

对于Python原生支持的协程来说,Python对协程和生成器做了一些区分,便于消除这两个不同但相关的概念的歧义:

  • 标记了[@asyncio.coroutine](mailto:@asyncio.coroutine)装饰器的函数称为协程函数,iscoroutinefunction()方法返回True
  • 调用协程函数返回的对象称为协程对象,iscoroutine()函数返回True

举个栗子,我们给上面yield from的demo中添加[@asyncio.coroutine](mailto:@asyncio.coroutine)

1
2
3
4
5
6
7
8
9
10
11
12
import asyncio

...

@asyncio.coroutine
def test_yield_from(n):
...

# 是否是协程函数
print(asyncio.iscoroutinefunction(test_yield_from))
# 是否是协程对象
print(asyncio.iscoroutine(test_yield_from(3)))

毫无疑问输出结果是True。

可以看下[@asyncio.coroutine](mailto:@asyncio.coroutine)的源码中查看其做了什么,我将其源码简化下,大致如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import functools
import types
import inspect

def coroutine(func):
# 判断是否是生成器
if inspect.isgeneratorfunction(func):
coro = func
else:
# 将普通函数变成generator
@functools.wraps(func)
def coro(*args, **kw):
res = func(*args, **kw)
res = yield from res
return res
# 将generator转换成coroutine
wrapper = types.coroutine(coro)
# For iscoroutinefunction().
wrapper._is_coroutine = True
return wrapper

将这个装饰器标记在一个生成器上,就会将其转换成coroutine

然后,我们来实际使用下[@asyncio.coroutine](mailto:@asyncio.coroutine)yield from

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio

@asyncio.coroutine
def compute(x, y):
print("Compute %s + %s ..." % (x, y))
yield from asyncio.sleep(1.0)
return x + y

@asyncio.coroutine
def print_sum(x, y):
result = yield from compute(x, y)
print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
print("start")
# 中断调用,直到协程执行结束
loop.run_until_complete(print_sum(1, 2))
print("end")
loop.close()

执行结果:

1
2
3
4
start
Compute 1 + 2 ...
1 + 2 = 3
end

print_sum这个协程中调用了子协程compute,它将等待compute执行结束才返回结果。

这个demo点调用流程如下图:

EventLoop将会把print_sum封装成Task对象

流程图展示了这个demo的控制流程,不过没有展示其全部细节。比如其中“暂停”的1s,实际上创建了一个future对象, 然后通过BaseEventLoop.call_later()在1s后唤醒这个任务。

值得注意的是,[@asyncio.coroutine](mailto:@asyncio.coroutine)将在Python3.10版本中移除。

async/await

Python3.5开始引入async/await语法(PEP 492),用来简化协程的使用并且便于理解。

async/await实际上只是[@asyncio.coroutine](mailto:@asyncio.coroutine)yield from的语法糖:

  • 把[@asyncio.coroutine](mailto:@asyncio.coroutine)替换为async
  • yield from替换为await

即可。

比如上面的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio


async def compute(x, y):
print("Compute %s + %s ..." % (x, y))
await asyncio.sleep(1.0)
return x + y


async def print_sum(x, y):
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result))


loop = asyncio.get_event_loop()
print("start")
loop.run_until_complete(print_sum(1, 2))
print("end")
loop.close()

我们再来看一个asyncioFuture的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio

future = asyncio.Future()


async def coro1():
print("wait 1 second")
await asyncio.sleep(1)
print("set_result")
future.set_result('data')


async def coro2():
result = await future
print(result)


loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([
coro1()
coro2()
]))
loop.close()

输出结果:

1
2
3
4
wait 1 second
(大约等待1秒)
set_result
data

这里await后面跟随的future对象,协程中yield from或者await后面可以调用future对象,其作用是:暂停协程,直到future执行结束或者返回result或抛出异常。

而在我们的例子中,await future必须要等待future.set_result('data')后才能够结束。将coro2()作为第二个协程可能体现得不够明显,可以将协程的调用改成这样:

1
2
3
4
5
6
7
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([
# coro1(),
coro2(),
coro1()
]))
loop.close()

输出的结果仍旧与上面相同。

其实,async这个关键字的用法不止能用在函数上,还有async with异步上下文管理器,async for异步迭代器. 对这些感兴趣且觉得有用的可以网上找找资料,这里限于篇幅就不过多展开了。

使用协程

协程有很多种写法,我们以一种最流行的写法为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import asyncio
import time

async def func1():
print("你好啊, 我叫潘金莲")
await asyncio.sleep(3)
print("你好啊, 我叫潘金莲")


async def func2():
print("你好啊, 我叫王建国")
await asyncio.sleep(2)
print("你好啊, 我叫王建国")


async def func3():
print("你好啊, 我叫李雪琴")
await asyncio.sleep(4)
print("你好啊, 我叫李雪琴")


async def main():
tasks = [
asyncio.create_task(func1()), # py3.8以后加上asyncio.create_task()
asyncio.create_task(func2()),
asyncio.create_task(func3())
]
await asyncio.wait(tasks)


if __name__ == '__main__':
t1 = time.time()
# 一次性启动多个任务(协程)
asyncio.run(main())
t2 = time.time()
print(t2 - t1)

以爬取网页代码为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import asyncio
import time

async def download(url):
print("准备开始下载")
resp = requests.get(url) # 网络请求 requests.get()
print(resp.text)
resp.close()
await asyncio.sleep(2)
print("下载完成")


async def main():
urls = [
"http://www.baidu.com",
"http://www.bilibili.com",
"http://www.163.com"
]

# 准备异步协程对象列表
tasks = []
for url in urls:
d = asyncio.create_task(download(url))
tasks.append(d)

# tasks = [asyncio.create_task(download(url)) for url in urls] # 这么干也行哦~

# 一次性把所有任务都执行
await asyncio.wait(tasks)


if __name__ == '__main__':
asyncio.run(main())

异步网络请求

aiohttp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# requests.get() 同步的代码 -> 异步操作aiohttp
# pip install aiohttp

import asyncio
import aiohttp

urls = [
"http://kr.shanghai-jiuxin.com/file/2020/1031/191468637cab2f0206f7d1d9b175ac81.jpg",
"http://kr.shanghai-jiuxin.com/file/2020/1031/563337d07af599a9ea64e620729f367e.jpg",
"http://kr.shanghai-jiuxin.com/file/2020/1031/774218be86d832f359637ab120eba52d.jpg"
]


async def aiodownload(url):
# 发送请求.
# 得到图片内容
# 保存到文件
name = url.rsplit("/", 1)[1] # 从右边切, 切一次. 得到[1]位置的内容
async with aiohttp.ClientSession() as session: # requests
async with session.get(url) as resp: # resp = requests.get()
# 请求回来了. 写入文件
# async with aiofiles.open(name, mode="wb") as f: #异步写文件
with open(name, mode="wb") as f: # 创建文件
f.write(await resp.content.read()) # 读取内容是异步的. 需要await挂起, resp.text()

print(name, "搞定")


async def main():
tasks = []
for url in urls:
tasks.append(aiodownload(url))

await asyncio.wait(tasks)


if __name__ == '__main__':
asyncio.run(main())

可以参考:https://www.cnblogs.com/ssyfj/p/9222342.html

注意事项

转自https://blog.csdn.net/lymmurrain/article/details/109037460

简单请求

如果只发出简单的请求(如只有一次请求,无需cookie,SSL,等),可用如下方法。

但其实吧很少用,因为一般爬虫中用协程都是要爬取大量页面,可能会使得aiohttpUnclosed client session的错误。这种情况官方是建议用ClientSession(连接池,见下文)的,性能也有一定的提高。

1
2
3
4
5
6
7
8
9
10
import aiohttp

async def fetch():
async with aiohttp.request('GET',
'http://python.org/') as resp:
assert resp.status == 200
print(await resp.text())
#将协程放入时间循环
loop = asyncio.get_event_loop()
loop.run_until_complete(fetch())

使用连接池请求

一般情况下使用如下示例,由官网摘抄。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import aiohttp
import asyncio

#传入client使用
async def fetch(client,url):
async with client.get(url) as resp:
assert resp.status == 200
return await resp.text()

async def main():
async with aiohttp.ClientSession() as client:
html = await fetch(client,url)
print(html)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

是不是感觉有点绕呢,其实平时使用是不必这样将fetch函数抽象出去,可以简单写成下面的简洁示例。

1
2
3
4
5
6
7
8
import aiohttp
import asyncio
async def main():
async with aiohttp.ClientSession() as client:
async with aiohttp.request('GET',
'http://python.org/') as resp:
assert resp.status == 200
print(await resp.text())

发现有什么不同没有,官网的fetch函数抽象出去后,把ClientSession的一个实例作为参数了。所以在with代码块中使用ClientSession实例的情况下,这两者是等同的(我认为,因为两者都是用的都是with代码块中创建的实例)。

连接池重用

而其实官网这段代码是在ClientSession的参考处摘抄的,所以官方这样写我认为只是在提醒要注意ClientSession的用法。那么ClientSession有啥得注意的呢

Session 封装了一个连接池(连接器实例),并且默认情况下支持keep-alive。除非在应用程序的生存期内连接到大量未知的不同服务器,否则建议您在应用程序的生存期内使用单个会话以受益于连接池。

不要为每个请求创建Session 。每个应用程序很可能需要一个会话,以完全执行所有请求。

更复杂的情况可能需要在每个站点上进行一次会话,例如,一个会话用于Github,另一个会话用于Facebook API。无论如何,为每个请求建立会话是一个非常糟糕的主意。

会话内部包含一个连接池。连接重用和保持活动状态(默认情况下均处于启用状态)可能会提高整体性能。

以上这几段话由官网翻译而来。这几段话都是说,如无必要,只用一个ClientSession实例即可。

但我在很多资料看到的是像如下这样用的呀

1
2
3
4
5
6
async def fetch(url):
async with aiohttp.ClientSession() as client:
async with aiohttp.request('GET',
url) as resp:
assert resp.status == 200
print(await resp.text())

这不明显每请求一次就实例化一个ClientSession嘛,并没有重用ClientSession啊。那应该咋办呢,然而官网并没有举出重用ClientSession的示例(我也是服了,你这么浓墨重彩说道只需一个session,倒是给个示例啊)。

那只得继续找找资料。然而国内资料不多,只能上githubstackoverflow看看。看了半天也没个定论,主要是两个方法。

下面是我写的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
async def fetch(client,url):
async with client.get(url) as resp:
assert resp.status == 200
text = await resp.text()
return len(text)

#urls是包含多个url的列表
async def fetch_all(urls):
async with aiohttp.ClientSession() as client:
return await asyncio.gather(*[fetch(client,url) for url in urls])

urls = ['http://python.org/' for i in range(3)]
loop=asyncio.get_event_loop()
results = loop.run_until_complete(fetch_all(urls))
print(results)
print(type(results))

手动创建session,不用with

该方法可以让你获取一个session实例而不仅局限于with代码块中,可以在后续代码中继续使用该session。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
async def fetch(client,url):
async with client.get(url) as resp:
assert resp.status == 200
text = await resp.text()
return len(text)

async def fetch_all_manual(urls,client):
return await asyncio.gather(*[fetch(client, url) for url in urls])

urls = ['http://python.org/' for i in range(3)]
loop=asyncio.get_event_loop()
client = aiohttp.ClientSession()
results = loop.run_until_complete(fetch_all_manual(urls,client))
#要手动关闭自己创建的ClientSession,并且client.close()是个协程,得用事件循环关闭
loop.run_until_complete(client.close())
#在关闭loop之前要给aiohttp一点时间关闭ClientSession
loop.run_until_complete(asyncio.sleep(3))
loop.close()
print(results)
print(type(results))

此处着重说明以下该方法一些相关事项

手动创建ClientSession要手动关闭自己创建的ClientSession,并且client.close()是个协程,得用事件循环关闭。
在关闭loop之前要给aiohttp一点时间关闭ClientSession

如果无上述步骤会报Unclosed client session的错误,也即ClientSession没有关闭

但就算你遵循了以上两个事项,如此运行程序会报以下warning,虽然不会影响程序正常进行

1
2
DeprecationWarning: The object should be created from async function
client = aiohttp.ClientSession()

这说的是client = aiohttp.ClientSession()这行代码应该在异步函数中执行。如果你无法忍受可以在定义个用异步方法用作创建session

1
2
3
4
async def create_session():
return aiohttp.ClientSession()

session = asyncio.get_event_loop().run_until_complete(create_session())

ClientSession 部分重要参数

下面是ClientSession的所有参数,这里用的比较多的是connector,headers,cookies。headers和cookies写过爬虫的可能都认识了,这里只谈一下connector。

connector是aiohttp客户端API的传输工具。并发量控制,ssl证书验证,都可通过connector设置,然后传入ClientSession

标准connector有两种:

  • TCPConnector用于常规TCP套接字(同时支持HTTP和 HTTPS方案)(绝大部分情况使用这种)。
  • UnixConnector 用于通过UNIX套接字进行连接(主要用于测试)。
    所有连接器类都应继承自BaseConnector

使用可以按以下实例

1
2
3
4
#创建一个TCPConnector
conn=aiohttp.TCPConnector(verify_ssl=False)
#作为参数传入ClientSession
async with aiohttp.ClientSession(connector=conn) as session:

TCPConnector比较重要的参数有

  • verify_ssl(bool)– 布尔值,对HTTPS请求执行SSL证书验证 (默认情况下启用)。当要跳过对具有无效证书的站点的验证时可设置为False。
  • limit(int)– 整型,同时连接的总数。如果为limit为 None则connector没有限制(默认值:100)。
    limit_per_host(int)–限制同时连接到同一端点的总数。如果(host, port, is_ssl)三者相同,则端点是相同的。如果为limit=0,则connector没有限制(默认值:0)。

如果爬虫用上协程,请求速度是非常快的,很可能会对别人服务器造成拒绝服务的攻击,所以平常使用若无需求,最好还是不要设置limit为0。

限制并发量的另一个做法(使用Semaphore)

使用Semaphore直接限制发送请求。此处只写用法,作抛砖引玉之用。也很容易用,在fetch_all_manual函数里加上Semaphore的使用即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
async def fetch(client,url):
async with client.get(url) as resp:
assert resp.status == 200
text = await resp.text()
return len(text)

async def fetch_all_manual(urls,client):
async with asyncio.Semaphore(5):
return await asyncio.gather(*[fetch(client, url) for url in urls])

urls = ['http://python.org/' for i in range(3)]
loop=asyncio.get_event_loop()
client = aiohttp.ClientSession()
results = loop.run_until_complete(fetch_all_manual(urls,client))
#要手动关闭自己创建的ClientSession,并且client.close()是个协程,得用事件循环关闭
loop.run_until_complete(client.close())
#在关闭loop之前要给aiohttp一点时间关闭ClientSession
loop.run_until_complete(asyncio.sleep(3))
loop.close()
print(results)
print(type(results))

aiofiles

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 异步文件操作
# pip install aiofiles

# 基本用法
import asyncio
import aiofiles


async def wirte_demo():
# 异步方式执行with操作,修改为 async with
async with aiofiles.open("text.txt","w",encoding="utf-8") as fp:
await fp.write("hello world ")
print("数据写入成功")

async def read_demo():
async with aiofiles.open("text.txt","r",encoding="utf-8") as fp:
content = await fp.read()
print(content)

async def read2_demo():
async with aiofiles.open("text.txt","r",encoding="utf-8") as fp:
# 读取每行
async for line in fp:
print(line)
if __name__ == "__main__":
asyncio.run(wirte_demo())
asyncio.run(read_demo())
asyncio.run(read2_demo())

视频的爬取

程序员会想办法把用户上传好的视频进行转码(不同清晰度)做切片(ts)处理。这样既方便用户进行大跨度的调整进度条(最小延迟)。也能为公司节省大量的流量费。既然要把视频切成非常多个小碎片。那就需要有个文件来记录这些小碎片的路径。该文件⼀般为M3U文件,M3U文件中的内容经过UTF-8的编码后,就是M3U8文件。今天, 我们看到的各大视频网站平台使用的几乎都是M3U8文件。

M3U8文件

我们首先就要找这个文件。

想要抓取一个视频:

  1. 找到m3u8 (各种手段)
  2. 通过m3u8下载到ts文件
  3. 可以通过各种手段(不仅是编程手段,甚至pr) 把ts文件合并为一个mp4文件

一些格式固定的代码

跳过其中的无效信息,去掉行中的换行、空格等:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncio
import aiohttp
import aiofiles

async def aio_download(up_url): # up_url顶层域名,可能需要进行url拼接
tasks = []
async with aiohttp.ClientSession() as session: # 提前准备好session
async with aiofiles.open("m3u8.txt", mode="r", encoding='utf-8') as f:
async for line in f:
if line.startswith("#"):
continue
# line就是xxxxx.ts
line = line.strip() # 去掉没用的空格和换行
# 拼接真正的ts路径(如果需要)
ts_url = up_url + line
task = asyncio.create_task(download_ts(ts_url, line, session)) # 创建任务
tasks.append(task)

await asyncio.wait(tasks) # 等待任务结束

async def download_ts(url, name, session):
async with session.get(url) as resp:
async with aiofiles.open(f"video/{name}", mode="wb") as f:
await f.write(await resp.content.read()) # 把下载到的内容写入到文件中
print(f"{name}下载完毕")

调用时:

1
2
# 异步协程
asyncio.run(aio_download(url_up))

加密的问题

ts文件可能加密过,需要用m3u8的key.key文件中找密钥,需要抓包确定密钥文件前面的域名,然后下载密钥文件,对ts文件解密:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from Crypto.Cipher import AES 
async def dec_ts(name, key): # name是文件名,key是从文件中拿到的密钥
aes = AES.new(key=key, IV=b"0000000000000000", mode=AES.MODE_CBC)
async with aiofiles.open(f"video/{name}", mode="rb") as f1,\
aiofiles.open(f"video/temp_{name}", mode="wb") as f2:

bs = await f1.read() # 从源文件读取内容
await f2.write(aes.decrypt(bs)) # 把解密好的内容写入文件
print(f"{name}处理完毕")


async def aio_dec(key):
# 解密
tasks = []
async with aiofiles.open("m3u8.txt", mode="r", encoding="utf-8") as f:
async for line in f:
if line.startswith("#"):
continue
line = line.strip()
# 开始创建异步任务
task = asyncio.create_task(dec_ts(line, key))
tasks.append(task)
await asyncio.wait(tasks)

一般使用from Crypto.Cipher import AES,在m3u8文件中会有标注

合并ts文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def merge_ts():
# mac: cat 1.ts 2.ts 3.ts > xxx.mp4
# windows: copy /b 1.ts+2.ts+3.ts xxx.mp4
lst = []
s = ''
with open("shdbz.m3u8", mode="r", encoding="utf-8") as f:
for line in f:
if line.startswith("#"):
continue
line = line.strip()
lst.append(f"{line}+")

print(lst)
for str in lst:
s += str # 或者使用join,但是我的好像有点问题
s = s[0:-1]
os.system(f"copy/b {s} xxx.mp4")
# 可能不成功,建议使用cmd

Selenium

Selenium 是一种开源工具,用于在 Web 浏览器上执行自动化测试(使用任何 Web 浏览器进行 Web 应用程序测试)。可以对抗一些反爬但是问题是比较慢。

一些基本操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from selenium.webdriver import Chrome
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
import time

web = Chrome()
web.get("http://lagou.com")

# 找到某个元素. 点击它
el = web.find_element(By.XPATH, '//*[@id="changeCityBox"]/p[1]/a')
el.click() # 点击事件

time.sleep(1) # 让浏览器缓一会儿

# 找到输入框. 输入python => 输入回车
web.find_element(By.XPATH, '//*[@id="search_input"]').send_keys("python", Keys.ENTER)

web.find_element_by_xpath('//*[@id="s_position_list"]/ul/li[1]/div[1]/div[1]/div[1]/a/h3').click()

# 如何进入到进窗口中进行提取
# 注意, 在selenium的眼中. 新窗口默认是不切换过来的.
web.switch_to.window(web.window_handles[-1])

# 在新窗口中提取内容
job_detail = web.find_element_by_xpath('//*[@id="job_detail"]/dd[2]/div').text
print(job_detail)

# 关掉子窗口
web.close()
# 变更selenium的窗口视角. 回到原来的窗口中
web.switch_to.window(web.window_handles[0])

页面中iframe如何处理

1
2
3
4
5
6
7
8
web.get("https://www.91kanju.com/vod-play/541-2-1.html")

# 处理iframe的话. 必须先拿到iframe. 然后切换视角到iframe . 再然后才可以拿数据
iframe = web.find_element(By.XPATH,'//*[@id="player_iframe"]')
web.switch_to.frame(iframe) # 切换到iframe
# web.switch_to.default_content() # 切换回原页面
tx = web.find_element(By.XPATH,'//*[@id="main"]/h3[1]').text
print(tx)

无头浏览器与下拉列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from selenium.webdriver import Chrome
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.select import Select
import time
from selenium.webdriver.common.by import By

# 准备好参数配置
opt = Options()
opt.add_argument("--headless")
opt.add_argument("--disbale-gpu")

web = Chrome(options=opt) # 把参数配置设置到浏览器中

web.get("https://www.endata.com.cn/BoxOffice/BO/Year/index.html")

time.sleep(2)
# 定位到下拉列表
sel_el = web.find_element(By.XPATH, '//*[@id="OptionDate"]')
# 对元素进行包装, 包装成下拉菜单
sel = Select(sel_el)
# 让浏览器进行调整选项
for i in range(len(sel.options)): # i就是每一个下拉框选项的索引位置
sel.select_by_index(i) # 按照索引进行切换
time.sleep(2)
table = web.find_element(By.XPATH,'//*[@id="TableList"]/table')
print(table.text) # 打印所有文本信息
print("===================================")

print("运行完毕. ")
web.close()

处理滑块

https://blog.csdn.net/Jeeson_Z/article/details/82047685

以B站为例,但是现在B站好像已经不用滑块了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
from selenium import webdriver
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver import ActionChains
from urllib.request import urlretrieve
from bs4 import BeautifulSoup
import re
from PIL import Image
from time import sleep


# 初始化
def init():
# 定义为全局变量,方便其他模块使用
global url, browser, username, password, wait
# 登录界面的url
url = 'https://passport.bilibili.com/login'
# 实例化一个chrome浏览器
browser = webdriver.Chrome()
# 用户名
username = '17389025490'
# 密码
password = 'Zhaoweining750'
# 设置等待超时
wait = WebDriverWait(browser, 20)


# 登录
def login():
# 打开登录页面
browser.get(url)
# 获取用户名输入框
user = wait.until(EC.presence_of_element_located((By.ID, 'login-username')))
# 获取密码输入框
passwd = wait.until(EC.presence_of_element_located((By.ID, 'login-passwd')))
# 输入用户名
user.send_keys(username)
# 输入密码
passwd.send_keys(password)


# 获取图片信息
def get_image_info(img):
'''
:param img: (Str)想要获取的图片类型:带缺口、原始
:return: 该图片(Image)、位置信息(List)
'''

# 将网页源码转化为能被解析的lxml格式
soup = BeautifulSoup(browser.page_source, 'lxml')
# 获取验证图片的所有组成片标签
imgs = soup.find_all('div', {'class': 'gt_cut_' + img + '_slice'})
# 用正则提取缺口的小图片的url,并替换后缀
img_url = re.findall('url\(\"(.*)\"\);', imgs[0].get('style'))[0].replace('webp', 'jpg')
# 使用urlretrieve()方法根据url下载缺口图片对象
urlretrieve(url=img_url, filename=img + '.jpg')
# 生成缺口图片对象
image = Image.open(img + '.jpg')
# 获取组成他们的小图片的位置信息
position = get_position(imgs)
# 返回图片对象及其位置信息
return image, position


# 获取小图片位置
def get_position(img):
'''
:param img: (List)存放多个小图片的标签
:return: (List)每个小图片的位置信息
'''

img_position = []
for small_img in img:
position = {}
# 获取每个小图片的横坐标
position['x'] = int(re.findall('background-position: (.*)px (.*)px;', small_img.get('style'))[0][0])
# 获取每个小图片的纵坐标
position['y'] = int(re.findall('background-position: (.*)px (.*)px;', small_img.get('style'))[0][1])
img_position.append(position)
return img_position


# 裁剪图片
def Corp(image, position):
'''
:param image:(Image)被裁剪的图片
:param position: (List)该图片的位置信息
:return: (List)存放裁剪后的每个图片信息
'''

# 第一行图片信息
first_line_img = []
# 第二行图片信息
second_line_img = []
for pos in position:
if pos['y'] == -58:
first_line_img.append(image.crop((abs(pos['x']), 58, abs(pos['x']) + 10, 116)))
if pos['y'] == 0:
second_line_img.append(image.crop((abs(pos['x']), 0, abs(pos['x']) + 10, 58)))
return first_line_img, second_line_img


# 拼接大图
def put_imgs_together(first_line_img, second_line_img, img_name):
'''
:param first_line_img: (List)第一行图片位置信息
:param second_line_img: (List)第二行图片信息
:return: (Image)拼接后的正确顺序的图片
'''

# 新建一个图片,new()第一个参数是颜色模式,第二个是图片尺寸
image = Image.new('RGB', (260, 116))
# 初始化偏移量为0
offset = 0
# 拼接第一行
for img in first_line_img:
# past()方法进行粘贴,第一个参数是被粘对象,第二个是粘贴位置
image.paste(img, (offset, 0))
# 偏移量对应增加移动到下一个图片位置,size[0]表示图片宽度
offset += img.size[0]
# 偏移量重置为0
x_offset = 0
# 拼接第二行
for img in second_line_img:
# past()方法进行粘贴,第一个参数是被粘对象,第二个是粘贴位置
image.paste(img, (x_offset, 58))
# 偏移量对应增加移动到下一个图片位置,size[0]表示图片宽度
x_offset += img.size[0]
# 保存图片
image.save(img_name)
# 返回图片对象
return image


# 判断像素是否相同
def is_pixel_equal(bg_image, fullbg_image, x, y):
"""
:param bg_image: (Image)缺口图片
:param fullbg_image: (Image)完整图片
:param x: (Int)位置x
:param y: (Int)位置y
:return: (Boolean)像素是否相同
"""

# 获取缺口图片的像素点(按照RGB格式)
bg_pixel = bg_image.load()[x, y]
# 获取完整图片的像素点(按照RGB格式)
fullbg_pixel = fullbg_image.load()[x, y]
# 设置一个判定值,像素值之差超过判定值则认为该像素不相同
threshold = 60
# 判断像素的各个颜色之差,abs()用于取绝对值
if (abs(bg_pixel[0] - fullbg_pixel[0] < threshold) and abs(bg_pixel[1] - fullbg_pixel[1] < threshold) and abs(
bg_pixel[2] - fullbg_pixel[2] < threshold)):
# 如果差值在判断值之内,返回是相同像素
return True

else:
# 如果差值在判断值之外,返回不是相同像素
return False


# 计算滑块移动距离
def get_distance(bg_image, fullbg_image):
'''
:param bg_image: (Image)缺口图片
:param fullbg_image: (Image)完整图片
:return: (Int)缺口离滑块的距离
'''

# 滑块的初始位置
distance = 57
# 遍历像素点横坐标
for i in range(distance, fullbg_image.size[0]):
# 遍历像素点纵坐标
for j in range(fullbg_image.size[1]):
# 如果不是相同像素
if not is_pixel_equal(fullbg_image, bg_image, i, j):
# 返回此时横轴坐标就是滑块需要移动的距离
return i


# 构造滑动轨迹
def get_trace(distance):
'''
:param distance: (Int)缺口离滑块的距离
:return: (List)移动轨迹
'''

# 创建存放轨迹信息的列表
trace = []
# 设置加速的距离
faster_distance = distance * (4 / 5)
# 设置初始位置、初始速度、时间间隔
start, v0, t = 0, 0, 0.2
# 当尚未移动到终点时
while start < distance:
# 如果处于加速阶段
if start < faster_distance:
# 设置加速度为2
a = 1.5
# 如果处于减速阶段
else:
# 设置加速度为-3
a = -3
# 移动的距离公式
move = v0 * t + 1 / 2 * a * t * t
# 此刻速度
v = v0 + a * t
# 重置初速度
v0 = v
# 重置起点
start += move
# 将移动的距离加入轨迹列表
trace.append(round(move))
# 返回轨迹信息
return trace


# 模拟拖动
def move_to_gap(trace):
# 得到滑块标签
slider = wait.until(EC.presence_of_element_located((By.CLASS_NAME, 'gt_slider_knob')))
# 使用click_and_hold()方法悬停在滑块上,perform()方法用于执行
ActionChains(browser).click_and_hold(slider).perform()
for x in trace:
# 使用move_by_offset()方法拖动滑块,perform()方法用于执行
ActionChains(browser).move_by_offset(xoffset=x, yoffset=0).perform()
# 模拟人类对准时间
sleep(0.5)
# 释放滑块
ActionChains(browser).release().perform()


# 主程序
def main():
# 初始化
init()
# 登录
login()
# 获取缺口图片及其位置信息
bg, bg_position = get_image_info('bg')
# 获取完整图片及其位置信息
fullbg, fullbg_position = get_image_info('fullbg')
# 将混乱的缺口图片裁剪成小图,获取两行的位置信息
bg_first_line_img, bg_second_line_img = Corp(bg, bg_position)
# 将混乱的完整图片裁剪成小图,获取两行的位置信息
fullbg_first_line_img, fullbg_second_line_img = Corp(fullbg, fullbg_position)
# 根据两行图片信息拼接出缺口图片正确排列的图片
bg_image = put_imgs_together(bg_first_line_img, bg_second_line_img, 'bg.jpg')
# 根据两行图片信息拼接出完整图片正确排列的图片
fullbg_image = put_imgs_together(fullbg_first_line_img, fullbg_second_line_img, 'fullbg.jpg')
# 计算滑块移动距离
distance = get_distance(bg_image, fullbg_image)
# 计算移动轨迹
trace = get_trace(distance - 10)
# 移动滑块
move_to_gap(trace)
sleep(5)


# 程序入口
if __name__ == '__main__':
main()

处理滑块的一些问题

在处理12306的滑块时总是失败,使用selenium套件操作滑块后会出现“哎呀,出错了,点击刷新再来一次”这样的提示。

知己知彼,参见https://blog.csdn.net/weixin_44685869/article/details/105602629

网页只要设置了检查webdriverJavascript方法,就很容易发现爬虫。使用的方法就是Navigator对象的webdriver属性,用这个属性来判断客户端是否通过WebDriver驱动浏览器。如果监测到客户端的webdriver属性存在,则无法继续操作获取数据。selenium就存在WebDriver属性。

监测结果有3种,分别是truefalseundefind

最广为人知的识别是否是selenium的方法就是 window.navigator.webdriver,当浏览器被打开后,js就会给当前窗口一个window属性,里面存放着用户的各种"信息"。

使用渲染工具有 webdriver 属性时,navigation.webdriver的返回值时true。反之则会返回false或者undefind

了解了WebDriver识别的原理和返回值后,我们就能相处应对的办法。既然 Web Driver 的识别依赖navigation.webdriver的返回值,那么我们在触发Javascript办法前将navigation.webdriver的返回值改为false或者undefind,问题就解决了。

不过这种修改该属性值的办法只在当前页面有效,当浏览器打开新标签或新窗口时需要重新执行改变navigator.webdriver值的JavaScript代码。一般来说足够了。

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from selenium.webdriver import Chrome
import time

brower = Chrome(executable_path=r'D:\python\chromedriver_win32\chromedriver.exe')
url = 'http://www.porters.vip/features/webdriver.html'
brower.get(url)

#关键=================================
script = 'Object.defineProperty(navigator,"webdriver",{get:() => false,});'
#运行Javascript
brower.execute_script(script)
#======================================

#定位按钮并点击
brower.find_element_by_css_selector('.btn.btn-primary.btn-lg').click()

#定位到文章内容元素
elements = brower.find_element_by_css_selector('#content')
time.sleep(1)
print(elements.text)
brower.close()

或者在创建浏览器对象时:

1
2
3
4
5
# 2.chrome的版本大于等于88
option = Options()
option.add_argument('--disable-blink-features=AutomationControlled')

web = Chrome(options=option)

PS. 使用opencv处理缺口类滑块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import cv2

GAUSSIAN_BLUR_KERNEL_SIZE = (5, 5)
GAUSSIAN_BLUR_SIGMA_X = 0
CANNY_THRESHOLD1 = 200
CANNY_THRESHOLD2 = 450


def get_gaussian_blur_image(image):
return cv2.GaussianBlur(image, GAUSSIAN_BLUR_KERNEL_SIZE, GAUSSIAN_BLUR_SIGMA_X)


def get_canny_image(image):
return cv2.Canny(image, CANNY_THRESHOLD1, CANNY_THRESHOLD2)


def get_contours(image):
contours, _ = cv2.findContours(image, cv2.RETR_CCOMP, cv2.CHAIN_APPROX_SIMPLE)
return contours


def get_contour_area_threshold(image_width, image_height):
contour_area_min = (image_width * 0.15) * (image_height * 0.25) * 0.8
contour_area_max = (image_width * 0.15) * (image_height * 0.25) * 1.2
return contour_area_min, contour_area_max


def get_arc_length_threshold(image_width, image_height):
arc_length_min = ((image_width * 0.15) + (image_height * 0.25)) * 2 * 0.8
arc_length_max = ((image_width * 0.15) + (image_height * 0.25)) * 2 * 1.2
return arc_length_min, arc_length_max


def get_offset_threshold(image_width):
offset_min = 0.2 * image_width
offset_max = 0.85 * image_width
return offset_min, offset_max


def main():
image_raw = cv2.imread('captcha.png')
image_height, image_width, _ = image_raw.shape
image_gaussian_blur = get_gaussian_blur_image(image_raw)
image_canny = get_canny_image(image_gaussian_blur)
contours = get_contours(image_canny)
cv2.imwrite('image_canny.png', image_canny)
cv2.imwrite('image_gaussian_blur.png', image_gaussian_blur)
contour_area_min, contour_area_max = get_contour_area_threshold(image_width, image_height)
arc_length_min, arc_length_max = get_arc_length_threshold(image_width, image_height)
offset_min, offset_max = get_offset_threshold(image_width)
offset = None
for contour in contours:
x, y, w, h = cv2.boundingRect(contour)
if contour_area_min < cv2.contourArea(contour) < contour_area_max and \
arc_length_min < cv2.arcLength(contour, True) < arc_length_max and \
offset_min < x < offset_max:
cv2.rectangle(image_raw, (x, y), (x + w, y + h), (0, 0, 255), 2)
offset = x
cv2.imwrite('image_label.png', image_raw)
print('offset', offset)


if __name__ == '__main__':
main()