123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
import time
from hashlib import md5
from typing import Dict,List,Any

import uvicorn
from fastapi import FastAPI
from pydantic import BaseModel
# FastAPI application object; the route handlers below are registered on it.
app = FastAPI()

# --- Settings and state used when pushing records (knowledge-set building) ---

# Not read anywhere in the visible code.
willpush = True
# Session cookie sent in the "Cookie" header of every push request.
# NOTE(review): hard-coded credential -- consider loading it from configuration.
cookie = "JSESSIONID=40225388-b817-471b-8fc0-7afb72389712"
# Root URL of the push service; post_json appends the API path to it.
base_url = 'http://180.76.188.39:8284/'
# "@id" values already handed to post_json, used to skip duplicates.
pushed = []
# post_json only sends a request once this reaches 300; nothing visible
# in this file changes it.
pushlen = 0
def post_json(json_obj, token="1654940290763", timeout=10):
    """Submit one JSON object to the push endpoint, at most once per "@id".

    The object's "@id" is recorded in the module-level ``pushed`` list the
    first time it is seen. A later call with the same "@id" prints the number
    of recorded ids and returns without doing anything else.

    A request is only sent while the module-level ``pushlen`` is at least
    300; below that threshold the object is recorded but not posted.

    :param json_obj: mapping to push; must contain "@id", and "@type" when a
        request is actually sent.
    :param token: value of the ``token`` request header.
    :param timeout: seconds to wait for the HTTP request before giving up.
    :return: always an empty string.
    """
    obj_id = json_obj['@id']

    # Already handled: report how many ids are recorded and stop.
    if obj_id in pushed:
        print(len(pushed))
        return ''
    pushed.append(obj_id)

    # NOTE(review): nothing visible in this file raises pushlen, so this
    # early return looks like a dry-run switch -- confirm.
    if pushlen < 300:
        return ""

    # Imported here because it is a third-party package that is only needed
    # when a request is really sent.
    import requests

    url = base_url + "data/api/access/push"
    headers = {
        "token": token,
        "Cookie": cookie,
    }
    # A timeout keeps a stalled server from blocking the caller forever.
    response = requests.post(
        url, json=json_obj, headers=headers, timeout=timeout
    )
    print(response.text)

    if json_obj["@type"] == "相关机构":
        print(json_obj)
    # Pause for 0.1 s after each request.
    time.sleep(0.1)
    return ""
# Maps a numeric education-level code to its display label.
# The labels are runtime data and are kept in their original language.
eduback_dict = {
    26: '博士后',      # postdoctoral
    24: '博士',        # doctorate
    22: 'MBA/EMBA',
    20: '硕士',        # master's degree
    18: '本科',        # bachelor's degree
    16: '大专',        # junior college
    14: '中专/中技',   # secondary vocational / technical school
    12: '高中',        # senior high school
    10: '初中及以下',  # junior high school or below
}
def _md5_hex(text):
    """Return the hexadecimal MD5 digest of *text* encoded as UTF-8."""
    return md5(text.encode(encoding="UTF-8")).hexdigest()


def _push_org(org_name):
    """Push a "相关机构" record whose "@id" and "id" are the MD5 of *org_name*."""
    org_id = _md5_hex(org_name)
    post_json({
        "@id": org_id,
        "id": org_id,
        "@type": "相关机构",
        "@contentType": "struct",
        "@markdel": "0",
        "name": [{"@value": org_name}],
    })


@app.post("/API/TEST")
async def trans_push(xdpost: dict):
    """Convert one posted talent profile into records and push them.

    Reads ``xdpost['data']`` and hands the following to ``post_json``:

    * for every entry of ``hisJob``: an organisation record for the company
      and a work-experience record;
    * for every entry of ``hisEdu``: an organisation record for the school
      and an education record;
    * for every entry of ``hisProject``: a project-experience record;
    * finally, the talent profile itself.

    The entry with the greatest ``degree`` code in ``hisEdu`` supplies the
    "highest education" fields of the profile; they stay ``None`` when
    ``hisEdu`` is empty.

    :param xdpost: request body; its "data" member holds the profile.
    :return: ``{"errno": 0, "msg": "Success"}``.
    :raises KeyError: if a key read here is missing from the payload, or an
        education entry carries a ``degree`` code absent from
        ``eduback_dict``.
    """
    data = xdpost['data']
    org_list = []

    # Work history: one organisation record plus one job record per entry.
    job_list = []
    for job in data["hisJob"]:
        job_title = (
            job['companyName'] + job['industry']
            + job['jobName'] + job['jobDesc']
        )
        org_list.append(job['companyName'])
        job_list.append(job_title)
        _push_org(job['companyName'])
        post_json({
            "@id": _md5_hex(job['id']),
            "id": job['id'],
            "@type": "工作经历demo",
            "name": job_title,
            "@markdel": "0",
            "@contentType": "struct",
            "时间": [{"@value": job['startTime'] + job['endTime']}],
            "公司": [{"@value": job['companyName']}],
            "行业": [{"@value": job['industry']}],
            "职位": [{"@value": job['jobName']}],
            "工作内容": [{"@value": job['jobDesc']}],
        })

    # Education history; also track the entry with the greatest degree code.
    high_edu = {"schoolName": None, "major": None, "degree": None}
    edu_list = []
    for edu in data["hisEdu"]:
        # Key the "nothing chosen yet" test on the degree itself so that an
        # entry with an empty school name is still compared by degree.
        if high_edu['degree'] is None or high_edu['degree'] < edu['degree']:
            high_edu['schoolName'] = edu['schoolName']
            high_edu['major'] = edu['major']
            high_edu['degree'] = edu['degree']
        edu_title = (
            edu['startTime'] + edu['schoolName']
            + edu['major'] + eduback_dict[edu['degree']]
        )
        org_list.append(edu['schoolName'])
        edu_list.append(edu_title)
        _push_org(edu['schoolName'])
        post_json({
            "@id": _md5_hex(edu['id']),
            "id": edu['id'],
            "@type": "教育经历demo",
            "name": edu_title,
            "@markdel": "0",
            "@contentType": "struct",
            "时间": [{"@value": edu['startTime'] + edu['endTime']}],
            "学校": [{"@value": edu['schoolName']}],
            "专业": [{"@value": edu['major']}],
            "学历": [{"@value": edu['degree']}],
        })

    # Project history: one record per entry.
    pro_list = []
    for pro in data["hisProject"]:
        pro_title = (
            pro['companyName'] + pro['projectName']
            + pro['projectDesc'] + pro['projectDuty']
        )
        pro_list.append(pro_title)
        post_json({
            "@id": _md5_hex(pro['id']),
            "id": pro['id'],
            "@type": "项目经历demo",
            "name": pro_title,
            "@markdel": "0",
            "@contentType": "struct",
            "时间": [{"@value": pro['startTime'] + pro['endTime']}],
            "公司": [{"@value": pro['companyName']}],
            "项目": [{"@value": pro['projectName']}],
            "职位": [{"@value": pro['projectDuty']}],
            "成果": [{"@value": pro['projectDesc']}],
        })

    # No source for training / overseas experience is read from the payload.
    tra_list = []

    json_obj = {
        "@id": _md5_hex(data['tId']),
        "id": data['tId'],
        "@type": "人才特征demo",
        "name": data['name'],
        "@markdel": '0',
        "@contentType": "struct",
        "姓名": [{"@value": data['name']}],
        "年龄": [{"@value": data['age']}],
        "性别": [{"@value": '男' if data['gender'] == "0" else '女'}],
        "出生年月": [{"@value": data['birthTime']}],
        "手机号码": [{"@value": data['mobile']}],
        "电子邮箱": [{"@value": data['email']}],
        "政治面貌": [{"@value": data['politics']}],
        "参加工作时间": [{"@value": data['workBeginTime']}],
        "当前职位": [{"@value": data['currentJob']}],
        "意向职位": [{"@value": data['intentJob']}],
        "当前年薪": [{"@value": data['currentSalaryYearly']}],
        "当前所在城市": [{"@value": data['livingCityName']}],
        "意向城市": [{"@value": data['intentCityName']}],
        "意向年薪": [{"@value": data['intentSalaryYearlyMax']}],
        "相关机构": org_list,
        "教育经历": edu_list,
        "工作经历": job_list,
        "项目经历": pro_list,
        "培训和海外经历": tra_list,
        # .get() yields None instead of raising when hisEdu was empty.
        "当前最高学历": [{"@value": eduback_dict.get(high_edu['degree'])}],
        "最高学历学校": [{"@value": high_edu['schoolName']}],
        "当前最高学历专业": [{"@value": high_edu['major']}],
        "语言能力": [{"@value": val['lanName']} for val in data['language']],
        "技术职称": [{"@value": val['name']} for val in data['skills']],
        "研究领域": [
            {"@value": val['researchName']} for val in data['researchList']
        ],
        "研究领域分类": [
            {"@value": val['researchName']} for val in data['researchList']
        ],
        "婚姻状况": [{"@value": None}],
        "特长爱好": [{"@value": None}],
        "人才标签": [{"@value": None}],
        "人才特点": [{"@value": None}],
        "当前行业": [{"@value": None}],
        "专业证书": [{"@value": None}],
        "入选人才": [{"@value": None}],
        "知识产权": [{"@value": None}],
        "获得荣誉及证明": [{"@value": None}],
        "备注信息": [{"@value": None}],
        "对报名岗位认识及工作设想": [{"@value": None}],
        "自我评价及主要工作业绩": [{"@value": None}],
        "当前公司": [{"@value": None}],
        "毕业院校分类": [{"@value": None}],
        "工作年限": [{"@value": None}],
        "专业方向大类": [{"@value": None}],
        "报名岗位": [{"@value": None}],
    }
    # NOTE(review): the profile used to be assembled and then dropped; it is
    # pushed here like every other record in this handler -- confirm intent.
    post_json(json_obj)

    return {"errno": 0, "msg": "Success"}
# Start the uvicorn server only when this file is executed as a script.
if __name__ == '__main__':
    # Listen on all interfaces, port 8400.
    uvicorn.run(app=app, host="0.0.0.0", port=8400)
|