模拟与暴力

REST Assured 15 - RequestSpecification Call Http方法

  返回  

day28爬虫实战总结

2021/8/21 14:31:58 浏览:

方案1:直接使用多线程

# 1.1直接使用thread
import time
from threading import Thread

def func1(num):
    time.sleep(2)
    print(f'子线程任务{num}')


t = Thread(target=func1, args=(1,))
t.start()
# join中的timeout是设置最长等待时间,如果超时子线程的任务还没有完成,那么后面的代码会直接执行
t.join(timeout=3)
print('线程任务完成')

# 1.2使用thread类的子类


class FThread(Thread):
    def __init__(self, num):
        super().__init__()
        self.num = num

    def run(self) -> None:
        time.sleep(2)
        print(f'子线程任务{self.num}')


t = FThread(1)
t.start()
t.join()
print("任务完成")

方案2.直接使用多进程

import time
from multiprocessing import Process


def func1(num):
    time.sleep(2)
    print(f'子进程任务{num}')


class FProcess(Process):
    def __init__(self, num):
        super().__init__()
        self.num = num

    def run(self) -> None:
        time.sleep(3)
        print(f'子进程任务{self.num}')


if __name__ == '__main__':
    p = Process(target=func1, args=(1,))
    p.start()
    # p.join()
    # print("进程任务完成")

    p2 = FProcess(2)
    p2.start()
# 方案一,方案二数据获取问题:直接使用多线程,多进程的数据获取问题
# 1.数据跨线程:定义全局变量,直接获取相关数据;(如果要使用全局队列,可以用线程队列:queue模块中的Queue)
# 2.数据跨进程:只能使用进程队列(multiprocessing模块中的Queue)
# 3.线程队列和进程队列用法的区别
"""
线程队列:
1)定义全局队列对象
2)添加数据:队列对象.put(数据)
3)获取数据:队列对象.get() / 队列对象.get(timeout=超时时间)

进程队列:
1)定义全局队列对象
2)添加数据:队列对象.put(数据)
3)获取数据:队列对象.get() / 队列对象.get(timeout=超时时间)
注意:队列对象在子进程中使用的时候必须通过进程任务对应的函数的参数来传递

方案3:使用线程池

from concurrent.futures import ThreadPoolExecutor, wait, as_completed, ALL_COMPLETED
import time


def func1(num):
    time.sleep(2)
    print(f'子线程任务{num}完成')
    return f'子线程任务{num}的数据'
# 创建线程池


pool = ThreadPoolExecutor(max_workers=10)
# 添加任务
# pool.submit(func1, 1)
# for x in range(2, 6):
#     pool.submit(func1, x)
ts = [pool.submit(func1, x) for x in range(1, 50)]

# pool.shutdown() - 关闭线程池不在往里面加入任务,不影响已经提交的任务的执行

# 单纯的等待线程池中所有的任务完成
# wait(ts, return_when=ALL_COMPLETED)
# print("----------所有的任务都完成了-----------")

# 4.等待任务完成的过程中实时获取数据
for t in as_completed(ts):
    print(t.result())

方案4:进程池

from multiprocessing import Pool
import time


def func1(num):
    time.sleep(2)
    print(f'子进程任务{num}开始')
    return f'子进程任务{num}的数据'


if __name__ == '__main__':
    # 1.创建进程池对象
    pool = Pool(10)

    # 2.添加任务
    # 1)同步添加 - 任务和主进程中的其它任务串行执行
    # result1 = pool.apply(func1, (0,))
    # result2 = pool.map(func1, [1, 2, 3, 4])
    # print(result1)
    # print(result2)

    # 2)异步添加
    # result = pool.apply_async(func1, (6,))
    # result2 = pool.map_async(func1, [1, 2, 3, 4, 5])
    # # 3.关闭进程池
    # pool.close()
    #
    # # 4.等待进程池中的任务都结束
    # pool.join()
    # print(result.get())
    # print(result2.get())

实战51 job详情页

from concurrent.futures import ThreadPoolExecutor, as_completed, wait, ALL_COMPLETED
import requests, json, csv
from re import findall
import time
from queue import Queue
from threading import Thread


headers = {
    'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.131 Safari/537.36'
}


def get_one_page(page: int):
    """获取一页岗位信息"""
    # 获取数据
    url = f'https://search.51job.com/list/000000,000000,0000,00,9,99,数据分析,2,{page}.html?lang=c&postchannel=0000&workyear=99&cotype=99&degreefrom=99&jobterm=99&companysize=99&ord_field=0&dibiaoid=0&line=&welfare='
    response = requests.get(url, headers=headers)
    json_data = json.loads(findall(r'window.__SEARCH_RESULT__ = (.+?)</script>', response.text)[0])

    # 解析数据
    page_data = []
    for job in json_data['engine_search_result']:
        job_data = {}
        # 获取岗位相关信息
        job_data['name'] = job['job_name']
        job_data['providesalary_text'] = job['providesalary_text']
        job_data['workarea_text'] = job['workarea_text']
        job_data['company_name'] = job['company_name']
        job_data['companytype_text'] = job['companytype_text']
        job_data['workarea_text'] = job['workarea_text']
        job_data['job_href'] = job['job_href']
        job_data['jobwelf'] = job['jobwelf']

        # 将工资信息中的金额分离,并且将单位统一转换成元
        salary = findall(r'(\d+\.?\d*)-?(\d+\.?\d*)([万千])/([月年天])', job_data['providesalary_text'])
        if salary:
            salary = salary[0]
            unit = salary[2]
            time = salary[3]
            min_salary = (float(salary[0]) * (10000 if unit == '万' else 1000)) / (1 if time == '月' else 12)
            max_salary = (float(salary[1]) * (10000 if unit == '万' else 1000)) / (1 if time == '月' else 12)
        else:
            min_salary = max_salary = 0

        job_data['min_salary'] = min_salary
        job_data['max_salary'] = max_salary
        job_data['salary'] = (min_salary + max_salary) / 2
        page_data.append(job_data)
    return page_data


# 获取每个工作的详情页(模拟)
def get_details_page(data):
    print(f'请求工作的详情页:{data["job_href"]}')
    time.sleep(1)
    print(f'-----------------请求成功!------------------')

    # response = requests.get(data["job_href"])
    # response.text
    # data['详情页信息字段'] = 详情页解析出的相关数据
    data['详情页'] = '详情页数据'
    queue.put(data)


def save_data():

    writer = csv.DictWriter(
        open('files/数据分析.csv', 'a', encoding='utf-8'),
        ['name', 'providesalary_text', 'workarea_text', 'company_name',
         'companytype_text', 'workarea_text', 'job_href', 'jobwelf', 'min_salary',
         'max_salary', 'salary', '详情页']
    )
    writer.writeheader()
    while True:
        data = queue.get()
        if data == 'end':
            break
        writer.writerow(data)


if __name__ == '__main__':
    # 1.创建线程池请求每个工作的详情页
    pool2 = ThreadPoolExecutor(50)
    ts2 = []

    queue = Queue()
    save_thread = Thread(target=save_data)
    save_thread.start()

    # 2.创建线程池请求搜索结果的每一页数据
    pool1 = ThreadPoolExecutor(50)
    ts = [pool1.submit(get_one_page, page) for page in range(1, 101)]
    pool1.shutdown()

    # 3.如果请求到一个数据,就将一页数据中的50个岗位的详情请求作为任务添加到pool2中
    for t in as_completed(ts):
        for job in t.result():
            print(job['job_href'])
            t2 = pool2.submit(get_details_page, job)
            ts2.append(t2)
    pool2.shutdown()

    wait(ts2, return_when=ALL_COMPLETED)
    queue.put('end')
    print('----------------------------------全部结束-----------------------------')

京东商品评论

import requests
from bs4 import BeautifulSoup
from functools import reduce
from re import findall
from json import loads
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, as_completed

headers = {
    'cookie': '__jdv=76161171|direct|-|none|-|1628560053224; __jdu=16285600532191992332185; shshshfpb=l7hwQrGrqIi4eCr58XglTaQ%3D%3D; shshshfpa=6f4d3f44-5d90-7676-86d0-e6c4df742265-1617089801; user-key=478b38e2-62c2-4780-abdc-7ee92a3a8757; pinId=vgq34oMxHnsjt7JTMb9B0A; pin=15300022703_p; unick=jd153000xiz; _tp=SS6A2kOuyFHrS5Nm%2BHtauA%3D%3D; _pst=15300022703_p; qrsc=3; areaId=22; ipLoc-djd=22-1930-50947-0; PCSYCityID=CN_510000_510100_510107; rkv=1.0; TrackID=1erZ0XUO8xw_6hUL-f2Hc_nktnWuiTcO4mpb6IvZqKkILGf2FhGrEnjJXwSsmjbmZDcgSFyPaBnzaWacgmu38rV1O4_-kXXn13bNGTCAY6qQ8UbWIlrn9D4FEAlJ2l4Tc; thor=CCB112E41BF7B8A18299559069E1826B2FE40F79A9C40AB9CF6C7ACF4FE9B0D189CA2691BCCB844F401D55F533423E22D93DB74BC974DE5CBAD06E96E66DC8968A44F9A96268E9F0FDC91512405DD5D2CA092862CB2AA3D051F92E8B9714C2A9F110FBDA52F37F63703ECAFFD70A5E993920298C08ABB5DEC447DD3F4611CAE31BB210BC0953C98497CFCDD5B086F499; ceshi3.com=201; 3AB9D23F7A4B3C9B=MBL5WIIFBJTQE7QBGVZP75S4LKWL4F6HAICGTLHP5C7ASWPR3ARXWOZP5CCBRG5MWZV2B7IB6VAIGDPHH4XMIDMXGI; __jda=122270672.16285600532191992332185.1628560053.1628838707.1629443811.8; __jdb=122270672.8.16285600532191992332185|8.1629443811; __jdc=122270672; shshshfp=3e4645d35360fc979933ecbd8a88e64d; shshshsID=d26b51263e90f337994443223e764969_5_1629444224406',
    'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.131 Safari/537.36'
}


def get_goods_list(page):
    url = f'https://search.jd.com/Search?keyword=%E7%94%B5%E8%84%91&wq=%E7%94%B5%E8%84%91&pvid=f1269a60000642a7bdda4078a12138d6&page={page}&s=56&click=0'
    response = requests.get(url, headers=headers)
    soup = BeautifulSoup(response.text, 'lxml')
    all_goods_li = soup.select('.gl-warp>li')
    all_goods_id = []
    for goods_li in all_goods_li:
        all_goods_id.append(goods_li.attrs['data-sku'])
    return all_goods_id


def get_one_page_comments(goods_id: str, page: int):
    url = f'https://club.jd.com/comment/productPageComments.action?callback=fetchJSON_comment98&productId={goods_id}&score=0&sortType=5&page={page}&pageSize=10&isShadowSku=0&fold=1'
    response = requests.get(url, headers=headers)
    json_str = findall(r'fetchJSON_comment98\((.+)\)', response.text)[0]
    comment_info = loads(json_str)
    comments = comment_info['comments']
    for x in comments:
        print(x['content'])


def get_goods_comments(goods_id: str):
    url = f'https://club.jd.com/comment/productPageComments.action?callback=fetchJSON_comment98&productId={goods_id}&score=0&sortType=5&page=1&pageSize=10&isShadowSku=0&fold=1'
    response = requests.get(url, headers=headers)
    json_str = findall(r'fetchJSON_comment98\((.+)\)', response.text)[0]
    comment_info = loads(json_str)
    comment_summary = comment_info['productCommentSummary']
    count = reduce(lambda x, y: x+comment_summary[f'score{y}Count'], range(1, 6), 0)
    for x in range(count//10+1):
        t = pool.submit(get_one_page_comments, goods_id, x)
        ts.append(t)

    pool.shutdown()


if __name__ == '__main__':
    # 获取评论信息的线程池
    pool = ThreadPoolExecutor(50)
    ts = []

    # 获取数据
    all_goods_id = get_goods_list(1)
    print(all_goods_id)
    get_goods_comments(all_goods_id[0])

    wait(ts, return_when=ALL_COMPLETED)

联系我们

如果您对我们的服务有兴趣,请及时和我们联系!

服务热线:18288888888
座机:18288888888
传真:
邮箱:888888@qq.com
地址:郑州市文化路红专路93号