#!/usr/bin/env python # coding: utf-8 import os import sys import re import subprocess from pprint import pprint import logging logging.basicConfig(format='%(asctime)s: %(name)s: %(levelname)s: %(filename)s: %(funcName)s: %(lineno)d: %(message)s', level=logging.INFO) import pandas as pd from docx import Document from docx.shared import Inches from pdfminer.high_level import extract_pages from pdfminer.layout import LTTextContainer, LTChar, LTLine, LAParams, LTTextBox, LTFigure, LTImage, LTText, LTAnno, LTTextLine, LTTextLineHorizontal from pdfminer.pdfdocument import PDFDocument from pdfminer.pdfpage import PDFPage from pdfminer.pdfparser import PDFParser from pdfminer.converter import PDFPageAggregator from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter import pdfplumber from paddlenlp import Taskflow from rich.console import Console console = Console() # import uvicorn # from fastapi import FastAPI # app = FastAPI() ner = Taskflow("ner", mode='fast') ner_tag = Taskflow("ner") global block, block_rev block = { "个人信息":1, "基本信息":1, "个人简历":1, "基基本本信信息息":1, "基本信息基本信息":1, "基本信息文本内容":1, "求职意向":2, "求职意向求职意向":2, "期望工作文本内容":2, "教育背景":3, "教育经历":3, "教教育育经经历历":3, "教育经历教育经历":3, "教育经历文本内容":3, "学历学位":3, "工作经验":4, "主要工作内容与职责":4, "工作方面":4, "实习经历":4, "工作经历":4, "工工作作经经历历":4, "工作经历工作经历":4, "工作经历文本内容":4, "项目经历":5, "项目经验":5, "科研项目经历":5, "项项目目经经历历":5, "项目经历项目经历":5, "研究生参与代表性项目":5, "项目经历文本内容":5, "专业技能":6, "个人技能":6, "专业/外语技能":6, "技能素质":6, "个人技能文本内容":6, "自我评价":7, "个人简介":7, "个人评价":7, "自我描述":7, "自自我我评评价价":7, "自我评价自我评价":7, "自我评价文本内容":7, "兴趣爱好":8, "兴趣爱好文本内容":8, "语言及方言":9, "语言能力":9, "英语能力":9, "语语言言能能力力":9, "语言能力语言能力":9, "语言技能文本内容":9, "证书":10, "所获证书文本内容":10, "获得奖励":11, "获奖经历":11, "获奖情况":11, "获获奖奖经经历历":11, "获奖经历获奖经历":11, "获奖情况及社会活动":11, "校内奖励":11, "校内活动&奖励":11, "所获奖励文本内容":11,"奖惩情况":11, "培训":12, "培训经历":12, "培培训训经经历历":12, "培训经历文本内容":12, "家庭成员":13, "家家庭庭成成员员":13, "家庭成员家庭成员":13, "主要家庭成员及社会关系":13, "社会活动":"other", "实践经验":"other", "社会活动及社会实践":"other", "近三年年度考核结果":"other", "其他意愿":"other", } block_rev = {1:"基本信息", 2:"求职意向", 3:"教育经历", 4:"工作经历", 5:"项目经历", 6:"专业技能", 7:"自我评价", 8:"兴趣爱好", 9:"语言能力", 10:"证书", 11:"获奖情况", 12:"培训经历", 13:"家庭成员", "other":"其他"} # 基本信息(已完成) def get_base_info(lines): logging.info(lines) schema = { '姓名': None, } for line in [' '.join(' '.join(lines).split('\n'))]: line = line.replace(r'[ ]{5,}','\n') w = re.sub(r'[\W]+(\w[::])[\W]{0,}\w', r'\1', line) for i in w.split(): if ':' in i: try: key, val = i.split(':') schema[key] = val except Exception as e: logging.error(e) if not schema.get('姓名'): schema['姓名'] = re.search(r'[姓名::]{3,}(\w{2,4})', w).group(1) if re.search(r'[姓名::]{3,}(\w{2,4})', w) else None if not schema.get('姓名'): for word, tag in ner_tag(w): if tag == "人物类_实体": schema['姓名'] = word if not schema.get('性别'): schema['性别'] = re.search(r'[男女]', w).group() if re.search(r'[男女]', w) else None if not schema.get('婚姻状况'): schema['婚姻状况'] = re.search(r'[已未]婚', w).group() if re.search(r'[已未]婚', w) else None if not schema.get('电子邮箱'): schema['电子邮箱'] = re.search(r'([.\w]+@[.\w]+)', w).group() if re.search(r'([.\w]+@[.\w]+)', w) else None if not schema.get('政治面貌'): schema['政治面貌'] = re.search(r'[预备中共党团员群众无派人士]{2,6}', w).group() if re.search(r'[预备中共党团员群众无派人士]{2,6}', w) else None if not schema.get('手机号码'): schema['手机号码'] = re.search(r'\W(1[\d]{10})\W', w).group(1) if re.search(r'\W(1[\d]{10})\W', w) else None # if not schema.get('籍贯'): # schema['籍贯'] = re.search(r'[籍贯::]{3,}(\w{2,5})', w).group(1) if re.search(r'[籍贯::]{3,}(\w{2,})', w) else None # if not schema.get('出生年月'): # schema['出生年月'] = re.search(r'\d{4}[./年\-]\d{1,2}[月]', w).group() if re.search(r'\d{4}[./年\-]\d{1,2}[月]', w) else None # if not schema.get('当前职位'): # schema['当前职位'] = re.search(r'[当前职位: ]{3,}(\w)+', w).group() if re.search(r'[当前职位: ]{3,}(\w)+', w) else None # if not schema.get('参加工作时间'): # schema['参加工作时间'] = re.search(r'[参加工作事件:]{3,}(\d{4}[./年\-]\d{1,2}[月])', w).group(1) if re.search(r'[参加工作事件:]{3,}(\d{4}[./年\-]\d{1,2}[月])', w) else None return {key:value for key, value in schema.items() if value} # 求职意向(已完成) def get_job_intention(lines): logging.info(lines) schema = {} for line in lines: regex = re.compile(r'\W{0,3}[::]\s+') line = regex.sub(':', line) for i in line.split(): if ":" in i: try: key, val = i.split(":") schema[key] = val except Exception as e: logging.error(e) return schema # 教育经历 (已停用) # ner + 分词 (判断学校,时间,学历) 专业需要单独处理。 def get_edu_list_old(lines): logging.info(lines) job_list = [] job_dict = {'edu_time_beg':'', 'edu_time_end':'', 'edu_name':'','edu_leval':'','edu_domain':'', 'edu_statue':0} re_txt = '\d{4,4}.\d{1,2}.?\040{0,2}[\-–至-\—~]\040{0,2}\d{4,4}.\d{1,2}[月]?|\d+\.\d+\-至今|\d+年\d+月\-\d+年\d+月|\d+年\d+月\-\~|\d+年\d+月[\-\~]至今|\d+-\d+\040{0,2}[\~至]\040{0,2}\d+-\d+|\d+-\d+\~|\d+-\d+\~至今|\d+-\d+\040{0,2}至今|^\d{4,4}.\d{1,2}|19\d{2,2}.|20\d{2,2}.' re_txt_1 = '\d{4,4}.\d{1,2}.?\040{0,2}[\-–至-\—~]\040{0,2}\d{4,4}.\d{1,2}[月]?|\d+\.\d+\-至今|\d+年\d+月\-\d+年\d+月|\d+年\d+月\-\~|\d+年\d+月[\-\~]至今|\d+-\d+\040{0,2}[\~至]\040{0,2}\d+-\d+|\d+-\d+\~|\d+-\d+\~至今|\d+-\d+\040{0,2}至今' nums = [] for i in range(len(lines)): if re.findall(re_txt, lines[i]): nums.append(i) nums.append(len(lines)) edu_level = {'本科':18, "大专":17, "博士研究生":20, "学士":18, "博士":20, "硕士":19, "研究生":19, "博后":21, '博士后':21} year_dict = {18:4, 17:3,20:3,19:3,21:2} edu_dict = {18:'本科', 17:'大专',20:'博士研究生',19:'硕士',21:'博士后'} edu_list = [] for i in range(1, len(nums[:])): job_dict = {'edu_time_beg':'', 'edu_time_end':'', 'edu_name':'','edu_leval':'','edu_domain':''} data_list = lines[nums[i-1]:nums[i]] if len(data_list) > 1 and data_list[1] and data_list[1][-1] == '|' and data_list[0][-1] != '|': data_list[0] = data_list[0] + data_list[1] data_list[1] = '' if len(data_list) > 2 and data_list[2] and data_list[2][-1] == '|' and data_list[0][-1] != '|' and '|' in str(data_list[0]) and data_list[1] and data_list[1][-1] != '|': data_list[0] = data_list[0] + data_list[1] + data_list[2] data_list[1] = '' data_list[2] = '' if '' in data_list: data_list.remove('') data_line = ' '.join(data_list) data_line = re.sub('[\|]', ' ', data_line) data_line = re.sub('-{3,}', '', data_line) ner_data = ner(''.join(data_list[:2])) org = '' time_list = [] for jj in range(1, len(ner_data)): if ner_data[jj][1] == ner_data[jj-1][1]: ner_data[jj] = list(ner_data[jj]) ner_data[jj][0] = ner_data[jj-1][0] + ner_data[jj][0] ner_data[jj-1] = ('','') for _ in ner_data: if _[1] == 'ORG' and not org: org = _[0].strip() elif _[1] == 'TIME' and len(_[1]) >= 4: time_list.append(_[0]) #TIME # print(data_line) _list_data = re.split('\040+',data_line) top_level = 18 remove_list = [] logging.info(_list_data) logging.info(time_list) for ii in range(len(_list_data)): for t in time_list: if t in _list_data[ii]: _list_data[ii] = '' break for i in range(len(_list_data)): #if org in _list_data[i]: # _list_data[i] = '' if re.findall('^\d{4,4}', _list_data[i]): _list_data[i] = '' _data = re.findall('本科|学士|硕士|博士研究生|博士后|博后|博士|研究生|大专', _list_data[i]) if not _data: continue top_level = edu_level[_data[0]] _list_data[i] = '' break #remove_list.append(i) logging.info(_list_data) job_time = re.findall(re_txt_1, data_list[0]) if job_time: job_dict['edu_time'] = job_time[0] else: job_dict['edu_time'] = '' _nums = re.findall('\d+', job_dict['edu_time']) if len(_nums) >= 4: job_dict['edu_time_beg'] = '%s-%02d'%(_nums[0], int(_nums[1])) job_dict['edu_time_end'] = '%s-%02d'%(_nums[2], int(_nums[3])) job_dict['edu_time'] = '%s-%02d~%s-%02d'%(_nums[0], int(_nums[1]), _nums[2], int(_nums[3])) elif len(_nums) == 2: job_dict['edu_time'] = '%s-%02d~%s'%(_nums[0], int(_nums[1]), '至今') job_dict['edu_time_beg'] = '%s-%02d'%(_nums[0], int(_nums[1])) job_dict['edu_time_end'] = '%s'%('至今') elif len(time_list) == 2: nums_1 = re.findall('\d+', time_list[0]) nums_2 = re.findall('\d+', time_list[1]) nums_1.append('09') nums_2.append('07') job_dict['edu_time_beg'] = '%s-%02d'%(nums_1[0], int(nums_1[1])) try: job_dict['edu_time_end'] = '%s-%02d'%(nums_2[0], int(nums_2[1])) except: job_dict['edu_time_end'] = None try: job_dict['edu_time'] = '%s-%02d~%s-%02d'%(nums_1[0], int(nums_1[1]), nums_2[0], int(nums_2[1])) except: job_dict['edu_time'] = '%s-%02d~今'%(nums_1[0], int(nums_1[1])) elif len(time_list) == 1: _nums = re.findall('\d+', time_list[0]) if '毕业' in data_list[0]: _nums.append('06') _nums.insert(0, '09') _nums.insert(0, str(int(_nums[1]) - year_dict[top_level])) job_dict['edu_time'] = '%s-%02d~%s-%02d'%(_nums[0], int(_nums[1]), _nums[2], int(_nums[3])) job_dict['edu_time_beg'] = '%s-%02d'%(_nums[0], int(_nums[1])) job_dict['edu_time_end'] = '%s-%02d'%(_nums[2], int(_nums[3])) else: _nums.append('09') job_dict['edu_time'] = '%s-%02d~%s'%(_nums[0], int(_nums[1]), '至今') job_dict['edu_time_beg'] = '%s-%02d'%(_nums[0], int(_nums[1])) job_dict['edu_time_end'] = '%s'%('至今') job_dict['edu_leval'] = edu_dict[top_level] if org: job_dict['edu_name'] = org else: job_dict['edu_name'] = '' edu_domain = '' for i in range(len(_list_data)): if org in _list_data[i]: continue if not _list_data[i] and '专业' in _list_data[i]: edu_domain = _list_data[i] if not edu_domain: for i in range(len(_list_data)): if org in _list_data[i]: continue if _list_data[i] and len(_list_data[i]) >= 3: edu_domain = _list_data[i] break if not edu_domain: for i in range(len(_list_data)): if org in _list_data[i]: for j in range(i+1, len(_list_data)): if _list_data[i] and len(_list_data[j]) >= 2: edu_domain = _list_data[j] break break job_dict['edu_domain'] = edu_domain if len(job_list) ==0: job_list.append(job_dict) else: if job_dict in job_list: continue if not job_dict['edu_time']: continue if int(job_dict['edu_time'][:4]) > int(job_list[-1]['edu_time'][:4]): job_list = [job_dict] + job_list else: job_list.append(job_dict) continue data_list[0] = re.sub(job_time[0], '', data_list[0]) _list = re.split('\|\040+', data_list[0]) #print(_list) if len(_list) == 1: __list = re.split('\040+', data_list[0]) job_dict['edu_name'] = __list[1].strip() job_dict['edu_domain'] = __list[2].strip() job_dict['edu_leval'] = __list[3].strip() else: #if job_dict['edu_leval'] not in if len(_list) > 3: job_dict['edu_name'] = _list[2].strip() job_dict['edu_domain'] = _list[3].strip() job_dict['edu_leval'] = _list[1].strip() else: job_dict['edu_leval'] = _list[0].strip() job_dict['edu_name'] = _list[1].strip() job_dict['edu_domain'] = _list[2].strip() if '硕士' in _list[0] or '研究生' in _list[0]: job_dict['edu_leval'] = '硕士' elif '博士' in _list[0]: job_dict['edu_leval'] = '博士' elif '本科' in _list[0]: job_dict['edu_leval'] = '本科' elif '学士' in _list[0]: job_dict['edu_leval'] = '本科' # print(job_dict) if len(job_list) ==0: job_list.append(job_dict) else: if job_dict in job_list: continue if int(job_dict['edu_time'][:4]) > int(job_list[-1]['edu_time'][:4]): job_list = [job_dict] + job_list else: job_list.append(job_dict) #edu_list.append(job_dict['edu_time'] + job_dict['edu_name'] + job_dict['edu_domain'] + job_dict['edu_leval']) #if job_list[0]['edu_leval'] not in ['硕士', '博士', '本科', '博后'] and len(job_list[0]['edu_leval']) > 5: # job_list[0]['edu_leval'] = '本科' return job_list # 教育经历改 (已完成) def get_edu_list(lines): logging.info(lines) edu_list = [{"Time":None, "startTime":None, "endTime":None, "edu_name":None, "edu_domain":None, "edu_level":None}] regex_time = re.compile(r'((\d{4})[年\W]{1,2}(\d{1,2})[月\W]?[\d]{0,2})[至到\W]+((\d{4})[年\W]{1,2}(\d{1,2})[月\W]?)?([今])?|(\d{4})[至\W]+([\d今]{4})') regex_end = re.compile(r'毕业时间[\w\W]{0,5}(\d{4})[\W年]?(\d{0,2})[月\W]?') regex_level = re.compile(r'[大本专科硕博士研究生后]{2,}') regex_domain = re.compile(u'[\u4E00-\u9FA5]{2,10}', re.UNICODE) count = 0 for line in lines: line = line.replace("学士","本科").replace("专业","").replace("学位","") for cell in re.split(r'[·\|\t]', line): if not cell.strip(): continue flags = 0 edu_time = regex_time.search(cell) edu_end_time = regex_end.search(cell) edu_level = regex_level.search(cell) edu_domain = regex_domain.search(cell) # 标准时间格式 if edu_time: # 提交信息 if edu_list[count].get("Time") and edu_list[count].get("edu_name"): edu_list.append({"Time":None, "startTime":None, "endTime":None, "edu_name":None, "edu_domain":None, "edu_level":None}) count += 1 edu_list[count]["startTime"] = '{:4d}-{:02d}'.format(int(edu_time.group(2)),int(edu_time.group(3))) # 年月日 if edu_time.group(5) != None: edu_list[count]["endTime"] = '{:4d}-{:02d}'.format(int(edu_time.group(5)),int(edu_time.group(6))) edu_list[count]["Time"] = '{:4d}-{:02d}~{:4d}-{:02d}'.format(int(edu_time.group(2)),int(edu_time.group(3)),int(edu_time.group(5)),int(edu_time.group(6))) # 只有年 elif edu_time.group(8) != None: edu_list[count]["Time"] = '{:4d}~{:4d}'.format(int(edu_time.group(8)),int(edu_time.group(9))) edu_list[count]["startTime"] = '{:4d}'.format(int(edu_time.group(8))) edu_list[count]["endTime"] = '{:4d}'.format(int(edu_time.group(9))) # 至今类 else: edu_list[count]["endTime"] = edu_time.group(7) edu_list[count]['Time'] = '{:4d}-{:02d}~{}'.format(int(edu_time.group(2)),int(edu_time.group(3)),edu_time.group(7)) flags = 1 # 只有毕业时间 elif edu_end_time: # 提交信息 if edu_list[count].get("endTime") and edu_list[count].get("edu_name"): edu_list.append({"Time":None, "startTime":None, "endTime":None, "edu_name":None, "edu_domain":None, "edu_level":None}) count += 1 # 年月 if edu_end_time.group(2): edu_list[count]["Time"] = '{:4d}-{:02d}~{:4d}-{:02d}'.format(int(edu_end_time.group(1)),int(edu_end_time.group(2)),int(edu_end_time.group(1))-3,int(edu_end_time.group(2))) edu_list[count]["endTime"] = '{:4d}-{:02d}'.format(int(edu_end_time.group(1)),int(edu_end_time.group(2))) # 只有年 elif edu_end_time.group(1): edu_list[count]["Time"] = '{:4d}~{:4d}'.format(int(edu_end_time.group(1)),int(edu_end_time.group(1))-3) edu_list[count]["endTime"] = '{:4d}'.format(int(edu_end_time.group(1))) # 学历 if (not edu_list[count].get("edu_level")) and edu_level: edu_list[count]["edu_level"] = edu_level.group(0) # WordTag 识别 学校/专业 for word, tag in ner_tag(cell): if (not edu_list[count].get("edu_name")) and (tag == "组织机构类_教育组织机构"): edu_list[count]["edu_name"] = word.strip() flags = 1 elif (not edu_list[count].get("edu_domain")) and (tag in "_术语类型"): edu_list[count]["edu_domain"] = word.strip() elif edu_list[count].get("edu_name") and edu_list[count].get("edu_domain"): break # LAC 识别 学校 else: for word, tag in ner(cell): if (tag == "ORG"): edu_list[count]["edu_name"] = word flags = 1 break # 未识别成功时填充专业 if (not (edu_level or flags or edu_list[count].get("edu_domain"))) and edu_domain: edu_list[count]["edu_domain"] = edu_domain.group(0) # 剔除时间不存在、学校不存在的列 if (not edu_list[-1].get("Time")) or (not edu_list[-1].get("edu_name")): edu_list.pop() return edu_list # 工作经历 (已完成) # ner + 分词 机构信息,人物身份信息,时间 工作内容区分判断 # 其中,时间是判断是否下一份工作情况的主要标识符之一。字符数量 # 时间类 数量词 def get_job_list(lines): logging.info(lines) job_list = [] re_txt = '\d{4,4}\040{0,2}.\d+\040{0,2}.?\040{0,2}[\-–至-\—~]{1,2}\040{0,2}\d{4,4}\040{0,2}.\040{0,2}\d+.?|\d{4,4}.\d+.?\040{0,2}[\-–-—]{0,2}\040{0,2}至?今|\d{4,4}.\d+.?\040{0,2}[\-–-]{1,2}\040{0,2}现在|\d{4,4}年\d+月\-\d{4,4}年\d+月|\d{4,4}年\d+月\-\~|\d{4,4}年\d+月[\-\~-]至今|\d{4,4}-\d+\040{0,2}[-\~至]\040{0,2}\d{4,4}-\d+|\d{4,4}-\d+\~|\d{4,4}-\d+\[~-]至今|\d{4,4}-\d+\040{0,2}至今' nums = [] for i in range(len(lines)): #print(lines[i]) #print(lines[i], re.findall(re_txt, lines[i]), re.findall('\||\040{1,}', lines[i])) if re.findall(re_txt, lines[i].replace(' ', '')) and re.findall('\||\040{1,}', lines[i]): nums.append(i) continue if re.findall(re_txt, lines[i].replace(' ', '')[:20]): nums.append(i) continue if len(lines[i].strip().replace(' ', '')) > 50: continue year_list = re.findall('19\d{2,2}.\d{1,2}|20\d{2,2}.\d{1,2}', lines[i]) if len(year_list) >= 2: nums.append(i) elif len(year_list) == 1 and '至今' in lines[i]: nums.append(i) nums.append(len(lines)) # logging.info(nums) logging.info('get_job_list :{}'.format(nums)) for i in range(1, len(nums[:])): job_dict = {'job_time':'', 'job_leval':'','job_company':'','job_content':''} data_list = lines[nums[i-1]:nums[i]] if '' in data_list: data_list.remove('') org = '' person_professor_list = [] org_index = -1 end_index = 3 job_time = re.findall(re_txt, data_list[0]) if not job_time: year_list = re.findall('19\d{2,2}.\d{1,2}|20\d{2,2}.\d{1,2}', data_list[0]) if len(year_list) >= 2: job_time = ['-'.join(year_list)] elif len(year_list) == 1 and '至今' in lines[i]: job_time = [year_list[0] + '-' + '至今'] if not job_time: regex = re.compile(r'((\d{4})[年\W]+(\d{1,2})[\W]?[\w]?)[至到\W]+((\d{4})[年\W]+(\d{1,2})[\W]?[\w]?)?([今])?') job_time = [re.search(regex, data_list[0]).group(0)] job_dict['job_time'] = job_time[0] _nums = re.findall('\d+', job_dict['job_time']) #print(_nums) if len(_nums) >= 4: job_dict['job_time'] = '%s-%02d~%s-%02d'%(_nums[0], int(_nums[1]), _nums[2], int(_nums[3])) elif len(_nums) == 2: job_dict['job_time'] = '%s-%02d~%s'%(_nums[0], int(_nums[1]), '至今') data_list[0] = re.sub(job_time[0], '', data_list[0]) data_list[0] = data_list[0].strip() ner_list = [] for i in range(len(data_list[:3])): if '工作' in data_list[i][:4] and (re.findall(':|\:', data_list[i])): end_index = i break if not re.findall('\040|\||/', data_list[i]) and org: end_index = i break if len(data_list[i]) > 80: end_index = i break if data_list[i]: ner_data = ner_tag(data_list[i].strip()) else: continue ner_list.append(ner_data) for x in ner_data: if x[1] == '人物类_概念' and len(x[0]) > 2: person_professor_list.append(x[0].strip()) elif x[1] == '组织机构类_企事业单位' or x[1] == '组织机构类_教育组织机构': if not org: org = re.split('\040|\|/', x[0].strip())[0] org_index = i if not org: for i in range(len(ner_list)): ner_data = ner_list[i] for x in ner_data: if x[1] == '组织机构类': org = re.split('\040|\|/', x[0].strip())[0] break if not person_professor_list: for i in range(len(ner_list)): ner_data = ner_list[i] for x in ner_data: if x[1] == '人物类_概念': person_professor_list = [re.split('\040|\|/', x[0].strip())[0]] break data_line = ' '.join(data_list[:end_index]) data_line = re.sub('\||/', ' ', data_line) _list_data = re.split('\040+',data_line) if len(_list_data) == 1: end_index = 0 if not person_professor_list: for x in range(len(_list_data)): if re.findall('经理|工程师|会计|董事长|总监|秘书|主管|处长|局长|主任|讲师|教授', _list_data[x][-4:]): person_professor_list.append(_list_data[x]) if not org: for x in range(len(_list_data)): if len(_list_data[x]) < 4: _list_data[x] = '' elif person_professor_list and re.findall('|'.join(person_professor_list), _list_data[x]): _list_data[x] = '' elif '经理' == _list_data[x][-2:]: _list_data[x] = '' for x in range(len(_list_data)): if _list_data[x]: org = _list_data[x] break if not person_professor_list: for x in range(len(_list_data)): if org in _list_data[x]: for j in range(x+1, len(_list_data)): if _list_data[j]: person_professor_list = [_list_data[j]] break break #print(org, person_professor_list, job_time) job_dict['job_company'] = org job_dict['job_leval'] = ' '.join(person_professor_list) job_dict['job_content'] = re.sub('工工作作内内容容::|工工作作内内容容::|工工作作内内容容', '工作内容:', ''.join(data_list[end_index:])) job_dict['job_content'] = re.sub('/', '-', job_dict['job_content']) job_list.append(job_dict) continue if len(data_list) > 1 and data_list[1] and data_list[1][-1] == '|':# and data_list[0] and data_list[0][-1] != '|': data_list[0] = data_list[0] + data_list[1] data_list[1] = '' elif len(data_list) > 2 and data_list[2] and data_list[2][-1] == '|' and data_list[0][-1] != '|' and '|' in str(data_list[0]) and data_list[1] and data_list[1][-1] != '|': data_list[0] = data_list[0] + data_list[1] + data_list[2] data_list[1] = '' data_list[2] = '' elif len(data_list) > 1 and data_list[1] and '工作职责:' in data_list[2]: data_list[0] = data_list[0] + data_list[1] data_list[1] = '' elif len(data_list) > 1 and '工作职责:' in data_list[3]: data_list[0] = data_list[0] + data_list[1] + data_list[2] data_list[1] = '' data_list[2] = '' job_time = re.findall(re_txt, data_list[0]) job_dict['job_time'] = job_time[0] _nums = re.findall('\d+', job_dict['job_time']) #print(_nums) if len(_nums) >= 4: job_dict['job_time'] = '%s-%02d~%s-%02d'%(_nums[0], int(_nums[1]), _nums[2], int(_nums[3])) elif len(_nums) == 2: job_dict['job_time'] = '%s-%02d~%s'%(_nums[0], int(_nums[1]), '至今') data_list[0] = re.sub(job_time[0], '', data_list[0]) data_list[0] = data_list[0].strip() data_list[0] = re.sub('历任:', ' ', data_list[0]) _list = data_list[0].split('|') if len(_list) == 1: __list = re.split('\040{2,}', data_list[0]) #print(__list) job_dict['job_leval'] = __list[1].strip() job_dict['job_company'] = __list[0].strip() else: job_dict['job_leval'] = _list[0].strip() job_dict['job_company'] = _list[1].strip() if '职级:' in data_list[1:]: data_list.remove('职级:') job_dict['job_content'] = re.sub('工工作作内内容容::|工工作作内内容容::|工工作作内内容容', '工作内容:', ''.join(data_list[1:])) job_dict['job_content'] = re.sub('/', '-', job_dict['job_content']) #print(job_dict) job_list.append(job_dict) return job_list # 项目经历 (已完成) # 项目名称未知 def get_pro_list(lines): logging.info(lines) pro_list = [{"Time":None,"startTime":None,"endTime":None,"pro_name":None,"job_leval":None,"job_company":None,"content":None,},] regex = re.compile(r'((\d{4})[年\W]+(\d{1,2})[\W]?[\w]?)[至到\W]+((\d{4})[年\W]+(\d{1,2})[\W]?[\w]?)?([今])?') re_con = re.compile(r'负责内容(.*?)') re_na = re.compile(r'\W(.*?项目)\W') count = 0 for line in lines: regex_time = regex.search(line) regex_content = re_con.search(line) regex_name = re_na.search(line) if regex_time: if pro_list[count].get("Time"): pro_list.append({"Time":None,"startTime":None,"endTime":None,"pro_name":None,"job_leval":None,"job_company":None,"content":None,}) count += 1 pro_list[count]["startTime"] = '{:4d}-{:02d}'.format(int(regex_time.group(2)),int(regex_time.group(3))) if regex_time.group(5) != None: pro_list[count]["endTime"] = '{:4d}-{:02d}'.format(int(regex_time.group(5)),int(regex_time.group(6))) pro_list[count]["Time"] = '{:4d}-{:02d}~{:4d}-{:02d}'.format(int(regex_time.group(2)),int(regex_time.group(3)),int(regex_time.group(5)),int(regex_time.group(6))) else: pro_list[count]["endTime"] = regex_time.group(7) pro_list[count]['Time'] = '{:4d}-{:02d}~{}'.format(int(regex_time.group(2)),int(regex_time.group(3)),regex_time.group(7)) elif regex_name and (not pro_list[count].get("job_name")): pro_list[count]["pro_name"] = regex_name.group() elif pro_list[count].get("content"): pro_list[count]["content"] += line else: try: for word, tag in ner_tag(line): if (not pro_list[count].get("job_leval")) and (tag == "人物类_概念"): pro_list[count]["job_leval"] = word if (not pro_list[count].get("job_company")) and (tag in "组织机构类_企事业单位"): pro_list[count]["job_company"] = word except Exception as e: logging.error(e) pro_list[count]["content"] = line return pro_list # 培训经历 (已完成) # ner + 分词 (机构名) 培训项目 时间 def get_cultivate_list(lines): logging.info(lines) job_list = [] re_txt = '\d{4,4}.\d{1,2}.?\040{0,2}[\-–至-\—~]\040{0,2}\d{4,4}.\d{1,2}[月]?|\d+\.\d+\-至今|\d+年\d+月\-\d+年\d+月|\d+年\d+月\-\~|\d+年\d+月[\-\~]至今|\d+-\d+\040{0,2}[\~至]\040{0,2}\d+-\d+|\d+-\d+\~|\d+-\d+\~至今|\d+-\d+\040{0,2}至今|^\d{4,4}.\d{1,2}|\d{4,4}.' re_txt_1 = '\d{4,4}.\d{1,2}.?\040{0,2}[\-–至-\—~]\040{0,2}\d{4,4}.\d{1,2}[月]?|\d+\.\d+\-至今|\d+年\d+月\-\d+年\d+月|\d+年\d+月\-\~|\d+年\d+月[\-\~]至今|\d+-\d+\040{0,2}[\~至]\040{0,2}\d+-\d+|\d+-\d+\~|\d+-\d+\~至今|\d+-\d+\040{0,2}至今' nums = [] for i in range(len(lines)): if re.findall(re_txt, lines[i].replace(' ', '')) and re.findall('\||\040{1,}', lines[i]): nums.append(i) continue if re.findall(re_txt, lines[i].replace(' ', '')[:20]): nums.append(i) if len(lines[i].strip().replace(' ', '')) > 50: continue nums.append(len(lines)) year_dict = {18:4, 17:3,20:3,19:3,21:2,22:1} for i in range(1, len(nums[:])): job_dict = {'cultivate_time':'', 'cultivate_time_beg':'', 'cultivate_time_end':'', 'cultivate_name':'','cultivate_leval':'','cultivate_content':''} data_list = lines[nums[i-1]:nums[i]] data_line = ' '.join(data_list) data_line = re.sub('[\|\t]', ' ', data_line) data_line = re.sub('-{3,}', '', data_line) ner_data = ner(''.join(data_list[:2])) org = '' time_list = [] for _ in ner_data: if _[1] == 'ORG' and not org: org = _[0].strip() elif _[1] == 'TIME' and len(_[1]) >= 4: time_list.append(_[0]) #TIME logging.info(data_line) _list_data = re.split('\040+', data_line) top_level = 22 end_index = 0 remove_list = [] if len(_list_data) <= 2: end_index = 0 #continue job_time = re.findall(re_txt_1, data_list[0]) if job_time: job_dict['cultivate_time'] = job_time[0] data_list[0] = re.sub(job_time[0], '', data_list[0]) else: job_dict['cultivate_time'] = '' for t in time_list: data_list[0] = re.sub(t, '', data_list[0]) _list = data_list[0].split('|') if len(_list) >= 2: job_dict['cultivate_name'] = _list[0].strip() job_dict['cultivate_leval'] = _list[1].strip() end_index = 1 _nums = re.findall('\d+', job_dict['cultivate_time']) if len(_nums) >= 4: job_dict['cultivate_time_beg'] = '%s-%02d'%(_nums[0], int(_nums[1])) job_dict['cultivate_time_end'] = '%s-%02d'%(_nums[2], int(_nums[3])) job_dict['cultivate_time'] = '%s-%02d~%s-%02d'%(_nums[0], int(_nums[1]), _nums[2], int(_nums[3])) elif len(_nums) == 2: job_dict['cultivate_time'] = '%s-%02d~%s'%(_nums[0], int(_nums[1]), '至今') job_dict['cultivate_time_beg'] = '%s-%02d'%(_nums[0], int(_nums[1])) job_dict['cultivate_time_end'] = '%s'%('至今') elif len(time_list) == 2: nums_1 = re.findall('\d+', time_list[0]) nums_2 = re.findall('\d+', time_list[1]) nums_1.append('09') nums_2.append('07') job_dict['cultivate_time_beg'] = '%s-%02d'%(nums_1[0], int(nums_1[1])) job_dict['cultivate_time_end'] = '%s-%02d'%(nums_2[0], int(nums_2[1])) job_dict['cultivate_time'] = '%s-%02d~%s-%02d'%(nums_1[0], int(nums_1[1]), nums_2[0], int(nums_2[1])) elif len(time_list) == 1: _nums = re.findall('\d+', time_list[0]) if '获得' in data_list[0]: _nums.append('01') _nums.insert(0, '01') _nums.insert(0, str(int(_nums[1]) - year_dict[top_level])) job_dict['cultivate_time'] = '%s-%02d~%s-%02d'%(_nums[0], int(_nums[1]), _nums[2], int(_nums[3])) job_dict['cultivate_time_beg'] = '%s-%02d'%(_nums[0], int(_nums[1])) job_dict['cultivate_time_end'] = '%s-%02d'%(_nums[2], int(_nums[3])) else: _nums.append('01') job_dict['cultivate_time'] = '%s-%02d~%s'%(_nums[0], int(_nums[1]), '至今') job_dict['cultivate_time_beg'] = '%s-%02d'%(_nums[0], int(_nums[1])) job_dict['cultivate_time_end'] = '%s'%('至今') job_dict['cultivate_content'] = re.sub('培培训训内内容容::|培培训训内内容容::|培培训训内内容容', '培训内容:', ''.join(data_list[end_index:])) if not job_dict['cultivate_name']: job_dict['cultivate_name'] = org logging.info(job_dict) job_list.append(job_dict) continue ''' #print(nums) for i in range(1, len(nums[:])): job_dict = {'cultivate_time':'', 'cultivate_name':'','cultivate_leval':'','cultivate_content':''} data_list = lines[nums[i-1]:nums[i]] if '' in data_list: data_list.remove('') if len(data_list) > 1 and data_list[1] and data_list[1][-1] == '|' and data_list[0][-1] != '|': data_list[0] = data_list[0] + data_list[1] data_list[1] = '' job_time = re.findall(re_txt_1, data_list[0]) job_dict['cultivate_time'] = job_time[0] _nums = re.findall('\d+', job_dict['cultivate_time']) if len(_nums) >= 4: job_dict['cultivate_time'] = '%s-%02d~%s-%02d'%(_nums[0], int(_nums[1]), _nums[2], int(_nums[3])) elif len(_nums) == 2: job_dict['cultivate_time'] = '%s-%02d~%s'%(_nums[0], int(_nums[1]), '至今') data_list[0] = re.sub(job_time[0], '', data_list[0]) _list = data_list[0].split('|') if len(_list) >= 2: job_dict['cultivate_name'] = _list[0].strip() job_dict['cultivate_leval'] = _list[1].strip() job_dict['cultivate_content'] = re.sub('培培训训内内容容|培培训训内内容容::|培培训训内内容容::', '培训内容:', ''.join(data_list[1:])) else: job_dict['cultivate_content'] = re.sub('培培训训内内容容|培培训训内内容容::|培培训训内内容容::', '培训内容:', ''.join(data_list[0:])) #print(job_dict) ''' return job_list # 语言能力 def get_lag_list(lines): logging.info(lines) job_list = [] re_lan = re.compile(r'(\w+[语话])') lag_dict = {'lag_name':'', 'lag_leval':""} for l in lines: if not l.strip(): continue lag_name = re.search(re_lan, l) if lag_name and lag_name.group(1): if lag_dict['lag_name']: job_list.append(lag_dict) lag_dict['lag_name'] = lag_name.group(1) return job_list # 家庭情况 def get_fam_list(lines): job_list = [] fam_dict = {} for l in lines: if not l.strip(): continue ls = l.split('|') if len(ls) == 1: continue fam_dict = {'fam_name':"",'fam_company':"",'fam_lable':"","fam_status":"", 'fam_job':""} fam_dict["fam_lable"] = ls[0].strip() fam_dict["fam_name"] = ls[1].strip() flag = 0 if re.findall('\d岁|\d{4,5}', ls[2]): flag = 1 fam_dict["fam_company"] = ls[flag+2].strip() fam_dict["fam_job"] = ls[flag+3].strip() fam_dict["fam_status"] = ls[flag+4].strip() #print(fam_dict) job_list.append(fam_dict) return job_list # 证书情况 时间+证书名称 (已完成) def get_cet_list(lines): logging.info(lines) job_list = [] re_txt = '\d+年\d+月|\d+-\d+|\d+\.\d+' lines_word = ' '.join(lines) lines = re.findall('\d+年\d+月|\d+-\d+|\d+\.\d+', lines_word) nums = [] for x in range(len(lines) - 1): _index = lines_word.index(lines[x]) _end_index = lines_word.index(lines[x+1]) l = lines_word[_index : _end_index] if not l.strip(): continue lines_word = lines_word[_end_index:] job_time = re.findall(re_txt, l) cet_dict = {'cet_name':'','cet_time':""} if job_time: cet_dict['prize_time'] = job_time[0] l = re.sub(job_time[0], '', l) else: continue ls = re.split('\||\040+|\t+', l) logging.info(ls) for l in ls: if len(l) <= 3: continue cet_dict['prize_name'] = l.strip() break job_list.append(cet_dict) return job_list # 获奖情况 时间+获奖名称 (已完成) def get_prize_list(lines): logging.info(lines) job_list = [] re_txt = '\d+年\d+月|\d+-\d+|\d{4,4}.\d{1,2}' lines_word = ' '.join(lines) lines = re.findall('\d+年\d+月|\d{4,4}-\d+|\d{4,4}.\d{1,2}', lines_word) nums = [] for x in range(len(lines) - 1): _index = lines_word.index(lines[x]) _end_index = lines_word.index(lines[x+1]) l = lines_word[_index : _end_index] if not l.strip(): continue lines_word = lines_word[_end_index:] job_time = re.findall(re_txt, l) cet_dict = {'prize_name':'','prize_time':""} if job_time: cet_dict['prize_time'] = job_time[0] l = re.sub(job_time[0], '', l) else: continue ls = re.split('\||\040+|\t+', l) logging.info(ls) for l in ls: if len(l) <= 3: continue cet_dict['prize_name'] = l.strip() break logging.info(cet_dict) job_list.append(cet_dict) return job_list # Linux doc 文件处理 def doc2pdf_linux(docPath, pdfPath): """ 允许的文档格式:doc,docx 仅在linux平台下可以 需要在linux中下载好libreoffice """ # 注意cmd中的libreoffice要和linux中安装的一致 cmd = 'libreoffice --headless --convert-to pdf'.split() + [docPath] + ['--outdir'] + [pdfPath] # cmd = 'libreoffice6.2 --headless --convert-to pdf'.split() + [docPath] p = subprocess.Popen(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE) p.wait(timeout=30) # 停顿30秒等待转化 stdout, stderr = p.communicate() if stderr: raise subprocess.SubprocessError(stderr) # Win32 doc 文件处理 def doc2pdf(docPath, pdfPath, system): """ 注意使用绝对路径 pdf的生成只写路径,不写名字 """ docPathTrue = os.path.abspath(docPath) # bugfix - searching files in windows/system32 if system == "Linux": return doc2pdf_linux(docPathTrue, pdfPath) # txt 纯文本解析(已完成) def parse_txt(path): with open(path, 'r', encoding='utf-8') as fp: data = fp.read() global block, block_rev chun = 1 page = {1: []} if len(data.split("\n")) <= 2: for line in data.split("\n"): line = line.replace("\xa0", "").replace("【","").replace("】","").replace("教育/培训","教育经历") for word in line.split(): if word in block.keys(): chun = block[word] page[chun] = [] elif word: page[chun].append(word) else: for line in data.split("\n"): line = line.replace("\xa0", "").replace("【","").replace("】","").replace("教育/培训","教育经历") regex = re.compile(u'[\u3000]+',re.UNICODE) line = regex.sub('', line) if line in block.keys(): chun = block[line] page[chun] = [] elif line: page[chun].append(line) result_data = [] for key in page.keys(): for index, func in zip([1, 2, 3, 4, 5, 9, 10, 11, 12], [get_base_info, get_job_intention, get_edu_list, get_job_list, get_pro_list, get_lag_list, get_cet_list, get_prize_list, get_cultivate_list]): if key == index: result_data.append({block_rev[index]:func(page[index])}) console.print(result_data) # 纯文本 word 解析 def read_from_word(doc): para_text = [] for para in doc.paragraphs: para_text.append(para.text) global block, block_rev chun = 1 page = {1: []} for line in para_text: regex = re.compile(u'[\uF000-\uF0FF]+',re.UNICODE) line = regex.sub('', line) if line in block.keys(): chun = block[line] page[chun] = [] elif line: page[chun].append(line) result_data = [] for key in page.keys(): for index, func in zip([1, 2, 3, 4, 5, 10, 11, 12], [get_base_info, get_job_intention, get_edu_list, get_job_list, get_pro_list, get_cet_list, get_prize_list, get_cultivate_list]): if key == index: result_data.append({block_rev[index]:func(page[index])}) console.print(result_data) # 提取 word 表格(已完成) def check_word(path): doc = Document(path) tables = doc.tables if not tables: logging.info("this is raw text") read_from_word(doc) logging.info("this is a Table") prk = {"姓名":1, "性别":1, "出生年月":1, "民族":1, "籍贯":1, "户籍地":1, "政治面貌":1, "参加工作时间":1, "健康状况":1, "专业技术资格":1, "外语水平":9, "熟悉专业有何专长":8, "学历学位":1, "工作单位":1, "现任职务":1, "任职时间":1, "提职时间":1, "联系电话":1, "邮箱地址":1, "称谓":13, "工作单位及职务":1, "毕业时间、院校及专业":3,} block = { "个人信息":1, "基本信息":1, "个人简历":1, "基基本本信信息息":1, "基本信息基本信息":1, "基本信息文本内容":1, "求职意向":2, "求职意向求职意向":2, "期望工作文本内容":2, "教育背景":3, "教育经历":3, "教教育育经经历历":3, "教育经历教育经历":3, "教育经历文本内容":3, "学历学位":3, "工作经验":4, "主要工作内容与职责":4, "工作方面":4, "实习经历":4, "工作经历":4, "工工作作经经历历":4, "工作经历工作经历":4, "工作经历文本内容":4, "项目经历":5, "项目经验":5, "科研项目经历":5, "项项目目经经历历":5, "项目经历项目经历":5, "研究生参与代表性项目":5, "项目经历文本内容":5, "专业技能":6, "个人技能":6, "专业/外语技能":6, "技能素质":6, "个人技能文本内容":6, "自我评价":7, "个人简介":7, "个人评价":7, "自我描述":7, "自自我我评评价价":7, "自我评价自我评价":7, "自我评价文本内容":7, "兴趣爱好":8, "兴趣爱好文本内容":8, "语言及方言":9, "语言能力":9, "英语能力":9, "语语言言能能力力":9, "语言能力语言能力":9, "语言技能文本内容":9, "证书":10, "所获证书文本内容":10, "获得奖励":11, "获奖经历":11, "获奖情况":11, "获获奖奖经经历历":11, "获奖经历获奖经历":11, "获奖情况及社会活动":11, "校内奖励":11, "校内活动&奖励":11, "所获奖励文本内容":11,"奖惩情况":11, "培训":12, "培训经历":12, "培培训训经经历历":12, "培训经历文本内容":12, "家庭成员":13, "家家庭庭成成员员":13, "家庭成员家庭成员":13, "主要家庭成员及社会关系":13, "社会活动":"other", "实践经验":"other", "社会活动及社会实践":"other", "近三年年度考核结果":"other", "其他意愿":"other", } chun = 1 page = {1: []} regex = re.compile(r'(\(\w{2,8}\))?((\w{2,8}))?') for table in tables: lo = {} # 存储每一行去重后的数据 for row in range(0, len(table.rows)): row_list = [] for col in range(0, len(table.row_cells(row))): # 提取row行的全部列数据 row_list.append(regex.sub("", table.cell(row,col).text.replace(" ","").replace(":", ":").replace("学历\n学位","学历学位"))) # 去除字符串中的特殊字符,并添加到临时列表中 lo[row] = (sorted(set(row_list), key=row_list.index)) # 在不变顺序的前提下,去除List中的重复项 # 去除空项 for key in lo.keys(): if "" in lo[key]: lo[key].remove("") for _, line in lo.items(): if (line[0] in block.keys()) or (line[0] in prk.keys()): # 包含大类目名 if line[0] in block.keys(): # 指向当前类目 chun = block[line[0]] if not page.get(chun): page[chun] = [] # 去除类目名 line = '\n'.join(line[1:]) # 包含小类目 elif line[0] in prk.keys(): # 指向当前类目 chun = prk[line[0]] if not page.get(chun): page[chun] = [] # 不去除 line = '\n'.join(line) else: line = '\n'.join(line) # 标准化小类目 for k in prk.keys(): line = line.replace(k+"\n", k+":") page[chun].extend(line.split()) result_data = [] for key in page.keys(): for index, func in zip([1, 2, 3, 4, 5, 10, 11, 12], [get_base_info, get_job_intention, get_edu_list, get_job_list, get_pro_list, get_cet_list, get_prize_list, get_cultivate_list]): if key == index: result_data.append({block_rev[index]:func(page[index])}) console.print(result_data) # pdf 解析句子(已完成) def parse_line_layout(layout): texts = [] """解析页面内容,一行一行的解析""" # bbox: # x0:从页面左侧到框左边缘的距离。 # y0:从页面底部到框的下边缘的距离。 # x1:从页面左侧到方框右边缘的距离。 # y1:从页面底部到框的上边缘的距离 for textbox in layout: if isinstance(textbox, LTTextBox) or isinstance(textbox, LTTextLine): for char in textbox: if isinstance(char, LTTextLineHorizontal): texts.append([char.bbox[0], char.bbox[3], char.get_text().strip()]) # 按行排序 texts.sort(key=lambda x:-x[1]) global block, block_rev chun = 1 page = {1: []} for _, _, line in texts: regex = re.compile(u'[\uF000-\uF0FF]+',re.UNICODE) line = regex.sub('', line) if line in block.keys(): chun = block[line] page[chun] = [] elif line: page[chun].append(line) return page # pdf 样式解析(已完成) def read_from_pdf(path): result = {} with open(path, 'rb') as in_file: parser = PDFParser(in_file) # 用文件对象来创建一个pdf文档分析器 doc: PDFDocument = PDFDocument(parser) # 创建pdf文档 rsrcmgr = PDFResourceManager() # 创建PDF,资源管理器,来共享资源 # 创建一个PDF设备对象 laparams = LAParams() device = PDFPageAggregator(rsrcmgr, laparams=laparams) # 创建一个PDF解释其对象 interpreter = PDFPageInterpreter(rsrcmgr, device) # 循环遍历列表,每次处理一个page内容 # doc.get_pages() 获取page列表 interpreter = PDFPageInterpreter(rsrcmgr, device) # 处理文档对象中每一页的内容 # doc.get_pages() 获取page列表 # 循环遍历列表,每次处理一个page的内容 # 这里layout是一个LTPage对象 里面存放着 这个page解析出的各种对象 一般包括LTTextBox, LTFigure, LTImage, LTTextBoxHorizontal 等等 想要获取文本就获得对象的text属性, for page in PDFPage.create_pages(doc): logging.info('================ 新页面 ================') interpreter.process_page(page) layout = device.get_result() r = parse_line_layout(layout) for key in r.keys(): if result.get(key): result[key].extend(r[key]) else: result[key] = r[key] block_rev = {1:"基本信息",2:"求职意向",3:"教育经历",4:"工作经历",5:"项目经历",6:"专业技能",7:"自我评价",8:"兴趣爱好",9:"语言能力",10:"证书",11:"获奖情况",12:"培训经历",13:"家庭成员","other":"其他"} result_data = [] for key in result.keys(): for index, func in zip([1, 2, 3, 4, 5, 9, 10, 11, 12], [get_base_info, get_job_intention, get_edu_list, get_job_list, get_pro_list, get_lag_list, get_cet_list, get_prize_list, get_cultivate_list]): if key == index: result_data.append({block_rev[index]: func(result[index])}) console.print(result_data) # pdf 表格解析 () def parse_table_from_pdf(path): global block, block_rev result = {} with pdfplumber.open(path) as pdf: for page in pdf.pages: key = None for table in page.extract_tables(): for line in table: for word in line: if not key: key = word else: result[key] = word key = None for key in block.keys(): if result.get(key): logging.info({key: result[key]}) console.print(result) # for key in result.keys(): # for index, func in zip([1, 2, 3, 4, 5, 10, 11, 12], [get_base_info, get_job_intention, get_edu_list, get_job_list, get_pro_list, get_cet_list, get_prize_list, get_cultivate_list]): # if (key in block.keys()) and (block[key] == index): # console.print(block_rev[index]) # try: # console.print(func(result[index]), justify="left") # except Exception as e: # logging.error(e) # break # else: # console.print({key: result[key]}) # break return None # 检测 pdf 格式 (已完成) def check_pdf(path): """ # 输入: # pdf 文件路径 # 输出: # 文件包含元素 [Word, Table] """ rst = [] for page_layout in extract_pages(path): for element in page_layout: if isinstance(element, LTFigure): for cell in element: if isinstance(cell, LTChar): rst.append("Table") break elif isinstance(element, LTTextContainer): rst.append("Word") return set(rst) # 检测传入格式(已完成) def detection_type(path, system): # 传入目录 if os.path.isdir(path): for filename in os.listdir(path): filename = os.path.join(path, filename) # 传入为 doc logging.info(filename) if filename.endswith('.doc') and not filename.startswith('.~'): doc2pdf(docPath = filename, pdfPath = './pdf', system=system) newfile = './pdf/' + os.path.splitext(os.path.split(newfile)[-1])[0] + '.pdf' if os.path.exists(newfile): rst = check_pdf(filename) if "Table" in rst: parse_table_from_pdf(filename) pass if "Word" in rst: read_from_pdf(filename) # 传入为 docx elif os.path.isfile(filename) and filename.endswith('.docx'): check_word(filename) # 传入为 pdf if os.path.isfile(filename) and filename.endswith('.pdf'): rst = check_pdf(filename) if "Table" in rst: parse_table_from_pdf(filename) pass if "Word" in rst: read_from_pdf(filename) # 传入为 txt elif os.path.isfile(filename) and filename.endswith('.txt'): parse_txt(filename) # 传入为 doc elif os.path.isfile(path) and path.endswith('.doc'): doc2pdf(docPath = path, pdfPath = './pdf', system=system) newfile = './pdf/' + os.path.splitext(os.path.split(newfile)[-1])[0] + '.pdf' if os.path.exists(newfile): rst = check_pdf(filename) if "Table" in rst: parse_table_from_pdf(filename) pass if "Word" in rst: read_from_pdf(filename) # 传入为 docx elif os.path.isfile(path) and path.endswith('.docx'): check_word(path) # 传入为 pdf elif os.path.isfile(path) and path.endswith('.pdf'): rst = check_pdf(path) if "Table" in rst: parse_table_from_pdf(path) if "Word" in rst: read_from_pdf(path) # 传入为 txt elif os.path.isfile(path) and path.endswith('.txt'): parse_txt(path) return None if __name__ == '__main__': import platform system = platform.system() if (system == "Windows"): logging.info("Windows") elif (system == "Linux"): logging.info("Linux") else: logging.error("Unnot support this system") # try: # detection_type(sys.argv[1], system) # except Exception as e: # logging.error(e) detection_type(sys.argv[1], system) # detection_type('w1.pdf', system)