# 在pdf_miner的基础上进行优化 # 标准包导入 import os import re import json import re import shutil import pandas as pd import pdb import base64 from io import BytesIO from pprint import pprint from paddleocr import PPStructure, draw_structure_result, save_structure_res from pypdf import PdfReader from pdf2image import convert_from_path # 第三方包导入 import numpy as np import pandas as pd import cv2 import torch import glob import logging import requests import time import datetime import subprocess from tqdm import tqdm from tooklit import RefPageNumberResolver from get_info import PdfExtractAttr from get_info import is_title, export_image, _save_jpeg, _save_jpeg2000, _save_bmp, main_parse, table_parse, load_json from PIL import Image from pdfminer.image import ImageWriter from tooklit import remove_red_seal, remove_blue_seal # tools function def create_logger(log_path): """ 将日志输出到日志文件和控制台 """ logger = logging.getLogger() logger.setLevel(logging.INFO) formatter = logging.Formatter( '%(asctime)s - %(levelname)s - %(message)s') # 创建一个handler,用于写入日志文件 file_handler = logging.FileHandler( filename=log_path, mode='w') file_handler.setFormatter(formatter) file_handler.setLevel(logging.INFO) logger.addHandler(file_handler) # 创建一个handler,用于将日志输出到控制台 console = logging.StreamHandler() console.setLevel(logging.DEBUG) console.setFormatter(formatter) logger.addHandler(console) return logger # 页面信息缓存 class PageBuffer(): def __init__(self): self.page_cache = {} # 查询某一页的信息属性 def query(self, page): if self.page_cache.get(page, -1) == -1: return None page_info = self.page_cache[page] return page_info class SealAgent(): def __init__(self, url, headers): self.url = url self.headers = headers def get_content(self, image_path): f = open(image_path, 'rb') img = base64.b64encode(f.read()) params = {"image":img} try: response = requests.post(url=self.url, data=params, headers=self.headers) return response.json() except: logger.info(f"当前图像:{image_path}在印章识别ocr接口中网络不稳定 ...") def seal_parse(self, image_path): meta = { "firm_seals": [], "indiv_seals": [] } content = self.get_content(image_path=image_path) seal_num = content["result_num"] seal_result = content["result"] if seal_num == 0: return meta for seal_info in seal_result: seal_type = seal_info["type"] seal_content = seal_info["major"]["words"].strip().replace(' ', '') top = seal_info["location"]["top"] left = seal_info["location"]["left"] width = seal_info["location"]["width"] height = seal_info["location"]["height"] if '公司' in seal_content: meta['firm_seals'].append( { "seal_type": seal_type, "firm_name": seal_content } ) else: meta['indiv_seals'].append({ "seal_type": seal_type, "indiv_name": seal_content }) return meta # ocr外部接口 class OcrAgent(): def __init__(self, url): self.url = url self.datetime_re = r'\d{4}年\d{1,2}月\d{1,2}日至(?:\d{4}年\d{1,2}月\d{1,2}日|长期)' # 不同类型证书资质正则 self.re_dict = { "business_license" : r'营业执照', "deposit": r'^(?:开户许可证|[\u4e00-\u9fff]+存款账户[\u4e00-\u9fff]+)$', "production_license": r'\b[\u4e00-\u9fff]*许可证\b', "qualtifications" : r'\b[\u4e00-\u9fff]*证书', "proof": r'\b[\u4e00-\u9fff]*证明', } # 字迹阈值 self.sign_threshold = 0.05 self.font_threshold = 39 # 集成印章ocr def integrate_sealagent(self, url, headers): self.sealagent = SealAgent(url=url, headers=headers) # 获取图像的ocr信息 def get_content(self, image_path): try: with open(image_path, 'rb') as image_file: files = {"file": ("image.jpg", image_file, "image/jpeg")} response = requests.post(self.url, files=files) return response.json() except: raise ValueError(f"传入图像{image_path}已损坏") def judge_pro(self, image_path: str, firm_name: str): # 以下实现要求image_path的路径如下例所示: # ./test/page-0.jpg image_name = image_path.split('/')[-1] logger.info(f'processing img: {image_name}') page_number = image_name.split('-')[-1].split('.')[0] response_item = { "qualtified": None, # 是否为证书 "matched": None, # 是否出现匹配的公司名称 "license_name": None, # 证书名 "license_page": page_number, # 证书所在页 "start_datetime": None, # 有效起始时间 "end_datetime": None # 有效终止时间 } content = self.get_content(image_path=image_path) image_info = content["rawjson"]["ret"] # 必须包含公司名称信息 if not self.search(image_info=image_info, key_list=[firm_name]): return None else: response_item['matched'] = True # 是否匹配营业执照或资质证书 for key, format in self.re_dict.items(): if key == 'business_license': match_name = self.re_match(image_info=image_info, format=format) else: match_name = self.re_search(image_info=image_info, format=format) if match_name and key == 'business_license': response_item["qualtified"] = True response_item["license_name"] = match_name response_item = self.find_license_datetime(image_info=image_info, response_item=response_item) return response_item elif match_name: response_item["qualtified"] = True response_item["license_name"] = match_name response_item = self.find_certificate_datetime(image_info=image_info, response_item=response_item) return response_item return response_item # 判断图像是否为某公司的营业执照或资质证书信息,并返回提取到的信息 def judge(self, image_path: str, firm_name: str): # 以下实现要求image_path的路径如下例所示: # ./test/image_page_12_0.jpg # 12代表当前图像在pdf中的第12页 # 0代表当前图像为该页提取的第1张图像 image_prefix = image_path.split('/')[-1] logger.info(f'processing img: {image_prefix}') page_number = image_prefix.split('_')[-2] response_item = { "qualtified": None, # 是否为证书 "matched": None, # 是否出现匹配的公司名称 "license_name": None, # 证书名 "license_page": page_number, # 证书所在页 "start_datetime": None, # 有效起始时间 "end_datetime": None # 有效终止时间 } content = self.get_content(image_path=image_path) image_info = content["rawjson"]["ret"] # 必须包含公司名称信息 if not self.search(image_info=image_info, key=firm_name): return None else: response_item['matched'] = True # 是否匹配营业执照或资质证书 for key, format in self.re_dict.items(): if key == 'business_license': match_name = self.re_match(image_info=image_info, format=format) else: match_name = self.re_search(image_info=image_info, format=format) if match_name and key == 'business_license': response_item["qualtified"] = True response_item["license_name"] = match_name response_item = self.find_license_datetime(image_info=image_info, response_item=response_item) return response_item elif match_name: response_item["qualtified"] = True response_item["license_name"] = match_name response_item = self.find_certificate_datetime(image_info=image_info, response_item=response_item) return response_item return response_item # 资质证书有效期定位 def find_certificate_datetime(self, image_info, response_item): # keyword start_keywords = ['颁发日期', '发证日期', '生效日期'] end_keywords = ['终止日期'] priority_keywords = ['有效期', '使用期限', '有效日期'] keywords_list = ['有效期', '使用期限', '有效日期', '终止日期', '颁发日期', '发证日期', '生效日期'] # re format format = r'(?:[自至])?\d{4}年\d{1,2}月\d{1,2}日(?:至)?(?:\d{4}年\d{1,2}月\d{1,2}日)?' special_format = r'\d{4}-\d{1,2}-\d{1,2}' # 判断是否存在日期关键字 flag = False keyword_dict = {} for info in image_info: word = info['word'] left = info['rect']['left'] top = info['rect']['top'] width = info['rect']['width'] height = info['rect']['height'] for keyword in keywords_list: # 该证书存在日期关键字 if keyword in word: flag = True charset_list = info['charset'] for char_dc in charset_list: if char_dc['word'] == keyword[-1]: right = char_dc['rect']['left'] + char_dc['rect']['width'] keyword_dict[keyword] = { "left": left, "top": top, "right": right } if flag: for info in image_info: word = info['word'] if '年' in word or re.search(r'\d', word): left = info['rect']['left'] top = info['rect']['top'] width = info['rect']['width'] if '年' in word: find_list = re.findall(pattern=format, string=word) else: find_list = re.findall(pattern=special_format, string=word) # logger.info(f'word {word} has find_list{find_list}') # if self.check: # pdb.set_trace() if len(find_list) == 1: find_string = find_list[0] if '至' in find_string: start_prefix = find_string.split('至')[0].replace('自', '') end_prefix = find_string.split('至')[-1] if '年' in start_prefix: response_item['start_datetime'] = start_prefix if end_prefix != '': response_item['end_datetime'] = end_prefix return response_item # 不存在{至}的情况下通过位置和已有期限关键字来分配日期 else: for k, k_info in keyword_dict.items(): k_left = k_info['left'] k_right = k_info['right'] k_top = k_info['top'] # 捕获关键字 if left == k_left: if (k in priority_keywords) or (k in end_keywords) and response_item['end_datetime'] is None: response_item['end_datetime'] = find_string elif k in start_keywords and response_item['start_datetime'] is None: response_item['start_datetime'] = find_string break elif left >= k_right and top >= k_top: if (k in priority_keywords) or (k in end_keywords) and response_item['end_datetime'] is None: response_item['end_datetime'] = find_string elif k in start_keywords and response_item['start_datetime'] is None: response_item['start_datetime'] = find_string elif len(find_list) == 2: start_prefix = find_list[0].replace('自', '') end_prefix = find_list[-1].replace('至', '') if response_item['start_datetime'] is None: response_item['start_datetime'] = start_prefix if response_item['end_datetime'] is None: response_item['end_datetime'] = end_prefix else: logger.info(f'wrong word: {word} ...') else: continue return response_item # 营业执照有效期定位 def find_license_datetime(self, image_info, response_item): for info in image_info: word = info['word'] # id if (word.startswith('证照编号:') and len(word) == 25) or (word.isdigit() and len(word) == 20): response_item['id'] = word if word.isdigit() else word[5:] elif bool(re.match(self.datetime_re, word)): split = word.split('至') start_datetime = split[0] end_datetime = split[-1] response_item['start_datetime'] = start_datetime response_item['end_datetime'] = end_datetime elif word == '长期': response_item['start_datetime'] = response_item['end_datetime'] = '长期' return response_item # 在目录中找到正文pos右侧对应的数字标签 def digit_label(self, image_info, pos: dict): gold_left = pos['left'] gold_right = pos['right'] gold_top = pos['top'] gold_bottom = pos['bottom'] # 判断字符串中是否包含数字 def contain_digit(word): for c in word: if c.isdigit(): return True return False mini_distance = 10000 mini_word = "" for info in image_info: word = info['word'] left = info['rect']['left'] top = info['rect']['top'] width = info['rect']['width'] height = info['rect']['height'] right = left + width bottom = top + height if contain_digit(word=word) and left >= gold_left: distance = abs(top - gold_top) if distance < mini_distance: mini_distance = distance mini_word = word # 提取最终的mini_word label_page = None if '.' in mini_word: label_page = mini_word.split('.')[-1] elif mini_word.isdigit(): label_page = mini_word return label_page # 在image_info中搜寻word中包含key_list的内容,并打包信息返回 def pack_search(self, image_info, key_list): meta = [] for info in image_info: word = info['word'].strip().replace(' ', '') left = info['rect']['left'] top = info['rect']['top'] width = info['rect']['width'] height = info['rect']['height'] right = left + width bottom = top + height for key in key_list: if key in word: meta.append({ "word": word, "contain_key": key, "bbox": { "left": left, "right": right, "top": top, "bottom": bottom, "width": width, "height": height } }) return meta # 在image_info中搜寻word中包含key_list的内容 def search(self, image_info, key_list): for info in image_info: word = info['word'].strip().replace(' ', '') for key in key_list: if key in word: return True return False # 精确匹配key_list中的内容 def exact_search(self, image_info, key_list): meta = [] for info in image_info: word = info['word'].strip().replace(' ', '') for key in key_list: if key == word: height = info['rect']['height'] meta.append({ "keyword": word, "font_size": height }) return meta # 在image_info中使用re.search搜寻满足{format}正则的信息 def re_search(self, image_info, format): for info in image_info: word = info['word'] match = re.search(format, word) if match: return match.group(0) return False # 在image_info中使用re.match搜寻满足{format}正则的信息 def re_match(self, image_info, format): for info in image_info: word = info['word'] match = re.match(format, word) if match: return word return False # 用于识别固定位置是否有公司法人签名或公司盖章 def signature_recognition(self, image_path: str): # 先调用接口判断公司盖章 meta = self.sealagent.seal_parse(image_path=image_path) if len(meta["firm_seals"]) > 0 or len(meta["indiv_seals"]) > 0: logger.info("检测到当前页面具备印章 ...") return True keywords = ['投标函', '(法定代表人CA电子印章)','(法定代表人CA电子印章或签字)', '(签字)', '法定代表人或其委托代理人:', '法定代表人:'] key_pos = {} image_prefix = os.path.dirname(image_path) image_name = image_path.split('/')[-1][:-4] removed_red_image_name = image_name + '_red_roi' + image_path.split('/')[-1][-4:] removed_blue_image_name = image_name + '_blue_roi' + image_path.split('/')[-1][-4:] red_ink_image_name = image_name + '_red_ink' + image_path.split('/')[-1][-4:] blue_ink_image_name = image_name + '_blue_ink' + image_path.split('/')[-1][-4:] removed_red_image_path = os.path.join(image_prefix, removed_red_image_name) removed_blue_image_path = os.path.join(image_prefix, removed_blue_image_name) red_ink_image_path = os.path.join(image_prefix, red_ink_image_name) blue_ink_image_path = os.path.join(image_prefix, blue_ink_image_name) if not os.path.exists(removed_red_image_path): removed_red_seal_img = remove_red_seal(image_path=image_path) cv2.imwrite(removed_red_image_path, removed_red_seal_img) else: removed_red_seal_img = cv2.imread(removed_red_image_path) if not os.path.exists(removed_blue_image_path): removed_blue_seal_img = remove_blue_seal(image_path=image_path) cv2.imwrite(removed_blue_image_path, removed_blue_seal_img) else: removed_blue_seal_img = cv2.imread(removed_blue_image_path) red_content = self.get_content(image_path=removed_red_image_path) red_image_info = red_content["rawjson"]["ret"] blue_content = self.get_content(image_path=removed_blue_image_path) blue_image_info = blue_content["rawjson"]["ret"] def identify(image_info, input_img, out_path): for info in image_info: word = info['word'].replace(' ', '') left = info['rect']['left'] top = info['rect']['top'] width = info['rect']['width'] height = info['rect']['height'] right = left + width bottom = top + height for keyword in keywords: if keyword in word: key_pos[keyword] = { "word": word, "left": left, "right": right, "top": top, "bottom": bottom } break # 如果不存在"投标函"、"法定代表人"等关键字,则返回False if len(key_pos) == 0: return False # 定位到法定代表人所在位置 # import pdb; pdb.set_trace() if ((key_pos.get('法定代表人:') is not None) or (key_pos.get('法定代表人或其委托代理人:') is not None)) and \ ((key_pos.get('(法定代表人CA电子印章)') is not None) or (key_pos.get('(法定代表人CA电子印章或签字)') is not None) or (key_pos.get('(签字)') is not None)): if key_pos.get('法定代表人或其委托代理人:') is not None: l_info = key_pos['法定代表人或其委托代理人:'] l_cnt = 13 l_string = '法定代表人或其委托代理人:' else: l_info = key_pos['法定代表人:'] l_cnt = 6 l_string = '法定代表人:' if key_pos.get('(法定代表人CA电子印章)') is not None: r_info = key_pos['(法定代表人CA电子印章)'] r_string = '(法定代表人CA电子印章)' elif key_pos.get('(法定代表人CA电子印章或签字)') is not None: r_info = key_pos['(法定代表人CA电子印章或签字)'] r_string = '(法定代表人CA电子印章或签字)' else: r_info = key_pos['(签字)'] r_string = '(签字)' # 此时签名应在两者之间 l = l_info['right'] l_word = l_info['word'] r = r_info['left'] r_word = r_info['word'] t = max(l_info['top'], r_info['top']) b = min(l_info['bottom'], r_info['bottom']) - 5 if l_word[-l_cnt:] != l_string or r_word != r_string: return True else: black_ratio = self.ink_recognition( input_img=input_img, out_path=out_path, meta={ "left": l, "right": r, "top": t, "bottom": b } ) if black_ratio >= self.sign_threshold: return True return False elif (key_pos.get('(法定代表人CA电子印章)') is not None) or (key_pos.get('(法定代表人CA电子印章或签字)') is not None) or (key_pos.get('(签字)') is not None): # 此时签名应已包含 if key_pos.get('(法定代表人CA电子印章)') is not None: key = key_pos['(法定代表人CA电子印章)'] elif key_pos.get('(法定代表人CA电子印章或签字)') is not None: key = key_pos['(法定代表人CA电子印章或签字)'] elif key_pos.get('(签字)') is not None: key = key_pos['(签字)'] key_word = key['word'] key_word = key_word.replace('(法定代表人CA电子印章)','').replace('(法定代表人CA电子印章或签字)', '').replace('(签字)','').replace('法定代表人或其委托代理人:', '').replace('法定代表人:', '') if key_word != '': return True return False elif key_pos.get('法定代表人:') is not None: # 此时签名在右边或已包含 word = key_pos['法定代表人:']['word'] l = key_pos['法定代表人:']['left'] r = l + 100 t = key_pos['法定代表人:']['top'] b = key_pos['法定代表人:']['bottom'] - 5 if word[-6:] != '法定代表人:': return True else: black_ratio = self.ink_recognition( input_img=input_img, out_path=out_path, meta={ "left": l, "right": r, "top": t, "bottom": b } ) if black_ratio >= self.sign_threshold: return True return False elif key_pos.get('法定代表人或其委托代理人:') is not None: # 此时签名在右边或已包含 word = key_pos['法定代表人或其委托代理人:']['word'] l = key_pos['法定代表人或其委托代理人:']['left'] r = l + 100 t = key_pos['法定代表人或其委托代理人:']['top'] b = key_pos['法定代表人或其委托代理人:']['bottom'] - 5 if word[-13:] != '法定代表人或其委托代理人:': return True else: black_ratio = self.ink_recognition( input_img=input_img, out_path=out_path, meta={ "left": l, "right": r, "top": t, "bottom": b } ) if black_ratio >= self.sign_threshold: return True return False else: return False return identify(red_image_info, removed_red_seal_img, red_ink_image_path) \ or identify(blue_image_info, removed_blue_seal_img, blue_ink_image_path) # 用于判断固定位置的长方形框内是否存在签名字迹 # 用于识别图像固定位置黑色字迹所占比例,并将该位置的图像截取保存 def ink_recognition(self, input_img, out_path, meta: dict): left = meta["left"] right = meta["right"] top = meta["top"] bottom = meta["bottom"] crop_img = input_img[top:bottom, left:right, :] cv2.rectangle(input_img, (left, top), (right, bottom), (255, 255, 0), 2) # 绿色框,线宽为2 test_path = out_path[:-4] + '*' + out_path[-4:] if crop_img is None or crop_img.size == 0: logger.info("Error: crop_img is empty") return 0.0 else: cv2.imwrite(out_path, crop_img) cv2.imwrite(test_path, input_img) gray_img = cv2.cvtColor(crop_img, cv2.COLOR_BGR2GRAY) thresh, ret = cv2.threshold(gray_img, 0, 255, cv2.THRESH_OTSU) filter_condition = int(thresh * 0.90) _, black_thresh = cv2.threshold(gray_img, filter_condition, 255, cv2.THRESH_BINARY_INV) total_pixels = black_thresh.size black_pixels = np.count_nonzero(black_thresh) black_ratio = black_pixels / total_pixels return black_ratio # 用于判别字体大小 def font_judge(self, kw_search_meta): if len(kw_search_meta) == 0: # 即未搜寻到关键字,非相关页 return False for meta in kw_search_meta: keyword = meta["keyword"] font_size = meta["font_size"] logger.info(f"keyword:{keyword} has font_size: {font_size}") if font_size >= self.font_threshold: return True # 基于paddlepaddle的table ocr接口 def table_parse(self, image_path: str, save_folder: str = ''): table_engine = PPStructure(show_log=True) img = cv2.imread(image_path) result = table_engine(img) expectation = { "table": { "title": [], "title_confidence": [], "content": [], "content_confidence": [], }, "figure": { "content": [], "content_confidence": [], "caption": [], "caption_confidence": [], }, "page_numbers": [], "others": [] } for res in result: if res['type'] == 'title' or res['type'] == 'table_caption': if len(res['res']) > 0: expectation['table']['title_confidence'].append(res['res'][0]['confidence']) expectation['table']['title'].append(res['res'][0]['text']) elif res['type'] == 'table': expectation['table']['content_confidence'].append(res['score']) expectation['table']['content'].append(pd.read_html(res['res']['html'])[0].values.tolist()) elif res['type'] == 'figure': expectation['figure']['content_confidence'].append(res['score']) expectation['figure']['content'].append(res['res']) elif res['type'] == 'figure_caption': expectation['figure']['caption_confidence'].append(res['score']) expectation['figure']['caption'].append(res['res']) else: expectation['others'].append(res) if save_folder: # 存储为save_folder/save_name save_structure_res(result, save_folder, os.path.basename(image_path).split('.')[0]) return expectation # 提供pdf解析,并基于提取文本信息进行位置匹配 class PdfMatcher(PdfExtractAttr): # file_path为提供的pdf文件路径 def __init__(self, file_path: str): super(PdfMatcher, self).__init__( file_path=file_path ) # 投标书路径 self.document = file_path # 投标书名称 self.bid_name = file_path.split('/')[-1][:-4] # 投标书数据文件夹 self.bid_dir = os.path.join(os.path.dirname(file_path), self.bid_name) # 公司名称 self.firm_name = file_path.split('/')[-2] # title list title_path = os.path.join(self.bid_dir, "title.json") # image list # self.image_dir = os.path.join(self.bid_dir, "extracted_images") # if (not os.path.exists(title_path)) or (not os.path.exists(self.image_dir)): # os.makedirs(self.image_dir, exist_ok=True) if not os.path.exists(title_path): self.main_parse(pdf_path=file_path, title_path=title_path) # self.main_parse(pdf_path=file_path, title_path=title_path, image_dir=self.image_dir) self.title = load_json(title_path) # outline list outline_path = os.path.join(self.bid_dir, "outlines.json") self.outline = self.parse_outline(out_path=outline_path) # text list text_path = os.path.join(self.bid_dir, "all_texts.json") self.details = self.parse_text(out_path=text_path) # table list table_path = os.path.join(self.bid_dir, "all_tables.json") if os.path.exists(table_path): self.table = load_json(table_path) else: self.table = self.parse_table_pro(table_path=table_path) # image format # self.image_format = "image_page_{}*" # image filter threshold self.start_threshold = 10 self.distance_threshold = 6 self.search_threshold = 20 # total pages self.total_pages = self.count_pages() # 证书正则 self.license_dict = { "business_license" : r'营业执照', "deposit": r'^(?:开户许可证|[\u4e00-\u9fff]+存款账户[\u4e00-\u9fff]+)$', "production_license": r'\b[\u4e00-\u9fff]*许可证\b', "qualtifications" : r'\b[\u4e00-\u9fff]*证书', "proof": r'\b[\u4e00-\u9fff]*证明', } # 在title中找寻包含keyword的信息 # digit_limit表明是否使用数字限制 def search_in_title(self, keyword, digit_limit=False): meta = [] digits = "一二三四五六七八九十" for title_block in self.title: block_text = title_block['text'].replace(' ', '').strip() if digit_limit: if keyword in block_text: # 确保keyword左右不包含digit中的内容 cnt = block_text.find(keyword) length = len(keyword) check_left = cnt - 1 check_right = cnt + length if (check_left >= 0 and block_text[check_left] in digits) or (check_right < len(block_text) and block_text[check_right] in digits): continue else: if keyword in block_text: meta.append({ "page_number": title_block["page_number"], "text": block_text }) return meta # 在outline中找寻包含keywords的信息 def search_in_outline(self, keyword): meta = [] for outline_block in self.outline: block_text = outline_block['text'].replace(' ', '').strip() if keyword in block_text: meta.append({ "page_number": outline_block["page_number"], "text": block_text }) return meta # 用于定位营业执照、资质证书的页面范围 def search_license_interval(self, necessity_interval=None): '''定位营业执照、资质证书的区间范围''' # 通过关键字模糊定位 keywords = ['资格审查资料','资格审查材料','其它材料','其他材料','其他资料','附件', '影印件'] search_interval = [] license_pages = [] # locate in title.json left_pos = -1 # 左指针 right_pos = -1 # 右指针 for title_block in self.title: block_text = title_block['text'].replace(' ', '').strip() # TODO 先进行证书正则判断 ''' for key, format in self.license_dict.items(): match = re.search(format, block_text) if match: license_pages.append(title_block['page_number']) ''' # 先进行左区间判定 if left_pos != -1 and '证书' not in block_text: right_pos = title_block['page_number'] search_interval.append((left_pos, right_pos)) # 重置 left_pos = -1 for keyword in keywords: if keyword in block_text: # 先进行模糊的outline定位 center_page = None if '.' in block_text: center_page = block_text.split('.')[-1] if center_page.isdigit(): center_page = eval(center_page) left_pos = min(title_block['page_number'], center_page) else: left_pos = title_block['page_number'] # 最终判定 if left_pos != -1: search_interval.append((left_pos, right_pos)) # 重置 left_pos = -1 right_pos = -1 # locate in outlines.json if len(self.outline) > 0: for outline_block in self.outline: if left_pos != -1: right_pos = outline_block["page_number"] right_pos = right_pos if right_pos is not None else -1 search_interval.append((left_pos, right_pos)) left_pos = -1 outline_text = outline_block['title'].strip() for keyword in keywords: if keyword in outline_text: if outline_block["page_number"] is not None: left_pos = outline_block["page_number"] # 最终判定 if left_pos != -1: search_interval.append((left_pos, right_pos)) if necessity_interval is not None: search_interval += necessity_interval # 搜寻区间合并 search_interval.sort() logger.info(f"search_interval: {search_interval} ...") merge_interval = [] if len(search_interval) > 0: left = -1 right = -1 for interval in search_interval: l, r = interval if r < l: continue # 初始化 if left == -1 and right == -1: left = l right = r elif l <= right and r > right: right = r elif l <= right: continue else: merge_interval.append((left, right)) left = l right = r merge_interval.append((left, right)) return merge_interval # 用于定位相关业绩的页面范围 def search_perf_info(self, ): flag = False keywords = ['资格审查资料','资格审查材料'] meta = { "perf_page_number": -1, "qual_page_number": set(), "table": None } # 先从表格数据中查询是否直接提取到相关业绩表信息 for table_block in self.table: page_number = table_block["page_numbers"] table_name = table_block["table_name"] table_name = table_name.strip().replace("\n", "").replace(" ", "") if ('类似' in table_name) and (('项目' in table_name) or ('业绩' in table_name)): flag = True meta["perf_page_number"] = page_number meta["table"] = table_block["table"] break if flag: return meta # 从outlines中模糊匹配 for outline_block in self.outline: page_number = outline_block["page_number"] text = outline_block["title"] text = text.strip().replace("\n", "").replace(" ", "") for keyword in keywords: if keyword in text: qual_page = page_number meta["qual_page_number"].add(qual_page) if ('类似' in text) and (('项目' in text) or ('业绩' in text)): flag = True meta["perf_page_number"] = page_number break if flag: return meta # 从title中模糊匹配 for title_block in self.title: page_number = title_block["page_number"] text = title_block["text"] text = text.strip().replace("\n", "").replace(" ", "") for keyword in keywords: if keyword in text: qual_page = page_number meta["qual_page_number"].add(qual_page) if ('类似' in text) and (('项目' in text) or ('业绩' in text)): flag = True meta["perf_page_number"] = page_number break return meta # 返回可能为营业执照或资质证书的图像集 def find_candidate_images(self): candidate_images = set() merge_intervals = self.search_license_interval() logger.info(f"merge_intervals: {merge_intervals}") for interval in merge_intervals: start_page, end_page = interval if start_page <= self.start_threshold: continue if end_page == -1: end_page = start_page + 20 candidate_images = self.image_regularization(start_page=max(0, start_page-self.search_threshold), end_page=end_page+self.search_threshold, candidate_images=candidate_images) candidate_images = list(candidate_images) return candidate_images # 使用正则查询符合格式的图像 def image_regularization(self, start_page: int, end_page:int, candidate_images: set): for index in range(start_page, end_page + 1): current_format = self.image_format.format(index) files = glob.glob(os.path.join(self.image_dir, current_format)) filter_files = [file for file in files if not file.endswith('.unk')] candidate_images.update(filter_files) return candidate_images # 返回可能为营业执照或资质证书的pdf2img图像集 def find_candidate_images_pro(self, necessity_interval=None): scanned_dir = self.pdf2img() candidate_images = set() merge_intervals = self.search_license_interval(necessity_interval=necessity_interval) logger.info(f"merge_intervals: {merge_intervals}") for interval in merge_intervals: start_page, end_page = interval if start_page <= self.start_threshold: continue if end_page == -1: end_page = start_page + 20 for index in range(start_page, end_page + 1): img_path = os.path.join(scanned_dir, f'page-{index}.jpg') processed_img_path = os.path.join(scanned_dir, f'page-{index}_red_roi.jpg') if os.path.exists(img_path) and (not os.path.exists(processed_img_path)): processed_img = remove_red_seal(image_path=img_path) cv2.imwrite(processed_img_path, processed_img) candidate_images.add(img_path) candidate_images.add(processed_img_path) candidate_images = list(candidate_images) return candidate_images # 在表格数据中查询是否提取到投标报价表的数据 def find_bid_quotation_form(self): keywords = ["投标报价总表", "投标报价汇总表"] key_column = '增值税金额' tables = [] flag = False for table_block in self.table: page_number = table_block["page_numbers"] table_name = table_block["table_name"] table_name = table_name.replace(' ', '') # 根据关键词找寻table for keyword in keywords: if keyword in table_name: tables = table_block["table"] flag = True break # 再根据关键列名找寻table if len(tables) == 0: column_num = len(table_block["table"]) cnt = 0 while cnt < column_num: column_list = table_block["table"][cnt] for column_name in column_list: if column_name is not None: column_name = column_name.replace("\n", "").replace(" ", "").strip() if key_column in column_name: tables = table_block["table"] flag = True break if '其中' in column_name: cnt += 1 if (not cnt) or flag: break if flag: break # 当前表格中存在投标报价表的信息 if flag: parsed_table = self.extract_table(table=tables) return page_number, parsed_table # 当前表格中不存在投标报价表的信息 return None # 在表格数据中查询是否提取到拟投入本项目人员配备情况表 or 项目管理机构组成表的数据 def find_itempeople_form(self): keywords = ['拟投入本项目人员配备情况表', '项目管理机构组成表', '项目管理机构成员', '项目管理组成表'] flag = False # 标记是否通过table_name查询到表格 meta = { "candidate_page": set(), "table_list": [], } for table_block in self.table: if len(table_block["table"]) == 0: continue page_number = table_block["page_numbers"] table_name = table_block["table_name"] table_name = table_name.strip().replace("\n", "").replace(" ", "") for keyword in keywords: if keyword in table_name: meta["table_list"].append({ "page_number":page_number, "table": table_block["table"] }) flag = True break if flag: return meta column_name_list = table_block["table"][0] for column_name in column_name_list: if column_name is not None: column_name = column_name.strip().replace("\n", "").replace(" ", "") if '职务' in column_name or '职称' in column_name: meta["table_list"].append({ "page_number":page_number, "table": table_block["table"] }) break sec_keywords = ['拟投入本项目人员配备情况表', '项目管理机构', '项目管理机构组成表'] # 在outlines中定位项目管理机构等位置 for outline_block in self.outline: page_number = outline_block["page_number"] text = outline_block["title"] text = text.strip().replace("\n", "").replace(" ", "") for sec_keyword in sec_keywords: if sec_keyword in text: if '.' in text: page = text.split('.')[-1] if page.isdigit(): page = eval(page) else: page = page_number meta["candidate_page"].add(page) # 在titles中定位项目管理机构等位置 for title_block in self.title: page_number = title_block["page_number"] text = title_block["text"] text = text.strip().replace("\n", "").replace(" ", "") for sec_keyword in sec_keywords: if sec_keyword in text: if '.' in text: page = text.split('.')[-1] if page.isdigit(): page = eval(page) else: page = page_number meta["candidate_page"].add(page) return meta # 用于解析提取到的表格信息 def extract_table(self, table): row_num = len(table) if row_num == 0: return [], [] column_num = len(table[0]) new_table = [] # first step: 完善列名 cnt = 0 # 从第一行开始 column_list = [] while len(column_list) < column_num and cnt < row_num: current_column_list = table[cnt] for column_name in current_column_list: column_name = str(column_name).strip().replace("\n", "").replace(" ", "") if (column_name != None) and ('其中' not in column_name) and (column_name not in column_list): column_list.append(column_name) if len(column_list) < column_num: cnt += 1 # second step: 填入表格 new_table.append(column_list) for i in range(cnt + 1, row_num): tmp = [] for j in range(column_num): element = table[i][j] tmp.append(element) new_table.append(tmp) return column_list, new_table # 查询pdf总页数 def count_pages(self): reader = PdfReader(self.file_path) return len(reader.pages) # 用于自动创建pdf->image的scanned文件夹 def pdf2img(self): scanned_dir = os.path.join(self.bid_dir, 'scanned') if os.path.exists(scanned_dir): logger.info(f"检测到当前投标文件{self.bid_dir}存在扫描文件夹 ...") else: os.makedirs(scanned_dir, exist_ok=True) logger.info(f"开始转换pdf2img页面") convert_start_time = time.time() try: images = convert_from_path(pdf_path=self.document) for i, image in enumerate(images): image.save(os.path.join(scanned_dir, f'page-{i}.jpg'), 'JPEG') logger.info("convert successfully !") except subprocess.CalledProcessError as e: logger.info(f"convert failure: {e}") convert_cost_time = time.time() - convert_start_time logger.info(f"转化pdf2img花费{convert_cost_time // 60} min {convert_cost_time % 60} sec ...") return scanned_dir class PdfParse_pipeline(): def __init__(self, ocr, # ocr接口 firm_dir, # 存储所有公司的路径 out_path, # 输出地址 ): self.ocr = ocr self.firm_dir = firm_dir self.out_path = out_path def parse_pipeline(self): data = {} for firm_name in tqdm(os.listdir(self.firm_dir)): logger.info(f'processing firm {firm_name} ...') firm_path = os.path.join(self.firm_dir, firm_name) for bid_name in tqdm(os.listdir(firm_path)): if bid_name.endswith('.pdf'): document=os.path.join(firm_path, bid_name) bid_dir = os.path.join(firm_path, bid_name[:-4]) os.makedirs(bid_dir, exist_ok=True) document_data = self.parse_single_document(pdf_path=document) data[firm_name] = document_data # 以下将data的数据存入out_path with open(self.out_path, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=4) return data def parse_single_document(self, pdf_path: str): agent = PdfMatcher(file_path=pdf_path) firm_name = agent.firm_name total_pages = agent.total_pages data = { "necessity_interval": [], # 投标函中是否有签字 or 盖章 "has_signature_or_seal": False, "formatting_img": None, # 资质证书 & 营业执照信息 "license_list":[], # 投标报价汇总表 "bid_form": None, # 相关业绩表 "perf_info": [], # 项目经理相关信息 "manager": [], "kw_meta": {} } logger.info("start finding the kw info in directory ...") kw_meta = self.find_kw_from_dc(agent=agent, data=data, total_pages=total_pages) logger.info("start processing the nextiter information ...") # iter = self.parse_nextiter(agent=agent, data=data, total_pages=total_pages) # for signature or seal logger.info("start judging the signature & seal information ...") # self.parse_bid(agent=agent, data=data, total_pages=total_pages) # for license_list logger.info("start finding license information ...") # self.parse_license(agent=agent, data=data, iter=iter, firm_name=firm_name) # for bid_form logger.info("start finding bid form ...") # self.parse_bid_form(agent=agent, data=data) # for perf information logger.info("start finding perf information ...") # self.parse_perf(agent=agent, data=data) # for manager logger.info("start finding manager information ...") self.parse_manager(agent=agent, data=data, kw_meta=kw_meta["manager"]) return data # 从目录中查询是否存在关键词以及该关键字对应页码 def find_kw_from_dc(self, agent, data, total_pages): meta = {} keywords = { "manager": ['拟投入本项目人员配备情况表', '项目管理机构组成表', '项目管理机构成员', '项目管理组成表'] } # 初始化 for kw in keywords: meta[kw] = [] scanned_dir = agent.pdf2img() # 目录一般位于前20页 start = 0 end = 20 if total_pages > 20 else total_pages is_enter = False for index in range(start, end): logger.info(f"find kw from index {index} ...") img_path = os.path.join(scanned_dir, f'page-{index}.jpg') processed_img_path = os.path.join(scanned_dir, f'page-{index}_red_roi.jpg') # 去除红章 if not os.path.exists(processed_img_path): processed_img = remove_red_seal(image_path=img_path) cv2.imwrite(processed_img_path, processed_img) # 对处理过红章的页面进行ocr content = self.ocr.get_content(image_path=processed_img_path) image_info = content["rawjson"]["ret"] if not is_enter and self.ocr.search(image_info, '目录'): # 当前为目录页面首页,标记is_enter is_enter = True # 已经进入目录页面 if is_enter: # 整体搜寻关键字 for kw, elements in keywords.items(): pack_info = self.ocr.pack_search(image_info=image_info, key_list=elements) logger.info(pack_info) # 找出对应数值标签 if len(pack_info) > 0: for info in pack_info: word = info["word"] contain_key = info["contain_key"] pos = info["bbox"] # 如果word中包含了页码 if word[-1].isdigit(): label_page = word.split('.')[-1] meta[kw].append( { "element": contain_key, "word": word, "label_page": label_page } ) else: meta[kw].append( { "element": contain_key, "word": word, "label_page": self.ocr.digit_label(image_info=image_info, pos=pos) } ) data["kw_meta"] = meta return meta def parse_nextiter(self, agent, data, total_pages): # 目录一般都会带有关键字:目录 keyword = '目录' # 需要定位下一章的关键字 iter_keywords = { '1': ['资格审查资料', '资格审查材料'], '2': ['其他材料', '其它材料', '其他资料', '其它资料'], '3': ['附件'], '4': ['影印件'] } index_keywords = { '1': ['一、', '二、', '三、', '四、', '五、', '六、', '七、', '八、', '九、'], '2': ['一章', '二章', '三章', '四章', '五章', '六章', '七章', '八章', '九章', '十章'] } # 找寻下一层级 def find_next(current_index): logger.info(f"processing current_index: {current_index}") cycle = { "一": "二", "二": "三", "三": "四", "四": "五", "五": "六", "六": "七", "七": "八", "八": "九", "九": "十", "十": "二", } if current_index.isdigit(): next_index = str(eval(current_index) + 1) return next_index next_index = "" # 涉及进位 if len(current_index) == 1: if current_index in cycle.keys(): if current_index == "十": next_index = "十一" else: next_index = cycle[current_index] else: raise ValueError(f"筛选current index {current_index} 有误 ...") return next_index if current_index[-1] == '九': if current_index[0] in cycle.keys(): next_index = cycle[current_index[0]] + '十' else: return "" elif current_index[-1] == '十': next_index = current_index + '一' else: if current_index[-1] in cycle.keys(): next_index = current_index[:-1] + cycle[current_index[-1]] else: return "" return next_index # 用于提取字符串的当前层级,并返回下一层级 def refine(string: str): digit_keywords = "123456789一二三四五六七八九十" string = string.strip().replace(' ', '').replace('(', '').replace(')', '').replace('(', '').replace(')', '') flag = False for digit_kw in digit_keywords: if digit_kw in string: flag = True if not flag: return "" if '、' in string and '章' in string: index_string = string.split('、')[0] current_index = "" next_index = "" is_start = False for c in index_string: if c == "第": is_start = True elif (not is_start) and c in digit_keywords: is_start = True current_index += c elif c == "章": next_index = find_next(current_index) elif is_start and c in digit_keywords: current_index += c return next_index if '、' in string: index_string = string.split('、')[0] next_index = find_next(index_string) return next_index if '章' in string and '第' in string: l = string.find('第') r = string.find('章') index_string = string[l+1:r] next_index = find_next(index_string) return next_index return "" # 传入当前keyword的bounding box,返回其对应的index def find_ocr_index(image_info, bbox: dict): meta = {} candidate_distance = 10000 candidate_word = "" keywords = "123456789一二三四五六七八九十" match_left = bbox['left'] match_right = bbox['right'] match_top = bbox['top'] match_bottom = bbox['bottom'] for info in image_info: word = info['word'].replace(' ', '') left = info['rect']['left'] top = info['rect']['top'] width = info['rect']['width'] height = info['rect']['height'] right = left + width bottom = top + height for keyword in keywords: if keyword in word and left < match_left and right < match_right: distance = abs(top - match_top) if distance < candidate_distance: candidate_word = word candidate_distance = distance meta["candidate_word"] = candidate_word meta["candidate_distance"] = candidate_distance return meta iter = [] scanned_dir = agent.pdf2img() # 目录一般位于前20页 start = 0 end = 20 if total_pages > 20 else total_pages is_enter = False for index in range(start, end): img_path = os.path.join(scanned_dir, f'page-{index}.jpg') processed_img_path = os.path.join(scanned_dir, f'page-{index}_red_roi.jpg') # 去除红章 if not os.path.exists(processed_img_path): processed_img = remove_red_seal(image_path=img_path) cv2.imwrite(processed_img_path, processed_img) # 对处理过红章的页面进行ocr content = self.ocr.get_content(image_path=processed_img_path) image_info = content["rawjson"]["ret"] if not is_enter and self.ocr.search(image_info, keyword): # 当前为目录页面首页,标记is_enter is_enter = True # 已经进入目录页面 if is_enter: for id, cover_keywords in iter_keywords.items(): meta = self.ocr.pack_search(image_info, cover_keywords) if len(meta) == 0: continue for meta_info in meta: word = meta_info['word'] logger.info(f"processing iter word: {word}") contain_key = meta_info['contain_key'] bbox = meta_info['bbox'] # 查看word所对应序列号 # check word first if '、' in word or ('章' in word and '第' in word): next_index = refine(word) if next_index != "": iter.append({ "current_key": contain_key, "next_index": next_index }) else: # check ocr second meta = find_ocr_index(image_info, bbox) candidate_word = meta["candidate_word"] next_index = refine(candidate_word) iter.append({ "current_key": contain_key, "next_index": next_index }) data["iter"] = iter return iter def parse_bid(self, agent, data, total_pages): # TODO 由于投标函主要出现在前30页,暂时只搜寻前30页 start_page = 0 end_page = 30 if total_pages > 30 else total_pages scanned_dir = agent.pdf2img() key_list = ['一、投标函及投标函附录', '1投标函及投标函附录', '1、投标函及投标函附录', '投标函及投标函附录', '投标函', '一、投标函', '1.投标函', '1投标函', '一投标函', '(一)投标函', '(一)投标函', '(一)、投标函', '(一)、投标函'] for index in range(start_page, end_page + 1): img_path = os.path.join(scanned_dir, f'page-{index}.jpg') # 先判断该页内容是否为投标函 content = self.ocr.get_content(image_path=img_path) image_info = content["rawjson"]["ret"] kw_search_meta = self.ocr.exact_search(image_info, key_list) kw_search_res = self.ocr.font_judge(kw_search_meta) ol_search_res = self.ocr.search(image_info, ['目录']) if (not kw_search_res) or ol_search_res: continue result = self.ocr.signature_recognition(image_path=img_path) if result: data["has_signature_or_seal"] = True data["formatting_img"] = img_path return def parse_license(self, agent, iter, data, firm_name): # 先找寻contain_key的page,再找寻next_index的page necessity_interval = [] # 遍历得到的每一个上下章 for unit_iter in iter: contain_key = unit_iter["current_key"] next_index = unit_iter["next_index"] kw_title_meta = agent.search_in_title(contain_key) iter_title_meta = agent.search_in_title(next_index, digit_limit=True) left = 10000 right = -1 left_kw = "" right_kw = "" # 先确定right page if len(iter_title_meta) == 0: right = agent.total_pages else: for iter_meta in iter_title_meta: page_number = iter_meta["page_number"] iter_text = iter_meta["text"] if page_number < 20: continue else: if page_number > right: right = page_number right_kw = iter_text if right == -1: right = agent.total_pages # 再确定left page if len(kw_title_meta) == 0: continue else: for kw_meta in kw_title_meta: page_number = kw_meta["page_number"] title_text = kw_meta["text"] if page_number < 20 or page_number > right: continue else: if page_number < left: left = page_number left_kw = title_text if left == 10000: continue necessity_interval.append((left, right)) data["necessity_interval"].append( { "left_kw": left_kw, "right_kw": right_kw, "left_page": left, "right_page": right } ) candidate_images = agent.find_candidate_images_pro(necessity_interval=necessity_interval) # candidate_images = agent.find_candidate_images() logger.info(candidate_images) # import pdb; pdb.set_trace() if len(candidate_images) == 0: scanned_dir = agent.pdf2img() for index in range(0, agent.total_pages): img_path = os.path.join(scanned_dir, f'page-{index}.jpg') processed_img_path = os.path.join(scanned_dir, f'page-{index}_red_roi.jpg') if not os.path.exists(processed_img_path): processed_img = remove_red_seal(image_path=img_path) cv2.imwrite(processed_img_path, processed_img) try: response = self.ocr.judge_pro(image_path=processed_img_path, firm_name=firm_name) if response == None or response['qualtified'] == None: continue else: data["license_list"].append({ "license_name": response["license_name"], "license_path": img_path, "license_page": response["license_page"], "start_datetime": response["start_datetime"], "end_datetime": response["end_datetime"] }) except ValueError as e: print(e) else: for img in candidate_images: try: response = self.ocr.judge_pro(image_path=img, firm_name=firm_name) if response == None or response['qualtified'] == None: continue else: data["license_list"].append({ "license_name": response["license_name"], "license_path": img, "license_page": response["license_page"], "start_datetime": response["start_datetime"], "end_datetime": response["end_datetime"] }) except ValueError as e: print(e) def parse_bid_form(self, agent, data): result = agent.find_bid_quotation_form() if result is None: # 先转扫描件 scanned_dir = agent.pdf2img() key_column = '增值税金额' img_list = glob.glob(os.path.join(scanned_dir, '*.jpg')) for img_prefix in img_list: img_name = os.path.basename(img_prefix) if ('roi' in img_name) or ('ink' in img_name): continue img_index = int(img_name.split('-')[1].split('.')[0]) if img_index > 50: continue img_path = os.path.join(scanned_dir, img_name) #TODO 添加对"投标报价汇总表"字样的ocr辅助 expectation = self.ocr.table_parse(image_path=img_path, save_folder=scanned_dir) content = self.ocr.get_content(image_path=img_path) image_info = content["rawjson"]["ret"] kw_res = self.ocr.search(image_info=image_info, key_list=['投标报价汇总表']) table_list = expectation['table']['content'] if len(table_list) > 0: for table in table_list: column_list, parsed_table = agent.extract_table(table=table) for column_name in column_list: if key_column in column_name: data["bid_form"] = { "page": [img_index], "table": parsed_table } return if kw_res: data["bid_form"] = { "page": [img_index] } else: page_number, target_table = result data["bid_form"] = { "page": page_number, "table": target_table } def parse_perf(self, agent, data): perf_meta = agent.search_perf_info() # import pdb; pdb.set_trace() if perf_meta["table"] is not None: data["perf_info"].append({ "perf_page": perf_meta["perf_page_number"], "perf_table": perf_meta["table"] }) else: center_page = 0 if perf_meta["perf_page_number"] != -1: center_page = perf_meta["perf_page_number"] if len(perf_meta["qual_page_number"]) > 0: tmp = 10000 for candidate_page in perf_meta["qual_page_number"]: if candidate_page > agent.start_threshold: tmp = min(tmp, candidate_page) center_page = min(center_page, tmp) scanned_dir = agent.pdf2img() img_list = glob.glob(os.path.join(scanned_dir, 'page-*.jpg')) for img_prefix in img_list: img_name = os.path.basename(img_prefix) if ('roi' in img_name) or ('ink' in img_name): continue img_index = int(img_name.split('-')[1].split('.')[0]) if img_index >= center_page: img_path = os.path.join(scanned_dir, img_name) # 1st step: 移除红色印章 processed_path = os.path.join(scanned_dir, f'page-{img_index}_red_roi.jpg') processed_folder = os.path.join(scanned_dir, 'processed') os.makedirs(processed_folder, exist_ok=True) if not os.path.exists(processed_path): processed_img = remove_red_seal(img_path) cv2.imwrite(processed_path, processed_img) # 2nd step: 调用ocr搜寻关键字 content = self.ocr.get_content(image_path=processed_path) image_info = content["rawjson"]["ret"] if self.ocr.search(image_info, ['类似']): # 3rd step: 识别表格 expectation = self.ocr.table_parse(image_path=processed_path, save_folder=processed_folder) table_list = expectation['table']['content'] data["perf_info"].append({ "perf_page": img_index + 1, "perf_table": table_list }) def parse_manager(self, agent, data, kw_meta=None): keywords = ['拟投入本项目人员配备情况表', '项目管理机构组成表', '项目管理机构成员', '项目管理组成表', '职务', '职称'] meta = agent.find_itempeople_form() if len(meta["table_list"]) > 0: # 找到类似表格 data["manager"] = meta["table_list"] else: candidate_page_set = meta["candidate_page"] if len(candidate_page_set) == 0 and (kw_meta is None or len(kw_meta) == 0): logger.info("查询候选项目经理为空, 开始进行全文档搜索") scanned_dir = agent.pdf2img() for index in range(0, agent.total_pages): raw_page = os.path.join(scanned_dir, f'page-{index}.jpg') processed_page = os.path.join(scanned_dir, f'page-{index}_red_roi.jpg') if not os.path.exists(processed_page): processed_img = remove_red_seal(image_path=raw_page) cv2.imwrite(processed_page, processed_img) # 对处理过红章的页面进行ocr content = self.ocr.get_content(image_path=processed_page) image_info = content["rawjson"]["ret"] if self.ocr.search(image_info, keywords): expectation = self.ocr.table_parse(image_path=processed_page, save_folder=scanned_dir) table_list = expectation['table']['content'] if len(table_list) > 0: for table in table_list: column_list, parsed_table = agent.extract_table(table=table) for column_name in column_list: if '职称' in column_name or '职务' in column_name: data["manager"].append(parsed_table) else: spread_set = set() # from candidate_page_set for candidate_page in candidate_page_set: cnt = 0 while cnt <= 20 and candidate_page + cnt < agent.total_pages: spread_set.add(candidate_page + cnt) cnt += 1 # from meta if kw_meta is not None and len(kw_meta) > 0: for unit_meta in kw_meta: label_page = unit_meta["label_page"] if label_page.isdigit(): label_page = int(label_page) cnt = -5 while cnt <= 5 and label_page + cnt < agent.total_pages: spread_set.add(label_page + cnt) cnt += 1 # 给每一个候选图片20区域范围 scanned_dir = agent.pdf2img() for candidate_img in spread_set: candidate_path = os.path.join(scanned_dir, f'page-{candidate_img}.jpg') expectation = self.ocr.table_parse(image_path=candidate_path, save_folder=scanned_dir) table_list = expectation['table']['content'] if len(table_list) > 0: for table in table_list: column_list, parsed_table = agent.extract_table(table=table) for column_name in column_list: if '职称' in column_name or '职务' in column_name: data["manager"].append(parsed_table) if __name__ == "__main__": # [测试demo] start_time = time.time() # 请针对自己的环境进行修改log_path global logger firm_list = ['太原重工'] # firm_list = ['湖北海光'] for firm in firm_list: log_path = f"/home/stf/miner_pdf/interface/test_outdir/manager_test/test_{firm}.log" logger = create_logger(log_path=log_path) # [环境参数] # ocr url url = "http://120.48.103.13:18000/ctr_ocr" # seal_ocr url base_url = "https://aip.baidubce.com/rest/2.0/ocr/v1/seal?access_token=" # seal_ocr access_token access_token = "24.6bbe9987c6bd19ba65e4402917811657.2592000.1724573148.282335-86574608" # seal request url seal_url = base_url + access_token # seal_ocr headers headers = {'content-type': 'application/x-www-form-urlencoded'} # data_path为存储所有投标公司的起始路径 data_path = "/home/stf/miner_pdf/data/投标公司pdf" # test_data_path为存储测试投标公司的起始路径 test_data_path = "/home/stf/miner_pdf/interface/test_files" # test_out_path存储目前优化代码的测试结果!!! test_out_path = "/home/stf/miner_pdf/interface/outdir/test_out.json" unit_data_path = f"/home/stf/miner_pdf/interface/unit_test/{firm}" # unit_out_path = f"/home/stf/miner_pdf/interface/outdir/unit_{firm}.json" unit_out_path = f"/home/stf/miner_pdf/interface/test_outdir/manager_test/unit_{firm}.json" # pipeline_out_path为执行所有公司pipeline逻辑后的输出位置 # 其为存放营业执照和资质证书位置信息的json文件 pipeline_out_path = "/home/stf/miner_pdf/interface/outdir/test_pipeline.json" # single_out_path为执行单个公司pdf解析逻辑后的输出位置 # 其为存放营业执照和资质证书位置信息的json文件 single_out_path = "/home/stf/miner_pdf/interface/outdir/test_single.json" # ground_truth目前为存储所有非扫描公司在pdf中营业执照与资质证书的json文件 ground_truth = "/home/stf/miner_pdf/ground_truth.json" # 用于区分该公司提供的pdf文件为(扫描件 or 非扫描件) firm_excel_file = "/home/stf/miner_pdf/data/certificate.xlsx" df = pd.read_excel(firm_excel_file) # 封装好的ocr接口 ocr = OcrAgent(url=url) ocr.integrate_sealagent( url=seal_url, headers=headers ) # 封装好的pipeline pipeline = PdfParse_pipeline( ocr=ocr, firm_dir=unit_data_path, out_path=unit_out_path, ) # start data = pipeline.parse_pipeline() # caculate time cost cost_time = time.time() - start_time logger.info(f"processing {len(data)} documents, total cost {cost_time // 60} min {cost_time % 60} sec ...")