### 解析所有pdf文件并提取信息进行测试的框架 ### PdfExtractAttr作为提取pdf信息的基类 # 子类在其基础上实现匹配功能 # 标准包导入 import os import re import json import re import shutil import pandas as pd import pdb import base64 from io import BytesIO from pprint import pprint # 第三方包导入 import numpy as np import pandas as pd import cv2 import torch import glob import logging import requests import time import datetime from tqdm import tqdm from tools import RefPageNumberResolver from get_info import PdfExtractAttr from get_info import is_title, export_image, _save_jpeg, _save_jpeg2000, _save_bmp, main_parse, table_parse, load_json from PIL import Image from pdfminer.image import ImageWriter # tools function def create_logger(log_path): """ 将日志输出到日志文件和控制台 """ logger = logging.getLogger() logger.setLevel(logging.INFO) formatter = logging.Formatter( '%(asctime)s - %(levelname)s - %(message)s') # 创建一个handler,用于写入日志文件 file_handler = logging.FileHandler( filename=log_path, mode='w') file_handler.setFormatter(formatter) file_handler.setLevel(logging.INFO) logger.addHandler(file_handler) # 创建一个handler,用于将日志输出到控制台 console = logging.StreamHandler() console.setLevel(logging.DEBUG) console.setFormatter(formatter) logger.addHandler(console) return logger # ocr外部接口 class OcrAgent(): def __init__(self, url): self.url = url self.datetime_re = r'\d{4}年\d{1,2}月\d{1,2}日至(?:\d{4}年\d{1,2}月\d{1,2}日|长期)' # 不同类型证书资质正则 self.re_dict = { "business_license" : r'营业执照', "deposit": r'^(?:开户许可证|[\u4e00-\u9fff]+存款账户[\u4e00-\u9fff]+)$', "production_license": r'\b[\u4e00-\u9fff]*许可证\b', "qualtifications" : r'\b[\u4e00-\u9fff]*证书', "proof": r'\b[\u4e00-\u9fff]*证明', } # 字迹阈值 self.sign_threshold = 0.05 # 获取图像的ocr信息 def get_content(self, image_path): try: with open(image_path, 'rb') as image_file: files = {"file": ("image.jpg", image_file, "image/jpeg")} response = requests.post(self.url, files=files) return response.json() except: raise ValueError(f"传入图像{image_path}已损坏") # 移除图像上的红色印章 def remove_red_seal(self, image_path): # 读取图像 input_img = cv2.imread(image_path) # 分离图片的通道 blue_c, green_c, red_c = cv2.split(input_img) #利用大津法自动选择阈值 thresh, ret = cv2.threshold(red_c, 0, 255, cv2.THRESH_OTSU) #对阈值进行调整 filter_condition = int(thresh * 1.0) #移除红色的印章 _, red_thresh = cv2.threshold(red_c, filter_condition, 255, cv2.THRESH_BINARY) # 把图片转回3通道 result_img = np.expand_dims(red_thresh, axis=2) result_img = np.concatenate((result_img, result_img, result_img), axis=-1) return result_img # 判断图像是否为某公司的营业执照或资质证书信息,并返回提取到的信息 def judge(self, image_path: str, firm_name: str): # 以下实现要求image_path的路径如下例所示: # ./test/image_page_12_0.jpg # 12代表当前图像在pdf中的第12页 # 0代表当前图像为该页提取的第1张图像 image_prefix = image_path.split('/')[-1] logger.info(f'processing img: {image_prefix}') page_number = image_prefix.split('_')[-2] response_item = { "qualtified": None, # 是否为证书 "matched": None, # 是否出现匹配的公司名称 "license_name": None, # 证书名 "license_page": page_number, # 证书所在页 "start_datetime": None, # 有效起始时间 "end_datetime": None # 有效终止时间 } content = self.get_content(image_path=image_path) image_info = content["rawjson"]["ret"] # 必须包含公司名称信息 if not self.search(image_info=image_info, key=firm_name): return None else: response_item['matched'] = True # 是否匹配营业执照或资质证书 for key, format in self.re_dict.items(): if key == 'business_license': match_name = self.re_match(image_info=image_info, format=format) else: match_name = self.re_search(image_info=image_info, format=format) if match_name and key == 'business_license': response_item["qualtified"] = True response_item["license_name"] = match_name response_item = self.find_license_datetime(image_info=image_info, response_item=response_item) return response_item elif match_name: response_item["qualtified"] = True response_item["license_name"] = match_name response_item = self.find_certificate_datetime(image_info=image_info, response_item=response_item) return response_item return response_item # 资质证书有效期定位 def find_certificate_datetime(self, image_info, response_item): # keyword start_keywords = ['颁发日期', '发证日期', '生效日期'] end_keywords = ['终止日期'] priority_keywords = ['有效期', '使用期限', '有效日期'] keywords_list = ['有效期', '使用期限', '有效日期', '终止日期', '颁发日期', '发证日期', '生效日期'] # re format format = r'(?:[自至])?\d{4}年\d{1,2}月\d{1,2}日(?:至)?(?:\d{4}年\d{1,2}月\d{1,2}日)?' special_format = r'\d{4}-\d{1,2}-\d{1,2}' # 判断是否存在日期关键字 flag = False keyword_dict = {} for info in image_info: word = info['word'] left = info['rect']['left'] top = info['rect']['top'] width = info['rect']['width'] height = info['rect']['height'] for keyword in keywords_list: # 该证书存在日期关键字 if keyword in word: flag = True charset_list = info['charset'] for char_dc in charset_list: if char_dc['word'] == keyword[-1]: right = char_dc['rect']['left'] + char_dc['rect']['width'] keyword_dict[keyword] = { "left": left, "top": top, "right": right } if flag: for info in image_info: word = info['word'] if '年' in word or re.search(r'\d', word): left = info['rect']['left'] top = info['rect']['top'] width = info['rect']['width'] if '年' in word: find_list = re.findall(pattern=format, string=word) else: find_list = re.findall(pattern=special_format, string=word) # logger.info(f'word {word} has find_list{find_list}') # if self.check: # pdb.set_trace() if len(find_list) == 1: find_string = find_list[0] if '至' in find_string: start_prefix = find_string.split('至')[0].replace('自', '') end_prefix = find_string.split('至')[-1] if '年' in start_prefix: response_item['start_datetime'] = start_prefix if end_prefix != '': response_item['end_datetime'] = end_prefix return response_item # 不存在{至}的情况下通过位置和已有期限关键字来分配日期 else: for k, k_info in keyword_dict.items(): k_left = k_info['left'] k_right = k_info['right'] k_top = k_info['top'] # 捕获关键字 if left == k_left: if (k in priority_keywords) or (k in end_keywords) and response_item['end_datetime'] is None: response_item['end_datetime'] = find_string elif k in start_keywords and response_item['start_datetime'] is None: response_item['start_datetime'] = find_string break elif left >= k_right and top >= k_top: if (k in priority_keywords) or (k in end_keywords) and response_item['end_datetime'] is None: response_item['end_datetime'] = find_string elif k in start_keywords and response_item['start_datetime'] is None: response_item['start_datetime'] = find_string elif len(find_list) == 2: start_prefix = find_list[0].replace('自', '') end_prefix = find_list[-1].replace('至', '') if response_item['start_datetime'] is None: response_item['start_datetime'] = start_prefix if response_item['end_datetime'] is None: response_item['end_datetime'] = end_prefix else: logger.info(f'wrong word: {word} ...') else: continue return response_item # 营业执照有效期定位 def find_license_datetime(self, image_info, response_item): for info in image_info: word = info['word'] # id if (word.startswith('证照编号:') and len(word) == 25) or (word.isdigit() and len(word) == 20): response_item['id'] = word if word.isdigit() else word[5:] elif bool(re.match(self.datetime_re, word)): split = word.split('至') start_datetime = split[0] end_datetime = split[-1] response_item['start_datetime'] = start_datetime response_item['end_datetime'] = end_datetime elif word == '长期': response_item['start_datetime'] = response_item['end_datetime'] = '长期' return response_item # 在image_info中搜寻word中包含key的内容 def search(self, image_info, key): for info in image_info: word = info['word'] if key in word: return True return False # 在image_info中使用re.search搜寻满足{format}正则的信息 def re_search(self, image_info, format): for info in image_info: word = info['word'] match = re.search(format, word) if match: return match.group(0) return False # 在image_info中使用re.match搜寻满足{format}正则的信息 def re_match(self, image_info, format): for info in image_info: word = info['word'] match = re.match(format, word) if match: return word return False # 用于识别固定位置是否有公司法人签名 def signature_recognition(self, image_path: str): keywords = ['投标函', '(法定代表人CA电子印章)','(法定代表人CA电子印章或签字)', '(签字)', '法定代表人或其委托代理人:', '法定代表人:'] key_pos = {} image_prefix = image_path.split('/')[0] image_name = image_path.split('/')[-1][:-4] removed_image_name = image_name + '_roi' + image_path.split('/')[-1][-4:] ink_image_name = image_name + '_ink' + image_path.split('/')[-1][-4:] removed_image_path = os.path.join(image_prefix, removed_image_name) ink_image_path = os.path.join(image_prefix, ink_image_name) if not os.path.exists(removed_image_path): removed_seal_img = self.remove_red_seal(image_path=image_path) cv2.imwrite(removed_image_name, removed_seal_img) content = self.get_content(image_path=removed_image_path) image_info = content["rawjson"]["ret"] for info in image_info: word = info['word'] left = info['rect']['left'] top = info['rect']['top'] width = info['rect']['width'] height = info['rect']['height'] right = left + width bottom = top + height for keyword in keywords: if keyword in word: key_pos[keyword] = { "word": word, "left": left, "right": right, "top": top, "bottom": bottom } break # 如果不存在"投标函"、"法定代表人"等关键字,则返回False if len(key_pos) == 0: return False # 定位到法定代表人所在位置 if ((key_pos.get('法定代表人:') is not None) or (key_pos.get('法定代表人或其委托代理人:') is not None)) and \ ((key_pos.get('(法定代表人CA电子印章)') is not None) or (key_pos.get('(法定代表人CA电子印章或签字)') is not None) or (key_pos.get('(签字)') is not None)): if key_pos.get('法定代表人或其委托代理人:') is not None: l_info = key_pos['法定代表人或其委托代理人:'] l_cnt = 13 l_string = '法定代表人或其委托代理人:' else: l_info = key_pos['法定代表人:'] l_cnt = 6 l_string = '法定代表人:' if key_pos.get('(法定代表人CA电子印章)') is not None: r_info = key_pos['(法定代表人CA电子印章)'] r_string = '(法定代表人CA电子印章)' elif key_pos.get('(法定代表人CA电子印章或签字)') is not None: r_info = key_pos['(法定代表人CA电子印章或签字)'] r_string = '(法定代表人CA电子印章或签字)' else: r_info = key_pos['(签字)'] r_string = '(签字)' # 此时签名应在两者之间 l = l_info['right'] l_word = l_info['word'] r = r_info['left'] r_word = r_info['word'] t = max(l_info['top'], r_info['top']) b = min(l_info['bottom'], r_info['bottom']) - 5 if l_word[-l_cnt:] != l_string or r_word != r_string: return True else: black_ratio = self.ink_recognition( input_img=removed_seal_img, out_path=ink_image_path, meta={ "left": l, "right": r, "top": t, "bottom": b } ) if black_ratio >= self.sign_threshold: return True return False elif (key_pos.get('(法定代表人CA电子印章)') is not None) or (key_pos.get('(法定代表人CA电子印章或签字)') is not None) or (key_pos.get('(签字)') is not None): # 此时签名应已包含 if key_pos.get('(法定代表人CA电子印章)') is not None: key = key_pos['(法定代表人CA电子印章)'] elif key_pos.get('(法定代表人CA电子印章或签字)') is not None: key = key_pos['(法定代表人CA电子印章或签字)'] elif key_pos.get('(签字)') is not None: key = key_pos['(签字)'] key_word = key['word'] key_word = key_word.replace('(法定代表人CA电子印章)','').replace('(法定代表人CA电子印章或签字)', '').replace('(签字)','').replace('法定代表人或其委托代理人:', '').replace('法定代表人:', '') if key_word != '': return True return False elif key_pos.get('法定代表人:') is not None: # 此时签名在右边或已包含 word = key_pos['法定代表人:']['word'] l = key_pos['法定代表人:']['left'] r = l + 100 t = key_pos['法定代表人:']['top'] b = key_pos['法定代表人:']['bottom'] - 5 if word[-6:] != '法定代表人:': return True else: black_ratio = self.ink_recognition( input_img=removed_seal_img, out_path=ink_image_path, meta={ "left": l, "right": r, "top": t, "bottom": b } ) if black_ratio >= self.sign_threshold: return True return False elif key_pos.get('法定代表人或其委托代理人:') is not None: # 此时签名在右边或已包含 word = key_pos['法定代表人或其委托代理人:']['word'] l = key_pos['法定代表人或其委托代理人:']['left'] r = l + 100 t = key_pos['法定代表人或其委托代理人:']['top'] b = key_pos['法定代表人或其委托代理人:']['bottom'] - 5 if word[-13:] != '法定代表人或其委托代理人:': return True else: black_ratio = self.ink_recognition( input_img=removed_seal_img, meta={ "left": l, "right": r, "top": t, "bottom": b } ) if black_ratio >= self.sign_threshold: return True return False else: return False # 用于判断固定位置的长方形框内是否存在签名字迹 # 用于识别图像固定位置黑色字迹所占比例,并将该位置的图像截取保存 def ink_recognition(self, input_img, out_path, meta: dict): left = meta["left"] right = meta["right"] top = meta["top"] bottom = meta["bottom"] crop_img = input_img[top:bottom, left:right, :] cv2.imwrite(out_path, crop_img) gray_img = cv2.cvtColor(crop_img, cv2.COLOR_BGR2GRAY) thresh, ret = cv2.threshold(gray_img, 0, 255, cv2.THRESH_OTSU) filter_condition = int(thresh * 0.90) _, black_thresh = cv2.threshold(gray_img, filter_condition, 255, cv2.THRESH_BINARY_INV) total_pixels = black_thresh.size black_pixels = np.count_nonzero(black_thresh) black_ratio = black_pixels / total_pixels return black_ratio # 提供pdf解析,并基于提取文本信息进行位置匹配 class PdfMatcher(PdfExtractAttr): # file_path为提供的pdf文件路径 def __init__(self, file_path: str): super(PdfMatcher, self).__init__( file_path=file_path ) # 投标书名称 self.bid_name = file_path.split('/')[-1][:-4] # 投标书数据文件夹 self.bid_dir = os.path.join(os.path.dirname(file_path), self.bid_name) # 公司名称 self.firm_name = file_path.split('/')[-2] # title list title_path = os.path.join(self.bid_dir, "title.json") # image list self.image_dir = os.path.join(self.bid_dir, "extracted_images") if (not os.path.exists(title_path)) or (not os.path.exists(self.image_dir)): os.makedirs(self.image_dir, exist_ok=True) self.main_parse(pdf_path=file_path, title_path=title_path, image_dir=self.image_dir) self.title = load_json(title_path) # outline list outline_path = os.path.join(self.bid_dir, "outlines.json") self.outline = self.parse_outline(out_path=outline_path) # text list text_path = os.path.join(self.bid_dir, "all_texts.json") self.details = self.parse_text(out_path=text_path) # table list table_path = os.path.join(self.bid_dir, "all_tables.json") if os.path.exists(table_path): self.table = load_json(table_path) else: self.tables = self.parse_table(out_path=table_path) # image format self.image_format = "image_page_{}*" # image filter threshold self.start_threshold = 10 self.distance_threshold = 6 self.search_threshold = 20 # 用于定位营业执照、资质证书的页面范围 def search_interval(self): '''定位营业执照、资质证书的区间范围''' # 通过关键字模糊定位 keywords = ['资格审查资料','资格审查材料','其它材料','其他材料','其他资料','附件', '影印件'] search_interval = [] # locate in title.json left_pos = -1 # 左指针 right_pos = -1 # 右指针 for title_block in self.title: block_text = title_block['text'].replace(' ', '').strip() # 先进行左区间判定 if left_pos != -1 and '证书' not in block_text: right_pos = title_block['page_number'] search_interval.append((left_pos, right_pos)) # 重置 left_pos = -1 for keyword in keywords: if keyword in block_text: # 先进行模糊的outline定位 center_page = None if '.' in block_text: center_page = block_text.split('.')[-1] if center_page.isdigit(): center_page = eval(center_page) left_pos = min(title_block['page_number'], center_page) else: left_pos = title_block['page_number'] # 最终判定 if left_pos != -1: search_interval.append((left_pos, right_pos)) # 重置 left_pos = -1 right_pos = -1 # locate in outlines.json if len(self.outline) > 0: for outline_block in self.outline: if left_pos != -1: right_pos = outline_block["page_number"] right_pos = right_pos if right_pos is not None else -1 search_interval.append((left_pos, right_pos)) left_pos = -1 outline_text = outline_block['title'].strip() for keyword in keywords: if keyword in outline_text: if outline_block["page_number"] is not None: left_pos = outline_block["page_number"] # 最终判定 if left_pos != -1: search_interval.append((left_pos, right_pos)) # 搜寻区间合并 search_interval.sort() merge_interval = [] if len(search_interval) > 0: left = -1 right = -1 for interval in search_interval: l, r = interval if r < l: continue if left == -1 and right == -1: left = l right = r elif l <= right: right = r else: merge_interval.append((left, right)) left = l right = r merge_interval.append((left, right)) return merge_interval # 返回可能为营业执照或资质证书的图像集 def find_candidate_images(self): candidate_images = set() merge_intervals = self.search_interval() for interval in merge_intervals: start_page, end_page = interval if start_page <= self.start_threshold: continue if end_page == -1: end_page = start_page + 20 candidate_images = self.image_regularization(start_page=max(0, start_page-self.search_threshold), end_page=end_page+self.search_threshold, candidate_images=candidate_images) candidate_images = list(candidate_images) return candidate_images # 使用正则查询符合格式的图像 def image_regularization(self, start_page: int, end_page:int, candidate_images: set): for index in range(start_page, end_page + 1): current_format = self.image_format.format(index) files = glob.glob(os.path.join(self.image_dir, current_format)) filter_files = [file for file in files if not file.endswith('.unk')] candidate_images.update(filter_files) return candidate_images class PdfParse_pipeline(): def __init__(self, ocr, # ocr接口 firm_dir, # 存储所有公司的路径 out_path # 输出地址 ): self.ocr = ocr self.firm_dir = firm_dir self.out_path = out_path def parse_pipeline(self): data = {} for firm_name in tqdm(os.listdir(self.firm_dir)): logger.info(f'processing firm {firm_name} ...') firm_path = os.path.join(self.firm_dir, firm_name) for bid_name in tqdm(os.listdir(firm_path)): if bid_name.endswith('.pdf'): document=os.path.join(firm_path, bid_name) bid_dir = os.path.join(firm_path, bid_name[:-4]) os.makedirs(bid_dir, exist_ok=True) document_data = self.parse_single_document(pdf_path=document) data[firm_name] = document_data # 以下将data的数据存入out_path with open(self.out_path, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=4) return data def parse_single_document(self, pdf_path: str): agent = PdfMatcher(file_path=pdf_path) firm_name = agent.firm_name data = { "license_list":[] } candidate_images = agent.find_candidate_images() if len(candidate_images) == 0: pass else: for img in candidate_images: try: response = ocr.judge(image_path=img, firm_name=firm_name) if response == None or response['qualtified'] == None: continue else: data["license_list"].append({ "license_name": response["license_name"], "license_path": img, "license_page": response["license_page"], "start_datetime": response["start_datetime"], "end_datetime": response["end_datetime"] }) except ValueError as e: print(e) return data if __name__ == "__main__": # [测试demo] start_time = time.time() # 请针对自己的环境进行修改log_path global logger log_path = "/home/stf/miner_pdf/interface/test_logs/info.log" logger = create_logger(log_path=log_path) # [环境参数] # ocr url url = "http://120.48.103.13:18000/ctr_ocr" # seal_ocr url base_url = "https://aip.baidubce.com/rest/2.0/ocr/v1/seal?access_token=" # seal_ocr access_token access_token = "24.6bbe9987c6bd19ba65e4402917811657.2592000.1724573148.282335-86574608" # seal_ocr headers headers = {'content-type': 'application/x-www-form-urlencoded'} # data_path为存储所有投标公司的起始路径 data_path = "/home/stf/miner_pdf/data/投标公司pdf" # test_data_path为存储测试投标公司的起始路径 test_data_path = "/home/stf/miner_pdf/interface/test_files" # pipeline_out_path为执行所有公司pipeline逻辑后的输出位置 # 其为存放营业执照和资质证书位置信息的json文件 pipeline_out_path = "/home/stf/miner_pdf/interface/outdir/test_pipeline.json" # single_out_path为执行单个公司pdf解析逻辑后的输出位置 # 其为存放营业执照和资质证书位置信息的json文件 single_out_path = "/home/stf/miner_pdf/interface/outdir/test_single.json" # ground_truth目前为存储所有非扫描公司在pdf中营业执照与资质证书的json文件 ground_truth = "/home/stf/miner_pdf/ground_truth.json" # 用于区分该公司提供的pdf文件为(扫描件 or 非扫描件) firm_excel_file = "/home/stf/miner_pdf/data/certificate.xlsx" df = pd.read_excel(firm_excel_file) # 封装好的ocr接口 ocr = OcrAgent(url=url) # 封装好的pipeline pipeline = PdfParse_pipeline( ocr=ocr, firm_dir=test_data_path, out_path=single_out_path ) # start data = pipeline.parse_pipeline() # caculate time cost cost_time = time.time() - start_time logger.info(f"total cost {cost_time // 60} min {cost_time % 60} sec ...")