### 解析所有pdf文件并提取信息进行测试的框架 ### PdfExtractAttr作为提取pdf信息的基类 # 子类在其基础上实现匹配功能 # 标准包导入 import os import re import json import re import shutil import pandas as pd import pdb import base64 from io import BytesIO from pprint import pprint # 第三方包导入 import numpy as np import pandas as pd import cv2 import torch import glob import logging import requests import time import datetime from tqdm import tqdm from tools import RefPageNumberResolver from get_info import PdfExtractAttr from get_info import is_title, export_image, _save_jpeg, _save_jpeg2000, _save_bmp, main_parse, table_parse, load_json from PIL import Image import cn_clip.clip as clip from cn_clip.clip import load_from_name, available_models from pdfminer.image import ImageWriter # global envs device = "cuda" if torch.cuda.is_available() else "cpu" clip_version = "ViT-B-16" model, preprocess = load_from_name(clip_version) model.eval() log_path = "/home/stf/miner_pdf/info.log" # log def create_logger(log_path): """ 将日志输出到日志文件和控制台 """ logger = logging.getLogger() logger.setLevel(logging.INFO) formatter = logging.Formatter( '%(asctime)s - %(levelname)s - %(message)s') # 创建一个handler,用于写入日志文件 file_handler = logging.FileHandler( filename=log_path, mode='w') file_handler.setFormatter(formatter) file_handler.setLevel(logging.INFO) logger.addHandler(file_handler) # 创建一个handler,用于将日志输出到控制台 console = logging.StreamHandler() console.setLevel(logging.DEBUG) console.setFormatter(formatter) logger.addHandler(console) return logger logger = create_logger(log_path=log_path) # ocr外部接口 class OcrAgent(): def __init__(self, url): self.url = url self.datetime_re = r'\d{4}年\d{1,2}月\d{1,2}日至(?:\d{4}年\d{1,2}月\d{1,2}日|长期)' # 不同类型证书资质正则 self.re_dict = { "business_license" : r'营业执照', "deposit": r'^(?:开户许可证|[\u4e00-\u9fff]+存款账户[\u4e00-\u9fff]+)$', "production_license": r'\b[\u4e00-\u9fff]*许可证\b', "qualtifications" : r'\b[\u4e00-\u9fff]*证书', "proof": r'\b[\u4e00-\u9fff]*证明', } # 字迹阈值 self.sign_threshold = 0.05 def get_content(self, image_path): try: with open(image_path, 'rb') as image_file: files = {"file": ("image.jpg", image_file, "image/jpeg")} # files = {"file": ("image.png", image_file, "image/png")} response = requests.post(url, files=files) return response.json() except: raise ValueError(f"传入图像{image_path}已损坏") def remove_red_seal(self, image_path): # 读取图像 input_img = cv2.imread(image_path) # 分离图片的通道 blue_c, green_c, red_c = cv2.split(input_img) #利用大津法自动选择阈值 thresh, ret = cv2.threshold(red_c, 0, 255, cv2.THRESH_OTSU) #对阈值进行调整 filter_condition = int(thresh * 1.0) #移除红色的印章 _, red_thresh = cv2.threshold(red_c, filter_condition, 255, cv2.THRESH_BINARY) # 把图片转回3通道 result_img = np.expand_dims(red_thresh, axis=2) result_img = np.concatenate((result_img, result_img, result_img), axis=-1) return result_img def judge(self, image_path: str, firm_name: str): '''使用正则判断是否属于营业执照或资质证书类型''' # image_prefix = image_path.split('/')[-1][:-4] image_prefix = image_path.split('/')[-1] logger.info(f'processing img: {image_prefix}') # page_number = image_prefix.split('_')[-2] response_item = { "qualtified": None, # 是否为证书 "matched": None, # 是否出现匹配的公司名称 "license_name": None, # 证书名 # "page_number": page_number, # 证书所在页 "start_datetime": None, # 有效起始时间 "end_datetime": None # 有效终止时间 } content = self.get_content(image_path=image_path) image_info = content["rawjson"]["ret"] # 必须包含公司名称信息 if not self.search(image_info=image_info, key=firm_name): return None else: response_item['matched'] = True # 是否匹配营业执照或资质证书 for key, format in self.re_dict.items(): if key == 'business_license': match_name = self.re_match(image_info=image_info, format=format) else: match_name = self.re_search(image_info=image_info, format=format) if match_name and key == 'business_license': response_item["qualtified"] = True response_item["license_name"] = match_name response_item = self.find_license_datetime(image_info=image_info, response_item=response_item) return response_item elif match_name: response_item["qualtified"] = True response_item["license_name"] = match_name response_item = self.find_certificate_datetime(image_info=image_info, response_item=response_item) return response_item return response_item # TODO 资质证书有效期定位 def find_certificate_datetime(self, image_info, response_item): # keyword start_keywords = ['颁发日期', '发证日期', '生效日期'] end_keywords = ['终止日期'] priority_keywords = ['有效期', '使用期限', '有效日期'] keywords_list = ['有效期', '使用期限', '有效日期', '终止日期', '颁发日期', '发证日期', '生效日期'] # re format format = r'(?:[自至])?\d{4}年\d{1,2}月\d{1,2}日(?:至)?(?:\d{4}年\d{1,2}月\d{1,2}日)?' special_format = r'\d{4}-\d{1,2}-\d{1,2}' # 判断是否存在日期关键字 flag = False keyword_dict = {} for info in image_info: word = info['word'] left = info['rect']['left'] top = info['rect']['top'] width = info['rect']['width'] height = info['rect']['height'] for keyword in keywords_list: # 该证书存在日期关键字 if keyword in word: flag = True charset_list = info['charset'] for char_dc in charset_list: if char_dc['word'] == keyword[-1]: right = char_dc['rect']['left'] + char_dc['rect']['width'] keyword_dict[keyword] = { "left": left, "top": top, "right": right } if flag: for info in image_info: word = info['word'] if '年' in word or re.search(r'\d', word): left = info['rect']['left'] top = info['rect']['top'] width = info['rect']['width'] if '年' in word: find_list = re.findall(pattern=format, string=word) else: find_list = re.findall(pattern=special_format, string=word) logger.info(f'word {word} has find_list{find_list}') # if self.check: # pdb.set_trace() if len(find_list) == 1: find_string = find_list[0] if '至' in find_string: start_prefix = find_string.split('至')[0].replace('自', '') end_prefix = find_string.split('至')[-1] if '年' in start_prefix: response_item['start_datetime'] = start_prefix if end_prefix != '': response_item['end_datetime'] = end_prefix return response_item # 不存在{至}的情况下通过位置和已有期限关键字来分配日期 else: for k, k_info in keyword_dict.items(): k_left = k_info['left'] k_right = k_info['right'] k_top = k_info['top'] # 捕获关键字 if left == k_left: if (k in priority_keywords) or (k in end_keywords) and response_item['end_datetime'] is None: response_item['end_datetime'] = find_string elif k in start_keywords and response_item['start_datetime'] is None: response_item['start_datetime'] = find_string break elif left >= k_right and top >= k_top: if (k in priority_keywords) or (k in end_keywords) and response_item['end_datetime'] is None: response_item['end_datetime'] = find_string elif k in start_keywords and response_item['start_datetime'] is None: response_item['start_datetime'] = find_string elif len(find_list) == 2: start_prefix = find_list[0].replace('自', '') end_prefix = find_list[-1].replace('至', '') if response_item['start_datetime'] is None: response_item['start_datetime'] = start_prefix if response_item['end_datetime'] is None: response_item['end_datetime'] = end_prefix else: logger.info(f'wrong word: {word} ...') else: continue return response_item # 找到营业执照中id与date信息 def find_license_datetime(self, image_info, response_item): for info in image_info: word = info['word'] # id if (word.startswith('证照编号:') and len(word) == 25) or (word.isdigit() and len(word) == 20): response_item['id'] = word if word.isdigit() else word[5:] elif bool(re.match(self.datetime_re, word)): split = word.split('至') start_datetime = split[0] end_datetime = split[-1] response_item['start_datetime'] = start_datetime response_item['end_datetime'] = end_datetime elif word == '长期': response_item['start_datetime'] = response_item['end_datetime'] = '长期' return response_item # 在image_info中搜寻word中包含key的内容 def search(self, image_info, key): for info in image_info: word = info['word'] if key in word: return True return False # 在image_info中使用re.search搜寻满足{format}正则的信息 def re_search(self, image_info, format): for info in image_info: word = info['word'] match = re.search(format, word) if match: return match.group(0) return False # 在image_info中使用re.match搜寻满足{format}正则的信息 def re_match(self, image_info, format): for info in image_info: word = info['word'] match = re.match(format, word) if match: return word return False # 用于识别固定位置是否有公司法人签名 def signature_recognition(self, image_path: str): keywords = ['投标函', '(法定代表人CA电子印章)','(法定代表人CA电子印章或签字)', '(签字)', '法定代表人或其委托代理人:', '法定代表人:'] key_pos = {} image_prefix = image_path.split('/')[0] image_name = image_path.split('/')[-1][:-4] removed_image_name = image_name + '_roi' + image_path.split('/')[-1][-4:] ink_image_name = image_name + '_ink' + image_path.split('/')[-1][-4:] removed_image_path = os.path.join(image_prefix, removed_image_name) ink_image_path = os.path.join(image_prefix, ink_image_name) if not os.path.exists(removed_image_path): removed_seal_img = self.remove_red_seal(image_path=image_path) cv2.imwrite(removed_image_name, removed_seal_img) content = self.get_content(image_path=removed_image_path) image_info = content["rawjson"]["ret"] for info in image_info: word = info['word'] left = info['rect']['left'] top = info['rect']['top'] width = info['rect']['width'] height = info['rect']['height'] right = left + width bottom = top + height for keyword in keywords: if keyword in word: key_pos[keyword] = { "word": word, "left": left, "right": right, "top": top, "bottom": bottom } break # 如果不存在"投标函"、"法定代表人"等关键字,则返回False if len(key_pos) == 0: return False # 定位到法定代表人所在位置 if ((key_pos.get('法定代表人:') is not None) or (key_pos.get('法定代表人或其委托代理人:') is not None)) and \ ((key_pos.get('(法定代表人CA电子印章)') is not None) or (key_pos.get('(法定代表人CA电子印章或签字)') is not None) or (key_pos.get('(签字)') is not None)): if key_pos.get('法定代表人或其委托代理人:') is not None: l_info = key_pos['法定代表人或其委托代理人:'] l_cnt = 13 l_string = '法定代表人或其委托代理人:' else: l_info = key_pos['法定代表人:'] l_cnt = 6 l_string = '法定代表人:' if key_pos.get('(法定代表人CA电子印章)') is not None: r_info = key_pos['(法定代表人CA电子印章)'] r_string = '(法定代表人CA电子印章)' elif key_pos.get('(法定代表人CA电子印章或签字)') is not None: r_info = key_pos['(法定代表人CA电子印章或签字)'] r_string = '(法定代表人CA电子印章或签字)' else: r_info = key_pos['(签字)'] r_string = '(签字)' # 此时签名应在两者之间 l = l_info['right'] l_word = l_info['word'] r = r_info['left'] r_word = r_info['word'] t = max(l_info['top'], r_info['top']) b = min(l_info['bottom'], r_info['bottom']) - 5 if l_word[-l_cnt:] != l_string or r_word != r_string: return True else: black_ratio = self.ink_recognition( input_img=removed_seal_img, out_path=ink_image_path, meta={ "left": l, "right": r, "top": t, "bottom": b } ) if black_ratio >= self.sign_threshold: return True return False elif (key_pos.get('(法定代表人CA电子印章)') is not None) or (key_pos.get('(法定代表人CA电子印章或签字)') is not None) or (key_pos.get('(签字)') is not None): # 此时签名应已包含 if key_pos.get('(法定代表人CA电子印章)') is not None: key = key_pos['(法定代表人CA电子印章)'] elif key_pos.get('(法定代表人CA电子印章或签字)') is not None: key = key_pos['(法定代表人CA电子印章或签字)'] elif key_pos.get('(签字)') is not None: key = key_pos['(签字)'] key_word = key['word'] key_word = key_word.replace('(法定代表人CA电子印章)','').replace('(法定代表人CA电子印章或签字)', '').replace('(签字)','').replace('法定代表人或其委托代理人:', '').replace('法定代表人:', '') if key_word != '': return True return False elif key_pos.get('法定代表人:') is not None: # 此时签名在右边或已包含 word = key_pos['法定代表人:']['word'] l = key_pos['法定代表人:']['left'] r = l + 100 t = key_pos['法定代表人:']['top'] b = key_pos['法定代表人:']['bottom'] - 5 if word[-6:] != '法定代表人:': return True else: black_ratio = self.ink_recognition( input_img=removed_seal_img, out_path=ink_image_path, meta={ "left": l, "right": r, "top": t, "bottom": b } ) if black_ratio >= self.sign_threshold: return True return False elif key_pos.get('法定代表人或其委托代理人:') is not None: # 此时签名在右边或已包含 word = key_pos['法定代表人或其委托代理人:']['word'] l = key_pos['法定代表人或其委托代理人:']['left'] r = l + 100 t = key_pos['法定代表人或其委托代理人:']['top'] b = key_pos['法定代表人或其委托代理人:']['bottom'] - 5 if word[-13:] != '法定代表人或其委托代理人:': return True else: black_ratio = self.ink_recognition( input_img=removed_seal_img, meta={ "left": l, "right": r, "top": t, "bottom": b } ) if black_ratio >= self.sign_threshold: return True return False else: return False # 用于判断固定位置的长方形框内是否存在签名字迹 def ink_recognition(self, input_img, out_path, meta: dict): left = meta["left"] right = meta["right"] top = meta["top"] bottom = meta["bottom"] crop_img = input_img[top:bottom, left:right, :] cv2.imwrite(out_path, crop_img) gray_img = cv2.cvtColor(crop_img, cv2.COLOR_BGR2GRAY) thresh, ret = cv2.threshold(gray_img, 0, 255, cv2.THRESH_OTSU) filter_condition = int(thresh * 0.90) _, black_thresh = cv2.threshold(gray_img, filter_condition, 255, cv2.THRESH_BINARY_INV) total_pixels = black_thresh.size black_pixels = np.count_nonzero(black_thresh) black_ratio = black_pixels / total_pixels return black_ratio # seal ocr外部接口,用于提取页面中的印章信息 # 集成签名判断函数 class seal_agent(): def __init__(self, base_url: str, access_token: str, headers: dict, ): self.base_url = base_url self.access_token = access_token self.headers = headers self.request_url = base_url + access_token def seal_recognition(self, img_path): f = open(img_path, 'rb') img = base64.b64encode(f.read()) params = {"image":img} response = requests.post(self.request_url, data=params, headers=self.headers) if response: data = response.json() else: data = {} return data class PdfMatcher(PdfExtractAttr): '''pdf匹配''' def __init__(self, file_path: str): super(PdfMatcher, self).__init__( file_path=file_path ) # 投标书名称 self.bid_name = file_path.split('/')[-1][:-4] # 投标书数据文件夹 self.bid_dir = os.path.join(os.path.dirname(file_path), self.bid_name) # 公司名称 self.firm_name = file_path.split('/')[-2] # title list title_path = os.path.join(self.bid_dir, "title.json") self.title = load_json(title_path) # outline list outline_path = os.path.join(self.bid_dir, "outlines.json") self.outline = self.parse_outline(out_path=outline_path) # text list text_path = os.path.join(self.bid_dir, "all_texts.json") self.details = self.parse_text(out_path=text_path) # table list table_path = os.path.join(self.bid_dir, "all_tables.json") if os.path.exists(table_path): self.table = load_json(table_path) else: self.tables = self.parse_table(out_path=table_path) # image list self.image_dir = os.path.join(self.bid_dir, "extracted_images") # image format self.image_format = "image_page_{}*" # image filter threshold self.start_threshold = 10 self.distance_threshold = 6 self.search_threshold = 20 self.match_threshold = 44.0 self.degrade_threshold = 42.0 def search_interval(self): '''定位营业执照、资质证书的区间范围''' # 通过关键字模糊定位 keywords = ['资格审查资料','资格审查材料','其它材料','其他材料','其他资料','附件', '影印件'] search_interval = [] # locate in title.json left_pos = -1 # 左指针 right_pos = -1 # 右指针 for title_block in self.title: block_text = title_block['text'].replace(' ', '').strip() # 先进行左区间判定 if left_pos != -1 and '证书' not in block_text: right_pos = title_block['page_number'] search_interval.append((left_pos, right_pos)) # 重置 left_pos = -1 for keyword in keywords: if keyword in block_text: # 先进行模糊的outline定位 center_page = None if '.' in block_text: center_page = block_text.split('.')[-1] if center_page.isdigit(): center_page = eval(center_page) left_pos = min(title_block['page_number'], center_page) else: left_pos = title_block['page_number'] # 最终判定 if left_pos != -1: search_interval.append((left_pos, right_pos)) # 重置 left_pos = -1 right_pos = -1 # locate in outlines.json if len(self.outline) > 0: for outline_block in self.outline: if left_pos != -1: right_pos = outline_block["page_number"] right_pos = right_pos if right_pos is not None else -1 search_interval.append((left_pos, right_pos)) left_pos = -1 outline_text = outline_block['title'].strip() for keyword in keywords: if keyword in outline_text: if outline_block["page_number"] is not None: left_pos = outline_block["page_number"] # 最终判定 if left_pos != -1: search_interval.append((left_pos, right_pos)) # 搜寻区间合并 search_interval.sort() merge_interval = [] if len(search_interval) > 0: left = -1 right = -1 for interval in search_interval: l, r = interval if r < l: continue if left == -1 and right == -1: left = l right = r elif l <= right: right = r else: merge_interval.append((left, right)) left = l right = r merge_interval.append((left, right)) return merge_interval def find_candidate_images(self): candidate_images = set() merge_intervals = self.search_interval() for interval in merge_intervals: start_page, end_page = interval if start_page <= self.start_threshold: continue if end_page == -1: end_page = start_page + 20 candidate_images = self.image_regularization(start_page=max(0, start_page-self.search_threshold), end_page=end_page+self.search_threshold, candidate_images=candidate_images) candidate_images = list(candidate_images) return candidate_images # 定位营业执照图像 def locate_business_license(self): '''locate business license and return image''' keywords = ["资格审查资料", "其它资格审查材料", "资格审查材料"] candidate_pages = [] center_pages = [] candidate_images = set() # locate in title.json for title_block in self.title: block_text = title_block['text'].replace(' ', '').strip() for keyword in keywords: if keyword in block_text: # 先进行模糊的outline定位 center_page = None if '.' in block_text: center_page = block_text.split('.')[-1] if center_page.isdigit(): center_page = eval(center_page) center_pages.append(center_page) candidate_pages.append(title_block['page_number']) # locate in outlines.json if len(self.outline) > 0: for outline_block in self.outline: outline_text = outline_block['title'].strip() for keyword in keywords: if keyword in outline_text: center_pages.append(outline_block["page_number"]) # information match filter_pages = set() if len(center_pages) == 0 and len(candidate_pages) == 0: return None elif len(center_pages) == 0: filter_pages.update(candidate_pages) elif len(candidate_pages) == 0: filter_pages.update(center_pages) else: # center_pages作为锚点,全部加入 filter_pages.update(center_pages) # candidate_page与center_page进行匹配加入 for candidate_page in candidate_pages: if candidate_page <= self.start_threshold: continue for center_page in center_pages: distance = abs(candidate_page - center_page) if distance <= self.distance_threshold: filter_pages.add(min(candidate_page, center_page) + distance // 2) # 得到筛选后的图片集存储于self.candidate_images for filter_page in filter_pages: # candidate_images = self.image_regularization(candidate_images=candidate_images, start_page=max(filter_page-self.search_threshold, 0), end_page=filter_page+self.search_threshold) candidate_images = self.image_regularization(start_page=max(filter_page-self.search_threshold, 0), end_page=filter_page+self.search_threshold, candidate_images=candidate_images) # 获取最终图像的地址 candidate_images = list(candidate_images) target_list = self.exact_match(candidate_images=candidate_images) # return target_path list return target_list # 定位资质证书 def locate_qualtification_certificate(self): '''返回资质证书的图像列表''' # 通过关键字模糊定位 keywords = ['资格审查资料','资格审查材料','其它材料','其他材料','影印件'] search_interval = [] candidate_images = set() # locate in title.json left_pos = -1 # 左指针 right_pos = -1 # 右指针 for title_block in self.title: block_text = title_block['text'].replace(' ', '').strip() # 先进行左区间判定 if left_pos != -1 and '证书' not in block_text: right_pos = title_block['page_number'] search_interval.append((left_pos, right_pos)) # 重置 left_pos = -1 for keyword in keywords: if keyword in block_text: # 先进行模糊的outline定位 center_page = None if '.' in block_text: center_page = block_text.split('.')[-1] if center_page.isdigit(): center_page = eval(center_page) left_pos = min(title_block['page_number'], center_page) else: left_pos = title_block['page_number'] # 最终判定 if left_pos != -1: search_interval.append((left_pos, right_pos)) # 重置 left_pos = -1 right_pos = -1 # locate in outlines.json if len(self.outline) > 0: for outline_block in self.outline: if left_pos != -1: right_pos = outline_block["page_number"] right_pos = right_pos if right_pos is not None else -1 search_interval.append((left_pos, right_pos)) left_pos = -1 outline_text = outline_block['title'].strip() for keyword in keywords: if keyword in outline_text: if outline_block["page_number"] is not None: left_pos = outline_block["page_number"] # 最终判定 if left_pos != -1: search_interval.append((left_pos, right_pos)) # 搜寻区间合并 search_interval.sort() merge_interval = [] if len(search_interval) > 0: left = -1 right = -1 for interval in search_interval: l, r = interval if r < l: continue if left == -1 and right == -1: left = l right = r elif l <= right: right = r else: merge_interval.append((left, right)) left = l right = r merge_interval.append((left, right)) for interval in merge_interval: start_page, end_page = interval if end_page == -1: end_page = start_page + 20 if start_page <= self.start_threshold: continue candidate_images = self.image_regularization(start_page=max(0, start_page-self.search_threshold), end_page=end_page+self.search_threshold, candidate_images=candidate_images) candidate_images = list(candidate_images) target_list = self.search_qualtification_certificate(candidate_images=candidate_images) return target_list # 查询符合格式的图像 def image_regularization(self, start_page: int, end_page:int, candidate_images: set): for index in range(start_page, end_page + 1): current_format = self.image_format.format(index) files = glob.glob(os.path.join(self.image_dir, current_format)) # cut_files = list(map(lambda x: x.split('/')[-1], files)) # filter_files = [file for file in cut_files if not file.endswith('.unk')] filter_files = [file for file in files if not file.endswith('.unk')] candidate_images.update(filter_files) return candidate_images def exact_match(self, candidate_images: list): '''精确匹配营业执照位置''' if len(candidate_images) == 0: return None target_list = [] sim_list = [] for image_path in candidate_images: score = self.get_similarity(image_path=image_path, tamplate=self.bl_tamplate) sim_list.append(score.cpu().numpy()) # top-k > match_threshold sim_list = np.array(sim_list).reshape(len(sim_list)) for i, cos_sim in enumerate(sim_list): if cos_sim > self.match_threshold: target_list.append(candidate_images[i]) # 未找寻到符合当前阈值要求的图像,降低阈值 if len(target_list) == 0: for i, cos_sim in enumerate(sim_list): if cos_sim > self.degrade_threshold: target_list.append(candidate_images[i]) return target_list def search_qualtification_certificate(self, candidate_images: list): '''从candidate images中搜寻是否有符合资质证书的图像''' if len(candidate_images) == 0: return None target_list = [] sim_list = [] for image_path in candidate_images: score = self.get_similarity(image_path=image_path, tamplate=self.qc_tamplate) sim_list.append(score.cpu().numpy()) sim_list = np.array(sim_list).reshape(len(sim_list)) for i, cos_sim in enumerate(sim_list): if cos_sim > self.qc_threshold: target_list.append(candidate_images[i]) return target_list def get_similarity(self, image_path, tamplate): image = preprocess(Image.open(image_path)).unsqueeze(0).to(device) text = clip.tokenize([tamplate]).to(device) with torch.no_grad(): logits_per_image, logits_per_text = model.get_similarity(image, text) return logits_per_image if __name__ == '__main__': start_time = time.time() url = "http://120.48.103.13:18000/ctr_ocr" base_url = "https://aip.baidubce.com/rest/2.0/ocr/v1/seal?access_token=" access_token = "24.6bbe9987c6bd19ba65e4402917811657.2592000.1724573148.282335-86574608" headers = {'content-type': 'application/x-www-form-urlencoded'} data_path = "/home/stf/miner_pdf/data/投标公司pdf" out_path = "/home/stf/miner_pdf/test.json" ground_truth = "/home/stf/miner_pdf/ground_truth.json" firm_excel_file = "/home/stf/miner_pdf/data/certificate.xlsx" df = pd.read_excel(firm_excel_file) ocr = OcrAgent(url=url) seal_ocr = seal_agent(base_url=base_url, access_token=access_token, headers=headers) unscanned_firm_list = df[(df['是否为扫描件'] == '否')]['公司名称'].tolist() scanned_firm_list = df[(df['是否为扫描件'] == '是')]['公司名称'].tolist() all_firm_list = unscanned_firm_list + scanned_firm_list data = {} start_time = time.time() # 以下为提取pdf中标题文本 for firm_name in all_firm_list: firm_path = os.path.join(data_path, firm_name) # 在firm_path下创建新文件 for bid_name in tqdm(os.listdir(firm_path)): if bid_name.endswith('.pdf'): # 获取bid_dictionary bid_dir = os.path.join(firm_path, bid_name[:-4]) os.makedirs(bid_dir, exist_ok=True) # 获取bid_dictionary / image_dir image_dir = os.path.join(bid_dir, 'extracted_images') os.makedirs(image_dir, exist_ok=True) document = os.path.join(firm_path, bid_name) logger.info(f'processing document {document} ...') # 提取文档标题存入title_path ''' title_path = os.path.join(bid_dir, 'title.json') main_parse(pdf_path=document, title_path=title_path, image_dir=image_dir) extractor = PdfExtractAttr(file_path=document) extractor.parse_outline(out_path=os.path.join(bid_dir, 'outlines.json')) extractor.parse_text(out_path=os.path.join(bid_dir, 'all_texts.json')) extractor.parse_table(out_path=os.path.join(bid_dir, 'all_tables.json')) ''' # 以下为提取scanned firm的pdf中营业执照、资质证书等信息 # 已经失效 ''' for firm_name in scanned_firm_list: data[firm_name] = {'license_list':[]} firm_path = os.path.join(data_path, firm_name) # 在firm_path下创建新文件 for bid_name in tqdm(os.listdir(firm_path)): if bid_name.endswith('.pdf'): # 获取img_dictionary image_dir = os.path.join(firm_path, 'scanned') document = os.path.join(firm_path, bid_name) logger.info(f'processing document {document} ...') # 统计该pdf文件共转换为多少张扫描图像 total_img = len(os.listdir(image_dir)) logger.info(f'当前文档共扫描出{total_img}张图像 ...') for img in os.listdir(image_dir): img_path = os.path.join(image_dir, img) try: response = ocr.judge(image_path=img_path, firm_name=firm_name) if response == None or response['qualtified'] == None: # logger.info(json.dumps(response, indent=4, ensure_ascii=False)) continue else: data[firm_name]["license_list"].append({ "license_name": response["license_name"], "license_path": img, # "license_page": response["page_number"], "start_datetime": response["start_datetime"], "end_datetime": response["end_datetime"] }) except ValueError as e: logger.info(e) with open(out_path, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=4) ''' # 以下原本为提取非扫描件公司的营业执照、资质证书信息 # 现面向所有公司 # for firm_name in unscanned_firm_list: for firm_name in all_firm_list: firm_path = os.path.join(data_path, firm_name) # 在firm_path下创建新文件 for bid_name in tqdm(os.listdir(firm_path)): if bid_name.endswith('.pdf'): # 获取bid_dictionary bid_dir = os.path.join(firm_path, bid_name[:-4]) os.makedirs(bid_dir, exist_ok=True) # 获取bid_dictionary / image_dir image_dir = os.path.join(bid_dir, 'extracted_images') os.makedirs(image_dir, exist_ok=True) document = os.path.join(firm_path, bid_name) logger.info(f'processing document {document} ...') agent = PdfMatcher(document) data[firm_name] = {"license_list":[]} candidate_images = agent.find_candidate_images() if len(candidate_images) == 0: logger.info(f'current firm: {firm_name} is unqualtified ...') else: for img in candidate_images: try: response = ocr.judge(image_path=img, firm_name=firm_name) if response == None or response['qualtified'] == None: # logger.info(json.dumps(response, indent=4, ensure_ascii=False)) continue else: data[firm_name]["license_list"].append({ "license_name": response["license_name"], "license_path": img, "license_page": response["page_number"], "start_datetime": response["start_datetime"], "end_datetime": response["end_datetime"] }) except ValueError as e: logger.info(e) ''' result = agent.locate_business_license() media_time = time.time() logger.info(f'search cost time: {media_time - start_time}') if result is None: logger.info(f'current firm {firm_name} detects None ...') else: for tgt_path in result: response = ocr.get_license_info(image_path=tgt_path, firm_name=firm_name) #TODO 新旧营业执照的逻辑处理 if (response['qualtified'] == True) and (response['matched'] == True): break logger.info(f'detect cost time: {time.time() - media_time}') # 提取图片存入image_dir # 提取文档标题存入title_path # title_path = os.path.join(bid_dir, 'title.json') # main_parse(pdf_path=document, title_path=title_path, image_dir=image_dir) # extractor = PdfExtractAttr(file_path=document) # extractor.parse_outline(out_path=os.path.join(bid_dir, 'outlines.json')) # extractor.parse_text(out_path=os.path.join(bid_dir, 'all_texts.json')) # extractor.parse_table(out_path=os.path.join(bid_dir, 'all_tables.json')) # df.to_excel(firm_excel_file, index=False) ''' # 以下将data的数据存入out_path with open(out_path, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=4) # 以下根据ground-truth进行精度测试 business_lic_num = 0 certificate_lic_num = 0 business_lic_cnt = 0 certificate_lic_cnt = 0 true_data = load_json(ground_truth) for firm_name in true_data: y_data = true_data[firm_name]['license_list'] pred_data = data[firm_name]['license_list'] for lic in y_data: if lic['license_name'] == '营业执照': business_lic_num += 1 for i in range(len(pred_data)): if pred_data[i]['license_name'] == '营业执照' and pred_data[i]['license_path'] == lic['license_path']: business_lic_cnt += 1 elif lic['license_name'] == '资质证书': certificate_lic_num += 1 for i in range(len(pred_data)): if pred_data[i]['license_path'] == lic['license_path']: certificate_lic_cnt += 1 else: logger.info('wrong truth data of {}'.format(lic['license_path'])) logger.info(f'营业执照识别准确率为:{business_lic_cnt / business_lic_num}') logger.info(f'资质证书识别率为:{certificate_lic_cnt / certificate_lic_num}') delta = time.time() - start_time logger.info(f"total unscanned document {len(unscanned_firm_list)} cost time: {delta // 60} min {delta % 60} sec ...")