# -*- coding: utf-8 -*- # @Author: privacy # @Date: 2024-09-03 11:24:56 # @Last Modified by: privacy # @Last Modified time: 2024-09-04 11:07:49 from fastapi import FastAPI from pydantic import BaseModel from celery.result import AsyncResult from celery_tasks import celery_app # from celery_tasks.commonprocess import add from celery_tasks.commonprocess import bidding_factor, test_all_files from celery_tasks.project_loc import extract_project tags_metadata = [ { "name": "file", "description": "解析PDF文件" }, { "name": "factor", "description": "解析详审因素" }, { "name": "result", "description": "获取异步任务结果" } ] app = FastAPI(openapi_tags=tags_metadata) class RequestModel(BaseModel): table_list: list @app.post('/') def root(request: RequestModel): # task = add.delay(12, 12) task = extract_project.apply_async(kwargs={'table_list': request.table_list}) return {"message": f"Task {task.id} Start!"} @app.post('/factor', tags=['factor']) def get_factor(request: RequestModel): task = bidding_factor.apply_async(kwargs={'table_list': request.table_list}) return {"message": f"Task {task.id} Start!"} @app.get('/result', tags=['result']) def back(taskid): result = AsyncResult(id=taskid, app=celery_app) if result.successful(): val = result.get() return "执行完成,结果:%s" % val else: return '正在处理中...' @app.get('/file', tags=['file']) def process_file(proj_name: str): task = test_all_files.apply_async(kwargs={'proj_name': proj_name}) return {"message": f"Task {task.id} Start!"}