赞
踩
尝试在我的博客中添上程序流程图,如果画的有误或有修改意见请各位大佬提出,我会加以改进的
def get_page_code(url):
with requests.get(url) as resp:
text =resp.text #获得页面的源代码
print("已经获取到源代码") # 你不要这个也行,但是我看着没有任何提示语句的程序内心很慌张
return text
找m3u8文件,直接在页面源代码中查找m3u8就行,查找快捷键:Ctrl+F
你就看到了这一行代码
让我们观察一下,m3u8地址在ifram标签中的src属性里面,我们要确认一下这个页面是否只有一个iframe标签,如果是直接全页面搜索iframe即可,经过查找发现,该页面只有一个iframe标签,那就好办了,这里你可以用xpath或者BeautifulSoup都可以,如果要用BeautifulSoup的话,需要在程序开头加上一句 from bs4 import BeautifulSoup即可,如果报错,评论区中告诉我,我尝试解决
但是这个m3u8文件的地址需要进行处理
上xpath(xpath不会的话我后期可能会写一篇博客)
def get_first_m3u8_url(code):
tree = etree.HTML(code)#创建etree对象,由于这里是HTML所以就选HTML就行
src = tree.xpath('//iframe/@src')[0]#//表示满页面的找ifame标签,@src表示获取iframe标签的src属性值,由于xpath返回的是一个列表,我们只要第一个,所以就是0
# 到了这一步我们拿到了第一层m3u8文件的地址,但需要提取
src= src.split("=")[1].strip('&id')# 真正的m3u8文件的地址在第二个元素中
print("已经获取到了第一层m3u8的地址")
return src
在第二个函数中我们已经获得到了第一层m3u8文件的地址,但是真正的m3u8文件的地址实在第二层m3u8文件中
所以还要再处理一次
def download_m3u8_file(first_m3u8_file):
print("正在下载第二层m3u8文件")
second= get_page_code(first_m3u8_file)
root = first_m3u8_file.rsplit('/',3)[0]
second = second.split()[-1]
second = root+second#拼接第二层地址
second_file = get_page_code(second)
with open("m3u8.txt",mode="w",encoding='utf-8') as f:
f.write(second_file)
print("第二层m3u8文件下载完成")
这是用协程来实现的两个函数,应该能看得懂
async def download_one(url, sem): async with sem: # 这玩意叫信号量。 可以控制并发量, 目前看 运行稳定。 应该没啥问题 for i in range(100): try: print(url, "开始工作") filename = url.split('/')[-1] # 刚刚这里有问题 async with aiohttp.ClientSession() as session: async with session.get(url) as resp: content = await resp.content.read() async with aiofiles.open(f"./待拼接的/{filename}",mode='wb') as f: await f.write(content) print(f"{filename}下载成功") break except Exception as e: print(f"网址为{url}出错了,重新尝试", e) print(f"等待{(i + 2) * 5}秒") await asyncio.sleep((i+2)*5) async def download_all(): sem = asyncio.Semaphore(10) # 10 表示最大并发量是10 也就是有10个任务可以被挂起 tasks=[] with open("m3u8.txt",mode="r",encoding='utf-8') as f: lines=f.readlines() for line in lines: line=line.strip() if "#" in line: continue else: task = asyncio.create_task(download_one(line, sem)) tasks.append(task) await asyncio.wait(tasks)
Windows上是这么做的
在cmd中输入copy /b 第一个文件(含扩展名) +第二个文件(含扩展名) 最终文件(含扩展名)
Mac上是这么干的
cat 第一个文件(含扩展名) +第二个文件(含扩展名) 最终文件(含扩展名) > 最终文件(含扩展名)
def merget(): namelist=[] with open("m3u8.txt",encoding="utf-8") as f: lines = f.readlines() for line in lines: if "#" in line: continue else: line=line.strip() line=line.split('/')[-1] namelist.append(line) print("已经将所有的ts文件名加入到了列表中,准备合并文件") os.chdir('./待拼接的') temp = [] #存放ts文件名后转成字符串的列表 n=1 for i in range(len(namelist)): temp.append(namelist[i]) if i!=0 and i%40==0: command="+".join(temp) os.system(f"copy /b {command} {n}.ts") temp=[] n+=1 command="+".join(temp)#假如有48个文件,这一步处理剩下来的文件 os.system(f"copy /b {command} {n}.ts") print("倒数第二步") n+=1 temp=[] for i in range(1,n): temp.append(f"{i}.ts") command = "+".join(temp) os.system(f"copy /b {command} hope.mp4") print("完成!!!我希望看到这一步")
import os import requests from lxml import etree import asyncio import aiohttp import aiofiles def get_page_code(url): with requests.get(url) as resp: text =resp.text print("已经获取到源代码") return text def get_first_m3u8_url(code): tree = etree.HTML(code) src = tree.xpath('//iframe/@src')[0] src= src.split("=")[1].strip('&id') print("已经获取到了第一层m3u8的地址") return src def download_m3u8_file(first_m3u8_file): print("正在下载第二层m3u8文件") second= get_page_code(first_m3u8_file) root = first_m3u8_file.rsplit('/',3)[0] second = second.split()[-1] second = root+second#拼接第二层地址 second_file = get_page_code(second) with open("m3u8.txt",mode="w",encoding='utf-8') as f: f.write(second_file) print("第二层m3u8文件下载完成") async def download_one(url, sem): async with sem: # 这玩意叫信号量。 可以控制并发量 for i in range(100): try: print(url, "开始工作") filename = url.split('/')[-1] # 刚刚这里有问题 async with aiohttp.ClientSession() as session: async with session.get(url) as resp: content = await resp.content.read() async with aiofiles.open(f"./待拼接的/{filename}",mode='wb') as f: await f.write(content) print(f"{filename}下载成功") break except Exception as e: # 这样写就能知道错哪儿了 print(f"网址为{url}出错了,重新尝试", e) print(f"等待{(i + 2) * 5}秒") await asyncio.sleep((i+2)*5) async def download_all(): sem = asyncio.Semaphore(10) # 10 表示最大并发量是10 也就是有10个任务可以被挂起 tasks=[] with open("m3u8.txt",mode="r",encoding='utf-8') as f: lines=f.readlines() for line in lines: line=line.strip() if "#" in line: continue else: task = asyncio.create_task(download_one(line, sem)) tasks.append(task) await asyncio.wait(tasks) def merget(): namelist=[] with open("m3u8.txt",encoding="utf-8") as f: lines = f.readlines() for line in lines: if "#" in line: continue else: line=line.strip() line=line.split('/')[-1] namelist.append(line) print("已经将所有的ts文件名加入到了列表中,准备合并文件") os.chdir('./待拼接的') temp = [] #存放ts文件名后转成字符串的列表 n=1 for i in range(len(namelist)): temp.append(namelist[i]) if i!=0 and i%40==0: command="+".join(temp) os.system(f"copy /b {command} {n}.ts") temp=[] n+=1 command="+".join(temp)#假如有48个文件,这一步处理剩下来的文件 os.system(f"copy /b {command} {n}.ts") print("倒数第二步") n+=1 temp=[] for i in range(1,n): temp.append(f"{i}.ts") command = "+".join(temp) os.system(f"copy /b {command} hope.mp4") print("完成!!!我希望看到这一步") if __name__ == '__main__': url = "http://www.wbdy.tv/play/67656_1_1.html" main_code=get_page_code(url) first_m3u8 = get_first_m3u8_url(main_code) print(first_m3u8) download_m3u8_file(first_m3u8) loop = asyncio.get_event_loop() loop.run_until_complete(download_all()) merget()
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。