还是希望能比较系统地回顾一下爬虫的一些知识,不过有一些我也记不清了。
可能对小白不是很友好。
如果看视频可以参考这个 。
Python爬虫从入门到小黑屋 概论 爬虫的矛与盾 robots.txt
协议: 君子协议。规定了网站中哪些数据可以被爬取哪些数据不可以被爬取。
Request
与第一个爬虫1 pip install -i https://pypi.tuna.tsinghua.edu.cn/simple requests
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import requestsif __name__ == '__main__' : url = "https://fanyi.baidu.com/sug" s = input ("请输入你要翻译的英文单词" ) dat = { "kw" : s } resp = requests.post(url, data=dat) print (resp.json()) resp.close()
数据解析 有时我们不需要爬取整个界面的内容,我们只希望提取出一小部分从而提高效率,有不同的解析方式可以使用,混合使用也无妨。
正则表达式 Python
中使用re
模块进行正则匹配,re
中封装了一些函数,不过还是喜欢直接写正则表达式。
1 2 3 使用函数: findall: 匹配字符串中所有的符合正则的内容,返回list finditer: 匹配字符串中所有的内容[返回的是迭代器], 从迭代器中拿到内容需要.group()
有些正则含有特殊符号比如\n,\d
等等,PyCharm
可能有Warning
,前面加个r
就好了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import reobj = re.compile (r"\d+" ) ret = obj.finditer("我的电话号是:10086, 我女朋友的电话是:10010" ) for it in ret: print (it.group()) ret = obj.findall("呵呵哒, 我就不信你不换我1000000000" ) print (ret)s = """ <div class='jay'><span id='1'>郭麒麟</span></div> <div class='jj'><span id='2'>宋铁</span></div> <div class='jolin'><span id='3'>大聪明</span></div> <div class='sylar'><span id='4'>范思哲</span></div> <div class='tory'><span id='5'>胡说八道</span></div> """ obj = re.compile (r"<div class='.*?'><span id='(?P<id>\d+)'>(?P<wahaha>.*?)</span></div>" , re.S) result = obj.finditer(s) for it in result: print (it.group("wahaha" )) print (it.group("id" ))
正则样例:豆瓣top250 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import requestsimport reimport csvif __name__ == '__main__' : url = "https://movie.douban.com/top250" headers = { "user-agent" : "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.192 Safari/537.36" } resp = requests.get(url, headers=headers) page_content = resp.text obj = re.compile ('<li>.*?<div class="item">.*?<span class="title">(?P<name>.*?)' '</span>.*?<p class="">.*?<br>(?P<year>.*?) .*?<span ' 'class="rating_num" property="v:average">(?P<score>.*?)</span>.*?' '<span>(?P<num>.*?)人评价</span>' , re.S) result = obj.finditer(page_content) f = open ("data.csv" , mode="w" ) csvwriter = csv.writer(f) for it in result: dic = it.groupdict() dic['year' ] = dic['year' ].strip() csvwriter.writerow(dic.values()) f.close() resp.close() print ("over!" )
其中.*?
过滤标签后可能的换行或者空格,或者一些不必要的标签内容。
bs4
bs4
的原理就是解析检索html
标签,对于文件路径直接写在网站源码里的情况十分方便。
通常使用的方法是find()
和find_all()
以及get()
,故名思意,前两个是标签查找,会查找到相应的标签,并且可以通过标签内属性值进行过滤,后一个则是获取标签中属性的内容。具体用法还是看样例。
样例 这个样例是爬图片,一个问题是图片链接在二级子页面中,所以先要获取子页面源码
先找到这个div
下的所有表项。
然后在其中找到子页面的路径
到子页面中找到图片链接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 import requestsfrom bs4 import BeautifulSoupimport timeurl = "https://www.umei.cc/bizhitupian/weimeibizhi/" resp = requests.get(url) resp.encoding = 'utf-8' main_page = BeautifulSoup(resp.text, features="html.parser" ) resp.close() alist = main_page.find('div' , class_="swiper-wrapper after" ).find_all("a" ) print (alist)for a in alist: href = a.get('href' ) print (href) child_page_resp = requests.get('https://www.umei.cc' + href) child_page_resp.encoding = 'utf-8' child_page_text = child_page_resp.text child_page = BeautifulSoup(child_page_text, "html.parser" ) child_page_resp.close() src = child_page.find('section' , class_="img-content" ).find('img' ).get('src' ) img_resp = requests.get(src) img_name = src.split("/" )[-1 ] with open ("img/" + img_name, mode="wb" ) as f: f.write(img_resp.content) print ("over!!!" , img_name) print ("all over!!!" )
xpath
常用来直接解析网页源码,对于html
标签,我们可以将其转化成树的形式,就可以通过树的父子关系进行提取。
div[2]
代表第n
个div
标签,div
表示该层标签只有唯一一个,最后使用text()
提取标签内数据。
获取xpath
的方法:浏览器控制台 -> Elements
-> 找到标签所在块 -> 右键选Copy
-> 选 Copy full Xpath
样例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import requestsfrom lxml import etreeurl = "https://www.runoob.com/python/att-string-strip.html" resp = requests.get(url,proxies={"https" :"127.0.0.1:7890" }) html = etree.HTML(resp.text) divs = html.xpath("/html/body/div[4]/div/div[2]/div/div[3]/div/h1/text()" ) h1 = html.xpath("/html/body/div[4]/div/div[2]/div/div[3]/div/h1" ) res = h1.xpath("./text()" ) print (divs)
模拟登录 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import requestssession = requests.session() data = { "loginName" : "你的用户名" , "password" : "密码" } url = "https://passport.17k.com/ck/user/login" resp = session.post(url, data=data) print (resp.text)print (resp.cookies)resp = session.get('https://user.17k.com/ck/author/shelf?page=1&appKey=2406394919' ) print (resp.json())resp.close()
防盗链 有些资源(视频)使用防盗链标记打开视频的界面,在请求时没有放到链就会被认为是非法请求。
同时有些视频的video
链接并不显式地写在页面源代码里,此处以梨视频爬取为例。
首先检查源代码我们只能找到视频封面,但在播放时审查界面元素我们是可以找到视频播放地址的,说明地址是动态生成的。
比如我所看的这个视频链接是:
https://video.pearvideo.com/mp4/adshort/20220607/cont-1764629-15892192_adpkg-ad_hd.mp4
刷新界面,使用控制台进行跟踪,选中Fetch/XHR
我们会找到一个请求,进行预览就会发现一个貌似是视频链接的链接:
https://video.pearvideo.com/mp4/adshort/20220607/1657102159673-15892192_adpkg-ad_hd.mp4
但一比对就会发现好像不太对,我们找到的这个还不太一样,观察这个请求就会发现其实应该把下面链接中不一样的那部分换上cont-
加上请求中的systemTime
参数值,然后我们就可以下载视频。
但如果请求头中没有加Referer
参数,你会发现返回结果是 “视频已下架”。加上即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 import requestsurl = "https://www.pearvideo.com/video_1721605" contId = url.split("_" )[1 ] videoStatusUrl = f"https://www.pearvideo.com/videoStatus.jsp?contId={contId} " headers = { "User-Agent" : "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.192 Safari/537.36" , "Referer" : url } resp = requests.get(videoStatusUrl, headers=headers) dic = resp.json() srcUrl = dic['videoInfo' ]['videos' ]['srcUrl' ] systemTime = dic['systemTime' ] srcUrl = srcUrl.replace(systemTime, f"cont-{contId} " ) with open ("a.mp4" , mode="wb" ) as f: f.write(requests.get(srcUrl).content)
加密数据的解析-以抓取网易云热评为例 这里是我自己实践的过程,具体详细步骤可以参看B站视频
同样我们可以找到请求所有评论的url(当然这就意味着不是写在源代码里的)
这样我们可以确定请求的链接
题外话:请求头中也包括了防盗链如果请求失败(没有返回)可以考虑防盗链,或者考虑加上里面的user-agent
字段。
我们来查看请求的参数,但发现是加密过的,而且如果不加,请求是没有返回值的,我们的难点在于确定加密过程与原来的参数。我们查看请求的调用栈
调用栈自下向上是调用的从先到后的顺序,我们查看最后调用的代码,也就是第一行。
注意进入后使用左下角的一对大括号对代码进行格式化。
在js
代码中打断点,刷新界面,点击resume直到到达我们的目标url,我们会发现断点处请求的数据是加密过的(废话 )
我们沿着调用栈 (Call Stack
) 一层一层向下找。我们会找到一层调用栈,数据是明文的,那么加密的过程就在下一层调用栈中。
我们最终会找到加密代码与赋值的代码
直接ctrl+f
搜索就能找到加密用的函数,我们就可以模拟这个加密过程进行加密
注意:常用的序列化与反序列化
json.dumps()
----将Python
的字典数据转换成json
字符,数据的最外面都添加一层""变为字符串,这也是数据的序列化步骤
json.loads()
----将json
字符串数据转换为字典或列表(去掉外面一层""),这个也是反序列化的步骤。
多线程 前言 作者:DarrenChan陈驰 链接:https://www.zhihu.com/question/23474039/answer/269526476
在介绍Python中的线程之前,先明确一个问题,Python中的多线程是假的多线程 ! 为什么这么说,我们先明确一个概念,全局解释器锁(GIL)。
Python代码的执行由Python虚拟机(解释器)来控制。Python在设计之初就考虑要在主循环中,同时只有一个线程在执行,就像单CPU的系统中运行多个进程那样,内存中可以存放多个程序,但任意时刻,只有一个程序在CPU中运行。同样地,虽然Python解释器可以运行多个线程,只有一个线程在解释器中运行。
对Python虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同时只有一个线程在运行。在多线程环境中,Python虚拟机按照以下方式执行。
1.设置GIL。
2.切换到一个线程去执行。
3.运行。
4.把线程设置为睡眠状态。
5.解锁GIL。
6.再次重复以上步骤。
对所有面向I/O的(会调用内建的操作系统C代码的)程序来说,GIL会在这个I/O调用之前被释放,以允许其他线程在这个线程等待I/O的时候运行。如果某线程并未使用很多I/O操作,它会在自己的时间片内一直占用处理器和GIL。也就是说,I/O密集型的Python程序比计算密集型的Python程序更能充分利用多线程的好处。
我们都知道,比方我有一个4核的CPU,那么这样一来,在单位时间内每个核只能跑一个线程,然后时间片轮转切换。但是Python不一样,它不管你有几个核,单位时间多个核只能跑一个线程,然后时间片轮转。看起来很不可思议?但是这就是GIL搞的鬼。任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码 ,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。通常我们用的解释器是官方实现的CPython,要真正利用多核,除非重写一个不带GIL的解释器。
我们不妨做个试验:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from multiprocessing import Poolfrom threading import Threadfrom multiprocessing import Processdef loop (): while True : pass if __name__ == '__main__' : for i in range (3 ): t = Thread(target=loop) t.start() while True : pass
我的电脑是4核,所以我开了4个线程,看一下CPU资源占有率:
我们发现CPU利用率并没有占满,大致相当于单核水平。
而如果我们变成进程呢?
我们改一下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from multiprocessing import Poolfrom threading import Threadfrom multiprocessing import Processdef loop (): while True : pass if __name__ == '__main__' : for i in range (3 ): t = Process(target=loop) t.start() while True : pass
结果直接飙到了100%,说明进程是可以利用多核的!
为了验证这是Python中的GIL搞得鬼,我试着用Java写相同的代码,开启线程,我们观察一下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 package com.darrenchan.thread;public class TestThread { public static void main (String[] args) { for (int i = 0 ; i < 3 ; i++) { new Thread (new Runnable () { @Override public void run () { while (true ) {} } }).start(); } while (true ){} } }
由此可见,Java中的多线程是可以利用多核的,这是真正的多线程!而Python中的多线程只能利用单核,这是假的多线程!
难道就如此?我们没有办法在Python中利用多核?当然可以!刚才的多进程 算是一种解决方案,还有一种就是调用C语言的链接库。对所有面向I/O的(会调用内建的操作系统C代码的)程序来说,GIL会在这个I/O调用之前被释放,以允许其他线程在这个线程等待I/O的时候运行。我们可以把一些 计算密集型任务用C语言编写,然后把.so链接库内容加载到Python中,因为执行C代码,GIL锁会释放,这样一来,就可以做到每个核都跑一个线程的目的!
可能有的小伙伴不太理解什么是计算密集型任务,什么是I/O密集型任务?
计算密集型任务的特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。
计算密集型任务由于主要消耗CPU资源,因此,代码运行效率至关重要。Python这样的脚本语言运行效率很低,完全不适合计算密集型任务。对于计算密集型任务,最好用C语言编写。
第二种任务的类型是IO密集型,涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。
IO密集型任务执行期间,99%的时间都花在IO上,花在CPU上的时间很少,因此,用运行速度极快的C语言替换用Python这样运行速度极低的脚本语言,完全无法提升运行效率。对于IO密集型任务,最合适的语言就是开发效率最高(代码量最少)的语言,脚本语言是首选,C语言最差。
综上,Python多线程相当于单核多线程,多线程有两个好处:CPU并行,IO并行,单核多线程相当于自断一臂。所以,在Python中,可以使用多线程,但不要指望能有效利用多核。如果一定要通过多线程利用多核,那只能通过C扩展来实现,不过这样就失去了Python简单易用的特点。不过,也不用过于担心,Python虽然不能利用多线程实现多核任务,但可以通过多进程实现多核任务。多个Python进程有各自独立的GIL锁,互不影响。
多线程的两种形式 1 2 3 4 5 6 7 8 9 10 11 def func (name ): for i in range (1000 ): print (name, i) if __name__ == '__main__' : t1 = Thread(target=func, args=("t1" ,)) t1.start() t2 = Thread(target=func, args=("t2" ,)) t2.start()
上面其实给出了多线程常用的一种形式,也可以通过面向对象的方式:
1 2 3 4 5 6 7 8 9 10 11 from threading import Threadclass MyThread (Thread ): def run (self ): print ("子线程" ) if __name__ == '__main__' : t = MyThread() t.start()
多进程 1 2 3 4 5 6 7 8 9 10 11 12 13 from multiprocessing import Processdef func (): for i in range (1000 ): print ("子进程" , i) if __name__ == '__main__' : p = Process(target=func) p.start() for i in range (1000 ): print ("主进程" , i)
1 2 3 4 5 6 7 8 9 10 11 12 from multiprocessing import Processclass MyProcess (Process ): def run (self ): for i in range (1000 ): print ("子进程" , i) if __name__ == '__main__' : t = MyProcess() t.start()
线程池与进程池 通过这种方式减少垃圾对象回收
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutordef fn (name ): for i in range (1000 ): print (name, i) if __name__ == '__main__' : with ThreadPoolExecutor(50 ) as t: for i in range (100 ): t.submit(fn, name=f"线程{i} " ) print ("123" )
协程 转自 https://zhuanlan.zhihu.com/p/68043798,有改动
由于GIL
的存在,导致Python
多线程性能甚至比单线程更糟。
GIL
: 全局解释器锁(英语:Global Interpreter Lock
,缩写GIL),是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行。即便在多核心处理器上,使用 GIL 的解释器也只允许同一时间执行一个线程。
于是出现了协程(Coroutine
)这么个东西。
协程: 协程,又称微线程,纤程,英文名Coroutine
。协程的作用,是在执行函数A时,可以随时中断,去执行函数B,然后中断继续执行函数A(可以自由切换)。但这一过程并不是函数调用(没有调用语句),这一整个过程看似像多线程,然而协程只有一个线程执行.
协程由于由程序主动控制切换,没有线程切换的开销,所以执行效率极高。对于IO密集型任务非常适用,如果是cpu密集型,推荐多进程+协程的方式。
进程/线程:操作系统提供的一种并发处理任务的能力。
协程:程序员通过高超的代码能力,在代码执行流程中人为的实现多任务并发,是单个线程内的任务调度技巧。
多进程和多线程体现的是操作系统的能力,而协程体现的是程序员的流程控制能力。
在Python3.4
之前,官方没有对协程的支持,存在一些三方库的实现,比如gevent
和Tornado
。3.4之后就内置了asyncio
标准库,官方真正实现了协程这一特性。
而Python
对协程的支持,是通过Generator
实现的,协程是遵循某些规则的生成器。因此,我们在了解协程之前,我们先要学习生成器。
生成器(Generator
) 我们这里主要讨论yield
和yield from
这两个表达式,这两个表达式和协程的实现息息相关。
Python2.5
中引入yield
表达式,参见PEP342 Python3.3
中增加yield from
语法,参见PEP380 ,方法中包含yield
表达式后,Python
会将其视作generator
对象,不再是普通的方法。
yield
表达式的使用yield
的语法规则是:在yield
这里暂停函数的执行,并返回yield
后面表达式的值(默认为None),直到被next()
方法再次调用时,从上次暂停的yield代码处继续往下执行 。当没有可以继续next()
的时候,抛出异常,该异常可被for
循环处理。
该表达式的具体使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 def test (): print ("generator start" ) n = 1 while True : yield_expression_value = yield n print (f"yield_expression_value = {yield_expression_value} " ) n += 1 generator = test() print (type (generator))print ("\n---------------\n" )next_result = generator.__next__() print (f"next_result = {next_result} " )print ("\n---------------\n" )send_result = generator.send(666 ) print (f"send_result = {send_result} " )
执行结果:
1 2 3 4 5 6 7 8 9 10 11 <class 'generator' > --------------- generator start next_result = 1 --------------- yield_expression_value = 666 send_result = 2
方法说明:
__next__()
方法: 作用是启动或者恢复generator
的执行,相当于send(None)
send(value)
方法:作用是发送值给yield
表达式。启动generator
则是调用send(None)
执行结果的说明:
①创建generator
对象:包含yield表达式的函数将不再是一个函数,调用之后将会返回generator
对象 ②启动generator
:使用生成器之前需要先调用__next__
或者send(None)
,否则将报错。启动generator
后,代码将执行到yield
出现的位置,也就是执行到yield n
,然后将n
传递到generator.__next__()
这行的返回值。(注意,生成器执行到yield n
后将暂停在这里,直到下一次生成器被启动) ③发送值给yield
表达式:调用send
方法可以发送值给yield
表达式,同时恢复生成器的执行。生成器从上次中断的位置继续向下执行,然后遇到下一个yield
,生成器再次暂停,切换到主函数打印出send_result
。 理解这个demo的关键是:生成器启动或恢复执行一次,将会在yield
处暂停,返回yield
后面表达式的值。上面的第②步仅仅执行到了yield n
,并没有执行到赋值语句,到了第③步,生成器恢复执行才给yield_expression_value
赋值。
生产者和消费者模型 上面的例子中,代码中断–>切换执行,体现出了协程的部分特点。
我们再举一个生产者、消费者的例子,这个例子来自廖雪峰的Python教程 :
传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。 现在改用协程,生产者生产消息后,直接通过yield
跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 def consumer (): print ("[CONSUMER] start" ) r = 'start' while True : n = yield r if not n: print ("n is empty" ) continue print ("[CONSUMER] Consumer is consuming %s" % n) r = "200 ok" def producer (c ): start_value = c.send(None ) print (start_value) n = 0 while n < 3 : n += 1 print ("[PRODUCER] Producer is producing %d" % n) r = c.send(n) print ('[PRODUCER] Consumer return: %s' % r) c.close() c = consumer() producer(c)
执行结果:
1 2 3 4 5 6 7 8 9 10 11 [CONSUMER] start start [PRODUCER] producer is producing 1 [CONSUMER] consumer is consuming 1 [PRODUCER] Consumer return : 200 ok [PRODUCER] producer is producing 2 [CONSUMER] consumer is consuming 2 [PRODUCER] Consumer return : 200 ok [PRODUCER] producer is producing 3 [CONSUMER] consumer is consuming 3 [PRODUCER] Consumer return : 200 ok
注意到consumer
函数是一个generator
,把一个consumer
传入produce
后:
首先调用c.send(None)
启动生成器; 然后,一旦生产了东西,通过c.send(n)
切换到consumer
执行; consumer
通过yield
拿到消息,处理,又通过yield
把结果传回;produce
拿到consumer
处理的结果,继续生产下一条消息;produce
决定不生产了,通过c.close()
关闭consumer
,整个过程结束。整个流程无锁,由一个线程执行,produce
和consumer
协作完成任务,所以称为“协程”,而非线程的抢占式多任务。
yield from
表达式Python3.3
版本新增yield from
语法,新语法用于将一个生成器部分操作委托给另一个生成器 。此外,允许子生成器(即yield from
后的“参数”)返回一个值,该值可供委派生成器(即包含yield from
的生成器)使用。并且在委派生成器中,可对子生成器进行优化。
我们先来看最简单的应用,例如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 def test (n ): i = 0 while i < n: yield i i += 1 def test_yield_from (n ): print ("test_yield_from start" ) yield from test(n) print ("test_yield_from end" ) for i in test_yield_from(3 ): print (i)
输出:
1 2 3 4 5 test_yield_from start 0 1 2 test_yield_from end
这里我们仅仅给这个生成器添加了一些打印,如果是正式的代码中,你可以添加正常的执行逻辑。
如果上面的test_yield_from
函数中有两个yield from
语句,将串行执行。比如将上面的test_yield_from
函数改写成这样:
1 2 3 4 5 6 def test_yield_from (n ): print ("test_yield_from start" ) yield from test(n) print ("test_yield_from doing" ) yield from test(n) print ("test_yield_from end" )
将输出:
1 2 3 4 5 6 7 8 9 test_yield_from start 0 1 2 test_yield_from doing 0 1 2 test_yield_from end
在这里,yield from
起到的作用相当于下面写法的简写形式
1 2 for item in test(n): yield item
看起来这个yield from
也没做什么大不了的事,其实它还帮我们处理了异常之类的。具体可以看stackoverflow
上的这个问题:In practice, what are the main uses for the new “yield from” syntax in Python 3.3?
协程(Coroutine) Python3.4
开始,新增了asyncio
相关的API
,语法使用[@asyncio.coroutine](mailto:@asyncio.coroutine
)和yield from
实现协程Python3.5
中引入async
/await
语法,参见PEP492 我们先来看Python3.4
的实现。
[@asyncio.coroutine](mailto:@asyncio.coroutine)
Python3.4
中,使用[@asyncio.coroutine](mailto:@asyncio.coroutine)
装饰的函数称为协程。不过没有从语法层面进行严格约束。
理解Python装饰器(Decorator) Python装饰器看起来类似Java中的注解,然鹅和注解并不相同,不过同样能够实现面向切面编程。
想要理解Python中的装饰器,不得不先理解闭包(closure)这一概念。
闭包 看看维基百科中的解释:
在计算机科学中,闭包(英语:Closure),又称词法闭包(Lexical Closure)或函数闭包(function closures),是引用了自由变量的函数。这个被引用的自由变量将和这个函数一同存在,即使已经离开了创造它的环境也不例外。
官方的解释总是不说人话,but–talk is cheap,show me the code:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 def print_msg (): msg = "I'm closure" def printer (): print (msg) return printer closure = print_msg() closure()
msg
是一个局部变量,在print_msg
函数执行之后应该就不会存在了。但是嵌套函数引用了这个变量,将这个局部变量封闭在了嵌套函数中,这样就形成了一个闭包。
结合这个例子再看维基百科的解释,就清晰明了多了。闭包就是引用了自有变量的函数,这个函数保存了执行的上下文,可以脱离原本的作用域独立存在 。
装饰器 一个普通的装饰器一般是这样:
1 2 3 4 5 6 7 8 9 10 11 import functoolsdef log(func ): @functools.wraps(func ) def wrapper(*args, **kwargs): print ('call %s():' % func .__name__) print ('args = {}' .format(*args)) return func (*args, **kwargs) return wrapper
这样就定义了一个打印出方法名及其参数的装饰器。
调用:
1 2 3 4 5 @log def test (p ): print (test.__name__ + " param: " + p) test("I'm a param" )
输出:
1 2 3 call test (): args = I'm a param test param: I' m a param
装饰器在使用时,用了@
语法,让人有些困扰。其实,装饰器只是个方法,与下面的调用方式没有区别:
1 2 3 4 5 def test (p ): print (test.__name__ + " param: " + p) wrapper = log(test) wrapper("I'm a param" )
@
语法只是将函数传入装饰器函数,并无神奇之处。
值得注意的是@functools.wraps(func)
,这是python提供的装饰器。它能把原函数的元信息拷贝到装饰器里面的 func 函数中。函数的元信息包括docstring、name 、参数列表等等。可以尝试去除@functools.wraps(func)
,你会发现test.__name__
的输出变成了wrapper。
带参数的装饰器 装饰器允许传入参数,一个携带了参数的装饰器将有三层函数,如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import functoolsdef log_with_param (text ): def decorator (func ): @functools.wraps(func ) def wrapper (*args, **kwargs ): print ('call %s():' % func.__name__) print ('args = {}' .format (*args)) print ('log_param = {}' .format (text)) return func(*args, **kwargs) return wrapper return decorator @log_with_param("param" ) def test_with_param (p ): print (test_with_param.__name__)
看到这个代码是不是又有些疑问,内层的decorator函数的参数func是怎么传进去的?和上面一般的装饰器不大一样啊。
其实道理是一样的,将其@
语法去除,恢复函数调用的形式一看就明白了:
1 2 3 4 5 6 decorator = log_with_param("param" ) wrapper = decorator(test_with_param) wrapper("I'm a param" )
输出结果与正常使用装饰器相同:
1 2 3 4 call test_with_param (): args = I'm a param log_param = param test_with_param
至此,装饰器这个有点费解的特性也没什么神秘了。
装饰器这一语法体现了Python中函数是第一公民,函数是对象、是变量,可以作为参数、可以是返回值,非常的灵活与强大。
作者:聪明叉 链接:https://www.jianshu.com/p/ee82b941772a 来源:简书
对于Python原生支持的协程来说,Python对协程和生成器做了一些区分,便于消除这两个不同但相关的概念的歧义:
标记了[@asyncio.coroutine](mailto:@asyncio.coroutine)
装饰器的函数称为协程函数,iscoroutinefunction()
方法返回True 调用协程函数返回的对象称为协程对象,iscoroutine()
函数返回True 举个栗子,我们给上面yield from
的demo中添加[@asyncio.coroutine](mailto:@asyncio.coroutine)
:
1 2 3 4 5 6 7 8 9 10 11 12 import asyncio... @asyncio.coroutine def test_yield_from (n ): ... print (asyncio.iscoroutinefunction(test_yield_from))print (asyncio.iscoroutine(test_yield_from(3 )))
毫无疑问输出结果是True。
可以看下[@asyncio.coroutine](mailto:@asyncio.coroutine)
的源码中查看其做了什么,我将其源码简化下,大致如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import functoolsimport typesimport inspectdef coroutine (func ): if inspect.isgeneratorfunction(func): coro = func else : @functools.wraps(func ) def coro (*args, **kw ): res = func(*args, **kw) res = yield from res return res wrapper = types.coroutine(coro) wrapper._is_coroutine = True return wrapper
将这个装饰器标记在一个生成器上,就会将其转换成coroutine
。
然后,我们来实际使用下[@asyncio.coroutine](mailto:@asyncio.coroutine)
和yield from
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import asyncio@asyncio.coroutine def compute (x, y ): print ("Compute %s + %s ..." % (x, y)) yield from asyncio.sleep(1.0 ) return x + y @asyncio.coroutine def print_sum (x, y ): result = yield from compute(x, y) print ("%s + %s = %s" % (x, y, result)) loop = asyncio.get_event_loop() print ("start" )loop.run_until_complete(print_sum(1 , 2 )) print ("end" )loop.close()
执行结果:
1 2 3 4 start Compute 1 + 2 ... 1 + 2 = 3 end
print_sum
这个协程中调用了子协程compute
,它将等待compute
执行结束才返回结果。
这个demo点调用流程如下图:
EventLoop
将会把print_sum
封装成Task对象
流程图展示了这个demo的控制流程,不过没有展示其全部细节。比如其中“暂停”的1s,实际上创建了一个future对象, 然后通过BaseEventLoop.call_later()
在1s后唤醒这个任务。
值得注意的是,[@asyncio.coroutine](mailto:@asyncio.coroutine)
将在Python3.10版本中移除。
async
/await
Python3.5开始引入async
/await
语法(PEP 492 ),用来简化协程的使用并且便于理解。
async
/await
实际上只是[@asyncio.coroutine](mailto:@asyncio.coroutine)
和yield from
的语法糖:
把[@asyncio.coroutine](mailto:@asyncio.coroutine)
替换为async
把yield from
替换为await
即可。
比如上面的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import asyncioasync def compute (x, y ): print ("Compute %s + %s ..." % (x, y)) await asyncio.sleep(1.0 ) return x + y async def print_sum (x, y ): result = await compute(x, y) print ("%s + %s = %s" % (x, y, result)) loop = asyncio.get_event_loop() print ("start" )loop.run_until_complete(print_sum(1 , 2 )) print ("end" )loop.close()
我们再来看一个asyncio
中Future
的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import asynciofuture = asyncio.Future() async def coro1 (): print ("wait 1 second" ) await asyncio.sleep(1 ) print ("set_result" ) future.set_result('data' ) async def coro2 (): result = await future print (result) loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait([ coro1() coro2() ])) loop.close()
输出结果:
1 2 3 4 wait 1 second (大约等待1秒) set_result data
这里await
后面跟随的future
对象,协程中yield from
或者await
后面可以调用future
对象,其作用是:暂停协程,直到future
执行结束或者返回result
或抛出异常。
而在我们的例子中,await future
必须要等待future.set_result('data')
后才能够结束。将coro2()
作为第二个协程可能体现得不够明显,可以将协程的调用改成这样:
1 2 3 4 5 6 7 loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait([ coro2(), coro1() ])) loop.close()
输出的结果仍旧与上面相同。
其实,async
这个关键字的用法不止能用在函数上,还有async with
异步上下文管理器,async for
异步迭代器. 对这些感兴趣且觉得有用的可以网上找找资料,这里限于篇幅就不过多展开了。
使用协程 协程有很多种写法,我们以一种最流行的写法为例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 import asyncioimport timeasync def func1 (): print ("你好啊, 我叫潘金莲" ) await asyncio.sleep(3 ) print ("你好啊, 我叫潘金莲" ) async def func2 (): print ("你好啊, 我叫王建国" ) await asyncio.sleep(2 ) print ("你好啊, 我叫王建国" ) async def func3 (): print ("你好啊, 我叫李雪琴" ) await asyncio.sleep(4 ) print ("你好啊, 我叫李雪琴" ) async def main (): tasks = [ asyncio.create_task(func1()), asyncio.create_task(func2()), asyncio.create_task(func3()) ] await asyncio.wait(tasks) if __name__ == '__main__' : t1 = time.time() asyncio.run(main()) t2 = time.time() print (t2 - t1)
以爬取网页代码为例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 import asyncioimport timeasync def download (url ): print ("准备开始下载" ) resp = requests.get(url) print (resp.text) resp.close() await asyncio.sleep(2 ) print ("下载完成" ) async def main (): urls = [ "http://www.baidu.com" , "http://www.bilibili.com" , "http://www.163.com" ] tasks = [] for url in urls: d = asyncio.create_task(download(url)) tasks.append(d) await asyncio.wait(tasks) if __name__ == '__main__' : asyncio.run(main())
异步网络请求 aiohttp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 import asyncioimport aiohttpurls = [ "http://kr.shanghai-jiuxin.com/file/2020/1031/191468637cab2f0206f7d1d9b175ac81.jpg" , "http://kr.shanghai-jiuxin.com/file/2020/1031/563337d07af599a9ea64e620729f367e.jpg" , "http://kr.shanghai-jiuxin.com/file/2020/1031/774218be86d832f359637ab120eba52d.jpg" ] async def aiodownload (url ): name = url.rsplit("/" , 1 )[1 ] async with aiohttp.ClientSession() as session: async with session.get(url) as resp: with open (name, mode="wb" ) as f: f.write(await resp.content.read()) print (name, "搞定" ) async def main (): tasks = [] for url in urls: tasks.append(aiodownload(url)) await asyncio.wait(tasks) if __name__ == '__main__' : asyncio.run(main())
可以参考:https://www.cnblogs.com/ssyfj/p/9222342.html
注意事项 转自https://blog.csdn.net/lymmurrain/article/details/109037460
简单请求 如果只发出简单的请求(如只有一次请求,无需cookie,SSL,等),可用如下方法。
但其实吧很少用,因为一般爬虫中用协程都是要爬取大量页面,可能会使得aiohttp
报Unclosed client session
的错误。这种情况官方是建议用ClientSession
(连接池,见下文)的,性能也有一定的提高。
1 2 3 4 5 6 7 8 9 10 import aiohttpasync def fetch (): async with aiohttp.request('GET' , 'http://python.org/' ) as resp: assert resp.status == 200 print (await resp.text()) loop = asyncio.get_event_loop() loop.run_until_complete(fetch())
使用连接池请求
一般情况下使用如下示例,由官网摘抄。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import aiohttpimport asyncioasync def fetch (client,url ): async with client.get(url) as resp: assert resp.status == 200 return await resp.text() async def main (): async with aiohttp.ClientSession() as client: html = await fetch(client,url) print (html) loop = asyncio.get_event_loop() loop.run_until_complete(main())
是不是感觉有点绕呢,其实平时使用是不必这样将fetch函数抽象出去,可以简单写成下面的简洁示例。
1 2 3 4 5 6 7 8 import aiohttpimport asyncioasync def main (): async with aiohttp.ClientSession() as client: async with aiohttp.request('GET' , 'http://python.org/' ) as resp: assert resp.status == 200 print (await resp.text())
发现有什么不同没有,官网的fetch
函数抽象出去后,把ClientSession
的一个实例作为参数了。所以在with
代码块中使用ClientSession
实例的情况下,这两者是等同的(我认为,因为两者都是用的都是with
代码块中创建的实例)。
连接池重用 而其实官网这段代码是在ClientSession
的参考处摘抄的,所以官方这样写我认为只是在提醒要注意ClientSession
的用法。那么ClientSession
有啥得注意的呢
Session
封装了一个连接池(连接器实例),并且默认情况下支持keep-alive
。除非在应用程序的生存期内连接到大量未知的不同服务器,否则建议您在应用程序的生存期内使用单个会话以受益于连接池。
不要为每个请求创建Session
。每个应用程序很可能需要一个会话,以完全执行所有请求。
更复杂的情况可能需要在每个站点上进行一次会话,例如,一个会话用于Github
,另一个会话用于Facebook API
。无论如何,为每个请求建立会话是一个非常糟糕的主意。
会话内部包含一个连接池。连接重用和保持活动状态(默认情况下均处于启用状态)可能会提高整体性能。
以上这几段话由官网翻译而来。这几段话都是说,如无必要,只用一个ClientSession
实例即可。
但我在很多资料看到的是像如下这样用的呀
1 2 3 4 5 6 async def fetch (url ): async with aiohttp.ClientSession() as client: async with aiohttp.request('GET' , url) as resp: assert resp.status == 200 print (await resp.text())
这不明显每请求一次就实例化一个ClientSession
嘛,并没有重用ClientSession
啊。那应该咋办呢,然而官网并没有举出重用ClientSession
的示例(我也是服了,你这么浓墨重彩说道只需一个session
,倒是给个示例啊)。
那只得继续找找资料。然而国内资料不多,只能上github
和stackoverflow
看看。看了半天也没个定论,主要是两个方法。
下面是我写的示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 async def fetch (client,url ): async with client.get(url) as resp: assert resp.status == 200 text = await resp.text() return len (text) async def fetch_all (urls ): async with aiohttp.ClientSession() as client: return await asyncio.gather(*[fetch(client,url) for url in urls]) urls = ['http://python.org/' for i in range (3 )] loop=asyncio.get_event_loop() results = loop.run_until_complete(fetch_all(urls)) print (results)print (type (results))
手动创建session,不用with
该方法可以让你获取一个session实例而不仅局限于with代码块中,可以在后续代码中继续使用该session。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 async def fetch (client,url ): async with client.get(url) as resp: assert resp.status == 200 text = await resp.text() return len (text) async def fetch_all_manual (urls,client ): return await asyncio.gather(*[fetch(client, url) for url in urls]) urls = ['http://python.org/' for i in range (3 )] loop=asyncio.get_event_loop() client = aiohttp.ClientSession() results = loop.run_until_complete(fetch_all_manual(urls,client)) loop.run_until_complete(client.close()) loop.run_until_complete(asyncio.sleep(3 )) loop.close() print (results)print (type (results))
此处着重说明以下该方法一些相关事项
手动创建ClientSession
要手动关闭自己创建的ClientSession
,并且client.close()
是个协程,得用事件循环关闭。 在关闭loop之前要给aiohttp
一点时间关闭ClientSession
如果无上述步骤会报Unclosed client session
的错误,也即ClientSession
没有关闭
但就算你遵循了以上两个事项,如此运行程序会报以下warning,虽然不会影响程序正常进行
1 2 DeprecationWarning: The object should be created from async function client = aiohttp.ClientSession()
这说的是client = aiohttp.ClientSession()
这行代码应该在异步函数中执行。如果你无法忍受可以在定义个用异步方法用作创建session
1 2 3 4 async def create_session (): return aiohttp.ClientSession() session = asyncio.get_event_loop().run_until_complete(create_session())
ClientSession
部分重要参数
下面是ClientSession
的所有参数,这里用的比较多的是connector,headers,cookies。headers和cookies写过爬虫的可能都认识了,这里只谈一下connector。
connector是aiohttp
客户端API的传输工具。并发量控制,ssl证书验证,都可通过connector设置,然后传入ClientSession
。
标准connector有两种:
TCPConnector
用于常规TCP套接字(同时支持HTTP和 HTTPS方案)(绝大部分情况使用这种)。UnixConnector
用于通过UNIX套接字进行连接(主要用于测试)。 所有连接器类都应继承自BaseConnector
。使用可以按以下实例
1 2 3 4 conn=aiohttp.TCPConnector(verify_ssl=False ) async with aiohttp.ClientSession(connector=conn) as session:
TCPConnector
比较重要的参数有
verify_ssl(bool)– 布尔值,对HTTPS请求执行SSL证书验证 (默认情况下启用)。当要跳过对具有无效证书的站点的验证时可设置为False。 limit(int)– 整型,同时连接的总数。如果为limit为 None则connector没有限制(默认值:100)。 limit_per_host(int)–限制同时连接到同一端点的总数。如果(host, port, is_ssl)三者相同,则端点是相同的。如果为limit=0,则connector没有限制(默认值:0)。 如果爬虫用上协程,请求速度是非常快的,很可能会对别人服务器造成拒绝服务的攻击,所以平常使用若无需求,最好还是不要设置limit为0。
限制并发量的另一个做法(使用Semaphore)
使用Semaphore直接限制发送请求。此处只写用法,作抛砖引玉之用。也很容易用,在fetch_all_manual函数里加上Semaphore的使用即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 async def fetch (client,url ): async with client.get(url) as resp: assert resp.status == 200 text = await resp.text() return len (text) async def fetch_all_manual (urls,client ): async with asyncio.Semaphore(5 ): return await asyncio.gather(*[fetch(client, url) for url in urls]) urls = ['http://python.org/' for i in range (3 )] loop=asyncio.get_event_loop() client = aiohttp.ClientSession() results = loop.run_until_complete(fetch_all_manual(urls,client)) loop.run_until_complete(client.close()) loop.run_until_complete(asyncio.sleep(3 )) loop.close() print (results)print (type (results))
aiofiles
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 import asyncioimport aiofilesasync def wirte_demo (): async with aiofiles.open ("text.txt" ,"w" ,encoding="utf-8" ) as fp: await fp.write("hello world " ) print ("数据写入成功" ) async def read_demo (): async with aiofiles.open ("text.txt" ,"r" ,encoding="utf-8" ) as fp: content = await fp.read() print (content) async def read2_demo (): async with aiofiles.open ("text.txt" ,"r" ,encoding="utf-8" ) as fp: async for line in fp: print (line) if __name__ == "__main__" : asyncio.run(wirte_demo()) asyncio.run(read_demo()) asyncio.run(read2_demo())
视频的爬取 程序员会想办法把用户上传好的视频进行转码(不同清晰度)做切片(ts)处理。这样既方便用户进行大跨度的调整进度条(最小延迟)。也能为公司节省大量的流量费。既然要把视频切成非常多个小碎片。那就需要有个文件来记录这些小碎片的路径。该文件⼀般为M3U文件,M3U文件中的内容经过UTF-8的编码后,就是M3U8文件。今天, 我们看到的各大视频网站平台使用的几乎都是M3U8文件。
我们首先就要找这个文件。
想要抓取一个视频:
找到m3u8 (各种手段) 通过m3u8下载到ts文件 可以通过各种手段(不仅是编程手段,甚至pr) 把ts文件合并为一个mp4文件 一些格式固定的代码 跳过其中的无效信息,去掉行中的换行、空格等:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import asyncioimport aiohttpimport aiofilesasync def aio_download (up_url ): tasks = [] async with aiohttp.ClientSession() as session: async with aiofiles.open ("m3u8.txt" , mode="r" , encoding='utf-8' ) as f: async for line in f: if line.startswith("#" ): continue line = line.strip() ts_url = up_url + line task = asyncio.create_task(download_ts(ts_url, line, session)) tasks.append(task) await asyncio.wait(tasks) async def download_ts (url, name, session ): async with session.get(url) as resp: async with aiofiles.open (f"video/{name} " , mode="wb" ) as f: await f.write(await resp.content.read()) print (f"{name} 下载完毕" )
调用时:
1 2 asyncio.run(aio_download(url_up))
加密的问题 ts文件可能加密过,需要用m3u8的key.key
文件中找密钥,需要抓包确定密钥文件前面的域名,然后下载密钥文件,对ts文件解密:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 from Crypto.Cipher import AES async def dec_ts (name, key ): aes = AES.new(key=key, IV=b"0000000000000000" , mode=AES.MODE_CBC) async with aiofiles.open (f"video/{name} " , mode="rb" ) as f1,\ aiofiles.open (f"video/temp_{name} " , mode="wb" ) as f2: bs = await f1.read() await f2.write(aes.decrypt(bs)) print (f"{name} 处理完毕" ) async def aio_dec (key ): tasks = [] async with aiofiles.open ("m3u8.txt" , mode="r" , encoding="utf-8" ) as f: async for line in f: if line.startswith("#" ): continue line = line.strip() task = asyncio.create_task(dec_ts(line, key)) tasks.append(task) await asyncio.wait(tasks)
一般使用from Crypto.Cipher import AES
,在m3u8文件中会有标注
合并ts文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 def merge_ts (): lst = [] s = '' with open ("shdbz.m3u8" , mode="r" , encoding="utf-8" ) as f: for line in f: if line.startswith("#" ): continue line = line.strip() lst.append(f"{line} +" ) print (lst) for str in lst: s += str s = s[0 :-1 ] os.system(f"copy/b {s} xxx.mp4" )
Selenium Selenium 是一种开源工具,用于在 Web 浏览器上执行自动化测试(使用任何 Web 浏览器进行 Web 应用程序测试)。可以对抗一些反爬但是问题是比较慢。
一些基本操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 from selenium.webdriver import Chromefrom selenium.webdriver.common.keys import Keysfrom selenium.webdriver.common.by import Byimport timeweb = Chrome() web.get("http://lagou.com" ) el = web.find_element(By.XPATH, '//*[@id="changeCityBox"]/p[1]/a' ) el.click() time.sleep(1 ) web.find_element(By.XPATH, '//*[@id="search_input"]' ).send_keys("python" , Keys.ENTER) web.find_element_by_xpath('//*[@id="s_position_list"]/ul/li[1]/div[1]/div[1]/div[1]/a/h3' ).click() web.switch_to.window(web.window_handles[-1 ]) job_detail = web.find_element_by_xpath('//*[@id="job_detail"]/dd[2]/div' ).text print (job_detail)web.close() web.switch_to.window(web.window_handles[0 ])
页面中iframe
如何处理 1 2 3 4 5 6 7 8 web.get("https://www.91kanju.com/vod-play/541-2-1.html" ) iframe = web.find_element(By.XPATH,'//*[@id="player_iframe"]' ) web.switch_to.frame(iframe) tx = web.find_element(By.XPATH,'//*[@id="main"]/h3[1]' ).text print (tx)
无头浏览器与下拉列表 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 from selenium.webdriver import Chromefrom selenium.webdriver.chrome.options import Optionsfrom selenium.webdriver.support.select import Selectimport timefrom selenium.webdriver.common.by import Byopt = Options() opt.add_argument("--headless" ) opt.add_argument("--disbale-gpu" ) web = Chrome(options=opt) web.get("https://www.endata.com.cn/BoxOffice/BO/Year/index.html" ) time.sleep(2 ) sel_el = web.find_element(By.XPATH, '//*[@id="OptionDate"]' ) sel = Select(sel_el) for i in range (len (sel.options)): sel.select_by_index(i) time.sleep(2 ) table = web.find_element(By.XPATH,'//*[@id="TableList"]/table' ) print (table.text) print ("===================================" ) print ("运行完毕. " )web.close()
处理滑块 https://blog.csdn.net/Jeeson_Z/article/details/82047685
以B站为例,但是现在B站好像已经不用滑块了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 from selenium import webdriverfrom selenium.webdriver.support.wait import WebDriverWaitfrom selenium.webdriver.support import expected_conditions as ECfrom selenium.webdriver.common.by import Byfrom selenium.webdriver import ActionChainsfrom urllib.request import urlretrievefrom bs4 import BeautifulSoupimport refrom PIL import Imagefrom time import sleepdef init (): global url, browser, username, password, wait url = 'https://passport.bilibili.com/login' browser = webdriver.Chrome() username = '17389025490' password = 'Zhaoweining750' wait = WebDriverWait(browser, 20 ) def login (): browser.get(url) user = wait.until(EC.presence_of_element_located((By.ID, 'login-username' ))) passwd = wait.until(EC.presence_of_element_located((By.ID, 'login-passwd' ))) user.send_keys(username) passwd.send_keys(password) def get_image_info (img ): ''' :param img: (Str)想要获取的图片类型:带缺口、原始 :return: 该图片(Image)、位置信息(List) ''' soup = BeautifulSoup(browser.page_source, 'lxml' ) imgs = soup.find_all('div' , {'class' : 'gt_cut_' + img + '_slice' }) img_url = re.findall('url\(\"(.*)\"\);' , imgs[0 ].get('style' ))[0 ].replace('webp' , 'jpg' ) urlretrieve(url=img_url, filename=img + '.jpg' ) image = Image.open (img + '.jpg' ) position = get_position(imgs) return image, position def get_position (img ): ''' :param img: (List)存放多个小图片的标签 :return: (List)每个小图片的位置信息 ''' img_position = [] for small_img in img: position = {} position['x' ] = int (re.findall('background-position: (.*)px (.*)px;' , small_img.get('style' ))[0 ][0 ]) position['y' ] = int (re.findall('background-position: (.*)px (.*)px;' , small_img.get('style' ))[0 ][1 ]) img_position.append(position) return img_position def Corp (image, position ): ''' :param image:(Image)被裁剪的图片 :param position: (List)该图片的位置信息 :return: (List)存放裁剪后的每个图片信息 ''' first_line_img = [] second_line_img = [] for pos in position: if pos['y' ] == -58 : first_line_img.append(image.crop((abs (pos['x' ]), 58 , abs (pos['x' ]) + 10 , 116 ))) if pos['y' ] == 0 : second_line_img.append(image.crop((abs (pos['x' ]), 0 , abs (pos['x' ]) + 10 , 58 ))) return first_line_img, second_line_img def put_imgs_together (first_line_img, second_line_img, img_name ): ''' :param first_line_img: (List)第一行图片位置信息 :param second_line_img: (List)第二行图片信息 :return: (Image)拼接后的正确顺序的图片 ''' image = Image.new('RGB' , (260 , 116 )) offset = 0 for img in first_line_img: image.paste(img, (offset, 0 )) offset += img.size[0 ] x_offset = 0 for img in second_line_img: image.paste(img, (x_offset, 58 )) x_offset += img.size[0 ] image.save(img_name) return image def is_pixel_equal (bg_image, fullbg_image, x, y ): """ :param bg_image: (Image)缺口图片 :param fullbg_image: (Image)完整图片 :param x: (Int)位置x :param y: (Int)位置y :return: (Boolean)像素是否相同 """ bg_pixel = bg_image.load()[x, y] fullbg_pixel = fullbg_image.load()[x, y] threshold = 60 if (abs (bg_pixel[0 ] - fullbg_pixel[0 ] < threshold) and abs (bg_pixel[1 ] - fullbg_pixel[1 ] < threshold) and abs ( bg_pixel[2 ] - fullbg_pixel[2 ] < threshold)): return True else : return False def get_distance (bg_image, fullbg_image ): ''' :param bg_image: (Image)缺口图片 :param fullbg_image: (Image)完整图片 :return: (Int)缺口离滑块的距离 ''' distance = 57 for i in range (distance, fullbg_image.size[0 ]): for j in range (fullbg_image.size[1 ]): if not is_pixel_equal(fullbg_image, bg_image, i, j): return i def get_trace (distance ): ''' :param distance: (Int)缺口离滑块的距离 :return: (List)移动轨迹 ''' trace = [] faster_distance = distance * (4 / 5 ) start, v0, t = 0 , 0 , 0.2 while start < distance: if start < faster_distance: a = 1.5 else : a = -3 move = v0 * t + 1 / 2 * a * t * t v = v0 + a * t v0 = v start += move trace.append(round (move)) return trace def move_to_gap (trace ): slider = wait.until(EC.presence_of_element_located((By.CLASS_NAME, 'gt_slider_knob' ))) ActionChains(browser).click_and_hold(slider).perform() for x in trace: ActionChains(browser).move_by_offset(xoffset=x, yoffset=0 ).perform() sleep(0.5 ) ActionChains(browser).release().perform() def main (): init() login() bg, bg_position = get_image_info('bg' ) fullbg, fullbg_position = get_image_info('fullbg' ) bg_first_line_img, bg_second_line_img = Corp(bg, bg_position) fullbg_first_line_img, fullbg_second_line_img = Corp(fullbg, fullbg_position) bg_image = put_imgs_together(bg_first_line_img, bg_second_line_img, 'bg.jpg' ) fullbg_image = put_imgs_together(fullbg_first_line_img, fullbg_second_line_img, 'fullbg.jpg' ) distance = get_distance(bg_image, fullbg_image) trace = get_trace(distance - 10 ) move_to_gap(trace) sleep(5 ) if __name__ == '__main__' : main()
处理滑块的一些问题 在处理12306的滑块时总是失败,使用selenium套件操作滑块后会出现“哎呀,出错了,点击刷新再来一次”这样的提示。
知己知彼,参见https://blog.csdn.net/weixin_44685869/article/details/105602629
网页只要设置了检查webdriver
的Javascript
方法,就很容易发现爬虫。使用的方法就是Navigator
对象的webdriver
属性,用这个属性来判断客户端是否通过WebDriver
驱动浏览器。如果监测到客户端的webdriver
属性存在,则无法继续操作获取数据。selenium
就存在WebDriver
属性。
监测结果有3种,分别是true
,false
,undefind
。
最广为人知的识别是否是selenium
的方法就是 window.navigator.webdriver
,当浏览器被打开后,js
就会给当前窗口一个window
属性,里面存放着用户的各种"信息"。
使用渲染工具有 webdriver
属性时,navigation.webdriver
的返回值时true
。反之则会返回false
或者undefind
。
了解了WebDriver
识别的原理和返回值后,我们就能相处应对的办法。既然 Web Driver
的识别依赖navigation.webdriver
的返回值,那么我们在触发Javascript
办法前将navigation.webdriver
的返回值改为false
或者undefind
,问题就解决了。
不过这种修改该属性值的办法只在当前页面有效,当浏览器打开新标签或新窗口时需要重新执行改变navigator.webdriver
值的JavaScript
代码。一般来说足够了。
例如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 from selenium.webdriver import Chromeimport timebrower = Chrome(executable_path=r'D:\python\chromedriver_win32\chromedriver.exe' ) url = 'http://www.porters.vip/features/webdriver.html' brower.get(url) script = 'Object.defineProperty(navigator,"webdriver",{get:() => false,});' brower.execute_script(script) brower.find_element_by_css_selector('.btn.btn-primary.btn-lg' ).click() elements = brower.find_element_by_css_selector('#content' ) time.sleep(1 ) print (elements.text)brower.close()
或者在创建浏览器对象时:
1 2 3 4 5 option = Options() option.add_argument('--disable-blink-features=AutomationControlled' ) web = Chrome(options=option)
PS. 使用opencv处理缺口类滑块 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 import cv2GAUSSIAN_BLUR_KERNEL_SIZE = (5 , 5 ) GAUSSIAN_BLUR_SIGMA_X = 0 CANNY_THRESHOLD1 = 200 CANNY_THRESHOLD2 = 450 def get_gaussian_blur_image (image ): return cv2.GaussianBlur(image, GAUSSIAN_BLUR_KERNEL_SIZE, GAUSSIAN_BLUR_SIGMA_X) def get_canny_image (image ): return cv2.Canny(image, CANNY_THRESHOLD1, CANNY_THRESHOLD2) def get_contours (image ): contours, _ = cv2.findContours(image, cv2.RETR_CCOMP, cv2.CHAIN_APPROX_SIMPLE) return contours def get_contour_area_threshold (image_width, image_height ): contour_area_min = (image_width * 0.15 ) * (image_height * 0.25 ) * 0.8 contour_area_max = (image_width * 0.15 ) * (image_height * 0.25 ) * 1.2 return contour_area_min, contour_area_max def get_arc_length_threshold (image_width, image_height ): arc_length_min = ((image_width * 0.15 ) + (image_height * 0.25 )) * 2 * 0.8 arc_length_max = ((image_width * 0.15 ) + (image_height * 0.25 )) * 2 * 1.2 return arc_length_min, arc_length_max def get_offset_threshold (image_width ): offset_min = 0.2 * image_width offset_max = 0.85 * image_width return offset_min, offset_max def main (): image_raw = cv2.imread('captcha.png' ) image_height, image_width, _ = image_raw.shape image_gaussian_blur = get_gaussian_blur_image(image_raw) image_canny = get_canny_image(image_gaussian_blur) contours = get_contours(image_canny) cv2.imwrite('image_canny.png' , image_canny) cv2.imwrite('image_gaussian_blur.png' , image_gaussian_blur) contour_area_min, contour_area_max = get_contour_area_threshold(image_width, image_height) arc_length_min, arc_length_max = get_arc_length_threshold(image_width, image_height) offset_min, offset_max = get_offset_threshold(image_width) offset = None for contour in contours: x, y, w, h = cv2.boundingRect(contour) if contour_area_min < cv2.contourArea(contour) < contour_area_max and \ arc_length_min < cv2.arcLength(contour, True ) < arc_length_max and \ offset_min < x < offset_max: cv2.rectangle(image_raw, (x, y), (x + w, y + h), (0 , 0 , 255 ), 2 ) offset = x cv2.imwrite('image_label.png' , image_raw) print ('offset' , offset) if __name__ == '__main__' : main()