#coding:utf-8
"""FastAPI service exposing quote-review, expert-scoring and file-upload endpoints.

The review/scoring work itself is delegated to the project-local ``main``
module; this file only decodes requests and wraps results in a response
envelope.
"""
import ast
import json
import logging
import os
import time
from zipfile import ZipFile

import uvicorn
from fastapi import BackgroundTasks, Body, FastAPI, File, Form, Request, UploadFile

import main

logger = logging.getLogger(__name__)

app = FastAPI()

# General-purpose flag bit 11 of a ZIP entry: set when the member name is
# stored as UTF-8 (zipfile then decodes it correctly on its own).
_ZIP_UTF8_FLAG = 0x800

# Form fields that accompany an upload to /file_upload:
#   projectId, projectName
#   bidderUnit  - list of companies
#   reportFlag  - report type: 0 = pre-review, 1 = bid clearing
#   buyFile     - procurement file
#   bidderFile  - bid file
_UPLOAD_METADATA_FIELDS = (
    "projectId",
    "projectName",
    "bidderUnit",
    "reportFlag",
    "buyFile",
    "bidderFile",
)


def _parse_payload(text_list):
    """Return the request body as a Python object, decoding it if it is text.

    Non-string bodies (FastAPI has already parsed them from JSON) are returned
    unchanged.  A string body is decoded as JSON first and, failing that, as a
    Python literal.  If neither works the string is returned as-is, which is
    what the original fallback did.

    The original code called ``eval()`` on the body.  That executes arbitrary
    client-supplied code, so it has been replaced by ``json.loads`` /
    ``ast.literal_eval``, which only build data.
    """
    if not isinstance(text_list, str):
        return text_list
    try:
        return json.loads(text_list)
    except ValueError:
        pass
    try:
        return ast.literal_eval(text_list)
    except (ValueError, SyntaxError):
        return text_list


# Quote re-review
@app.post("/quote_review_result")
async def create_item(text_list=Body(None)):
    """Run the quote re-review on the first ``quoteCheck`` entry of the payload.

    The payload must provide ``projectId``, ``projectName``, ``bidderUnit`` and
    ``quoteCheck``; only ``quoteCheck[0]`` is forwarded to
    ``main.quote_review_result``.

    Returns:
        ``{"code": 0, "ok": True, "data": <result>, "msg": '成功'}``.

    Raises:
        TypeError, KeyError, IndexError: for a missing/malformed payload.
        Anything raised by ``main.quote_review_result`` also propagates; the
        failure envelope (code=1, ok=False, data={}, msg='失败') was commented
        out in the original and is still disabled here.
    """
    json_post = _parse_payload(text_list)
    print(json_post)

    # These three are not used yet; they are still looked up so a payload that
    # lacks one of them keeps failing exactly as it did before.
    projectId = json_post['projectId']
    projectName = json_post['projectName']
    bidderUnit = json_post['bidderUnit']
    quoteCheck = json_post['quoteCheck']

    result = main.quote_review_result(quoteCheck[0])
    return {"code": 0, "ok": True, "data": result, "msg": '成功'}


@app.post("/req_json")
async def req_json(request: Request, json_data: dict):
    """Echo the JSON body of the request back to the caller."""
    # request.json() returns the parsed JSON; pydantic could be used here to
    # validate it and map it onto a model instance.
    json_post_raw = await request.json()
    return json_post_raw


def decode_path(name, flag_bits=0):
    """Recover a ZIP member name that ``zipfile`` decoded as CP437.

    Args:
        name: member name as reported by ``ZipInfo.filename``.
        flag_bits: the entry's ``ZipInfo.flag_bits``.

    Returns:
        The re-decoded name, or ``name`` unchanged when the entry is flagged
        as UTF-8 or no candidate encoding fits.

    NOTE(review): the original file called ``decode_path`` without defining
    it.  The encodings tried here (UTF-8, then GBK) are an assumption about
    the archives this service receives - confirm.
    """
    if flag_bits & _ZIP_UTF8_FLAG:
        return name
    try:
        raw = name.encode("cp437")
    except UnicodeEncodeError:
        return name
    # UTF-8 is tried first because it is strict: GBK bytes rarely pass as
    # valid UTF-8, whereas many UTF-8 byte sequences decode "successfully"
    # as GBK garbage.
    for encoding in ("utf-8", "gbk"):
        try:
            return raw.decode(encoding)
        except UnicodeDecodeError:
            continue
    return name


# Fetch the file from MinIO
def get_minio_file(path=None, tempdir=""):
    """Extract the ZIP archive at ``path`` into ``./cache/<tempdir>``.

    Despite its name, the body only unpacks a local archive.  Member names are
    passed through ``decode_path`` before extraction so that non-UTF-8 names
    do not come out garbled.

    Args:
        path: location of the archive.  Anything that is not an existing
            ``.zip`` file is ignored.
        tempdir: sub-directory of ``./cache/`` to extract into.

    Returns:
        Always ``1``.

    The original took no parameters and read ``path`` / ``tempdir`` as
    undefined globals; both now have defaults so a zero-argument call is
    still valid (and is a no-op).
    """
    if not path or not os.path.isfile(path) or not path.endswith('.zip'):
        return 1

    target_dir = "./cache/" + tempdir
    with ZipFile(path, allowZip64=True) as zf:
        for info in zf.infolist():
            # Exclude directory entries.  The original tested
            # os.path.isfile(path) on the archive itself, which was always
            # true and therefore filtered nothing.
            if info.is_dir():
                continue
            info.filename = decode_path(info.filename, info.flag_bits)
            zf.extract(info, target_dir)
    return 1


# Expert scoring chart upload
@app.post("/quote_check_score")
async def data(text_list=Body(None)):
    """Return the expert scoring result produced by ``main.quote_check_score``.

    The request body may carry ``projectId``, ``projectName``, ``bidderUnit``
    (list of companies), ``quoteCheck`` (review factors of the different
    modules) and ``scoreFile`` (list of expert scoring images).  None of them
    is forwarded: ``main.quote_check_score`` is called without arguments, and
    the original silently ignored missing fields, so the body is not
    inspected here.

    Returns:
        ``{"code": 0, "ok": True, "data": <result>, "msg": '成功'}``.

    Raises:
        Anything raised by ``main.quote_check_score``; the failure envelope
        was commented out in the original and is still disabled here.
    """
    result = main.quote_check_score()
    return {"code": 0, "ok": True, "data": result, "msg": '成功'}


# Pre-review, bid clearing
@app.post("/file_upload")
async def file_upload(request: Request, file: UploadFile = File(...), userId: str = Form(...), pushFilename: str = Form(...)):
    """File upload.

    Formats: pdf, docx, doc, txt, tar.gz, zip, 7z, rar.

    The upload is stored as ``./uploads/<YYYY-MM-DD>/<filename>``.  It is then
    handed to ``get_minio_file``, which unpacks ``.zip`` archives and ignores
    everything else.

    Returns:
        ``{"code": 0, "name": '', "msg": "", "data": {}, "ok": True}`` on
        success; the same shape with ``code`` 1 and ``ok`` False when the file
        name is unusable or processing raises.
    """
    # The request is multipart (it carries File/Form parameters), so the extra
    # metadata is read from the form; request.json() cannot parse such a body.
    form = await request.form()
    metadata = {key: form.get(key) for key in _UPLOAD_METADATA_FIELDS}

    ip_adder = request.client.host if request.client else None
    print(ip_adder, pushFilename, metadata)

    failure = {"code": 1, "name": '', "msg": "", "data": {}, "ok": False}

    # The file name is client-controlled: keep only its last path component so
    # it cannot escape the upload directory.
    safe_name = os.path.basename((file.filename or "").replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        return failure

    res = await file.read()
    date = time.strftime('%Y-%m-%d')
    upload_dir = './uploads/' + date
    os.makedirs(upload_dir, exist_ok=True)
    saved_path = upload_dir + '/' + safe_name
    with open(saved_path, "wb") as f:
        f.write(res)

    try:
        # NOTE(review): the original forced a failure here with `1/0` and then
        # called get_minio_file with six arguments, two of them undefined
        # names.  Using the upload date as the cache sub-directory is an
        # assumption - confirm.
        get_minio_file(saved_path, date)
    except Exception:
        logger.exception("processing of uploaded file %s failed", saved_path)
        return failure

    return {"code": 0, "name": '', "msg": "", "data": {}, "ok": True}


if __name__ == '__main__':
    uvicorn.run(app, host='0.0.0.0', port=13888, workers=1)