欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 房产 > 建筑 > 使用python多线程高效采集书籍小说

使用python多线程高效采集书籍小说

2025/11/6 5:09:01 来源:https://blog.csdn.net/shamqu/article/details/143841839  浏览:    关键词:使用python多线程高效采集书籍小说

前面sham学着用php和js来采集,但是效率非常慢,于是通过ai得到了python版,这里记录下。
注:Sham目前还不会python,纯记录备忘。另只用于学习,请勿随意采集他人网站

直接上代码

import asyncio
import aiohttp
import re
from bs4 import BeautifulSoup
import osbook_api = '书籍网站地址'
filter_key = '需要采集的小说类别,比如完本小说'
start_id = 1
#创建文件夹
def ensure_folder_exists(folder_path):#如果书本文件夹不存在,就创建if not os.path.exists(folder_path):os.makedirs(folder_path)print(f"文件夹 '{folder_path}' 已创建。")# 获取小说书籍主页面信息
async def get_book_info(url, bookid, session):global book_apiheaders = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}try:async with session.get(url, headers=headers) as response:soup = BeautifulSoup(await response.text(), 'html.parser')chapters = []# 获取书名booktitle = soup.find('h1').text.strip()#这里替换特殊字符,防止倒是创建文件夹有问题book_title = re.sub(r'[\/:*?"<>|]', "_", booktitle)#拼接书名,前面加id是为了后续方便判断是否已经采集book_name = f"{bookid}_{book_title}"# 获取状态book_status = soup.find('div', class_='small').find_all('span')[1].text.replace('状态:', '').strip()# 获取书籍类别信息path_div = soup.find('div', class_='path wap_none')if path_div:contents = path_div.get_text(strip=True, separator=' ')parts = contents.split('>')book_category = parts[1].strip() if len(parts) > 1 else "未知分类"else:book_category = "未知分类"#这里是判断只采集{filter_key}设置的内容,通过分类来判断是否跳过,可以根据自己要求替换成比如状态等  if book_category != f"{filter_key}":print(f"{book_name} 不是{filter_key},跳过")return None, [], book_status, Noneelse:#这里执行判断创建文件夹ensure_folder_exists(f"{book_name}")# 获取作者信息,通过class=small的div中,span标签内内容,同时替换掉”作者:",以获取作者是谁#这个要根据采集的页面的的html标签来自定义author = soup.find('div', class_='small').find_all('span')[0].text.replace('作者:', '').strip()# 获取更新时间,方法同上update_time = soup.find('div', class_='small').find_all('span', class_='last')[0].text.replace('更新:', '').strip()# 获取最新章节,方法同上latest_chapter = soup.find('div', class_='small').find_all('span', class_='last')[1].find('a').text.strip()# 获取简介,方法同上,这里加[0]是只获取第一个dd标签内容dd_tag = soup.find('div', class_='intro').find_all('dd')[0]# 提取<dd>标签中的所有文本all_text = dd_tag.get_text()# 找到<span class="noshow">标签noshow_span = dd_tag.find('span', class_='noshow')# 提取<span class="noshow">标签之前的文本if noshow_span:# 获取<span class="noshow">标签在<dd>标签中的索引位置noshow_index = all_text.index(noshow_span.get_text())# 提取<noshow>标签之前的文本desc_txt = all_text[:noshow_index].strip()else:desc_txt = all_text.strip()#print(f'简介: {desc_txt}')#保存书本信息到txtwith open(f"{book_name}/book_desc.txt", 'w', encoding='utf-8') as files:files.write(f'书名:{book_name}\n类别:{book_category}\n封面:cover.jpg\n作者:{author}\n状态:{book_status}\n更新时间:{update_time}\n最新章节:{latest_chapter}\n简介:{desc_txt}')print(f"已保存书本信息")# 获取封面图片链接并下载到本地cover_image_url = soup.find('div', class_='cover').find('img')['src']cover_save_path = f"{book_name}/cover.jpg"await download_cover_image(cover_image_url, cover_save_path, session)# 获取章节列表#这里加idx是当前采集的网站文章列表页最上面会有最新章节,会导致第一个文件是最新章节的情况idx = 0#循环获取a标签内的内容for chapter_link in soup.find_all('a', href=True):#判断如果href中包含htm链接,同时包含book/,证明是章节链接,跳过js等其他链接if '.htm' in chapter_link['href'] and 'book/' in chapter_link['href']:#这里就是起那面加idx的作用,当第一个链接不是第一章时,跳过,然后idx加1if idx == 0 and '1.htm' not in chapter_link['href']:print(f"跳过章节: {chapter_link}(不包含 '/1.htm')")idx += 1  # 增加索引以指向下一个链接continue  # 跳过当前循环迭代#这里是或者章节链接中的章节名chapter_title = re.sub(r'[\/:*?"<>|]', "_", chapter_link.text.strip())#这里是章节链接,因为是站内链接,所以前面加chapter_url = book_api + chapter_link['href']chapters.append((chapter_title, chapter_url))#保存章节    #获取章节列表数组,只保留章节标题formatted_chapters = [f"{title}" for title, url in chapters]#转成用换行符分隔的字符串string_content = "\n".join(formatted_chapters)# 将字符串内容写入到章节列表,这个列表是为了方便后面按顺序读取章节用with open(f"{book_name}/chapter_list.txt", 'w', encoding='utf-8') as file:file.write(string_content)print(f"已保存章节列表。")       #返回数据return book_name, chapters, book_status, book_category#错误信息except Exception as e:print(f"获取书籍信息失败: {e}")return None, [], None, None# 下载封面图片
async def download_cover_image(url, save_path, session, retries=3):for attempt in range(retries):try:async with session.get(url, timeout=10) as response:if response.status == 200:with open(save_path, 'wb') as file:file.write(await response.read())print(f"封面图片已下载到 {save_path}")returnelse:print(f"封面图片下载失败,状态码: {response.status}")except Exception as e:print(f"下载封面图片时出错(尝试 {attempt + 1}/{retries} 次): {e}")await asyncio.sleep(2)print(f"封面图片下载失败,已尝试 {retries} 次")# 获取章节内容
async def get_chapter_content(url, session):headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}async with session.get(url, headers=headers) as response:soup = BeautifulSoup(await response.text(), 'html.parser')#方法参考上面,获取div标签中id为chaptercontent里面的内容content_div = soup.find('div', id='chaptercontent')if content_div:#将html中的br替换成txt文件中的换行for br in content_div.find_all('br'):br.replace_with('\n')#获取所有行内容lines = content_div.get_text().splitlines()#这里过滤掉包含指定字符的行,用于替换掉广告等filtered_lines = [line for line in lines if '广告' not in line and '点此报' not in line]return "\n".join(filtered_lines)else:return ""# 处理单本书籍
async def process_book(book_id, session):global book_apibook_url = f'{book_api}/book/{book_id}/'book_name, chapters, book_status, book_category = await get_book_info(book_url, book_id, session)#当书本类型为指定筛选类型时执行后续操作if book_category == f"{filter_key}" and book_name and chapters:for chapter_title, chapter_url in chapters:chapter_path = f"{book_name}/{chapter_title}.txt"#如果已经存在章节txt同时不为空时,表示已经采集过了,跳过if os.path.exists(chapter_path) and os.path.getsize(chapter_path) > 0:print(f"章节 {chapter_title} 已存在,跳过")continueprint(f"正在采集章节: {chapter_title}")#获取章节内容content = await get_chapter_content(chapter_url, session)#这里再次判断是否已经存在书籍文件夹ensure_folder_exists(book_name)#打开并写入当前章节txtwith open(chapter_path, 'w', encoding='utf-8') as file:file.write(content)print(f"已保存章节: {chapter_title}")#当完成章节采集后,更新已采集的书本列表,这个用于获取已有书籍清单(不过有时会缺失或重复,建议后续全部采集完后通过获取books内文件名来重新生成if os.path.exists(f"已采集_in_{book_category}.txt"):with open(f"已采集_in_{book_category}.txt", 'a', encoding='utf-8') as file:file.write(f"{book_name}\n")else:with open(f"已采集_in_{book_category}.txt", 'w', encoding='utf-8') as file:file.write(f"{book_name}\n")# 主循环,批量处理书籍,保证始终有100本在下载
async def main():#如果存在正在采集的idtxt,则获取里面的id,这个用于当采集中断时判断采集到哪了(这里存的是当前第100个任务书籍,如果中途断了,需要将id改成往前100本,避免中间章节缺失)global start_idif os.path.exists("正在采集的id.txt"):with open("正在采集的id.txt", 'r', encoding='utf-8') as file:start_id = int(file.read().strip())#定义aiohttpasync with aiohttp.ClientSession() as session:current_id = start_idtasks = []# 保持队列中始终有100本书在下载while current_id < 180000:if len(tasks) < 100:task = asyncio.create_task(process_book(current_id, session))tasks.append((task, current_id))current_id += 1continue# 获取已完成的任务并移除非完本任务done, pending = await asyncio.wait([task for task, _ in tasks], return_when=asyncio.FIRST_COMPLETED)for completed_task in done:tasks = [(task, id) for task, id in tasks if task != completed_task]# 更新已采集的书本IDwith open("正在采集的id.txt", 'w', encoding='utf-8') as file:file.write(str(current_id))# 运行异步主程序
asyncio.run(main())

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词