#coding:utf-8 from fastapi import FastAPI, Request, Body, UploadFile,Form from fastapi import BackgroundTasks, FastAPI, File, UploadFile,Request import os import uvicorn import time import requests # from Zipfile import zipfile import main app = FastAPI() # 报价再评审 @app.post("/quote_review_result") async def create_item(text_list=Body(None)): #json_post_raw = await request.json() try: json_post = json.dumps(eval(text_list)) except: # json_post = json.dumps({'prompt':str(text_list)}) json_post = text_list print(json_post) projectId = json_post['projectId'] projectName = json_post['projectName'] bidderUnit = json_post['bidderUnit'] quoteCheck = json_post['quoteCheck'] # try: if 1: data = main.quote_review_result(quoteCheck[0]) code = 0 ok = True msg = '成功' # except: # code = 1 # ok = False # data = {} # msg = '失败' _json = {"code":code, "ok":ok, "data":data, "msg":msg} return _json @app.post("/req_json") async def req_json(request: Request, json_data: dict): # request.json() 返回json,此时可以利用pydantic进行json数据校验并映射到一个model类实例 json_post_raw = await request.json() return json_post_raw # 获取minio上的文件 def get_minio_file(): if os.path.isfile(path) and path.endswith('.zip'): ## 解压方式1:存在乱码 # f = zipfile.ZipFile(file, mode='r') # f.extractall(target_dir) ## 解压方式2:防止乱码 with ZipFile(path, allowZip64=True) as zf: # 排除目录文件 # print("zf.filelist", zf.filelist) file_iter = (filename for filename in zf.filelist if os.path.isfile(path)) for filename in file_iter: # 编码文件名称为 utf 格式 filename.filename = decode_path(filename.filename) # 防止乱码的操作 zf.extract(filename, "./cache/" + tempdir) return 1 # 专家打分图表上传 @app.post("/quote_check_score") async def data(text_list=Body(None)): # json_post = await request.json() try: json_post = json.dumps(eval(text_list)) except: # json_post = json.dumps({'prompt':str(text_list)}) json_post = text_list try: projectId = json_post['projectId'] projectName = json_post['projectName'] # 企业列表 bidderUnit = json_post['bidderUnit'] # 不同模块评审因素 quoteCheck = json_post['quoteCheck'] # 专家打分图片列表 scoreFile = json_post['scoreFile'] except: pass # try: if 1: data = main.quote_check_score() code = 0 ok = True msg = '成功' # except: # code = 1 # ok = False # data = {} # msg = '失败' _json = {"code":code, "ok":ok, "data":data, "msg":msg} return _json # 下载招标、投标、专家打分表图片文件 def get_file(path_name="1@/static/bm/project/20240726130404_采购文件.docx", save_file='./download'): url = 'http://222.92.111.18:8006/admin/sys-file/download' # 要下载的文件的URL url = 'http://222.92.111.18:8006/admin/sys-file/preview' json = {"path":path_name} response = requests.get(url, data=json, stream=True) # 发送GET请求,stream参数指定以流的方式下载文件 file_name = path_name.split('/')[-1] save_file_path = save_file + '/new_'+file_name # print(response.text) code = 0 if response.status_code == 200: # 检查响应状态码 try: with open(save_file_path, 'wb') as f: # 打开本地文件进行写入操作 for chunk in response.iter_content(chunk_size=1024): # 分块读取文件内容,每次读取1KB if chunk: # 检查是否有数据块可读 f.write(chunk) # 将数据块写入本地文件 f.flush() # 刷新缓冲区,确保数据写入磁盘 print('文件下载完成!') except: code = 1 else: print('下载失败,状态码:', response.status_code) code = -1 return save_file_path, code #预审查、清标 @app.post("/file_upload") async def file_upload(text_list=Body(None)): # json_post = await request.json() try: json_post = json.dumps(eval(text_list)) except: # json_post = json.dumps({'prompt':str(text_list)}) json_post = text_list print(json_post) projectId = json_post['projectId'] projectName = json_post['projectName'] bidderUnit = json_post['bidderUnit'] reportFlag = json_post['reportFlag'] # 0 预审查 1 清标 buyFile = json_post['buyFile'] #采购文件 bidderFile = json_post['bidderFile'] #投标文件 try: buy_file_path, code = get_file(buyFile) bidder_file_path, code = get_file(bidderFile) json_data = { "code":0, "name":'', "msg":"文件下载成功", "data":{}, "ok":True } except: json_data = { "code":1, "name":'', "msg":"文件下载失败", "data":{}, "ok":False } return json_data return json_data if __name__ == '__main__': uvicorn.run(app, host='0.0.0.0', port=8888, workers=1)