# -*- coding: utf-8 -*- # @Author: privacy # @Date: 2022-07-07 13:12:17 # @Last Modified by: privacy # @Last Modified time: 2022-07-18 14:57:29 # 内部人才市场简历模板 import re import json import requests from requests.adapters import HTTPAdapter import pdfplumber from docx import Document # path = "d:\\desktop\\内部人才市场简历模板.docx" path = "d:\\desktop\\内部人才市场简历模板.pdf" class Inner(object): """docstring for Inner""" def __init__(self): super(Inner, self).__init__() self.keywords = ["姓名", "性别", "出生日期", "民族", "籍贯", "健康状况", "政治面貌", "参加工作时间", "外语水平", "专业技术资格(取得时间)", "计算机水平", "熟悉专业有何专长", "工作单位", "现任职务", "任职时间", "联系电话", "对报名岗位认识及工作", "对报名岗位认识及工作设想", "意向地区", "意向岗位", "意向单位", "意向专业", "职业证书", "资格等级", "取得日期", "学校/培训机构", "专业", "起始时间", "毕业时间", "姓名", "职业", "与本人关系"] self.json_obj = self.get_translate() def get_translate(self): # 转译数据库字段名 with open("./resources/translate.json", "r", encoding="utf-8") as ff: json_obj = json.load(ff) return json_obj def parse_line(self, line): result = [] key = None for cell in line: if cell and ''.join(cell.split()) in self.keywords: key = ''.join(cell.split()) elif cell and key: schema = {key:cell} result.append(schema) key = None return result # 解析word def parse_word_layout(self, path): result = [] doc = Document(path) lo = {} tables = doc.tables for _table in tables[:]: for i, row in enumerate(_table.rows[:]): row_content = [] for cell in row.cells[:]: c = cell.text row_content.append(c) lo[len(lo.keys())] = row_content kwln = -1 kwline = None for key in lo.keys(): # pdb.set_trace() for val in lo[key]:# 通过全关键词,判断此行是否为关键词行 if val and ''.join(val.split()) not in self.keywords:# 有非关键字元素,非关键词行,判断是否为关键词行元素 # pdb.set_trace() for c in lo[key]: # pdb.set_trace() if c and ''.join(c.split()) in self.keywords:# 非关键词行元素 result.extend(self.parse_line(lo[key])) break else:# 关键词行元素 schema = dict() for key, val in zip(kwline, lo[key]): if key: schema[key] = val if "学校/培训机构" in schema.keys(): schema["学习经历"] = "学习经历" elif "与本人关系" in schema.keys(): schema["家庭成员"] = "家庭成员" elif "意向地区" in schema.keys(): schema["职业发展管理"] = "职业发展管理" elif "职业证书" in schema.keys(): schema["职业资格证书"] = "职业资格证书" result.append(schema) break break else: # print("此行为关键词行") kwline = [''.join(cell.split()) for cell in lo[key]] kwln = len(lo[key]) job = {"工作经历":"工作经历"} flag = None for p in doc.paragraphs: text = p.text.replace(":", ":") if ":" in text: text = re.sub(r'(\w+)\W{0,2}:', r'\n\1:', text) for line in text.split("\n"): if line.strip(): i = line.split(":") if job.get(i[0].strip()): result.append(job) job = {"工作经历":"工作经历"} job[i[0].strip()] = i[1].strip() flag = i[0].strip() elif flag == "工作描述": job["工作描述"] += '\n' + text.strip() else: result.append(job) return result # 解析pdf def parse_pdf_layout(self, path): result = [] lo = {} with pdfplumber.open(path) as pdf: for page in pdf.pages: for table in page.extract_tables(): for line in table: # lo[len(lo.keys())] = [cell for cell in line if cell] lo[len(lo.keys())] = line kwln = -1 kwline = None for key in lo.keys(): # pdb.set_trace() for val in lo[key]:# 通过全关键词,判断此行是否为关键词行 if val and ''.join(val.split()) not in self.keywords:# 有非关键字元素,非关键词行,判断是否为关键词行元素 # pdb.set_trace() for c in lo[key]: # pdb.set_trace() if c and ''.join(c.split()) in self.keywords:# 非关键词行元素 result.extend(self.parse_line(lo[key])) break if c == "对报名岗位\n认 识及工作": print(''.join(c.split())) break else:# 关键词行元素 schema = dict() for key, val in zip(kwline, lo[key]): if key: schema[key] = val if "学校/培训机构" in schema.keys(): schema["学习经历"] = "学习经历" elif "与本人关系" in schema.keys(): schema["家庭成员"] = "家庭成员" elif "意向地区" in schema.keys(): schema["职业发展管理"] = "职业发展管理" elif "职业证书" in schema.keys(): schema["职业资格证书"] = "职业资格证书" result.append(schema) break break else: # print("此行为关键词行") kwline = [''.join(cell.split()) for cell in lo[key]] kwln = len(lo[key]) job = {"工作经历":"工作经历"} flag = None with pdfplumber.open(path) as pdf: for page in pdf.pages: for predict in page.extract_words(): # print(predict['text']) text = predict['text'].replace(":", ":") if ":" in text: text = re.sub(r'(\w+)\W{0,2}:', r'\n\1:', text) for line in text.split("\n"): if line.strip(): i = line.split(":") if job.get(i[0].strip()): result.append(job) job = {"工作经历":"工作经历"} job[i[0].strip()] = i[1].strip() flag = i[0].strip() elif flag == "工作描述": job["工作描述"] += '\n' + text.strip() else: result.append(job) return result # 格式化数据 def formatter(self, datalist): result = dict() for d in datalist: if len(d) == 1: for key in d.keys(): result[key] = d[key] else: for k in list(d.keys()): if k == "".join(d[k].split()): d.pop(k) if result.get(k): result[k].append(d) else: result[k] = [d] if result.get("外语水平"): data = re.findall(r'(\w+[语话])', result["外语水平"]) if dates: result["外语水平"] = data if result.get("专业技术资格(取得时间)"): dates = re.findall(r'\d+', result["专业技术资格(取得时间)"]) for i in dates: result["专业技术资格(取得时间)"] = result["专业技术资格(取得时间)"].replace(i, "") names = re.findall(r'\w+', result["专业技术资格(取得时间)"]) if len(dates) == 1: result["专业技术资格(取得时间)"] = [{"时间": "{:4d}-01-01".format(int(dates[0])),"专业技术资格":names}] elif len(dates) == 2: result["专业技术资格(取得时间)"] = [{"时间": "{:4d}-{:02d}-01".format(int(dates[0]), int(dates[1])),"专业技术资格":names}] elif len(dates) == 3: result["专业技术资格(取得时间)"] = [{"时间": "{:4d}-{:02d}-{:02d}".format(int(dates[0]), int(dates[1]), int(dates[2])),"专业技术资格":names}] ### 时间格式化 if result.get("出生年月"): dates = re.findall(r'\d+' , result["出生年月"]) if len(dates) == 1: result["出生年月"] = "{:4d}-01-01".format(int(dates[0])) elif len(dates) == 2: result["出生年月"] = "{:4d}-{:02d}-01".format(int(dates[0]), int(dates[1])) elif len(dates) == 3: result["出生年月"] = "{:4d}-{:02d}-{:02d}".format(int(dates[0]), int(dates[1]), int(dates[2])) if result.get("任职时间"): dates = re.findall(r'\d+' , result["任职时间"]) if len(dates) == 1: result["任职时间"] = "{:4d}-01-01".format(int(dates[0])) elif len(dates) == 2: result["任职时间"] = "{:4d}-{:02d}-01".format(int(dates[0]), int(dates[1])) elif len(dates) == 3: result["任职时间"] = "{:4d}-{:02d}-{:02d}".format(int(dates[0]), int(dates[1]), int(dates[2])) if result.get("参加工作时间"): dates = re.findall(r'\d+' , result["参加工作时间"]) if len(dates) == 1: result["参加工作时间"] = "{:4d}-01-01".format(int(dates[0])) elif len(dates) == 2: result["参加工作时间"] = "{:4d}-{:02d}-01".format(int(dates[0]), int(dates[1])) elif len(dates) == 3: result["参加工作时间"] = "{:4d}-{:02d}-{:02d}".format(int(dates[0]), int(dates[1]), int(dates[2])) if result.get("最高学历毕业院校及毕业时间"): dates = re.findall(r'\d+' , result["最高学历毕业院校及毕业时间"]) ws = re.findall(r'\w+' , result["最高学历毕业院校及毕业时间"]) if len(ws) > 0: result["最高学历毕业院校"] = ws[0] if len(dates) == 1: result["最高学历毕业时间"] = "{:4d}-01-01".format(int(dates[0])) elif len(dates) == 2: result["最高学历毕业时间"] = "{:4d}-{:02d}-01".format(int(dates[0]), int(dates[1])) elif len(dates) == 3: result["最高学历毕业时间"] = "{:4d}-{:02d}-{:02d}".format(int(dates[0]), int(dates[1]), int(dates[2])) result.pop("最高学历毕业院校及毕业时间") if result.get("初始学历毕业院校及毕业时间"): dates = re.findall(r'\d+' , result["初始学历毕业院校及毕业时间"]) ws = re.findall(r'\w+' , result["初始学历毕业院校及毕业时间"]) if len(ws) > 0: result["初始学历毕业院校"] = ws[0] if len(dates) == 1: result["初始学历毕业时间"] = "{:4d}-01-01".format(int(dates[0])) elif len(dates) == 2: result["初始学历毕业时间"] = "{:4d}-{:02d}-01".format(int(dates[0]), int(dates[1])) elif len(dates) == 3: result["初始学历毕业时间"] = "{:4d}-{:02d}-{:02d}".format(int(dates[0]), int(dates[1]), int(dates[2])) result.pop("初始学历毕业院校及毕业时间") if result.get("学习经历"): for idx, edu in enumerate(result["学习经历"]): if edu.get("起止时间"): dates = re.findall(r'\d+' , edu["起止时间"]) if len(dates) == 4: result["学习经历"][idx]["起止时间"] = "{:4d}-{:02d}~{:4d}-{:02d}".format(int(dates[0]), int(dates[1]), int(dates[2]), int(dates[3])) if result.get("培训经历"): for idx, edu in enumerate(result["培训经历"]): if edu.get("起止时间"): dates = re.findall(r'\d+' , edu["起止时间"]) if len(dates) == 4: result["培训经历"][idx]["起止时间"] = "{:4d}-{:02d}~{:4d}-{:02d}".format(int(dates[0]), int(dates[1]), int(dates[2]), int(dates[3])) if result.get("工作经历"): for idx, edu in enumerate(result["工作经历"]): if edu.get("起止时间"): dates = re.findall(r'\d+' , edu["起止时间"]) if len(dates) == 4: result["工作经历"][idx]["起止时间"] = "{:4d}-{:02d}~{:4d}-{:02d}".format(int(dates[0]), int(dates[1]), int(dates[2]), int(dates[3])) if result.get("项目经历"): for idx, edu in enumerate(result["项目经历"]): if edu.get("起止时间"): dates = re.findall(r'\d+' , edu["起止时间"]) if len(dates) == 4: result["项目经历"][idx]["起止时间"] = "{:4d}-{:02d}~{:4d}-{:02d}".format(int(dates[0]), int(dates[1]), int(dates[2]), int(dates[3])) if result.get("获得职业资格证书情况"): for idx, edu in enumerate(result["获得职业资格证书情况"]): if edu.get("获得日期"): dates = re.findall(r'\d+' , edu["获得日期"]) if len(dates) == 2: result["获得职业资格证书情况"][idx]["获得日期"] = "{:4d}-{:02d}-01".format(int(dates[0]), int(dates[1])) if result.get("奖惩情况"): for idx, edu in enumerate(result["奖惩情况"]): if edu.get("时间"): dates = re.findall(r'\d+' , edu["时间"]) if len(dates) == 2: result["奖惩情况"][idx]["时间"] = "{:4d}-{:02d}-01".format(int(dates[0]), int(dates[1])) if result.get("主要家庭成员及社会关系"): for idx, fam in enumerate(result["主要家庭成员及社会关系"]): if fam.get("出生年月"): dates = re.findall(r'\d+' , fam["出生年月"]) if len(dates) == 2: result["主要家庭成员及社会关系"][idx]["出生年月"] = "{:4d}-{:02d}-01".format(int(dates[0]), int(dates[1])) normal = self.json_obj["base"] itenormal = self.json_obj["base"] edunormal = self.json_obj["tal_training_experience"] jobnormal = self.json_obj["tal_his_job"] cetnormal = self.json_obj["tal_vocational_qualification_certificate"] family = self.json_obj["tal_family_social_relation"] for key in normal.keys(): if result.get(key): result[normal[key]] = result[key] result.pop(key) for idx in range(len(result['职业发展管理'])): for key in itenormal.keys(): if result['职业发展管理'][idx].get(key): result['职业发展管理'][idx][itenormal[key]] = result['职业发展管理'][idx][key] result['职业发展管理'][idx].pop(key) for idx in range(len(result['学习经历'])): for key in edunormal.keys(): if result['学习经历'][idx].get(key): result['学习经历'][idx][edunormal[key]] = result['学习经历'][idx][key] result['学习经历'][idx].pop(key) for idx in range(len(result['工作经历'])): for key in jobnormal.keys(): if result['工作经历'][idx].get(key): result['工作经历'][idx][jobnormal[key]] = result['工作经历'][idx][key] result['工作经历'][idx].pop(key) for idx in range(len(result['职业资格证书'])): for key in cetnormal.keys(): if result['职业资格证书'][idx].get(key): result['职业资格证书'][idx][cetnormal[key]] = result['职业资格证书'][idx][key] result['职业资格证书'][idx].pop(key) for idx in range(len(result['家庭成员'])): for key in family.keys(): if result['家庭成员'][idx].get(key): result['家庭成员'][idx][family[key]] = result['家庭成员'][idx][key] result['家庭成员'][idx].pop(key) tit = { "基本信息":"base", "职业发展管理":"intent_job", "学习经历":"tal_training_experience", "工作经历":"tal_his_job", "项目经历":"tal_his_project", "培训经历":"tal_training_experience", "获奖情况":"tal_reward_punishment", "语言能力":"tal_language", "职业资格证书":"tal_vocational_qualification_certificate", "专业技能":"tal_professional_tech_certificate", "家庭成员":"tal_family_social_relation" } for key in tit.keys(): if result.get(key): result[tit[key]] = result[key] result.pop(key) return result # 推送后端 def push_back(self, result): url = "http://192.168.1.110:9999/talent/getResumeData" session = requests.Session() session.mount('http://', HTTPAdapter(max_retries = 3)) try: headers = { 'contentType':'Application/json' } response = session.post(url=url, headers=headers, json={"ResumeData": result}, timeout=10) print(response.text) except Exception as e: print(e) def predict(self, path): if path.endswith(".docx"): result = self.formatter(self.parse_word_layout(path)) self.push_back(result) print(self.formatter(self.parse_word_layout(path))) elif path.endswith(".pdf"): result = self.formatter(self.parse_pdf_layout(path)) self.push_back(result) print(self.formatter(self.parse_pdf_layout(path))) if __name__ == "__main__": i = Inner() i.predict(path)