import uvicorn from fastapi import FastAPI from pydantic import BaseModel from hashlib import md5 from logger import LoggerHandler logger = LoggerHandler(name="kgupdate") logger.set_file_handler(filename="kgupdate.log") app = FastAPI() global cookie, base_url, pushed cookie = "JSESSIONID=40225388-b817-471b-8fc0-7afb72389712" base_url = 'http://180.76.188.39:8284/' pushed = [] eduback_dict = { 26:'博士后', 24:'博士', 22:'MBA/EMBA', 20:'硕士', 18:'本科', 16:'大专', 14:'中专/中技', 12: '高中', 10:'初中及以下' } def post_json(json_obj, token="1654940290763"): """ 将json数据提交到push接口 :param json_obj: :return: """ global pushed # 是否已推送 if json_obj['@id'] in pushed: return # 推送 url = base_url + "data/api/access/push" headers = { "token": token, "Cookie": cookie } try: response = requests.post(url, json=json_obj, headers=headers) result = response.text logger.info(result) except: logger.error(e) return "ERROR: {} 数据推送 KG 失败!".format(json) else: pushed.append(json_obj['@id']) return @app.post("/API/TEST") async def trans_push(xdpost: dict): xdpost = xdpost['data'] high_edu = {"schoolName": None, "major": None, "degree": None} org_list = [] job_list = [] for job in xdpost["hisJob"]: org_list.append(job['companyName']) job_list.append(job['companyName'] + job['industry'] + job['jobName'] + job['jobDesc']) post_json({ "@id": md5(job['companyName'].encode(encoding="UTF-8")).hexdigest(), "id": md5(job['companyName'].encode(encoding="UTF-8")).hexdigest(), "@type": "相关机构", "@contentType": "struct", "@markdel": "0", "name": [{"@value": job['companyName']}] }) post_json({ "@id": md5(job['id'].encode(encoding="UTF-8")).hexdigest(), "id": job['id'], "@type": "工作经历demo", "name": job['companyName'] + job['industry'] + job['jobName'] + job['jobDesc'], "@markdel": "0", "@contentType": "struct", "时间": [{"@value": job['startTime']+job['endTime']}], "公司": [{"@value": job['companyName']}], "行业": [{"@value": job['industry']}], "职位": [{"@value": job['jobName']}], "工作内容": [{"@value": job['jobDesc']}] }) edu_list = [] for edu in xdpost["hisEdu"]: if (not high_edu.get("schoolName")) or (high_edu['degree'] < edu['degree']): high_edu['schoolName'] = edu['schoolName'] high_edu['major'] = edu['major'] high_edu['degree'] = edu['degree'] org_list.append(edu['schoolName']) edu_list.append(edu['startTime'] + edu['schoolName'] + edu['major'] + eduback_dict[edu['degree']]) post_json({ "@id": md5(edu['schoolName'].encode(encoding="UTF-8")).hexdigest(), "id": md5(edu['schoolName'].encode(encoding="UTF-8")).hexdigest(), "@type": "相关机构", "@contentType": "struct", "@markdel": "0", "name": [{"@value": edu['schoolName']}] }) post_json({ "@id": md5(edu['id'].encode(encoding="UTF-8")).hexdigest(), "id": edu['id'], "@type": "教育经历demo", "name": edu['startTime'] + edu['schoolName'] + edu['major'] + eduback_dict[edu['degree']], "@markdel": "0", "@contentType": "struct", "时间": [{"@value": edu['startTime']+edu['endTime']}], "学校": [{"@value": edu['schoolName']}], "专业": [{"@value": edu['major']}], "学历": [{"@value": edu['degree']}] }) pro_list = [] for pro in xdpost["hisProject"]: pro_list.append(pro['companyName'] + pro['projectName'] + pro['projectOffice'] + pro['projectDuty']) post_json({ "@id": md5(pro['id'].encode(encoding="UTF-8")).hexdigest(), "id": pro['id'], "@type": "项目经历demo", "name": pro['companyName'] + pro['projectName'] + pro['projectOffice'] + pro['projectDuty'], "@markdel": "0", "@contentType": "struct", "时间": [{"@value": pro['startTime']+pro['endTime']}], "公司": [{"@value": pro['companyName']}], "项目": [{"@value": pro['projectName']}], "职位": [{"@value": pro['projectDuty']}], "成果": [{"@value": pro['projectOffice']}] }) tra_list = [] json_obj = { "@id": md5(xdpost['tId'].encode(encoding='UTF-8')).hexdigest(), "id": xdpost['tId'], "@type": "人才特征demo", "name": xdpost['name'], "@markdel": '0', "@contentType": "struct", "姓名": [{"@value": xdpost['name']}], "年龄": [{"@value": xdpost['age']}], "性别": [{"@value": '男' if xdpost['gender'] == "0" else '女'}], "出生年月": [{"@value": xdpost['birthTime']}], "手机号码": [{"@value": xdpost['mobile']}], "电子邮箱": [{"@value": xdpost['email']}], "政治面貌": [{"@value": xdpost['politics']}], "参加工作时间": [{"@value": xdpost['workBeginTime']}], "当前职位": [{"@value": xdpost['currentJob']}], "意向职位": [{"@value": xdpost['intentJob']}], "当前年薪": [{"@value": xdpost['currentSalaryYearly']}], "当前所在城市": [{"@value": None}], "意向城市": [{"@value": None}], "意向年薪": [{"@value": xdpost['intentSalaryYearlyMax']}], "相关机构": org_list, "教育经历": edu_list, "工作经历": job_list, "项目经历": pro_list, "培训和海外经历": tra_list, "当前最高学历": [{"@value": eduback_dict[high_edu['degree']]}], "最高学历学校": [{"@value": high_edu['schoolName']}], "当前最高学历专业": [{"@value": high_edu['major']}], "语言能力": [{"@value": val['lanName']} for val in xdpost['language']], "技术职称": [{"@value": None}], "研究领域": [{"@value": val['researchName']} for val in xdpost['researchList']], "研究领域分类": [{"@value": val['researchName']} for val in xdpost['researchList']], "婚姻状况": [{"@value": None}], "特长爱好": [{"@value": None}], "人才标签": [{"@value": None}], "人才特点": [{"@value": None}], "当前行业": [{"@value": None}], "专业证书": [{"@value": None}], "入选人才": [{"@value": None}], "知识产权": [{"@value": None}], "获得荣誉及证明": [{"@value": None}], "备注信息": [{"@value": None}], "对报名岗位认识及工作设想": [{"@value": None}], "自我评价及主要工作业绩": [{"@value": None}], "当前公司": [{"@value": None}], "毕业院校分类": [{"@value": None}], "工作年限": [{"@value": None}], "专业方向大类": [{"@value": None}], "报名岗位": [{"@value": None}], } return {"errno": 0, "msg": "Success"} if __name__ == '__main__': uvicorn.run(app=app, host="0.0.0.0", port=8400)