当前位置:   article > 正文

python多线程处理文件流_python 文件流处理

python 文件流处理

在做项目的时候,有个需求,要求解析前端批量传入的文件并存到数据库中,尝试许多办法都没能解决,十几个文件处理耗时差不多7s左右,最后采用基本的Threading方法解决,耗时1s,速度提升了7倍,下面我来详细解析思路。

首先这个路由处理函数使用了`async def`和`await`关键字,因为它涉及到异步操作,特别是文件上传过程。使用异步操作能够提高性能,避免阻塞整个应用程序,同时允许应用程序在文件上传的同时处理其他请求。`await file.read()`这一行代码异步地读取上传的文件内容,`await`关键字确保在文件读取完成前不会执行下一行代码,以保证异步操作的顺序执行。

  1. @router.post("/importThinks", summary="导入")
  2. async def import_thinks(files: List[UploadFile] = File(..., description="文件"),
  3. id: str = File(..., description="案例id"),
  4. creator: str = File(..., description="上传人")):
  5. logger.info(f"接收参数:{creator}{files}")
  6. try:
  7. processed_plans=[]
  8. for file in files:
  9. file_content = await file.read()
  10. filename = file.filename.lower()
  11. processed_plans.append((filename,file_content))
  12. return case_service.import_thinks(processed_plans,d, creator)
  13. except Exception as e:
  14. logging.error(e)
  15. return error_response()

接下来,我们在服务层实现import_thinks方法,这段代码是一个导入方法 `import_thinks`,它接收文件列表 `files`、案例ID `=id` 和上传者信息 `creator` 作为参数。它通过多线程的方式处理每个文件的导入操作,每个线程调用 `process_plan` 函数来处理导入,将结果存储在 `data` 列表中。

  1. def import_thinks(self, files, =id, creator):
  2. data = []
  3. threads = []
  4. t1=time.time()
  5. # 创建并启动线程
  6. for filename,file_content in files:
  7. thread = threading.Thread(target=process_plan, args=(self, =id,filename,creator,file_content,data))
  8. threads.append(thread)
  9. thread.start()
  10. # 等待所有线程完成
  11. for thread in threads:
  12. thread.join()
  13. print("耗时:",time.time()-t1,"s")
  14. return success_response(data)

最后我们实现process_plan方法

  1. def process_plan(self, id, name, creator, file,data):
  2. plan = None
  3. if name.endswith(".docx"):
  4. plan = word_dic(file)
  5. elif name.endswith(".xlsx"):
  6. plan = excel_dic(file)
  7. if plan:
  8. plan["creator"] = creator
  9. plan["id"] = id
  10. plan["create_time"] = datetime.now().strftime("%Y-%m-%d")
  11. try:
  12. result = self.get_es.insert_one('plan', document=plan)
  13. data.append({"name": name, "success": result.get("result") == "created"})
  14. except Exception as e:
  15. logging.error(e)
  16. data.append({"name": name, "success": False})
  17. else:
  18. data.append({"name": name, "success": False})
'
运行
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/运维做开发/article/detail/927222
推荐阅读
相关标签
  

闽ICP备14008679号