爬虫学习总结之web端爬虫

爬虫Web Spider

Posted by MZ on August 22, 2022

爬虫学习总结(web端爬虫)

2022年本科毕业那段时间,在家自学了一段时间爬虫,不过是跟着B站视频学习的,不是很系统,如今研一暑假又买了崔庆才的《Python3网络爬虫开发实战 第二版》进行学习,不过只学了web端中自己感兴趣的部分,现在做一下总结。(很多说法可能和书中类似,也可以直接去看崔庆才的博客

爬虫介绍

爬虫(Web Crawler),也被称为网络爬虫、网络蜘蛛、网络机器人等,是一种自动化的技术,用于从互联网上获取信息并将其收集、提取、存储或分析。爬虫可以模拟人类在网络上浏览和检索信息的行为,但它们是自动化的程序,能够在短时间内处理大量的数据。

我们可以把互联网比作一张大网,而爬虫(即网络爬虫)便是在网上爬行的蜘蛛。把网的节点比作一个个网页,爬虫爬到这就相当于访问了该页面,就能把网页上的信息提取出来。我们可以把节点间的连线比作网页与网页之间的链接关系,这样蜘蛛通过一个节点后,可以顺着节点连线继续爬行到达下一个节点,即通过一个网页继续获取后续的网页,这样整个网的节点便可以被蜘蛛全部爬行到,网站的数据就可以被抓取下来了。

简而言之,爬虫可以帮助我们快速把网站上的信息快速提取并保存下来。

爬虫流程

先说一下爬虫的大致流程,基本上所有的爬虫都有这4个小部分:1、发送请求,2、获取响应,3、解析数据,4、保存数据。

不管是什么样的爬虫,基本上就是这四部分,只不过有的爬虫会设置一些字段或者构造一些加密数据来对抗那些反爬手段,但这些都是为了更好的完成这4个部分。

①发送请求

爬虫首先要做的就是模拟浏览器向服务器发送请求,这个请求可以是静态Html请求,也可以是Ajax请求,只要浏览器能发送的,爬虫就可以模仿。一般的Http请求用到urllib、requests库等。

②获得响应

爬虫模拟发送的请求成功会获得服务器的响应,请求Html就响应Html文件,请求Ajax就返回Json字符串数据。

③解析数据

根据得到的Html和Json数据分析出自己想要的数据。解析Html一般用正则表达式、节点属性、CSS选择器、Xpath等解析,python中的Beautiful Soup、pyquery、lxml库都封装了这些解析规则;而Json数据不用解析,直接从中提取自己想要的数据即可。

④保存数据

解析得到的数据可以保存到自己想要的任意格式文件以及数据库中,例如CSV、TXT、Excel和JSON等文件以及Mysql、MongoDB等数据库中。

简单爬虫之爬取Html

先介绍一种最简单的入门级爬虫,就是爬取静态渲染的页面,而且该页面数据和Html源代码数据是一样的,没有经过反爬手段处理。典型案例就是爬取豆瓣电影top250了,不过这里贴另一个崔庆才自己写的案例网站:ssr1

接下来粗略的写一下爬取的过程:先打开浏览器的开发者工具,然后找到Network,刷新页面找到网站的请求。Html一般是第一个。然后用python库(urllib或requests)get方法模拟发送请求,获取的源代码利用python库(Beautiful Soup、pyquery、lxml等)进行解析,最后找一个自己喜欢的方式保存或打印数据出来就行。源代码如下:

#电影数据网站,无反爬,数据通过服务端渲染,适合基本爬虫练习。
import requests
import re
import logging
from urllib.parse import urljoin
import json
from os import makedirs
from os.path import exists
import multiprocessing

RESULTS_DIR = 'results'
exists(RESULTS_DIR) or makedirs(RESULTS_DIR)
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s: %(message)s')
BASE_URL = 'https://ssr1.scrape.center/page/'
#请求url获取html文本
def askUrl(url):
    response = requests.get(url)
    if response.status_code == 200:
        return response.text
    else: 
        return
#解析列表页(正则表达式)
def parse_index(html):
    pattern = re.compile('<a.*?href="(.*?)".*?class="name">')
    items = re.findall(pattern, html)
    if not items:
        return []
    for item in items:
        detail_url = urljoin(BASE_URL, item)
        logging.info('get detail url %s', detail_url)
        yield detail_url
#解析详情页(正则表达式)
def parse_detail(html):
    cover_pattern = re.compile('class="item.*?<img.*?src="(.*?)".*?class="cover">', re.S)
    name_pattern = re.compile('<h2.*?>(.*?)</h2>')
    categories_pattern = re.compile('<button.*?category.*?<span>(.*?)</span>.*?</button>', re.S)
    published_at_pattern = re.compile('(\d{4}-\d{2}-\d{2})\s?上映')
    drama_pattern = re.compile('<div.*?drama.*?>.*?<p.*?>(.*?)</p>', re.S)
    score_pattern = re.compile('<p.*?score.*?>(.*?)</p>', re.S)
    cover = re.search(cover_pattern, html).group(1).strip() if re.search(cover_pattern, html) else None
    name = re.search(name_pattern, html).group(1).strip() if re.search(name_pattern, html) else None
    categories = re.findall(categories_pattern, html) if re.findall(categories_pattern, html) else []
    published_at = re.search(published_at_pattern, html).group(1) if re.search(published_at_pattern, html) else None
    drama = re.search(drama_pattern, html).group(1).strip() if re.search(drama_pattern, html) else None
    score = float(re.search(score_pattern, html).group(1).strip()) if re.search(score_pattern, html) else None
    return {
        'cover': cover,
        'name': name,
        'categories': categories,
        'published_at': published_at,
        'drama': drama,
        'score': score
    }
#保存数据
def save_data(data):
    name = data.get('name')
    data_path = f'{RESULTS_DIR}/{name}.json'
    json.dump(data, open(data_path, 'w', encoding='utf-8'), ensure_ascii=False, indent=2)


def main(page):
    url = BASE_URL+str(page)
    index_html = askUrl(url)
    detail_urls = parse_index(index_html)
    for detail_url in detail_urls:
        detail_html = askUrl(detail_url)
        data = parse_detail(detail_html)
        save_data(data)

#多进程开启
if __name__ == '__main__':
    pool = multiprocessing.Pool()
    pages = range(1, 11)
    pool.map(main, pages)
    pool.close()
    pool.join()

简单爬虫之爬取Ajax

再介绍一种相对简单的入门级爬虫,爬取动态渲染的页面,也就是Ajax请求,这里贴一个崔庆才自己写的案例网站:spa1

接下来粗略的写一下爬取的过程:先打开浏览器的开发者工具,然后找到Network,刷新页面找到网站的请求。找到请求类型为xhr的就是Ajax请求,看看响应数据哪一个是自己想要的请求。然后用python库(urllib或requests)get方法模拟发送请求,获得的直接是JSON数据可以直接提取,最后找一个自己喜欢的方式保存或打印数据出来就行。源代码如下:

#电影数据网站,无反爬,数据通过 Ajax 加载,页面动态渲染,适合 Ajax 分析和动态页面渲染爬取。
import requests
import logging
import json

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s: %(message)s')

INDEX_URL = 'https://spa1.scrape.center/api/movie/?limit={limit}&offset={offset}'
DETAIL_URL = 'https://spa1.scrape.center/api/movie/{id}/'
LIMIT = 10
def scrape_index(page):
    url = INDEX_URL.format(limit=LIMIT, offset=LIMIT*(page - 1))
    return scrape_api(url)

def scrape_api(url):
    logging.info('scraping %s...', url)
    try:
        response = requests.get(url)
        if response.status_code == 200:
            return response.json()
        logging.error('get error status code %s while scraping %s', response.status_code, url)
    except requests.RequestException:
        logging.error('error occured while scraping %s', url)

def scrape_detail(id):
    url = DETAIL_URL.format(id=id)
    return scrape_api(url)

def main():
    for page in range(1, LIMIT + 1):
        index_content = scrape_index(page=page)
        for item in index_content.get('results'):
            id = item.get('id')
            detail_content = scrape_detail(id)
            logging.info('%s,%s,%s',detail_content.get('name'),detail_content.get('alias'),detail_content.get('score'))

if __name__ == '__main__':
    main()

复杂爬虫之爬取Html

接下来是复杂一点的Html,为什么复杂呢?因为加入了反爬手段,目前我只知道字体反爬数据Html反爬的一种,其实也不算Html反爬,而是CSS反爬,它是利用CSS的字体样式,把Html源代码的字体表示和网页中字体表示不一样,这样如果你想要获得网页中的字体,但你解析的是源代码中的数据,所以结果和预期不符。

举例:

​ 网页上现在数字为:0123456789

​ 源代码字体样式修改为:0756389124

复杂爬虫之爬取Ajax

最后介绍一种反爬手段相对较多,并且比较复杂多变的js逆向爬取。

大部分网站做反爬手段一般会给请求加一个参数例如token、sign等,而且这个参数是实时的,不是一成不变的。这个参数一般是在JavaScript代码里的逻辑生成的,所以我们要去研究js代码,而网站维护人员不想我们破解掉这个加密逻辑,就采用JavaScript压缩、混淆、加密等方式让我们无法阅读js代码。

不过尽管有这么多问题阻碍我们请求数据,还是有很多解决办法的,例如浏览器强大的功能、Hook技术、反混淆等,然我们获取请求不再那么困难。

因为我们最重要的是模拟请求,所以流程一般为第一步找到请求,观察是否有加密数据;然后第二步根据加密数据找到加密入口,这里可以用断点、全局搜索、Hook技术等找到加密入口;第三步观察这个入口的js代码是否经过加密、压缩、混淆等手段,然后进行反向借密,反混淆等;第四步利用python或者nodejs或者浏览器运行js加密逻辑,获得加密数据填充到请求中,模拟发送。发送成功后,后面的就简单多了。这里贴一个js逆向案例:spa6

案例代码:

import requests
import time
import hashlib
import base64
from typing import List, Any

INDEX_URL = 'https://spa6.scrape.center/api/movie?limit={limit}&offset={offset}&token={token}'
DETAIL_URL = 'https://spa6.scrape.center/api/movie/{id}/?token={token}'
LIMIT = 10
OFFSET = 0
SECRET = 'ef34#teuq0btua#(-57w1q5o5--j@98xygimlyfxs*-!i-0-mb'

def get_token(args: List[Any]):
    timestamp = str(int(time.time()))
    args.append(timestamp)
    sign = hashlib.sha1(','.join(args).encode('utf-8')).hexdigest()
    return base64.b64encode(','.join([sign,timestamp]).encode('utf-8')).decode('utf-8')

args =['/api/movie']
token = get_token(args)
index_url = INDEX_URL.format(limit=LIMIT,offset=OFFSET,token=token)
response = requests.get(index_url)
# print(response.json())
result = response.json()
for item in result['results']:
    id = item['id']
    encrypt_id = base64.b64encode((SECRET+str(id)).encode('utf-8')).decode('utf-8')
    args = [f'/api/movie/{encrypt_id}']
    token = get_token(args=args)
    detail_url = DETAIL_URL.format(id=encrypt_id,token=token)
    response = requests.get(detail_url)
    print(response.json())