import asyncio
from datetime import datetime
from string import Template

from bilibili_api import live
import ffmpy3


class Recorder:
    """Record a Bilibili live room to a local FLV file by driving ffmpeg.

    Typical use: ``Recorder(room_id).start()`` blocks until ffmpeg exits;
    ``stop()`` (called from elsewhere, e.g. a signal handler) terminates the
    running ffmpeg process.

    Attributes:
        ROOM_DISPLAY_ID: Room id passed to ``live.LiveRoom``.
        ff: The ``ffmpy3.FFmpeg`` wrapper of the current/last recording, or
            ``None`` before the first ``start()``.
        process: The subprocess object returned by ``ff.run_async`` for the
            current/last recording, or ``None`` before ffmpeg was launched.
    """

    def __init__(self, room_display_id):
        """Remember the room to record; nothing is fetched or launched yet."""
        self.ROOM_DISPLAY_ID = room_display_id
        # Defined up front so stop() is safe to call before start().
        self.ff = None
        self.process = None

    async def __getInfoAndUrl(self, l):
        """Return ``(room_info, play_url)`` fetched sequentially from room *l*.

        The play URL is requested with ``live.ScreenResolution.FOUR_K``.
        """
        room_info = await l.get_room_info()
        play_url = await l.get_room_play_url(live.ScreenResolution.FOUR_K)
        return room_info, play_url

    @staticmethod
    def _describe_quality(play_url):
        """Return the ``desc`` of the entry whose ``qn`` equals ``current_qn``.

        Falls back to "未知清晰度" when no entry of
        ``play_url["quality_description"]`` matches. If several entries match,
        the last one wins (same as scanning the whole list).
        """
        current_qn = play_url["current_qn"]
        description = "未知清晰度"
        for candidate in play_url["quality_description"]:
            if candidate["qn"] == current_qn:
                description = candidate["desc"]
        return description

    def start(self, outputNameTemplate: str = '【${qn_desc}】${name} - ${datetime}', log: bool = True):
        """Fetch the stream URL and record it with ffmpeg until ffmpeg exits.

        Args:
            outputNameTemplate: ``string.Template`` pattern for the output base
                name. Supported placeholders: ``${name}`` (anchor's ``uname``),
                ``${qn_desc}`` (quality description) and ``${datetime}``
                (``datetime.now().isoformat()``). Unknown placeholders are left
                untouched (``safe_substitute``).
            log: When true, ffmpeg's stdout/stderr are written to
                ``./logs/<name>.log``; otherwise its output is discarded.

        Raises:
            OSError: If the log file cannot be opened (e.g. ``./logs`` missing).
            Whatever ``ffmpy3`` raises from ``run_async``/``wait`` propagates
            unchanged; the log file is closed in every case.
        """
        loop = asyncio.get_event_loop()
        room = live.LiveRoom(room_display_id=self.ROOM_DISPLAY_ID)
        info, play_url = loop.run_until_complete(self.__getInfoAndUrl(room))

        anchor_info = info["anchor_info"]["base_info"]
        # The last entry of "durl" is the one recorded.
        url = play_url["durl"][-1]["url"]

        outputName = Template(outputNameTemplate).safe_substitute({
            'name': anchor_info["uname"],
            'qn_desc': self._describe_quality(play_url),
            'datetime': datetime.now().isoformat(),
        })

        log_file = open("./logs/" + outputName + '.log', 'w') if log else None
        # Without a log file, discard output instead of using PIPE: nobody
        # reads the pipe while we block in wait(), so a chatty ffmpeg
        # ("-v debug") could fill it and stall the recording.
        sink = log_file if log_file is not None else asyncio.subprocess.DEVNULL
        try:
            self.ff = ffmpy3.FFmpeg(
                inputs={url: " -v debug"},
                outputs={"./videos/" + outputName + ".flv": "-c copy -y"},
            )
            print(self.ff.cmd)
            # Keep the launched process itself (not the un-awaited coroutine)
            # so stop() has something it can terminate.
            self.process = loop.run_until_complete(
                self.ff.run_async(stdout=sink, stderr=sink)
            )
            loop.run_until_complete(self.ff.wait())
            print(self.process)
        finally:
            if log_file is not None:
                log_file.close()

    def stop(self):
        """Terminate the running ffmpeg process, if there is one.

        Does nothing when no recording was started or when the process has
        already exited (``returncode`` is set).
        """
        process = self.process
        if process is None or process.returncode is not None:
            return
        process.terminate()