# -*- coding: utf-8 -*- # @Author: privacy # @Date: 2022-07-07 13:12:17 # @Last Modified by: privacy # @Last Modified time: 2022-07-18 13:50:39 # 内部人才市场简历模板 import re import json import requests from requests.adapters import HTTPAdapter import pdfplumber from docx import Document # path = "d:\\desktop\\内部人才市场简历模板.docx" path = "d:\\desktop\\内部人才市场简历模板.pdf" class Inner(object): """docstring for Inner""" def __init__(self): super(Inner, self).__init__() self.keywords = ["姓名", "性别", "出生日期", "民族", "籍贯", "健康状况", "政治面貌", "参加工作时间", "外语水平", "专业技术资格(取得时间)", "计算机水平", "熟悉专业有何专长", "工作单位", "现任职务", "任职时间", "联系电话", "对报名岗位认识及工作", "对报名岗位认识及工作设想", "意向地区", "意向岗位", "意向单位", "意向专业", "职业证书", "资格等级", "取得日期", "学校/培训机构", "专业", "起始时间", "毕业时间", "姓名", "职业", "与本人关系"] self.json_obj = self.get_translate() def get_translate(self): # 转译数据库字段名 with open("./resources/translate.json", "r", encoding="utf-8") as ff: json_obj = json.load(ff) return json_obj def parse_line(self, line): result = [] key = None for cell in line: if cell and ''.join(cell.split()) in self.keywords: key = ''.join(cell.split()) elif cell and key: schema = {key:cell} result.append(schema) key = None return result # 解析word def parse_word_layout(self, path): result = [] doc = Document(path) lo = {} tables = doc.tables for _table in tables[:]: for i, row in enumerate(_table.rows[:]): row_content = [] for cell in row.cells[:]: c = cell.text row_content.append(c) lo[len(lo.keys())] = row_content kwln = -1 kwline = None for key in lo.keys(): # pdb.set_trace() for val in lo[key]:# 通过全关键词,判断此行是否为关键词行 if val and ''.join(val.split()) not in self.keywords:# 有非关键字元素,非关键词行,判断是否为关键词行元素 # pdb.set_trace() for c in lo[key]: # pdb.set_trace() if c and ''.join(c.split()) in self.keywords:# 非关键词行元素 result.extend(self.parse_line(lo[key])) break else:# 关键词行元素 schema = dict() for key, val in zip(kwline, lo[key]): if key: schema[key] = val if "学校/培训机构" in schema.keys(): schema["学习经历"] = "学习经历" elif "与本人关系" in schema.keys(): schema["家庭成员"] = "家庭成员" elif "意向地区" in schema.keys(): schema["职业发展管理"] = "职业发展管理" elif "职业证书" in schema.keys(): schema["职业资格证书"] = "职业资格证书" result.append(schema) break break else: # print("此行为关键词行") kwline = [''.join(cell.split()) for cell in lo[key]] kwln = len(lo[key]) job = {"工作经历":"工作经历"} flag = None for p in doc.paragraphs: text = p.text.replace(":", ":") if ":" in text: text = re.sub(r'(\w+)\W{0,2}:', r'\n\1:', text) for line in text.split("\n"): if line.strip(): i = line.split(":") if job.get(i[0].strip()): result.append(job) job = {"工作经历":"工作经历"} job[i[0].strip()] = i[1].strip() flag = i[0].strip() elif flag == "工作描述": job["工作描述"] += '\n' + text.strip() else: result.append(job) return result # 解析pdf def parse_pdf_layout(self, path): result = [] lo = {} with pdfplumber.open(path) as pdf: for page in pdf.pages: for table in page.extract_tables(): for line in table: # lo[len(lo.keys())] = [cell for cell in line if cell] lo[len(lo.keys())] = line kwln = -1 kwline = None for key in lo.keys(): # pdb.set_trace() for val in lo[key]:# 通过全关键词,判断此行是否为关键词行 if val and ''.join(val.split()) not in self.keywords:# 有非关键字元素,非关键词行,判断是否为关键词行元素 # pdb.set_trace() for c in lo[key]: # pdb.set_trace() if c and ''.join(c.split()) in self.keywords:# 非关键词行元素 result.extend(self.parse_line(lo[key])) break if c == "对报名岗位\n认 识及工作": print(''.join(c.split())) break else:# 关键词行元素 schema = dict() for key, val in zip(kwline, lo[key]): if key: schema[key] = val if "学校/培训机构" in schema.keys(): schema["学习经历"] = "学习经历" elif "与本人关系" in schema.keys(): schema["家庭成员"] = "家庭成员" elif "意向地区" in schema.keys(): schema["职业发展管理"] = "职业发展管理" elif "职业证书" in schema.keys(): schema["职业资格证书"] = "职业资格证书" result.append(schema) break break else: # print("此行为关键词行") kwline = [''.join(cell.split()) for cell in lo[key]] kwln = len(lo[key]) job = {"工作经历":"工作经历"} flag = None with pdfplumber.open(path) as pdf: for page in pdf.pages: for predict in page.extract_words(): # print(predict['text']) text = predict['text'].replace(":", ":") if ":" in text: text = re.sub(r'(\w+)\W{0,2}:', r'\n\1:', text) for line in text.split("\n"): if line.strip(): i = line.split(":") if job.get(i[0].strip()): result.append(job) job = {"工作经历":"工作经历"} job[i[0].strip()] = i[1].strip() flag = i[0].strip() elif flag == "工作描述": job["工作描述"] += '\n' + text.strip() else: result.append(job) return result # 格式化数据 def formatter(self, datalist): result = dict() for d in datalist: if len(d) == 1: for key in d.keys(): result[key] = d[key] else: for k in list(d.keys()): if k == "".join(d[k].split()): d.pop(k) if result.get(k): result[k].append(d) else: result[k] = [d] normal = self.json_obj["base"] itenormal = self.json_obj["base"] edunormal = self.json_obj["tal_training_experience"] jobnormal = self.json_obj["tal_his_job"] cetnormal = self.json_obj["tal_vocational_qualification_certificate"] family = self.json_obj["tal_family_social_relation"] for key in normal.keys(): if result.get(key): result[normal[key]] = result[key] result.pop(key) for idx in range(len(result['职业发展管理'])): for key in itenormal.keys(): if result['职业发展管理'][idx].get(key): result['职业发展管理'][idx][itenormal[key]] = result['职业发展管理'][idx][key] result['职业发展管理'][idx].pop(key) for idx in range(len(result['学习经历'])): for key in edunormal.keys(): if result['学习经历'][idx].get(key): result['学习经历'][idx][edunormal[key]] = result['学习经历'][idx][key] result['学习经历'][idx].pop(key) for idx in range(len(result['工作经历'])): for key in jobnormal.keys(): if result['工作经历'][idx].get(key): result['工作经历'][idx][jobnormal[key]] = result['工作经历'][idx][key] result['工作经历'][idx].pop(key) for idx in range(len(result['职业资格证书'])): for key in cetnormal.keys(): if result['职业资格证书'][idx].get(key): result['职业资格证书'][idx][cetnormal[key]] = result['职业资格证书'][idx][key] result['职业资格证书'][idx].pop(key) for idx in range(len(result['家庭成员'])): for key in family.keys(): if result['家庭成员'][idx].get(key): result['家庭成员'][idx][family[key]] = result['家庭成员'][idx][key] result['家庭成员'][idx].pop(key) tit = { "基本信息":"base", "职业发展管理":"intent_job", "学习经历":"tal_training_experience", "工作经历":"tal_his_job", "项目经历":"tal_his_project", "培训经历":"tal_training_experience", "获奖情况":"tal_reward_punishment", "语言能力":"tal_language", "职业资格证书":"tal_vocational_qualification_certificate", "专业技能":"tal_professional_tech_certificate", "家庭成员":"tal_family_social_relation" } for key in tit.keys(): if result.get(key): result[tit[key]] = result[key] result.pop(key) return result # 推送后端 def push_back(self, result): url = "http://192.168.1.110:9999/talent/getResumeData" session = requests.Session() session.mount('http://', HTTPAdapter(max_retries = 3)) try: headers = { 'contentType':'Application/json' } response = session.post(url=url, headers=headers, json={"ResumeData": result}, timeout=10) print(response.text) except Exception as e: print(e) def predict(self, path): if path.endswith(".docx"): result = self.formatter(self.parse_word_layout(path)) self.push_back(result) print(self.formatter(self.parse_word_layout(path))) elif path.endswith(".pdf"): result = self.formatter(self.parse_pdf_layout(path)) self.push_back(result) print(self.formatter(self.parse_pdf_layout(path))) if __name__ == "__main__": i = Inner() i.predict(path)