# -*- coding: utf-8 -*- # @Author: privacy # @Date: 2022-07-07 12:59:42 # @Last Modified by: privacy # @Last Modified time: 2022-07-18 14:10:56 # import pdb import re import json import requests from requests.adapters import HTTPAdapter import pdfplumber from docx import Document path = "d:\\desktop\\社招简历模板.docx" class Social(object): """docstring for Social""" def __init__(self): super(Social, self).__init__() self.keywords = [ '姓名', '性别', '出生日期', '一寸照片', '民族', '出生地', '政治面貌(加入时间)', '参加工作时间', '健康状况', '外语水平', '初始学历、专业', '最高学历、专业', '初始学历毕业院校及毕业时间', '最高学历毕业院校及毕业时间', '专业技术资格(取得时间)', '职业技能等级(取得时间)', '熟悉专业有何专长', '工作单位', '现任职务', '任职时间', '提职时间', '意向岗位', '联系电话', '学习经历', '起止时间', '学校', '专业', '学历', '学位', '研究方向', '是否全日制', '培训经历', '培训类型', '机构', '内容', '成绩', '证书名称', '工作经历', '职务', '部门', '证明人', '备注', '对报名岗位认识及工作设想', '自我评价及主要工作业绩', '获得职业资格证书情况', '获得日期', '名称', '证书编码/文号', '授予单位', '奖惩情况', '项目', '时间', '项目单位', '证明材料', '主要家庭成员及社会关系', '称谓', '出生年月', '政治面貌', '工作单位及职务', '其他情况说明', '诚信承诺', '社会招聘工作办公室资格审查意见' ] self.json_obj = self.get_translate() def get_translate(self): # 转译数据库字段名 with open("./resources/translate.json", "r", encoding="utf-8") as ff: json_obj = json.load(ff) return json_obj def parse_line(self, line): result = [] key = None for cell in line: if cell and ''.join(cell.split()) in self.keywords: key = ''.join(cell.split()) elif cell and key: schema = {key:cell} result.append(schema) key = None return result # 解析word def parse_word_layout(self, path): result = [] doc = Document(path) lo = {} for _table in doc.tables[:]: for i, row in enumerate(_table.rows[:]): row_content = [] for cell in row.cells[:]: c = cell.text if c not in row_content: row_content.append(c) lo[len(lo.keys())] = row_content kwln = -1# 关键词行长度 kwline = None# 关键词行 for key in lo.keys(): for val in lo[key]:# 通过全关键词,判断此行是否为关键词行 if val and ''.join(val.split()) not in self.keywords:# 有非关键字元素,非关键词行,判断是否为关键词行元素 perc = 0# 行内关键词数量 for c in lo[key]: if c and (''.join(c.split()) in self.keywords):# 找到此行有关键词 perc += 1 if c and (''.join(c.split()) in self.keywords) and (perc > len(lo[key])/3):# 关键词数量超过1/3,判断此行非关键词行元素 perc = 0# 清空行内关键词数 result.extend(self.parse_line(lo[key]))# 添加并解析普通行级元素 break else:# 关键词行元素 if len(kwline) != len(lo[key]): break schema = dict() for key, val in zip(kwline, lo[key]):# 合并关键词行和行元素 if key: schema[key] = val result.append(schema) break break else: # print("{}:此行为关键词行!".format(lo[key])) if len(lo[key])>2: try: kwline = [''.join(cell.split()) for cell in lo[key]] except Exception as e: kwline = lo[key] kwln = len(lo[key]) return result # 解析pdf def parse_pdf_layout(self, path): result = [] lo = {} with pdfplumber.open(path) as pdf: for page in pdf.pages: for table in page.extract_tables(): for line in table: # lo[len(lo.keys())] = [cell for cell in line if cell] lo[len(lo.keys())] = line kwln = -1 kwline = None for key in lo.keys(): # pdb.set_trace() for val in lo[key]:# 通过全关键词,判断此行是否为关键词行 if val and ''.join(val.split()) not in self.keywords:# 有非关键字元素,非关键词行,判断是否为关键词行元素 # pdb.set_trace() for c in lo[key] or len(lo[key])!=kwln: # pdb.set_trace() if c and ''.join(c.split()) in self.keywords:# 非关键词行元素 result.extend(self.parse_line(lo[key])) break else:# 关键词行元素 schema = dict() for key, val in zip(kwline, lo[key]): if key: schema[key] = val if val else key result.append(schema) break break else: kwline = [] for cell in lo[key]: if cell: kwline.append(''.join(cell.split())) else: kwline.append(cell) kwln = len(lo[key]) return result # 格式化数据 def formatter(self, datalist): result = dict() for d in datalist: if len(d) == 1: for key in d.keys(): result[key] = d[key] else: for k in list(d.keys()): if k == "".join(d[k].split()): d.pop(k) if result.get(k): result[k].append(d) else: result[k] = [d] ### 时间格式化 if result.get("出生年月"): dates = re.findall(r'\d+' , result["出生年月"]) if len(dates) == 1: result["出生年月"] = "{:4d}-01-01".format(int(dates[0])) elif len(dates) == 2: result["出生年月"] = "{:4d}-{:02d}-01".format(int(dates[0]), int(dates[1])) elif len(dates) == 3: result["出生年月"] = "{:4d}-{:02d}-{:02d}".format(int(dates[0]), int(dates[1]), int(dates[2])) if result.get("任职时间"): dates = re.findall(r'\d+' , result["任职时间"]) if len(dates) == 1: result["任职时间"] = "{:4d}-01-01".format(int(dates[0])) elif len(dates) == 2: result["任职时间"] = "{:4d}-{:02d}-01".format(int(dates[0]), int(dates[1])) elif len(dates) == 3: result["任职时间"] = "{:4d}-{:02d}-{:02d}".format(int(dates[0]), int(dates[1]), int(dates[2])) if result.get("参加工作时间"): dates = re.findall(r'\d+' , result["参加工作时间"]) if len(dates) == 1: result["参加工作时间"] = "{:4d}-01-01".format(int(dates[0])) elif len(dates) == 2: result["参加工作时间"] = "{:4d}-{:02d}-01".format(int(dates[0]), int(dates[1])) elif len(dates) == 3: result["参加工作时间"] = "{:4d}-{:02d}-{:02d}".format(int(dates[0]), int(dates[1]), int(dates[2])) if result.get("最高学历毕业院校及毕业时间"): dates = re.findall(r'\d+' , result["最高学历毕业院校及毕业时间"]) ws = re.findall(r'\w+' , result["最高学历毕业院校及毕业时间"]) if len(ws) > 0: result["最高学历毕业院校"] = ws[0] if len(dates) == 1: result["最高学历毕业时间"] = "{:4d}-01-01".format(int(dates[0])) elif len(dates) == 2: result["最高学历毕业时间"] = "{:4d}-{:02d}-01".format(int(dates[0]), int(dates[1])) elif len(dates) == 3: result["最高学历毕业时间"] = "{:4d}-{:02d}-{:02d}".format(int(dates[0]), int(dates[1]), int(dates[2])) result.pop("最高学历毕业院校及毕业时间") if result.get("初始学历毕业院校及毕业时间"): dates = re.findall(r'\d+' , result["初始学历毕业院校及毕业时间"]) ws = re.findall(r'\w+' , result["初始学历毕业院校及毕业时间"]) if len(ws) > 0: result["初始学历毕业院校"] = ws[0] if len(dates) == 1: result["初始学历毕业时间"] = "{:4d}-01-01".format(int(dates[0])) elif len(dates) == 2: result["初始学历毕业时间"] = "{:4d}-{:02d}-01".format(int(dates[0]), int(dates[1])) elif len(dates) == 3: result["初始学历毕业时间"] = "{:4d}-{:02d}-{:02d}".format(int(dates[0]), int(dates[1]), int(dates[2])) result.pop("初始学历毕业院校及毕业时间") if result.get("学习经历"): for idx, edu in enumerate(result["学习经历"]): if edu.get("起止时间"): dates = re.findall(r'\d+' , edu["起止时间"]) if len(dates) == 4: result["学习经历"][idx]["起止时间"] = "{:4d}-{:02d}~{:4d}-{:02d}".format(int(dates[0]), int(dates[1]), int(dates[2]), int(dates[3])) if result.get("培训经历"): for idx, edu in enumerate(result["培训经历"]): if edu.get("起止时间"): dates = re.findall(r'\d+' , edu["起止时间"]) if len(dates) == 4: result["培训经历"][idx]["起止时间"] = "{:4d}-{:02d}~{:4d}-{:02d}".format(int(dates[0]), int(dates[1]), int(dates[2]), int(dates[3])) if result.get("工作经历"): for idx, edu in enumerate(result["工作经历"]): if edu.get("起止时间"): dates = re.findall(r'\d+' , edu["起止时间"]) if len(dates) == 4: result["工作经历"][idx]["起止时间"] = "{:4d}-{:02d}~{:4d}-{:02d}".format(int(dates[0]), int(dates[1]), int(dates[2]), int(dates[3])) if result.get("项目经历"): for idx, edu in enumerate(result["项目经历"]): if edu.get("起止时间"): dates = re.findall(r'\d+' , edu["起止时间"]) if len(dates) == 4: result["项目经历"][idx]["起止时间"] = "{:4d}-{:02d}~{:4d}-{:02d}".format(int(dates[0]), int(dates[1]), int(dates[2]), int(dates[3])) if result.get("获得职业资格证书情况"): for idx, edu in enumerate(result["获得职业资格证书情况"]): if edu.get("获得日期"): dates = re.findall(r'\d+' , edu["获得日期"]) if len(dates) == 2: result["获得职业资格证书情况"][idx]["获得日期"] = "{:4d}-{:02d}-01".format(int(dates[0]), int(dates[1])) if result.get("奖惩情况"): for idx, edu in enumerate(result["奖惩情况"]): if edu.get("时间"): dates = re.findall(r'\d+' , edu["时间"]) if len(dates) == 2: result["奖惩情况"][idx]["时间"] = "{:4d}-{:02d}-01".format(int(dates[0]), int(dates[1])) if result.get("主要家庭成员及社会关系"): for idx, fam in enumerate(result["主要家庭成员及社会关系"]): if fam.get("出生年月"): dates = re.findall(r'\d+' , fam["出生年月"]) if len(dates) == 2: result["主要家庭成员及社会关系"][idx]["出生年月"] = "{:4d}-{:02d}-01".format(int(dates[0]), int(dates[1])) normal = self.json_obj["base"] itenormal = self.json_obj["base"] edunormal = self.json_obj["tal_his_edu"] jobnormal = self.json_obj["tal_his_job"] tranornal = self.json_obj["tal_training_experience"] cetnormal = self.json_obj["tal_vocational_qualification_certificate"] rewnormal = self.json_obj["tal_reward_punishment"] family = self.json_obj["tal_family_social_relation"] for key in normal.keys(): if result.get(key): result[normal[key]] = result[key] result.pop(key) for idx in range(len(result['学习经历'])): for key in edunormal.keys(): if result['学习经历'][idx].get(key): result['学习经历'][idx][edunormal[key]] = result['学习经历'][idx][key] result['学习经历'][idx].pop(key) for idx in range(len(result['工作经历'])): for key in jobnormal.keys(): if result['工作经历'][idx].get(key): result['工作经历'][idx][jobnormal[key]] = result['工作经历'][idx][key] result['工作经历'][idx].pop(key) for idx in range(len(result['培训经历'])): for key in tranornal.keys(): if result['培训经历'][idx].get(key): result['培训经历'][idx][tranornal[key]] = result['培训经历'][idx][key] result['培训经历'][idx].pop(key) for idx in range(len(result['获得职业资格证书情况'])): for key in cetnormal.keys(): if result['获得职业资格证书情况'][idx].get(key): result['获得职业资格证书情况'][idx][cetnormal[key]] = result['获得职业资格证书情况'][idx][key] result['获得职业资格证书情况'][idx].pop(key) for idx in range(len(result['奖惩情况'])): for key in rewnormal.keys(): if result['奖惩情况'][idx].get(key): result['奖惩情况'][idx][rewnormal[key]] = result['奖惩情况'][idx][key] result['奖惩情况'][idx].pop(key) for idx in range(len(result['主要家庭成员及社会关系'])): for key in family.keys(): if result['主要家庭成员及社会关系'][idx].get(key): result['主要家庭成员及社会关系'][idx][family[key]] = result['主要家庭成员及社会关系'][idx][key] result['主要家庭成员及社会关系'][idx].pop(key) tit = { "基本信息":"base", "职业发展管理":"intent_job", "学习经历":"tal_his_edu", "工作经历":"tal_his_job", "项目经历":"tal_his_project", "培训经历":"tal_training_experience", "奖惩情况":"tal_reward_punishment", "语言能力":"tal_language", "获得职业资格证书情况":"tal_vocational_qualification_certificate", "专业技能":"tal_professional_tech_certificate", "主要家庭成员及社会关系":"tal_family_social_relation", "其他情况说明":"intro" } for key in tit.keys(): if result.get(key): result[tit[key]] = result[key] result.pop(key) return result # 推送后端 def push_back(self, result): url = "http://192.168.1.110:9999/talent/getResumeData" session = requests.Session() session.mount('http://', HTTPAdapter(max_retries = 3)) try: headers = { 'contentType':'Application/json' } response = session.post(url=url, headers=headers, json={"ResumeData": result}, timeout=10) print(response.text) except Exception as e: print(e) def predict(self, path): if path.endswith(".docx"): result = self.formatter(self.parse_word_layout(path)) self.push_back(result) print(self.formatter(self.parse_word_layout(path))) elif path.endswith(".pdf"): result = self.formatter(self.parse_pdf_layout(path)) self.push_back(result) print(self.formatter(self.parse_pdf_layout(path))) if __name__ == '__main__': s = Social() s.predict(path)