"""HTTP facade over the talent knowledge-graph (KG) service.

Exposes FastAPI endpoints that query the KG search / gremlin APIs and
reshape the answers into ``graph`` / ``nodes`` / ``links`` structures, plus
one endpoint that pushes a person record into the KG.
"""
import json
import logging
from hashlib import md5
from pprint import pprint
from typing import Optional

import requests
import uvicorn
from fastapi import FastAPI

logger = logging.getLogger(__name__)

app = FastAPI()

host = "172.16.57.71"

# Seconds to wait for the KG service before a request is abandoned; without
# a timeout a stalled backend would block the worker thread indefinitely.
REQUEST_TIMEOUT = 30

# Request headers sent with every search / gremlin call.
# NOTE(review): the token is hard-coded in source; consider loading it from
# configuration instead.
headers = {
    "token": '369ca613c7a74a58b0f95be2cfd59257',
    "Content-Type": "application/json",
}

# Maps the numeric degree code found in incoming education records to the
# label stored in the KG.
eduback_dict = {
    26: '博士后',
    24: '博士',
    22: 'MBA/EMBA',
    20: '硕士',
    18: '本科',
    16: '大专',
    14: '中专/中技',
    12: '高中',
    10: '初中及以下',
}

# Keys of a vertex that search_gremlin does not report as plain attributes.
_GREMLIN_SKIPPED_KEYS = frozenset([
    '@context', '@del', '@edge_number', '@formattype', '@fromtype',
    '@fromurl', '@id', '@kbid', '@nodeid', '@semiid', '@tags', '@type',
    '_id', '_type', 'alias', 'appId', 'name', 'nodeId', 'tags',
    '教育经历', '工作经历', '项目经历', '培训和海外经历', '最高学历学校', '当前公司',
])


def _md5_hex(text) -> str:
    """Return the hex MD5 digest of ``str(text)`` encoded as UTF-8."""
    return md5(str(text).encode(encoding='UTF-8')).hexdigest()


def _gremlin_quote(value) -> str:
    """Escape *value* for use inside a single-quoted gremlin string literal.

    Prevents a quote in user input from terminating the literal and
    injecting additional gremlin steps.
    """
    # NOTE(review): assumes the backend accepts Groovy-style backslash
    # escapes inside string literals -- confirm against the KG service.
    return str(value).replace('\\', '\\\\').replace("'", "\\'")


def _entity_ref(entity_id, entity) -> dict:
    """Build the endpoint description used on either side of a relation."""
    return {'id': entity_id, 'entity': entity, 'refId': None, "entityIndex": None}


def _query_backend(url: str, json_obj: dict) -> Optional[dict]:
    """POST *json_obj* to *url* and return the decoded JSON object.

    Returns ``None`` (after logging the exception) when the request fails,
    times out, or the body is not a JSON object.
    """
    try:
        r = requests.post(url, headers=headers, json=json_obj, timeout=REQUEST_TIMEOUT)
        rst = json.loads(r.text)
    except (requests.RequestException, ValueError):
        logger.exception("request to %s failed", url)
        return None
    if not isinstance(rst, dict):
        logger.error("unexpected response from %s: %r", url, rst)
        return None
    return rst


def _request_failed() -> dict:
    """Response returned by the search endpoints when the backend call fails."""
    return {"errno": 555, "msg": "请求检索失败"}


def _has_data(rst: dict) -> bool:
    """Return True when the backend reported success and a non-empty payload."""
    return rst.get('errno') == 0 and bool(rst.get('data'))


def _log_backend_error(rst: dict) -> None:
    """Record the errno / msg of an unsuccessful or empty backend answer."""
    logger.warning("backend returned errno=%s msg=%s", rst.get('errno'), rst.get('msg'))


def getQaAttachment(data):
    """Derive de-duplicated ``nodes`` and ``links`` from relation triples.

    :param data: list of ``{"source": ..., "relation": ..., "target": ...}``
        dicts whose endpoints carry ``id`` and ``entity``.
    :return: ``(nodes, links)``.

    A target without an id (a plain attribute value) is assigned the MD5 of
    its stringified value; this id is written back into *data* in place.
    """
    nodes = []
    links = []
    seen = set()  # ids already emitted as nodes
    for d in data:
        target = d['target']
        source = d['source']
        if not target['id']:
            target['id'] = _md5_hex(target['entity'])
        if target['id'] not in seen:
            nodes.append({"id": target['id'], "name": str(target['entity']),
                          "target": True, "source": False})
            seen.add(target['id'])
        if source['id'] not in seen:
            nodes.append({"id": source['id'], "name": source['entity'],
                          "source": True, "target": False})
            seen.add(source['id'])
        links.append({'source': source['id'], 'name': d["relation"], 'target': target['id']})
    return nodes, links


def post_json(json_obj, token="1664516531417"):
    """Submit *json_obj* to the KG push endpoint.

    :param json_obj: JSON-serialisable record to push.
    :param token: value of the ``token`` request header.
    :return: ``0`` when the HTTP request completed, otherwise an error
        message string describing the record that could not be pushed.
    """
    url = 'http://{}:8284/data/api/access/push'.format(host)
    push_headers = {"token": token}
    try:
        response = requests.post(url, json=json_obj, headers=push_headers,
                                 timeout=REQUEST_TIMEOUT)
    except requests.RequestException:
        logger.exception("push to %s failed", url)
        return "ERROR: {} 数据推送 KG 失败!".format(json_obj)
    logger.info(response.text)
    return 0


@app.get('/person_to_person')
def search_person(query: str, less: Optional[bool] = False):
    """Find people related to a person through shared organisations.

    Required parameter:
        query: person name.
    Optional parameter:
        less: when true, follow at most 8 organisation edges so that fewer
            relations are returned.
    """
    url = 'http://{}:8085/mpks/api/extra/gremlin'.format(host)
    # An alternative traversal over ['当前公司', '最高学历学校'] edges was tried
    # previously; the '相关机构' edge is the one in use.
    edge_limit = ".limit(8)" if less else ""
    json_obj = {
        "gremlin": "g.has('name.@value', MATCH, '" + _gremlin_quote(query)
                   + "').outE('相关机构')" + edge_limit
                   + ".inV.inE('相关机构').outV.with('*').graph"
    }

    rst = _query_backend(url, json_obj)
    if rst is None:
        return _request_failed()

    entity_array = []
    nodes = []
    links = []
    names_by_id = {}
    if _has_data(rst):
        data = rst['data']
        # Every vertex becomes one node; people are red, everything else blue.
        for entity in data.get('vertices') or []:
            if entity["@id"] in names_by_id:
                continue
            names_by_id[entity["@id"]] = entity["name"]
            is_person = entity["@type"] == "人才特征"
            nodes.append({
                "id": entity["@id"],
                "name": entity["name"],
                "target": not is_person,
                "source": is_person,
                "itemStyle": {
                    "normal": {
                        "color": 'red' if is_person else 'blue'
                    }
                },
            })
        for edge in data.get('edges') or []:
            # An edge may point at a vertex absent from the vertex list; its
            # name is then reported as None instead of failing the request.
            source = _entity_ref(edge['@from'], names_by_id.get(edge['@from']))
            target = _entity_ref(edge['@to'], names_by_id.get(edge['@to']))
            entity_array.append({"source": source, "relation": edge['@label'], "target": target})
            links.append({'source': edge['@from'], 'name': edge['@label'], 'target': edge['@to']})
    else:
        _log_backend_error(rst)
    return {"errno": 0, "msg": [], "graph": entity_array, "nodes": nodes, "links": links}


@app.get('/search_query')
def search_query(query: str):
    """Fuzzy search: describe the best-matching entity for a keyword.

    Parameter:
        query: keyword to search for.
    """
    entity_array = []
    base_url = 'http://{}:8085/mpks/api/search'.format(host)
    json_obj = {
        "query": query,
        "sort": "relevance",
        "needCorrect": True,
        "saveHistory": False
    }
    rst = _query_backend(base_url, json_obj)
    if rst is None:
        return _request_failed()

    entity = None
    if _has_data(rst):
        results = rst['data'].get('results') or []
        entity_list = (results[0].get('entityList') or []) if results else []
        if entity_list:
            entity = entity_list[0]
    else:
        _log_backend_error(rst)

    if entity is not None:
        # The matched entity is the source of every triple below.
        source = _entity_ref(entity['@id'], entity['name'])
        for dic in entity.get('properties') or []:
            relation = dic['key']
            value = dic['value']
            if isinstance(value, dict) and "@id" in value:
                # Single linked entity.
                targets = [_entity_ref(value['@id'], value['value'])]
            elif isinstance(value, list) and not value:
                # Nothing to report for an empty value list.
                continue
            elif (isinstance(value, list) and isinstance(value[0], dict)
                  and "@id" in value[0]):
                # Several linked entities.
                targets = [_entity_ref(vec.get("@id"), vec.get("value")) for vec in value]
            else:
                # Plain attribute value.
                targets = [_entity_ref(None, value)]
            for target in targets:
                entity_array.append({"source": source, "relation": relation, "target": target})
        # Neighbouring vertices, labelled by their type.
        for vertex in (entity.get('graphData') or {}).get('vertices') or []:
            relation = vertex['@type'].replace('demo', '')
            target = _entity_ref(vertex["@id"], vertex["name"])
            entity_array.append({"source": source, "relation": relation, "target": target})

    nodes, links = getQaAttachment(entity_array)
    return {"errno": 0, "msg": [], "graph": entity_array, "nodes": nodes, "links": links}


@app.get('/search_gremlin')
def search_gremlin(query: Optional[str] = None, _id: Optional[str] = None):
    """Exact search: describe one entity and its direct neighbours.

    Parameters:
        query [optional]: look the entity up by name.
        _id [optional]: look the entity up by id.
    When both are given, _id takes precedence.

    Example of a related traversal:
    g.has("name.@value",MATCH,"于策").outE("相关机构").inV.inE("相关机构").outV.with("*").graph
    """
    url = 'http://{}:8085/mpks/api/extra/gremlin'.format(host)
    if _id:
        gremlin = "g.key('" + _gremlin_quote(_id) + "').both.with('*').graph"
    elif query:
        gremlin = ("g.has('name.@value', MATCH, '" + _gremlin_quote(query)
                   + "').both.with('*').graph")
    else:
        return {"errno": 3001, "msg": "can not get query or id", "data": []}

    rst = _query_backend(url, {"gremlin": gremlin})
    if rst is None:
        return _request_failed()

    entity_array = []
    vertices = []
    edges = []
    if _has_data(rst):
        vertices = rst['data'].get('vertices') or []
        edges = rst['data'].get('edges') or []
    else:
        _log_backend_error(rst)

    if vertices:
        # The first vertex is the entity that was asked for.
        center = vertices[0]
        source = _entity_ref(center['@id'], center['name'])
        # Attributes: every non-empty field that is not bookkeeping metadata.
        for relation, value in center.items():
            if relation in _GREMLIN_SKIPPED_KEYS or not value:
                continue
            entity_array.append({"source": source, "relation": relation,
                                 "target": _entity_ref(None, value)})
        # Label of the first edge leaving each vertex.
        first_label = {}
        for edge in edges:
            first_label.setdefault(edge['@from'], edge['@label'])
        # Relations: a neighbour with an outgoing edge points at the centre
        # entity; otherwise the centre points at it, labelled by its type.
        for vertex in vertices[1:]:
            target = _entity_ref(vertex['@id'], vertex['name'])
            if vertex['@id'] in first_label:
                entity_array.append({
                    "source": target,
                    "relation": first_label[vertex['@id']].replace('demo', ''),
                    "target": source,
                })
            else:
                entity_array.append({
                    "source": source,
                    "relation": vertex['@type'].replace('demo', ''),
                    "target": target,
                })

    nodes, links = getQaAttachment(entity_array)
    return {"errno": 0, "msg": [], "graph": entity_array, "nodes": nodes, "links": links}


def _org_record(name) -> dict:
    """Build the KG record for an organisation (company or school)."""
    org_id = _md5_hex(name)
    return {
        "@id": org_id,
        "id": org_id,
        "@type": "相关机构",
        "@contentType": "struct",
        "@markdel": "0",
        "name": [{"@value": name}]
    }


@app.post('/update_person')
def update_person(xdpost: dict):
    """Push a person and their job / education / project history to the KG.

    :param xdpost: request body; the person record is read from its
        ``data`` key.
    :return: ``{"errno": 0, "msg": "Success"}`` when every push completed,
        otherwise ``{"errno": 555, "msg": [...]}`` listing the failed pushes.
    """
    xdpost = xdpost['data']
    failures = []

    def push(record):
        # Keep pushing the remaining records after a failure, but remember it.
        outcome = post_json(record)
        if outcome != 0:
            failures.append(outcome)

    # Highest education seen so far.
    high_edu = {"schoolName": None, "major": None, "degree": None}
    # Names of every organisation the person is linked to.
    org_list = []

    # Work history.
    job_list = []
    for job in xdpost["hisJob"]:
        job_name = job['companyName'] + job['industry'] + job['jobName'] + job['jobDesc']
        org_list.append(job['companyName'])
        job_list.append(job_name)
        push(_org_record(job['companyName']))
        push({
            "@id": _md5_hex(job['id']),
            "id": job['id'],
            "@type": "工作经历",
            "name": job_name,
            "@markdel": "0",
            "@contentType": "struct",
            "时间": [{"@value": job['startTime'] + job['endTime']}],
            "公司": [{"@value": job['companyName']}],
            "行业": [{"@value": job['industry']}],
            "职位": [{"@value": job['jobName']}],
            "工作内容": [{"@value": job['jobDesc']}]
        })

    # Education history.
    edu_list = []
    for edu in xdpost["hisEdu"]:
        if high_edu['degree'] is None or high_edu['degree'] < edu['degree']:
            high_edu['schoolName'] = edu['schoolName']
            high_edu['major'] = edu['major']
            high_edu['degree'] = edu['degree']
        # An unknown degree code contributes no label rather than failing.
        edu_name = (edu['startTime'] + edu['schoolName'] + edu['major']
                    + (eduback_dict.get(edu['degree']) or ''))
        org_list.append(edu['schoolName'])
        edu_list.append(edu_name)
        push(_org_record(edu['schoolName']))
        push({
            "@id": _md5_hex(edu['id']),
            "id": edu['id'],
            "@type": "教育经历",
            "name": edu_name,
            "@markdel": "0",
            "@contentType": "struct",
            "时间": [{"@value": edu['startTime'] + edu['endTime']}],
            "学校": [{"@value": edu['schoolName']}],
            "专业": [{"@value": edu['major']}],
            "学历": [{"@value": edu['degree']}]
        })

    # Project history.
    pro_list = []
    for pro in xdpost["hisProject"]:
        pro_name = (pro['companyName'] + pro['projectName']
                    + pro['projectOffice'] + pro['projectDuty'])
        pro_list.append(pro_name)
        push({
            "@id": _md5_hex(pro['id']),
            "id": pro['id'],
            "@type": "项目经历",
            "name": pro_name,
            "@markdel": "0",
            "@contentType": "struct",
            "时间": [{"@value": pro['startTime'] + pro['endTime']}],
            "公司": [{"@value": pro['companyName']}],
            "项目": [{"@value": pro['projectName']}],
            "职位": [{"@value": pro['projectDuty']}],
            "成果": [{"@value": pro['projectOffice']}]
        })

    # Training / overseas experience is not supplied by the request.
    tra_list = [{"@value": None}]

    # Basic information; fields the request does not carry are sent as None.
    json_obj = {
        "@id": _md5_hex(xdpost['tId']),
        "id": xdpost['tId'],
        "@type": "人才特征",
        "name": xdpost['name'],
        "@markdel": '0',
        "@contentType": "struct",
        "姓名": [{"@value": xdpost['name']}],
        "年龄": [{"@value": xdpost['age']}],
        "性别": [{"@value": '男' if xdpost['gender'] == "0" else '女'}],
        "出生年月": [{"@value": xdpost['birthTime']}],
        "手机号码": [{"@value": xdpost['mobile']}],
        "电子邮箱": [{"@value": xdpost['email']}],
        "政治面貌": [{"@value": xdpost['politics']}],
        "参加工作时间": [{"@value": xdpost['workBeginTime']}],
        "当前职位": [{"@value": xdpost['currentJob']}],
        "意向职位": [{"@value": xdpost['intentJob']}],
        "当前年薪": [{"@value": xdpost['currentSalaryYearly']}],
        "当前所在城市": [{"@value": None}],
        "意向城市": [{"@value": None}],
        "意向年薪": [{"@value": xdpost['intentSalaryYearlyMax']}],
        "相关机构": org_list,
        "教育经历": edu_list,
        "工作经历": job_list,
        "项目经历": pro_list,
        "培训和海外经历": tra_list,
        # .get: there is no degree to translate when hisEdu is empty.
        "当前最高学历": [{"@value": eduback_dict.get(high_edu['degree'])}],
        "最高学历学校": [{"@value": high_edu['schoolName']}],
        "当前最高学历专业": [{"@value": high_edu['major']}],
        "语言能力": [{"@value": val['lanName']} for val in xdpost['language']],
        "技术职称": [{"@value": None}],
        "研究领域": [{"@value": val['researchName']} for val in xdpost['researchList']],
        "研究领域分类": [{"@value": val['researchName']} for val in xdpost['researchList']],
        "婚姻状况": [{"@value": None}],
        "特长爱好": [{"@value": None}],
        "人才标签": [{"@value": None}],
        "人才特点": [{"@value": None}],
        "当前行业": [{"@value": None}],
        "专业证书": [{"@value": None}],
        "入选人才": [{"@value": None}],
        "知识产权": [{"@value": None}],
        "获得荣誉及证明": [{"@value": None}],
        "备注信息": [{"@value": None}],
        "对报名岗位认识及工作设想": [{"@value": None}],
        "自我评价及主要工作业绩": [{"@value": None}],
        "当前公司": [{"@value": None}],
        "毕业院校分类": [{"@value": None}],
        "工作年限": [{"@value": None}],
        "专业方向大类": [{"@value": None}],
        "报名岗位": [{"@value": None}],
    }
    push(json_obj)

    if failures:
        return {"errno": 555, "msg": failures}
    return {"errno": 0, "msg": "Success"}


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    uvicorn.run(app=app, host='0.0.0.0', port=9000)