# # -*- coding: utf-8 -*-
# # @Author: privacy
# # @Date: 2024-09-03 11:24:56
# # @Last Modified by: privacy
# # @Last Modified time: 2024-12-23 14:38:28

# NOTE(review): every statement in this module is commented out, so importing
# it defines nothing (no FastAPI app, no routes, no helpers).
#   * Lines prefixed with "# " are the disabled code, kept verbatim.
#   * Lines prefixed with "# # " are comments that lived inside that code
#     (translated to English).
#   * Lines starting with "# NOTE(review)" are reviewer remarks.
# The file had been collapsed onto two physical lines, which left the tail of
# the os.makedirs(...) call in get_file outside any comment and made the file
# a SyntaxError. The one-comment-per-line layout is restored here; the
# indentation of the disabled code was reconstructed from its syntax --
# TODO confirm against version control before re-enabling anything.
# If this module is no longer needed, prefer deleting it over keeping dead code.

# import os
# import time
# import zipfile

# import requests
# from pydantic import BaseModel
# from celery.result import AsyncResult
# from fastapi import FastAPI, UploadFile, File, Body

# from celery_tasks import celery_app
# from celery_tasks.commonprocess import bidding_factor, test_all_files
# from celery_tasks.project_loc import extract_project
# from celery_tasks.commonprocess import full_func


# app = FastAPI()


# NOTE(review): any state other than "successful" takes the else branch, so a
# failed task would also be reported as still processing -- confirm intent.
# @app.get('/result')
# def back(taskid):
#     result = AsyncResult(id=taskid, app=celery_app)
#     if result.successful():
#         val = result.get()
#         return "执行完成,结果:%s" % val
#     else:
#         return '正在处理中...'


# # Encoding detection (done)
# NOTE(review): the only caller passes ZipInfo.filename, which is a str on
# Python 3, so path.decode(...) presumably always raises and the
# cp437 -> gbk fallback always runs -- confirm. Names stored with the zip
# UTF-8 flag would likely be mangled or rejected by that fallback.
# def decode_path(path):
#     '''zipfile extraction produces garbled names; re-encode the garbled path as UTF-8'''
#     try:
#         path_name = path.decode('utf-8')
#     except Exception:
#         path_name = path.encode('437').decode('gbk')
#         path_name = path_name.encode('utf-8').decode('utf-8')
#     return path_name


# # Download tender, bid, and expert scoring sheet image files
# NOTE(review), to fix before re-enabling:
#   * `url` is assigned twice; only the second ('.../preview') value is used.
#   * If requests.get raises, `response` was never bound, so the except
#     handler itself fails on `response.status_code`, as does the later
#     `if response.status_code == 200` check.
#   * The payload is sent with `data=` on a GET; `params=` may have been
#     intended -- confirm against the file service.
#   * stream=True is requested but `response.content` reads the whole body.
#   * The `file_iter` filter tests `save_file_path` (the archive itself), not
#     the entry, so it does not exclude directories; directory entries are
#     only skipped after extraction by the trailing-'/' check.
#   * The return type differs by branch: (list of paths, code) for a zip,
#     (single path, code) otherwise.
# def get_file(path_name="1@/static/bm/bid_check_bidder/20240808151412_投标文件25mb.zip", save_file='./download'):
#     url = 'http://192.168.1.111:9999/admin/sys-file/download'  # URL of the file to download
#     url = 'http://192.168.1.111:9999/admin/sys-file/preview'
#     json = {"path": path_name}
#     os.makedirs(save_file, exist_ok=True)
#     code = 0
#     try:
#         response = requests.get(url, data=json, stream=True)  # send a GET request; stream asks for a streamed download
#     except Exception:
#         print('下载失败,状态码:', response.status_code)
#         code = 1
#     file_name = path_name.split('/')[-1]
#     save_file_path = save_file + '/new_' + file_name
#     if response.status_code == 200:  # check the response status code
#         with open(save_file_path, "wb") as fp:
#             fp.write(response.content)
#         print('文件下载完成!')
#     if code != 0:  # download failure was caught above
#         return save_file_path, code
#     if os.path.isfile(save_file_path) and save_file_path.endswith('.zip'):
#         # # Extraction method 2: avoid garbled file names
#         tempdir = time.strftime("%Y_%m_%dT%H_%M_%S")
#         os.makedirs('./cache/' + tempdir, exist_ok=True)
#         file_path_list = []
#         try:
#             with zipfile.ZipFile(save_file_path, allowZip64=True) as zf:
#                 # # Exclude directory entries
#                 file_iter = (filename for filename in zf.filelist if os.path.isfile(save_file_path))
#                 for filename in file_iter:
#                     # # Re-encode the file name as UTF
#                     filename.filename = decode_path(filename.filename)  # guards against garbled names
#                     zf.extract(filename, "./cache/" + tempdir)
#                     if filename.filename[-1] == '/':
#                         continue
#                     file_path_list.append("./cache/" + tempdir + '/' + filename.filename)
#         except Exception as e:
#             print(e)
#         return file_path_list, code
#     return save_file_path, code


# # Pre-review and bid clearing
# NOTE(review), to fix before re-enabling:
#   * eval() is applied to the request body; this executes arbitrary
#     expressions supplied by the client. Replace with json.loads or
#     ast.literal_eval.
#   * The `code` returned by get_file is never inspected, so the success
#     payload is returned even when get_file reports code == 1; the failure
#     payload is only produced when get_file raises.
# @app.post("/file_upload")
# async def file_upload(text_list=Body(None)):
#     """
#     {
#         'bidderUnit': '杭州华新机电工程有限公司',
#         'bidderFile': '1@/static/bm/bid_pre_check/20240924172133_三峡投标文件新-华新(1).zip',
#         'buyFile': '1@/static/bm/project/20240924171822_三峡电站左岸厂房桥机远程智能化操作研发与实施重新招标.pdf',
#         'reportFlag': '0',
#         'projectName': '华新',
#         'projectId': '2024-9-24-0'
#     }
#     """
#     try:
#         json_post = eval(text_list)
#     except Exception:
#         json_post = text_list
#     buyFile = json_post['buyFile']  # procurement file
#     bidderFile = json_post['bidderFile']  # bid file
#     try:
#         buy_file_path, code = get_file(buyFile)
#         bidder_file_path, code = get_file(bidderFile)
#         json_data = {
#             "code": 0,
#             "name": '',
#             "msg": "文件下载成功",
#             "data": {},
#             "ok": True
#         }
#     except Exception:
#         json_data = {
#             "code": 1,
#             "name": '',
#             "msg": "文件下载失败",
#             "data": {},
#             "ok": False
#         }
#     return json_data